async tcp server proof of concept

master
Andrew Kelley 2018-04-09 00:52:45 -04:00
parent cbda0fa78c
commit e85a10e9f5
11 changed files with 231 additions and 50 deletions

View File

@ -432,6 +432,7 @@ set(ZIG_STD_FILES
"dwarf.zig"
"elf.zig"
"empty.zig"
"event.zig"
"fmt/errol/enum3.zig"
"fmt/errol/index.zig"
"fmt/errol/lookup.zig"

View File

@ -408,6 +408,9 @@ static uint32_t get_err_ret_trace_arg_index(CodeGen *g, FnTableEntry *fn_table_e
if (!g->have_err_ret_tracing) {
return UINT32_MAX;
}
if (fn_table_entry->type_entry->data.fn.fn_type_id.cc == CallingConventionAsync) {
return 0;
}
TypeTableEntry *fn_type = fn_table_entry->type_entry;
if (!fn_type_can_fail(&fn_type->data.fn.fn_type_id)) {
return UINT32_MAX;

View File

@ -2755,9 +2755,10 @@ static IrInstruction *ir_mark_gen(IrInstruction *instruction) {
static bool ir_gen_defers_for_block(IrBuilder *irb, Scope *inner_scope, Scope *outer_scope, bool gen_error_defers) {
Scope *scope = inner_scope;
bool is_noreturn = false;
while (scope != outer_scope) {
if (!scope)
return false;
return is_noreturn;
if (scope->id == ScopeIdDefer) {
AstNode *defer_node = scope->source_node;
@ -2770,14 +2771,18 @@ static bool ir_gen_defers_for_block(IrBuilder *irb, Scope *inner_scope, Scope *o
Scope *defer_expr_scope = defer_node->data.defer.expr_scope;
IrInstruction *defer_expr_value = ir_gen_node(irb, defer_expr_node, defer_expr_scope);
if (defer_expr_value != irb->codegen->invalid_instruction) {
ir_mark_gen(ir_build_check_statement_is_void(irb, defer_expr_scope, defer_expr_node, defer_expr_value));
if (defer_expr_value->value.type != nullptr && defer_expr_value->value.type->id == TypeTableEntryIdUnreachable) {
is_noreturn = true;
} else {
ir_mark_gen(ir_build_check_statement_is_void(irb, defer_expr_scope, defer_expr_node, defer_expr_value));
}
}
}
}
scope = scope->parent;
}
return true;
return is_noreturn;
}
static void ir_set_cursor_at_end(IrBuilder *irb, IrBasicBlock *basic_block) {
@ -2936,12 +2941,13 @@ static IrInstruction *ir_gen_return(IrBuilder *irb, Scope *scope, AstNode *node,
ir_mark_gen(ir_build_cond_br(irb, scope, node, is_err_val, return_block, continue_block, is_comptime));
ir_set_cursor_at_end_and_append_block(irb, return_block);
ir_gen_defers_for_block(irb, scope, outer_scope, true);
IrInstruction *err_val = ir_build_unwrap_err_code(irb, scope, node, err_union_ptr);
if (irb->codegen->have_err_ret_tracing && !should_inline) {
ir_build_save_err_ret_addr(irb, scope, node);
if (!ir_gen_defers_for_block(irb, scope, outer_scope, true)) {
IrInstruction *err_val = ir_build_unwrap_err_code(irb, scope, node, err_union_ptr);
if (irb->codegen->have_err_ret_tracing && !should_inline) {
ir_build_save_err_ret_addr(irb, scope, node);
}
ir_gen_async_return(irb, scope, node, err_val, false);
}
ir_gen_async_return(irb, scope, node, err_val, false);
ir_set_cursor_at_end_and_append_block(irb, continue_block);
IrInstruction *unwrapped_ptr = ir_build_unwrap_err_payload(irb, scope, node, err_union_ptr, false);
@ -5695,7 +5701,7 @@ static IrInstruction *ir_gen_continue(IrBuilder *irb, Scope *continue_scope, Ast
IrBasicBlock *dest_block = loop_scope->continue_block;
ir_gen_defers_for_block(irb, continue_scope, dest_block->scope, false);
return ir_build_br(irb, continue_scope, node, dest_block, is_comptime);
return ir_mark_gen(ir_build_br(irb, continue_scope, node, dest_block, is_comptime));
}
static IrInstruction *ir_gen_error_type(IrBuilder *irb, Scope *scope, AstNode *node) {
@ -6178,7 +6184,7 @@ static IrInstruction *ir_gen_await_expr(IrBuilder *irb, Scope *parent_scope, Ast
ir_set_cursor_at_end_and_append_block(irb, cleanup_block);
ir_gen_defers_for_block(irb, parent_scope, outer_scope, true);
ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false);
ir_mark_gen(ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false));
ir_set_cursor_at_end_and_append_block(irb, resume_block);
IrInstruction *yes_suspend_result = ir_build_load_ptr(irb, parent_scope, node, my_result_var_ptr);
@ -6254,7 +6260,7 @@ static IrInstruction *ir_gen_suspend(IrBuilder *irb, Scope *parent_scope, AstNod
ir_set_cursor_at_end_and_append_block(irb, cleanup_block);
ir_gen_defers_for_block(irb, parent_scope, outer_scope, true);
ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false);
ir_mark_gen(ir_build_br(irb, parent_scope, node, irb->exec->coro_final_cleanup_block, const_bool_false));
ir_set_cursor_at_end_and_append_block(irb, resume_block);
return ir_build_const_void(irb, parent_scope, node);
@ -16746,6 +16752,11 @@ static TypeTableEntry *ir_analyze_instruction_fn_proto(IrAnalyze *ira, IrInstruc
return ira->codegen->builtin_types.entry_invalid;
if (fn_type_id.cc == CallingConventionAsync) {
if (instruction->async_allocator_type_value == nullptr) {
ir_add_error(ira, &instruction->base,
buf_sprintf("async fn proto missing allocator type"));
return ira->codegen->builtin_types.entry_invalid;
}
IrInstruction *async_allocator_type_value = instruction->async_allocator_type_value->other;
fn_type_id.async_allocator_type = ir_resolve_type(ira, async_allocator_type_value);
if (type_is_invalid(fn_type_id.async_allocator_type))

View File

@ -55,3 +55,11 @@ pub const dirent = extern struct {
d_type: u8,
d_name: u8, // field address is address of first byte of name
};
pub const sockaddr = extern struct {
sa_len: u8,
sa_family: sa_family_t,
sa_data: [14]u8,
};
pub const sa_family_t = u8;

View File

@ -1,4 +1,5 @@
const std = @import("index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const event = this;
const mem = std.mem;
@ -38,7 +39,7 @@ pub const TcpServer = struct {
{
self.handleRequestFn = handleRequestFn;
try std.os.posixBind(self.sockfd, &address.sockaddr);
try std.os.posixBind(self.sockfd, &address.os_addr);
try std.os.posixListen(self.sockfd, posix.SOMAXCONN);
self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd));
@ -59,7 +60,7 @@ pub const TcpServer = struct {
pub async fn handler(self: &TcpServer) void {
while (true) {
var accepted_addr: std.net.Address = undefined;
if (std.os.posixAccept(self.sockfd, &accepted_addr.sockaddr,
if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr,
posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd|
{
var socket = std.os.File.openHandle(accepted_fd);
@ -118,7 +119,7 @@ pub const Loop = struct {
pub fn addFd(self: &Loop, fd: i32, prom: promise) !void {
var ev = std.os.linux.epoll_event {
.events = std.os.linux.EPOLLIN|std.os.linux.EPOLLET,
.events = std.os.linux.EPOLLIN|std.os.linux.EPOLLOUT|std.os.linux.EPOLLET,
.data = std.os.linux.epoll_data {
.ptr = @ptrToInt(prom),
},
@ -155,7 +156,24 @@ pub const Loop = struct {
}
};
pub async fn connect(loop: &Loop, _address: &const std.net.Address) !std.os.File {
var address = *_address; // TODO https://github.com/zig-lang/zig/issues/733
const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM|posix.SOCK_CLOEXEC|posix.SOCK_NONBLOCK, posix.PROTO_tcp);
errdefer std.os.close(sockfd);
try std.os.posixConnectAsync(sockfd, &address.os_addr);
try await try async loop.waitFd(sockfd);
try std.os.posixGetSockOptConnectError(sockfd);
return std.os.File.openHandle(sockfd);
}
test "listen on a port, send bytes, receive bytes" {
if (builtin.os != builtin.Os.linux) {
// TODO build abstractions for other operating systems
return;
}
const MyServer = struct {
tcp_server: TcpServer,
@ -198,11 +216,20 @@ test "listen on a port, send bytes, receive bytes" {
defer server.tcp_server.deinit();
try server.tcp_server.listen(addr, MyServer.handler);
var stderr_file = try std.io.getStdErr();
var stderr_stream = &std.io.FileOutStream.init(&stderr_file).stream;
try stderr_stream.print("\nlistening at ");
try server.tcp_server.listen_address.format(stderr_stream);
try stderr_stream.print("\n");
const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address);
defer cancel p;
loop.run();
}
async fn doAsyncTest(loop: &Loop, address: &const std.net.Address) void {
errdefer @panic("test failure");
var socket_file = try await try async event.connect(loop, address);
defer socket_file.close();
var buf: [512]u8 = undefined;
const amt_read = try socket_file.read(buf[0..]);
const msg = buf[0..amt_read];
assert(mem.eql(u8, msg, "hello from server\n"));
loop.stop();
}

View File

@ -50,7 +50,7 @@ test "std" {
_ = @import("dwarf.zig");
_ = @import("elf.zig");
_ = @import("empty.zig");
//TODO_ = @import("event.zig");
_ = @import("event.zig");
_ = @import("fmt/index.zig");
_ = @import("hash/index.zig");
_ = @import("io.zig");

View File

@ -1,15 +1,26 @@
const std = @import("index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const net = this;
const posix = std.os.posix;
const mem = std.mem;
pub const TmpWinAddr = struct {
family: u8,
data: [14]u8,
};
pub const OsAddress = switch (builtin.os) {
builtin.Os.windows => TmpWinAddr,
else => posix.sockaddr,
};
pub const Address = struct {
sockaddr: posix.sockaddr,
os_addr: OsAddress,
pub fn initIp4(ip4: u32, port: u16) Address {
return Address {
.sockaddr = posix.sockaddr {
.os_addr = posix.sockaddr {
.in = posix.sockaddr_in {
.family = posix.AF_INET,
.port = std.mem.endianSwapIfLe(u16, port),
@ -23,7 +34,7 @@ pub const Address = struct {
pub fn initIp6(ip6: &const Ip6Addr, port: u16) Address {
return Address {
.family = posix.AF_INET6,
.sockaddr = posix.sockaddr {
.os_addr = posix.sockaddr {
.in6 = posix.sockaddr_in6 {
.family = posix.AF_INET6,
.port = std.mem.endianSwapIfLe(u16, port),
@ -37,19 +48,19 @@ pub const Address = struct {
pub fn initPosix(addr: &const posix.sockaddr) Address {
return Address {
.sockaddr = *addr,
.os_addr = *addr,
};
}
pub fn format(self: &const Address, out_stream: var) !void {
switch (self.sockaddr.in.family) {
switch (self.os_addr.in.family) {
posix.AF_INET => {
const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in.port);
const bytes = ([]const u8)((&self.sockaddr.in.addr)[0..1]);
const native_endian_port = std.mem.endianSwapIfLe(u16, self.os_addr.in.port);
const bytes = ([]const u8)((&self.os_addr.in.addr)[0..1]);
try out_stream.print("{}.{}.{}.{}:{}", bytes[0], bytes[1], bytes[2], bytes[3], native_endian_port);
},
posix.AF_INET6 => {
const native_endian_port = std.mem.endianSwapIfLe(u16, self.sockaddr.in6.port);
const native_endian_port = std.mem.endianSwapIfLe(u16, self.os_addr.in6.port);
try out_stream.print("[TODO render ip6 address]:{}", native_endian_port);
},
else => try out_stream.write("(unrecognized address family)"),

View File

@ -301,6 +301,9 @@ pub const timespec = c.timespec;
pub const Stat = c.Stat;
pub const dirent = c.dirent;
pub const sa_family_t = c.sa_family_t;
pub const sockaddr = c.sockaddr;
/// Renamed from `sigaction` to `Sigaction` to avoid conflict with the syscall.
pub const Sigaction = struct {
handler: extern fn(i32)void,

View File

@ -2217,7 +2217,7 @@ pub fn linuxEpollCtl(epfd: i32, op: u32, fd: i32, event: &linux.epoll_event) Lin
pub fn linuxEpollWait(epfd: i32, events: []linux.epoll_event, timeout: i32) usize {
while (true) {
const rc = posix.epoll_wait(epfd, &events[0], u32(events.len), timeout);
const rc = posix.epoll_wait(epfd, events.ptr, u32(events.len), timeout);
const err = posix.getErrno(rc);
switch (err) {
0 => return rc,
@ -2253,3 +2253,134 @@ pub fn posixGetSockName(sockfd: i32) PosixGetSockNameError!posix.sockaddr {
posix.ENOBUFS => return PosixGetSockNameError.SystemResources,
}
}
pub const PosixConnectError = error {
/// For UNIX domain sockets, which are identified by pathname: Write permission is denied on the socket
/// file, or search permission is denied for one of the directories in the path prefix.
/// or
/// The user tried to connect to a broadcast address without having the socket broadcast flag enabled or
/// the connection request failed because of a local firewall rule.
PermissionDenied,
/// Local address is already in use.
AddressInUse,
/// (Internet domain sockets) The socket referred to by sockfd had not previously been bound to an
/// address and, upon attempting to bind it to an ephemeral port, it was determined that all port numbers
/// in the ephemeral port range are currently in use. See the discussion of
/// /proc/sys/net/ipv4/ip_local_port_range in ip(7).
AddressNotAvailable,
/// The passed address didn't have the correct address family in its sa_family field.
AddressFamilyNotSupported,
/// Insufficient entries in the routing cache.
SystemResources,
/// A connect() on a stream socket found no one listening on the remote address.
ConnectionRefused,
/// Network is unreachable.
NetworkUnreachable,
/// Timeout while attempting connection. The server may be too busy to accept new connections. Note
/// that for IP sockets the timeout may be very long when syncookies are enabled on the server.
ConnectionTimedOut,
Unexpected,
};
pub fn posixConnect(sockfd: i32, sockaddr: &const posix.sockaddr) PosixConnectError!void {
while (true) {
const rc = posix.connect(sockfd, sockaddr, @sizeOf(posix.sockaddr));
const err = posix.getErrno(rc);
switch (err) {
0 => return,
else => return unexpectedErrorPosix(err),
posix.EACCES => return PosixConnectError.PermissionDenied,
posix.EPERM => return PosixConnectError.PermissionDenied,
posix.EADDRINUSE => return PosixConnectError.AddressInUse,
posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable,
posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported,
posix.EAGAIN => return PosixConnectError.SystemResources,
posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed.
posix.EBADF => unreachable, // sockfd is not a valid open file descriptor.
posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused,
posix.EFAULT => unreachable, // The socket structure address is outside the user's address space.
posix.EINPROGRESS => unreachable, // The socket is nonblocking and the connection cannot be completed immediately.
posix.EINTR => continue,
posix.EISCONN => unreachable, // The socket is already connected.
posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable,
posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol.
posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut,
}
}
}
/// Same as posixConnect except it is for blocking socket file descriptors.
/// It expects to receive EINPROGRESS.
pub fn posixConnectAsync(sockfd: i32, sockaddr: &const posix.sockaddr) PosixConnectError!void {
while (true) {
const rc = posix.connect(sockfd, sockaddr, @sizeOf(posix.sockaddr));
const err = posix.getErrno(rc);
switch (err) {
0, posix.EINPROGRESS => return,
else => return unexpectedErrorPosix(err),
posix.EACCES => return PosixConnectError.PermissionDenied,
posix.EPERM => return PosixConnectError.PermissionDenied,
posix.EADDRINUSE => return PosixConnectError.AddressInUse,
posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable,
posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported,
posix.EAGAIN => return PosixConnectError.SystemResources,
posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed.
posix.EBADF => unreachable, // sockfd is not a valid open file descriptor.
posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused,
posix.EFAULT => unreachable, // The socket structure address is outside the user's address space.
posix.EINTR => continue,
posix.EISCONN => unreachable, // The socket is already connected.
posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable,
posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol.
posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut,
}
}
}
pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void {
var err_code: i32 = undefined;
var size: u32 = @sizeOf(i32);
const rc = posix.getsockopt(sockfd, posix.SOL_SOCKET, posix.SO_ERROR, @ptrCast(&u8, &err_code), &size);
assert(size == 4);
const err = posix.getErrno(rc);
switch (err) {
0 => switch (err_code) {
0 => return,
else => return unexpectedErrorPosix(err),
posix.EACCES => return PosixConnectError.PermissionDenied,
posix.EPERM => return PosixConnectError.PermissionDenied,
posix.EADDRINUSE => return PosixConnectError.AddressInUse,
posix.EADDRNOTAVAIL => return PosixConnectError.AddressNotAvailable,
posix.EAFNOSUPPORT => return PosixConnectError.AddressFamilyNotSupported,
posix.EAGAIN => return PosixConnectError.SystemResources,
posix.EALREADY => unreachable, // The socket is nonblocking and a previous connection attempt has not yet been completed.
posix.EBADF => unreachable, // sockfd is not a valid open file descriptor.
posix.ECONNREFUSED => return PosixConnectError.ConnectionRefused,
posix.EFAULT => unreachable, // The socket structure address is outside the user's address space.
posix.EISCONN => unreachable, // The socket is already connected.
posix.ENETUNREACH => return PosixConnectError.NetworkUnreachable,
posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
posix.EPROTOTYPE => unreachable, // The socket type does not support the requested communications protocol.
posix.ETIMEDOUT => return PosixConnectError.ConnectionTimedOut,
},
else => return unexpectedErrorPosix(err),
posix.EBADF => unreachable, // The argument sockfd is not a valid file descriptor.
posix.EFAULT => unreachable, // The address pointed to by optval or optlen is not in a valid part of the process address space.
posix.EINVAL => unreachable,
posix.ENOPROTOOPT => unreachable, // The option is unknown at the level indicated.
posix.ENOTSOCK => unreachable, // The file descriptor sockfd does not refer to a socket.
}
}

View File

@ -775,12 +775,12 @@ pub fn socket(domain: u32, socket_type: u32, protocol: u32) usize {
return syscall3(SYS_socket, domain, socket_type, protocol);
}
pub fn setsockopt(fd: i32, level: i32, optname: i32, optval: &const u8, optlen: socklen_t) usize {
return syscall5(SYS_setsockopt, usize(fd), usize(level), usize(optname), usize(optval), @ptrToInt(optlen));
pub fn setsockopt(fd: i32, level: u32, optname: u32, optval: &const u8, optlen: socklen_t) usize {
return syscall5(SYS_setsockopt, usize(fd), level, optname, usize(optval), @ptrToInt(optlen));
}
pub fn getsockopt(fd: i32, level: i32, optname: i32, noalias optval: &u8, noalias optlen: &socklen_t) usize {
return syscall5(SYS_getsockopt, usize(fd), usize(level), usize(optname), @ptrToInt(optval), @ptrToInt(optlen));
pub fn getsockopt(fd: i32, level: u32, optname: u32, noalias optval: &u8, noalias optlen: &socklen_t) usize {
return syscall5(SYS_getsockopt, usize(fd), level, optname, @ptrToInt(optval), @ptrToInt(optlen));
}
pub fn sendmsg(fd: i32, msg: &const msghdr, flags: u32) usize {
@ -833,14 +833,14 @@ pub fn fstat(fd: i32, stat_buf: &Stat) usize {
return syscall2(SYS_fstat, usize(fd), @ptrToInt(stat_buf));
}
pub const epoll_data = extern union {
pub const epoll_data = packed union {
ptr: usize,
fd: i32,
@"u32": u32,
@"u64": u64,
};
pub const epoll_event = extern struct {
pub const epoll_event = packed struct {
events: u32,
data: epoll_data,
};

View File

@ -224,17 +224,3 @@ async fn printTrace(p: promise->error!void) void {
}
};
}
test "coroutine in a struct field" {
const Foo = struct {
bar: async fn() void,
};
var foo = Foo {
.bar = simpleAsyncFn2,
};
cancel try async<std.debug.global_allocator> foo.bar();
}
async fn simpleAsyncFn2() void {
suspend;
}