std.event.Loop: use EPOLLONESHOT to save 1 syscall
when a thread pool worker accepts a coroutine to resume
commit 57f36c4201
parent eb326e1553
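In epoll terms, the saving works like this: an fd registered with EPOLLONESHOT is disarmed after delivering one event but stays in the epoll set, so the loop no longer has to EPOLL_CTL_DEL the eventfd after each wakeup; the next worker re-arms it with a single EPOLL_CTL_MOD (the new epoll_op field tracks whether ADD or MOD is needed). Below is a minimal standalone C sketch of that re-arm pattern, for illustration only; the program is hypothetical and not part of this commit, though the flags mirror the Zig code.

/* Hypothetical demo (not from the commit): arm an eventfd with
 * EPOLLONESHOT once via EPOLL_CTL_ADD, then re-arm it with
 * EPOLL_CTL_MOD. The previous scheme needed an extra EPOLL_CTL_DEL
 * after every wakeup -- that is the saved syscall. */
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int main(void) {
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    /* initval = 1 so the eventfd is immediately readable, like the
     * pre-signaled eventfds the loop keeps on its available stack */
    int efd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK);
    if (epfd < 0 || efd < 0) return 1;

    struct epoll_event ev = {
        .events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT,
        .data = { .fd = -1 }, /* the Zig code stores a *ResumeNode here */
    };

    /* first use: ADD (epoll_op starts out as EPOLL_CTL_ADD) */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) != 0) return 1;

    struct epoll_event out;
    int n = epoll_wait(epfd, &out, 1, -1);
    printf("woke up with %d event(s)\n", n);
    /* EPOLLONESHOT has now disarmed efd, but it is still registered,
     * so no EPOLL_CTL_DEL is needed before handing it back out */

    /* every later use: re-arm with MOD (epoll_op is EPOLL_CTL_MOD) */
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, efd, &ev) != 0) return 1;

    close(efd);
    close(epfd);
    return 0;
}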
@@ -132,6 +132,7 @@ pub const Loop = struct {
 
         pub const EventFd = struct {
             base: ResumeNode,
+            epoll_op: u32,
             eventfd: i32,
         };
     };
@@ -204,6 +205,7 @@ pub const Loop = struct {
                     .handle = undefined,
                 },
                 .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
+                .epoll_op = posix.EPOLL_CTL_ADD,
             },
             .next = undefined,
         };
@@ -265,15 +267,20 @@ pub const Loop = struct {
         errdefer {
             _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
         }
-        try self.addFdNoCounter(fd, resume_node);
+        try self.modFd(
+            fd,
+            posix.EPOLL_CTL_ADD,
+            std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
+            resume_node,
+        );
     }
 
-    fn addFdNoCounter(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
+    pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
         var ev = std.os.linux.epoll_event{
-            .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
+            .events = events,
             .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
         };
-        try std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
+        try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
     }
 
     pub fn removeFd(self: *Loop, fd: i32) void {
@@ -331,7 +338,8 @@ pub const Loop = struct {
                 const eventfd_node = &resume_stack_node.data;
                 eventfd_node.base.handle = handle;
                 // the pending count is already accounted for
-                self.addFdNoCounter(eventfd_node.eventfd, &eventfd_node.base) catch |_| {
+                const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
+                self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch |_| {
                     // fine, we didn't need it anyway
                     _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                     self.os_data.available_eventfd_resume_nodes.push(resume_stack_node);
@@ -371,7 +379,7 @@ pub const Loop = struct {
                 ResumeNode.Id.Stop => return,
                 ResumeNode.Id.EventFd => {
                     const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
-                    self.removeFdNoCounter(event_fd_node.eventfd);
+                    event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
                     const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
                     self.os_data.available_eventfd_resume_nodes.push(stack_node);
                 },
@@ -902,7 +910,7 @@ test "std.event.Lock" {
     defer cancel handle;
     loop.run();
 
-    assert(mem.eql(i32, shared_test_data, [1]i32{3 * 10} ** 10));
+    assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
 }
 
 async fn testLock(loop: *Loop, lock: *Lock) void {
@@ -945,7 +953,7 @@ async fn lockRunner(lock: *Lock) void {
     suspend; // resumed by onNextTick
 
     var i: usize = 0;
-    while (i < 10) : (i += 1) {
+    while (i < shared_test_data.len) : (i += 1) {
         const handle = await (async lock.acquire() catch @panic("out of memory"));
         defer handle.release();
 