2018-07-11 12:58:48 -07:00
|
|
|
const std = @import("../index.zig");
|
|
|
|
const assert = std.debug.assert;
|
|
|
|
const builtin = @import("builtin");
|
|
|
|
const AtomicRmwOp = builtin.AtomicRmwOp;
|
|
|
|
const AtomicOrder = builtin.AtomicOrder;
|
|
|
|
const Lock = std.event.Lock;
|
|
|
|
const Loop = std.event.Loop;
|
|
|
|
|
2018-07-16 17:52:50 -07:00
|
|
|
/// This is a value that starts out unavailable, until resolve() is called
|
2018-07-11 12:58:48 -07:00
|
|
|
/// While it is unavailable, coroutines suspend when they try to get() it,
|
2018-07-16 17:52:50 -07:00
|
|
|
/// and then are resumed when resolve() is called.
|
|
|
|
/// At this point the value remains forever available, and another resolve() is not allowed.
|
2018-07-11 12:58:48 -07:00
|
|
|
pub fn Future(comptime T: type) type {
|
|
|
|
return struct {
|
|
|
|
lock: Lock,
|
|
|
|
data: T,
|
2018-07-16 17:52:50 -07:00
|
|
|
|
|
|
|
/// TODO make this an enum
|
|
|
|
/// 0 - not started
|
|
|
|
/// 1 - started
|
|
|
|
/// 2 - finished
|
|
|
|
available: u8,
|
2018-07-11 12:58:48 -07:00
|
|
|
|
|
|
|
const Self = this;
|
2018-07-11 16:38:01 -07:00
|
|
|
const Queue = std.atomic.Queue(promise);
|
2018-07-11 12:58:48 -07:00
|
|
|
|
|
|
|
pub fn init(loop: *Loop) Self {
|
|
|
|
return Self{
|
|
|
|
.lock = Lock.initLocked(loop),
|
|
|
|
.available = 0,
|
|
|
|
.data = undefined,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Obtain the value. If it's not available, wait until it becomes
|
|
|
|
/// available.
|
|
|
|
/// Thread-safe.
|
2018-07-11 16:38:01 -07:00
|
|
|
pub async fn get(self: *Self) *T {
|
2018-07-16 17:52:50 -07:00
|
|
|
if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
|
2018-07-11 16:38:01 -07:00
|
|
|
return &self.data;
|
2018-07-11 12:58:48 -07:00
|
|
|
}
|
|
|
|
const held = await (async self.lock.acquire() catch unreachable);
|
2018-07-11 16:38:01 -07:00
|
|
|
held.release();
|
2018-07-11 12:58:48 -07:00
|
|
|
|
2018-07-11 16:38:01 -07:00
|
|
|
return &self.data;
|
2018-07-11 12:58:48 -07:00
|
|
|
}
|
|
|
|
|
2018-07-14 21:04:12 -07:00
|
|
|
/// Gets the data without waiting for it. If it's available, a pointer is
|
|
|
|
/// returned. Otherwise, null is returned.
|
|
|
|
pub fn getOrNull(self: *Self) ?*T {
|
2018-07-16 17:52:50 -07:00
|
|
|
if (@atomicLoad(u8, &self.available, AtomicOrder.SeqCst) == 2) {
|
2018-07-14 21:04:12 -07:00
|
|
|
return &self.data;
|
|
|
|
} else {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-16 17:52:50 -07:00
|
|
|
/// If someone else has started working on the data, wait for them to complete
|
|
|
|
/// and return a pointer to the data. Otherwise, return null, and the caller
|
|
|
|
/// should start working on the data.
|
|
|
|
/// It's not required to call start() before resolve() but it can be useful since
|
|
|
|
/// this method is thread-safe.
|
|
|
|
pub async fn start(self: *Self) ?*T {
|
|
|
|
const state = @cmpxchgStrong(u8, &self.available, 0, 1, AtomicOrder.SeqCst, AtomicOrder.SeqCst) orelse return null;
|
|
|
|
switch (state) {
|
|
|
|
1 => {
|
|
|
|
const held = await (async self.lock.acquire() catch unreachable);
|
|
|
|
held.release();
|
|
|
|
return &self.data;
|
|
|
|
},
|
|
|
|
2 => return &self.data,
|
|
|
|
else => unreachable,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-11 12:58:48 -07:00
|
|
|
/// Make the data become available. May be called only once.
|
2018-07-11 16:38:01 -07:00
|
|
|
/// Before calling this, modify the `data` property.
|
|
|
|
pub fn resolve(self: *Self) void {
|
2018-07-16 17:52:50 -07:00
|
|
|
const prev = @atomicRmw(u8, &self.available, AtomicRmwOp.Xchg, 2, AtomicOrder.SeqCst);
|
|
|
|
assert(prev == 0 or prev == 1); // resolve() called twice
|
2018-07-11 12:58:48 -07:00
|
|
|
Lock.Held.release(Lock.Held{ .lock = &self.lock });
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
test "std.event.Future" {
|
|
|
|
var da = std.heap.DirectAllocator.init();
|
|
|
|
defer da.deinit();
|
|
|
|
|
|
|
|
const allocator = &da.allocator;
|
|
|
|
|
|
|
|
var loop: Loop = undefined;
|
2018-07-12 12:08:40 -07:00
|
|
|
try loop.initMultiThreaded(allocator);
|
2018-07-11 12:58:48 -07:00
|
|
|
defer loop.deinit();
|
|
|
|
|
|
|
|
const handle = try async<allocator> testFuture(&loop);
|
|
|
|
defer cancel handle;
|
|
|
|
|
|
|
|
loop.run();
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn testFuture(loop: *Loop) void {
|
2018-07-29 01:12:52 -07:00
|
|
|
suspend {
|
2018-08-02 01:52:40 -07:00
|
|
|
resume @handle();
|
2018-07-11 17:17:47 -07:00
|
|
|
}
|
2018-07-11 12:58:48 -07:00
|
|
|
var future = Future(i32).init(loop);
|
|
|
|
|
|
|
|
const a = async waitOnFuture(&future) catch @panic("memory");
|
|
|
|
const b = async waitOnFuture(&future) catch @panic("memory");
|
|
|
|
const c = async resolveFuture(&future) catch @panic("memory");
|
|
|
|
|
|
|
|
const result = (await a) + (await b);
|
|
|
|
cancel c;
|
|
|
|
assert(result == 12);
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn waitOnFuture(future: *Future(i32)) i32 {
|
2018-07-29 01:12:52 -07:00
|
|
|
suspend {
|
2018-08-02 01:52:40 -07:00
|
|
|
resume @handle();
|
2018-07-11 17:17:47 -07:00
|
|
|
}
|
2018-07-11 16:38:01 -07:00
|
|
|
return (await (async future.get() catch @panic("memory"))).*;
|
2018-07-11 12:58:48 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn resolveFuture(future: *Future(i32)) void {
|
2018-07-29 01:12:52 -07:00
|
|
|
suspend {
|
2018-08-02 01:52:40 -07:00
|
|
|
resume @handle();
|
2018-07-11 17:17:47 -07:00
|
|
|
}
|
2018-07-11 16:38:01 -07:00
|
|
|
future.data = 6;
|
|
|
|
future.resolve();
|
2018-07-11 12:58:48 -07:00
|
|
|
}
|