Merge pull request #6441 from kprotty/lock
New std.event.Lock implementation
Commit a0c0f9ead5
@@ -16,107 +16,107 @@ const Loop = std.event.Loop;
 /// Allows only one actor to hold the lock.
 /// TODO: make this API also work in blocking I/O mode.
 pub const Lock = struct {
-    shared: bool,
-    queue: Queue,
-    queue_empty: bool,
+    mutex: std.Mutex = std.Mutex{},
+    head: usize = UNLOCKED,
 
-    const Queue = std.atomic.Queue(anyframe);
+    const UNLOCKED = 0;
+    const LOCKED = 1;
 
     const global_event_loop = Loop.instance orelse
         @compileError("std.event.Lock currently only works with event-based I/O");
 
-    pub const Held = struct {
-        lock: *Lock,
-
-        pub fn release(self: Held) void {
-            // Resume the next item from the queue.
-            if (self.lock.queue.get()) |node| {
-                global_event_loop.onNextTick(node);
-                return;
-            }
-
-            // We need to release the lock.
-            @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-            @atomicStore(bool, &self.lock.shared, false, .SeqCst);
-
-            // There might be a queue item. If we know the queue is empty, we can be done,
-            // because the other actor will try to obtain the lock.
-            // But if there's a queue item, we are the actor which must loop and attempt
-            // to grab the lock again.
-            if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                return;
-            }
-
-            while (true) {
-                if (@atomicRmw(bool, &self.lock.shared, .Xchg, true, .SeqCst)) {
-                    // We did not obtain the lock. Great, the queue is someone else's problem.
-                    return;
-                }
-
-                // Resume the next item from the queue.
-                if (self.lock.queue.get()) |node| {
-                    global_event_loop.onNextTick(node);
-                    return;
-                }
-
-                // Release the lock again.
-                @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-                @atomicStore(bool, &self.lock.shared, false, .SeqCst);
-
-                // Find out if we can be done.
-                if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                    return;
-                }
-            }
-        }
-    };
-
-    pub fn init() Lock {
-        return Lock{
-            .shared = false,
-            .queue = Queue.init(),
-            .queue_empty = true,
-        };
-    }
-
-    pub fn initLocked() Lock {
-        return Lock{
-            .shared = true,
-            .queue = Queue.init(),
-            .queue_empty = true,
-        };
-    }
-
-    /// Must be called when not locked. Not thread safe.
-    /// All calls to acquire() and release() must complete before calling deinit().
-    pub fn deinit(self: *Lock) void {
-        assert(!self.shared);
-        while (self.queue.get()) |node| resume node.data;
-    }
-
-    pub fn acquire(self: *Lock) callconv(.Async) Held {
-        var my_tick_node = Loop.NextTickNode.init(@frame());
-
-        errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
-        suspend {
-            self.queue.put(&my_tick_node);
-
-            // At this point, we are in the queue, so we might have already been resumed.
-
-            // We set this bit so that later we can rely on the fact that, if queue_empty == true,
-            // some actor will attempt to grab the lock.
-            @atomicStore(bool, &self.queue_empty, false, .SeqCst);
-
-            if (!@atomicRmw(bool, &self.shared, .Xchg, true, .SeqCst)) {
-                if (self.queue.get()) |node| {
-                    // Whether this node is us or someone else, we tail resume it.
-                    resume node.data;
-                }
-            }
-        }
-
-        return Held{ .lock = self };
-    }
+    const Waiter = struct {
+        // forced Waiter alignment to ensure its address doesn't clash with LOCKED
+        next: ?*Waiter align(2),
+        tail: *Waiter,
+        node: Loop.NextTickNode,
+    };
+
+    pub fn acquire(self: *Lock) Held {
+        const held = self.mutex.acquire();
+
+        // self.head transitions through multiple states depending on its value:
+        // UNLOCKED -> LOCKED:
+        //   acquire Lock ownership when there are no waiters
+        // LOCKED -> <head Waiter ptr>:
+        //   the Lock is already owned; enqueue the first Waiter
+        // <head ptr> -> <head ptr>:
+        //   the Lock is owned with pending waiters; push our Waiter onto the queue
+
+        if (self.head == UNLOCKED) {
+            self.head = LOCKED;
+            held.release();
+            return Held{ .lock = self };
+        }
+
+        var waiter: Waiter = undefined;
+        waiter.next = null;
+        waiter.tail = &waiter;
+
+        const head = switch (self.head) {
+            UNLOCKED => unreachable,
+            LOCKED => null,
+            else => @intToPtr(*Waiter, self.head),
+        };
+
+        if (head) |h| {
+            h.tail.next = &waiter;
+            h.tail = &waiter;
+        } else {
+            self.head = @ptrToInt(&waiter);
+        }
+
+        suspend {
+            waiter.node = Loop.NextTickNode{
+                .prev = undefined,
+                .next = undefined,
+                .data = @frame(),
+            };
+            held.release();
+        }
+
+        return Held{ .lock = self };
+    }
+
+    pub const Held = struct {
+        lock: *Lock,
+
+        pub fn release(self: Held) void {
+            const waiter = blk: {
+                const held = self.lock.mutex.acquire();
+                defer held.release();
+
+                // self.head goes through the reverse transitions of acquire():
+                // <head ptr> -> <new head ptr>:
+                //   pop a waiter from the queue and hand it Lock ownership while others are still pending
+                // <head ptr> -> LOCKED:
+                //   pop the last waiter from the queue, also giving it lock ownership when it wakes
+                // LOCKED -> UNLOCKED:
+                //   the last lock owner releases the lock while no one else is waiting for it
+
+                switch (self.lock.head) {
+                    UNLOCKED => {
+                        unreachable; // Lock unlocked while unlocking
+                    },
+                    LOCKED => {
+                        self.lock.head = UNLOCKED;
+                        break :blk null;
+                    },
+                    else => {
+                        const waiter = @intToPtr(*Waiter, self.lock.head);
+                        self.lock.head = if (waiter.next == null) LOCKED else @ptrToInt(waiter.next);
+                        if (waiter.next) |next|
+                            next.tail = waiter.tail;
+                        break :blk waiter;
+                    },
+                }
+            };
+
+            if (waiter) |w| {
+                global_event_loop.onNextTick(&w.node);
+            }
+        }
+    };
 };
 
 test "std.event.Lock" {
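The new implementation packs the entire wait queue into the single `head: usize` word. This only works because `Waiter.next` is declared `align(2)`, which forces every `Waiter` onto an even address, so a real waiter pointer can never collide with the `LOCKED` tag value of 1. Below is a minimal, self-contained sketch of that pointer-tagging idea; the `Node` type and the test are illustrative only and are not part of this commit.

    const std = @import("std");

    const Node = struct {
        // Same trick as Waiter above: forcing 2-byte alignment on the first
        // field guarantees the struct's address has its low bit clear.
        next: ?*Node align(2) = null,
    };

    const UNLOCKED: usize = 0;
    const LOCKED: usize = 1;

    test "tagged head word" {
        var node = Node{};
        const head = @ptrToInt(&node);

        // A waiter's address is distinct from both tag values, so UNLOCKED,
        // LOCKED, and "pointer to the first waiter" can share one usize.
        std.debug.assert(head != UNLOCKED and head != LOCKED);
        std.debug.assert(@intToPtr(*Node, head) == &node);
    }

The `tail` pointer cached in the head waiter is what keeps enqueueing O(1): acquire() appends with `h.tail.next = &waiter; h.tail = &waiter;`, while release() pops from the front, so waiters are woken in FIFO order.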
@@ -128,41 +128,16 @@ test "std.event.Lock" {
     // TODO https://github.com/ziglang/zig/issues/3251
     if (builtin.os.tag == .freebsd) return error.SkipZigTest;
 
-    // TODO this file has bit-rotted. repair it
-    if (true) return error.SkipZigTest;
-
-    var lock = Lock.init();
-    defer lock.deinit();
-
-    _ = async testLock(&lock);
+    var lock = Lock{};
+    testLock(&lock);
 
     const expected_result = [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
     testing.expectEqualSlices(i32, &expected_result, &shared_test_data);
 }
 
-fn testLock(lock: *Lock) callconv(.Async) void {
+fn testLock(lock: *Lock) void {
     var handle1 = async lockRunner(lock);
-    var tick_node1 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle1,
-    };
-    Loop.instance.?.onNextTick(&tick_node1);
 
     var handle2 = async lockRunner(lock);
-    var tick_node2 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle2,
-    };
-    Loop.instance.?.onNextTick(&tick_node2);
 
     var handle3 = async lockRunner(lock);
-    var tick_node3 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle3,
-    };
-    Loop.instance.?.onNextTick(&tick_node3);
 
     await handle1;
     await handle2;
@@ -171,13 +146,13 @@ fn testLock(lock: *Lock) callconv(.Async) void {
 var shared_test_data = [1]i32{0} ** 10;
 var shared_test_index: usize = 0;
 
-fn lockRunner(lock: *Lock) callconv(.Async) void {
-    suspend; // resumed by onNextTick
+fn lockRunner(lock: *Lock) void {
+    Lock.global_event_loop.yield();
 
     var i: usize = 0;
     while (i < shared_test_data.len) : (i += 1) {
-        var lock_frame = async lock.acquire();
-        const handle = await lock_frame;
+        const handle = lock.acquire();
         defer handle.release();
 
         shared_test_index = 0;
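For reference, a hedged sketch of the caller-side pattern the updated test exercises: default-initialize the lock, then pair acquire() with release() inside async frames. It assumes an evented-I/O build with a running `std.event.Loop`; the `worker`, `counter`, and `demo` names are made up for illustration and are not part of the commit.

    const std = @import("std");
    const Lock = std.event.Lock;

    var counter: usize = 0;

    fn worker(lock: *Lock) void {
        var i: usize = 0;
        while (i < 100) : (i += 1) {
            const held = lock.acquire(); // suspends this frame while another actor holds the lock
            defer held.release(); // wakes the next queued waiter, if any
            counter += 1;
        }
    }

    fn demo() void {
        var lock = Lock{}; // default-initialized; no init()/deinit() pair anymore
        var a = async worker(&lock);
        var b = async worker(&lock);
        await a;
        await b;
    }

Compared to the old API, the caller no longer spells out callconv(.Async), builds NextTickNode values by hand, or awaits a separate acquire frame; acquire() suspends internally when the lock is contended.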