236 lines
8.3 KiB
Zig
236 lines
8.3 KiB
Zig
const std = @import("index.zig");
|
|
const builtin = @import("builtin");
|
|
const assert = std.debug.assert;
|
|
const event = this;
|
|
const mem = std.mem;
|
|
const posix = std.os.posix;
|
|
|
|
pub const TcpServer = struct {
|
|
handleRequestFn: async<&mem.Allocator> fn (&TcpServer, &const std.net.Address, &const std.os.File) void,
|
|
|
|
loop: &Loop,
|
|
sockfd: i32,
|
|
accept_coro: ?promise,
|
|
listen_address: std.net.Address,
|
|
|
|
waiting_for_emfile_node: PromiseNode,
|
|
|
|
const PromiseNode = std.LinkedList(promise).Node;
|
|
|
|
pub fn init(loop: &Loop) !TcpServer {
|
|
const sockfd = try std.os.posixSocket(posix.AF_INET,
|
|
posix.SOCK_STREAM|posix.SOCK_CLOEXEC|posix.SOCK_NONBLOCK,
|
|
posix.PROTO_tcp);
|
|
errdefer std.os.close(sockfd);
|
|
|
|
// TODO can't initialize handler coroutine here because we need well defined copy elision
|
|
return TcpServer {
|
|
.loop = loop,
|
|
.sockfd = sockfd,
|
|
.accept_coro = null,
|
|
.handleRequestFn = undefined,
|
|
.waiting_for_emfile_node = undefined,
|
|
.listen_address = undefined,
|
|
};
|
|
}
|
|
|
|
pub fn listen(self: &TcpServer, address: &const std.net.Address,
|
|
handleRequestFn: async<&mem.Allocator> fn (&TcpServer, &const std.net.Address, &const std.os.File)void) !void
|
|
{
|
|
self.handleRequestFn = handleRequestFn;
|
|
|
|
try std.os.posixBind(self.sockfd, &address.os_addr);
|
|
try std.os.posixListen(self.sockfd, posix.SOMAXCONN);
|
|
self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(self.sockfd));
|
|
|
|
self.accept_coro = try async<self.loop.allocator> TcpServer.handler(self);
|
|
errdefer cancel ??self.accept_coro;
|
|
|
|
try self.loop.addFd(self.sockfd, ??self.accept_coro);
|
|
errdefer self.loop.removeFd(self.sockfd);
|
|
|
|
}
|
|
|
|
pub fn deinit(self: &TcpServer) void {
|
|
self.loop.removeFd(self.sockfd);
|
|
if (self.accept_coro) |accept_coro| cancel accept_coro;
|
|
std.os.close(self.sockfd);
|
|
}
|
|
|
|
pub async fn handler(self: &TcpServer) void {
|
|
while (true) {
|
|
var accepted_addr: std.net.Address = undefined;
|
|
if (std.os.posixAccept(self.sockfd, &accepted_addr.os_addr,
|
|
posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd|
|
|
{
|
|
var socket = std.os.File.openHandle(accepted_fd);
|
|
_ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
|
|
error.OutOfMemory => {
|
|
socket.close();
|
|
continue;
|
|
},
|
|
};
|
|
} else |err| switch (err) {
|
|
error.WouldBlock => {
|
|
suspend; // we will get resumed by epoll_wait in the event loop
|
|
continue;
|
|
},
|
|
error.ProcessFdQuotaExceeded => {
|
|
errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
|
|
suspend |p| {
|
|
self.waiting_for_emfile_node = PromiseNode.init(p);
|
|
std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
|
|
}
|
|
continue;
|
|
},
|
|
error.ConnectionAborted,
|
|
error.FileDescriptorClosed => continue,
|
|
|
|
error.PageFault => unreachable,
|
|
error.InvalidSyscall => unreachable,
|
|
error.FileDescriptorNotASocket => unreachable,
|
|
error.OperationNotSupported => unreachable,
|
|
|
|
error.SystemFdQuotaExceeded,
|
|
error.SystemResources,
|
|
error.ProtocolFailure,
|
|
error.BlockedByFirewall,
|
|
error.Unexpected => {
|
|
@panic("TODO handle this error");
|
|
},
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
pub const Loop = struct {
|
|
allocator: &mem.Allocator,
|
|
epollfd: i32,
|
|
keep_running: bool,
|
|
|
|
fn init(allocator: &mem.Allocator) !Loop {
|
|
const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
|
|
return Loop {
|
|
.keep_running = true,
|
|
.allocator = allocator,
|
|
.epollfd = epollfd,
|
|
};
|
|
}
|
|
|
|
pub fn addFd(self: &Loop, fd: i32, prom: promise) !void {
|
|
var ev = std.os.linux.epoll_event {
|
|
.events = std.os.linux.EPOLLIN|std.os.linux.EPOLLOUT|std.os.linux.EPOLLET,
|
|
.data = std.os.linux.epoll_data {
|
|
.ptr = @ptrToInt(prom),
|
|
},
|
|
};
|
|
try std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
|
|
}
|
|
|
|
pub fn removeFd(self: &Loop, fd: i32) void {
|
|
std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
|
|
}
|
|
|
|
async fn waitFd(self: &Loop, fd: i32) !void {
|
|
defer self.removeFd(fd);
|
|
suspend |p| {
|
|
try self.addFd(fd, p);
|
|
}
|
|
}
|
|
|
|
pub fn stop(self: &Loop) void {
|
|
// TODO make atomic
|
|
self.keep_running = false;
|
|
// TODO activate an fd in the epoll set
|
|
}
|
|
|
|
pub fn run(self: &Loop) void {
|
|
while (self.keep_running) {
|
|
var events: [16]std.os.linux.epoll_event = undefined;
|
|
const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
|
|
for (events[0..count]) |ev| {
|
|
const p = @intToPtr(promise, ev.data.ptr);
|
|
resume p;
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
pub async fn connect(loop: &Loop, _address: &const std.net.Address) !std.os.File {
|
|
var address = *_address; // TODO https://github.com/zig-lang/zig/issues/733
|
|
|
|
const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM|posix.SOCK_CLOEXEC|posix.SOCK_NONBLOCK, posix.PROTO_tcp);
|
|
errdefer std.os.close(sockfd);
|
|
|
|
try std.os.posixConnectAsync(sockfd, &address.os_addr);
|
|
try await try async loop.waitFd(sockfd);
|
|
try std.os.posixGetSockOptConnectError(sockfd);
|
|
|
|
return std.os.File.openHandle(sockfd);
|
|
}
|
|
|
|
test "listen on a port, send bytes, receive bytes" {
|
|
if (builtin.os != builtin.Os.linux) {
|
|
// TODO build abstractions for other operating systems
|
|
return;
|
|
}
|
|
const MyServer = struct {
|
|
tcp_server: TcpServer,
|
|
|
|
const Self = this;
|
|
|
|
async<&mem.Allocator> fn handler(tcp_server: &TcpServer, _addr: &const std.net.Address,
|
|
_socket: &const std.os.File) void
|
|
{
|
|
const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
|
|
var socket = *_socket; // TODO https://github.com/zig-lang/zig/issues/733
|
|
defer socket.close();
|
|
const next_handler = async errorableHandler(self, _addr, socket) catch |err| switch (err) {
|
|
error.OutOfMemory => @panic("unable to handle connection: out of memory"),
|
|
};
|
|
(await next_handler) catch |err| {
|
|
std.debug.panic("unable to handle connection: {}\n", err);
|
|
};
|
|
suspend |p| { cancel p; }
|
|
}
|
|
|
|
async fn errorableHandler(self: &Self, _addr: &const std.net.Address,
|
|
_socket: &const std.os.File) !void
|
|
{
|
|
const addr = *_addr; // TODO https://github.com/zig-lang/zig/issues/733
|
|
var socket = *_socket; // TODO https://github.com/zig-lang/zig/issues/733
|
|
|
|
var adapter = std.io.FileOutStream.init(&socket);
|
|
var stream = &adapter.stream;
|
|
try stream.print("hello from server\n");
|
|
}
|
|
};
|
|
|
|
const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
|
|
const addr = std.net.Address.initIp4(ip4addr, 0);
|
|
|
|
var loop = try Loop.init(std.debug.global_allocator);
|
|
var server = MyServer {
|
|
.tcp_server = try TcpServer.init(&loop),
|
|
};
|
|
defer server.tcp_server.deinit();
|
|
try server.tcp_server.listen(addr, MyServer.handler);
|
|
|
|
const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address);
|
|
defer cancel p;
|
|
loop.run();
|
|
}
|
|
|
|
async fn doAsyncTest(loop: &Loop, address: &const std.net.Address) void {
|
|
errdefer @panic("test failure");
|
|
|
|
var socket_file = try await try async event.connect(loop, address);
|
|
defer socket_file.close();
|
|
|
|
var buf: [512]u8 = undefined;
|
|
const amt_read = try socket_file.read(buf[0..]);
|
|
const msg = buf[0..amt_read];
|
|
assert(mem.eql(u8, msg, "hello from server\n"));
|
|
loop.stop();
|
|
}
|