zig/std/event/lock.zig

205 lines
7.4 KiB
Zig
Raw Normal View History

2018-07-09 19:22:44 -07:00
const std = @import("../index.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;
/// Thread-safe async/await lock.
/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
/// are resumed when the lock is released, in order.
pub const Lock = struct {
loop: *Loop,
shared_bit: u8, // TODO make this a bool
queue: Queue,
queue_empty_bit: u8, // TODO make this a bool
const Queue = std.atomic.QueueMpsc(promise);
pub const Held = struct {
lock: *Lock,
pub fn release(self: Held) void {
// Resume the next item from the queue.
if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node);
return;
}
// We need to release the lock.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
return;
}
while (true) {
const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (old_bit != 0) {
// We did not obtain the lock. Great, the queue is someone else's problem.
return;
}
// Resume the next item from the queue.
if (self.lock.queue.get()) |node| {
self.lock.loop.onNextTick(node);
return;
}
// Release the lock again.
_ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// Find out if we can be done.
if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
return;
}
}
}
};
pub fn init(loop: *Loop) Lock {
return Lock{
.loop = loop,
.shared_bit = 0,
.queue = Queue.init(),
.queue_empty_bit = 1,
};
}
/// Must be called when not locked. Not thread safe.
/// All calls to acquire() and release() must complete before calling deinit().
pub fn deinit(self: *Lock) void {
assert(self.shared_bit == 0);
while (self.queue.get()) |node| cancel node.data;
}
pub async fn acquire(self: *Lock) Held {
s: suspend |handle| {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
.next = undefined,
};
self.queue.put(&my_tick_node);
// At this point, we are in the queue, so we might have already been resumed and this coroutine
// frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.
// We set this bit so that later we can rely on the fact, that if queue_empty_bit is 1, some actor
// will attempt to grab the lock.
_ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
while (true) {
const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
if (old_bit != 0) {
// We did not obtain the lock. Trust that our queue entry will resume us, and allow
// suspend to complete.
break;
}
// We got the lock. However we might have already been resumed from the queue.
if (self.queue.get()) |node| {
// Whether this node is us or someone else, we tail resume it.
resume node.data;
break;
} else {
// We already got resumed, and there are none left in the queue, which means that
// we aren't even supposed to hold the lock right now.
_ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
// There might be a queue item. If we know the queue is empty, we can be done,
// because the other actor will try to obtain the lock.
// But if there's a queue item, we are the actor which must loop and attempt
// to grab the lock again.
if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
break;
} else {
continue;
}
}
unreachable;
}
}
return Held{ .lock = self };
}
};
test "std.event.Lock" {
var da = std.heap.DirectAllocator.init();
defer da.deinit();
const allocator = &da.allocator;
var loop: Loop = undefined;
try loop.initMultiThreaded(allocator);
defer loop.deinit();
var lock = Lock.init(&loop);
defer lock.deinit();
const handle = try async<allocator> testLock(&loop, &lock);
defer cancel handle;
loop.run();
assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
}
async fn testLock(loop: *Loop, lock: *Lock) void {
// TODO explicitly put next tick node memory in the coroutine frame #1194
suspend |p| {
resume p;
}
const handle1 = async lockRunner(lock) catch @panic("out of memory");
var tick_node1 = Loop.NextTickNode{
.next = undefined,
.data = handle1,
};
loop.onNextTick(&tick_node1);
const handle2 = async lockRunner(lock) catch @panic("out of memory");
var tick_node2 = Loop.NextTickNode{
.next = undefined,
.data = handle2,
};
loop.onNextTick(&tick_node2);
const handle3 = async lockRunner(lock) catch @panic("out of memory");
var tick_node3 = Loop.NextTickNode{
.next = undefined,
.data = handle3,
};
loop.onNextTick(&tick_node3);
await handle1;
await handle2;
await handle3;
}
var shared_test_data = [1]i32{0} ** 10;
var shared_test_index: usize = 0;
async fn lockRunner(lock: *Lock) void {
suspend; // resumed by onNextTick
var i: usize = 0;
while (i < shared_test_data.len) : (i += 1) {
const lock_promise = async lock.acquire() catch @panic("out of memory");
const handle = await lock_promise;
defer handle.release();
shared_test_index = 0;
while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
}
}
}