From d3d3e4e374e47b275dd3e0483634852b2d0a56d8 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 31 Oct 2019 11:41:39 -0400 Subject: [PATCH] startup code sets up event loop if I/O mode is declared evented --- lib/std/event/channel.zig | 105 +++++++++++--------------------------- lib/std/event/future.zig | 6 +-- lib/std/event/group.zig | 5 +- lib/std/event/lock.zig | 6 ++- lib/std/event/loop.zig | 33 ++++++------ lib/std/event/rwlock.zig | 6 ++- lib/std/special/start.zig | 39 ++++++++++++-- 7 files changed, 95 insertions(+), 105 deletions(-) diff --git a/lib/std/event/channel.zig b/lib/std/event/channel.zig index 88edc90f1..74a2fe27f 100644 --- a/lib/std/event/channel.zig +++ b/lib/std/event/channel.zig @@ -7,12 +7,8 @@ const Loop = std.event.Loop; /// Many producer, many consumer, thread-safe, runtime configurable buffer size. /// When buffer is empty, consumers suspend and are resumed by producers. /// When buffer is full, producers suspend and are resumed by consumers. -/// TODO now that async function rewrite has landed, this API should be adjusted -/// to not use the event loop's allocator, and to not require allocation. pub fn Channel(comptime T: type) type { return struct { - loop: *Loop, - getters: std.atomic.Queue(GetNode), or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node), putters: std.atomic.Queue(PutNode), @@ -50,16 +46,17 @@ pub fn Channel(comptime T: type) type { tick_node: *Loop.NextTickNode, }; - /// Call `destroy` when done. - pub fn create(loop: *Loop, capacity: usize) !*SelfChannel { - const buffer_nodes = try loop.allocator.alloc(T, capacity); - errdefer loop.allocator.free(buffer_nodes); + const global_event_loop = Loop.instance orelse + @compileError("std.event.Channel currently only works with event-based I/O"); - const self = try loop.allocator.create(SelfChannel); + /// Call `deinit` to free resources when done. + /// `buffer` must live until `deinit` is called. + /// For a zero length buffer, use `[0]T{}`. + /// TODO https://github.com/ziglang/zig/issues/2765 + pub fn init(self: *SelfChannel, buffer: []T) void { self.* = SelfChannel{ - .loop = loop, .buffer_len = 0, - .buffer_nodes = buffer_nodes, + .buffer_nodes = buffer, .buffer_index = 0, .dispatch_lock = 0, .need_dispatch = 0, @@ -69,21 +66,19 @@ pub fn Channel(comptime T: type) type { .get_count = 0, .put_count = 0, }; - errdefer loop.allocator.destroy(self); - - return self; } - /// must be called when all calls to put and get have suspended and no more calls occur - pub fn destroy(self: *SelfChannel) void { + /// Must be called when all calls to put and get have suspended and no more calls occur. + /// This can be omitted if caller can guarantee that the suspended putters and getters + /// do not need to be run to completion. Note that this may leave awaiters hanging. + pub fn deinit(self: *SelfChannel) void { while (self.getters.get()) |get_node| { resume get_node.data.tick_node.data; } while (self.putters.get()) |put_node| { resume put_node.data.tick_node.data; } - self.loop.allocator.free(self.buffer_nodes); - self.loop.allocator.destroy(self); + self.* = undefined; } /// puts a data item in the channel. The function returns when the value has been added to the @@ -96,17 +91,6 @@ pub fn Channel(comptime T: type) type { .data = data, }); - // TODO test canceling a put() - errdefer { - _ = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); - const need_dispatch = !self.putters.remove(&queue_node); - self.loop.cancelOnNextTick(&my_tick_node); - if (need_dispatch) { - // oops we made the put_count incorrect for a period of time. fix by dispatching. - _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst); - self.dispatch(); - } - } suspend { self.putters.put(&queue_node); _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst); @@ -128,18 +112,6 @@ pub fn Channel(comptime T: type) type { }, }); - // TODO test canceling a get() - errdefer { - _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); - const need_dispatch = !self.getters.remove(&queue_node); - self.loop.cancelOnNextTick(&my_tick_node); - if (need_dispatch) { - // oops we made the get_count incorrect for a period of time. fix by dispatching. - _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); - self.dispatch(); - } - } - suspend { self.getters.put(&queue_node); _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); @@ -158,11 +130,9 @@ pub fn Channel(comptime T: type) type { // } //} - /// Await this function to get an item from the channel. If the buffer is empty and there are no - /// puts waiting, this returns null. - /// Await is necessary for locking purposes. The function will be resumed after checking the channel - /// for data and will not wait for data to be available. - pub async fn getOrNull(self: *SelfChannel) ?T { + /// Get an item from the channel. If the buffer is empty and there are no + /// puts waiting, this returns `null`. + pub fn getOrNull(self: *SelfChannel) ?T { // TODO integrate this function with named return values // so we can get rid of this extra result copy var result: ?T = null; @@ -179,19 +149,6 @@ pub fn Channel(comptime T: type) type { }); or_null_node.data = &queue_node; - // TODO test canceling getOrNull - errdefer { - _ = self.or_null_queue.remove(&or_null_node); - _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); - const need_dispatch = !self.getters.remove(&queue_node); - self.loop.cancelOnNextTick(&my_tick_node); - if (need_dispatch) { - // oops we made the get_count incorrect for a period of time. fix by dispatching. - _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); - self.dispatch(); - } - } - suspend { self.getters.put(&queue_node); _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst); @@ -234,7 +191,7 @@ pub fn Channel(comptime T: type) type { info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; }, } - self.loop.onNextTick(get_node.tick_node); + global_event_loop.onNextTick(get_node.tick_node); self.buffer_len -= 1; get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); @@ -254,8 +211,8 @@ pub fn Channel(comptime T: type) type { info.ptr.* = put_node.data; }, } - self.loop.onNextTick(get_node.tick_node); - self.loop.onNextTick(put_node.tick_node); + global_event_loop.onNextTick(get_node.tick_node); + global_event_loop.onNextTick(put_node.tick_node); get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst); put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst); @@ -266,7 +223,7 @@ pub fn Channel(comptime T: type) type { const put_node = &self.putters.get().?.data; self.buffer_nodes[self.buffer_index] = put_node.data; - self.loop.onNextTick(put_node.tick_node); + global_event_loop.onNextTick(put_node.tick_node); self.buffer_index +%= 1; self.buffer_len += 1; @@ -282,7 +239,7 @@ pub fn Channel(comptime T: type) type { var remove_count: usize = 0; while (self.or_null_queue.get()) |or_null_node| { remove_count += @boolToInt(self.getters.remove(or_null_node.data)); - self.loop.onNextTick(or_null_node.data.data.tick_node); + global_event_loop.onNextTick(or_null_node.data.data.tick_node); } if (remove_count != 0) { _ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst); @@ -315,21 +272,21 @@ test "std.event.Channel" { // https://github.com/ziglang/zig/issues/3251 if (builtin.os == .freebsd) return error.SkipZigTest; - var loop: Loop = undefined; - // TODO make a multi threaded test - try loop.initSingleThreaded(std.heap.direct_allocator); - defer loop.deinit(); + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; - const channel = try Channel(i32).create(&loop, 0); - defer channel.destroy(); + var channel: Channel(i32) = undefined; + channel.init([0]i32{}); + defer channel.deinit(); - const handle = async testChannelGetter(&loop, channel); - const putter = async testChannelPutter(channel); + var handle = async testChannelGetter(&channel); + var putter = async testChannelPutter(&channel); - loop.run(); + await handle; + await putter; } -async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void { +async fn testChannelGetter(channel: *Channel(i32)) void { const value1 = channel.get(); testing.expect(value1 == 1234); diff --git a/lib/std/event/future.zig b/lib/std/event/future.zig index c5376bc9f..3e5754982 100644 --- a/lib/std/event/future.zig +++ b/lib/std/event/future.zig @@ -87,11 +87,11 @@ test "std.event.Future" { if (builtin.single_threaded) return error.SkipZigTest; // https://github.com/ziglang/zig/issues/3251 if (builtin.os == .freebsd) return error.SkipZigTest; - - const allocator = std.heap.direct_allocator; + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); const handle = async testFuture(&loop); diff --git a/lib/std/event/group.zig b/lib/std/event/group.zig index 95715edce..0f5978c3e 100644 --- a/lib/std/event/group.zig +++ b/lib/std/event/group.zig @@ -87,10 +87,11 @@ test "std.event.Group" { // https://github.com/ziglang/zig/issues/1908 if (builtin.single_threaded) return error.SkipZigTest; - const allocator = std.heap.direct_allocator; + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); const handle = async testGroup(&loop); diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig index 57d295d6b..275b431ca 100644 --- a/lib/std/event/lock.zig +++ b/lib/std/event/lock.zig @@ -9,6 +9,7 @@ const Loop = std.event.Loop; /// Functions which are waiting for the lock are suspended, and /// are resumed when the lock is released, in order. /// Allows only one actor to hold the lock. +/// TODO: make this API also work in blocking I/O mode. pub const Lock = struct { loop: *Loop, shared_bit: u8, // TODO make this a bool @@ -125,10 +126,11 @@ test "std.event.Lock" { // TODO https://github.com/ziglang/zig/issues/3251 if (builtin.os == .freebsd) return error.SkipZigTest; - const allocator = std.heap.direct_allocator; + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); var lock = Lock.init(&loop); diff --git a/lib/std/event/loop.zig b/lib/std/event/loop.zig index ae8d76676..d5d73dabb 100644 --- a/lib/std/event/loop.zig +++ b/lib/std/event/loop.zig @@ -96,11 +96,11 @@ pub const Loop = struct { /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 - pub fn init(self: *Loop, allocator: *mem.Allocator) !void { + pub fn init(self: *Loop) !void { if (builtin.single_threaded) { - return self.initSingleThreaded(allocator); + return self.initSingleThreaded(); } else { - return self.initMultiThreaded(allocator); + return self.initMultiThreaded(); } } @@ -108,25 +108,28 @@ pub const Loop = struct { /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 - pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void { - return self.initInternal(allocator, 1); + pub fn initSingleThreaded(self: *Loop) !void { + return self.initThreadPool(1); } - /// The allocator must be thread-safe because we use it for multiplexing - /// async functions onto kernel threads. /// After initialization, call run(). + /// This is the same as `initThreadPool` using `Thread.cpuCount` to determine the thread + /// pool size. /// TODO copy elision / named return values so that the threads referencing *Loop /// have the correct pointer value. /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765 - pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void { - if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode"); + pub fn initMultiThreaded(self: *Loop) !void { + if (builtin.single_threaded) + @compileError("initMultiThreaded unavailable when building in single-threaded mode"); const core_count = try Thread.cpuCount(); - return self.initInternal(allocator, core_count); + return self.initThreadPool(core_count); } /// Thread count is the total thread count. The thread pool size will be /// max(thread_count - 1, 0) - fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void { + pub fn initThreadPool(self: *Loop, thread_count: usize) !void { + // TODO: https://github.com/ziglang/zig/issues/3539 + const allocator = std.heap.direct_allocator; self.* = Loop{ .pending_event_count = 1, .allocator = allocator, @@ -932,10 +935,8 @@ test "std.event.Loop - basic" { // https://github.com/ziglang/zig/issues/1908 if (builtin.single_threaded) return error.SkipZigTest; - const allocator = std.heap.direct_allocator; - var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); loop.run(); @@ -945,10 +946,8 @@ test "std.event.Loop - call" { // https://github.com/ziglang/zig/issues/1908 if (builtin.single_threaded) return error.SkipZigTest; - const allocator = std.heap.direct_allocator; - var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); var did_it = false; diff --git a/lib/std/event/rwlock.zig b/lib/std/event/rwlock.zig index bf7ea0fa9..7f86b004b 100644 --- a/lib/std/event/rwlock.zig +++ b/lib/std/event/rwlock.zig @@ -11,6 +11,7 @@ const Loop = std.event.Loop; /// Many readers can hold the lock at the same time; however locking for writing is exclusive. /// When a read lock is held, it will not be released until the reader queue is empty. /// When a write lock is held, it will not be released until the writer queue is empty. +/// TODO: make this API also work in blocking I/O mode pub const RwLock = struct { loop: *Loop, shared_state: u8, // TODO make this an enum @@ -212,10 +213,11 @@ test "std.event.RwLock" { // https://github.com/ziglang/zig/issues/1908 if (builtin.single_threaded) return error.SkipZigTest; - const allocator = std.heap.direct_allocator; + // TODO provide a way to run tests in evented I/O mode + if (!std.io.is_async) return error.SkipZigTest; var loop: Loop = undefined; - try loop.initMultiThreaded(allocator); + try loop.initMultiThreaded(); defer loop.deinit(); var lock = RwLock.init(&loop); diff --git a/lib/std/special/start.zig b/lib/std/special/start.zig index bb28646fd..cdf09606a 100644 --- a/lib/std/special/start.zig +++ b/lib/std/special/start.zig @@ -35,7 +35,9 @@ comptime { } extern fn wasm_freestanding_start() void { - _ = callMain(); + // This is marked inline because for some reason LLVM in release mode fails to inline it, + // and we want fewer call frames in stack traces. + _ = @inlineCall(callMain); } extern fn EfiMain(handle: uefi.Handle, system_table: *uefi.tables.SystemTable) usize { @@ -63,7 +65,9 @@ extern fn EfiMain(handle: uefi.Handle, system_table: *uefi.tables.SystemTable) u nakedcc fn _start() noreturn { if (builtin.os == builtin.Os.wasi) { - std.os.wasi.proc_exit(callMain()); + // This is marked inline because for some reason LLVM in release mode fails to inline it, + // and we want fewer call frames in stack traces. + std.os.wasi.proc_exit(@inlineCall(callMain)); } switch (builtin.arch) { @@ -110,7 +114,7 @@ extern fn WinMainCRTStartup() noreturn { std.debug.maybeEnableSegfaultHandler(); - std.os.windows.kernel32.ExitProcess(callMain()); + std.os.windows.kernel32.ExitProcess(initEventLoopAndCallMain()); } // TODO https://github.com/ziglang/zig/issues/265 @@ -170,7 +174,7 @@ fn callMainWithArgs(argc: usize, argv: [*][*]u8, envp: [][*]u8) u8 { std.debug.maybeEnableSegfaultHandler(); - return callMain(); + return initEventLoopAndCallMain(); } extern fn main(c_argc: i32, c_argv: [*][*]u8, c_envp: [*]?[*]u8) i32 { @@ -185,7 +189,32 @@ const bad_main_ret = "expected return type of main to be 'void', '!void', 'noret // This is marked inline because for some reason LLVM in release mode fails to inline it, // and we want fewer call frames in stack traces. -inline fn callMain() u8 { +inline fn initEventLoopAndCallMain() u8 { + if (std.event.Loop.instance) |loop| { + loop.init() catch |err| { + std.debug.warn("error: {}\n", @errorName(err)); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + return 1; + }; + defer loop.deinit(); + + var result: u8 = undefined; + var frame: @Frame(callMain) = undefined; + _ = @asyncCall(&frame, &result, callMain); + loop.run(); + return result; + } else { + // This is marked inline because for some reason LLVM in release mode fails to inline it, + // and we want fewer call frames in stack traces. + return @inlineCall(callMain); + } +} + +// This is not marked inline because it is called with @asyncCall when +// there is an event loop. +fn callMain() u8 { switch (@typeInfo(@typeOf(root.main).ReturnType)) { .NoReturn => { root.main();