introduce std.event.fs for async file system functions

only works on linux so far
master
Andrew Kelley 2018-07-25 23:16:13 -04:00
parent 5d4a02c350
commit cc45527333
10 changed files with 559 additions and 97 deletions

View File

@ -460,6 +460,7 @@ set(ZIG_STD_FILES
"empty.zig"
"event.zig"
"event/channel.zig"
"event/fs.zig"
"event/future.zig"
"event/group.zig"
"event/lock.zig"

View File

@ -603,10 +603,10 @@ pub const Builder = struct {
}
fn copyFile(self: *Builder, source_path: []const u8, dest_path: []const u8) !void {
return self.copyFileMode(source_path, dest_path, os.default_file_mode);
return self.copyFileMode(source_path, dest_path, os.File.default_mode);
}
fn copyFileMode(self: *Builder, source_path: []const u8, dest_path: []const u8, mode: os.FileMode) !void {
fn copyFileMode(self: *Builder, source_path: []const u8, dest_path: []const u8, mode: os.File.Mode) !void {
if (self.verbose) {
warn("cp {} {}\n", source_path, dest_path);
}

View File

@ -672,14 +672,10 @@ fn parseFormValueRef(allocator: *mem.Allocator, in_stream: var, comptime T: type
const ParseFormValueError = error{
EndOfStream,
Io,
BadFd,
Unexpected,
InvalidDebugInfo,
EndOfFile,
IsDir,
OutOfMemory,
};
} || std.os.File.ReadError;
fn parseFormValue(allocator: *mem.Allocator, in_stream: var, form_id: u64, is_64: bool) ParseFormValueError!FormValue {
return switch (form_id) {

View File

@ -1,17 +1,19 @@
pub const Channel = @import("event/channel.zig").Channel;
pub const Future = @import("event/future.zig").Future;
pub const Group = @import("event/group.zig").Group;
pub const Lock = @import("event/lock.zig").Lock;
pub const Locked = @import("event/locked.zig").Locked;
pub const Loop = @import("event/loop.zig").Loop;
pub const Lock = @import("event/lock.zig").Lock;
pub const fs = @import("event/fs.zig");
pub const tcp = @import("event/tcp.zig");
pub const Channel = @import("event/channel.zig").Channel;
pub const Group = @import("event/group.zig").Group;
pub const Future = @import("event/future.zig").Future;
test "import event tests" {
_ = @import("event/channel.zig");
_ = @import("event/fs.zig");
_ = @import("event/future.zig");
_ = @import("event/group.zig");
_ = @import("event/lock.zig");
_ = @import("event/locked.zig");
_ = @import("event/loop.zig");
_ = @import("event/lock.zig");
_ = @import("event/tcp.zig");
_ = @import("event/channel.zig");
_ = @import("event/group.zig");
_ = @import("event/future.zig");
}

343
std/event/fs.zig Normal file
View File

@ -0,0 +1,343 @@
const std = @import("../index.zig");
const event = std.event;
const assert = std.debug.assert;
const os = std.os;
const mem = std.mem;
pub const RequestNode = std.atomic.Queue(Request).Node;
pub const Request = struct {
msg: Msg,
finish: Finish,
pub const Finish = union(enum) {
TickNode: event.Loop.NextTickNode,
DeallocCloseOperation: *CloseOperation,
NoAction,
};
pub const Msg = union(enum) {
PWriteV: PWriteV,
PReadV: PReadV,
OpenRead: OpenRead,
Close: Close,
WriteFile: WriteFile,
End, // special - means the fs thread should exit
pub const PWriteV = struct {
fd: os.FileHandle,
data: []const []const u8,
offset: usize,
result: Error!void,
pub const Error = error{};
};
pub const PReadV = struct {
fd: os.FileHandle,
iov: []os.linux.iovec,
offset: usize,
result: Error!usize,
pub const Error = os.File.ReadError;
};
pub const OpenRead = struct {
/// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
path: []const u8,
result: Error!os.FileHandle,
pub const Error = os.File.OpenError;
};
pub const WriteFile = struct {
/// must be null terminated. TODO https://github.com/ziglang/zig/issues/265
path: []const u8,
contents: []const u8,
mode: os.File.Mode,
result: Error!void,
pub const Error = os.File.OpenError || os.File.WriteError;
};
pub const Close = struct {
fd: os.FileHandle,
};
};
};
/// data - both the outer and inner references - must live until pwritev promise completes.
pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []const u8) !void {
//const data_dupe = try mem.dupe(loop.allocator, []const u8, data);
//defer loop.allocator.free(data_dupe);
// workaround for https://github.com/ziglang/zig/issues/1194
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
var req_node = RequestNode{
.next = undefined,
.data = Request{
.msg = Request.Msg{
.PWriteV = Request.Msg.PWriteV{
.fd = fd,
.data = data,
.offset = offset,
.result = undefined,
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.next = undefined,
.data = my_handle,
},
},
},
};
suspend |_| {
loop.linuxFsRequest(&req_node);
}
return req_node.data.msg.PWriteV.result;
}
/// data - just the inner references - must live until pwritev promise completes.
pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: []const []u8) !usize {
//const data_dupe = try mem.dupe(loop.allocator, []const u8, data);
//defer loop.allocator.free(data_dupe);
// workaround for https://github.com/ziglang/zig/issues/1194
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
const iovecs = try loop.allocator.alloc(os.linux.iovec, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.linux.iovec{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
}
var req_node = RequestNode{
.next = undefined,
.data = Request{
.msg = Request.Msg{
.PReadV = Request.Msg.PReadV{
.fd = fd,
.iov = iovecs,
.offset = offset,
.result = undefined,
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.next = undefined,
.data = my_handle,
},
},
},
};
suspend |_| {
loop.linuxFsRequest(&req_node);
}
return req_node.data.msg.PReadV.result;
}
pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.FileHandle {
// workaround for https://github.com/ziglang/zig/issues/1194
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
var req_node = RequestNode{
.next = undefined,
.data = Request{
.msg = Request.Msg{
.OpenRead = Request.Msg.OpenRead{
.path = path,
.result = undefined,
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.next = undefined,
.data = my_handle,
},
},
},
};
suspend |_| {
loop.linuxFsRequest(&req_node);
}
return req_node.data.msg.OpenRead.result;
}
/// This abstraction helps to close file handles in defer expressions
/// without suspending. Start a CloseOperation before opening a file.
pub const CloseOperation = struct {
loop: *event.Loop,
have_fd: bool,
close_req_node: RequestNode,
pub fn create(loop: *event.Loop) (error{OutOfMemory}!*CloseOperation) {
const self = try loop.allocator.createOne(CloseOperation);
self.* = CloseOperation{
.loop = loop,
.have_fd = false,
.close_req_node = RequestNode{
.next = undefined,
.data = Request{
.msg = Request.Msg{
.Close = Request.Msg.Close{ .fd = undefined },
},
.finish = Request.Finish{ .DeallocCloseOperation = self },
},
},
};
return self;
}
/// Defer this after creating.
pub fn deinit(self: *CloseOperation) void {
if (self.have_fd) {
self.loop.linuxFsRequest(&self.close_req_node);
} else {
self.loop.allocator.destroy(self);
}
}
pub fn setHandle(self: *CloseOperation, handle: os.FileHandle) void {
self.close_req_node.data.msg.Close.fd = handle;
self.have_fd = true;
}
};
/// contents must remain alive until writeFile completes.
pub async fn writeFile(loop: *event.Loop, path: []const u8, contents: []const u8) !void {
return await (async writeFileMode(loop, path, contents, os.File.default_mode) catch unreachable);
}
/// contents must remain alive until writeFile completes.
pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []const u8, mode: os.File.Mode) !void {
// workaround for https://github.com/ziglang/zig/issues/1194
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
.next = undefined,
.data = Request{
.msg = Request.Msg{
.WriteFile = Request.Msg.WriteFile{
.path = path_with_null[0..path.len],
.contents = contents,
.mode = mode,
.result = undefined,
},
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.next = undefined,
.data = my_handle,
},
},
},
};
suspend |_| {
loop.linuxFsRequest(&req_node);
}
return req_node.data.msg.WriteFile.result;
}
/// The promise resumes when the last data has been confirmed written, but before the file handle
/// is closed.
pub async fn readFile(loop: *event.Loop, file_path: []const u8, max_size: usize) ![]u8 {
var close_op = try CloseOperation.create(loop);
defer close_op.deinit();
const path_with_null = try std.cstr.addNullByte(loop.allocator, file_path);
defer loop.allocator.free(path_with_null);
const fd = try await (async openRead(loop, path_with_null[0..file_path.len]) catch unreachable);
close_op.setHandle(fd);
var list = std.ArrayList(u8).init(loop.allocator);
defer list.deinit();
while (true) {
try list.ensureCapacity(list.len + os.page_size);
const buf = list.items[list.len..];
const buf_array = [][]u8{buf};
const amt = try await (async preadv(loop, fd, list.len, buf_array) catch unreachable);
list.len += amt;
if (amt < buf.len) {
return list.toOwnedSlice();
}
}
}
const test_tmp_dir = "std_event_fs_test";
test "write a file, watch it, write it again" {
var da = std.heap.DirectAllocator.init();
defer da.deinit();
const allocator = &da.allocator;
// TODO move this into event loop too
try os.makePath(allocator, test_tmp_dir);
defer os.deleteTree(allocator, test_tmp_dir) catch {};
var loop: event.Loop = undefined;
try loop.initMultiThreaded(allocator);
defer loop.deinit();
var result: error!void = undefined;
const handle = try async<allocator> testFsWatchCantFail(&loop, &result);
defer cancel handle;
loop.run();
return result;
}
async fn testFsWatchCantFail(loop: *event.Loop, result: *(error!void)) void {
result.* = await async testFsWatch(loop) catch unreachable;
}
async fn testFsWatch(loop: *event.Loop) !void {
const file_path = try os.path.join(loop.allocator, test_tmp_dir, "file.txt");
defer loop.allocator.free(file_path);
const contents =
\\line 1
\\line 2
;
// first just write then read the file
try await try async writeFile(loop, file_path, contents);
const read_contents = try await try async readFile(loop, file_path, 1024 * 1024);
assert(mem.eql(u8, read_contents, contents));
}

View File

@ -2,10 +2,12 @@ const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const posix = std.os.posix;
const windows = std.os.windows;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const fs = std.event.fs;
const os = std.os;
const posix = os.posix;
const windows = os.windows;
pub const Loop = struct {
allocator: *mem.Allocator,
@ -13,7 +15,7 @@ pub const Loop = struct {
os_data: OsData,
final_resume_node: ResumeNode,
pending_event_count: usize,
extra_threads: []*std.os.Thread,
extra_threads: []*os.Thread,
// pre-allocated eventfds. all permanently active.
// this is how we send promises to be resumed on other threads.
@ -65,7 +67,7 @@ pub const Loop = struct {
/// TODO copy elision / named return values so that the threads referencing *Loop
/// have the correct pointer value.
pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
const core_count = try std.os.cpuCount(allocator);
const core_count = try os.cpuCount(allocator);
return self.initInternal(allocator, core_count);
}
@ -92,7 +94,7 @@ pub const Loop = struct {
);
errdefer self.allocator.free(self.eventfd_resume_nodes);
self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
self.extra_threads = try self.allocator.alloc(*os.Thread, extra_thread_count);
errdefer self.allocator.free(self.extra_threads);
try self.initOsData(extra_thread_count);
@ -104,17 +106,34 @@ pub const Loop = struct {
self.allocator.free(self.extra_threads);
}
const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
std.os.WindowsCreateIoCompletionPortError;
const InitOsDataError = os.LinuxEpollCreateError || mem.Allocator.Error || os.LinuxEventFdError ||
os.SpawnThreadError || os.LinuxEpollCtlError || os.BsdKEventError ||
os.WindowsCreateIoCompletionPortError;
const wakeup_bytes = []u8{0x1} ** 8;
fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
switch (builtin.os) {
builtin.Os.linux => {
self.os_data.fs_queue = std.atomic.Queue(fs.Request).init();
self.os_data.fs_queue_len = 0;
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
self.os_data.fs_end_request = fs.RequestNode{
.next = undefined,
.data = fs.Request{
.msg = fs.Request.Msg.End,
.finish = fs.Request.Finish.NoAction,
},
};
self.os_data.fs_thread = try os.spawnThread(self, linuxFsRun);
errdefer {
while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
self.linuxFsRequest(&self.os_data.fs_end_request);
self.os_data.fs_thread.wait();
}
errdefer {
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
}
for (self.eventfd_resume_nodes) |*eventfd_node| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
@ -123,7 +142,7 @@ pub const Loop = struct {
.id = ResumeNode.Id.EventFd,
.handle = undefined,
},
.eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
.eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
.epoll_op = posix.EPOLL_CTL_ADD,
},
.next = undefined,
@ -131,17 +150,17 @@ pub const Loop = struct {
self.available_eventfd_resume_nodes.push(eventfd_node);
}
self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
errdefer std.os.close(self.os_data.epollfd);
self.os_data.epollfd = try os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
errdefer os.close(self.os_data.epollfd);
self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
errdefer std.os.close(self.os_data.final_eventfd);
self.os_data.final_eventfd = try os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
errdefer os.close(self.os_data.final_eventfd);
self.os_data.final_eventfd_event = posix.epoll_event{
.events = posix.EPOLLIN,
.data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
};
try std.os.linuxEpollCtl(
try os.linuxEpollCtl(
self.os_data.epollfd,
posix.EPOLL_CTL_ADD,
self.os_data.final_eventfd,
@ -151,19 +170,19 @@ pub const Loop = struct {
var extra_thread_index: usize = 0;
errdefer {
// writing 8 bytes to an eventfd cannot fail
std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
while (extra_thread_index != 0) {
extra_thread_index -= 1;
self.extra_threads[extra_thread_index].wait();
}
}
while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
}
},
builtin.Os.macosx => {
self.os_data.kqfd = try std.os.bsdKQueue();
errdefer std.os.close(self.os_data.kqfd);
self.os_data.kqfd = try os.bsdKQueue();
errdefer os.close(self.os_data.kqfd);
self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
errdefer self.allocator.free(self.os_data.kevents);
@ -191,7 +210,7 @@ pub const Loop = struct {
};
self.available_eventfd_resume_nodes.push(eventfd_node);
const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
_ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
_ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
// this one is for waiting for events
@ -216,30 +235,30 @@ pub const Loop = struct {
.udata = @ptrToInt(&self.final_resume_node),
};
const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
_ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
_ = try os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
self.os_data.final_kevent.flags = posix.EV_ENABLE;
self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
var extra_thread_index: usize = 0;
errdefer {
_ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
_ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
while (extra_thread_index != 0) {
extra_thread_index -= 1;
self.extra_threads[extra_thread_index].wait();
}
}
while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
}
},
builtin.Os.windows => {
self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
self.os_data.io_port = try os.windowsCreateIoCompletionPort(
windows.INVALID_HANDLE_VALUE,
null,
undefined,
undefined,
);
errdefer std.os.close(self.os_data.io_port);
errdefer os.close(self.os_data.io_port);
for (self.eventfd_resume_nodes) |*eventfd_node, i| {
eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
@ -262,7 +281,7 @@ pub const Loop = struct {
while (i < extra_thread_index) : (i += 1) {
while (true) {
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
break;
}
}
@ -272,7 +291,7 @@ pub const Loop = struct {
}
}
while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
self.extra_threads[extra_thread_index] = try os.spawnThread(self, workerRun);
}
},
else => {},
@ -282,17 +301,17 @@ pub const Loop = struct {
fn deinitOsData(self: *Loop) void {
switch (builtin.os) {
builtin.Os.linux => {
std.os.close(self.os_data.final_eventfd);
while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
std.os.close(self.os_data.epollfd);
os.close(self.os_data.final_eventfd);
while (self.available_eventfd_resume_nodes.pop()) |node| os.close(node.data.eventfd);
os.close(self.os_data.epollfd);
self.allocator.free(self.eventfd_resume_nodes);
},
builtin.Os.macosx => {
self.allocator.free(self.os_data.kevents);
std.os.close(self.os_data.kqfd);
os.close(self.os_data.kqfd);
},
builtin.Os.windows => {
std.os.close(self.os_data.io_port);
os.close(self.os_data.io_port);
},
else => {},
}
@ -307,17 +326,17 @@ pub const Loop = struct {
try self.modFd(
fd,
posix.EPOLL_CTL_ADD,
std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
os.linux.EPOLLIN | os.linux.EPOLLOUT | os.linux.EPOLLET,
resume_node,
);
}
pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
var ev = std.os.linux.epoll_event{
var ev = os.linux.epoll_event{
.events = events,
.data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
.data = os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
};
try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
try os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
}
pub fn removeFd(self: *Loop, fd: i32) void {
@ -326,7 +345,7 @@ pub const Loop = struct {
}
fn removeFdNoCounter(self: *Loop, fd: i32) void {
std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
os.linuxEpollCtl(self.os_data.epollfd, os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
}
pub async fn waitFd(self: *Loop, fd: i32) !void {
@ -353,7 +372,7 @@ pub const Loop = struct {
builtin.Os.macosx => {
const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
const eventlist = ([*]posix.Kevent)(undefined)[0..0];
_ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
_ = os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
self.next_tick_queue.unget(next_tick_node);
self.available_eventfd_resume_nodes.push(resume_stack_node);
return;
@ -361,8 +380,8 @@ pub const Loop = struct {
},
builtin.Os.linux => {
// the pending count is already accounted for
const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT |
std.os.linux.EPOLLET;
const epoll_events = posix.EPOLLONESHOT | os.linux.EPOLLIN | os.linux.EPOLLOUT |
os.linux.EPOLLET;
self.modFd(
eventfd_node.eventfd,
eventfd_node.epoll_op,
@ -379,7 +398,7 @@ pub const Loop = struct {
// the consumer code can decide whether to read the completion key.
// it has to do this for normal I/O, so we match that behavior here.
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
std.os.windowsPostQueuedCompletionStatus(
os.windowsPostQueuedCompletionStatus(
self.os_data.io_port,
undefined,
eventfd_node.completion_key,
@ -406,6 +425,9 @@ pub const Loop = struct {
self.finishOneEvent(); // the reference we start with
self.workerRun();
self.os_data.fs_thread.wait();
for (self.extra_threads) |extra_thread| {
extra_thread.wait();
}
@ -453,15 +475,16 @@ pub const Loop = struct {
// cause all the threads to stop
switch (builtin.os) {
builtin.Os.linux => {
self.linuxFsRequest(&self.os_data.fs_end_request);
// writing 8 bytes to an eventfd cannot fail
std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
return;
},
builtin.Os.macosx => {
const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
const eventlist = ([*]posix.Kevent)(undefined)[0..0];
// cannot fail because we already added it and this just enables it
_ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
_ = os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
return;
},
builtin.Os.windows => {
@ -469,7 +492,7 @@ pub const Loop = struct {
while (i < self.extra_threads.len + 1) : (i += 1) {
while (true) {
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
break;
}
}
@ -492,8 +515,8 @@ pub const Loop = struct {
switch (builtin.os) {
builtin.Os.linux => {
// only process 1 event so we don't steal from other threads
var events: [1]std.os.linux.epoll_event = undefined;
const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
var events: [1]os.linux.epoll_event = undefined;
const count = os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
for (events[0..count]) |ev| {
const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
const handle = resume_node.handle;
@ -516,7 +539,7 @@ pub const Loop = struct {
},
builtin.Os.macosx => {
var eventlist: [1]posix.Kevent = undefined;
const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
const count = os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
for (eventlist[0..count]) |ev| {
const resume_node = @intToPtr(*ResumeNode, ev.udata);
const handle = resume_node.handle;
@ -541,9 +564,9 @@ pub const Loop = struct {
while (true) {
var nbytes: windows.DWORD = undefined;
var overlapped: ?*windows.OVERLAPPED = undefined;
switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
std.os.WindowsWaitResult.Aborted => return,
std.os.WindowsWaitResult.Normal => {},
switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
os.WindowsWaitResult.Aborted => return,
os.WindowsWaitResult.Normal => {},
}
if (overlapped != null) break;
}
@ -569,11 +592,73 @@ pub const Loop = struct {
}
}
fn linuxFsRequest(self: *Loop, request_node: *fs.RequestNode) void {
_ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.os_data.fs_queue.put(request_node);
_ = @atomicRmw(i32, &self.os_data.fs_queue_len, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); // let this wrap
const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAKE, 1);
switch (os.linux.getErrno(rc)) {
0 => {},
posix.EINVAL => unreachable,
else => unreachable,
}
}
fn linuxFsRun(self: *Loop) void {
var processed_count: i32 = 0; // we let this wrap
while (true) {
while (self.os_data.fs_queue.get()) |node| {
processed_count +%= 1;
switch (node.data.msg) {
@TagType(fs.Request.Msg).PWriteV => @panic("TODO"),
@TagType(fs.Request.Msg).PReadV => |*msg| {
msg.result = os.posix_preadv(msg.fd, msg.iov.ptr, msg.iov.len, msg.offset);
},
@TagType(fs.Request.Msg).OpenRead => |*msg| {
const flags = posix.O_LARGEFILE | posix.O_RDONLY;
msg.result = os.posixOpenC(msg.path.ptr, flags, 0);
},
@TagType(fs.Request.Msg).Close => |*msg| os.close(msg.fd),
@TagType(fs.Request.Msg).WriteFile => |*msg| blk: {
const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT |
posix.O_CLOEXEC | posix.O_TRUNC;
const fd = os.posixOpenC(msg.path.ptr, flags, msg.mode) catch |err| {
msg.result = err;
break :blk;
};
defer os.close(fd);
msg.result = os.posixWrite(fd, msg.contents);
},
@TagType(fs.Request.Msg).End => return,
}
switch (node.data.finish) {
@TagType(fs.Request.Finish).TickNode => |*tick_node| self.onNextTick(tick_node),
@TagType(fs.Request.Finish).DeallocCloseOperation => |close_op| {
self.allocator.destroy(close_op);
},
@TagType(fs.Request.Finish).NoAction => {},
}
self.finishOneEvent();
}
const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_len), os.linux.FUTEX_WAIT, processed_count, null);
switch (os.linux.getErrno(rc)) {
0 => continue,
posix.EINTR => continue,
posix.EAGAIN => continue,
else => unreachable,
}
}
}
const OsData = switch (builtin.os) {
builtin.Os.linux => struct {
epollfd: i32,
final_eventfd: i32,
final_eventfd_event: std.os.linux.epoll_event,
final_eventfd_event: os.linux.epoll_event,
fs_thread: *os.Thread,
fs_queue_len: i32, // we let this wrap
fs_queue: std.atomic.Queue(fs.Request),
fs_end_request: fs.RequestNode,
},
builtin.Os.macosx => MacOsData,
builtin.Os.windows => struct {

View File

@ -415,13 +415,12 @@ pub fn PeekStream(comptime buffer_size: usize, comptime InStreamError: type) typ
self.at_end = (read < left);
return pos + read;
}
};
}
pub const SliceInStream = struct {
const Self = this;
pub const Error = error { };
pub const Error = error{};
pub const Stream = InStream(Error);
pub stream: Stream,
@ -481,13 +480,12 @@ pub const SliceOutStream = struct {
assert(self.pos <= self.slice.len);
const n =
if (self.pos + bytes.len <= self.slice.len)
bytes.len
else
self.slice.len - self.pos;
const n = if (self.pos + bytes.len <= self.slice.len)
bytes.len
else
self.slice.len - self.pos;
std.mem.copy(u8, self.slice[self.pos..self.pos + n], bytes[0..n]);
std.mem.copy(u8, self.slice[self.pos .. self.pos + n], bytes[0..n]);
self.pos += n;
if (n < bytes.len) {
@ -586,7 +584,7 @@ pub const BufferedAtomicFile = struct {
});
errdefer allocator.destroy(self);
self.atomic_file = try os.AtomicFile.init(allocator, dest_path, os.default_file_mode);
self.atomic_file = try os.AtomicFile.init(allocator, dest_path, os.File.default_mode);
errdefer self.atomic_file.deinit();
self.file_stream = FileOutStream.init(&self.atomic_file.file);

View File

@ -15,10 +15,21 @@ pub const File = struct {
/// The OS-specific file descriptor or file handle.
handle: os.FileHandle,
pub const Mode = switch (builtin.os) {
Os.windows => void,
else => u32,
};
pub const default_mode = switch (builtin.os) {
Os.windows => {},
else => 0o666,
};
pub const OpenError = os.WindowsOpenError || os.PosixOpenError;
/// `path` needs to be copied in memory to add a null terminating byte, hence the allocator.
/// Call close to clean up.
/// TODO deprecated, just use open
pub fn openRead(allocator: *mem.Allocator, path: []const u8) OpenError!File {
if (is_posix) {
const flags = posix.O_LARGEFILE | posix.O_RDONLY;
@ -39,16 +50,18 @@ pub const File = struct {
}
}
/// Calls `openWriteMode` with os.default_file_mode for the mode.
/// Calls `openWriteMode` with os.File.default_mode for the mode.
/// TODO deprecated, just use open
pub fn openWrite(allocator: *mem.Allocator, path: []const u8) OpenError!File {
return openWriteMode(allocator, path, os.default_file_mode);
return openWriteMode(allocator, path, os.File.default_mode);
}
/// If the path does not exist it will be created.
/// If a file already exists in the destination it will be truncated.
/// `path` needs to be copied in memory to add a null terminating byte, hence the allocator.
/// Call close to clean up.
pub fn openWriteMode(allocator: *mem.Allocator, path: []const u8, file_mode: os.FileMode) OpenError!File {
/// TODO deprecated, just use open
pub fn openWriteMode(allocator: *mem.Allocator, path: []const u8, file_mode: Mode) OpenError!File {
if (is_posix) {
const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_TRUNC;
const fd = try os.posixOpen(allocator, path, flags, file_mode);
@ -72,7 +85,8 @@ pub const File = struct {
/// If a file already exists in the destination this returns OpenError.PathAlreadyExists
/// `path` needs to be copied in memory to add a null terminating byte, hence the allocator.
/// Call close to clean up.
pub fn openWriteNoClobber(allocator: *mem.Allocator, path: []const u8, file_mode: os.FileMode) OpenError!File {
/// TODO deprecated, just use open
pub fn openWriteNoClobber(allocator: *mem.Allocator, path: []const u8, file_mode: Mode) OpenError!File {
if (is_posix) {
const flags = posix.O_LARGEFILE | posix.O_WRONLY | posix.O_CREAT | posix.O_CLOEXEC | posix.O_EXCL;
const fd = try os.posixOpen(allocator, path, flags, file_mode);
@ -282,7 +296,7 @@ pub const File = struct {
Unexpected,
};
pub fn mode(self: *File) ModeError!os.FileMode {
pub fn mode(self: *File) ModeError!Mode {
if (is_posix) {
var stat: posix.Stat = undefined;
const err = posix.getErrno(posix.fstat(self.handle, &stat));
@ -296,7 +310,7 @@ pub const File = struct {
// TODO: we should be able to cast u16 to ModeError!u32, making this
// explicit cast not necessary
return os.FileMode(stat.mode);
return Mode(stat.mode);
} else if (is_windows) {
return {};
} else {
@ -305,9 +319,11 @@ pub const File = struct {
}
pub const ReadError = error{
BadFd,
Io,
FileClosed,
InputOutput,
IsDir,
WouldBlock,
SystemResources,
Unexpected,
};
@ -323,9 +339,12 @@ pub const File = struct {
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EBADF => return error.BadFd,
posix.EIO => return error.Io,
posix.EAGAIN => return error.WouldBlock,
posix.EBADF => return error.FileClosed,
posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources,
posix.ENOMEM => return error.SystemResources,
else => return os.unexpectedErrorPosix(read_err),
}
}

View File

@ -38,16 +38,6 @@ pub const path = @import("path.zig");
pub const File = @import("file.zig").File;
pub const time = @import("time.zig");
pub const FileMode = switch (builtin.os) {
Os.windows => void,
else => u32,
};
pub const default_file_mode = switch (builtin.os) {
Os.windows => {},
else => 0o666,
};
pub const page_size = 4 * 1024;
pub const UserInfo = @import("get_user_id.zig").UserInfo;
@ -256,6 +246,26 @@ pub fn posixRead(fd: i32, buf: []u8) !void {
}
}
pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) !usize {
while (true) {
const rc = posix.preadv(fd, iov, count, offset);
const err = posix.getErrno(rc);
switch (err) {
0 => return rc,
posix.EINTR => continue,
posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => return error.WouldBlock,
posix.EBADF => return error.FileClosed,
posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources,
posix.ENOMEM => return error.SystemResources,
else => return unexpectedErrorPosix(err),
}
}
}
pub const PosixWriteError = error{
WouldBlock,
FileClosed,
@ -853,7 +863,7 @@ pub fn copyFile(allocator: *Allocator, source_path: []const u8, dest_path: []con
/// Guaranteed to be atomic. However until https://patchwork.kernel.org/patch/9636735/ is
/// merged and readily available,
/// there is a possibility of power loss or application termination leaving temporary files present
pub fn copyFileMode(allocator: *Allocator, source_path: []const u8, dest_path: []const u8, mode: FileMode) !void {
pub fn copyFileMode(allocator: *Allocator, source_path: []const u8, dest_path: []const u8, mode: File.Mode) !void {
var in_file = try os.File.openRead(allocator, source_path);
defer in_file.close();
@ -879,7 +889,7 @@ pub const AtomicFile = struct {
/// dest_path must remain valid for the lifetime of AtomicFile
/// call finish to atomically replace dest_path with contents
pub fn init(allocator: *Allocator, dest_path: []const u8, mode: FileMode) !AtomicFile {
pub fn init(allocator: *Allocator, dest_path: []const u8, mode: File.Mode) !AtomicFile {
const dirname = os.path.dirname(dest_path);
var rand_buf: [12]u8 = undefined;

View File

@ -692,6 +692,10 @@ pub fn futex_wait(uaddr: usize, futex_op: u32, val: i32, timeout: ?*timespec) us
return syscall4(SYS_futex, uaddr, futex_op, @bitCast(u32, val), @ptrToInt(timeout));
}
pub fn futex_wake(uaddr: usize, futex_op: u32, val: i32) usize {
return syscall3(SYS_futex, uaddr, futex_op, @bitCast(u32, val));
}
pub fn getcwd(buf: [*]u8, size: usize) usize {
return syscall2(SYS_getcwd, @ptrToInt(buf), size);
}
@ -742,6 +746,10 @@ pub fn read(fd: i32, buf: [*]u8, count: usize) usize {
return syscall3(SYS_read, @intCast(usize, fd), @ptrToInt(buf), count);
}
pub fn preadv(fd: i32, iov: [*]const iovec, count: usize, offset: u64) usize {
return syscall4(SYS_preadv, @intCast(usize, fd), @ptrToInt(iov), count, offset);
}
// TODO https://github.com/ziglang/zig/issues/265
pub fn rmdir(path: [*]const u8) usize {
return syscall1(SYS_rmdir, @ptrToInt(path));