std lib: flesh out the async I/O streaming API a bit

This commit is contained in:
Andrew Kelley 2018-10-01 10:53:39 -04:00
parent 9d4eaf1e07
commit d1ec8377d1
No known key found for this signature in database
GPG Key ID: 4E7CD66038A4D47C
7 changed files with 187 additions and 71 deletions

View File

@ -2710,6 +2710,11 @@ static Error resolve_struct_alignment(CodeGen *g, ZigType *struct_type) {
// be resolving ResolveStatusZeroBitsKnown // be resolving ResolveStatusZeroBitsKnown
assert(field->type_entry != nullptr); assert(field->type_entry != nullptr);
if (type_is_invalid(field->type_entry)) {
struct_type->data.structure.resolve_status = ResolveStatusInvalid;
break;
}
if (!type_has_bits(field->type_entry)) if (!type_has_bits(field->type_entry))
continue; continue;

View File

@ -29,5 +29,9 @@ pub fn Int(comptime T: type) type {
pub fn xchg(self: *Self, new_value: T) T { pub fn xchg(self: *Self, new_value: T) T {
return @atomicRmw(T, &self.unprotected_value, builtin.AtomicRmwOp.Xchg, new_value, AtomicOrder.SeqCst); return @atomicRmw(T, &self.unprotected_value, builtin.AtomicRmwOp.Xchg, new_value, AtomicOrder.SeqCst);
} }
pub fn fetchAdd(self: *Self, op: T) T {
return @atomicRmw(T, &self.unprotected_value, builtin.AtomicRmwOp.Add, op, AtomicOrder.SeqCst);
}
}; };
} }

View File

@ -30,20 +30,20 @@ pub const Request = struct {
pub const PWriteV = struct { pub const PWriteV = struct {
fd: os.FileHandle, fd: os.FileHandle,
iov: []os.posix.iovec_const, iov: []const os.posix.iovec_const,
offset: usize, offset: usize,
result: Error!void, result: Error!void,
pub const Error = os.File.WriteError; pub const Error = os.PosixWriteError;
}; };
pub const PReadV = struct { pub const PReadV = struct {
fd: os.FileHandle, fd: os.FileHandle,
iov: []os.posix.iovec, iov: []const os.posix.iovec,
offset: usize, offset: usize,
result: Error!usize, result: Error!usize,
pub const Error = os.File.ReadError; pub const Error = os.PosixReadError;
}; };
pub const Open = struct { pub const Open = struct {
@ -72,28 +72,47 @@ pub const Request = struct {
}; };
}; };
pub const PWriteVError = error{OutOfMemory} || os.File.WriteError;
/// data - just the inner references - must live until pwritev promise completes. /// data - just the inner references - must live until pwritev promise completes.
pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { pub async fn pwritev(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) PWriteVError!void {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
switch (builtin.os) { switch (builtin.os) {
builtin.Os.macosx, builtin.Os.macosx,
builtin.Os.linux, builtin.Os.linux,
=> return await (async pwritevPosix(loop, fd, data, offset) catch unreachable), => {
builtin.Os.windows => return await (async pwritevWindows(loop, fd, data, offset) catch unreachable), const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.posix.iovec_const{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
}
return await (async pwritevPosix(loop, fd, iovecs, offset) catch unreachable);
},
builtin.Os.windows => {
const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
defer loop.allocator.free(data_copy);
return await (async pwritevWindows(loop, fd, data, offset) catch unreachable);
},
else => @compileError("Unsupported OS"), else => @compileError("Unsupported OS"),
} }
} }
/// data - just the inner references - must live until pwritev promise completes. /// data must outlive the returned promise
pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { pub async fn pwritevWindows(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
if (data.len == 0) return; if (data.len == 0) return;
if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable); if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable);
const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
defer loop.allocator.free(data_copy);
// TODO do these in parallel // TODO do these in parallel
var off = offset; var off = offset;
for (data_copy) |buf| { for (data) |buf| {
try await (async pwriteWindows(loop, fd, buf, off) catch unreachable); try await (async pwriteWindows(loop, fd, buf, off) catch unreachable);
off += buf.len; off += buf.len;
} }
@ -144,23 +163,18 @@ pub async fn pwriteWindows(loop: *Loop, fd: os.FileHandle, data: []const u8, off
} }
} }
/// data - just the inner references - must live until pwritev promise completes. /// iovecs must live until pwritev promise completes.
pub async fn pwritevPosix(loop: *Loop, fd: os.FileHandle, data: []const []const u8, offset: usize) !void { pub async fn pwritevPosix(
loop: *Loop,
fd: os.FileHandle,
iovecs: []const posix.iovec_const,
offset: usize,
) os.PosixWriteError!void {
// workaround for https://github.com/ziglang/zig/issues/1194 // workaround for https://github.com/ziglang/zig/issues/1194
suspend { suspend {
resume @handle(); resume @handle();
} }
const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.posix.iovec_const{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
}
var req_node = RequestNode{ var req_node = RequestNode{
.prev = null, .prev = null,
.next = null, .next = null,
@ -192,38 +206,59 @@ pub async fn pwritevPosix(loop: *Loop, fd: os.FileHandle, data: []const []const
return req_node.data.msg.PWriteV.result; return req_node.data.msg.PWriteV.result;
} }
pub const PReadVError = error{OutOfMemory} || os.File.ReadError;
/// data - just the inner references - must live until preadv promise completes. /// data - just the inner references - must live until preadv promise completes.
pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize { pub async fn preadv(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) PReadVError!usize {
// workaround for https://github.com/ziglang/zig/issues/1194
suspend {
resume @handle();
}
assert(data.len != 0); assert(data.len != 0);
switch (builtin.os) { switch (builtin.os) {
builtin.Os.macosx, builtin.Os.macosx,
builtin.Os.linux, builtin.Os.linux,
=> return await (async preadvPosix(loop, fd, data, offset) catch unreachable), => {
builtin.Os.windows => return await (async preadvWindows(loop, fd, data, offset) catch unreachable), const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.posix.iovec{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
}
return await (async preadvPosix(loop, fd, iovecs, offset) catch unreachable);
},
builtin.Os.windows => {
const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
defer loop.allocator.free(data_copy);
return await (async preadvWindows(loop, fd, data_copy, offset) catch unreachable);
},
else => @compileError("Unsupported OS"), else => @compileError("Unsupported OS"),
} }
} }
pub async fn preadvWindows(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: u64) !usize { /// data must outlive the returned promise
pub async fn preadvWindows(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: u64) os.WindowsReadError!usize {
assert(data.len != 0); assert(data.len != 0);
if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable); if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable);
const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
defer loop.allocator.free(data_copy);
// TODO do these in parallel? // TODO do these in parallel?
var off: usize = 0; var off: usize = 0;
var iov_i: usize = 0; var iov_i: usize = 0;
var inner_off: usize = 0; var inner_off: usize = 0;
while (true) { while (true) {
const v = data_copy[iov_i]; const v = data[iov_i];
const amt_read = try await (async preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off) catch unreachable); const amt_read = try await (async preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off) catch unreachable);
off += amt_read; off += amt_read;
inner_off += amt_read; inner_off += amt_read;
if (inner_off == v.len) { if (inner_off == v.len) {
iov_i += 1; iov_i += 1;
inner_off = 0; inner_off = 0;
if (iov_i == data_copy.len) { if (iov_i == data.len) {
return off; return off;
} }
} }
@ -275,23 +310,18 @@ pub async fn preadWindows(loop: *Loop, fd: os.FileHandle, data: []u8, offset: u6
return usize(bytes_transferred); return usize(bytes_transferred);
} }
/// data - just the inner references - must live until preadv promise completes. /// iovecs must live until preadv promise completes
pub async fn preadvPosix(loop: *Loop, fd: os.FileHandle, data: []const []u8, offset: usize) !usize { pub async fn preadvPosix(
loop: *Loop,
fd: os.FileHandle,
iovecs: []const posix.iovec,
offset: usize,
) os.PosixReadError!usize {
// workaround for https://github.com/ziglang/zig/issues/1194 // workaround for https://github.com/ziglang/zig/issues/1194
suspend { suspend {
resume @handle(); resume @handle();
} }
const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len);
defer loop.allocator.free(iovecs);
for (data) |buf, i| {
iovecs[i] = os.posix.iovec{
.iov_base = buf.ptr,
.iov_len = buf.len,
};
}
var req_node = RequestNode{ var req_node = RequestNode{
.prev = null, .prev = null,
.next = null, .next = null,
@ -1339,3 +1369,55 @@ async fn testFsWatch(loop: *Loop) !void {
// TODO test deleting the file and then re-adding it. we should get events for both // TODO test deleting the file and then re-adding it. we should get events for both
} }
pub const OutStream = struct {
fd: os.FileHandle,
stream: Stream,
loop: *Loop,
offset: usize,
pub const Error = os.File.WriteError;
pub const Stream = event.io.OutStream(Error);
pub fn init(loop: *Loop, fd: os.FileHandle, offset: usize) OutStream {
return OutStream{
.fd = fd,
.loop = loop,
.offset = offset,
.stream = Stream{ .writeFn = writeFn },
};
}
async<*mem.Allocator> fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void {
const self = @fieldParentPtr(OutStream, "stream", out_stream);
const offset = self.offset;
self.offset += bytes.len;
return await (async pwritev(self.loop, self.fd, [][]const u8{bytes}, offset) catch unreachable);
}
};
pub const InStream = struct {
fd: os.FileHandle,
stream: Stream,
loop: *Loop,
offset: usize,
pub const Error = PReadVError; // TODO make this not have OutOfMemory
pub const Stream = event.io.InStream(Error);
pub fn init(loop: *Loop, fd: os.FileHandle, offset: usize) InStream {
return InStream{
.fd = fd,
.loop = loop,
.offset = offset,
.stream = Stream{ .readFn = readFn },
};
}
async<*mem.Allocator> fn readFn(in_stream: *Stream, bytes: []u8) Error!usize {
const self = @fieldParentPtr(InStream, "stream", in_stream);
const amt = try await (async preadv(self.loop, self.fd, [][]u8{bytes}, self.offset) catch unreachable);
self.offset += amt;
return amt;
}
};

View File

@ -2,6 +2,7 @@ const std = @import("../index.zig");
const builtin = @import("builtin"); const builtin = @import("builtin");
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const assert = std.debug.assert; const assert = std.debug.assert;
const mem = std.mem;
pub fn InStream(comptime ReadError: type) type { pub fn InStream(comptime ReadError: type) type {
return struct { return struct {
@ -20,6 +21,20 @@ pub fn InStream(comptime ReadError: type) type {
return await (async self.readFn(self, buffer) catch unreachable); return await (async self.readFn(self, buffer) catch unreachable);
} }
pub async fn readIntLe(self: *Self, comptime T: type) !T {
return await (async self.readInt(builtin.Endian.Little, T) catch unreachable);
}
pub async fn readIntBe(self: *Self, comptime T: type) !T {
return await (async self.readInt(builtin.Endian.Big, T) catch unreachable);
}
pub async fn readInt(self: *Self, endian: builtin.Endian, comptime T: type) !T {
var bytes: [@sizeOf(T)]u8 = undefined;
try await (async self.readFull(bytes[0..]) catch unreachable);
return mem.readInt(bytes, T, endian);
}
/// Same as `read` but end of stream returns `error.EndOfStream`. /// Same as `read` but end of stream returns `error.EndOfStream`.
pub async fn readFull(self: *Self, buf: []u8) !void { pub async fn readFull(self: *Self, buf: []u8) !void {
var index: usize = 0; var index: usize = 0;

View File

@ -365,14 +365,7 @@ pub const File = struct {
} }
} }
pub const ReadError = error{ pub const ReadError = os.WindowsReadError || os.PosixReadError;
FileClosed,
InputOutput,
IsDir,
SystemResources,
Unexpected,
};
pub fn read(self: File, buffer: []u8) ReadError!usize { pub fn read(self: File, buffer: []u8) ReadError!usize {
if (is_posix) { if (is_posix) {
@ -386,7 +379,7 @@ pub const File = struct {
posix.EINVAL => unreachable, posix.EINVAL => unreachable,
posix.EFAULT => unreachable, posix.EFAULT => unreachable,
posix.EAGAIN => unreachable, posix.EAGAIN => unreachable,
posix.EBADF => return error.FileClosed, posix.EBADF => unreachable, // always a race condition
posix.EIO => return error.InputOutput, posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir, posix.EISDIR => return error.IsDir,
posix.ENOBUFS => return error.SystemResources, posix.ENOBUFS => return error.SystemResources,

View File

@ -72,6 +72,7 @@ pub const windowsGetQueuedCompletionStatus = windows_util.windowsGetQueuedComple
pub const WindowsWaitError = windows_util.WaitError; pub const WindowsWaitError = windows_util.WaitError;
pub const WindowsOpenError = windows_util.OpenError; pub const WindowsOpenError = windows_util.OpenError;
pub const WindowsWriteError = windows_util.WriteError; pub const WindowsWriteError = windows_util.WriteError;
pub const WindowsReadError = windows_util.ReadError;
pub const FileHandle = if (is_windows) windows.HANDLE else i32; pub const FileHandle = if (is_windows) windows.HANDLE else i32;
@ -227,6 +228,13 @@ pub fn close(handle: FileHandle) void {
} }
} }
pub const PosixReadError = error{
InputOutput,
SystemResources,
IsDir,
Unexpected,
};
/// Calls POSIX read, and keeps trying if it gets interrupted. /// Calls POSIX read, and keeps trying if it gets interrupted.
pub fn posixRead(fd: i32, buf: []u8) !void { pub fn posixRead(fd: i32, buf: []u8) !void {
// Linux can return EINVAL when read amount is > 0x7ffff000 // Linux can return EINVAL when read amount is > 0x7ffff000
@ -238,24 +246,27 @@ pub fn posixRead(fd: i32, buf: []u8) !void {
const want_to_read = math.min(buf.len - index, usize(max_buf_len)); const want_to_read = math.min(buf.len - index, usize(max_buf_len));
const rc = posix.read(fd, buf.ptr + index, want_to_read); const rc = posix.read(fd, buf.ptr + index, want_to_read);
const err = posix.getErrno(rc); const err = posix.getErrno(rc);
if (err > 0) { switch (err) {
return switch (err) { 0 => {
index += rc;
continue;
},
posix.EINTR => continue, posix.EINTR => continue,
posix.EINVAL, posix.EFAULT => unreachable, posix.EINVAL => unreachable,
posix.EFAULT => unreachable,
posix.EAGAIN => unreachable, posix.EAGAIN => unreachable,
posix.EBADF => unreachable, // always a race condition posix.EBADF => unreachable, // always a race condition
posix.EIO => error.InputOutput, posix.EIO => return error.InputOutput,
posix.EISDIR => error.IsDir, posix.EISDIR => return error.IsDir,
posix.ENOBUFS, posix.ENOMEM => error.SystemResources, posix.ENOBUFS => return error.SystemResources,
else => unexpectedErrorPosix(err), posix.ENOMEM => return error.SystemResources,
}; else => return unexpectedErrorPosix(err),
} }
index += rc;
} }
} }
/// Number of bytes read is returned. Upon reading end-of-file, zero is returned. /// Number of bytes read is returned. Upon reading end-of-file, zero is returned.
pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) !usize { pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u64) PosixReadError!usize {
switch (builtin.os) { switch (builtin.os) {
builtin.Os.macosx => { builtin.Os.macosx => {
// Darwin does not have preadv but it does have pread. // Darwin does not have preadv but it does have pread.
@ -284,7 +295,7 @@ pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u6
posix.EINVAL => unreachable, posix.EINVAL => unreachable,
posix.EFAULT => unreachable, posix.EFAULT => unreachable,
posix.ESPIPE => unreachable, // fd is not seekable posix.ESPIPE => unreachable, // fd is not seekable
posix.EAGAIN => unreachable, // use posixAsyncPReadV for non blocking posix.EAGAIN => unreachable, // this function is not for non blocking
posix.EBADF => unreachable, // always a race condition posix.EBADF => unreachable, // always a race condition
posix.EIO => return error.InputOutput, posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir, posix.EISDIR => return error.IsDir,
@ -302,7 +313,7 @@ pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u6
posix.EINTR => continue, posix.EINTR => continue,
posix.EINVAL => unreachable, posix.EINVAL => unreachable,
posix.EFAULT => unreachable, posix.EFAULT => unreachable,
posix.EAGAIN => unreachable, // use posixAsyncPReadV for non blocking posix.EAGAIN => unreachable, // don't call this function for non blocking
posix.EBADF => unreachable, // always a race condition posix.EBADF => unreachable, // always a race condition
posix.EIO => return error.InputOutput, posix.EIO => return error.InputOutput,
posix.EISDIR => return error.IsDir, posix.EISDIR => return error.IsDir,
@ -328,7 +339,7 @@ pub const PosixWriteError = error{
}; };
/// Calls POSIX write, and keeps trying if it gets interrupted. /// Calls POSIX write, and keeps trying if it gets interrupted.
pub fn posixWrite(fd: i32, bytes: []const u8) !void { pub fn posixWrite(fd: i32, bytes: []const u8) PosixWriteError!void {
// Linux can return EINVAL when write amount is > 0x7ffff000 // Linux can return EINVAL when write amount is > 0x7ffff000
// See https://github.com/ziglang/zig/pull/743#issuecomment-363165856 // See https://github.com/ziglang/zig/pull/743#issuecomment-363165856
const max_bytes_len = 0x7ffff000; const max_bytes_len = 0x7ffff000;

View File

@ -42,6 +42,12 @@ pub fn windowsClose(handle: windows.HANDLE) void {
assert(windows.CloseHandle(handle) != 0); assert(windows.CloseHandle(handle) != 0);
} }
pub const ReadError = error{
OperationAborted,
BrokenPipe,
Unexpected,
};
pub const WriteError = error{ pub const WriteError = error{
SystemResources, SystemResources,
OperationAborted, OperationAborted,