const builtin = @import("builtin");
const std = @import("../index.zig");
const event = std.event;
const assert = std.debug.assert;
const os = std.os;
const mem = std.mem;
const posix = os.posix;

/// Node type of an atomic queue whose payload is a `Request`.
pub const RequestNode = std.atomic.Queue(Request).Node;

/// One unit of file system work: the operation itself (`msg`) plus a
/// description of the follow-up action (`finish`).
/// NOTE(review): presumably consumed by the fs thread mentioned on `Msg.End`
/// and `finish` is acted on once `msg` has been carried out - confirm in event.Loop.
pub const Request = struct {
    msg: Msg,
    finish: Finish,

    /// Follow-up action attached to a request.
    pub const Finish = union(enum) {
        // Tick node carrying the handle of the coroutine that issued the request.
        TickNode: event.Loop.NextTickNode,
        // Points at the `CloseOperation` that owns the request node.
        DeallocCloseOperation: *CloseOperation,
        // No payload.
        NoAction,
    };

    /// The operation to perform. Payload structs that produce a value carry a
    /// `result` field; the callers in this file leave it `undefined` when
    /// building the request and read it back after being resumed.
    pub const Msg = union(enum) {
        PWriteV: PWriteV,
        PReadV: PReadV,
        Open: Open,
        Close: Close,
        WriteFile: WriteFile,
        End, // special - means the fs thread should exit

        /// Vectored write of `iov` to `fd` at `offset`.
        pub const PWriteV = struct {
            fd: os.FileHandle,
            iov: []os.posix.iovec_const,
            offset: usize,
            result: Error!void,

            pub const Error = os.File.WriteError;
        };

        /// Vectored read from `fd` at `offset` into `iov`; `result` is a byte count.
        pub const PReadV = struct {
            fd: os.FileHandle,
            iov: []os.posix.iovec,
            offset: usize,
            result: Error!usize,

            pub const Error = os.File.ReadError;
        };

        /// Open `path` with the given `flags` and `mode`; `result` is the new handle.
        pub const Open = struct {
            /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
            path: []const u8,
            flags: u32,
            mode: os.File.Mode,
            result: Error!os.FileHandle,

            pub const Error = os.File.OpenError;
        };

        /// Write `contents` to the file at `path`, using `mode`.
        pub const WriteFile = struct {
            /// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
            path: []const u8,
            contents: []const u8,
            mode: os.File.Mode,
            result: Error!void,

            pub const Error = os.File.OpenError || os.File.WriteError;
        };

        /// Close `fd`. There is no `result` field.
        pub const Close = struct {
            fd: os.FileHandle,
        };
    };
};

/// data - just the inner references - must live until pwritev promise completes.
pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void {
    // workaround for https://github.com/ziglang/zig/issues/1194
    suspend {
        resume @handle();
    }

    // One iovec per caller buffer. Only the iovec array is owned by this frame;
    // the bytes it points at stay owned by the caller.
    const vectors = try loop.allocator.alloc(os.posix.iovec_const, data.len);
    defer loop.allocator.free(vectors);

    for (data) |chunk, index| {
        vectors[index] = os.posix.iovec_const{
            .iov_len = chunk.len,
            .iov_base = chunk.ptr,
        };
    }

    // Tick node carrying this coroutine's handle.
    const wake_node = event.Loop.NextTickNode{
        .data = @handle(),
        .prev = null,
        .next = null,
    };

    var node = RequestNode{
        .data = Request{
            .finish = Request.Finish{ .TickNode = wake_node },
            .msg = Request.Msg{
                .PWriteV = Request.Msg.PWriteV{
                    .result = undefined,
                    .offset = offset,
                    .iov = vectors,
                    .fd = fd,
                },
            },
        },
        .prev = null,
        .next = null,
    };

    errdefer loop.posixFsCancel(&node);

    // Hand the request to the loop and sleep; `result` is read after resuming.
    suspend {
        loop.posixFsRequest(&node);
    }

    return node.data.msg.PWriteV.result;
}

/// data - just the inner references - must live until preadv promise completes.
/// Returns the number of bytes read.
pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize {
    // workaround for https://github.com/ziglang/zig/issues/1194
    suspend {
        resume @handle();
    }

    // One iovec per destination buffer supplied by the caller.
    const vectors = try loop.allocator.alloc(os.posix.iovec, data.len);
    defer loop.allocator.free(vectors);

    for (data) |chunk, index| {
        vectors[index] = os.posix.iovec{
            .iov_len = chunk.len,
            .iov_base = chunk.ptr,
        };
    }

    // Tick node carrying this coroutine's handle.
    const wake_node = event.Loop.NextTickNode{
        .data = @handle(),
        .prev = null,
        .next = null,
    };

    var node = RequestNode{
        .data = Request{
            .finish = Request.Finish{ .TickNode = wake_node },
            .msg = Request.Msg{
                .PReadV = Request.Msg.PReadV{
                    .result = undefined,
                    .offset = offset,
                    .iov = vectors,
                    .fd = fd,
                },
            },
        },
        .prev = null,
        .next = null,
    };

    errdefer loop.posixFsCancel(&node);

    suspend {
        loop.posixFsRequest(&node);
    }

    return node.data.msg.PReadV.result;
}

/// Opens `path` with the given `flags` and `mode` through the loop's file
/// system request queue. `path` does not need to be null terminated; a
/// terminated copy is made for the duration of the call.
pub async fn open(
    loop: *event.Loop,
    path: []const u8,
    flags: u32,
    mode: os.File.Mode,
) os.File.OpenError!os.FileHandle {
    // workaround for https://github.com/ziglang/zig/issues/1194
    suspend {
        resume @handle();
    }

    const terminated_path = try std.cstr.addNullByte(loop.allocator, path);
    defer loop.allocator.free(terminated_path);

    // Tick node carrying this coroutine's handle.
    const wake_node = event.Loop.NextTickNode{
        .data = @handle(),
        .prev = null,
        .next = null,
    };

    var node = RequestNode{
        .data = Request{
            .finish = Request.Finish{ .TickNode = wake_node },
            .msg = Request.Msg{
                .Open = Request.Msg.Open{
                    .result = undefined,
                    .mode = mode,
                    .flags = flags,
                    // The slice excludes the terminator, but the byte right after it is 0.
                    .path = terminated_path[0..path.len],
                },
            },
        },
        .prev = null,
        .next = null,
    };

    errdefer loop.posixFsCancel(&node);

    suspend {
        loop.posixFsRequest(&node);
    }

    return node.data.msg.Open.result;
}

/// Opens an existing file read-only (O_LARGEFILE | O_RDONLY | O_CLOEXEC).
pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle {
    const read_only_flags = posix.O_CLOEXEC | posix.O_RDONLY | posix.O_LARGEFILE;
    const no_mode = 0;
    return await (async open(loop, path, read_only_flags, no_mode) catch unreachable);
}

/// Creates if does not exist. Does not truncate.
pub async fn openReadWrite(
    loop: *event.Loop,
    path: []const u8,
    mode: os.File.Mode,
) os.File.OpenError!os.FileHandle {
    const read_write_flags = posix.O_CLOEXEC | posix.O_CREAT | posix.O_RDWR | posix.O_LARGEFILE;
    return await (async open(loop, path, read_write_flags, mode) catch unreachable);
}

/// This abstraction helps to close file handles in defer expressions
/// without the possibility of failure and without the use of suspend points.
/// Start a `CloseOperation` before opening a file, so that you can defer
/// `CloseOperation.finish`.
/// If you call `setHandle` then finishing will close the fd; otherwise finishing
/// will deallocate the `CloseOperation`.
pub const CloseOperation = struct {
    loop: *event.Loop,
    have_fd: bool,
    close_req_node: RequestNode,

    /// Allocates a `CloseOperation` from the loop's allocator with no file
    /// handle attached yet. The embedded close request points back at the
    /// new object through its `DeallocCloseOperation` finish action.
    pub fn start(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) {
        const op = try loop.allocator.createOne(CloseOperation);

        const close_request = Request{
            .finish = Request.Finish{ .DeallocCloseOperation = op },
            .msg = Request.Msg{
                // The fd is filled in later by `setHandle`.
                .Close = Request.Msg.Close{ .fd = undefined },
            },
        };

        op.* = CloseOperation{
            .close_req_node = RequestNode{
                .data = close_request,
                .prev = null,
                .next = null,
            },
            .have_fd = false,
            .loop = loop,
        };
        return op;
    }

    /// Defer this after creating.
    /// Without a handle the object is destroyed right away; with a handle the
    /// embedded close request is submitted to the loop instead.
    pub fn finish(self: *CloseOperation) void {
        if (!self.have_fd) {
            self.loop.allocator.destroy(self);
            return;
        }
        self.loop.posixFsRequest(&self.close_req_node);
    }

    /// Attach `handle` so that `finish` closes it.
    pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void {
        const request = &self.close_req_node.data;
        request.msg.Close.fd = handle;
        self.have_fd = true;
    }

    /// Undo a `setHandle`.
    pub fn clearHandle(self: *CloseOperation) void {
        self.have_fd = false;
    }

    /// Returns the attached handle. Asserts that one has been set.
    pub fn getHandle(self: *CloseOperation) os.FileHandle {
        assert(self.have_fd);
        const request = &self.close_req_node.data;
        return request.msg.Close.fd;
    }
};

/// contents must remain alive until writeFile completes.
/// Same as `writeFileMode` with `os.File.default_mode`.
pub async fn writeFile(loop: *event.Loop, path: []const u8, contents: []const u8) !void {
    const mode = os.File.default_mode;
    return await (async writeFileMode(loop, path, contents, mode) catch unreachable);
}

/// contents must remain alive until writeFileMode completes.
pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void {
    // workaround for https://github.com/ziglang/zig/issues/1194
    suspend {
        resume @handle();
    }

    const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
    defer loop.allocator.free(path_with_null);

    var req_node = RequestNode{
        .prev = null,
        .next = null,
        .data = Request{
            .msg = Request.Msg{
                .WriteFile = Request.Msg.WriteFile{
                    // The slice excludes the terminator, but the byte right after it is 0.
                    .path = path_with_null[0..path.len],
                    .contents = contents,
                    .mode = mode,
                    .result = undefined,
                },
            },
            .finish = Request.Finish{
                .TickNode = event.Loop.NextTickNode{
                    .prev = null,
                    .next = null,
                    .data = @handle(),
                },
            },
        },
    };

    errdefer loop.posixFsCancel(&req_node);

    // Hand the request to the loop and sleep; `result` is read after resuming.
    suspend {
        loop.posixFsRequest(&req_node);
    }

    return req_node.data.msg.WriteFile.result;
}

/// Reads the whole file at `file_path` into a newly allocated buffer.
/// Returns `error.FileTooBig` if more than `max_size` bytes have been read.
/// The file handle is closed through a `CloseOperation` when this function
/// exits, without waiting for the close to complete.
/// Caller owns returned memory (allocated with `loop.allocator`).
pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 {
    var close_op = try CloseOperation.start(loop);
    defer close_op.finish();

    // `open` (reached through `openRead`) makes its own null terminated copy
    // of the path, so no copy is needed here.
    const fd = try await (async openRead(loop, file_path) catch unreachable);
    close_op.setHandle(fd);

    var list = std.ArrayList(u8).init(loop.allocator);
    defer list.deinit();

    while (true) {
        // Always keep at least one page of spare capacity to read into.
        try list.ensureCapacity(list.len + os.page_size);
        const buf = list.items[list.len..];
        const buf_array = [][]u8{buf};
        const amt = try await (async preadv(loop, fd, buf_array, list.len) catch unreachable);
        list.len += amt;
        if (list.len > max_size) {
            return error.FileTooBig;
        }
        // A short read means the end of the file was reached.
        if (amt < buf.len) {
            return list.toOwnedSlice();
        }
    }
}

/// Returns a file watcher type that associates a user value of type `V` with
/// every watched file and delivers `Event`s through `channel`.
/// Supported on Linux (inotify) and macOS (kqueue); other targets are a
/// compile error.
pub fn Watch(comptime V: type) type {
    return struct {
        channel: *event.Channel(Event),
        os_data: OsData,

        const OsData = switch (builtin.os) {
            builtin.Os.macosx => struct{
                // Resolved path -> state of the coroutine watching that file.
                file_table: FileTable,
                table_lock: event.Lock,

                const FileTable = std.AutoHashMap([]const u8, *Put);
                const Put = struct {
                    putter: promise,
                    value_ptr: *V,
                };
            },
            builtin.Os.linux => struct {
                // Handle of the `linuxEventPutter` coroutine whose frame holds this Watch.
                putter: promise,
                inotify_fd: i32,
                // Watch descriptor -> watched directory.
                wd_table: WdTable,
                table_lock: event.Lock,

                // Null terminated basename -> user value.
                const FileTable = std.AutoHashMap([]const u8, V);
            },
            else => @compileError("Unsupported OS"),
        };

        const WdTable = std.AutoHashMap(i32, Dir);
        const FileToHandle = std.AutoHashMap([]const u8, promise);

        const Self = this;

        const Dir = struct {
            dirname: []const u8,
            file_table: OsData.FileTable,
        };

        pub const Event = union(enum) {
            CloseWrite: V,
            Err: Error,

            pub const Error = error{
                UserResourceLimitReached,
                SystemResources,
                AccessDenied,
            };
        };

        /// Creates a watcher whose channel buffers up to `event_buf_count` events.
        /// Release it with `destroy`.
        pub fn create(loop: *event.Loop, event_buf_count: usize) !*Self {
            const channel = try event.Channel(Self.Event).create(loop, event_buf_count);
            errdefer channel.destroy();

            switch (builtin.os) {
                builtin.Os.linux => {
                    const inotify_fd = try os.linuxINotifyInit1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
                    errdefer os.close(inotify_fd);

                    // The Watch lives inside the putter coroutine's frame; the
                    // coroutine publishes its address through `result`.
                    var result: *Self = undefined;
                    _ = try async linuxEventPutter(inotify_fd, channel, &result);
                    return result;
                },
                builtin.Os.macosx => {
                    const self = try loop.allocator.createOne(Self);
                    errdefer loop.allocator.destroy(self);

                    self.* = Self{
                        .channel = channel,
                        .os_data = OsData{
                            .table_lock = event.Lock.init(loop),
                            .file_table = OsData.FileTable.init(loop.allocator),
                        },
                    };
                    return self;
                },
                else => @compileError("Unsupported OS"),
            }
        }

        /// Stops watching and releases everything owned by the watcher,
        /// including the channel.
        pub fn destroy(self: *Self) void {
            switch (builtin.os) {
                builtin.Os.macosx => {
                    // Grab the allocator first: it is reached through the
                    // channel, which is destroyed below.
                    const allocator = self.channel.loop.allocator;

                    self.os_data.table_lock.deinit();
                    var it = self.os_data.file_table.iterator();
                    while (it.next()) |entry| {
                        cancel entry.value.putter;
                        allocator.free(entry.key);
                    }
                    // Release the table's own storage, not just its keys.
                    self.os_data.file_table.deinit();
                    self.channel.destroy();
                    // `create` allocated `self` with this allocator.
                    allocator.destroy(self);
                },
                // Cancelling the putter runs its defer block, which performs the cleanup.
                builtin.Os.linux => cancel self.os_data.putter,
                else => @compileError("Unsupported OS"),
            }
        }

        /// Starts watching `file_path` and associates `value` with it.
        /// Returns the value previously associated with the file, or null if
        /// the file was not being watched.
        pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
            switch (builtin.os) {
                builtin.Os.macosx => return await (async addFileMacosx(self, file_path, value) catch unreachable),
                builtin.Os.linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
                else => @compileError("Unsupported OS"),
            }
        }

        async fn addFileMacosx(self: *Self, file_path: []const u8, value: V) !?V {
            // The resolved path becomes the table key unless the file is already watched.
            const resolved_path = try os.path.resolve(self.channel.loop.allocator, file_path);
            var resolved_path_consumed = false;
            defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);

            var close_op = try CloseOperation.start(self.channel.loop);
            var close_op_consumed = false;
            defer if (!close_op_consumed) close_op.finish();

            const flags = posix.O_SYMLINK|posix.O_EVTONLY;
            const mode = 0;
            const fd = try await (async open(self.channel.loop, resolved_path, flags, mode) catch unreachable);
            close_op.setHandle(fd);

            // From here on the putter coroutine owns `close_op`.
            var put_data: *OsData.Put = undefined;
            const putter = try async self.kqPutEvents(close_op, value, &put_data);
            close_op_consumed = true;
            errdefer cancel putter;

            const result = blk: {
                const held = await (async self.os_data.table_lock.acquire() catch unreachable);
                defer held.release();

                const gop = try self.os_data.file_table.getOrPut(resolved_path);
                if (gop.found_existing) {
                    // Replace the old putter; the existing key is kept and
                    // `resolved_path` is freed by the defer above.
                    const prev_value = gop.kv.value.value_ptr.*;
                    cancel gop.kv.value.putter;
                    gop.kv.value = put_data;
                    break :blk prev_value;
                } else {
                    resolved_path_consumed = true;
                    gop.kv.value = put_data;
                    break :blk null;
                }
            };

            return result;
        }

        async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
            // TODO https://github.com/ziglang/zig/issues/1194
            suspend {
                resume @handle();
            }

            // Both live in this coroutine's frame; `out_put` publishes their address.
            var value_copy = value;
            var put = OsData.Put{
                .putter = @handle(),
                .value_ptr = &value_copy,
            };
            out_put.* = &put;
            self.channel.loop.beginOneEvent();

            defer {
                close_op.finish();
                self.channel.loop.finishOneEvent();
            }

            while (true) {
                (await (async self.channel.loop.bsdWaitKev(
                    @intCast(usize, close_op.getHandle()),
                    posix.EVFILT_VNODE,
                    posix.NOTE_WRITE,
                ) catch unreachable)) catch |err| switch (err) {
                    error.EventNotFound => unreachable,
                    error.ProcessNotFound => unreachable,
                    error.AccessDenied, error.SystemResources => {
                        // TODO https://github.com/ziglang/zig/issues/769
                        const casted_err = @errSetCast(error{AccessDenied,SystemResources}, err);
                        await (async self.channel.put(Self.Event{ .Err = casted_err }) catch unreachable);
                    },
                };

                // NOTE(review): this is also reached after an error was reported
                // above, so an `Err` event is followed by a `CloseWrite` - confirm intended.
                await (async self.channel.put(Self.Event{ .CloseWrite = value_copy }) catch unreachable);
            }
        }

        async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
            // inotify watches the containing directory; files are told apart by basename.
            const dirname = os.path.dirname(file_path) orelse ".";
            const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
            var dirname_with_null_consumed = false;
            defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);

            const basename = os.path.basename(file_path);
            const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
            var basename_with_null_consumed = false;
            defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);

            const wd = try os.linuxINotifyAddWatchC(
                self.os_data.inotify_fd,
                dirname_with_null.ptr,
                os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
            );
            // wd is either a newly created watch or an existing one.

            const held = await (async self.os_data.table_lock.acquire() catch unreachable);
            defer held.release();

            const gop = try self.os_data.wd_table.getOrPut(wd);
            if (!gop.found_existing) {
                gop.kv.value = Dir{
                    .dirname = dirname_with_null,
                    .file_table = OsData.FileTable.init(self.channel.loop.allocator),
                };
                dirname_with_null_consumed = true;
            }
            const dir = &gop.kv.value;

            // Keys include the null terminator, matching the lookup in linuxEventPutter.
            const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
            if (file_table_gop.found_existing) {
                const prev_value = file_table_gop.kv.value;
                file_table_gop.kv.value = value;
                return prev_value;
            } else {
                file_table_gop.kv.value = value;
                basename_with_null_consumed = true;
                return null;
            }
        }

        /// Not implemented yet; always panics.
        pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
            @panic("TODO");
        }

        async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event), out_watch: **Self) void {
            // TODO https://github.com/ziglang/zig/issues/1194
            suspend {
                resume @handle();
            }

            const loop = channel.loop;

            // The Watch object lives in this coroutine's frame.
            var watch = Self{
                .channel = channel,
                .os_data = OsData{
                    .putter = @handle(),
                    .inotify_fd = inotify_fd,
                    .wd_table = WdTable.init(loop.allocator),
                    .table_lock = event.Lock.init(loop),
                },
            };
            out_watch.* = &watch;

            loop.beginOneEvent();

            defer {
                watch.os_data.table_lock.deinit();
                var wd_it = watch.os_data.wd_table.iterator();
                while (wd_it.next()) |wd_entry| {
                    var file_it = wd_entry.value.file_table.iterator();
                    while (file_it.next()) |file_entry| {
                        loop.allocator.free(file_entry.key);
                    }
                    loop.allocator.free(wd_entry.value.dirname);
                    // Release the per-directory table's own storage too.
                    wd_entry.value.file_table.deinit();
                }
                watch.os_data.wd_table.deinit();
                loop.finishOneEvent();
                os.close(inotify_fd);
                channel.destroy();
            }

            var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;

            while (true) {
                const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
                const errno = os.linux.getErrno(rc);
                switch (errno) {
                    0 => {
                        // can't use @bytesToSlice because of the special variable length name field
                        var ptr = event_buf[0..].ptr;
                        // Only the first `rc` bytes hold events from this read; the
                        // rest of the buffer is uninitialized or left over from an
                        // earlier read and must not be parsed.
                        const end_ptr = ptr + rc;
                        var ev: *os.linux.inotify_event = undefined;
                        while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
                            ev = @ptrCast(*os.linux.inotify_event, ptr);
                            if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
                                const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
                                const basename_with_null = basename_ptr[0 .. std.cstr.len(basename_ptr) + 1];
                                const user_value = blk: {
                                    const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
                                    defer held.release();

                                    // NOTE(review): addFileLinux registers the inotify watch
                                    // before inserting the wd into wd_table, so an event
                                    // arriving in between would fail this unwrap - confirm.
                                    const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
                                    if (dir.file_table.get(basename_with_null)) |entry| {
                                        break :blk entry.value;
                                    } else {
                                        break :blk null;
                                    }
                                };
                                // Files in the directory that were never added are ignored.
                                if (user_value) |v| {
                                    await (async channel.put(Self.Event{ .CloseWrite = v }) catch unreachable);
                                }
                            }
                        }
                    },
                    os.linux.EINTR => continue,
                    os.linux.EINVAL => unreachable,
                    os.linux.EFAULT => unreachable,
                    os.linux.EAGAIN => {
                        // Nothing to read on the non-blocking fd: wait until it becomes readable.
                        (await (async loop.linuxWaitFd(
                            inotify_fd,
                            os.linux.EPOLLET | os.linux.EPOLLIN,
                        ) catch unreachable)) catch |err| {
                            const transformed_err = switch (err) {
                                error.InvalidFileDescriptor => unreachable,
                                error.FileDescriptorAlreadyPresentInSet => unreachable,
                                error.InvalidSyscall => unreachable,
                                error.OperationCausesCircularLoop => unreachable,
                                error.FileDescriptorNotRegistered => unreachable,
                                error.SystemResources => error.SystemResources,
                                error.UserResourceLimitReached => error.UserResourceLimitReached,
                                error.FileDescriptorIncompatibleWithEpoll => unreachable,
                                error.Unexpected => unreachable,
                            };
                            await (async channel.put(Self.Event{ .Err = transformed_err }) catch unreachable);
                        };
                    },
                    else => unreachable,
                }
            }
        }
    };
}

const test_tmp_dir = "std_event_fs_test";

test "write a file, watch it, write it again" {
    var da = std.heap.DirectAllocator.init();
    defer da.deinit();

    const allocator = &da.allocator;

    // TODO move this into event loop too
    try os.makePath(allocator, test_tmp_dir);
    // Best-effort cleanup of the temporary directory.
    defer os.deleteTree(allocator, test_tmp_dir) catch {};

    var loop: event.Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    // Overwritten by testFsWatchCantFail once the coroutine finishes.
    var result: error!void = error.ResultNeverWritten;
    const handle = try async testFsWatchCantFail(&loop, &result);
    defer cancel handle;

    loop.run();
    return result;
}

async fn testFsWatchCantFail(loop: *event.Loop, result: *(error!void)) void {
    result.* = await async testFsWatch(loop) catch unreachable;
}

async fn testFsWatch(loop: *event.Loop) !void {
    const file_path = try os.path.join(loop.allocator, test_tmp_dir, "file.txt");
    defer loop.allocator.free(file_path);

    const contents =
        \\line 1
        \\line 2
    ;
    // Byte offset of "line 2": "line 1" plus the newline.
    const line2_offset = 7;

    // first just write then read the file
    try await try async writeFile(loop, file_path, contents);

    const read_contents = try await try async readFile(loop, file_path, 1024 * 1024);
    // readFile hands ownership of the buffer to the caller.
    defer loop.allocator.free(read_contents);
    assert(mem.eql(u8, read_contents, contents));

    // now watch the file
    var watch = try Watch(void).create(loop, 0);
    defer watch.destroy();

    assert((try await try async watch.addFile(file_path, {})) == null);

    // Start waiting for the event before triggering it.
    const ev = try async watch.channel.get();
    var ev_consumed = false;
    defer if (!ev_consumed) cancel ev;

    // overwrite line 2
    const fd = try await try async openReadWrite(loop, file_path, os.File.default_mode);
    {
        defer os.close(fd);

        try await try async pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
    }

    ev_consumed = true;
    switch (await ev) {
        Watch(void).Event.CloseWrite => {},
        Watch(void).Event.Err => |err| return err,
    }

    const contents_updated = try await try async readFile(loop, file_path, 1024 * 1024);
    defer loop.allocator.free(contents_updated);
    assert(mem.eql(u8, contents_updated,
        \\line 1
        \\lorem ipsum
    ));
}