windows: std.fs functions support concurrent ops
when reading and writing the same file descriptors
This commit is contained in:
parent
0cfd019377
commit
178d69191b
@ -737,7 +737,7 @@ async fn fmtPath(fmt: *Fmt, file_path_ref: []const u8) FmtError!void {
|
|||||||
file_path,
|
file_path,
|
||||||
max_src_size,
|
max_src_size,
|
||||||
)) catch |err| switch (err) {
|
)) catch |err| switch (err) {
|
||||||
error.IsDir => {
|
error.IsDir, error.AccessDenied => {
|
||||||
// TODO make event based (and dir.next())
|
// TODO make event based (and dir.next())
|
||||||
var dir = try std.os.Dir.open(fmt.loop.allocator, file_path);
|
var dir = try std.os.Dir.open(fmt.loop.allocator, file_path);
|
||||||
defer dir.close();
|
defer dir.close();
|
||||||
|
@ -109,30 +109,28 @@ pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, off
|
|||||||
.base = Loop.ResumeNode{
|
.base = Loop.ResumeNode{
|
||||||
.id = Loop.ResumeNode.Id.Basic,
|
.id = Loop.ResumeNode.Id.Basic,
|
||||||
.handle = @handle(),
|
.handle = @handle(),
|
||||||
|
.overlapped = windows.OVERLAPPED{
|
||||||
|
.Internal = 0,
|
||||||
|
.InternalHigh = 0,
|
||||||
|
.Offset = @truncate(u32, offset),
|
||||||
|
.OffsetHigh = @truncate(u32, offset >> 32),
|
||||||
|
.hEvent = null,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const completion_key = @ptrToInt(&resume_node.base);
|
// TODO only call create io completion port once per fd
|
||||||
// TODO support concurrent async ops on the file handle
|
_ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined);
|
||||||
// we can do this by ignoring completion key and using @fieldParentPtr with the *Overlapped
|
|
||||||
_ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined);
|
|
||||||
var overlapped = windows.OVERLAPPED{
|
|
||||||
.Internal = 0,
|
|
||||||
.InternalHigh = 0,
|
|
||||||
.Offset = @truncate(u32, offset),
|
|
||||||
.OffsetHigh = @truncate(u32, offset >> 32),
|
|
||||||
.hEvent = null,
|
|
||||||
};
|
|
||||||
loop.beginOneEvent();
|
loop.beginOneEvent();
|
||||||
errdefer loop.finishOneEvent();
|
errdefer loop.finishOneEvent();
|
||||||
|
|
||||||
errdefer {
|
errdefer {
|
||||||
_ = windows.CancelIoEx(fd, &overlapped);
|
_ = windows.CancelIoEx(fd, &resume_node.base.overlapped);
|
||||||
}
|
}
|
||||||
suspend {
|
suspend {
|
||||||
_ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped);
|
_ = windows.WriteFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
|
||||||
}
|
}
|
||||||
var bytes_transferred: windows.DWORD = undefined;
|
var bytes_transferred: windows.DWORD = undefined;
|
||||||
if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
||||||
const err = windows.GetLastError();
|
const err = windows.GetLastError();
|
||||||
return switch (err) {
|
return switch (err) {
|
||||||
windows.ERROR.IO_PENDING => unreachable,
|
windows.ERROR.IO_PENDING => unreachable,
|
||||||
@ -243,30 +241,28 @@ pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u6
|
|||||||
.base = Loop.ResumeNode{
|
.base = Loop.ResumeNode{
|
||||||
.id = Loop.ResumeNode.Id.Basic,
|
.id = Loop.ResumeNode.Id.Basic,
|
||||||
.handle = @handle(),
|
.handle = @handle(),
|
||||||
|
.overlapped = windows.OVERLAPPED{
|
||||||
|
.Internal = 0,
|
||||||
|
.InternalHigh = 0,
|
||||||
|
.Offset = @truncate(u32, offset),
|
||||||
|
.OffsetHigh = @truncate(u32, offset >> 32),
|
||||||
|
.hEvent = null,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const completion_key = @ptrToInt(&resume_node.base);
|
// TODO only call create io completion port once per fd
|
||||||
// TODO support concurrent async ops on the file handle
|
_ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, undefined, undefined);
|
||||||
// we can do this by ignoring completion key and using @fieldParentPtr with the *Overlapped
|
|
||||||
_ = try os.windowsCreateIoCompletionPort(fd, loop.os_data.io_port, completion_key, undefined);
|
|
||||||
var overlapped = windows.OVERLAPPED{
|
|
||||||
.Internal = 0,
|
|
||||||
.InternalHigh = 0,
|
|
||||||
.Offset = @truncate(u32, offset),
|
|
||||||
.OffsetHigh = @truncate(u32, offset >> 32),
|
|
||||||
.hEvent = null,
|
|
||||||
};
|
|
||||||
loop.beginOneEvent();
|
loop.beginOneEvent();
|
||||||
errdefer loop.finishOneEvent();
|
errdefer loop.finishOneEvent();
|
||||||
|
|
||||||
errdefer {
|
errdefer {
|
||||||
_ = windows.CancelIoEx(fd, &overlapped);
|
_ = windows.CancelIoEx(fd, &resume_node.base.overlapped);
|
||||||
}
|
}
|
||||||
suspend {
|
suspend {
|
||||||
_ = windows.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &overlapped);
|
_ = windows.ReadFile(fd, data.ptr, @intCast(windows.DWORD, data.len), null, &resume_node.base.overlapped);
|
||||||
}
|
}
|
||||||
var bytes_transferred: windows.DWORD = undefined;
|
var bytes_transferred: windows.DWORD = undefined;
|
||||||
if (windows.GetOverlappedResult(fd, &overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
if (windows.GetOverlappedResult(fd, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
||||||
const err = windows.GetLastError();
|
const err = windows.GetLastError();
|
||||||
return switch (err) {
|
return switch (err) {
|
||||||
windows.ERROR.IO_PENDING => unreachable,
|
windows.ERROR.IO_PENDING => unreachable,
|
||||||
@ -1074,23 +1070,22 @@ pub fn Watch(comptime V: type) type {
|
|||||||
.base = Loop.ResumeNode{
|
.base = Loop.ResumeNode{
|
||||||
.id = Loop.ResumeNode.Id.Basic,
|
.id = Loop.ResumeNode.Id.Basic,
|
||||||
.handle = @handle(),
|
.handle = @handle(),
|
||||||
|
.overlapped = windows.OVERLAPPED{
|
||||||
|
.Internal = 0,
|
||||||
|
.InternalHigh = 0,
|
||||||
|
.Offset = 0,
|
||||||
|
.OffsetHigh = 0,
|
||||||
|
.hEvent = null,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const completion_key = @ptrToInt(&resume_node.base);
|
|
||||||
var overlapped = windows.OVERLAPPED{
|
|
||||||
.Internal = 0,
|
|
||||||
.InternalHigh = 0,
|
|
||||||
.Offset = 0,
|
|
||||||
.OffsetHigh = 0,
|
|
||||||
.hEvent = null,
|
|
||||||
};
|
|
||||||
var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
|
var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
|
||||||
|
|
||||||
// TODO handle this error not in the channel but in the setup
|
// TODO handle this error not in the channel but in the setup
|
||||||
_ = os.windowsCreateIoCompletionPort(
|
_ = os.windowsCreateIoCompletionPort(
|
||||||
dir_handle,
|
dir_handle,
|
||||||
self.channel.loop.os_data.io_port,
|
self.channel.loop.os_data.io_port,
|
||||||
completion_key,
|
undefined,
|
||||||
undefined,
|
undefined,
|
||||||
) catch |err| {
|
) catch |err| {
|
||||||
await (async self.channel.put(err) catch unreachable);
|
await (async self.channel.put(err) catch unreachable);
|
||||||
@ -1103,7 +1098,7 @@ pub fn Watch(comptime V: type) type {
|
|||||||
self.channel.loop.beginOneEvent();
|
self.channel.loop.beginOneEvent();
|
||||||
errdefer self.channel.loop.finishOneEvent();
|
errdefer self.channel.loop.finishOneEvent();
|
||||||
errdefer {
|
errdefer {
|
||||||
_ = windows.CancelIoEx(dir_handle, &overlapped);
|
_ = windows.CancelIoEx(dir_handle, &resume_node.base.overlapped);
|
||||||
}
|
}
|
||||||
suspend {
|
suspend {
|
||||||
_ = windows.ReadDirectoryChangesW(
|
_ = windows.ReadDirectoryChangesW(
|
||||||
@ -1116,13 +1111,13 @@ pub fn Watch(comptime V: type) type {
|
|||||||
windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
|
windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
|
||||||
windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
|
windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
|
||||||
null, // number of bytes transferred (unused for async)
|
null, // number of bytes transferred (unused for async)
|
||||||
&overlapped,
|
&resume_node.base.overlapped,
|
||||||
null, // completion routine - unused because we use IOCP
|
null, // completion routine - unused because we use IOCP
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var bytes_transferred: windows.DWORD = undefined;
|
var bytes_transferred: windows.DWORD = undefined;
|
||||||
if (windows.GetOverlappedResult(dir_handle, &overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
if (windows.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
|
||||||
const errno = windows.GetLastError();
|
const errno = windows.GetLastError();
|
||||||
const err = switch (errno) {
|
const err = switch (errno) {
|
||||||
else => os.unexpectedErrorWindows(errno),
|
else => os.unexpectedErrorWindows(errno),
|
||||||
|
@ -27,6 +27,19 @@ pub const Loop = struct {
|
|||||||
pub const ResumeNode = struct {
|
pub const ResumeNode = struct {
|
||||||
id: Id,
|
id: Id,
|
||||||
handle: promise,
|
handle: promise,
|
||||||
|
overlapped: Overlapped,
|
||||||
|
|
||||||
|
const overlapped_init = switch (builtin.os) {
|
||||||
|
builtin.Os.windows => windows.OVERLAPPED{
|
||||||
|
.Internal = 0,
|
||||||
|
.InternalHigh = 0,
|
||||||
|
.Offset = 0,
|
||||||
|
.OffsetHigh = 0,
|
||||||
|
.hEvent = null,
|
||||||
|
},
|
||||||
|
else => {},
|
||||||
|
};
|
||||||
|
const Overlapped = @typeOf(overlapped_init);
|
||||||
|
|
||||||
pub const Id = enum {
|
pub const Id = enum {
|
||||||
Basic,
|
Basic,
|
||||||
@ -101,6 +114,7 @@ pub const Loop = struct {
|
|||||||
.final_resume_node = ResumeNode{
|
.final_resume_node = ResumeNode{
|
||||||
.id = ResumeNode.Id.Stop,
|
.id = ResumeNode.Id.Stop,
|
||||||
.handle = undefined,
|
.handle = undefined,
|
||||||
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
const extra_thread_count = thread_count - 1;
|
const extra_thread_count = thread_count - 1;
|
||||||
@ -153,6 +167,7 @@ pub const Loop = struct {
|
|||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.EventFd,
|
.id = ResumeNode.Id.EventFd,
|
||||||
.handle = undefined,
|
.handle = undefined,
|
||||||
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
.eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
|
.eventfd = try os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
|
||||||
.epoll_op = posix.EPOLL_CTL_ADD,
|
.epoll_op = posix.EPOLL_CTL_ADD,
|
||||||
@ -225,6 +240,7 @@ pub const Loop = struct {
|
|||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.EventFd,
|
.id = ResumeNode.Id.EventFd,
|
||||||
.handle = undefined,
|
.handle = undefined,
|
||||||
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
// this one is for sending events
|
// this one is for sending events
|
||||||
.kevent = posix.Kevent{
|
.kevent = posix.Kevent{
|
||||||
@ -311,6 +327,7 @@ pub const Loop = struct {
|
|||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.EventFd,
|
.id = ResumeNode.Id.EventFd,
|
||||||
.handle = undefined,
|
.handle = undefined,
|
||||||
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
// this one is for sending events
|
// this one is for sending events
|
||||||
.completion_key = @ptrToInt(&eventfd_node.data.base),
|
.completion_key = @ptrToInt(&eventfd_node.data.base),
|
||||||
@ -325,8 +342,8 @@ pub const Loop = struct {
|
|||||||
var i: usize = 0;
|
var i: usize = 0;
|
||||||
while (i < extra_thread_index) : (i += 1) {
|
while (i < extra_thread_index) : (i += 1) {
|
||||||
while (true) {
|
while (true) {
|
||||||
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
|
const overlapped = &self.final_resume_node.overlapped;
|
||||||
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
|
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -413,6 +430,7 @@ pub const Loop = struct {
|
|||||||
.base = ResumeNode{
|
.base = ResumeNode{
|
||||||
.id = ResumeNode.Id.Basic,
|
.id = ResumeNode.Id.Basic,
|
||||||
.handle = @handle(),
|
.handle = @handle(),
|
||||||
|
.overlapped = ResumeNode.overlapped_init,
|
||||||
},
|
},
|
||||||
.kev = undefined,
|
.kev = undefined,
|
||||||
};
|
};
|
||||||
@ -489,15 +507,11 @@ pub const Loop = struct {
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
builtin.Os.windows => {
|
builtin.Os.windows => {
|
||||||
// this value is never dereferenced but we need it to be non-null so that
|
|
||||||
// the consumer code can decide whether to read the completion key.
|
|
||||||
// it has to do this for normal I/O, so we match that behavior here.
|
|
||||||
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
|
|
||||||
os.windowsPostQueuedCompletionStatus(
|
os.windowsPostQueuedCompletionStatus(
|
||||||
self.os_data.io_port,
|
self.os_data.io_port,
|
||||||
undefined,
|
undefined,
|
||||||
eventfd_node.completion_key,
|
undefined,
|
||||||
overlapped,
|
&eventfd_node.base.overlapped,
|
||||||
) catch {
|
) catch {
|
||||||
self.next_tick_queue.unget(next_tick_node);
|
self.next_tick_queue.unget(next_tick_node);
|
||||||
self.available_eventfd_resume_nodes.push(resume_stack_node);
|
self.available_eventfd_resume_nodes.push(resume_stack_node);
|
||||||
@ -606,8 +620,8 @@ pub const Loop = struct {
|
|||||||
var i: usize = 0;
|
var i: usize = 0;
|
||||||
while (i < self.extra_threads.len + 1) : (i += 1) {
|
while (i < self.extra_threads.len + 1) : (i += 1) {
|
||||||
while (true) {
|
while (true) {
|
||||||
const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
|
const overlapped = &self.final_resume_node.overlapped;
|
||||||
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
|
os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, undefined, overlapped) catch continue;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -680,17 +694,19 @@ pub const Loop = struct {
|
|||||||
},
|
},
|
||||||
builtin.Os.windows => {
|
builtin.Os.windows => {
|
||||||
var completion_key: usize = undefined;
|
var completion_key: usize = undefined;
|
||||||
while (true) {
|
const overlapped = while (true) {
|
||||||
var nbytes: windows.DWORD = undefined;
|
var nbytes: windows.DWORD = undefined;
|
||||||
var overlapped: ?*windows.OVERLAPPED = undefined;
|
var overlapped: ?*windows.OVERLAPPED = undefined;
|
||||||
switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
|
switch (os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key,
|
||||||
|
&overlapped, windows.INFINITE))
|
||||||
|
{
|
||||||
os.WindowsWaitResult.Aborted => return,
|
os.WindowsWaitResult.Aborted => return,
|
||||||
os.WindowsWaitResult.Normal => {},
|
os.WindowsWaitResult.Normal => {},
|
||||||
os.WindowsWaitResult.Cancelled => continue,
|
os.WindowsWaitResult.Cancelled => continue,
|
||||||
}
|
}
|
||||||
if (overlapped != null) break;
|
if (overlapped) |o| break o;
|
||||||
}
|
} else unreachable; // TODO else unreachable should not be necessary
|
||||||
const resume_node = @intToPtr(*ResumeNode, completion_key);
|
const resume_node = @fieldParentPtr(ResumeNode, "overlapped", overlapped);
|
||||||
const handle = resume_node.handle;
|
const handle = resume_node.handle;
|
||||||
const resume_node_id = resume_node.id;
|
const resume_node_id = resume_node.id;
|
||||||
switch (resume_node_id) {
|
switch (resume_node_id) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user