diff --git a/CMakeLists.txt b/CMakeLists.txt index 98b2ab9db..25c0dfa12 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -470,6 +470,7 @@ set(ZIG_STD_FILES "event/fs.zig" "event/future.zig" "event/group.zig" + "event/io.zig" "event/lock.zig" "event/locked.zig" "event/loop.zig" diff --git a/doc/docgen.zig b/doc/docgen.zig index cbfd67c74..2e2aa4b42 100644 --- a/doc/docgen.zig +++ b/doc/docgen.zig @@ -41,12 +41,12 @@ pub fn main() !void { var out_file = try os.File.openWrite(out_file_name); defer out_file.close(); - var file_in_stream = io.FileInStream.init(in_file); + var file_in_stream = in_file.inStream(); const input_file_bytes = try file_in_stream.stream.readAllAlloc(allocator, max_doc_file_size); - var file_out_stream = io.FileOutStream.init(out_file); - var buffered_out_stream = io.BufferedOutStream(io.FileOutStream.Error).init(&file_out_stream.stream); + var file_out_stream = out_file.outStream(); + var buffered_out_stream = io.BufferedOutStream(os.File.WriteError).init(&file_out_stream.stream); var tokenizer = Tokenizer.init(in_file_name, input_file_bytes); var toc = try genToc(allocator, &tokenizer); diff --git a/example/guess_number/main.zig b/example/guess_number/main.zig index 062f93e7f..ef0b9c08f 100644 --- a/example/guess_number/main.zig +++ b/example/guess_number/main.zig @@ -6,8 +6,7 @@ const os = std.os; pub fn main() !void { var stdout_file = try io.getStdOut(); - var stdout_file_stream = io.FileOutStream.init(stdout_file); - const stdout = &stdout_file_stream.stream; + const stdout = &stdout_file.outStream().stream; try stdout.print("Welcome to the Guess Number Game in Zig.\n"); diff --git a/src-self-hosted/errmsg.zig b/src-self-hosted/errmsg.zig index 6cf29b944..0e552fde7 100644 --- a/src-self-hosted/errmsg.zig +++ b/src-self-hosted/errmsg.zig @@ -278,7 +278,7 @@ pub const Msg = struct { Color.On => true, Color.Off => false, }; - var stream = &std.io.FileOutStream.init(file).stream; + var stream = &file.outStream().stream; return msg.printToStream(stream, color_on); } }; diff --git a/src-self-hosted/libc_installation.zig b/src-self-hosted/libc_installation.zig index 5e292ff8b..bbfc0125f 100644 --- a/src-self-hosted/libc_installation.zig +++ b/src-self-hosted/libc_installation.zig @@ -30,7 +30,7 @@ pub const LibCInstallation = struct { self: *LibCInstallation, allocator: *std.mem.Allocator, libc_file: []const u8, - stderr: *std.io.OutStream(std.io.FileOutStream.Error), + stderr: *std.io.OutStream(std.os.File.WriteError), ) !void { self.initEmpty(); @@ -100,7 +100,7 @@ pub const LibCInstallation = struct { } } - pub fn render(self: *const LibCInstallation, out: *std.io.OutStream(std.io.FileOutStream.Error)) !void { + pub fn render(self: *const LibCInstallation, out: *std.io.OutStream(std.os.File.WriteError)) !void { @setEvalBranchQuota(4000); try out.print( \\# The directory that contains `stdlib.h`. diff --git a/src-self-hosted/main.zig b/src-self-hosted/main.zig index 6aa5f067e..6d4f20e55 100644 --- a/src-self-hosted/main.zig +++ b/src-self-hosted/main.zig @@ -21,8 +21,8 @@ const errmsg = @import("errmsg.zig"); const LibCInstallation = @import("libc_installation.zig").LibCInstallation; var stderr_file: os.File = undefined; -var stderr: *io.OutStream(io.FileOutStream.Error) = undefined; -var stdout: *io.OutStream(io.FileOutStream.Error) = undefined; +var stderr: *io.OutStream(os.File.WriteError) = undefined; +var stdout: *io.OutStream(os.File.WriteError) = undefined; const max_src_size = 2 * 1024 * 1024 * 1024; // 2 GiB @@ -55,11 +55,11 @@ pub fn main() !void { const allocator = std.heap.c_allocator; var stdout_file = try std.io.getStdOut(); - var stdout_out_stream = std.io.FileOutStream.init(stdout_file); + var stdout_out_stream = stdout_file.outStream(); stdout = &stdout_out_stream.stream; stderr_file = try std.io.getStdErr(); - var stderr_out_stream = std.io.FileOutStream.init(stderr_file); + var stderr_out_stream = stderr_file.outStream(); stderr = &stderr_out_stream.stream; const args = try os.argsAlloc(allocator); @@ -619,7 +619,7 @@ fn cmdFmt(allocator: *Allocator, args: []const []const u8) !void { } var stdin_file = try io.getStdIn(); - var stdin = io.FileInStream.init(stdin_file); + var stdin = stdin_file.inStream(); const source_code = try stdin.stream.readAllAlloc(allocator, max_src_size); defer allocator.free(source_code); diff --git a/std/atomic/queue.zig b/std/atomic/queue.zig index eea77d553..796120eee 100644 --- a/std/atomic/queue.zig +++ b/std/atomic/queue.zig @@ -114,7 +114,7 @@ pub fn Queue(comptime T: type) type { fn dumpRecursive(optional_node: ?*Node, indent: usize) void { var stderr_file = std.io.getStdErr() catch return; - const stderr = &std.io.FileOutStream.init(stderr_file).stream; + const stderr = &stderr_file.outStream().stream; stderr.writeByteNTimes(' ', indent) catch return; if (optional_node) |node| { std.debug.warn("0x{x}={}\n", @ptrToInt(node), node.data); diff --git a/std/coff.zig b/std/coff.zig index bf5a93ef3..6a1aa34b4 100644 --- a/std/coff.zig +++ b/std/coff.zig @@ -41,7 +41,7 @@ pub const Coff = struct { pub fn loadHeader(self: *Coff) !void { const pe_pointer_offset = 0x3C; - var file_stream = io.FileInStream.init(self.in_file); + var file_stream = self.in_file.inStream(); const in = &file_stream.stream; var magic: [2]u8 = undefined; @@ -77,7 +77,7 @@ pub const Coff = struct { try self.loadOptionalHeader(&file_stream); } - fn loadOptionalHeader(self: *Coff, file_stream: *io.FileInStream) !void { + fn loadOptionalHeader(self: *Coff, file_stream: *os.File.InStream) !void { const in = &file_stream.stream; self.pe_header.magic = try in.readIntLe(u16); // For now we're only interested in finding the reference to the .pdb, @@ -115,7 +115,7 @@ pub const Coff = struct { const file_offset = debug_dir.virtual_address - header.virtual_address + header.pointer_to_raw_data; try self.in_file.seekTo(file_offset + debug_dir.size); - var file_stream = io.FileInStream.init(self.in_file); + var file_stream = self.in_file.inStream(); const in = &file_stream.stream; var cv_signature: [4]u8 = undefined; // CodeView signature @@ -146,7 +146,7 @@ pub const Coff = struct { self.sections = ArrayList(Section).init(self.allocator); - var file_stream = io.FileInStream.init(self.in_file); + var file_stream = self.in_file.inStream(); const in = &file_stream.stream; var name: [8]u8 = undefined; diff --git a/std/crypto/throughput_test.zig b/std/crypto/throughput_test.zig index 8894b4fff..10ef1bcd4 100644 --- a/std/crypto/throughput_test.zig +++ b/std/crypto/throughput_test.zig @@ -130,7 +130,7 @@ fn printPad(stdout: var, s: []const u8) !void { pub fn main() !void { var stdout_file = try std.io.getStdOut(); - var stdout_out_stream = std.io.FileOutStream.init(stdout_file); + var stdout_out_stream = stdout_file.outStream(); const stdout = &stdout_out_stream.stream; var buffer: [1024]u8 = undefined; diff --git a/std/debug/index.zig b/std/debug/index.zig index 4a8ee365b..edd47b0ab 100644 --- a/std/debug/index.zig +++ b/std/debug/index.zig @@ -34,10 +34,10 @@ const Module = struct { /// Tries to write to stderr, unbuffered, and ignores any error returned. /// Does not append a newline. var stderr_file: os.File = undefined; -var stderr_file_out_stream: io.FileOutStream = undefined; +var stderr_file_out_stream: os.File.OutStream = undefined; /// TODO multithreaded awareness -var stderr_stream: ?*io.OutStream(io.FileOutStream.Error) = null; +var stderr_stream: ?*io.OutStream(os.File.WriteError) = null; var stderr_mutex = std.Mutex.init(); pub fn warn(comptime fmt: []const u8, args: ...) void { const held = stderr_mutex.acquire(); @@ -46,12 +46,12 @@ pub fn warn(comptime fmt: []const u8, args: ...) void { stderr.print(fmt, args) catch return; } -pub fn getStderrStream() !*io.OutStream(io.FileOutStream.Error) { +pub fn getStderrStream() !*io.OutStream(os.File.WriteError) { if (stderr_stream) |st| { return st; } else { stderr_file = try io.getStdErr(); - stderr_file_out_stream = io.FileOutStream.init(stderr_file); + stderr_file_out_stream = stderr_file.outStream(); const st = &stderr_file_out_stream.stream; stderr_stream = st; return st; @@ -876,7 +876,7 @@ fn openSelfDebugInfoLinux(allocator: *mem.Allocator) !DebugInfo { } pub fn findElfSection(elf: *Elf, name: []const u8) ?*elf.Shdr { - var file_stream = io.FileInStream.init(elf.in_file); + var file_stream = elf.in_file.inStream(); const in = &file_stream.stream; section_loop: for (elf.section_headers) |*elf_section| { @@ -1068,7 +1068,7 @@ pub const DebugInfo = switch (builtin.os) { } pub fn readString(self: *DebugInfo) ![]u8 { - var in_file_stream = io.FileInStream.init(self.self_exe_file); + var in_file_stream = self.self_exe_file.inStream(); const in_stream = &in_file_stream.stream; return readStringRaw(self.allocator(), in_stream); } @@ -1405,7 +1405,7 @@ fn parseFormValue(allocator: *mem.Allocator, in_stream: var, form_id: u64, is_64 fn parseAbbrevTable(st: *DebugInfo) !AbbrevTable { const in_file = st.self_exe_file; - var in_file_stream = io.FileInStream.init(in_file); + var in_file_stream = in_file.inStream(); const in_stream = &in_file_stream.stream; var result = AbbrevTable.init(st.allocator()); while (true) { @@ -1456,7 +1456,7 @@ fn getAbbrevTableEntry(abbrev_table: *const AbbrevTable, abbrev_code: u64) ?*con fn parseDie(st: *DebugInfo, abbrev_table: *const AbbrevTable, is_64: bool) !Die { const in_file = st.self_exe_file; - var in_file_stream = io.FileInStream.init(in_file); + var in_file_stream = in_file.inStream(); const in_stream = &in_file_stream.stream; const abbrev_code = try readULeb128(in_stream); const table_entry = getAbbrevTableEntry(abbrev_table, abbrev_code) orelse return error.InvalidDebugInfo; @@ -1682,7 +1682,7 @@ fn getLineNumberInfoLinux(di: *DebugInfo, compile_unit: *const CompileUnit, targ var this_offset = di.debug_line.offset; var this_index: usize = 0; - var in_file_stream = io.FileInStream.init(in_file); + var in_file_stream = in_file.inStream(); const in_stream = &in_file_stream.stream; while (this_offset < debug_line_end) : (this_index += 1) { @@ -1857,7 +1857,7 @@ fn scanAllCompileUnits(st: *DebugInfo) !void { var this_unit_offset = st.debug_info.offset; var cu_index: usize = 0; - var in_file_stream = io.FileInStream.init(st.self_exe_file); + var in_file_stream = st.self_exe_file.inStream(); const in_stream = &in_file_stream.stream; while (this_unit_offset < debug_info_end) { @@ -1923,7 +1923,7 @@ fn scanAllCompileUnits(st: *DebugInfo) !void { } fn findCompileUnit(st: *DebugInfo, target_address: u64) !*const CompileUnit { - var in_file_stream = io.FileInStream.init(st.self_exe_file); + var in_file_stream = st.self_exe_file.inStream(); const in_stream = &in_file_stream.stream; for (st.compile_unit_list.toSlice()) |*compile_unit| { if (compile_unit.pc_range) |range| { diff --git a/std/elf.zig b/std/elf.zig index a3a72dc72..af74f3e05 100644 --- a/std/elf.zig +++ b/std/elf.zig @@ -381,7 +381,7 @@ pub const Elf = struct { elf.in_file = file; elf.auto_close_stream = false; - var file_stream = io.FileInStream.init(elf.in_file); + var file_stream = elf.in_file.inStream(); const in = &file_stream.stream; var magic: [4]u8 = undefined; @@ -525,7 +525,7 @@ pub const Elf = struct { } pub fn findSection(elf: *Elf, name: []const u8) !?*SectionHeader { - var file_stream = io.FileInStream.init(elf.in_file); + var file_stream = elf.in_file.inStream(); const in = &file_stream.stream; section_loop: for (elf.section_headers) |*elf_section| { diff --git a/std/event.zig b/std/event.zig index 56c5223ba..8409cc5c0 100644 --- a/std/event.zig +++ b/std/event.zig @@ -6,6 +6,7 @@ pub const Locked = @import("event/locked.zig").Locked; pub const RwLock = @import("event/rwlock.zig").RwLock; pub const RwLocked = @import("event/rwlocked.zig").RwLocked; pub const Loop = @import("event/loop.zig").Loop; +pub const io = @import("event/io.zig"); pub const fs = @import("event/fs.zig"); pub const net = @import("event/net.zig"); @@ -14,6 +15,7 @@ test "import event tests" { _ = @import("event/fs.zig"); _ = @import("event/future.zig"); _ = @import("event/group.zig"); + _ = @import("event/io.zig"); _ = @import("event/lock.zig"); _ = @import("event/locked.zig"); _ = @import("event/rwlock.zig"); diff --git a/std/event/fs.zig b/std/event/fs.zig index bde5a306b..7f96ae22b 100644 --- a/std/event/fs.zig +++ b/std/event/fs.zig @@ -1246,9 +1246,7 @@ pub fn Watch(comptime V: type) type { os.linux.EPOLLET | os.linux.EPOLLIN, ) catch unreachable)) catch |err| { const transformed_err = switch (err) { - error.InvalidFileDescriptor => unreachable, error.FileDescriptorAlreadyPresentInSet => unreachable, - error.InvalidSyscall => unreachable, error.OperationCausesCircularLoop => unreachable, error.FileDescriptorNotRegistered => unreachable, error.SystemResources => error.SystemResources, diff --git a/std/event/io.zig b/std/event/io.zig new file mode 100644 index 000000000..13bff6645 --- /dev/null +++ b/std/event/io.zig @@ -0,0 +1,48 @@ +const std = @import("../index.zig"); +const builtin = @import("builtin"); +const Allocator = std.mem.Allocator; +const assert = std.debug.assert; + +pub fn InStream(comptime ReadError: type) type { + return struct { + const Self = @This(); + pub const Error = ReadError; + + /// Return the number of bytes read. It may be less than buffer.len. + /// If the number of bytes read is 0, it means end of stream. + /// End of stream is not an error condition. + readFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!usize, + + /// Return the number of bytes read. It may be less than buffer.len. + /// If the number of bytes read is 0, it means end of stream. + /// End of stream is not an error condition. + pub async fn read(self: *Self, buffer: []u8) !usize { + return await (async self.readFn(self, buffer) catch unreachable); + } + + /// Same as `read` but end of stream returns `error.EndOfStream`. + pub async fn readFull(self: *Self, buf: []u8) !void { + var index: usize = 0; + while (index != buf.len) { + const amt_read = try await (async self.read(buf[index..]) catch unreachable); + if (amt_read == 0) return error.EndOfStream; + index += amt_read; + } + } + + pub async fn readStruct(self: *Self, comptime T: type, ptr: *T) !void { + // Only extern and packed structs have defined in-memory layout. + comptime assert(@typeInfo(T).Struct.layout != builtin.TypeInfo.ContainerLayout.Auto); + return await (async self.readFull(@sliceToBytes((*[1]T)(ptr)[0..])) catch unreachable); + } + }; +} + +pub fn OutStream(comptime WriteError: type) type { + return struct { + const Self = @This(); + pub const Error = WriteError; + + writeFn: async<*Allocator> fn (self: *Self, buffer: []u8) Error!void, + }; +} diff --git a/std/event/net.zig b/std/event/net.zig index c1e6e891c..25aa85efb 100644 --- a/std/event/net.zig +++ b/std/event/net.zig @@ -3,12 +3,12 @@ const builtin = @import("builtin"); const assert = std.debug.assert; const event = std.event; const mem = std.mem; -const posix = std.os.posix; -const windows = std.os.windows; +const os = std.os; +const posix = os.posix; const Loop = std.event.Loop; pub const Server = struct { - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void, + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const os.File) void, loop: *Loop, sockfd: ?i32, @@ -40,17 +40,17 @@ pub const Server = struct { pub fn listen( self: *Server, address: *const std.net.Address, - handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void, + handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const os.File) void, ) !void { self.handleRequestFn = handleRequestFn; - const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); - errdefer std.os.close(sockfd); + const sockfd = try os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer os.close(sockfd); self.sockfd = sockfd; - try std.os.posixBind(sockfd, &address.os_addr); - try std.os.posixListen(sockfd, posix.SOMAXCONN); - self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd)); + try os.posixBind(sockfd, &address.os_addr); + try os.posixListen(sockfd, posix.SOMAXCONN); + self.listen_address = std.net.Address.initPosix(try os.posixGetSockName(sockfd)); self.accept_coro = try async Server.handler(self); errdefer cancel self.accept_coro.?; @@ -63,19 +63,25 @@ pub const Server = struct { /// Stop listening pub fn close(self: *Server) void { self.loop.linuxRemoveFd(self.sockfd.?); - std.os.close(self.sockfd.?); + os.close(self.sockfd.?); } pub fn deinit(self: *Server) void { if (self.accept_coro) |accept_coro| cancel accept_coro; - if (self.sockfd) |sockfd| std.os.close(sockfd); + if (self.sockfd) |sockfd| os.close(sockfd); } pub async fn handler(self: *Server) void { while (true) { var accepted_addr: std.net.Address = undefined; - if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { - var socket = std.os.File.openHandle(accepted_fd); + // TODO just inline the following function here and don't expose it as posixAsyncAccept + if (os.posixAsyncAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| { + if (accepted_fd == -1) { + // would block + suspend; // we will get resumed by epoll_wait in the event loop + continue; + } + var socket = os.File.openHandle(accepted_fd); _ = async self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) { error.OutOfMemory => { socket.close(); @@ -83,22 +89,16 @@ pub const Server = struct { }, }; } else |err| switch (err) { - error.WouldBlock => { - suspend; // we will get resumed by epoll_wait in the event loop - continue; - }, error.ProcessFdQuotaExceeded => { - errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node); + errdefer os.emfile_promise_queue.remove(&self.waiting_for_emfile_node); suspend { self.waiting_for_emfile_node = PromiseNode.init(@handle()); - std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node); + os.emfile_promise_queue.append(&self.waiting_for_emfile_node); } continue; }, - error.ConnectionAborted, error.FileDescriptorClosed => continue, + error.ConnectionAborted => continue, - error.PageFault => unreachable, - error.InvalidSyscall => unreachable, error.FileDescriptorNotASocket => unreachable, error.OperationNotSupported => unreachable, @@ -111,64 +111,161 @@ pub const Server = struct { }; pub async fn connectUnixSocket(loop: *Loop, path: []const u8) !i32 { - const sockfd = try std.os.posixSocket( + const sockfd = try os.posixSocket( posix.AF_UNIX, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, 0, ); - errdefer std.os.close(sockfd); + errdefer os.close(sockfd); - var sock_addr = posix.sockaddr{ - .un = posix.sockaddr_un{ - .family = posix.AF_UNIX, - .path = undefined, - }, + var sock_addr = posix.sockaddr_un{ + .family = posix.AF_UNIX, + .path = undefined, }; - if (path.len > @typeOf(sock_addr.un.path).len) return error.NameTooLong; - mem.copy(u8, sock_addr.un.path[0..], path); + if (path.len > @typeOf(sock_addr.path).len) return error.NameTooLong; + mem.copy(u8, sock_addr.path[0..], path); const size = @intCast(u32, @sizeOf(posix.sa_family_t) + path.len); - try std.os.posixConnectAsync(sockfd, &sock_addr, size); + try os.posixConnectAsync(sockfd, &sock_addr, size); try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); - try std.os.posixGetSockOptConnectError(sockfd); + try os.posixGetSockOptConnectError(sockfd); return sockfd; } -pub async fn socketRead(loop: *std.event.Loop, fd: i32, buffer: []u8) !void { - while (true) { - return std.os.posixRead(fd, buffer) catch |err| switch (err) { - error.WouldBlock => { - try await try async loop.linuxWaitFd(fd, std.os.posix.EPOLLET | std.os.posix.EPOLLIN); - continue; - }, - else => return err, - }; - } +pub const ReadError = error{ + SystemResources, + Unexpected, + UserResourceLimitReached, + InputOutput, + + FileDescriptorNotRegistered, // TODO remove this possibility + OperationCausesCircularLoop, // TODO remove this possibility + FileDescriptorAlreadyPresentInSet, // TODO remove this possibility + FileDescriptorIncompatibleWithEpoll, // TODO remove this possibility +}; + +/// returns number of bytes read. 0 means EOF. +pub async fn read(loop: *std.event.Loop, fd: os.FileHandle, buffer: []u8) ReadError!usize { + const iov = posix.iovec{ + .iov_base = buffer.ptr, + .iov_len = buffer.len, + }; + const iovs: *const [1]posix.iovec = &iov; + return await (async readvPosix(loop, fd, iovs, 1) catch unreachable); } -pub async fn socketWrite(loop: *std.event.Loop, fd: i32, buffer: []const u8) !void { + +pub const WriteError = error{}; + +pub async fn write(loop: *std.event.Loop, fd: os.FileHandle, buffer: []const u8) WriteError!void { + const iov = posix.iovec_const{ + .iov_base = buffer.ptr, + .iov_len = buffer.len, + }; + const iovs: *const [1]posix.iovec_const = &iov; + return await (async writevPosix(loop, fd, iovs, 1) catch unreachable); +} + +pub async fn writevPosix(loop: *Loop, fd: i32, iov: [*]const posix.iovec_const, count: usize) !void { while (true) { - return std.os.posixWrite(fd, buffer) catch |err| switch (err) { - error.WouldBlock => { - try await try async loop.linuxWaitFd(fd, std.os.posix.EPOLLET | std.os.posix.EPOLLOUT); - continue; + switch (builtin.os) { + builtin.Os.macosx, builtin.Os.linux => { + const rc = posix.writev(fd, iov, count); + const err = posix.getErrno(rc); + switch (err) { + 0 => return, + posix.EINTR => continue, + posix.ESPIPE => unreachable, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => { + try await (async loop.linuxWaitFd(fd, posix.EPOLLET | posix.EPOLLOUT) catch unreachable); + continue; + }, + posix.EBADF => unreachable, // always a race condition + posix.EDESTADDRREQ => unreachable, // connect was never called + posix.EDQUOT => unreachable, + posix.EFBIG => unreachable, + posix.EIO => return error.InputOutput, + posix.ENOSPC => unreachable, + posix.EPERM => return error.AccessDenied, + posix.EPIPE => unreachable, + else => return os.unexpectedErrorPosix(err), + } }, - else => return err, - }; + else => @compileError("Unsupported OS"), + } } } -pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File { - var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733 +/// returns number of bytes read. 0 means EOF. +pub async fn readvPosix(loop: *std.event.Loop, fd: i32, iov: [*]posix.iovec, count: usize) !usize { + while (true) { + switch (builtin.os) { + builtin.Os.linux, builtin.Os.freebsd, builtin.Os.macosx => { + const rc = posix.readv(fd, iov, count); + const err = posix.getErrno(rc); + switch (err) { + 0 => return rc, + posix.EINTR => continue, + posix.EINVAL => unreachable, + posix.EFAULT => unreachable, + posix.EAGAIN => { + try await (async loop.linuxWaitFd(fd, posix.EPOLLET | posix.EPOLLIN) catch unreachable); + continue; + }, + posix.EBADF => unreachable, // always a race condition + posix.EIO => return error.InputOutput, + posix.EISDIR => unreachable, + posix.ENOBUFS => return error.SystemResources, + posix.ENOMEM => return error.SystemResources, + else => return os.unexpectedErrorPosix(err), + } + }, + else => @compileError("Unsupported OS"), + } + } +} - const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); - errdefer std.os.close(sockfd); +pub async fn writev(loop: *Loop, fd: os.FileHandle, data: []const []const u8) !void { + const iovecs = try loop.allocator.alloc(os.posix.iovec_const, data.len); + defer loop.allocator.free(iovecs); - try std.os.posixConnectAsync(sockfd, &address.os_addr, @sizeOf(posix.sockaddr_in)); + for (data) |buf, i| { + iovecs[i] = os.posix.iovec_const{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return await (async writevPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); +} + +pub async fn readv(loop: *Loop, fd: os.FileHandle, data: []const []u8) !usize { + const iovecs = try loop.allocator.alloc(os.posix.iovec, data.len); + defer loop.allocator.free(iovecs); + + for (data) |buf, i| { + iovecs[i] = os.posix.iovec{ + .iov_base = buf.ptr, + .iov_len = buf.len, + }; + } + + return await (async readvPosix(loop, fd, iovecs.ptr, data.len) catch unreachable); +} + +pub async fn connect(loop: *Loop, _address: *const std.net.Address) !os.File { + var address = _address.*; // TODO https://github.com/ziglang/zig/issues/1592 + + const sockfd = try os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp); + errdefer os.close(sockfd); + + try os.posixConnectAsync(sockfd, &address.os_addr, @sizeOf(posix.sockaddr_in)); try await try async loop.linuxWaitFd(sockfd, posix.EPOLLIN | posix.EPOLLOUT | posix.EPOLLET); - try std.os.posixGetSockOptConnectError(sockfd); + try os.posixGetSockOptConnectError(sockfd); - return std.os.File.openHandle(sockfd); + return os.File.openHandle(sockfd); } test "listen on a port, send bytes, receive bytes" { @@ -181,9 +278,9 @@ test "listen on a port, send bytes, receive bytes" { tcp_server: Server, const Self = @This(); - async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void { + async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const os.File) void { const self = @fieldParentPtr(Self, "tcp_server", tcp_server); - var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733 + var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/1592 defer socket.close(); // TODO guarantee elision of this allocation const next_handler = async errorableHandler(self, _addr, socket) catch unreachable; @@ -194,12 +291,11 @@ test "listen on a port, send bytes, receive bytes" { cancel @handle(); } } - async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: std.os.File) !void { - const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733 - var socket = _socket; // TODO https://github.com/ziglang/zig/issues/733 + async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: os.File) !void { + const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/1592 + var socket = _socket; // TODO https://github.com/ziglang/zig/issues/1592 - var adapter = std.io.FileOutStream.init(socket); - var stream = &adapter.stream; + const stream = &socket.outStream().stream; try stream.print("hello from server\n"); } }; @@ -230,3 +326,47 @@ async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Serv assert(mem.eql(u8, msg, "hello from server\n")); server.close(); } + +pub const OutStream = struct { + fd: os.FileHandle, + stream: Stream, + loop: *Loop, + + pub const Error = WriteError; + pub const Stream = event.io.OutStream(Error); + + pub fn init(loop: *Loop, fd: os.FileHandle) OutStream { + return OutStream{ + .fd = fd, + .loop = loop, + .stream = Stream{ .writeFn = writeFn }, + }; + } + + async<*mem.Allocator> fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + const self = @fieldParentPtr(OutStream, "stream", out_stream); + return await (async write(self.loop, self.fd, bytes) catch unreachable); + } +}; + +pub const InStream = struct { + fd: os.FileHandle, + stream: Stream, + loop: *Loop, + + pub const Error = ReadError; + pub const Stream = event.io.InStream(Error); + + pub fn init(loop: *Loop, fd: os.FileHandle) InStream { + return InStream{ + .fd = fd, + .loop = loop, + .stream = Stream{ .readFn = readFn }, + }; + } + + async<*mem.Allocator> fn readFn(in_stream: *Stream, bytes: []u8) Error!usize { + const self = @fieldParentPtr(InStream, "stream", in_stream); + return await (async read(self.loop, self.fd, bytes) catch unreachable); + } +}; diff --git a/std/io.zig b/std/io.zig index c10fdcbbe..3d20d8b9d 100644 --- a/std/io.zig +++ b/std/io.zig @@ -32,48 +32,6 @@ pub fn getStdIn() GetStdIoErrs!File { return File.openHandle(handle); } -/// Implementation of InStream trait for File -pub const FileInStream = struct { - file: File, - stream: Stream, - - pub const Error = @typeOf(File.read).ReturnType.ErrorSet; - pub const Stream = InStream(Error); - - pub fn init(file: File) FileInStream { - return FileInStream{ - .file = file, - .stream = Stream{ .readFn = readFn }, - }; - } - - fn readFn(in_stream: *Stream, buffer: []u8) Error!usize { - const self = @fieldParentPtr(FileInStream, "stream", in_stream); - return self.file.read(buffer); - } -}; - -/// Implementation of OutStream trait for File -pub const FileOutStream = struct { - file: File, - stream: Stream, - - pub const Error = File.WriteError; - pub const Stream = OutStream(Error); - - pub fn init(file: File) FileOutStream { - return FileOutStream{ - .file = file, - .stream = Stream{ .writeFn = writeFn }, - }; - } - - fn writeFn(out_stream: *Stream, bytes: []const u8) !void { - const self = @fieldParentPtr(FileOutStream, "stream", out_stream); - return self.file.write(bytes); - } -}; - pub fn InStream(comptime ReadError: type) type { return struct { const Self = @This(); @@ -280,7 +238,7 @@ pub fn readFileAllocAligned(allocator: *mem.Allocator, path: []const u8, comptim const buf = try allocator.alignedAlloc(u8, A, size); errdefer allocator.free(buf); - var adapter = FileInStream.init(file); + var adapter = file.inStream(); try adapter.stream.readNoEof(buf[0..size]); return buf; } @@ -577,8 +535,8 @@ pub const BufferOutStream = struct { pub const BufferedAtomicFile = struct { atomic_file: os.AtomicFile, - file_stream: FileOutStream, - buffered_stream: BufferedOutStream(FileOutStream.Error), + file_stream: os.File.OutStream, + buffered_stream: BufferedOutStream(os.File.WriteError), pub fn create(allocator: *mem.Allocator, dest_path: []const u8) !*BufferedAtomicFile { // TODO with well defined copy elision we don't need this allocation @@ -592,8 +550,8 @@ pub const BufferedAtomicFile = struct { self.atomic_file = try os.AtomicFile.init(allocator, dest_path, os.File.default_mode); errdefer self.atomic_file.deinit(); - self.file_stream = FileOutStream.init(self.atomic_file.file); - self.buffered_stream = BufferedOutStream(FileOutStream.Error).init(&self.file_stream.stream); + self.file_stream = self.atomic_file.file.outStream(); + self.buffered_stream = BufferedOutStream(os.File.WriteError).init(&self.file_stream.stream); return self; } @@ -609,7 +567,7 @@ pub const BufferedAtomicFile = struct { try self.atomic_file.finish(); } - pub fn stream(self: *BufferedAtomicFile) *OutStream(FileOutStream.Error) { + pub fn stream(self: *BufferedAtomicFile) *OutStream(os.File.WriteError) { return &self.buffered_stream.stream; } }; @@ -622,7 +580,7 @@ test "import io tests" { pub fn readLine(buf: []u8) !usize { var stdin = getStdIn() catch return error.StdInUnavailable; - var adapter = FileInStream.init(stdin); + var adapter = stdin.inStream(); var stream = &adapter.stream; var index: usize = 0; while (true) { @@ -642,3 +600,5 @@ pub fn readLine(buf: []u8) !usize { } } } + + diff --git a/std/io_test.zig b/std/io_test.zig index 7403c9699..522419952 100644 --- a/std/io_test.zig +++ b/std/io_test.zig @@ -19,8 +19,8 @@ test "write a file, read it, then delete it" { var file = try os.File.openWrite(tmp_file_name); defer file.close(); - var file_out_stream = io.FileOutStream.init(file); - var buf_stream = io.BufferedOutStream(io.FileOutStream.Error).init(&file_out_stream.stream); + var file_out_stream = file.outStream(); + var buf_stream = io.BufferedOutStream(os.File.WriteError).init(&file_out_stream.stream); const st = &buf_stream.stream; try st.print("begin"); try st.write(data[0..]); @@ -35,8 +35,8 @@ test "write a file, read it, then delete it" { const expected_file_size = "begin".len + data.len + "end".len; assert(file_size == expected_file_size); - var file_in_stream = io.FileInStream.init(file); - var buf_stream = io.BufferedInStream(io.FileInStream.Error).init(&file_in_stream.stream); + var file_in_stream = file.inStream(); + var buf_stream = io.BufferedInStream(os.File.ReadError).init(&file_in_stream.stream); const st = &buf_stream.stream; const contents = try st.readAllAlloc(allocator, 2 * 1024); defer allocator.free(contents); diff --git a/std/os/child_process.zig b/std/os/child_process.zig index 8b76a3cb9..5c02f1ebe 100644 --- a/std/os/child_process.zig +++ b/std/os/child_process.zig @@ -211,8 +211,8 @@ pub const ChildProcess = struct { defer Buffer.deinit(&stdout); defer Buffer.deinit(&stderr); - var stdout_file_in_stream = io.FileInStream.init(child.stdout.?); - var stderr_file_in_stream = io.FileInStream.init(child.stderr.?); + var stdout_file_in_stream = child.stdout.?.inStream(); + var stderr_file_in_stream = child.stderr.?.inStream(); try stdout_file_in_stream.stream.readAllBuffer(&stdout, max_output_size); try stderr_file_in_stream.stream.readAllBuffer(&stderr, max_output_size); diff --git a/std/os/file.zig b/std/os/file.zig index 020a5dca5..4a98dbcfa 100644 --- a/std/os/file.zig +++ b/std/os/file.zig @@ -1,6 +1,7 @@ const std = @import("../index.zig"); const builtin = @import("builtin"); const os = std.os; +const io = std.io; const mem = std.mem; const math = std.math; const assert = std.debug.assert; @@ -368,7 +369,6 @@ pub const File = struct { FileClosed, InputOutput, IsDir, - WouldBlock, SystemResources, Unexpected, @@ -385,7 +385,7 @@ pub const File = struct { posix.EINTR => continue, posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EAGAIN => return error.WouldBlock, + posix.EAGAIN => unreachable, posix.EBADF => return error.FileClosed, posix.EIO => return error.InputOutput, posix.EISDIR => return error.IsDir, @@ -431,4 +431,46 @@ pub const File = struct { @compileError("Unsupported OS"); } } + + pub fn inStream(file: File) InStream { + return InStream{ + .file = file, + .stream = InStream.Stream{ .readFn = InStream.readFn }, + }; + } + + pub fn outStream(file: File) OutStream { + return OutStream{ + .file = file, + .stream = OutStream.Stream{ .writeFn = OutStream.writeFn }, + }; + } + + /// Implementation of io.InStream trait for File + pub const InStream = struct { + file: File, + stream: Stream, + + pub const Error = ReadError; + pub const Stream = io.InStream(Error); + + fn readFn(in_stream: *Stream, buffer: []u8) Error!usize { + const self = @fieldParentPtr(InStream, "stream", in_stream); + return self.file.read(buffer); + } + }; + + /// Implementation of io.OutStream trait for File + pub const OutStream = struct { + file: File, + stream: Stream, + + pub const Error = WriteError; + pub const Stream = io.OutStream(Error); + + fn writeFn(out_stream: *Stream, bytes: []const u8) Error!void { + const self = @fieldParentPtr(OutStream, "stream", out_stream); + return self.file.write(bytes); + } + }; }; diff --git a/std/os/index.zig b/std/os/index.zig index a53d1d205..778e3b965 100644 --- a/std/os/index.zig +++ b/std/os/index.zig @@ -242,8 +242,8 @@ pub fn posixRead(fd: i32, buf: []u8) !void { return switch (err) { posix.EINTR => continue, posix.EINVAL, posix.EFAULT => unreachable, - posix.EAGAIN => error.WouldBlock, - posix.EBADF => error.FileClosed, + posix.EAGAIN => unreachable, + posix.EBADF => unreachable, // always a race condition posix.EIO => error.InputOutput, posix.EISDIR => error.IsDir, posix.ENOBUFS, posix.ENOMEM => error.SystemResources, @@ -284,8 +284,8 @@ pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u6 posix.EINVAL => unreachable, posix.EFAULT => unreachable, posix.ESPIPE => unreachable, // fd is not seekable - posix.EAGAIN => return error.WouldBlock, - posix.EBADF => return error.FileClosed, + posix.EAGAIN => unreachable, // use posixAsyncPReadV for non blocking + posix.EBADF => unreachable, // always a race condition posix.EIO => return error.InputOutput, posix.EISDIR => return error.IsDir, posix.ENOBUFS => return error.SystemResources, @@ -302,8 +302,8 @@ pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u6 posix.EINTR => continue, posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EAGAIN => return error.WouldBlock, - posix.EBADF => return error.FileClosed, + posix.EAGAIN => unreachable, // use posixAsyncPReadV for non blocking + posix.EBADF => unreachable, // always a race condition posix.EIO => return error.InputOutput, posix.EISDIR => return error.IsDir, posix.ENOBUFS => return error.SystemResources, @@ -316,9 +316,6 @@ pub fn posix_preadv(fd: i32, iov: [*]const posix.iovec, count: usize, offset: u6 } pub const PosixWriteError = error{ - WouldBlock, - FileClosed, - DestinationAddressRequired, DiskQuota, FileTooBig, InputOutput, @@ -349,9 +346,9 @@ pub fn posixWrite(fd: i32, bytes: []const u8) !void { posix.EINTR => continue, posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EAGAIN => return PosixWriteError.WouldBlock, - posix.EBADF => return PosixWriteError.FileClosed, - posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, + posix.EAGAIN => unreachable, // use posixAsyncWrite for non-blocking + posix.EBADF => unreachable, // always a race condition + posix.EDESTADDRREQ => unreachable, // connect was never called posix.EDQUOT => return PosixWriteError.DiskQuota, posix.EFBIG => return PosixWriteError.FileTooBig, posix.EIO => return PosixWriteError.InputOutput, @@ -391,9 +388,9 @@ pub fn posix_pwritev(fd: i32, iov: [*]const posix.iovec_const, count: usize, off posix.ESPIPE => unreachable, // fd is not seekable posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EAGAIN => return PosixWriteError.WouldBlock, - posix.EBADF => return PosixWriteError.FileClosed, - posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, + posix.EAGAIN => unreachable, // use posixAsyncPWriteV for non-blocking + posix.EBADF => unreachable, // always a race condition + posix.EDESTADDRREQ => unreachable, // connect was never called posix.EDQUOT => return PosixWriteError.DiskQuota, posix.EFBIG => return PosixWriteError.FileTooBig, posix.EIO => return PosixWriteError.InputOutput, @@ -412,9 +409,9 @@ pub fn posix_pwritev(fd: i32, iov: [*]const posix.iovec_const, count: usize, off posix.EINTR => continue, posix.EINVAL => unreachable, posix.EFAULT => unreachable, - posix.EAGAIN => return PosixWriteError.WouldBlock, - posix.EBADF => return PosixWriteError.FileClosed, - posix.EDESTADDRREQ => return PosixWriteError.DestinationAddressRequired, + posix.EAGAIN => unreachable, // use posixAsyncPWriteV for non-blocking + posix.EBADF => unreachable, // always a race condition + posix.EDESTADDRREQ => unreachable, // connect was never called posix.EDQUOT => return PosixWriteError.DiskQuota, posix.EFBIG => return PosixWriteError.FileTooBig, posix.EIO => return PosixWriteError.InputOutput, @@ -2287,22 +2284,9 @@ pub const PosixBindError = error{ /// use. See the discussion of /proc/sys/net/ipv4/ip_local_port_range ip(7). AddressInUse, - /// sockfd is not a valid file descriptor. - InvalidFileDescriptor, - - /// The socket is already bound to an address, or addrlen is wrong, or addr is not - /// a valid address for this socket's domain. - InvalidSocketOrAddress, - - /// The file descriptor sockfd does not refer to a socket. - FileDescriptorNotASocket, - /// A nonexistent interface was requested or the requested address was not local. AddressNotAvailable, - /// addr points outside the user's accessible address space. - PageFault, - /// Too many symbolic links were encountered in resolving addr. SymLinkLoop, @@ -2333,11 +2317,11 @@ pub fn posixBind(fd: i32, addr: *const posix.sockaddr) PosixBindError!void { 0 => return, posix.EACCES => return PosixBindError.AccessDenied, posix.EADDRINUSE => return PosixBindError.AddressInUse, - posix.EBADF => return PosixBindError.InvalidFileDescriptor, - posix.EINVAL => return PosixBindError.InvalidSocketOrAddress, - posix.ENOTSOCK => return PosixBindError.FileDescriptorNotASocket, + posix.EBADF => unreachable, // always a race condition if this error is returned + posix.EINVAL => unreachable, + posix.ENOTSOCK => unreachable, posix.EADDRNOTAVAIL => return PosixBindError.AddressNotAvailable, - posix.EFAULT => return PosixBindError.PageFault, + posix.EFAULT => unreachable, posix.ELOOP => return PosixBindError.SymLinkLoop, posix.ENAMETOOLONG => return PosixBindError.NameTooLong, posix.ENOENT => return PosixBindError.FileNotFound, @@ -2356,9 +2340,6 @@ const PosixListenError = error{ /// use. See the discussion of /proc/sys/net/ipv4/ip_local_port_range in ip(7). AddressInUse, - /// The argument sockfd is not a valid file descriptor. - InvalidFileDescriptor, - /// The file descriptor sockfd does not refer to a socket. FileDescriptorNotASocket, @@ -2375,7 +2356,7 @@ pub fn posixListen(sockfd: i32, backlog: u32) PosixListenError!void { switch (err) { 0 => return, posix.EADDRINUSE => return PosixListenError.AddressInUse, - posix.EBADF => return PosixListenError.InvalidFileDescriptor, + posix.EBADF => unreachable, posix.ENOTSOCK => return PosixListenError.FileDescriptorNotASocket, posix.EOPNOTSUPP => return PosixListenError.OperationNotSupported, else => return unexpectedErrorPosix(err), @@ -2383,21 +2364,8 @@ pub fn posixListen(sockfd: i32, backlog: u32) PosixListenError!void { } pub const PosixAcceptError = error{ - /// The socket is marked nonblocking and no connections are present to be accepted. - WouldBlock, - - /// sockfd is not an open file descriptor. - FileDescriptorClosed, - ConnectionAborted, - /// The addr argument is not in a writable part of the user address space. - PageFault, - - /// Socket is not listening for connections, or addrlen is invalid (e.g., is negative), - /// or invalid value in flags. - InvalidSyscall, - /// The per-process limit on the number of open file descriptors has been reached. ProcessFdQuotaExceeded, @@ -2433,14 +2401,43 @@ pub fn posixAccept(fd: i32, addr: *posix.sockaddr, flags: u32) PosixAcceptError! posix.EINTR => continue, else => return unexpectedErrorPosix(err), - posix.EAGAIN => return PosixAcceptError.WouldBlock, - posix.EBADF => return PosixAcceptError.FileDescriptorClosed, + posix.EAGAIN => unreachable, // use posixAsyncAccept for non-blocking + posix.EBADF => unreachable, // always a race condition posix.ECONNABORTED => return PosixAcceptError.ConnectionAborted, - posix.EFAULT => return PosixAcceptError.PageFault, - posix.EINVAL => return PosixAcceptError.InvalidSyscall, + posix.EFAULT => unreachable, + posix.EINVAL => unreachable, posix.EMFILE => return PosixAcceptError.ProcessFdQuotaExceeded, posix.ENFILE => return PosixAcceptError.SystemFdQuotaExceeded, - posix.ENOBUFS, posix.ENOMEM => return PosixAcceptError.SystemResources, + posix.ENOBUFS => return PosixAcceptError.SystemResources, + posix.ENOMEM => return PosixAcceptError.SystemResources, + posix.ENOTSOCK => return PosixAcceptError.FileDescriptorNotASocket, + posix.EOPNOTSUPP => return PosixAcceptError.OperationNotSupported, + posix.EPROTO => return PosixAcceptError.ProtocolFailure, + posix.EPERM => return PosixAcceptError.BlockedByFirewall, + } + } +} + +/// Returns -1 if would block. +pub fn posixAsyncAccept(fd: i32, addr: *posix.sockaddr, flags: u32) PosixAcceptError!i32 { + while (true) { + var sockaddr_size = u32(@sizeOf(posix.sockaddr)); + const rc = posix.accept4(fd, addr, &sockaddr_size, flags); + const err = posix.getErrno(rc); + switch (err) { + 0 => return @intCast(i32, rc), + posix.EINTR => continue, + else => return unexpectedErrorPosix(err), + + posix.EAGAIN => return -1, + posix.EBADF => unreachable, // always a race condition + posix.ECONNABORTED => return PosixAcceptError.ConnectionAborted, + posix.EFAULT => unreachable, + posix.EINVAL => unreachable, + posix.EMFILE => return PosixAcceptError.ProcessFdQuotaExceeded, + posix.ENFILE => return PosixAcceptError.SystemFdQuotaExceeded, + posix.ENOBUFS => return PosixAcceptError.SystemResources, + posix.ENOMEM => return PosixAcceptError.SystemResources, posix.ENOTSOCK => return PosixAcceptError.FileDescriptorNotASocket, posix.EOPNOTSUPP => return PosixAcceptError.OperationNotSupported, posix.EPROTO => return PosixAcceptError.ProtocolFailure, @@ -2450,9 +2447,6 @@ pub fn posixAccept(fd: i32, addr: *posix.sockaddr, flags: u32) PosixAcceptError! } pub const LinuxEpollCreateError = error{ - /// Invalid value specified in flags. - InvalidSyscall, - /// The per-user limit on the number of epoll instances imposed by /// /proc/sys/fs/epoll/max_user_instances was encountered. See epoll(7) for further /// details. @@ -2476,7 +2470,7 @@ pub fn linuxEpollCreate(flags: u32) LinuxEpollCreateError!i32 { 0 => return @intCast(i32, rc), else => return unexpectedErrorPosix(err), - posix.EINVAL => return LinuxEpollCreateError.InvalidSyscall, + posix.EINVAL => unreachable, posix.EMFILE => return LinuxEpollCreateError.ProcessFdQuotaExceeded, posix.ENFILE => return LinuxEpollCreateError.SystemFdQuotaExceeded, posix.ENOMEM => return LinuxEpollCreateError.SystemResources, @@ -2484,22 +2478,10 @@ pub fn linuxEpollCreate(flags: u32) LinuxEpollCreateError!i32 { } pub const LinuxEpollCtlError = error{ - /// epfd or fd is not a valid file descriptor. - InvalidFileDescriptor, - /// op was EPOLL_CTL_ADD, and the supplied file descriptor fd is already registered /// with this epoll instance. FileDescriptorAlreadyPresentInSet, - /// epfd is not an epoll file descriptor, or fd is the same as epfd, or the requested - /// operation op is not supported by this interface, or - /// An invalid event type was specified along with EPOLLEXCLUSIVE in events, or - /// op was EPOLL_CTL_MOD and events included EPOLLEXCLUSIVE, or - /// op was EPOLL_CTL_MOD and the EPOLLEXCLUSIVE flag has previously been applied to - /// this epfd, fd pair, or - /// EPOLLEXCLUSIVE was specified in event and fd refers to an epoll instance. - InvalidSyscall, - /// fd refers to an epoll instance and this EPOLL_CTL_ADD operation would result in a /// circular loop of epoll instances monitoring one another. OperationCausesCircularLoop, @@ -2531,9 +2513,9 @@ pub fn linuxEpollCtl(epfd: i32, op: u32, fd: i32, event: *linux.epoll_event) Lin 0 => return, else => return unexpectedErrorPosix(err), - posix.EBADF => return LinuxEpollCtlError.InvalidFileDescriptor, + posix.EBADF => unreachable, // always a race condition if this happens posix.EEXIST => return LinuxEpollCtlError.FileDescriptorAlreadyPresentInSet, - posix.EINVAL => return LinuxEpollCtlError.InvalidSyscall, + posix.EINVAL => unreachable, posix.ELOOP => return LinuxEpollCtlError.OperationCausesCircularLoop, posix.ENOENT => return LinuxEpollCtlError.FileDescriptorNotRegistered, posix.ENOMEM => return LinuxEpollCtlError.SystemResources, diff --git a/std/os/linux/index.zig b/std/os/linux/index.zig index 6323c89dd..e00c664af 100644 --- a/std/os/linux/index.zig +++ b/std/os/linux/index.zig @@ -793,6 +793,14 @@ pub fn preadv(fd: i32, iov: [*]const iovec, count: usize, offset: u64) usize { return syscall4(SYS_preadv, @intCast(usize, fd), @ptrToInt(iov), count, offset); } +pub fn readv(fd: i32, iov: [*]const iovec, count: usize) usize { + return syscall3(SYS_readv, @intCast(usize, fd), @ptrToInt(iov), count); +} + +pub fn writev(fd: i32, iov: [*]const iovec_const, count: usize) usize { + return syscall3(SYS_writev, @intCast(usize, fd), @ptrToInt(iov), count); +} + pub fn pwritev(fd: i32, iov: [*]const iovec_const, count: usize, offset: u64) usize { return syscall4(SYS_pwritev, @intCast(usize, fd), @ptrToInt(iov), count, offset); } diff --git a/std/pdb.zig b/std/pdb.zig index 3bb6c9d58..6d2371194 100644 --- a/std/pdb.zig +++ b/std/pdb.zig @@ -482,7 +482,7 @@ const Msf = struct { streams: []MsfStream, fn openFile(self: *Msf, allocator: *mem.Allocator, file: os.File) !void { - var file_stream = io.FileInStream.init(file); + var file_stream = file.inStream(); const in = &file_stream.stream; var superblock: SuperBlock = undefined; @@ -597,7 +597,7 @@ const MsfStream = struct { .stream = Stream{ .readFn = readFn }, }; - var file_stream = io.FileInStream.init(file); + var file_stream = file.inStream(); const in = &file_stream.stream; try file.seekTo(pos); @@ -627,7 +627,7 @@ const MsfStream = struct { var offset = self.pos % self.block_size; try self.in_file.seekTo(block * self.block_size + offset); - var file_stream = io.FileInStream.init(self.in_file); + var file_stream = self.in_file.inStream(); const in = &file_stream.stream; var size: usize = 0; diff --git a/std/special/build_runner.zig b/std/special/build_runner.zig index 8cf237f63..f227e02c7 100644 --- a/std/special/build_runner.zig +++ b/std/special/build_runner.zig @@ -48,16 +48,16 @@ pub fn main() !void { var prefix: ?[]const u8 = null; var stderr_file = io.getStdErr(); - var stderr_file_stream: io.FileOutStream = undefined; + var stderr_file_stream: os.File.OutStream = undefined; var stderr_stream = if (stderr_file) |f| x: { - stderr_file_stream = io.FileOutStream.init(f); + stderr_file_stream = f.outStream(); break :x &stderr_file_stream.stream; } else |err| err; var stdout_file = io.getStdOut(); - var stdout_file_stream: io.FileOutStream = undefined; + var stdout_file_stream: os.File.OutStream = undefined; var stdout_stream = if (stdout_file) |f| x: { - stdout_file_stream = io.FileOutStream.init(f); + stdout_file_stream = f.outStream(); break :x &stdout_file_stream.stream; } else |err| err; diff --git a/std/zig/bench.zig b/std/zig/bench.zig index 2db33e591..6a9e8495b 100644 --- a/std/zig/bench.zig +++ b/std/zig/bench.zig @@ -24,7 +24,7 @@ pub fn main() !void { const mb_per_sec = bytes_per_sec / (1024 * 1024); var stdout_file = try std.io.getStdOut(); - const stdout = &std.io.FileOutStream.init(stdout_file).stream; + const stdout = &stdout_file.outStream().stream; try stdout.print("{.3} MiB/s, {} KiB used \n", mb_per_sec, memory_used / 1024); } diff --git a/std/zig/parser_test.zig b/std/zig/parser_test.zig index efa9ad765..401076d53 100644 --- a/std/zig/parser_test.zig +++ b/std/zig/parser_test.zig @@ -1873,7 +1873,7 @@ var fixed_buffer_mem: [100 * 1024]u8 = undefined; fn testParse(source: []const u8, allocator: *mem.Allocator, anything_changed: *bool) ![]u8 { var stderr_file = try io.getStdErr(); - var stderr = &io.FileOutStream.init(stderr_file).stream; + var stderr = &stderr_file.outStream().stream; var tree = try std.zig.parse(allocator, source); defer tree.deinit(); diff --git a/test/compare_output.zig b/test/compare_output.zig index bcd9d15b9..554f85c16 100644 --- a/test/compare_output.zig +++ b/test/compare_output.zig @@ -19,7 +19,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\ \\pub fn main() void { \\ privateFunction(); - \\ const stdout = &FileOutStream.init(getStdOut() catch unreachable).stream; + \\ const stdout = &(getStdOut() catch unreachable).outStream().stream; \\ stdout.print("OK 2\n") catch unreachable; \\} \\ @@ -34,7 +34,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\// purposefully conflicting function with main.zig \\// but it's private so it should be OK \\fn privateFunction() void { - \\ const stdout = &FileOutStream.init(getStdOut() catch unreachable).stream; + \\ const stdout = &(getStdOut() catch unreachable).outStream().stream; \\ stdout.print("OK 1\n") catch unreachable; \\} \\ @@ -60,7 +60,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { tc.addSourceFile("foo.zig", \\use @import("std").io; \\pub fn foo_function() void { - \\ const stdout = &FileOutStream.init(getStdOut() catch unreachable).stream; + \\ const stdout = &(getStdOut() catch unreachable).outStream().stream; \\ stdout.print("OK\n") catch unreachable; \\} ); @@ -71,7 +71,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\ \\pub fn bar_function() void { \\ if (foo_function()) { - \\ const stdout = &FileOutStream.init(getStdOut() catch unreachable).stream; + \\ const stdout = &(getStdOut() catch unreachable).outStream().stream; \\ stdout.print("OK\n") catch unreachable; \\ } \\} @@ -103,7 +103,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\pub const a_text = "OK\n"; \\ \\pub fn ok() void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print(b_text) catch unreachable; \\} ); @@ -121,7 +121,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\const io = @import("std").io; \\ \\pub fn main() void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("Hello, world!\n{d4} {x3} {c}\n", u32(12), u16(0x12), u8('a')) catch unreachable; \\} , "Hello, world!\n0012 012 a\n"); @@ -274,7 +274,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\ var x_local : i32 = print_ok(x); \\} \\fn print_ok(val: @typeOf(x)) @typeOf(foo) { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("OK\n") catch unreachable; \\ return 0; \\} @@ -356,7 +356,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\pub fn main() void { \\ const bar = Bar {.field2 = 13,}; \\ const foo = Foo {.field1 = bar,}; - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ if (!foo.method()) { \\ stdout.print("BAD\n") catch unreachable; \\ } @@ -370,7 +370,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { cases.add("defer with only fallthrough", \\const io = @import("std").io; \\pub fn main() void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("before\n") catch unreachable; \\ defer stdout.print("defer1\n") catch unreachable; \\ defer stdout.print("defer2\n") catch unreachable; @@ -383,7 +383,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\const io = @import("std").io; \\const os = @import("std").os; \\pub fn main() void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("before\n") catch unreachable; \\ defer stdout.print("defer1\n") catch unreachable; \\ defer stdout.print("defer2\n") catch unreachable; @@ -400,7 +400,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\ do_test() catch return; \\} \\fn do_test() !void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("before\n") catch unreachable; \\ defer stdout.print("defer1\n") catch unreachable; \\ errdefer stdout.print("deferErr\n") catch unreachable; @@ -419,7 +419,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\ do_test() catch return; \\} \\fn do_test() !void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print("before\n") catch unreachable; \\ defer stdout.print("defer1\n") catch unreachable; \\ errdefer stdout.print("deferErr\n") catch unreachable; @@ -436,7 +436,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\const io = @import("std").io; \\ \\pub fn main() void { - \\ const stdout = &io.FileOutStream.init(io.getStdOut() catch unreachable).stream; + \\ const stdout = &(io.getStdOut() catch unreachable).outStream().stream; \\ stdout.print(foo_txt) catch unreachable; \\} , "1234\nabcd\n"); @@ -456,7 +456,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\pub fn main() !void { \\ var args_it = os.args(); \\ var stdout_file = try io.getStdOut(); - \\ var stdout_adapter = io.FileOutStream.init(stdout_file); + \\ var stdout_adapter = stdout_file.outStream(); \\ const stdout = &stdout_adapter.stream; \\ var index: usize = 0; \\ _ = args_it.skip(); @@ -497,7 +497,7 @@ pub fn addCases(cases: *tests.CompareOutputContext) void { \\pub fn main() !void { \\ var args_it = os.args(); \\ var stdout_file = try io.getStdOut(); - \\ var stdout_adapter = io.FileOutStream.init(stdout_file); + \\ var stdout_adapter = stdout_file.outStream(); \\ const stdout = &stdout_adapter.stream; \\ var index: usize = 0; \\ _ = args_it.skip(); diff --git a/test/standalone/brace_expansion/main.zig b/test/standalone/brace_expansion/main.zig index 36b4f8bd4..adbc41490 100644 --- a/test/standalone/brace_expansion/main.zig +++ b/test/standalone/brace_expansion/main.zig @@ -191,7 +191,7 @@ pub fn main() !void { var stdin_buf = try Buffer.initSize(global_allocator, 0); defer stdin_buf.deinit(); - var stdin_adapter = io.FileInStream.init(stdin_file); + var stdin_adapter = stdin_file.inStream(); try stdin_adapter.stream.readAllBuffer(&stdin_buf, @maxValue(usize)); var result_buf = try Buffer.initSize(global_allocator, 0); diff --git a/test/tests.zig b/test/tests.zig index 88908e877..ec4dcd1f5 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -278,8 +278,8 @@ pub const CompareOutputContext = struct { var stdout = Buffer.initNull(b.allocator); var stderr = Buffer.initNull(b.allocator); - var stdout_file_in_stream = io.FileInStream.init(child.stdout.?); - var stderr_file_in_stream = io.FileInStream.init(child.stderr.?); + var stdout_file_in_stream = child.stdout.?.inStream(); + var stderr_file_in_stream = child.stderr.?.inStream(); stdout_file_in_stream.stream.readAllBuffer(&stdout, max_stdout_size) catch unreachable; stderr_file_in_stream.stream.readAllBuffer(&stderr, max_stdout_size) catch unreachable; @@ -593,8 +593,8 @@ pub const CompileErrorContext = struct { var stdout_buf = Buffer.initNull(b.allocator); var stderr_buf = Buffer.initNull(b.allocator); - var stdout_file_in_stream = io.FileInStream.init(child.stdout.?); - var stderr_file_in_stream = io.FileInStream.init(child.stderr.?); + var stdout_file_in_stream = child.stdout.?.inStream(); + var stderr_file_in_stream = child.stderr.?.inStream(); stdout_file_in_stream.stream.readAllBuffer(&stdout_buf, max_stdout_size) catch unreachable; stderr_file_in_stream.stream.readAllBuffer(&stderr_buf, max_stdout_size) catch unreachable; @@ -857,8 +857,8 @@ pub const TranslateCContext = struct { var stdout_buf = Buffer.initNull(b.allocator); var stderr_buf = Buffer.initNull(b.allocator); - var stdout_file_in_stream = io.FileInStream.init(child.stdout.?); - var stderr_file_in_stream = io.FileInStream.init(child.stderr.?); + var stdout_file_in_stream = child.stdout.?.inStream(); + var stderr_file_in_stream = child.stderr.?.inStream(); stdout_file_in_stream.stream.readAllBuffer(&stdout_buf, max_stdout_size) catch unreachable; stderr_file_in_stream.stream.readAllBuffer(&stderr_buf, max_stdout_size) catch unreachable;