`std.GeneralPurposeAllocator` is now available. It is a function that takes a configuration struct (with default field values) and returns an allocator. There is a detailed description of this allocator in the doc comments at the top of the new file. The main feature of this allocator is that it is *safe*: it detects double-free, use-after-free, and memory leaks.

Some deprecation compile errors are removed. The `Allocator` interface gains `old_align` as a new parameter to `resizeFn`, which is useful for quickly looking up allocations. `std.heap.page_allocator` is improved to use mmap address hints to avoid obtaining the same virtual address pages when unmapping and mapping pages. The new general purpose allocator uses the page allocator as its backing allocator by default.

`std.testing.allocator` is replaced with usage of this new allocator, which does leak checking, and so the LeakCheckAllocator is retired.

stage1 is improved so that the `@typeInfo` of a pointer has a lazy value for the alignment of the child type, avoiding false dependency loops when dealing with pointers to async function frames.

The `std.mem.Allocator` interface is refactored into its own file. `std.Mutex` now exposes the dummy mutex as `std.Mutex.Dummy`.

This allocator is great for debug mode; however, it needs some work to have better performance in release modes. The next step will be setting up a series of tests in ziglang/gotta-go-fast and then making improvements to the implementation.
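For a rough idea of what using it looks like, here is a minimal sketch. The exact configuration fields, the way the `*Allocator` is obtained, and the return value of `deinit` are assumptions on my part; the doc comments at the top of the new file are the authoritative reference.

```zig
const std = @import("std");

pub fn main() !void {
    // Default configuration; the config struct's fields all have default values.
    var gpa = std.GeneralPurposeAllocator(.{}){};
    // deinit is assumed here to check for leaks on teardown and report whether any were found.
    defer _ = gpa.deinit();
    const allocator = &gpa.allocator;

    // Double-free, use-after-free, and leaks of this buffer would be caught.
    const buf = try allocator.alloc(u8, 100);
    defer allocator.free(buf);
}
```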
```zig
const std = @import("std.zig");
const builtin = @import("builtin");
const os = std.os;
const assert = std.debug.assert;
const windows = os.windows;
const testing = std.testing;
const SpinLock = std.SpinLock;
const ResetEvent = std.ResetEvent;

/// Lock may be held only once. If the same thread tries to acquire
/// the same mutex twice, it deadlocks. This type supports static
/// initialization and is at most `@sizeOf(usize)` in size. When an
/// application is built in single threaded release mode, all the
/// functions are no-ops. In single threaded debug mode, there is
/// deadlock detection.
///
/// Example usage:
/// var m = Mutex.init();
/// defer m.deinit();
///
/// const lock = m.acquire();
/// defer lock.release();
/// ... critical code
///
/// Non-blocking:
/// if (m.tryAcquire()) |lock| {
///     defer lock.release();
///     // ... critical section
/// } else {
///     // ... lock not acquired
/// }
pub const Mutex = if (builtin.single_threaded)
    Dummy
else if (builtin.os.tag == .windows)
    // https://locklessinc.com/articles/keyed_events/
    extern union {
        locked: u8,
        waiters: u32,

        const WAKE = 1 << 8;
        const WAIT = 1 << 9;

        pub const Dummy = Dummy;

        pub fn init() Mutex {
            return Mutex{ .waiters = 0 };
        }

        pub fn deinit(self: *Mutex) void {
            self.* = undefined;
        }

        pub fn tryAcquire(self: *Mutex) ?Held {
            if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) != 0)
                return null;
            return Held{ .mutex = self };
        }

        pub fn acquire(self: *Mutex) Held {
            return self.tryAcquire() orelse self.acquireSlow();
        }

        fn acquireSpinning(self: *Mutex) Held {
            @setCold(true);
            while (true) : (SpinLock.yield()) {
                return self.tryAcquire() orelse continue;
            }
        }

        fn acquireSlow(self: *Mutex) Held {
            // try to use NT keyed events for blocking, falling back to spinlock if unavailable
            @setCold(true);
            const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning();
            const key = @ptrCast(*const c_void, &self.waiters);

            while (true) : (SpinLock.loopHint(1)) {
                const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);

                // try and take lock if unlocked
                if ((waiters & 1) == 0) {
                    if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0) {
                        return Held{ .mutex = self };
                    }

                    // otherwise, try and update the waiting count.
                    // then unset the WAKE bit so that another unlocker can wake up a thread.
                } else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
                    const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
                    assert(rc == .SUCCESS);
                    _ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
                }
            }
        }

        pub const Held = struct {
            mutex: *Mutex,

            pub fn release(self: Held) void {
                // unlock without a rmw/cmpxchg instruction
                @atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
                const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return;
                const key = @ptrCast(*const c_void, &self.mutex.waiters);

                while (true) : (SpinLock.loopHint(1)) {
                    const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);

                    // no one is waiting
                    if (waiters < WAIT) return;
                    // someone grabbed the lock and will do the wake instead
                    if (waiters & 1 != 0) return;
                    // someone else is currently waking up
                    if (waiters & WAKE != 0) return;

                    // try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
                    if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null) {
                        const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
                        assert(rc == .SUCCESS);
                        return;
                    }
                }
            }
        };
    }
else if (builtin.link_libc or builtin.os.tag == .linux)
    // stack-based version of https://github.com/Amanieu/parking_lot/blob/master/core/src/word_lock.rs
    struct {
        state: usize,

        pub const Dummy = Dummy;

        /// number of times to spin trying to acquire the lock.
        /// https://webkit.org/blog/6161/locking-in-webkit/
        const SPIN_COUNT = 40;

        const MUTEX_LOCK: usize = 1 << 0;
        const QUEUE_LOCK: usize = 1 << 1;
        const QUEUE_MASK: usize = ~(MUTEX_LOCK | QUEUE_LOCK);

        const Node = struct {
            next: ?*Node,
            event: ResetEvent,
        };

        pub fn init() Mutex {
            return Mutex{ .state = 0 };
        }

        pub fn deinit(self: *Mutex) void {
            self.* = undefined;
        }

        pub fn tryAcquire(self: *Mutex) ?Held {
            if (@cmpxchgWeak(usize, &self.state, 0, MUTEX_LOCK, .Acquire, .Monotonic) != null)
                return null;
            return Held{ .mutex = self };
        }

        pub fn acquire(self: *Mutex) Held {
            return self.tryAcquire() orelse {
                self.acquireSlow();
                return Held{ .mutex = self };
            };
        }

        fn acquireSlow(self: *Mutex) void {
            // inlining the fast path and hiding *Slow()
            // calls behind a @setCold(true) appears to
            // improve performance in release builds.
            @setCold(true);
            while (true) {

                // try and spin for a bit to acquire the mutex if there's currently no queue
                var spin_count: u32 = SPIN_COUNT;
                var state = @atomicLoad(usize, &self.state, .Monotonic);
                while (spin_count != 0) : (spin_count -= 1) {
                    if (state & MUTEX_LOCK == 0) {
                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                    } else if (state & QUEUE_MASK == 0) {
                        break;
                    }
                    SpinLock.yield();
                    state = @atomicLoad(usize, &self.state, .Monotonic);
                }

                // create the ResetEvent node on the stack
                // (faster than threadlocal on platforms like OSX)
                var node: Node = undefined;
                node.event = ResetEvent.init();
                defer node.event.deinit();

                // we've spun too long, try and add our node to the LIFO queue.
                // if the mutex becomes available in the process, try and grab it instead.
                while (true) {
                    if (state & MUTEX_LOCK == 0) {
                        _ = @cmpxchgWeak(usize, &self.state, state, state | MUTEX_LOCK, .Acquire, .Monotonic) orelse return;
                    } else {
                        node.next = @intToPtr(?*Node, state & QUEUE_MASK);
                        const new_state = @ptrToInt(&node) | (state & ~QUEUE_MASK);
                        _ = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Monotonic) orelse {
                            node.event.wait();
                            break;
                        };
                    }
                    SpinLock.yield();
                    state = @atomicLoad(usize, &self.state, .Monotonic);
                }
            }
        }

        /// Returned when the lock is acquired. Call release to
        /// release.
        pub const Held = struct {
            mutex: *Mutex,

            /// Release the held lock.
            pub fn release(self: Held) void {
                // first, remove the lock bit so another possibly parallel acquire() can succeed.
                // use .Sub since it can usually be compiled down more efficiently
                // (`lock sub` on x86) vs .And ~MUTEX_LOCK (`lock cmpxchg` loop on x86)
                const state = @atomicRmw(usize, &self.mutex.state, .Sub, MUTEX_LOCK, .Release);

                // if the LIFO queue isn't locked and it has a node, try and wake up the node.
                if ((state & QUEUE_LOCK) == 0 and (state & QUEUE_MASK) != 0)
                    self.mutex.releaseSlow();
            }
        };

        fn releaseSlow(self: *Mutex) void {
            @setCold(true);

            // try and lock the LIFO queue to pop a node off,
            // stopping altogether if it's already locked or the queue is empty
            var state = @atomicLoad(usize, &self.state, .Monotonic);
            while (true) : (SpinLock.loopHint(1)) {
                if (state & QUEUE_LOCK != 0 or state & QUEUE_MASK == 0)
                    return;
                state = @cmpxchgWeak(usize, &self.state, state, state | QUEUE_LOCK, .Acquire, .Monotonic) orelse break;
            }

            // acquired the QUEUE_LOCK, try and pop a node to wake it.
            // if the mutex is locked, then unset QUEUE_LOCK and let
            // the thread who holds the mutex do the wake-up on unlock()
            while (true) : (SpinLock.loopHint(1)) {
                if ((state & MUTEX_LOCK) != 0) {
                    state = @cmpxchgWeak(usize, &self.state, state, state & ~QUEUE_LOCK, .Release, .Acquire) orelse return;
                } else {
                    const node = @intToPtr(*Node, state & QUEUE_MASK);
                    const new_state = @ptrToInt(node.next);
                    state = @cmpxchgWeak(usize, &self.state, state, new_state, .Release, .Acquire) orelse {
                        node.event.set();
                        return;
                    };
                }
            }
        }
    }

// for platforms without a known OS blocking
// primitive, default to SpinLock for correctness
else
    SpinLock;

/// This has the same semantics as `Mutex`, however it does not actually do any
/// synchronization. Operations are safety-checked no-ops.
pub const Dummy = struct {
    lock: @TypeOf(lock_init),

    const lock_init = if (std.debug.runtime_safety) false else {};

    pub const Held = struct {
        mutex: *Dummy,

        pub fn release(self: Held) void {
            if (std.debug.runtime_safety) {
                self.mutex.lock = false;
            }
        }
    };

    /// Create a new mutex in unlocked state.
    pub fn init() Dummy {
        return Dummy{ .lock = lock_init };
    }

    /// Free a mutex created with init. Calling this while the
    /// mutex is held is illegal behavior.
    pub fn deinit(self: *Dummy) void {
        self.* = undefined;
    }

    /// Try to acquire the mutex without blocking. Returns null if
    /// the mutex is unavailable. Otherwise returns Held. Call
    /// release on Held.
    pub fn tryAcquire(self: *Dummy) ?Held {
        if (std.debug.runtime_safety) {
            if (self.lock) return null;
            self.lock = true;
        }
        return Held{ .mutex = self };
    }

    /// Acquire the mutex. Will deadlock if the mutex is already
    /// held by the calling thread.
    pub fn acquire(self: *Dummy) Held {
        return self.tryAcquire() orelse @panic("deadlock detected");
    }
};

const TestContext = struct {
    mutex: *Mutex,
    data: i128,

    const incr_count = 10000;
};

test "std.Mutex" {
    var mutex = Mutex.init();
    defer mutex.deinit();

    var context = TestContext{
        .mutex = &mutex,
        .data = 0,
    };

    if (builtin.single_threaded) {
        worker(&context);
        testing.expect(context.data == TestContext.incr_count);
    } else {
        const thread_count = 10;
        var threads: [thread_count]*std.Thread = undefined;
        for (threads) |*t| {
            t.* = try std.Thread.spawn(&context, worker);
        }
        for (threads) |t|
            t.wait();

        testing.expect(context.data == thread_count * TestContext.incr_count);
    }
}

fn worker(ctx: *TestContext) void {
    var i: usize = 0;
    while (i != TestContext.incr_count) : (i += 1) {
        const held = ctx.mutex.acquire();
        defer held.release();

        ctx.data += 1;
    }
}
```