diff --git a/std/c/index.zig b/std/c/index.zig index 7de8634d0..871cf625b 100644 --- a/std/c/index.zig +++ b/std/c/index.zig @@ -21,8 +21,10 @@ pub extern "c" fn lseek(fd: c_int, offset: isize, whence: c_int) isize; pub extern "c" fn open(path: [*]const u8, oflag: c_int, ...) c_int; pub extern "c" fn raise(sig: c_int) c_int; pub extern "c" fn read(fd: c_int, buf: *c_void, nbyte: usize) isize; +pub extern "c" fn pread(fd: c_int, buf: *c_void, nbyte: usize, offset: u64) isize; pub extern "c" fn stat(noalias path: [*]const u8, noalias buf: *Stat) c_int; pub extern "c" fn write(fd: c_int, buf: *const c_void, nbyte: usize) isize; +pub extern "c" fn pwrite(fd: c_int, buf: *const c_void, nbyte: usize, offset: u64) isize; pub extern "c" fn mmap(addr: ?*c_void, len: usize, prot: c_int, flags: c_int, fd: c_int, offset: isize) ?*c_void; pub extern "c" fn munmap(addr: *c_void, len: usize) c_int; pub extern "c" fn unlink(path: [*]const u8) c_int; diff --git a/std/event/fs.zig b/std/event/fs.zig index 9b2a447c8..1f810c484 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -27,7 +27,7 @@ pub const Request = struct { pub const PWriteV = struct { fd: os.FileHandle, - iov: []os.linux.iovec_const, + iov: []os.posix.iovec_const, offset: usize, result: Error!void, @@ -36,7 +36,7 @@ pub const Request = struct { pub const PReadV = struct { fd: os.FileHandle, - iov: []os.linux.iovec, + iov: []os.posix.iovec, offset: usize, result: Error!usize, @@ -83,11 +83,11 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: resume @handle(); } - const iovecs = try loop.allocator.alloc(os.linux.iovec_const, data.len); + const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.linux.iovec_const{ + iovecs[i] = os.posix.iovec_const{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -116,7 +116,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: }; suspend { - loop.linuxFsRequest(&req_node); + loop.posixFsRequest(&req_node); } return req_node.data.msg.PWriteV.result; @@ -132,11 +132,11 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [ resume @handle(); } - const iovecs = try loop.allocator.alloc(os.linux.iovec, data.len); + const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len); defer loop.allocator.free(iovecs); for (data) |buf, i| { - iovecs[i] = os.linux.iovec{ + iovecs[i] = os.posix.iovec{ .iov_base = buf.ptr, .iov_len = buf.len, }; @@ -165,7 +165,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [ }; suspend { - loop.linuxFsRequest(&req_node); + loop.posixFsRequest(&req_node); } return req_node.data.msg.PReadV.result; @@ -201,7 +201,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os. }; suspend { - loop.linuxFsRequest(&req_node); + loop.posixFsRequest(&req_node); } return req_node.data.msg.OpenRead.result; @@ -243,7 +243,7 @@ pub async fn openReadWrite( }; suspend { - loop.linuxFsRequest(&req_node); + loop.posixFsRequest(&req_node); } return req_node.data.msg.OpenRW.result; @@ -280,7 +280,7 @@ pub const CloseOperation = struct { /// Defer this after creating. pub fn deinit(self: *CloseOperation) void { if (self.have_fd) { - self.loop.linuxFsRequest(&self.close_req_node); + self.loop.posixFsRequest(&self.close_req_node); } else { self.loop.allocator.destroy(self); } @@ -330,7 +330,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons }; suspend { - loop.linuxFsRequest(&req_node); + loop.posixFsRequest(&req_node); } return req_node.data.msg.WriteFile.result; diff --git a/std/event/loop.zig b/std/event/loop.zig index c917c274e..78191e60d 100644 --- a/std/event/loop.zig +++ b/std/event/loop.zig @@ -127,11 +127,6 @@ pub const Loop = struct { .finish = fs.Request.Finish.NoAction, }, }; - self.os_data.fs_thread = try os.spawnThread(self, linuxFsRun); - errdefer { - self.linuxFsRequest(&self.os_data.fs_end_request); - self.os_data.fs_thread.wait(); - } errdefer { while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd); @@ -168,6 +163,12 @@ pub const Loop = struct { &self.os_data.final_eventfd_event, ); + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + var extra_thread_index: usize = 0; errdefer { // writing 8 bytes to an eventfd cannot fail @@ -185,10 +186,25 @@ pub const Loop = struct { self.os_data.kqfd = try os.bsdKQueue(); errdefer os.close(self.os_data.kqfd); + self.os_data.fs_kqfd = try os.bsdKQueue(); + errdefer os.close(self.os_data.fs_kqfd); + + self.os_data.fs_queue = std.atomic.Queue(fs.Request).init(); + // we need another thread for the file system because Darwin does not have an async + // file system I/O API. + self.os_data.fs_end_request = fs.RequestNode{ + .prev = undefined, + .next = undefined, + .data = fs.Request{ + .msg = fs.Request.Msg.End, + .finish = fs.Request.Finish.NoAction, + }, + }; + self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count); errdefer self.allocator.free(self.os_data.kevents); - const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; for (self.eventfd_resume_nodes) |*eventfd_node, i| { eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{ @@ -207,12 +223,11 @@ pub const Loop = struct { .udata = @ptrToInt(&eventfd_node.data.base), }, }, - .prev = undefined, .next = undefined, }; self.available_eventfd_resume_nodes.push(eventfd_node); const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null); eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE; eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER; // this one is for waiting for events @@ -236,14 +251,38 @@ pub const Loop = struct { .data = 0, .udata = @ptrToInt(&self.final_resume_node), }; - const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent); - _ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null); + const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent); + _ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null); self.os_data.final_kevent.flags = posix.EV_ENABLE; self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER; + self.os_data.fs_kevent_wake = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD, + .fflags = posix.NOTE_TRIGGER, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_kevent_wait = posix.Kevent{ + .ident = extra_thread_count + 1, + .filter = posix.EVFILT_USER, + .flags = posix.EV_ADD|posix.EV_CLEAR, + .fflags = 0, + .data = 0, + .udata = undefined, + }; + + self.os_data.fs_thread = try os.spawnThread(self, posixFsRun); + errdefer { + self.posixFsRequest(&self.os_data.fs_end_request); + self.os_data.fs_thread.wait(); + } + var extra_thread_index: usize = 0; errdefer { - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable; + _ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable; while (extra_thread_index != 0) { extra_thread_index -= 1; self.extra_threads[extra_thread_index].wait(); @@ -312,6 +351,7 @@ pub const Loop = struct { builtin.Os.macosx => { self.allocator.free(self.os_data.kevents); os.close(self.os_data.kqfd); + os.close(self.os_data.fs_kqfd); }, builtin.Os.windows => { os.close(self.os_data.io_port); @@ -375,8 +415,8 @@ pub const Loop = struct { switch (builtin.os) { builtin.Os.macosx => { const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent); - const eventlist = ([*]posix.Kevent)(undefined)[0..0]; - _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch { + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch { self.next_tick_queue.unget(next_tick_node); self.available_eventfd_resume_nodes.push(resume_stack_node); return; @@ -493,16 +533,17 @@ pub const Loop = struct { // cause all the threads to stop switch (builtin.os) { builtin.Os.linux => { - self.linuxFsRequest(&self.os_data.fs_end_request); + self.posixFsRequest(&self.os_data.fs_end_request); // writing 8 bytes to an eventfd cannot fail os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable; return; }, builtin.Os.macosx => { + self.posixFsRequest(&self.os_data.fs_end_request); const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent); - const eventlist = ([*]posix.Kevent)(undefined)[0..0]; + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; // cannot fail because we already added it and this just enables it - _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable; + _ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable; return; }, builtin.Os.windows => { @@ -576,6 +617,7 @@ pub const Loop = struct { self.finishOneEvent(); } } + break; }, builtin.Os.windows => { var completion_key: usize = undefined; @@ -610,19 +652,29 @@ pub const Loop = struct { } } - fn linuxFsRequest(self: *Loop, request_node: *fs.RequestNode) void { - self.beginOneEvent(); // finished in linuxFsRun after processing the msg + fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void { + self.beginOneEvent(); // finished in posixFsRun after processing the msg self.os_data.fs_queue.put(request_node); - _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap - const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); - switch (os.linux.getErrno(rc)) { - 0 => {}, - posix.EINVAL => unreachable, - else => unreachable, + switch (builtin.os) { + builtin.Os.macosx => { + const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake); + const empty_kevs = ([*]posix.Kevent)(undefined)[0..0]; + _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable; + }, + builtin.Os.linux => { + _ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap + const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1); + switch (os.linux.getErrno(rc)) { + 0 => {}, + posix.EINVAL => unreachable, + else => unreachable, + } + }, + else => @compileError("Unsupported OS"), } } - fn linuxFsRun(self: *Loop) void { + fn posixFsRun(self: *Loop) void { var processed_count: i32 = 0; // we let this wrap while (true) { while (self.os_data.fs_queue.get()) |node| { @@ -664,12 +716,22 @@ pub const Loop = struct { } self.finishOneEvent(); } - const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); - switch (os.linux.getErrno(rc)) { - 0 => continue, - posix.EINTR => continue, - posix.EAGAIN => continue, - else => unreachable, + switch (builtin.os) { + builtin.Os.linux => { + const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null); + switch (os.linux.getErrno(rc)) { + 0 => continue, + posix.EINTR => continue, + posix.EAGAIN => continue, + else => unreachable, + } + }, + builtin.Os.macosx => { + const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait); + var out_kevs: [1]posix.Kevent = undefined; + _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable; + }, + else => @compileError("Unsupported OS"), } } } @@ -696,6 +758,12 @@ pub const Loop = struct { kqfd: i32, final_kevent: posix.Kevent, kevents: []posix.Kevent, + fs_kevent_wake: posix.Kevent, + fs_kevent_wait: posix.Kevent, + fs_thread: *os.Thread, + fs_kqfd: i32, + fs_queue: std.atomic.Queue(fs.Request), + fs_end_request: fs.RequestNode, }; }; diff --git a/std/os/darwin.zig b/std/os/darwin.zig index c5c892b8a..935d28d6f 100644 --- a/std/os/darwin.zig +++ b/std/os/darwin.zig @@ -646,6 +646,10 @@ pub fn read(fd: i32, buf: [*]u8, nbyte: usize) usize { return errnoWrap(c.read(fd, @ptrCast(*c_void, buf), nbyte)); } +pub fn pread(fd: i32, buf: [*]u8, nbyte: usize, offset: u64) usize { + return errnoWrap(c.pread(fd, @ptrCast(*c_void, buf), nbyte, offset)); +} + pub fn stat(noalias path: [*]const u8, noalias buf: *stat) usize { return errnoWrap(c.stat(path, buf)); } @@ -654,6 +658,10 @@ pub fn write(fd: i32, buf: [*]const u8, nbyte: usize) usize { return errnoWrap(c.write(fd, @ptrCast(*const c_void, buf), nbyte)); } +pub fn pwrite(fd: i32, buf: [*]const u8, nbyte: usize, offset: u64) usize { + return errnoWrap(c.pwrite(fd, @ptrCast(*const c_void, buf), nbyte, offset)); +} + pub fn mmap(address: ?[*]u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize { const ptr_result = c.mmap( @ptrCast(*c_void, address), diff --git a/std/os/index.zig b/std/os/index.zig index 0205f3f0a..cc3d060ae 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -246,23 +246,64 @@ pub fn posixRead(fd: i32, buf: []u8) !void { } } +/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) !usize { - while (true) { - const rc = posix.preadv(fd, iov, count, offset); - const err = posix.getErrno(rc); - switch (err) { - 0 => return rc, - posix.EINTR => continue, - posix.EINVAL => unreachable, - posix.EFAULT => unreachable, - posix.EAGAIN => return error.WouldBlock, - posix.EBADF => return error.FileClosed, - posix.EIO => return error.InputOutput, - posix.EISDIR => return error.IsDir, - posix.ENOBUFS => return error.SystemResources, - posix.ENOMEM => return error.SystemResources, - else => return unexpectedErrorPosix(err), - } + switch (builtin.os) { + builtin.Os.macosx => { + // Darwin does not have preadv but it does have pread. + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const rc = darwin.pread(fd, v.iov_base + inner_off, v.iov_len - inner_off, offset + off); + const err = darwin.getErrno(rc); + switch (err) { + 0 => { + off += rc; + inner_off += rc; + if (inner_off == v.iov_len) { + iov_i += 1; + inner_off = 0; + if (iov_i == count) { + return off; + } + } + if (rc == 0) return off; // EOF + continue; + }, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.ESPIPE => unreachable, // fd is not seekable + posix.EAGAIN => return error.WouldBlock, + posix.EBADF => return error.FileClosed, + posix.EIO => return error.InputOutput, + posix.EISDIR => return error.IsDir, + posix.ENOBUFS => return error.SystemResources, + posix.ENOMEM => return error.SystemResources, + else => return unexpectedErrorPosix(err), + } + } + }, + builtin.Os.linux, builtin.Os.freebsd => while (true) { + const rc = posix.preadv(fd, iov, count, offset); + const err = posix.getErrno(rc); + switch (err) { + 0 => return rc, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => return error.WouldBlock, + posix.EBADF => return error.FileClosed, + posix.EIO => return error.InputOutput, + posix.EISDIR => return error.IsDir, + posix.ENOBUFS => return error.SystemResources, + posix.ENOMEM => return error.SystemResources, + else => return unexpectedErrorPosix(err), + } + }, + else => @compileError("Unsupported OS"), } } @@ -311,25 +352,67 @@ pub fn posixWrite(fd: i32, bytes: []const u8) !void { } pub fn posix_pwritev(fd: i32, iov: [*]const posix.iovec_const, count: usize, offset: u64) PosixWriteError!void { - while (true) { - const rc = posix.pwritev(fd, iov, count, offset); - const err = posix.getErrno(rc); - switch (err) { - 0 => return, - posix.EINTR => continue, - posix.EINVAL => unreachable, - posix.EFAULT => unreachable, - posix.EAGAIN => return PosixWriteError.WouldBlock, - posix.EBADF => return PosixWriteError.FileClosed, - posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, - posix.EDQUOT => return PosixWriteError.DiskQuota, - posix.EFBIG => return PosixWriteError.FileTooBig, - posix.EIO => return PosixWriteError.InputOutput, - posix.ENOSPC => return PosixWriteError.NoSpaceLeft, - posix.EPERM => return PosixWriteError.AccessDenied, - posix.EPIPE => return PosixWriteError.BrokenPipe, - else => return unexpectedErrorPosix(err), - } + switch (builtin.os) { + builtin.Os.macosx => { + // Darwin does not have pwritev but it does have pwrite. + var off: usize = 0; + var iov_i: usize = 0; + var inner_off: usize = 0; + while (true) { + const v = iov[iov_i]; + const rc = darwin.pwrite(fd, v.iov_base + inner_off, v.iov_len - inner_off, offset + off); + const err = darwin.getErrno(rc); + switch (err) { + 0 => { + off += rc; + inner_off += rc; + if (inner_off == v.iov_len) { + iov_i += 1; + inner_off = 0; + if (iov_i == count) { + return; + } + } + continue; + }, + posix.EINTR => continue, + posix.ESPIPE => unreachable, // fd is not seekable + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => return PosixWriteError.WouldBlock, + posix.EBADF => return PosixWriteError.FileClosed, + posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, + posix.EDQUOT => return PosixWriteError.DiskQuota, + posix.EFBIG => return PosixWriteError.FileTooBig, + posix.EIO => return PosixWriteError.InputOutput, + posix.ENOSPC => return PosixWriteError.NoSpaceLeft, + posix.EPERM => return PosixWriteError.AccessDenied, + posix.EPIPE => return PosixWriteError.BrokenPipe, + else => return unexpectedErrorPosix(err), + } + } + }, + builtin.Os.linux => while (true) { + const rc = posix.pwritev(fd, iov, count, offset); + const err = posix.getErrno(rc); + switch (err) { + 0 => return, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => return PosixWriteError.WouldBlock, + posix.EBADF => return PosixWriteError.FileClosed, + posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, + posix.EDQUOT => return PosixWriteError.DiskQuota, + posix.EFBIG => return PosixWriteError.FileTooBig, + posix.EIO => return PosixWriteError.InputOutput, + posix.ENOSPC => return PosixWriteError.NoSpaceLeft, + posix.EPERM => return PosixWriteError.AccessDenied, + posix.EPIPE => return PosixWriteError.BrokenPipe, + else => return unexpectedErrorPosix(err), + } + }, + else => @compileError("Unsupported OS"), } }