std.event.fs support for macos

The file I/O stuff is working, but the fs watching
stuff is not yet.
This commit is contained in:
Andrew Kelley 2018-08-07 00:49:09 -04:00
parent 2c9ed664dd
commit fd50a6896b
5 changed files with 239 additions and 78 deletions

View File

@ -21,8 +21,10 @@ pub extern "c" fn lseek(fd: c_int, offset: isize, whence: c_int) isize;
pub extern "c" fn open(path: [*]const u8, oflag: c_int, ...) c_int;
pub extern "c" fn raise(sig: c_int) c_int;
pub extern "c" fn read(fd: c_int, buf: *c_void, nbyte: usize) isize;
pub extern "c" fn pread(fd: c_int, buf: *c_void, nbyte: usize, offset: u64) isize;
pub extern "c" fn stat(noalias path: [*]const u8, noalias buf: *Stat) c_int;
pub extern "c" fn write(fd: c_int, buf: *const c_void, nbyte: usize) isize;
pub extern "c" fn pwrite(fd: c_int, buf: *const c_void, nbyte: usize, offset: u64) isize;
pub extern "c" fn mmap(addr: ?*c_void, len: usize, prot: c_int, flags: c_int, fd: c_int, offset: isize) ?*c_void;
pub extern "c" fn munmap(addr: *c_void, len: usize) c_int;
pub extern "c" fn unlink(path: [*]const u8) c_int;

View File

@ -27,7 +27,7 @@ pub const Request = struct {
pub const PWriteV = struct {
fd: os.FileHandle,
iov: []os.linux.iovec_const,
iov: []os.posix.iovec_const,
offset: usize,
result: Error!void,
@ -36,7 +36,7 @@ pub const Request = struct {
pub const PReadV = struct {
fd: os.FileHandle,
iov: []os.linux.iovec,
iov: []os.posix.iovec,
offset: usize,
result: Error!usize,
@ -83,11 +83,11 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
resume @handle();
}
const iovecs = try loop.allocator.alloc(os.linux.iovec_const, data.len);
const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.linux.iovec_const{
iovecs[i] = os.posix.iovec_const{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
@ -116,7 +116,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
};
suspend {
loop.linuxFsRequest(&req_node);
loop.posixFsRequest(&req_node);
}
return req_node.data.msg.PWriteV.result;
@ -132,11 +132,11 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
resume @handle();
}
const iovecs = try loop.allocator.alloc(os.linux.iovec, data.len);
const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.linux.iovec{
iovecs[i] = os.posix.iovec{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
@ -165,7 +165,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
};
suspend {
loop.linuxFsRequest(&req_node);
loop.posixFsRequest(&req_node);
}
return req_node.data.msg.PReadV.result;
@ -201,7 +201,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
};
suspend {
loop.linuxFsRequest(&req_node);
loop.posixFsRequest(&req_node);
}
return req_node.data.msg.OpenRead.result;
@ -243,7 +243,7 @@ pub async fn openReadWrite(
};
suspend {
loop.linuxFsRequest(&req_node);
loop.posixFsRequest(&req_node);
}
return req_node.data.msg.OpenRW.result;
@ -280,7 +280,7 @@ pub const CloseOperation = struct {
/// Defer this after creating.
pub fn deinit(self: *CloseOperation) void {
if (self.have_fd) {
self.loop.linuxFsRequest(&self.close_req_node);
self.loop.posixFsRequest(&self.close_req_node);
} else {
self.loop.allocator.destroy(self);
}
@ -330,7 +330,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
};
suspend {
loop.linuxFsRequest(&req_node);
loop.posixFsRequest(&req_node);
}
return req_node.data.msg.WriteFile.result;

View File

@ -127,11 +127,6 @@ pub const Loop = struct {
.finish = fs.Request.Finish.NoAction,
},
};
self.os_data.fs_thread = try os.spawnThread(self, linuxFsRun);
errdefer {
self.linuxFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
errdefer {
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
@ -168,6 +163,12 @@ pub const Loop = struct {
&self.os_data.final_eventfd_event,
);
self.os_data.fs_thread = try os.spawnThread(self, posixFsRun);
errdefer {
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
var extra_thread_index: usize = 0;
errdefer {
// writing 8 bytes to an eventfd cannot fail
@ -185,10 +186,25 @@ pub const Loop = struct {
self.os_data.kqfd = try os.bsdKQueue();
errdefer os.close(self.os_data.kqfd);
self.os_data.fs_kqfd = try os.bsdKQueue();
errdefer os.close(self.os_data.fs_kqfd);
self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
// we need another thread for the file system because Darwin does not have an async
// file system I/O API.
self.os_data.fs_end_request = fs.RequestNode{
.prev = undefined,
.next = undefined,
.data = fs.Request{
.msg = fs.Request.Msg.End,
.finish = fs.Request.Finish.NoAction,
},
};
self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
errdefer self.allocator.free(self.os_data.kevents);
const eventlist = ([*]posix.Kevent)(undefined)[0..0];
const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
for (self.eventfd_resume_nodes) |*eventfd_node, i| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
@ -207,12 +223,11 @@ pub const Loop = struct {
.udata = @ptrToInt(&eventfd_node.data.base),
},
},
.prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
_ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
_ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null);
eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
// this one is for waiting for events
@ -236,14 +251,38 @@ pub const Loop = struct {
.data = 0,
.udata = @ptrToInt(&self.final_resume_node),
};
const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
_ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
const final_kev_arr = (*[1]posix.Kevent)(&self.os_data.final_kevent);
_ = try os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
self.os_data.final_kevent.flags = posix.EV_ENABLE;
self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
self.os_data.fs_kevent_wake = posix.Kevent{
.ident = extra_thread_count + 1,
.filter = posix.EVFILT_USER,
.flags = posix.EV_ADD,
.fflags = posix.NOTE_TRIGGER,
.data = 0,
.udata = undefined,
};
self.os_data.fs_kevent_wait = posix.Kevent{
.ident = extra_thread_count + 1,
.filter = posix.EVFILT_USER,
.flags = posix.EV_ADD|posix.EV_CLEAR,
.fflags = 0,
.data = 0,
.udata = undefined,
};
self.os_data.fs_thread = try os.spawnThread(self, posixFsRun);
errdefer {
self.posixFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
var extra_thread_index: usize = 0;
errdefer {
_ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
_ = os.bsdKEvent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
while (extra_thread_index != 0) {
extra_thread_index -= 1;
self.extra_threads[extra_thread_index].wait();
@ -312,6 +351,7 @@ pub const Loop = struct {
builtin.Os.macosx => {
self.allocator.free(self.os_data.kevents);
os.close(self.os_data.kqfd);
os.close(self.os_data.fs_kqfd);
},
builtin.Os.windows => {
os.close(self.os_data.io_port);
@ -375,8 +415,8 @@ pub const Loop = struct {
switch (builtin.os) {
builtin.Os.macosx => {
const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
const eventlist = ([*]posix.Kevent)(undefined)[0..0];
_ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
_ = os.bsdKEvent(self.os_data.kqfd, kevent_array, empty_kevs, null) catch {
self.next_tick_queue.unget(next_tick_node);
self.available_eventfd_resume_nodes.push(resume_stack_node);
return;
@ -493,16 +533,17 @@ pub const Loop = struct {
// cause all the threads to stop
switch (builtin.os) {
builtin.Os.linux => {
self.linuxFsRequest(&self.os_data.fs_end_request);
self.posixFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
return;
},
builtin.Os.macosx => {
self.posixFsRequest(&self.os_data.fs_end_request);
const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
const eventlist = ([*]posix.Kevent)(undefined)[0..0];
const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
// cannot fail because we already added it and this just enables it
_ = os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
_ = os.bsdKEvent(self.os_data.kqfd, final_kevent, empty_kevs, null) catch unreachable;
return;
},
builtin.Os.windows => {
@ -576,6 +617,7 @@ pub const Loop = struct {
self.finishOneEvent();
}
}
break;
},
builtin.Os.windows => {
var completion_key: usize = undefined;
@ -610,19 +652,29 @@ pub const Loop = struct {
}
}
fn linuxFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
self.beginOneEvent(); // finished in linuxFsRun after processing the msg
fn posixFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
self.beginOneEvent(); // finished in posixFsRun after processing the msg
self.os_data.fs_queue.put(request_node);
_ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap
const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1);
switch (os.linux.getErrno(rc)) {
0 => {},
posix.EINVAL => unreachable,
else => unreachable,
switch (builtin.os) {
builtin.Os.macosx => {
const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wake);
const empty_kevs = ([*]posix.Kevent)(undefined)[0..0];
_ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
},
builtin.Os.linux => {
_ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap
const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1);
switch (os.linux.getErrno(rc)) {
0 => {},
posix.EINVAL => unreachable,
else => unreachable,
}
},
else => @compileError("Unsupported OS"),
}
}
fn linuxFsRun(self: *Loop) void {
fn posixFsRun(self: *Loop) void {
var processed_count: i32 = 0; // we let this wrap
while (true) {
while (self.os_data.fs_queue.get()) |node| {
@ -664,12 +716,22 @@ pub const Loop = struct {
}
self.finishOneEvent();
}
const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null);
switch (os.linux.getErrno(rc)) {
0 => continue,
posix.EINTR => continue,
posix.EAGAIN => continue,
else => unreachable,
switch (builtin.os) {
builtin.Os.linux => {
const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null);
switch (os.linux.getErrno(rc)) {
0 => continue,
posix.EINTR => continue,
posix.EAGAIN => continue,
else => unreachable,
}
},
builtin.Os.macosx => {
const fs_kevs = (*[1]posix.Kevent)(&self.os_data.fs_kevent_wait);
var out_kevs: [1]posix.Kevent = undefined;
_ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, out_kevs[0..], null) catch unreachable;
},
else => @compileError("Unsupported OS"),
}
}
}
@ -696,6 +758,12 @@ pub const Loop = struct {
kqfd: i32,
final_kevent: posix.Kevent,
kevents: []posix.Kevent,
fs_kevent_wake: posix.Kevent,
fs_kevent_wait: posix.Kevent,
fs_thread: *os.Thread,
fs_kqfd: i32,
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
};
};

View File

@ -646,6 +646,10 @@ pub fn read(fd: i32, buf: [*]u8, nbyte: usize) usize {
return errnoWrap(c.read(fd, @ptrCast(*c_void, buf), nbyte));
}
pub fn pread(fd: i32, buf: [*]u8, nbyte: usize, offset: u64) usize {
return errnoWrap(c.pread(fd, @ptrCast(*c_void, buf), nbyte, offset));
}
pub fn stat(noalias path: [*]const u8, noalias buf: *stat) usize {
return errnoWrap(c.stat(path, buf));
}
@ -654,6 +658,10 @@ pub fn write(fd: i32, buf: [*]const u8, nbyte: usize) usize {
return errnoWrap(c.write(fd, @ptrCast(*const c_void, buf), nbyte));
}
pub fn pwrite(fd: i32, buf: [*]const u8, nbyte: usize, offset: u64) usize {
return errnoWrap(c.pwrite(fd, @ptrCast(*const c_void, buf), nbyte, offset));
}
pub fn mmap(address: ?[*]u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize {
const ptr_result = c.mmap(
@ptrCast(*c_void, address),

View File

@ -246,23 +246,64 @@ pub fn posixRead(fd: i32, buf: []u8) !void {
}
}
/// Number of bytes read is returned. Upon reading end-of-file, zero is returned.
pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) !usize {
while (true) {
const rc = posix.preadv(fd, iov, count, offset);
const err = posix.getErrno(rc);
switch (err) {
0 => return rc,
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return error.WouldBlock,
posix.EBADF => return error.FileClosed,
posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources,
posix.ENOMEM => return error.SystemResources,
else => return unexpectedErrorPosix(err),
}
switch (builtin.os) {
builtin.Os.macosx => {
// Darwin does not have preadv but it does have pread.
var off: usize = 0;
var iov_i: usize = 0;
var inner_off: usize = 0;
while (true) {
const v = iov[iov_i];
const rc = darwin.pread(fd, v.iov_base + inner_off, v.iov_len - inner_off, offset + off);
const err = darwin.getErrno(rc);
switch (err) {
0 => {
off += rc;
inner_off += rc;
if (inner_off == v.iov_len) {
iov_i += 1;
inner_off = 0;
if (iov_i == count) {
return off;
}
}
if (rc == 0) return off; // EOF
continue;
},
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.ESPIPE => unreachable, // fd is not seekable
posix.EAGAIN => return error.WouldBlock,
posix.EBADF => return error.FileClosed,
posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources,
posix.ENOMEM => return error.SystemResources,
else => return unexpectedErrorPosix(err),
}
}
},
builtin.Os.linux, builtin.Os.freebsd => while (true) {
const rc = posix.preadv(fd, iov, count, offset);
const err = posix.getErrno(rc);
switch (err) {
0 => return rc,
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return error.WouldBlock,
posix.EBADF => return error.FileClosed,
posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources,
posix.ENOMEM => return error.SystemResources,
else => return unexpectedErrorPosix(err),
}
},
else => @compileError("Unsupported OS"),
}
}
@ -311,25 +352,67 @@ pub fn posixWrite(fd: i32, bytes: []const u8) !void {
}
pub fn posix_pwritev(fd: i32, iov: [*]const posix.iovec_const, count: usize, offset: u64) PosixWriteError!void {
while (true) {
const rc = posix.pwritev(fd, iov, count, offset);
const err = posix.getErrno(rc);
switch (err) {
0 => return,
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return PosixWriteError.WouldBlock,
posix.EBADF => return PosixWriteError.FileClosed,
posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired,
posix.EDQUOT => return PosixWriteError.DiskQuota,
posix.EFBIG => return PosixWriteError.FileTooBig,
posix.EIO => return PosixWriteError.InputOutput,
posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
posix.EPERM => return PosixWriteError.AccessDenied,
posix.EPIPE => return PosixWriteError.BrokenPipe,
else => return unexpectedErrorPosix(err),
}
switch (builtin.os) {
builtin.Os.macosx => {
// Darwin does not have pwritev but it does have pwrite.
var off: usize = 0;
var iov_i: usize = 0;
var inner_off: usize = 0;
while (true) {
const v = iov[iov_i];
const rc = darwin.pwrite(fd, v.iov_base + inner_off, v.iov_len - inner_off, offset + off);
const err = darwin.getErrno(rc);
switch (err) {
0 => {
off += rc;
inner_off += rc;
if (inner_off == v.iov_len) {
iov_i += 1;
inner_off = 0;
if (iov_i == count) {
return;
}
}
continue;
},
posix.EINTR => continue,
posix.ESPIPE => unreachable, // fd is not seekable
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return PosixWriteError.WouldBlock,
posix.EBADF => return PosixWriteError.FileClosed,
posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired,
posix.EDQUOT => return PosixWriteError.DiskQuota,
posix.EFBIG => return PosixWriteError.FileTooBig,
posix.EIO => return PosixWriteError.InputOutput,
posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
posix.EPERM => return PosixWriteError.AccessDenied,
posix.EPIPE => return PosixWriteError.BrokenPipe,
else => return unexpectedErrorPosix(err),
}
}
},
builtin.Os.linux => while (true) {
const rc = posix.pwritev(fd, iov, count, offset);
const err = posix.getErrno(rc);
switch (err) {
0 => return,
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return PosixWriteError.WouldBlock,
posix.EBADF => return PosixWriteError.FileClosed,
posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired,
posix.EDQUOT => return PosixWriteError.DiskQuota,
posix.EFBIG => return PosixWriteError.FileTooBig,
posix.EIO => return PosixWriteError.InputOutput,
posix.ENOSPC => return PosixWriteError.NoSpaceLeft,
posix.EPERM => return PosixWriteError.AccessDenied,
posix.EPIPE => return PosixWriteError.BrokenPipe,
else => return unexpectedErrorPosix(err),
}
},
else => @compileError("Unsupported OS"),
}
}