Merge pull request #3974 from kprotty/event_broadcast
ResetEvent fixes & broadcast notify
This commit is contained in:
commit
25e71216c4
@ -220,6 +220,7 @@ pub const PTHREAD_COND_INITIALIZER = pthread_cond_t{};
|
|||||||
pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int;
|
pub extern "c" fn pthread_cond_wait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t) c_int;
|
||||||
pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int;
|
pub extern "c" fn pthread_cond_timedwait(noalias cond: *pthread_cond_t, noalias mutex: *pthread_mutex_t, noalias abstime: *const timespec) c_int;
|
||||||
pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int;
|
pub extern "c" fn pthread_cond_signal(cond: *pthread_cond_t) c_int;
|
||||||
|
pub extern "c" fn pthread_cond_broadcast(cond: *pthread_cond_t) c_int;
|
||||||
pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int;
|
pub extern "c" fn pthread_cond_destroy(cond: *pthread_cond_t) c_int;
|
||||||
|
|
||||||
pub const pthread_t = *@OpaqueType();
|
pub const pthread_t = *@OpaqueType();
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
const std = @import("std.zig");
|
const std = @import("std.zig");
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
const os = std.os;
|
const os = std.os;
|
||||||
|
const assert = std.debug.assert;
|
||||||
|
const windows = os.windows;
|
||||||
const testing = std.testing;
|
const testing = std.testing;
|
||||||
const SpinLock = std.SpinLock;
|
const SpinLock = std.SpinLock;
|
||||||
const ResetEvent = std.ResetEvent;
|
const ResetEvent = std.ResetEvent;
|
||||||
@ -73,20 +75,33 @@ else if (builtin.os == .windows)
|
|||||||
return self.tryAcquire() orelse self.acquireSlow();
|
return self.tryAcquire() orelse self.acquireSlow();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn acquireSlow(self: *Mutex) Held {
|
fn acquireSpinning(self: *Mutex) Held {
|
||||||
@setCold(true);
|
@setCold(true);
|
||||||
|
while (true) : (SpinLock.yield()) {
|
||||||
|
return self.tryAcquire() orelse continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acquireSlow(self: *Mutex) Held {
|
||||||
|
// try to use NT keyed events for blocking, falling back to spinlock if unavailable
|
||||||
|
@setCold(true);
|
||||||
|
const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return self.acquireSpinning();
|
||||||
|
const key = @ptrCast(*const c_void, &self.waiters);
|
||||||
|
|
||||||
while (true) : (SpinLock.loopHint(1)) {
|
while (true) : (SpinLock.loopHint(1)) {
|
||||||
const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
|
const waiters = @atomicLoad(u32, &self.waiters, .Monotonic);
|
||||||
|
|
||||||
// try and take lock if unlocked
|
// try and take lock if unlocked
|
||||||
if ((waiters & 1) == 0) {
|
if ((waiters & 1) == 0) {
|
||||||
if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0)
|
if (@atomicRmw(u8, &self.locked, .Xchg, 1, .Acquire) == 0) {
|
||||||
return Held{ .mutex = self };
|
return Held{ .mutex = self };
|
||||||
|
}
|
||||||
|
|
||||||
// otherwise, try and update the waiting count.
|
// otherwise, try and update the waiting count.
|
||||||
// then unset the WAKE bit so that another unlocker can wake up a thread.
|
// then unset the WAKE bit so that another unlocker can wake up a thread.
|
||||||
} else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
|
} else if (@cmpxchgWeak(u32, &self.waiters, waiters, (waiters + WAIT) | 1, .Monotonic, .Monotonic) == null) {
|
||||||
ResetEvent.OsEvent.Futex.wait(@ptrCast(*i32, &self.waiters), undefined, null) catch unreachable;
|
const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
|
||||||
|
assert(rc == 0);
|
||||||
_ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
|
_ = @atomicRmw(u32, &self.waiters, .Sub, WAKE, .Monotonic);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -98,6 +113,8 @@ else if (builtin.os == .windows)
|
|||||||
pub fn release(self: Held) void {
|
pub fn release(self: Held) void {
|
||||||
// unlock without a rmw/cmpxchg instruction
|
// unlock without a rmw/cmpxchg instruction
|
||||||
@atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
|
@atomicStore(u8, @ptrCast(*u8, &self.mutex.locked), 0, .Release);
|
||||||
|
const handle = ResetEvent.OsEvent.Futex.getEventHandle() orelse return;
|
||||||
|
const key = @ptrCast(*const c_void, &self.mutex.waiters);
|
||||||
|
|
||||||
while (true) : (SpinLock.loopHint(1)) {
|
while (true) : (SpinLock.loopHint(1)) {
|
||||||
const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
|
const waiters = @atomicLoad(u32, &self.mutex.waiters, .Monotonic);
|
||||||
@ -110,8 +127,11 @@ else if (builtin.os == .windows)
|
|||||||
if (waiters & WAKE != 0) return;
|
if (waiters & WAKE != 0) return;
|
||||||
|
|
||||||
// try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
|
// try to decrease the waiter count & set the WAKE bit meaning a thread is waking up
|
||||||
if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null)
|
if (@cmpxchgWeak(u32, &self.mutex.waiters, waiters, waiters - WAIT + WAKE, .Release, .Monotonic) == null) {
|
||||||
return ResetEvent.OsEvent.Futex.wake(@ptrCast(*i32, &self.mutex.waiters));
|
const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
|
||||||
|
assert(rc == 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -36,7 +36,7 @@ pub const ResetEvent = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the event if not already set and
|
/// Sets the event if not already set and
|
||||||
/// wakes up at least one thread waiting the event.
|
/// wakes up all the threads waiting on the event.
|
||||||
pub fn set(self: *ResetEvent) void {
|
pub fn set(self: *ResetEvent) void {
|
||||||
return self.os_event.set();
|
return self.os_event.set();
|
||||||
}
|
}
|
||||||
@ -135,7 +135,7 @@ const PosixEvent = struct {
|
|||||||
|
|
||||||
if (!self.is_set) {
|
if (!self.is_set) {
|
||||||
self.is_set = true;
|
self.is_set = true;
|
||||||
assert(c.pthread_cond_signal(&self.cond) == 0);
|
assert(c.pthread_cond_broadcast(&self.cond) == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,40 +181,39 @@ const PosixEvent = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const AtomicEvent = struct {
|
const AtomicEvent = struct {
|
||||||
state: State,
|
waiters: u32,
|
||||||
|
|
||||||
const State = enum(i32) {
|
const WAKE = 1 << 0;
|
||||||
Empty,
|
const WAIT = 1 << 1;
|
||||||
Waiting,
|
|
||||||
Signaled,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn init() AtomicEvent {
|
fn init() AtomicEvent {
|
||||||
return AtomicEvent{ .state = .Empty };
|
return AtomicEvent{ .waiters = 0 };
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deinit(self: *AtomicEvent) void {
|
fn deinit(self: *AtomicEvent) void {
|
||||||
self.* = undefined;
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn isSet(self: *AtomicEvent) bool {
|
fn isSet(self: *const AtomicEvent) bool {
|
||||||
return @atomicLoad(State, &self.state, .Acquire) == .Signaled;
|
return @atomicLoad(u32, &self.waiters, .Acquire) == WAKE;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reset(self: *AtomicEvent) void {
|
fn reset(self: *AtomicEvent) void {
|
||||||
@atomicStore(State, &self.state, .Empty, .Monotonic);
|
@atomicStore(u32, &self.waiters, 0, .Monotonic);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set(self: *AtomicEvent) void {
|
fn set(self: *AtomicEvent) void {
|
||||||
if (@atomicRmw(State, &self.state, .Xchg, .Signaled, .Release) == .Waiting)
|
const waiters = @atomicRmw(u32, &self.waiters, .Xchg, WAKE, .Release);
|
||||||
Futex.wake(@ptrCast(*i32, &self.state));
|
if (waiters >= WAIT) {
|
||||||
|
return Futex.wake(&self.waiters, waiters >> 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait(self: *AtomicEvent, timeout: ?u64) !void {
|
fn wait(self: *AtomicEvent, timeout: ?u64) !void {
|
||||||
var state = @atomicLoad(State, &self.state, .Monotonic);
|
var waiters = @atomicLoad(u32, &self.waiters, .Acquire);
|
||||||
while (state == .Empty) {
|
while (waiters != WAKE) {
|
||||||
state = @cmpxchgWeak(State, &self.state, .Empty, .Waiting, .Acquire, .Monotonic) orelse
|
waiters = @cmpxchgWeak(u32, &self.waiters, waiters, waiters + WAIT, .Acquire, .Acquire)
|
||||||
return Futex.wait(@ptrCast(*i32, &self.state), @enumToInt(State.Waiting), timeout);
|
orelse return Futex.wait(&self.waiters, timeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -225,15 +224,15 @@ const AtomicEvent = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const SpinFutex = struct {
|
const SpinFutex = struct {
|
||||||
fn wake(ptr: *i32) void {}
|
fn wake(waiters: *u32, wake_count: u32) void {}
|
||||||
|
|
||||||
fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
|
fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||||
// TODO: handle platforms where a monotonic timer isnt available
|
// TODO: handle platforms where a monotonic timer isnt available
|
||||||
var timer: time.Timer = undefined;
|
var timer: time.Timer = undefined;
|
||||||
if (timeout != null)
|
if (timeout != null)
|
||||||
timer = time.Timer.start() catch unreachable;
|
timer = time.Timer.start() catch unreachable;
|
||||||
|
|
||||||
while (@atomicLoad(i32, ptr, .Acquire) == expected) {
|
while (@atomicLoad(u32, waiters, .Acquire) != WAKE) {
|
||||||
SpinLock.yield();
|
SpinLock.yield();
|
||||||
if (timeout) |timeout_ns| {
|
if (timeout) |timeout_ns| {
|
||||||
if (timer.read() >= timeout_ns)
|
if (timer.read() >= timeout_ns)
|
||||||
@ -244,12 +243,14 @@ const AtomicEvent = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const LinuxFutex = struct {
|
const LinuxFutex = struct {
|
||||||
fn wake(ptr: *i32) void {
|
fn wake(waiters: *u32, wake_count: u32) void {
|
||||||
const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
|
const waiting = std.math.maxInt(i32); // wake_count
|
||||||
|
const ptr = @ptrCast(*const i32, waiters);
|
||||||
|
const rc = linux.futex_wake(ptr, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, waiting);
|
||||||
assert(linux.getErrno(rc) == 0);
|
assert(linux.getErrno(rc) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
|
fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||||
var ts: linux.timespec = undefined;
|
var ts: linux.timespec = undefined;
|
||||||
var ts_ptr: ?*linux.timespec = null;
|
var ts_ptr: ?*linux.timespec = null;
|
||||||
if (timeout) |timeout_ns| {
|
if (timeout) |timeout_ns| {
|
||||||
@ -258,7 +259,12 @@ const AtomicEvent = struct {
|
|||||||
ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
|
ts.tv_nsec = @intCast(isize, timeout_ns % time.ns_per_s);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (@atomicLoad(i32, ptr, .Acquire) == expected) {
|
while (true) {
|
||||||
|
const waiting = @atomicLoad(u32, waiters, .Acquire);
|
||||||
|
if (waiting == WAKE)
|
||||||
|
return;
|
||||||
|
const expected = @intCast(i32, waiting);
|
||||||
|
const ptr = @ptrCast(*const i32, waiters);
|
||||||
const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
|
const rc = linux.futex_wait(ptr, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, expected, ts_ptr);
|
||||||
switch (linux.getErrno(rc)) {
|
switch (linux.getErrno(rc)) {
|
||||||
0 => continue,
|
0 => continue,
|
||||||
@ -272,15 +278,20 @@ const AtomicEvent = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const WindowsFutex = struct {
|
const WindowsFutex = struct {
|
||||||
pub fn wake(ptr: *i32) void {
|
pub fn wake(waiters: *u32, wake_count: u32) void {
|
||||||
const handle = getEventHandle() orelse return SpinFutex.wake(ptr);
|
const handle = getEventHandle() orelse return SpinFutex.wake(waiters, wake_count);
|
||||||
const key = @ptrCast(*const c_void, ptr);
|
const key = @ptrCast(*const c_void, waiters);
|
||||||
const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
|
|
||||||
assert(rc == 0);
|
var waiting = wake_count;
|
||||||
|
while (waiting != 0) : (waiting -= 1) {
|
||||||
|
const rc = windows.ntdll.NtReleaseKeyedEvent(handle, key, windows.FALSE, null);
|
||||||
|
assert(rc == 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait(ptr: *i32, expected: i32, timeout: ?u64) !void {
|
pub fn wait(waiters: *u32, timeout: ?u64) !void {
|
||||||
const handle = getEventHandle() orelse return SpinFutex.wait(ptr, expected, timeout);
|
const handle = getEventHandle() orelse return SpinFutex.wait(waiters, timeout);
|
||||||
|
const key = @ptrCast(*const c_void, waiters);
|
||||||
|
|
||||||
// NT uses timeouts in units of 100ns with negative value being relative
|
// NT uses timeouts in units of 100ns with negative value being relative
|
||||||
var timeout_ptr: ?*windows.LARGE_INTEGER = null;
|
var timeout_ptr: ?*windows.LARGE_INTEGER = null;
|
||||||
@ -291,10 +302,26 @@ const AtomicEvent = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NtWaitForKeyedEvent doesnt have spurious wake-ups
|
// NtWaitForKeyedEvent doesnt have spurious wake-ups
|
||||||
const key = @ptrCast(*const c_void, ptr);
|
var rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
|
||||||
const rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, timeout_ptr);
|
|
||||||
switch (rc) {
|
switch (rc) {
|
||||||
windows.WAIT_TIMEOUT => return error.TimedOut,
|
windows.WAIT_TIMEOUT => {
|
||||||
|
// update the wait count to signal that we're not waiting anymore.
|
||||||
|
// if the .set() thread already observed that we are, perform a
|
||||||
|
// matching NtWaitForKeyedEvent so that the .set() thread doesn't
|
||||||
|
// deadlock trying to run NtReleaseKeyedEvent above.
|
||||||
|
var waiting = @atomicLoad(u32, waiters, .Monotonic);
|
||||||
|
while (true) {
|
||||||
|
if (waiting == WAKE) {
|
||||||
|
rc = windows.ntdll.NtWaitForKeyedEvent(handle, key, windows.FALSE, null);
|
||||||
|
assert(rc == windows.WAIT_OBJECT_0);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - WAIT, .Acquire, .Monotonic) orelse break;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return error.TimedOut;
|
||||||
|
},
|
||||||
windows.WAIT_OBJECT_0 => {},
|
windows.WAIT_OBJECT_0 => {},
|
||||||
else => unreachable,
|
else => unreachable,
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user