diff --git a/lib/std/event/lock.zig b/lib/std/event/lock.zig
index d27a12aef..a83395d7d 100644
--- a/lib/std/event/lock.zig
+++ b/lib/std/event/lock.zig
@@ -16,107 +16,107 @@ const Loop = std.event.Loop;
 /// Allows only one actor to hold the lock.
 /// TODO: make this API also work in blocking I/O mode.
 pub const Lock = struct {
-    shared: bool,
-    queue: Queue,
-    queue_empty: bool,
+    mutex: std.Mutex = std.Mutex{},
+    head: usize = UNLOCKED,
 
-    const Queue = std.atomic.Queue(anyframe);
+    const UNLOCKED = 0;
+    const LOCKED = 1;
 
     const global_event_loop = Loop.instance orelse
         @compileError("std.event.Lock currently only works with event-based I/O");
 
-    pub const Held = struct {
-        lock: *Lock,
-
-        pub fn release(self: Held) void {
-            // Resume the next item from the queue.
-            if (self.lock.queue.get()) |node| {
-                global_event_loop.onNextTick(node);
-                return;
-            }
-
-            // We need to release the lock.
-            @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-            @atomicStore(bool, &self.lock.shared, false, .SeqCst);
-
-            // There might be a queue item. If we know the queue is empty, we can be done,
-            // because the other actor will try to obtain the lock.
-            // But if there's a queue item, we are the actor which must loop and attempt
-            // to grab the lock again.
-            if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                return;
-            }
-
-            while (true) {
-                if (@atomicRmw(bool, &self.lock.shared, .Xchg, true, .SeqCst)) {
-                    // We did not obtain the lock. Great, the queue is someone else's problem.
-                    return;
-                }
-
-                // Resume the next item from the queue.
-                if (self.lock.queue.get()) |node| {
-                    global_event_loop.onNextTick(node);
-                    return;
-                }
-
-                // Release the lock again.
-                @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-                @atomicStore(bool, &self.lock.shared, false, .SeqCst);
-
-                // Find out if we can be done.
-                if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                    return;
-                }
-            }
-        }
+    const Waiter = struct {
+        // forced Waiter alignment to ensure its pointer doesn't clash with LOCKED
+        next: ?*Waiter align(2),
+        tail: *Waiter,
+        node: Loop.NextTickNode,
     };
 
-    pub fn init() Lock {
-        return Lock{
-            .shared = false,
-            .queue = Queue.init(),
-            .queue_empty = true,
+    pub fn acquire(self: *Lock) Held {
+        const held = self.mutex.acquire();
+
+        // self.head transitions through multiple states depending on its value:
+        // UNLOCKED -> LOCKED:
+        //   acquire Lock ownership when there are no waiters
+        // LOCKED -> <waiter ptr>:
+        //   Lock is already owned, enqueue the first Waiter
+        // <waiter ptr> -> <waiter ptr>:
+        //   Lock is owned with pending waiters. Push our Waiter to the queue.
+
+        if (self.head == UNLOCKED) {
+            self.head = LOCKED;
+            held.release();
+            return Held{ .lock = self };
+        }
+
+        var waiter: Waiter = undefined;
+        waiter.next = null;
+        waiter.tail = &waiter;
+
+        const head = switch (self.head) {
+            UNLOCKED => unreachable,
+            LOCKED => null,
+            else => @intToPtr(*Waiter, self.head),
         };
-    }
 
-    pub fn initLocked() Lock {
-        return Lock{
-            .shared = true,
-            .queue = Queue.init(),
-            .queue_empty = true,
-        };
-    }
+        if (head) |h| {
+            h.tail.next = &waiter;
+            h.tail = &waiter;
+        } else {
+            self.head = @ptrToInt(&waiter);
+        }
 
-    /// Must be called when not locked. Not thread safe.
-    /// All calls to acquire() and release() must complete before calling deinit().
-    pub fn deinit(self: *Lock) void {
-        assert(!self.shared);
-        while (self.queue.get()) |node| resume node.data;
-    }
-
-    pub fn acquire(self: *Lock) callconv(.Async) Held {
-        var my_tick_node = Loop.NextTickNode.init(@frame());
-
-        errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
         suspend {
-            self.queue.put(&my_tick_node);
-
-            // At this point, we are in the queue, so we might have already been resumed.
-
-            // We set this bit so that later we can rely on the fact, that if queue_empty == true, some actor
-            // will attempt to grab the lock.
-            @atomicStore(bool, &self.queue_empty, false, .SeqCst);
-
-            if (!@atomicRmw(bool, &self.shared, .Xchg, true, .SeqCst)) {
-                if (self.queue.get()) |node| {
-                    // Whether this node is us or someone else, we tail resume it.
-                    resume node.data;
-                }
-            }
+            waiter.node = Loop.NextTickNode{
+                .prev = undefined,
+                .next = undefined,
+                .data = @frame(),
+            };
+            held.release();
         }
 
         return Held{ .lock = self };
     }
+
+    pub const Held = struct {
+        lock: *Lock,
+
+        pub fn release(self: Held) void {
+            const waiter = blk: {
+                const held = self.lock.mutex.acquire();
+                defer held.release();
+
+                // self.head goes through the reverse transitions from acquire():
+                // <waiter ptr> -> <new waiter ptr>:
+                //   pop a waiter from the queue to give it Lock ownership while others are still pending
+                // <waiter ptr> -> LOCKED:
+                //   pop the last waiter from the queue, also giving it lock ownership when woken
+                // LOCKED -> UNLOCKED:
+                //   the last lock owner releases the lock with no one else waiting on it
+
+                switch (self.lock.head) {
+                    UNLOCKED => {
+                        unreachable; // Lock unlocked while unlocking
+                    },
+                    LOCKED => {
+                        self.lock.head = UNLOCKED;
+                        break :blk null;
+                    },
+                    else => {
+                        const waiter = @intToPtr(*Waiter, self.lock.head);
+                        self.lock.head = if (waiter.next == null) LOCKED else @ptrToInt(waiter.next);
+                        if (waiter.next) |next|
+                            next.tail = waiter.tail;
+                        break :blk waiter;
+                    },
+                }
+            };
+
+            if (waiter) |w| {
+                global_event_loop.onNextTick(&w.node);
+            }
+        }
+    };
 };
 
 test "std.event.Lock" {
@@ -128,41 +128,16 @@ test "std.event.Lock" {
     // TODO https://github.com/ziglang/zig/issues/3251
     if (builtin.os.tag == .freebsd) return error.SkipZigTest;
 
-    // TODO this file has bit-rotted. repair it
-    if (true) return error.SkipZigTest;
-
-    var lock = Lock.init();
-    defer lock.deinit();
-
-    _ = async testLock(&lock);
+    var lock = Lock{};
+    testLock(&lock);
 
     const expected_result = [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
     testing.expectEqualSlices(i32, &expected_result, &shared_test_data);
 }
 
-fn testLock(lock: *Lock) callconv(.Async) void {
+fn testLock(lock: *Lock) void {
     var handle1 = async lockRunner(lock);
-    var tick_node1 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle1,
-    };
-    Loop.instance.?.onNextTick(&tick_node1);
-
     var handle2 = async lockRunner(lock);
-    var tick_node2 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle2,
-    };
-    Loop.instance.?.onNextTick(&tick_node2);
-
     var handle3 = async lockRunner(lock);
-    var tick_node3 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle3,
-    };
-    Loop.instance.?.onNextTick(&tick_node3);
 
     await handle1;
     await handle2;
@@ -171,13 +146,13 @@ fn testLock(lock: *Lock) callconv(.Async) void {
 var shared_test_data = [1]i32{0} ** 10;
 var shared_test_index: usize = 0;
 
-fn lockRunner(lock: *Lock) callconv(.Async) void {
-    suspend; // resumed by onNextTick
+
+fn lockRunner(lock: *Lock) void {
+    Lock.global_event_loop.yield();
 
     var i: usize = 0;
     while (i < shared_test_data.len) : (i += 1) {
-        var lock_frame = async lock.acquire();
-        const handle = await lock_frame;
+        const handle = lock.acquire();
        defer handle.release();

        shared_test_index = 0;
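
For reference, a minimal usage sketch of the rewritten API (not part of the diff). It assumes an evented build, i.e. `pub const io_mode = .evented;` in the root source file, since `Loop.instance` is null otherwise; the names `worker` and `counter` are illustrative only:

const std = @import("std");

pub const io_mode = .evented; // without this, Lock is a compile error (no event loop)

var lock = std.event.Lock{}; // default field values mean no init()/deinit() is needed anymore
var counter: usize = 0;

fn worker() void {
    var i: usize = 0;
    while (i < 1000) : (i += 1) {
        // acquire() may suspend; the frame is resumed via onNextTick once it is handed the lock.
        const held = lock.acquire();
        defer held.release(); // wakes the next queued Waiter, if any
        counter += 1;
    }
}

pub fn main() void {
    var a = async worker();
    var b = async worker();
    await a;
    await b;
    std.debug.assert(counter == 2000);
}

The Held token still ties release() to a successful acquire() at the type level; what changed is that the waiter queue is now intrusive (each Waiter lives in the suspended frame) and is guarded by a plain std.Mutex instead of the old lock-free flag juggling.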