Add more comments & cleanup AutoResetEvent

master
kprotty 2020-10-11 19:16:07 -05:00
parent e9a4c3dd82
commit 12508025a4
2 changed files with 109 additions and 74 deletions

View File

@ -43,99 +43,132 @@ pub const AutoResetEvent = struct {
const event_align = std.math.max(@alignOf(std.ResetEvent), 2);
pub fn wait(self: *AutoResetEvent) void {
self.waitInner(null) catch unreachable;
self.waitFor(null) catch unreachable;
}
pub fn timedWait(self: *AutoResetEvent, timeout: u64) error{TimedOut}!void {
return self.waitInner(timeout);
return self.waitFor(timeout);
}
fn waitInner(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
// the local ResetEvent is lazily initialized
var has_reset_event = false;
fn waitFor(self: *AutoResetEvent, timeout: ?u64) error{TimedOut}!void {
// lazily initialized std.ResetEvent
var reset_event: std.ResetEvent align(event_align) = undefined;
var has_reset_event = false;
defer if (has_reset_event) {
reset_event.deinit();
};
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
switch (state) {
UNSET => {
if (!has_reset_event) {
has_reset_event = true;
reset_event = std.ResetEvent.init();
}
state = @cmpxchgWeak(
usize,
&self.state,
state,
@ptrToInt(&reset_event),
.SeqCst,
.SeqCst,
) orelse {
if (timeout) |timeout_ns| {
reset_event.timedWait(timeout_ns) catch {
state = @cmpxchgStrong(
usize,
&self.state,
@ptrToInt(&reset_event),
UNSET,
.SeqCst,
.SeqCst,
) orelse return error.TimedOut;
assert(state == SET);
reset_event.wait();
};
} else {
reset_event.wait();
}
return;
};
},
SET => {
@atomicStore(usize, &self.state, UNSET, .SeqCst);
return;
},
else => {
unreachable; // multiple waiters on the same Event
}
// consume a notification if there is any
if (state == SET) {
@atomicStore(usize, &self.state, UNSET, .SeqCst);
return;
}
// check if theres currently a pending ResetEvent pointer already registered
if (state != UNSET) {
unreachable; // multiple waiting threads on the same AutoResetEvent
}
// lazily initialize the ResetEvent if it hasn't been already
if (!has_reset_event) {
has_reset_event = true;
reset_event = std.ResetEvent.init();
}
// Since the AutoResetEvent currently isnt set,
// try to register our ResetEvent on it to wait
// for a set() call from another thread.
if (@cmpxchgWeak(
usize,
&self.state,
UNSET,
@ptrToInt(&reset_event),
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}
// if no timeout was specified, then just wait forever
const timeout_ns = timeout orelse {
reset_event.wait();
return;
};
// wait with a timeout and return if signalled via set()
if (reset_event.timedWait(timeout_ns)) |_| {
return;
} else |timed_out| {}
// If we timed out, we need to transition the AutoResetEvent back to UNSET.
// If we don't, then when we return, a set() thread could observe a pointer to an invalid ResetEvent.
state = @cmpxchgStrong(
usize,
&self.state,
@ptrToInt(&reset_event),
UNSET,
.SeqCst,
.SeqCst,
) orelse return error.TimedOut;
// We didn't manage to unregister ourselves from the state.
if (state == SET) {
unreachable; // AutoResetEvent notified without waking up the waiting thread
} else if (state != UNSET) {
unreachable; // multiple waiting threads on the same AutoResetEvent observed when timing out
}
// This menas a set() thread saw our ResetEvent pointer, acquired it, and is trying to wake it up.
// We need to wait for it to wake up our ResetEvent before we can return and invalidate it.
// We don't return error.TimedOut here as it technically notified us while we were "timing out".
reset_event.wait();
return;
}
}
pub fn set(self: *AutoResetEvent) void {
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (true) {
switch (state) {
UNSET => {
state = @cmpxchgWeak(
usize,
&self.state,
state,
SET,
.SeqCst,
.SeqCst,
) orelse return;
},
SET => {
return;
},
else => |reset_event_ptr| {
state = @cmpxchgWeak(
usize,
&self.state,
state,
UNSET,
.SeqCst,
.SeqCst,
) orelse {
const reset_event = @intToPtr(*align(event_align) std.ResetEvent, reset_event_ptr);
reset_event.set();
return;
};
}
// If the AutoResetEvent is already set, there is nothing else left to do
if (state == SET) {
return;
}
// If the AutoResetEvent isn't set,
// then try to leave a notification for the wait() thread that we set() it.
if (state == UNSET) {
state = @cmpxchgWeak(
usize,
&self.state,
UNSET,
SET,
.SeqCst,
.SeqCst,
) orelse return;
continue;
}
// There is a ResetEvent pointer registered on the AutoResetEvent event thats waiting.
// Try to acquire ownership of it so that we can wake it up.
// This also resets the AutoResetEvent so that there is no race condition as defined above.
if (@cmpxchgWeak(
usize,
&self.state,
state,
UNSET,
.SeqCst,
.SeqCst,
)) |new_state| {
state = new_state;
continue;
}
const reset_event = @intToPtr(*align(event_align) std.ResetEvent, state);
reset_event.set();
return;
}
}
};
@ -161,7 +194,6 @@ test "std.AutoResetEvent" {
const Self = @This();
fn sender(self: *Self) void {
std.debug.print("\n", .{});
testing.expect(self.value == 0);
self.value = 1;
self.out.set();

View File

@ -862,8 +862,11 @@ pub const Loop = struct {
const held = self.entries.mutex.acquire();
defer held.release();
// starting from the head
var head = self.entries.head orelse return null;
// traverse the list of waiting entires to
// find the Node with the smallest `expires` field
var min = head;
while (head.next) |node| {
const minEntry = @fieldParentPtr(Entry, "node", min);