better mutex implementation
based on Ulrich Drepper's "Futexes are tricky" paper, Mutex, Take 3. Also includes tests.
master
parent
66cb75d114
commit
3f13a59cbc
|
@ -8,6 +8,8 @@ const linux = std.os.linux;
|
|||
|
||||
/// Lock may be held only once. If the same thread
|
||||
/// tries to acquire the same mutex twice, it deadlocks.
|
||||
/// The Linux implementation is based on mutex3 from
|
||||
/// https://www.akkadia.org/drepper/futex.pdf
|
||||
pub const Mutex = struct {
|
||||
/// 0: unlocked
|
||||
/// 1: locked, no waiters
|
||||
|
@ -25,12 +27,10 @@ pub const Mutex = struct {
|
|||
|
||||
pub fn release(self: Held) void {
|
||||
if (builtin.os == builtin.Os.linux) {
|
||||
// Always unlock. If the previous state was Locked-No-Waiters, then we're done.
|
||||
// Otherwise, wake a waiter up.
|
||||
const prev = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
|
||||
if (prev != 1) {
|
||||
assert(prev == 2);
|
||||
const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE, 1);
|
||||
const c = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
|
||||
if (c != 1) {
|
||||
_ = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
|
||||
const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
|
||||
switch (linux.getErrno(rc)) {
|
||||
0 => {},
|
||||
linux.EINVAL => unreachable,
|
||||
|
@ -52,21 +52,18 @@ pub const Mutex = struct {
|
|||
|
||||
pub fn acquire(self: *Mutex) Held {
|
||||
if (builtin.os == builtin.Os.linux) {
|
||||
// First try to go from Unlocked to Locked-No-Waiters. If this succeeds, no syscalls are needed.
|
||||
// Otherwise, we need to be in the Locked-With-Waiters state. If we are already in that state,
|
||||
// proceed to futex_wait. Otherwise, try to go from Locked-No-Waiters to Locked-With-Waiters.
|
||||
// If that succeeds, proceed to futex_wait. Otherwise start the whole loop over again.
|
||||
while (@cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic)) |l| {
|
||||
if (l == 2 or
|
||||
@cmpxchgWeak(i32, &self.linux_lock, 1, 2, AtomicOrder.Acquire, AtomicOrder.Monotonic) == null)
|
||||
{
|
||||
const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT, 2, null);
|
||||
switch (linux.getErrno(rc)) {
|
||||
0, linux.EINTR, linux.EAGAIN => continue,
|
||||
linux.EINVAL => unreachable,
|
||||
else => unreachable,
|
||||
}
|
||||
var c = @cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
|
||||
return Held{ .mutex = self };
|
||||
if (c != 2)
|
||||
c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
|
||||
while (c != 0) {
|
||||
const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
|
||||
switch (linux.getErrno(rc)) {
|
||||
0, linux.EINTR, linux.EAGAIN => {},
|
||||
linux.EINVAL => unreachable,
|
||||
else => unreachable,
|
||||
}
|
||||
c = @atomicRmw(i32, &self.linux_lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
|
||||
}
|
||||
} else {
|
||||
_ = self.spin_lock.acquire();
|
||||
|
@ -74,3 +71,47 @@ pub const Mutex = struct {
|
|||
return Held{ .mutex = self };
|
||||
}
|
||||
};
|
||||
|
||||
/// State shared by every worker thread spawned in the "std.Mutex" test.
const Context = struct {
    /// Number of increments each worker performs on `data`.
    const incr_count = 10000;

    /// Lock that workers hold while they modify `data`.
    mutex: *Mutex,
    /// Counter that workers increment while holding `mutex`.
    data: i128,
};
|
||||
|
||||
test "std.Mutex" {
    // Spawns `thread_count` workers that each increment `context.data`
    // `Context.incr_count` times while holding the mutex, then checks that no
    // increment was lost.
    //
    // The previous version also allocated 300 KiB and built a thread-safe
    // fixed-buffer allocator that nothing used; that dead setup is removed so
    // the test cannot fail for an unrelated out-of-memory reason.
    var mutex = Mutex.init();
    var context = Context{
        .mutex = &mutex,
        .data = 0,
    };

    const thread_count = 10;
    var threads: [thread_count]*std.os.Thread = undefined;
    for (threads) |*t| {
        t.* = try std.os.spawnThread(&context, worker);
    }
    // Join every worker before reading the counter.
    for (threads) |t|
        t.wait();

    std.debug.assertOrPanic(context.data == thread_count * Context.incr_count);
}
|
||||
|
||||
/// Thread body for the "std.Mutex" test: performs `Context.incr_count`
/// increments of `ctx.data`, acquiring `ctx.mutex` around each one.
fn worker(ctx: *Context) void {
    var remaining: usize = Context.incr_count;
    while (remaining != 0) : (remaining -= 1) {
        const held = ctx.mutex.acquire();
        ctx.data += 1;
        // Nothing between acquire and here can exit early, so an explicit
        // release is equivalent to a deferred one.
        held.release();
    }
}
|
||||
|
|
Loading…
Reference in New Issue