Merge pull request #3670 from Vexu/atomics-enum

Support atomic operations with enums
Andrew Kelley 2019-11-12 17:45:29 +00:00 committed by GitHub
commit e32b4829f4
6 changed files with 99 additions and 58 deletions
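With this change, the atomic builtins (@atomicLoad, @atomicRmw, @cmpxchgWeak, @cmpxchgStrong) accept enum operands, provided the enum's tag type is an integer that is at least 8 bits, no larger than the target's largest atomic size, and a power of 2. A minimal sketch of the newly legal pattern (the State enum and tryLock function are illustrative, not taken from the diff):

    const State = enum(u8) {
        Unlocked,
        Locked,
    };

    var state: State = .Unlocked;

    // Returns true if this caller performed the Unlocked -> Locked transition.
    // @cmpxchgStrong returns null on success, or the observed value on failure.
    fn tryLock() bool {
        return @cmpxchgStrong(State, &state, .Unlocked, .Locked, .SeqCst, .SeqCst) == null;
    }

This is the same transformation applied to the standard library state machines below: bare u8/u32 state fields plus magic constants become enum fields that the builtins operate on directly.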

View File

@@ -12,12 +12,13 @@ pub fn Future(comptime T: type) type {
     return struct {
         lock: Lock,
         data: T,
+        available: Available,
 
-        /// TODO make this an enum
-        /// 0 - not started
-        /// 1 - started
-        /// 2 - finished
-        available: u8,
+        const Available = enum(u8) {
+            NotStarted,
+            Started,
+            Finished,
+        };
 
         const Self = @This();
         const Queue = std.atomic.Queue(anyframe);
@@ -34,7 +35,7 @@ pub fn Future(comptime T: type) type {
         /// available.
         /// Thread-safe.
         pub async fn get(self: *Self) *T {
-            if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                 return &self.data;
             }
             const held = self.lock.acquire();
@@ -46,7 +47,7 @@ pub fn Future(comptime T: type) type {
         /// Gets the data without waiting for it. If it's available, a pointer is
         /// returned. Otherwise, null is returned.
         pub fn getOrNull(self: *Self) ?*T {
-            if (@atomicLoad(u8, &self.available, .SeqCst) == 2) {
+            if (@atomicLoad(Available, &self.available, .SeqCst) == .Finished) {
                 return &self.data;
             } else {
                 return null;
@@ -59,7 +60,7 @@ pub fn Future(comptime T: type) type {
         /// It's not required to call start() before resolve() but it can be useful since
         /// this method is thread-safe.
         pub async fn start(self: *Self) ?*T {
-            const state = @cmpxchgStrong(u8, &self.available, 0, 1, .SeqCst, .SeqCst) orelse return null;
+            const state = @cmpxchgStrong(Available, &self.available, .NotStarted, .Started, .SeqCst, .SeqCst) orelse return null;
             switch (state) {
                 1 => {
                     const held = self.lock.acquire();
@@ -74,8 +75,8 @@ pub fn Future(comptime T: type) type {
         /// Make the data become available. May be called only once.
         /// Before calling this, modify the `data` property.
         pub fn resolve(self: *Self) void {
-            const prev = @atomicRmw(u8, &self.available, .Xchg, 2, .SeqCst);
-            assert(prev == 0 or prev == 1); // resolve() called twice
+            const prev = @atomicRmw(Available, &self.available, .Xchg, .Finished, .SeqCst);
+            assert(prev != .Finished); // resolve() called twice
             Lock.Held.release(Lock.Held{ .lock = &self.lock });
         }
     };
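As context for the diff above, a hedged sketch of how this Future is meant to be driven, per its doc comments. Future(T).init(), the assumed import path, and the value 42 are not shown in the diff; get() and start() are async and need an event loop, so this sketch sticks to the non-async calls:

    const std = @import("std");
    const Future = std.event.Future; // assumed import path for the generic above

    fn producerConsumer() void {
        var future = Future(i32).init(); // init() assumed; leaves `available` at .NotStarted

        // Producer: write the payload first, then publish it.
        future.data = 42;
        future.resolve(); // Xchg to .Finished; the assert catches a double resolve()

        // Consumer: getOrNull() only returns a pointer once .Finished is visible.
        if (future.getOrNull()) |ptr| {
            std.debug.assert(ptr.* == 42);
        }
    }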

View File

@@ -13,17 +13,17 @@ const Loop = std.event.Loop;
 /// When a write lock is held, it will not be released until the writer queue is empty.
 /// TODO: make this API also work in blocking I/O mode
 pub const RwLock = struct {
-    shared_state: u8, // TODO make this an enum
+    shared_state: State,
     writer_queue: Queue,
     reader_queue: Queue,
     writer_queue_empty_bit: u8, // TODO make this a bool
    reader_queue_empty_bit: u8, // TODO make this a bool
    reader_lock_count: usize,
 
-    const State = struct {
-        const Unlocked = 0;
-        const WriteLock = 1;
-        const ReadLock = 2;
+    const State = enum(u8) {
+        Unlocked,
+        WriteLock,
+        ReadLock,
     };
 
     const Queue = std.atomic.Queue(anyframe);
@@ -41,7 +41,7 @@ pub const RwLock = struct {
             }
 
             _ = @atomicRmw(u8, &self.lock.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
-            if (@cmpxchgStrong(u8, &self.lock.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+            if (@cmpxchgStrong(State, &self.lock.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                 // Didn't unlock. Someone else's problem.
                 return;
             }
@@ -64,7 +64,7 @@ pub const RwLock = struct {
            // We need to release the write lock. Check if any readers are waiting to grab the lock.
            if (@atomicLoad(u8, &self.lock.reader_queue_empty_bit, .SeqCst) == 0) {
                // Switch to a read lock.
-                _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.ReadLock, .SeqCst);
+                _ = @atomicRmw(State, &self.lock.shared_state, .Xchg, .ReadLock, .SeqCst);
                 while (self.lock.reader_queue.get()) |node| {
                     global_event_loop.onNextTick(node);
                 }
@@ -72,7 +72,7 @@ pub const RwLock = struct {
            }
 
            _ = @atomicRmw(u8, &self.lock.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
-            _ = @atomicRmw(u8, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
+            _ = @atomicRmw(State, &self.lock.shared_state, .Xchg, State.Unlocked, .SeqCst);
 
            self.lock.commonPostUnlock();
        }
@@ -80,7 +80,7 @@ pub const RwLock = struct {
    pub fn init() RwLock {
        return RwLock{
-            .shared_state = State.Unlocked,
+            .shared_state = .Unlocked,
            .writer_queue = Queue.init(),
            .writer_queue_empty_bit = 1,
            .reader_queue = Queue.init(),
@@ -92,7 +92,7 @@ pub const RwLock = struct {
    /// Must be called when not locked. Not thread safe.
    /// All calls to acquire() and release() must complete before calling deinit().
    pub fn deinit(self: *RwLock) void {
-        assert(self.shared_state == State.Unlocked);
+        assert(self.shared_state == .Unlocked);
        while (self.writer_queue.get()) |node| resume node.data;
        while (self.reader_queue.get()) |node| resume node.data;
    }
@@ -116,7 +116,7 @@ pub const RwLock = struct {
        _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 0, .SeqCst);
 
        // Here we don't care if we are the one to do the locking or if it was already locked for reading.
-        const have_read_lock = if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == State.ReadLock else true;
+        const have_read_lock = if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst)) |old_state| old_state == .ReadLock else true;
        if (have_read_lock) {
            // Give out all the read locks.
            if (self.reader_queue.get()) |first_node| {
@@ -147,7 +147,7 @@ pub const RwLock = struct {
        _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 0, .SeqCst);
 
        // Here we must be the one to acquire the write lock. It cannot already be locked.
-        if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) == null) {
+        if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) == null) {
            // We now have a write lock.
            if (self.writer_queue.get()) |node| {
                // Whether this node is us or someone else, we tail resume it.
@@ -166,7 +166,7 @@ pub const RwLock = struct {
            // But if there's a writer_queue item or a reader_queue item,
            // we are the actor which must loop and attempt to grab the lock again.
            if (@atomicLoad(u8, &self.writer_queue_empty_bit, .SeqCst) == 0) {
-                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.WriteLock, .SeqCst, .SeqCst) != null) {
+                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .WriteLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
@@ -177,12 +177,12 @@ pub const RwLock = struct {
                }
 
                // Release the lock again.
                _ = @atomicRmw(u8, &self.writer_queue_empty_bit, .Xchg, 1, .SeqCst);
-                _ = @atomicRmw(u8, &self.shared_state, .Xchg, State.Unlocked, .SeqCst);
+                _ = @atomicRmw(State, &self.shared_state, .Xchg, .Unlocked, .SeqCst);
                continue;
            }
 
            if (@atomicLoad(u8, &self.reader_queue_empty_bit, .SeqCst) == 0) {
-                if (@cmpxchgStrong(u8, &self.shared_state, State.Unlocked, State.ReadLock, .SeqCst, .SeqCst) != null) {
+                if (@cmpxchgStrong(State, &self.shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst) != null) {
                    // We did not obtain the lock. Great, the queues are someone else's problem.
                    return;
                }
@@ -196,7 +196,7 @@ pub const RwLock = struct {
            }
 
            // Release the lock again.
            _ = @atomicRmw(u8, &self.reader_queue_empty_bit, .Xchg, 1, .SeqCst);
-            if (@cmpxchgStrong(u8, &self.shared_state, State.ReadLock, State.Unlocked, .SeqCst, .SeqCst) != null) {
+            if (@cmpxchgStrong(State, &self.shared_state, .ReadLock, .Unlocked, .SeqCst, .SeqCst) != null) {
                // Didn't unlock. Someone else's problem.
                return;
            }
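The idiom that repeats through this file, compare-and-swap directly on the State enum rather than on a bare u8, reads well in isolation. A standalone sketch mirroring the read-acquire transition above (shared_state and tryReadLock are illustrative names, not from the diff):

    const State = enum(u8) {
        Unlocked,
        WriteLock,
        ReadLock,
    };

    var shared_state: State = .Unlocked;

    fn tryReadLock() bool {
        // Either we flip Unlocked -> ReadLock ourselves, or another reader
        // already holds the lock, which also counts as success.
        const old_state = @cmpxchgStrong(State, &shared_state, .Unlocked, .ReadLock, .SeqCst, .SeqCst);
        return if (old_state) |old| old == .ReadLock else true;
    }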

View File

@@ -1,24 +1,26 @@
 const std = @import("std.zig");
 const builtin = @import("builtin");
 const assert = std.debug.assert;
 const testing = std.testing;
-const AtomicRmwOp = builtin.AtomicRmwOp;
-const AtomicOrder = builtin.AtomicOrder;
 
 /// Thread-safe initialization of global data.
 /// TODO use a mutex instead of a spinlock
 pub fn lazyInit(comptime T: type) LazyInit(T) {
     return LazyInit(T){
         .data = undefined,
-        .state = 0,
     };
 }
 
 fn LazyInit(comptime T: type) type {
     return struct {
-        state: u8, // TODO make this an enum
+        state: State = .NotResolved,
         data: Data,
 
+        const State = enum(u8) {
+            NotResolved,
+            Resolving,
+            Resolved,
+        };
 
         const Self = @This();
 
         // TODO this isn't working for void, investigate and then remove this special case
@@ -30,14 +32,14 @@ fn LazyInit(comptime T: type) type {
         /// perform the initialization and then call resolve().
         pub fn get(self: *Self) ?Ptr {
             while (true) {
-                var state = @cmpxchgWeak(u8, &self.state, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null;
+                var state = @cmpxchgWeak(State, &self.state, .NotResolved, .Resolving, .SeqCst, .SeqCst) orelse return null;
                 switch (state) {
-                    0 => continue,
-                    1 => {
+                    .NotResolved => continue,
+                    .Resolving => {
                         // TODO mutex instead of a spinlock
                         continue;
                     },
-                    2 => {
+                    .Resolved => {
                         if (@sizeOf(T) == 0) {
                             return @as(T, undefined);
                         } else {
@@ -50,8 +52,8 @@ fn LazyInit(comptime T: type) type {
         }
 
         pub fn resolve(self: *Self) void {
-            const prev = @atomicRmw(u8, &self.state, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst);
-            assert(prev == 1); // resolve() called twice
+            const prev = @atomicRmw(State, &self.state, .Xchg, .Resolved, .SeqCst);
+            assert(prev != .Resolved); // resolve() called twice
         }
     };
}
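A hedged usage sketch of the lazyInit contract documented above: get() returns null to exactly one caller, which must initialize the data and then call resolve(); every other caller spins until it observes .Resolved. The global_counter variable and the value 1234 are illustrative:

    var global_counter = lazyInit(u32);

    fn getCounter() u32 {
        if (global_counter.get()) |ptr| return ptr.*;
        // null means we won the .NotResolved -> .Resolving race:
        // initialize the data, then publish it with resolve().
        global_counter.data = 1234; // stand-in for real initialization work
        global_counter.resolve();
        return global_counter.data;
    }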

View File

@@ -39,12 +39,14 @@ pub const Mutex = if (builtin.single_threaded)
     }
 else
     struct {
-        state: u32, // TODO: make this an enum
+        state: State, // TODO: make this an enum
         parker: ThreadParker,
 
-        const Unlocked = 0;
-        const Sleeping = 1;
-        const Locked = 2;
+        const State = enum(u32) {
+            Unlocked,
+            Sleeping,
+            Locked,
+        };
 
         /// number of iterations to spin yielding the cpu
         const SPIN_CPU = 4;
@@ -57,7 +59,7 @@ else
         pub fn init() Mutex {
             return Mutex{
-                .state = Unlocked,
+                .state = .Unlocked,
                 .parker = ThreadParker.init(),
             };
         }
@@ -70,10 +72,10 @@ else
             mutex: *Mutex,
 
             pub fn release(self: Held) void {
-                switch (@atomicRmw(u32, &self.mutex.state, .Xchg, Unlocked, .Release)) {
-                    Locked => {},
-                    Sleeping => self.mutex.parker.unpark(&self.mutex.state),
-                    Unlocked => unreachable, // unlocking an unlocked mutex
+                switch (@atomicRmw(State, &self.mutex.state, .Xchg, .Unlocked, .Release)) {
+                    .Locked => {},
+                    .Sleeping => self.mutex.parker.unpark(@ptrCast(*const u32, &self.mutex.state)),
+                    .Unlocked => unreachable, // unlocking an unlocked mutex
                     else => unreachable, // should never be anything else
                 }
             }
@@ -83,34 +85,34 @@ else
             // Try and speculatively grab the lock.
             // If it fails, the state is either Locked or Sleeping
             // depending on if theres a thread stuck sleeping below.
-            var state = @atomicRmw(u32, &self.state, .Xchg, Locked, .Acquire);
-            if (state == Unlocked)
+            var state = @atomicRmw(State, &self.state, .Xchg, .Locked, .Acquire);
+            if (state == .Unlocked)
                 return Held{ .mutex = self };
 
             while (true) {
                 // try and acquire the lock using cpu spinning on failure
                 var spin: usize = 0;
                 while (spin < SPIN_CPU) : (spin += 1) {
-                    var value = @atomicLoad(u32, &self.state, .Monotonic);
-                    while (value == Unlocked)
-                        value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+                    var value = @atomicLoad(State, &self.state, .Monotonic);
+                    while (value == .Unlocked)
+                        value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
                     SpinLock.yield(SPIN_CPU_COUNT);
                 }
 
                 // try and acquire the lock using thread rescheduling on failure
                 spin = 0;
                 while (spin < SPIN_THREAD) : (spin += 1) {
-                    var value = @atomicLoad(u32, &self.state, .Monotonic);
-                    while (value == Unlocked)
-                        value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+                    var value = @atomicLoad(State, &self.state, .Monotonic);
+                    while (value == .Unlocked)
+                        value = @cmpxchgWeak(State, &self.state, .Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
                     std.os.sched_yield() catch std.time.sleep(1);
                 }
 
                 // failed to acquire the lock, go to sleep until woken up by `Held.release()`
-                if (@atomicRmw(u32, &self.state, .Xchg, Sleeping, .Acquire) == Unlocked)
+                if (@atomicRmw(State, &self.state, .Xchg, .Sleeping, .Acquire) == .Unlocked)
                     return Held{ .mutex = self };
-                state = Sleeping;
-                self.parker.park(&self.state, Sleeping);
+                state = .Sleeping;
+                self.parker.park(@ptrCast(*const u32, &self.state), @enumToInt(State.Sleeping));
             }
         }
     };
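For reference, a short usage sketch of this Mutex; acquire() and Held.release() both appear in the diff, while the shared counter and increment function are illustrative:

    var counter: usize = 0;

    fn increment(mutex: *Mutex) void {
        const held = mutex.acquire(); // spins, then parks on .Sleeping if contended
        defer held.release(); // Xchg back to .Unlocked; unparks a sleeper if needed
        counter += 1;
    }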

View File

@@ -25621,9 +25621,29 @@ static ZigType *ir_resolve_atomic_operand_type(IrAnalyze *ira, IrInstruction *op
                 buf_sprintf("%" PRIu32 "-bit integer type is not a power of 2", operand_type->data.integral.bit_count));
             return ira->codegen->builtin_types.entry_invalid;
         }
+    } else if (operand_type->id == ZigTypeIdEnum) {
+        ZigType *int_type = operand_type->data.enumeration.tag_int_type;
+        if (int_type->data.integral.bit_count < 8) {
+            ir_add_error(ira, op,
+                buf_sprintf("expected enum tag type 8 bits or larger, found %" PRIu32 "-bit tag type",
+                    int_type->data.integral.bit_count));
+            return ira->codegen->builtin_types.entry_invalid;
+        }
+        uint32_t max_atomic_bits = target_arch_largest_atomic_bits(ira->codegen->zig_target->arch);
+        if (int_type->data.integral.bit_count > max_atomic_bits) {
+            ir_add_error(ira, op,
+                buf_sprintf("expected %" PRIu32 "-bit enum tag type or smaller, found %" PRIu32 "-bit tag type",
+                    max_atomic_bits, int_type->data.integral.bit_count));
+            return ira->codegen->builtin_types.entry_invalid;
+        }
+        if (!is_power_of_2(int_type->data.integral.bit_count)) {
+            ir_add_error(ira, op,
+                buf_sprintf("%" PRIu32 "-bit enum tag type is not a power of 2", int_type->data.integral.bit_count));
+            return ira->codegen->builtin_types.entry_invalid;
+        }
     } else if (get_codegen_ptr_type(operand_type) == nullptr) {
         ir_add_error(ira, op,
-            buf_sprintf("expected integer or pointer type, found '%s'", buf_ptr(&operand_type->name)));
+            buf_sprintf("expected integer, enum or pointer type, found '%s'", buf_ptr(&operand_type->name)));
         return ira->codegen->builtin_types.entry_invalid;
     }
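The three new checks are easiest to read from the user's side. A hedged sketch of enum operands the compiler now rejects (both types are illustrative, and neither use compiles):

    const Small = enum(u2) { a, b, c }; // tag below 8 bits
    const Odd = enum(u24) { x, y, z }; // error: 24-bit enum tag type is not a power of 2

    var small = Small.a;

    fn rejected() void {
        // error: expected enum tag type 8 bits or larger, found 2-bit tag type
        _ = @atomicLoad(Small, &small, .SeqCst);
    }

A tag wider than target_arch_largest_atomic_bits for the target triggers the remaining "expected N-bit enum tag type or smaller" error.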

View File

@@ -107,3 +107,19 @@ test "cmpxchg on a global variable" {
     _ = @cmpxchgWeak(u32, &a_global_variable, 1234, 42, .Acquire, .Monotonic);
     expectEqual(@as(u32, 42), a_global_variable);
 }
+
+test "atomic load and rmw with enum" {
+    const Value = enum(u8) {
+        a,
+        b,
+        c,
+    };
+    var x = Value.a;
+
+    expect(@atomicLoad(Value, &x, .SeqCst) != .b);
+
+    _ = @atomicRmw(Value, &x, .Xchg, .c, .SeqCst);
+    expect(@atomicLoad(Value, &x, .SeqCst) == .c);
+    expect(@atomicLoad(Value, &x, .SeqCst) != .a);
+    expect(@atomicLoad(Value, &x, .SeqCst) != .b);
+}
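The new test exercises @atomicLoad and @atomicRmw; since the operand-type check added in ir.cpp is shared by the compare-and-swap builtins, a companion test along these lines should also pass (a hedged sketch, not part of the commit):

    test "cmpxchg with enum" {
        const Value = enum(u8) {
            a,
            b,
            c,
        };
        var x = Value.a;

        // Expected .b but found .a: fails and returns the observed value.
        expect(@cmpxchgStrong(Value, &x, .b, .c, .SeqCst, .SeqCst).? == .a);
        expect(x == .a);

        // Expected .a and found it: succeeds and returns null.
        expect(@cmpxchgStrong(Value, &x, .a, .c, .SeqCst, .SeqCst) == null);
        expect(x == .c);
    }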