use global event loop in std.event types

This commit is contained in:
Vexu 2019-11-06 21:41:35 +02:00 committed by Andrew Kelley
parent 913f7d0450
commit 4530adbd33
6 changed files with 55 additions and 77 deletions

View File

@ -3,7 +3,6 @@ const assert = std.debug.assert;
const testing = std.testing; const testing = std.testing;
const builtin = @import("builtin"); const builtin = @import("builtin");
const Lock = std.event.Lock; const Lock = std.event.Lock;
const Loop = std.event.Loop;
/// This is a value that starts out unavailable, until resolve() is called /// This is a value that starts out unavailable, until resolve() is called
/// While it is unavailable, functions suspend when they try to get() it, /// While it is unavailable, functions suspend when they try to get() it,
@ -23,9 +22,9 @@ pub fn Future(comptime T: type) type {
const Self = @This(); const Self = @This();
const Queue = std.atomic.Queue(anyframe); const Queue = std.atomic.Queue(anyframe);
pub fn init(loop: *Loop) Self { pub fn init() Self {
return Self{ return Self{
.lock = Lock.initLocked(loop), .lock = Lock.initLocked(),
.available = 0, .available = 0,
.data = undefined, .data = undefined,
}; };
@ -90,17 +89,11 @@ test "std.event.Future" {
// TODO provide a way to run tests in evented I/O mode // TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest; if (!std.io.is_async) return error.SkipZigTest;
var loop: Loop = undefined; const handle = async testFuture();
try loop.initMultiThreaded();
defer loop.deinit();
const handle = async testFuture(&loop);
loop.run();
} }
fn testFuture(loop: *Loop) void { fn testFuture() void {
var future = Future(i32).init(loop); var future = Future(i32).init();
var a = async waitOnFuture(&future); var a = async waitOnFuture(&future);
var b = async waitOnFuture(&future); var b = async waitOnFuture(&future);

View File

@ -1,8 +1,8 @@
const std = @import("../std.zig"); const std = @import("../std.zig");
const builtin = @import("builtin"); const builtin = @import("builtin");
const Lock = std.event.Lock; const Lock = std.event.Lock;
const Loop = std.event.Loop;
const testing = std.testing; const testing = std.testing;
const Allocator = std.mem.Allocator;
/// ReturnType must be `void` or `E!void` /// ReturnType must be `void` or `E!void`
pub fn Group(comptime ReturnType: type) type { pub fn Group(comptime ReturnType: type) type {
@ -10,6 +10,7 @@ pub fn Group(comptime ReturnType: type) type {
frame_stack: Stack, frame_stack: Stack,
alloc_stack: Stack, alloc_stack: Stack,
lock: Lock, lock: Lock,
allocator: *Allocator,
const Self = @This(); const Self = @This();
@ -19,17 +20,18 @@ pub fn Group(comptime ReturnType: type) type {
}; };
const Stack = std.atomic.Stack(anyframe->ReturnType); const Stack = std.atomic.Stack(anyframe->ReturnType);
pub fn init(loop: *Loop) Self { pub fn init(allocator: *Allocator) Self {
return Self{ return Self{
.frame_stack = Stack.init(), .frame_stack = Stack.init(),
.alloc_stack = Stack.init(), .alloc_stack = Stack.init(),
.lock = Lock.init(loop), .lock = Lock.init(),
.allocator = allocator,
}; };
} }
/// Add a frame to the group. Thread-safe. /// Add a frame to the group. Thread-safe.
pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) { pub fn add(self: *Self, handle: anyframe->ReturnType) (error{OutOfMemory}!void) {
const node = try self.lock.loop.allocator.create(Stack.Node); const node = try self.allocator.create(Stack.Node);
node.* = Stack.Node{ node.* = Stack.Node{
.next = undefined, .next = undefined,
.data = handle, .data = handle,
@ -66,7 +68,7 @@ pub fn Group(comptime ReturnType: type) type {
} }
while (self.alloc_stack.pop()) |node| { while (self.alloc_stack.pop()) |node| {
const handle = node.data; const handle = node.data;
self.lock.loop.allocator.destroy(node); self.allocator.destroy(node);
if (Error == void) { if (Error == void) {
await handle; await handle;
} else { } else {
@ -87,18 +89,12 @@ test "std.event.Group" {
// TODO provide a way to run tests in evented I/O mode // TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest; if (!std.io.is_async) return error.SkipZigTest;
var loop: Loop = undefined; const handle = async testGroup(std.heap.direct_allocator);
try loop.initMultiThreaded();
defer loop.deinit();
const handle = async testGroup(&loop);
loop.run();
} }
async fn testGroup(loop: *Loop) void { async fn testGroup(allocator: *Allocator) void {
var count: usize = 0; var count: usize = 0;
var group = Group(void).init(loop); var group = Group(void).init(allocator);
var sleep_a_little_frame = async sleepALittle(&count); var sleep_a_little_frame = async sleepALittle(&count);
group.add(&sleep_a_little_frame) catch @panic("memory"); group.add(&sleep_a_little_frame) catch @panic("memory");
var increase_by_ten_frame = async increaseByTen(&count); var increase_by_ten_frame = async increaseByTen(&count);
@ -106,7 +102,7 @@ async fn testGroup(loop: *Loop) void {
group.wait(); group.wait();
testing.expect(count == 11); testing.expect(count == 11);
var another = Group(anyerror!void).init(loop); var another = Group(anyerror!void).init(allocator);
var something_else_frame = async somethingElse(); var something_else_frame = async somethingElse();
another.add(&something_else_frame) catch @panic("memory"); another.add(&something_else_frame) catch @panic("memory");
var something_that_fails_frame = async doSomethingThatFails(); var something_that_fails_frame = async doSomethingThatFails();

View File

@ -11,20 +11,22 @@ const Loop = std.event.Loop;
/// Allows only one actor to hold the lock. /// Allows only one actor to hold the lock.
/// TODO: make this API also work in blocking I/O mode. /// TODO: make this API also work in blocking I/O mode.
pub const Lock = struct { pub const Lock = struct {
loop: *Loop,
shared_bit: u8, // TODO make this a bool shared_bit: u8, // TODO make this a bool
queue: Queue, queue: Queue,
queue_empty_bit: u8, // TODO make this a bool queue_empty_bit: u8, // TODO make this a bool
const Queue = std.atomic.Queue(anyframe); const Queue = std.atomic.Queue(anyframe);
const global_event_loop = Loop.instance orelse
@compileError("std.event.Lock currently only works with event-based I/O");
pub const Held = struct { pub const Held = struct {
lock: *Lock, lock: *Lock,
pub fn release(self: Held) void { pub fn release(self: Held) void {
// Resume the next item from the queue. // Resume the next item from the queue.
if (self.lock.queue.get()) |node| { if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node); global_event_loop.onNextTick(node);
return; return;
} }
@ -49,7 +51,7 @@ pub const Lock = struct {
// Resume the next item from the queue. // Resume the next item from the queue.
if (self.lock.queue.get()) |node| { if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node); global_event_loop.onNextTick(node);
return; return;
} }
@ -65,18 +67,16 @@ pub const Lock = struct {
} }
}; };
pub fn init(loop: *Loop) Lock { pub fn init() Lock {
return Lock{ return Lock{
.loop = loop,
.shared_bit = 0, .shared_bit = 0,
.queue = Queue.init(), .queue = Queue.init(),
.queue_empty_bit = 1, .queue_empty_bit = 1,
}; };
} }
pub fn initLocked(loop: *Loop) Lock { pub fn initLocked() Lock {
return Lock{ return Lock{
.loop = loop,
.shared_bit = 1, .shared_bit = 1,
.queue = Queue.init(), .queue = Queue.init(),
.queue_empty_bit = 1, .queue_empty_bit = 1,
@ -126,27 +126,22 @@ test "std.event.Lock" {
// TODO provide a way to run tests in evented I/O mode // TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest; if (!std.io.is_async) return error.SkipZigTest;
var loop: Loop = undefined; var lock = Lock.init();
try loop.initMultiThreaded();
defer loop.deinit();
var lock = Lock.init(&loop);
defer lock.deinit(); defer lock.deinit();
_ = async testLock(&loop, &lock); _ = async testLock(&lock);
loop.run();
testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data); testing.expectEqualSlices(i32, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len, shared_test_data);
} }
async fn testLock(loop: *Loop, lock: *Lock) void { async fn testLock(lock: *Lock) void {
var handle1 = async lockRunner(lock); var handle1 = async lockRunner(lock);
var tick_node1 = Loop.NextTickNode{ var tick_node1 = Loop.NextTickNode{
.prev = undefined, .prev = undefined,
.next = undefined, .next = undefined,
.data = &handle1, .data = &handle1,
}; };
loop.onNextTick(&tick_node1); Loop.instance.?.onNextTick(&tick_node1);
var handle2 = async lockRunner(lock); var handle2 = async lockRunner(lock);
var tick_node2 = Loop.NextTickNode{ var tick_node2 = Loop.NextTickNode{
@ -154,7 +149,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
.next = undefined, .next = undefined,
.data = &handle2, .data = &handle2,
}; };
loop.onNextTick(&tick_node2); Loop.instance.?.onNextTick(&tick_node2);
var handle3 = async lockRunner(lock); var handle3 = async lockRunner(lock);
var tick_node3 = Loop.NextTickNode{ var tick_node3 = Loop.NextTickNode{
@ -162,7 +157,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
.next = undefined, .next = undefined,
.data = &handle3, .data = &handle3,
}; };
loop.onNextTick(&tick_node3); Loop.instance.?.onNextTick(&tick_node3);
await handle1; await handle1;
await handle2; await handle2;

View File

@ -1,6 +1,5 @@
const std = @import("../std.zig"); const std = @import("../std.zig");
const Lock = std.event.Lock; const Lock = std.event.Lock;
const Loop = std.event.Loop;
/// Thread-safe async/await lock that protects one piece of data. /// Thread-safe async/await lock that protects one piece of data.
/// Functions which are waiting for the lock are suspended, and /// Functions which are waiting for the lock are suspended, and
@ -21,9 +20,9 @@ pub fn Locked(comptime T: type) type {
} }
}; };
pub fn init(loop: *Loop, data: T) Self { pub fn init(data: T) Self {
return Self{ return Self{
.lock = Lock.init(loop), .lock = Lock.init(),
.private_data = data, .private_data = data,
}; };
} }
@ -35,7 +34,7 @@ pub fn Locked(comptime T: type) type {
pub async fn acquire(self: *Self) HeldLock { pub async fn acquire(self: *Self) HeldLock {
return HeldLock{ return HeldLock{
// TODO guaranteed allocation elision // TODO guaranteed allocation elision
.held = await (async self.lock.acquire() catch unreachable), .held = self.lock.acquire(),
.value = &self.private_data, .value = &self.private_data,
}; };
} }

View File

@ -13,7 +13,6 @@ const Loop = std.event.Loop;
/// When a write lock is held, it will not be released until the writer queue is empty. /// When a write lock is held, it will not be released until the writer queue is empty.
/// TODO: make this API also work in blocking I/O mode /// TODO: make this API also work in blocking I/O mode
pub const RwLock = struct { pub const RwLock = struct {
loop: *Loop,
shared_state: u8, // TODO make this an enum shared_state: u8, // TODO make this an enum
writer_queue: Queue, writer_queue: Queue,
reader_queue: Queue, reader_queue: Queue,
@ -29,6 +28,9 @@ pub const RwLock = struct {
const Queue = std.atomic.Queue(anyframe); const Queue = std.atomic.Queue(anyframe);
const global_event_loop = Loop.instance orelse
@compileError("std.event.RwLock currently only works with event-based I/O");
pub const HeldRead = struct { pub const HeldRead = struct {
lock: *RwLock, lock: *RwLock,
@ -55,7 +57,7 @@ pub const RwLock = struct {
// See if we can leave it locked for writing, and pass the lock to the next writer // See if we can leave it locked for writing, and pass the lock to the next writer
// in the queue to grab the lock. // in the queue to grab the lock.
if (self.lock.writer_queue.get()) |node| { if (self.lock.writer_queue.get()) |node| {
self.lock.loop.onNextTick(node); global_event_loop.onNextTick(node);
return; return;
} }
@ -64,7 +66,7 @@ pub const RwLock = struct {
// Switch to a read lock. // Switch to a read lock.
_ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst); _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst);
while (self.lock.reader_queue.get()) |node| { while (self.lock.reader_queue.get()) |node| {
self.lock.loop.onNextTick(node); global_event_loop.onNextTick(node);
} }
return; return;
} }
@ -76,9 +78,8 @@ pub const RwLock = struct {
} }
}; };
pub fn init(loop: *Loop) RwLock { pub fn init() RwLock {
return RwLock{ return RwLock{
.loop = loop,
.shared_state = State.Unlocked, .shared_state = State.Unlocked,
.writer_queue = Queue.init(), .writer_queue = Queue.init(),
.writer_queue_empty_bit = 1, .writer_queue_empty_bit = 1,
@ -120,7 +121,7 @@ pub const RwLock = struct {
// Give out all the read locks. // Give out all the read locks.
if (self.reader_queue.get()) |first_node| { if (self.reader_queue.get()) |first_node| {
while (self.reader_queue.get()) |node| { while (self.reader_queue.get()) |node| {
self.loop.onNextTick(node); global_event_loop.onNextTick(node);
} }
resume first_node.data; resume first_node.data;
} }
@ -171,7 +172,7 @@ pub const RwLock = struct {
} }
// If there's an item in the writer queue, give them the lock, and we're done. // If there's an item in the writer queue, give them the lock, and we're done.
if (self.writer_queue.get()) |node| { if (self.writer_queue.get()) |node| {
self.loop.onNextTick(node); global_event_loop.onNextTick(node);
return; return;
} }
// Release the lock again. // Release the lock again.
@ -187,9 +188,9 @@ pub const RwLock = struct {
} }
// If there are any items in the reader queue, give out all the reader locks, and we're done. // If there are any items in the reader queue, give out all the reader locks, and we're done.
if (self.reader_queue.get()) |first_node| { if (self.reader_queue.get()) |first_node| {
self.loop.onNextTick(first_node); global_event_loop.onNextTick(first_node);
while (self.reader_queue.get()) |node| { while (self.reader_queue.get()) |node| {
self.loop.onNextTick(node); global_event_loop.onNextTick(node);
} }
return; return;
} }
@ -216,46 +217,41 @@ test "std.event.RwLock" {
// TODO provide a way to run tests in evented I/O mode // TODO provide a way to run tests in evented I/O mode
if (!std.io.is_async) return error.SkipZigTest; if (!std.io.is_async) return error.SkipZigTest;
var loop: Loop = undefined; var lock = RwLock.init();
try loop.initMultiThreaded();
defer loop.deinit();
var lock = RwLock.init(&loop);
defer lock.deinit(); defer lock.deinit();
const handle = testLock(&loop, &lock); const handle = testLock(std.heap.direct_allocator, &lock);
loop.run();
const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len; const expected_result = [1]i32{shared_it_count * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
testing.expectEqualSlices(i32, expected_result, shared_test_data); testing.expectEqualSlices(i32, expected_result, shared_test_data);
} }
async fn testLock(loop: *Loop, lock: *RwLock) void { async fn testLock(allocator: *Allocator, lock: *RwLock) void {
var read_nodes: [100]Loop.NextTickNode = undefined; var read_nodes: [100]Loop.NextTickNode = undefined;
for (read_nodes) |*read_node| { for (read_nodes) |*read_node| {
const frame = loop.allocator.create(@Frame(readRunner)) catch @panic("memory"); const frame = allocator.create(@Frame(readRunner)) catch @panic("memory");
read_node.data = frame; read_node.data = frame;
frame.* = async readRunner(lock); frame.* = async readRunner(lock);
loop.onNextTick(read_node); Loop.instance.?.onNextTick(read_node);
} }
var write_nodes: [shared_it_count]Loop.NextTickNode = undefined; var write_nodes: [shared_it_count]Loop.NextTickNode = undefined;
for (write_nodes) |*write_node| { for (write_nodes) |*write_node| {
const frame = loop.allocator.create(@Frame(writeRunner)) catch @panic("memory"); const frame = allocator.create(@Frame(writeRunner)) catch @panic("memory");
write_node.data = frame; write_node.data = frame;
frame.* = async writeRunner(lock); frame.* = async writeRunner(lock);
loop.onNextTick(write_node); Loop.instance.?.onNextTick(write_node);
} }
for (write_nodes) |*write_node| { for (write_nodes) |*write_node| {
const casted = @ptrCast(*const @Frame(writeRunner), write_node.data); const casted = @ptrCast(*const @Frame(writeRunner), write_node.data);
await casted; await casted;
loop.allocator.destroy(casted); allocator.destroy(casted);
} }
for (read_nodes) |*read_node| { for (read_nodes) |*read_node| {
const casted = @ptrCast(*const @Frame(readRunner), read_node.data); const casted = @ptrCast(*const @Frame(readRunner), read_node.data);
await casted; await casted;
loop.allocator.destroy(casted); allocator.destroy(casted);
} }
} }

View File

@ -1,6 +1,5 @@
const std = @import("../std.zig"); const std = @import("../std.zig");
const RwLock = std.event.RwLock; const RwLock = std.event.RwLock;
const Loop = std.event.Loop;
/// Thread-safe async/await RW lock that protects one piece of data. /// Thread-safe async/await RW lock that protects one piece of data.
/// Functions which are waiting for the lock are suspended, and /// Functions which are waiting for the lock are suspended, and
@ -30,9 +29,9 @@ pub fn RwLocked(comptime T: type) type {
} }
}; };
pub fn init(loop: *Loop, data: T) Self { pub fn init(data: T) Self {
return Self{ return Self{
.lock = RwLock.init(loop), .lock = RwLock.init(),
.locked_data = data, .locked_data = data,
}; };
} }
@ -43,14 +42,14 @@ pub fn RwLocked(comptime T: type) type {
pub async fn acquireRead(self: *Self) HeldReadLock { pub async fn acquireRead(self: *Self) HeldReadLock {
return HeldReadLock{ return HeldReadLock{
.held = await (async self.lock.acquireRead() catch unreachable), .held = self.lock.acquireRead(),
.value = &self.locked_data, .value = &self.locked_data,
}; };
} }
pub async fn acquireWrite(self: *Self) HeldWriteLock { pub async fn acquireWrite(self: *Self) HeldWriteLock {
return HeldWriteLock{ return HeldWriteLock{
.held = await (async self.lock.acquireWrite() catch unreachable), .held = self.lock.acquireWrite(),
.value = &self.locked_data, .value = &self.locked_data,
}; };
} }