const std = @import("std.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
const testing = std.testing;
const SpinLock = std.SpinLock;
const linux = std.os.linux;
const windows = std.os.windows;

/// The lock may be held only once: if the same thread
/// tries to acquire the same mutex twice, it deadlocks.
/// This type must be initialized at runtime, and then deinitialized when no
/// longer needed, to free resources.
/// If you need static initialization, use std.StaticallyInitializedMutex.
/// The Linux implementation is based on mutex3 from
/// https://www.akkadia.org/drepper/futex.pdf
/// When an application is built in single-threaded release mode, all of these
/// functions are no-ops. In single-threaded debug mode, there is deadlock detection.
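/// Example usage (a minimal sketch using only the API declared in this file):
///
///     var mutex = Mutex.init();
///     defer mutex.deinit();
///
///     const held = mutex.acquire();
///     defer held.release();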
pub const Mutex = if (builtin.single_threaded)
    struct {
        lock: @typeOf(lock_init),

        const lock_init = if (std.debug.runtime_safety) false else {};

        pub const Held = struct {
            mutex: *Mutex,

            pub fn release(self: Held) void {
                if (std.debug.runtime_safety) {
                    self.mutex.lock = false;
                }
            }
        };
        pub fn init() Mutex {
            return Mutex{ .lock = lock_init };
        }
        pub fn deinit(self: *Mutex) void {}

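        // In a single-threaded debug build, `lock` is a bool that records whether the
        // mutex is currently held, so a second acquire panics instead of deadlocking.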
        pub fn acquire(self: *Mutex) Held {
            if (std.debug.runtime_safety) {
                if (self.lock) @panic("deadlock detected");
                self.lock = true;
            }
            return Held{ .mutex = self };
        }
    }
else switch (builtin.os) {
    builtin.Os.linux => struct {
        /// 0: unlocked
        /// 1: locked, no waiters
        /// 2: locked, one or more waiters
        lock: i32,

        pub const Held = struct {
            mutex: *Mutex,

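            // Unlock path (mutex3 from the Drepper paper cited above): atomically
            // decrement the lock word. If the previous value was 1 there were no
            // waiters and we are done; otherwise it was 2, so store 0 (unlocked)
            // and wake one waiter with FUTEX_WAKE.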
            pub fn release(self: Held) void {
                const c = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
                if (c != 1) {
                    _ = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
                    const rc = linux.futex_wake(&self.mutex.lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
                    switch (linux.getErrno(rc)) {
                        0 => {},
                        linux.EINVAL => unreachable,
                        else => unreachable,
                    }
                }
            }
        };

        pub fn init() Mutex {
            return Mutex{ .lock = 0 };
        }

        pub fn deinit(self: *Mutex) void {}

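        // Lock path (mutex3): try to flip the lock word from 0 to 1 with a single
        // compare-and-swap. On contention, mark it as 2 (locked, with waiters) and
        // sleep on the futex until an exchange observes 0, meaning the lock was free.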
        pub fn acquire(self: *Mutex) Held {
            var c = @cmpxchgWeak(i32, &self.lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
                return Held{ .mutex = self };
            if (c != 2)
                c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
            while (c != 0) {
                const rc = linux.futex_wait(&self.lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
                switch (linux.getErrno(rc)) {
                    0, linux.EINTR, linux.EAGAIN => {},
                    linux.EINVAL => unreachable,
                    else => unreachable,
                }
                c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
            }
            return Held{ .mutex = self };
        }
    },
    // TODO once https://github.com/ziglang/zig/issues/287 (copy elision) is solved, we can make a
    // better implementation of this. The problem is we need the init() function to have access to
    // the address of the CRITICAL_SECTION, and then have it not move.
    builtin.Os.windows => std.StaticallyInitializedMutex,
    else => struct {
        /// TODO better implementation than spin lock.
        /// When changing this, one must also change the corresponding
        /// std.StaticallyInitializedMutex code, since it aliases this type,
        /// under the assumption that it works both statically and at runtime.
        lock: SpinLock,

        pub const Held = struct {
            mutex: *Mutex,

            pub fn release(self: Held) void {
                SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.lock });
            }
        };

        pub fn init() Mutex {
            return Mutex{ .lock = SpinLock.init() };
        }

        pub fn deinit(self: *Mutex) void {}

        pub fn acquire(self: *Mutex) Held {
            _ = self.lock.acquire();
            return Held{ .mutex = self };
        }
    },
};

const TestContext = struct {
    mutex: *Mutex,
    data: i128,

    const incr_count = 10000;
};

test "std.Mutex" {
    var direct_allocator = std.heap.DirectAllocator.init();
    defer direct_allocator.deinit();

    var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 300 * 1024);
    defer direct_allocator.allocator.free(plenty_of_memory);

    var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
    var a = &fixed_buffer_allocator.allocator;

    var mutex = Mutex.init();
    defer mutex.deinit();

    var context = TestContext{
        .mutex = &mutex,
        .data = 0,
    };

    if (builtin.single_threaded) {
        worker(&context);
        testing.expect(context.data == TestContext.incr_count);
    } else {
        const thread_count = 10;
        var threads: [thread_count]*std.os.Thread = undefined;
        for (threads) |*t| {
            t.* = try std.os.spawnThread(&context, worker);
        }
        for (threads) |t|
            t.wait();

        testing.expect(context.data == thread_count * TestContext.incr_count);
    }
}

fn worker(ctx: *TestContext) void {
    var i: usize = 0;
    while (i != TestContext.incr_count) : (i += 1) {
        const held = ctx.mutex.acquire();
        defer held.release();

        ctx.data += 1;
    }
}