WIP: Channel.getOrNull

master
Andrew Kelley 2018-08-02 17:04:17 -04:00
parent e3ae2cfb52
commit 821805aa92
17 changed files with 495 additions and 323 deletions

View File

@ -717,13 +717,13 @@ pub const Compilation = struct {
}
async fn buildAsync(self: *Compilation) void {
while (true) {
// TODO directly awaiting async should guarantee memory allocation elision
const build_result = await (async self.compileAndLink() catch unreachable);
var build_result = await (async self.initialCompile() catch unreachable);
while (true) {
const link_result = if (build_result) self.maybeLink() else |err| err;
// this makes a handy error return trace and stack trace in debug mode
if (std.debug.runtime_safety) {
build_result catch unreachable;
link_result catch unreachable;
}
const compile_errors = blk: {
@ -732,7 +732,7 @@ pub const Compilation = struct {
break :blk held.value.toOwnedSlice();
};
if (build_result) |_| {
if (link_result) |_| {
if (compile_errors.len == 0) {
await (async self.events.put(Event.Ok) catch unreachable);
} else {
@ -745,108 +745,158 @@ pub const Compilation = struct {
await (async self.events.put(Event{ .Error = err }) catch unreachable);
}
// for now we stop after 1
return;
var group = event.Group(BuildError!void).init(self.loop);
while (self.fs_watch.channel.getOrNull()) |root_scope| {
try group.call(rebuildFile, self, root_scope);
}
build_result = await (async group.wait() catch unreachable);
}
}
async fn compileAndLink(self: *Compilation) !void {
if (self.root_src_path) |root_src_path| {
// TODO async/await os.path.real
const root_src_real_path = os.path.real(self.gpa(), root_src_path) catch |err| {
try printError("unable to get real path '{}': {}", root_src_path, err);
async fn rebuildFile(self: *Compilation, root_scope: *Scope.Root) !void {
const tree_scope = blk: {
const source_code = (await (async fs.readFile(
self.loop,
root_src_real_path,
max_src_size,
) catch unreachable)) catch |err| {
try printError("unable to open '{}': {}", root_src_real_path, err);
return err;
};
const root_scope = blk: {
errdefer self.gpa().free(root_src_real_path);
errdefer self.gpa().free(source_code);
const source_code = (await (async fs.readFile(
self.loop,
root_src_real_path,
max_src_size,
) catch unreachable)) catch |err| {
try printError("unable to open '{}': {}", root_src_real_path, err);
return err;
};
errdefer self.gpa().free(source_code);
const tree = try self.gpa().createOne(ast.Tree);
tree.* = try std.zig.parse(self.gpa(), source_code);
errdefer {
tree.deinit();
self.gpa().destroy(tree);
}
break :blk try Scope.Root.create(self, tree, root_src_real_path);
};
defer root_scope.base.deref(self);
const tree = root_scope.tree;
var error_it = tree.errors.iterator(0);
while (error_it.next()) |parse_error| {
const msg = try Msg.createFromParseErrorAndScope(self, root_scope, parse_error);
errdefer msg.destroy();
try await (async self.addCompileErrorAsync(msg) catch unreachable);
}
if (tree.errors.len != 0) {
return;
const tree = try self.gpa().createOne(ast.Tree);
tree.* = try std.zig.parse(self.gpa(), source_code);
errdefer {
tree.deinit();
self.gpa().destroy(tree);
}
const decls = try Scope.Decls.create(self, &root_scope.base);
defer decls.base.deref(self);
break :blk try Scope.AstTree.create(self, tree, root_scope);
};
defer tree_scope.base.deref(self);
var decl_group = event.Group(BuildError!void).init(self.loop);
var decl_group_consumed = false;
errdefer if (!decl_group_consumed) decl_group.cancelAll();
var error_it = tree_scope.tree.errors.iterator(0);
while (error_it.next()) |parse_error| {
const msg = try Msg.createFromParseErrorAndScope(self, tree_scope, parse_error);
errdefer msg.destroy();
var it = tree.root_node.decls.iterator(0);
while (it.next()) |decl_ptr| {
const decl = decl_ptr.*;
switch (decl.id) {
ast.Node.Id.Comptime => {
const comptime_node = @fieldParentPtr(ast.Node.Comptime, "base", decl);
try await (async self.addCompileErrorAsync(msg) catch unreachable);
}
if (tree_scope.tree.errors.len != 0) {
return;
}
try self.prelink_group.call(addCompTimeBlock, self, &decls.base, comptime_node);
},
ast.Node.Id.VarDecl => @panic("TODO"),
ast.Node.Id.FnProto => {
const fn_proto = @fieldParentPtr(ast.Node.FnProto, "base", decl);
const locked_table = await (async root_scope.decls.table.acquireWrite() catch unreachable);
defer locked_table.release();
const name = if (fn_proto.name_token) |name_token| tree.tokenSlice(name_token) else {
try self.addCompileError(root_scope, Span{
.first = fn_proto.fn_token,
.last = fn_proto.fn_token + 1,
}, "missing function name");
continue;
};
var decl_group = event.Group(BuildError!void).init(self.loop);
defer decl_group.deinit();
try self.rebuildChangedDecls(
&decl_group,
locked_table,
root_scope.decls,
&tree_scope.tree.root_node.decls,
tree_scope,
);
try await (async decl_group.wait() catch unreachable);
}
async fn rebuildChangedDecls(
self: *Compilation,
group: *event.Group(BuildError!void),
locked_table: *Decl.Table,
decl_scope: *Scope.Decls,
ast_decls: &ast.Node.Root.DeclList,
tree_scope: *Scope.AstTree,
) !void {
var existing_decls = try locked_table.clone();
defer existing_decls.deinit();
var ast_it = ast_decls.iterator(0);
while (ast_it.next()) |decl_ptr| {
const decl = decl_ptr.*;
switch (decl.id) {
ast.Node.Id.Comptime => {
const comptime_node = @fieldParentPtr(ast.Node.Comptime, "base", decl);
// TODO connect existing comptime decls to updated source files
try self.prelink_group.call(addCompTimeBlock, self, &decl_scope.base, comptime_node);
},
ast.Node.Id.VarDecl => @panic("TODO"),
ast.Node.Id.FnProto => {
const fn_proto = @fieldParentPtr(ast.Node.FnProto, "base", decl);
const name = if (fn_proto.name_token) |name_token| tree_scope.tree.tokenSlice(name_token) else {
try self.addCompileError(root_scope, Span{
.first = fn_proto.fn_token,
.last = fn_proto.fn_token + 1,
}, "missing function name");
continue;
};
if (existing_decls.remove(name)) |entry| {
// compare new code to existing
const existing_decl = entry.value;
// Just compare the old bytes to the new bytes of the top level decl.
// Even if the AST is technically the same, we want error messages to display
// from the most recent source.
@panic("TODO handle decl comparison");
// Add the new thing before dereferencing the old thing. This way we don't end
// up pointlessly re-creating things we end up using in the new thing.
} else {
// add new decl
const fn_decl = try self.gpa().create(Decl.Fn{
.base = Decl{
.id = Decl.Id.Fn,
.name = name,
.visib = parseVisibToken(tree, fn_proto.visib_token),
.visib = parseVisibToken(tree_scope.tree, fn_proto.visib_token),
.resolution = event.Future(BuildError!void).init(self.loop),
.parent_scope = &decls.base,
.parent_scope = &decl_scope.base,
},
.value = Decl.Fn.Val{ .Unresolved = {} },
.fn_proto = fn_proto,
});
errdefer self.gpa().destroy(fn_decl);
try decl_group.call(addTopLevelDecl, self, decls, &fn_decl.base);
},
ast.Node.Id.TestDecl => @panic("TODO"),
else => unreachable,
}
try group.call(addTopLevelDecl, self, &fn_decl.base, locked_table);
}
},
ast.Node.Id.TestDecl => @panic("TODO"),
else => unreachable,
}
decl_group_consumed = true;
try await (async decl_group.wait() catch unreachable);
// Now other code can rely on the decls scope having a complete list of names.
decls.name_future.resolve();
}
var existing_decl_it = existing_decls.iterator();
while (existing_decl_it.next()) |entry| {
// this decl was deleted
const existing_decl = entry.value;
@panic("TODO handle decl deletion");
}
}
async fn initialCompile(self: *Compilation) !void {
if (self.root_src_path) |root_src_path| {
const root_scope = blk: {
// TODO async/await os.path.real
const root_src_real_path = os.path.real(self.gpa(), root_src_path) catch |err| {
try printError("unable to get real path '{}': {}", root_src_path, err);
return err;
};
errdefer self.gpa().free(root_src_real_path);
break :blk try Scope.Root.create(self, root_src_real_path);
};
defer root_scope.base.deref(self);
try self.rebuildFile(root_scope);
}
}
async fn maybeLink(self: *Compilation) !void {
(await (async self.prelink_group.wait() catch unreachable)) catch |err| switch (err) {
error.SemanticAnalysisFailed => {},
else => return err,
@ -920,28 +970,20 @@ pub const Compilation = struct {
analyzed_code.destroy(comp.gpa());
}
async fn addTopLevelDecl(self: *Compilation, decls: *Scope.Decls, decl: *Decl) !void {
async fn addTopLevelDecl(
self: *Compilation,
decl: *Decl,
locked_table: *Decl.Table,
) !void {
const tree = decl.findRootScope().tree;
const is_export = decl.isExported(tree);
var add_to_table_resolved = false;
const add_to_table = async self.addDeclToTable(decls, decl) catch unreachable;
errdefer if (!add_to_table_resolved) cancel add_to_table; // TODO https://github.com/ziglang/zig/issues/1261
if (is_export) {
try self.prelink_group.call(verifyUniqueSymbol, self, decl);
try self.prelink_group.call(resolveDecl, self, decl);
}
add_to_table_resolved = true;
try await add_to_table;
}
async fn addDeclToTable(self: *Compilation, decls: *Scope.Decls, decl: *Decl) !void {
const held = await (async decls.table.acquire() catch unreachable);
defer held.release();
if (try held.value.put(decl.name, decl)) |other_decl| {
if (try locked_table.put(decl.name, decl)) |other_decl| {
try self.addCompileError(decls.base.findRoot(), decl.getSpan(), "redefinition of '{}'", decl.name);
// TODO note: other definition here
}

View File

@ -16,6 +16,7 @@ pub const Decl = struct {
visib: Visib,
resolution: event.Future(Compilation.BuildError!void),
parent_scope: *Scope,
tree_scope: *Scope.AstTree,
pub const Table = std.HashMap([]const u8, *Decl, mem.hash_slice_u8, mem.eql_slice_u8);

View File

@ -49,7 +49,7 @@ pub const Msg = struct {
};
const ScopeAndComp = struct {
root_scope: *Scope.Root,
tree_scope: *Scope.AstTree,
compilation: *Compilation,
};
@ -60,7 +60,7 @@ pub const Msg = struct {
path_and_tree.allocator.destroy(self);
},
Data.ScopeAndComp => |scope_and_comp| {
scope_and_comp.root_scope.base.deref(scope_and_comp.compilation);
scope_and_comp.tree_scope.base.deref(scope_and_comp.compilation);
scope_and_comp.compilation.gpa().free(self.text);
scope_and_comp.compilation.gpa().destroy(self);
},
@ -84,7 +84,7 @@ pub const Msg = struct {
return path_and_tree.realpath;
},
Data.ScopeAndComp => |scope_and_comp| {
return scope_and_comp.root_scope.realpath;
return scope_and_comp.tree_scope.root().realpath;
},
}
}
@ -95,31 +95,31 @@ pub const Msg = struct {
return path_and_tree.tree;
},
Data.ScopeAndComp => |scope_and_comp| {
return scope_and_comp.root_scope.tree;
return scope_and_comp.tree_scope.tree;
},
}
}
/// Takes ownership of text
/// References root_scope, and derefs when the msg is freed
pub fn createFromScope(comp: *Compilation, root_scope: *Scope.Root, span: Span, text: []u8) !*Msg {
/// References tree_scope, and derefs when the msg is freed
pub fn createFromScope(comp: *Compilation, tree_scope: *Scope.AstTree, span: Span, text: []u8) !*Msg {
const msg = try comp.gpa().create(Msg{
.text = text,
.span = span,
.data = Data{
.ScopeAndComp = ScopeAndComp{
.root_scope = root_scope,
.tree_scope = tree_scope,
.compilation = comp,
},
},
});
root_scope.base.ref();
tree_scope.base.ref();
return msg;
}
pub fn createFromParseErrorAndScope(
comp: *Compilation,
root_scope: *Scope.Root,
tree_scope: *Scope.AstTree,
parse_error: *const ast.Error,
) !*Msg {
const loc_token = parse_error.loc();
@ -127,7 +127,7 @@ pub const Msg = struct {
defer text_buf.deinit();
var out_stream = &std.io.BufferOutStream.init(&text_buf).stream;
try parse_error.render(&root_scope.tree.tokens, out_stream);
try parse_error.render(&tree_scope.tree.tokens, out_stream);
const msg = try comp.gpa().create(Msg{
.text = undefined,
@ -137,12 +137,12 @@ pub const Msg = struct {
},
.data = Data{
.ScopeAndComp = ScopeAndComp{
.root_scope = root_scope,
.tree_scope = tree_scope,
.compilation = comp,
},
},
});
root_scope.base.ref();
tree_scope.base.ref();
msg.text = text_buf.toOwnedSlice();
return msg;
}

View File

@ -1929,8 +1929,9 @@ pub const Builder = struct {
Scope.Id.Root => return Ident.NotFound,
Scope.Id.Decls => {
const decls = @fieldParentPtr(Scope.Decls, "base", s);
const table = await (async decls.getTableReadOnly() catch unreachable);
if (table.get(name)) |entry| {
const locked_table = await (async decls.table.acquireRead() catch unreachable);
defer locked_table.release();
if (locked_table.value.get(name)) |entry| {
return Ident{ .Decl = entry.value };
}
},

View File

@ -143,7 +143,7 @@ pub const LibCInstallation = struct {
pub async fn findNative(self: *LibCInstallation, loop: *event.Loop) !void {
self.initEmpty();
var group = event.Group(FindError!void).init(loop);
errdefer group.cancelAll();
errdefer group.deinit();
var windows_sdk: ?*c.ZigWindowsSDK = null;
errdefer if (windows_sdk) |sdk| c.zig_free_windows_sdk(@ptrCast(?[*]c.ZigWindowsSDK, sdk));
@ -313,7 +313,7 @@ pub const LibCInstallation = struct {
},
};
var group = event.Group(FindError!void).init(loop);
errdefer group.cancelAll();
errdefer group.deinit();
for (dyn_tests) |*dyn_test| {
try group.call(testNativeDynamicLinker, self, loop, dyn_test);
}
@ -341,7 +341,6 @@ pub const LibCInstallation = struct {
}
}
async fn findNativeKernel32LibDir(self: *LibCInstallation, loop: *event.Loop, sdk: *c.ZigWindowsSDK) FindError!void {
var search_buf: [2]Search = undefined;
const searches = fillSearch(&search_buf, sdk);
@ -450,7 +449,6 @@ fn fillSearch(search_buf: *[2]Search, sdk: *c.ZigWindowsSDK) []Search {
return search_buf[0..search_end];
}
fn fileExists(allocator: *std.mem.Allocator, path: []const u8) !bool {
if (std.os.File.access(allocator, path)) |_| {
return true;

View File

@ -71,26 +71,26 @@ pub fn main() !void {
}
const commands = []Command{
Command{
.name = "build-exe",
.exec = cmdBuildExe,
},
Command{
.name = "build-lib",
.exec = cmdBuildLib,
},
Command{
.name = "build-obj",
.exec = cmdBuildObj,
},
//Command{
// .name = "build-exe",
// .exec = cmdBuildExe,
//},
//Command{
// .name = "build-lib",
// .exec = cmdBuildLib,
//},
//Command{
// .name = "build-obj",
// .exec = cmdBuildObj,
//},
Command{
.name = "fmt",
.exec = cmdFmt,
},
Command{
.name = "libc",
.exec = cmdLibC,
},
//Command{
// .name = "libc",
// .exec = cmdLibC,
//},
Command{
.name = "targets",
.exec = cmdTargets,
@ -472,23 +472,22 @@ fn buildOutputType(allocator: *Allocator, args: []const []const u8, out_type: Co
}
async fn processBuildEvents(comp: *Compilation, color: errmsg.Color) void {
// TODO directly awaiting async should guarantee memory allocation elision
const build_event = await (async comp.events.get() catch unreachable);
while (true) {
// TODO directly awaiting async should guarantee memory allocation elision
const build_event = await (async comp.events.get() catch unreachable);
switch (build_event) {
Compilation.Event.Ok => {
return;
},
Compilation.Event.Error => |err| {
std.debug.warn("build failed: {}\n", @errorName(err));
os.exit(1);
},
Compilation.Event.Fail => |msgs| {
for (msgs) |msg| {
defer msg.destroy();
msg.printToFile(&stderr_file, color) catch os.exit(1);
}
},
switch (build_event) {
Compilation.Event.Ok => {},
Compilation.Event.Error => |err| {
stderr.print("build failed: {}\n", @errorName(err)) catch os.exit(1);
},
Compilation.Event.Fail => |msgs| {
for (msgs) |msg| {
defer msg.destroy();
msg.printToFile(&stderr_file, color) catch os.exit(1);
}
},
}
}
}

View File

@ -36,6 +36,7 @@ pub const Scope = struct {
Id.Defer => @fieldParentPtr(Defer, "base", base).destroy(comp),
Id.DeferExpr => @fieldParentPtr(DeferExpr, "base", base).destroy(comp),
Id.Var => @fieldParentPtr(Var, "base", base).destroy(comp),
Id.AstTree => @fieldParentPtr(AstTree, "base", base).destroy(comp),
}
}
}
@ -97,6 +98,7 @@ pub const Scope = struct {
pub const Id = enum {
Root,
AstTree,
Decls,
Block,
FnDef,
@ -108,13 +110,12 @@ pub const Scope = struct {
pub const Root = struct {
base: Scope,
tree: *ast.Tree,
realpath: []const u8,
decls: *Decls,
/// Creates a Root scope with 1 reference
/// Takes ownership of realpath
/// Takes ownership of tree, will deinit and destroy when done.
pub fn create(comp: *Compilation, tree: *ast.Tree, realpath: []u8) !*Root {
pub fn create(comp: *Compilation, realpath: []u8) !*Root {
const self = try comp.gpa().createOne(Root);
self.* = Root{
.base = Scope{
@ -122,40 +123,64 @@ pub const Scope = struct {
.parent = null,
.ref_count = std.atomic.Int(usize).init(1),
},
.tree = tree,
.realpath = realpath,
.decls = undefined,
};
errdefer comp.gpa().destroy(self);
self.decls = try Decls.create(comp, &self.base);
return self;
}
pub fn destroy(self: *Root, comp: *Compilation) void {
self.decls.base.deref(comp);
comp.gpa().free(self.realpath);
comp.gpa().destroy(self);
}
};
pub const AstTree = struct {
base: Scope,
tree: *ast.Tree,
/// Creates a scope with 1 reference
/// Takes ownership of tree, will deinit and destroy when done.
pub fn create(comp: *Compilation, tree: *ast.Tree, root: *Root) !*AstTree {
const self = try comp.gpa().createOne(Root);
self.* = AstTree{
.base = undefined,
.tree = tree,
};
self.base.init(Id.AstTree, &root.base);
return self;
}
pub fn destroy(self: *AstTree, comp: *Compilation) void {
comp.gpa().free(self.tree.source);
self.tree.deinit();
comp.gpa().destroy(self.tree);
comp.gpa().free(self.realpath);
comp.gpa().destroy(self);
}
pub fn root(self: *AstTree) *Root {
return self.base.findRoot();
}
};
pub const Decls = struct {
base: Scope,
/// The lock must be respected for writing. However once name_future resolves,
/// readers can freely access it.
table: event.Locked(Decl.Table),
/// Once this future is resolved, the table is complete and available for unlocked
/// read-only access. It does not mean all the decls are resolved; it means only that
/// the table has all the names. Each decl in the table has its own resolution state.
name_future: event.Future(void),
/// This table remains Write Locked when the names are incomplete or possibly outdated.
/// So if a reader manages to grab a lock, it can be sure that the set of names is complete
/// and correct.
table: event.RwLocked(Decl.Table),
/// Creates a Decls scope with 1 reference
pub fn create(comp: *Compilation, parent: *Scope) !*Decls {
const self = try comp.gpa().createOne(Decls);
self.* = Decls{
.base = undefined,
.table = event.Locked(Decl.Table).init(comp.loop, Decl.Table.init(comp.gpa())),
.table = event.RwLocked(Decl.Table).init(comp.loop, Decl.Table.init(comp.gpa())),
.name_future = event.Future(void).init(comp.loop),
};
self.base.init(Id.Decls, parent);
@ -166,11 +191,6 @@ pub const Scope = struct {
self.table.deinit();
comp.gpa().destroy(self);
}
pub async fn getTableReadOnly(self: *Decls) *Decl.Table {
_ = await (async self.name_future.get() catch unreachable);
return &self.table.private_data;
}
};
pub const Block = struct {

View File

@ -50,7 +50,7 @@ pub const TestContext = struct {
errdefer self.event_loop_local.deinit();
self.group = std.event.Group(error!void).init(&self.loop);
errdefer self.group.cancelAll();
errdefer self.group.deinit();
self.zig_lib_dir = try introspect.resolveZigLibDir(allocator);
errdefer allocator.free(self.zig_lib_dir);

View File

@ -1,40 +1,38 @@
const std = @import("../index.zig");
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
const assert = std.debug.assert;
/// Many producer, many consumer, non-allocating, thread-safe.
/// Uses a spinlock to protect get() and put().
/// Uses a mutex to protect access.
pub fn Queue(comptime T: type) type {
return struct {
head: ?*Node,
tail: ?*Node,
lock: u8,
mutex: std.Mutex,
pub const Self = this;
pub const Node = struct {
next: ?*Node,
data: T,
};
pub const Node = std.LinkedList(T).Node;
pub fn init() Self {
return Self{
.head = null,
.tail = null,
.lock = 0,
.mutex = std.Mutex.init(),
};
}
pub fn put(self: *Self, node: *Node) void {
node.next = null;
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
const held = self.mutex.acquire();
defer held.release();
const opt_tail = self.tail;
node.prev = self.tail;
self.tail = node;
if (opt_tail) |tail| {
tail.next = node;
if (node.prev) |prev_tail| {
prev_tail.next = node;
} else {
assert(self.head == null);
self.head = node;
@ -42,18 +40,27 @@ pub fn Queue(comptime T: type) type {
}
pub fn get(self: *Self) ?*Node {
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
const held = self.mutex.acquire();
defer held.release();
const head = self.head orelse return null;
self.head = head.next;
if (head.next == null) self.tail = null;
if (head.next) |new_head| {
new_head.prev = null;
} else {
self.tail = null;
}
// This way, a get() and a remove() are thread-safe with each other.
head.prev = null;
head.next = null;
return head;
}
pub fn unget(self: *Self, node: *Node) void {
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
node.prev = null;
const held = self.mutex.acquire();
defer held.release();
const opt_head = self.head;
self.head = node;
@ -65,13 +72,39 @@ pub fn Queue(comptime T: type) type {
}
}
/// Thread-safe with get() and remove(). Returns whether node was actually removed.
pub fn remove(self: *Self, node: *Node) bool {
const held = self.mutex.acquire();
defer held.release();
if (node.prev == null and node.next == null and self.head != node) {
return false;
}
if (node.prev) |prev| {
prev.next = node.next;
} else {
self.head = node.next;
}
if (node.next) |next| {
next.prev = node.prev;
} else {
self.tail = node.prev;
}
node.prev = null;
node.next = null;
return true;
}
pub fn isEmpty(self: *Self) bool {
return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null;
const held = self.mutex.acquire();
defer held.release();
return self.head != null;
}
pub fn dump(self: *Self) void {
while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
const held = self.mutex.acquire();
defer held.release();
std.debug.warn("head: ");
dumpRecursive(self.head, 0);
@ -93,9 +126,6 @@ pub fn Queue(comptime T: type) type {
};
}
const std = @import("../index.zig");
const assert = std.debug.assert;
const Context = struct {
allocator: *std.mem.Allocator,
queue: *Queue(i32),
@ -169,6 +199,7 @@ fn startPuts(ctx: *Context) u8 {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
const x = @bitCast(i32, r.random.scalar(u32));
const node = ctx.allocator.create(Queue(i32).Node{
.prev = undefined,
.next = undefined,
.data = x,
}) catch unreachable;
@ -198,12 +229,14 @@ test "std.atomic.Queue single-threaded" {
var node_0 = Queue(i32).Node{
.data = 0,
.next = undefined,
.prev = undefined,
};
queue.put(&node_0);
var node_1 = Queue(i32).Node{
.data = 1,
.next = undefined,
.prev = undefined,
};
queue.put(&node_1);
@ -212,12 +245,14 @@ test "std.atomic.Queue single-threaded" {
var node_2 = Queue(i32).Node{
.data = 2,
.next = undefined,
.prev = undefined,
};
queue.put(&node_2);
var node_3 = Queue(i32).Node{
.data = 3,
.next = undefined,
.prev = undefined,
};
queue.put(&node_3);
@ -228,6 +263,7 @@ test "std.atomic.Queue single-threaded" {
var node_4 = Queue(i32).Node{
.data = 4,
.next = undefined,
.prev = undefined,
};
queue.put(&node_4);

View File

@ -3,7 +3,7 @@ pub const Future = @import("event/future.zig").Future;
pub const Group = @import("event/group.zig").Group;
pub const Lock = @import("event/lock.zig").Lock;
pub const Locked = @import("event/locked.zig").Locked;
pub const RwLock = @import("event/rwlock.zig").Lock;
pub const RwLock = @import("event/rwlock.zig").RwLock;
pub const RwLocked = @import("event/rwlocked.zig").RwLocked;
pub const Loop = @import("event/loop.zig").Loop;
pub const fs = @import("event/fs.zig");

View File

@ -5,7 +5,7 @@ const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;
const Loop = std.event.Loop;
/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
/// many producer, many consumer, thread-safe, runtime configurable buffer size
/// when buffer is empty, consumers suspend and are resumed by producers
/// when buffer is full, producers suspend and are resumed by consumers
pub fn Channel(comptime T: type) type {
@ -13,6 +13,7 @@ pub fn Channel(comptime T: type) type {
loop: *Loop,
getters: std.atomic.Queue(GetNode),
or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
putters: std.atomic.Queue(PutNode),
get_count: usize,
put_count: usize,
@ -26,8 +27,22 @@ pub fn Channel(comptime T: type) type {
const SelfChannel = this;
const GetNode = struct {
ptr: *T,
tick_node: *Loop.NextTickNode,
data: Data,
const Data = union(enum) {
Normal: Normal,
OrNull: OrNull,
};
const Normal = struct {
ptr: *T,
};
const OrNull = struct {
ptr: *?T,
or_null: *std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node,
};
};
const PutNode = struct {
data: T,
@ -48,6 +63,7 @@ pub fn Channel(comptime T: type) type {
.need_dispatch = 0,
.getters = std.atomic.Queue(GetNode).init(),
.putters = std.atomic.Queue(PutNode).init(),
.or_null_queue = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).init(),
.get_count = 0,
.put_count = 0,
});
@ -71,18 +87,31 @@ pub fn Channel(comptime T: type) type {
/// puts a data item in the channel. The promise completes when the value has been added to the
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
pub async fn put(self: *SelfChannel, data: T) void {
// TODO fix this workaround
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
var my_tick_node = Loop.NextTickNode.init(my_handle);
var queue_node = std.atomic.Queue(PutNode).Node.init(PutNode{
.tick_node = &my_tick_node,
.data = data,
});
// TODO test canceling a put()
errdefer {
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
const need_dispatch = !self.putters.remove(&queue_node);
self.loop.cancelOnNextTick(&my_tick_node);
if (need_dispatch) {
// oops we made the put_count incorrect for a period of time. fix by dispatching.
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.dispatch();
}
}
suspend |handle| {
var my_tick_node = Loop.NextTickNode{
.next = undefined,
.data = handle,
};
var queue_node = std.atomic.Queue(PutNode).Node{
.data = PutNode{
.tick_node = &my_tick_node,
.data = data,
},
.next = undefined,
};
self.putters.put(&queue_node);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
@ -93,21 +122,37 @@ pub fn Channel(comptime T: type) type {
/// await this function to get an item from the channel. If the buffer is empty, the promise will
/// complete when the next item is put in the channel.
pub async fn get(self: *SelfChannel) T {
// TODO fix this workaround
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: T = undefined;
suspend |handle| {
var my_tick_node = Loop.NextTickNode{
.next = undefined,
.data = handle,
};
var queue_node = std.atomic.Queue(GetNode).Node{
.data = GetNode{
.ptr = &result,
.tick_node = &my_tick_node,
},
.next = undefined,
};
var my_tick_node = Loop.NextTickNode.init(my_handle);
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
.tick_node = &my_tick_node,
.data = GetNode.Data{
.Normal = GetNode.Normal{ .ptr = &result },
},
});
// TODO test canceling a get()
errdefer {
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
const need_dispatch = !self.getters.remove(&queue_node);
self.loop.cancelOnNextTick(&my_tick_node);
if (need_dispatch) {
// oops we made the get_count incorrect for a period of time. fix by dispatching.
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.dispatch();
}
}
suspend |_| {
self.getters.put(&queue_node);
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
@ -116,8 +161,64 @@ pub fn Channel(comptime T: type) type {
return result;
}
fn getOrNull(self: *SelfChannel) ?T {
TODO();
//pub async fn select(comptime EnumUnion: type, channels: ...) EnumUnion {
// assert(@memberCount(EnumUnion) == channels.len); // enum union and channels mismatch
// assert(channels.len != 0); // enum unions cannot have 0 fields
// if (channels.len == 1) {
// const result = await (async channels[0].get() catch unreachable);
// return @unionInit(EnumUnion, @memberName(EnumUnion, 0), result);
// }
//}
/// Await this function to get an item from the channel. If the buffer is empty and there are no
/// puts waiting, this returns null.
/// Await is necessary for locking purposes. The function will be resumed after checking the channel
/// for data and will not wait for data to be available.
pub async fn getOrNull(self: *SelfChannel) ?T {
// TODO fix this workaround
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
// TODO integrate this function with named return values
// so we can get rid of this extra result copy
var result: ?T = null;
var my_tick_node = Loop.NextTickNode.init(my_handle);
var or_null_node = std.atomic.Queue(*std.atomic.Queue(GetNode).Node).Node.init(undefined);
var queue_node = std.atomic.Queue(GetNode).Node.init(GetNode{
.tick_node = &my_tick_node,
.data = GetNode.Data{
.OrNull = GetNode.OrNull{
.ptr = &result,
.or_null = &or_null_node,
},
},
});
or_null_node.data = &queue_node;
// TODO test canceling getOrNull
errdefer {
_ = self.or_null_queue.remove(&or_null_node);
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
const need_dispatch = !self.getters.remove(&queue_node);
self.loop.cancelOnNextTick(&my_tick_node);
if (need_dispatch) {
// oops we made the get_count incorrect for a period of time. fix by dispatching.
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.dispatch();
}
}
suspend |_| {
self.getters.put(&queue_node);
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
self.or_null_queue.put(&or_null_node);
self.dispatch();
}
return result;
}
fn dispatch(self: *SelfChannel) void {
@ -143,7 +244,15 @@ pub fn Channel(comptime T: type) type {
if (get_count == 0) break :one_dispatch;
const get_node = &self.getters.get().?.data;
get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
},
GetNode.Data.OrNull => |info| {
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
},
}
self.loop.onNextTick(get_node.tick_node);
self.buffer_len -= 1;
@ -155,7 +264,15 @@ pub fn Channel(comptime T: type) type {
const get_node = &self.getters.get().?.data;
const put_node = &self.putters.get().?.data;
get_node.ptr.* = put_node.data;
switch (get_node.data) {
GetNode.Data.Normal => |info| {
info.ptr.* = put_node.data;
},
GetNode.Data.OrNull => |info| {
_ = self.or_null_queue.remove(info.or_null);
info.ptr.* = put_node.data;
},
}
self.loop.onNextTick(get_node.tick_node);
self.loop.onNextTick(put_node.tick_node);
@ -180,6 +297,16 @@ pub fn Channel(comptime T: type) type {
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
// All the "get or null" functions should resume now.
var remove_count: usize = 0;
while (self.or_null_queue.get()) |or_null_node| {
remove_count += @boolToInt(self.getters.remove(or_null_node.data));
self.loop.onNextTick(or_null_node.data.data.tick_node);
}
if (remove_count != 0) {
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, remove_count, AtomicOrder.SeqCst);
}
// clear need-dispatch flag
const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
if (need_dispatch != 0) continue;
@ -230,6 +357,15 @@ async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
const value2_promise = try async channel.get();
const value2 = await value2_promise;
assert(value2 == 4567);
const value3_promise = try async channel.getOrNull();
const value3 = await value3_promise;
assert(value3 == null);
const last_put = try async testPut(channel, 4444);
const value4 = await try async channel.getOrNull();
assert(value4.? == 4444);
await last_put;
}
async fn testChannelPutter(channel: *Channel(i32)) void {
@ -237,3 +373,6 @@ async fn testChannelPutter(channel: *Channel(i32)) void {
await (async channel.put(4567) catch @panic("out of memory"));
}
async fn testPut(channel: *Channel(i32), value: i32) void {
await (async channel.put(value) catch @panic("out of memory"));
}

View File

@ -99,6 +99,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
}
var req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -111,6 +112,7 @@ pub async fn pwritev(loop: *event.Loop, fd: os.FileHandle, offset: usize, data:
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = my_handle,
},
@ -148,6 +150,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
}
var req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -160,6 +163,7 @@ pub async fn preadv(loop: *event.Loop, fd: os.FileHandle, offset: usize, data: [
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = my_handle,
},
@ -186,6 +190,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -196,6 +201,7 @@ pub async fn openRead(loop: *event.Loop, path: []const u8) os.File.OpenError!os.
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = my_handle,
},
@ -227,6 +233,7 @@ pub async fn openReadWrite(
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -238,6 +245,7 @@ pub async fn openReadWrite(
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = my_handle,
},
@ -267,6 +275,7 @@ pub const CloseOperation = struct {
.loop = loop,
.have_fd = false,
.close_req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -312,6 +321,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
defer loop.allocator.free(path_with_null);
var req_node = RequestNode{
.prev = undefined,
.next = undefined,
.data = Request{
.msg = Request.Msg{
@ -324,6 +334,7 @@ pub async fn writeFileMode(loop: *event.Loop, path: []const u8, contents: []cons
},
.finish = Request.Finish{
.TickNode = event.Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = my_handle,
},

View File

@ -91,13 +91,16 @@ pub const Lock = struct {
}
pub async fn acquire(self: *Lock) Held {
suspend |handle| {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
.next = undefined,
};
// TODO explicitly put this memory in the coroutine frame #1194
var my_handle: promise = undefined;
suspend |p| {
my_handle = p;
resume p;
}
var my_tick_node = Loop.NextTickNode.init(my_handle);
errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
suspend |_| {
self.queue.put(&my_tick_node);
// At this point, we are in the queue, so we might have already been resumed and this coroutine
@ -170,6 +173,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
}
const handle1 = async lockRunner(lock) catch @panic("out of memory");
var tick_node1 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle1,
};
@ -177,6 +181,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
const handle2 = async lockRunner(lock) catch @panic("out of memory");
var tick_node2 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle2,
};
@ -184,6 +189,7 @@ async fn testLock(loop: *Loop, lock: *Lock) void {
const handle3 = async lockRunner(lock) catch @panic("out of memory");
var tick_node3 = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = handle3,
};

View File

@ -120,6 +120,7 @@ pub const Loop = struct {
// we need another thread for the file system because Linux does not have an async
// file system I/O API.
self.os_data.fs_end_request = fs.RequestNode{
.prev = undefined,
.next = undefined,
.data = fs.Request{
.msg = fs.Request.Msg.End,
@ -206,6 +207,7 @@ pub const Loop = struct {
.udata = @ptrToInt(&eventfd_node.data.base),
},
},
.prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
@ -270,6 +272,7 @@ pub const Loop = struct {
// this one is for sending events
.completion_key = @ptrToInt(&eventfd_node.data.base),
},
.prev = undefined,
.next = undefined,
};
self.available_eventfd_resume_nodes.push(eventfd_node);
@ -422,6 +425,12 @@ pub const Loop = struct {
self.dispatch();
}
pub fn cancelOnNextTick(self: *Loop, node: *NextTickNode) void {
if (self.next_tick_queue.remove(node)) {
self.finishOneEvent();
}
}
pub fn run(self: *Loop) void {
self.finishOneEvent(); // the reference we start with
@ -443,6 +452,7 @@ pub const Loop = struct {
suspend |p| {
handle.* = p;
var my_tick_node = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = p,
};
@ -464,6 +474,7 @@ pub const Loop = struct {
pub async fn yield(self: *Loop) void {
suspend |p| {
var my_tick_node = Loop.NextTickNode{
.prev = undefined,
.next = undefined,
.data = p,
};

View File

@ -101,6 +101,7 @@ pub const RwLock = struct {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
.prev = undefined,
.next = undefined,
};
@ -133,6 +134,7 @@ pub const RwLock = struct {
// TODO explicitly put this memory in the coroutine frame #1194
var my_tick_node = Loop.NextTickNode{
.data = handle,
.prev = undefined,
.next = undefined,
};

View File

@ -6,7 +6,6 @@ pub const Buffer = @import("buffer.zig").Buffer;
pub const BufferOutStream = @import("buffer.zig").BufferOutStream;
pub const HashMap = @import("hash_map.zig").HashMap;
pub const LinkedList = @import("linked_list.zig").LinkedList;
pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList;
pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
pub const DynLib = @import("dynamic_library.zig").DynLib;
pub const Mutex = @import("mutex.zig").Mutex;

View File

@ -4,18 +4,8 @@ const assert = debug.assert;
const mem = std.mem;
const Allocator = mem.Allocator;
/// Generic non-intrusive doubly linked list.
pub fn LinkedList(comptime T: type) type {
return BaseLinkedList(T, void, "");
}
/// Generic intrusive doubly linked list.
pub fn IntrusiveLinkedList(comptime ParentType: type, comptime field_name: []const u8) type {
return BaseLinkedList(void, ParentType, field_name);
}
/// Generic doubly linked list.
fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_name: []const u8) type {
pub fn LinkedList(comptime T: type) type {
return struct {
const Self = this;
@ -25,23 +15,13 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
next: ?*Node,
data: T,
pub fn init(value: *const T) Node {
pub fn init(data: T) Node {
return Node{
.prev = null,
.next = null,
.data = value.*,
.data = data,
};
}
pub fn initIntrusive() Node {
// TODO: when #678 is solved this can become `init`.
return Node.init({});
}
pub fn toData(node: *Node) *ParentType {
comptime assert(isIntrusive());
return @fieldParentPtr(ParentType, field_name, node);
}
};
first: ?*Node,
@ -60,10 +40,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
};
}
fn isIntrusive() bool {
return ParentType != void or field_name.len != 0;
}
/// Insert a new node after an existing one.
///
/// Arguments:
@ -192,7 +168,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
/// Returns:
/// A pointer to the new node.
pub fn allocateNode(list: *Self, allocator: *Allocator) !*Node {
comptime assert(!isIntrusive());
return allocator.create(Node(undefined));
}
@ -202,7 +177,6 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
/// node: Pointer to the node to deallocate.
/// allocator: Dynamic memory allocator.
pub fn destroyNode(list: *Self, node: *Node, allocator: *Allocator) void {
comptime assert(!isIntrusive());
allocator.destroy(node);
}
@ -214,8 +188,7 @@ fn BaseLinkedList(comptime T: type, comptime ParentType: type, comptime field_na
///
/// Returns:
/// A pointer to the new node.
pub fn createNode(list: *Self, data: *const T, allocator: *Allocator) !*Node {
comptime assert(!isIntrusive());
pub fn createNode(list: *Self, data: T, allocator: *Allocator) !*Node {
var node = try list.allocateNode(allocator);
node.* = Node.init(data);
return node;
@ -274,69 +247,3 @@ test "basic linked list test" {
assert(list.last.?.data == 4);
assert(list.len == 2);
}
const ElementList = IntrusiveLinkedList(Element, "link");
const Element = struct {
value: u32,
link: IntrusiveLinkedList(Element, "link").Node,
};
test "basic intrusive linked list test" {
const allocator = debug.global_allocator;
var list = ElementList.init();
var one = Element{
.value = 1,
.link = ElementList.Node.initIntrusive(),
};
var two = Element{
.value = 2,
.link = ElementList.Node.initIntrusive(),
};
var three = Element{
.value = 3,
.link = ElementList.Node.initIntrusive(),
};
var four = Element{
.value = 4,
.link = ElementList.Node.initIntrusive(),
};
var five = Element{
.value = 5,
.link = ElementList.Node.initIntrusive(),
};
list.append(&two.link); // {2}
list.append(&five.link); // {2, 5}
list.prepend(&one.link); // {1, 2, 5}
list.insertBefore(&five.link, &four.link); // {1, 2, 4, 5}
list.insertAfter(&two.link, &three.link); // {1, 2, 3, 4, 5}
// Traverse forwards.
{
var it = list.first;
var index: u32 = 1;
while (it) |node| : (it = node.next) {
assert(node.toData().value == index);
index += 1;
}
}
// Traverse backwards.
{
var it = list.last;
var index: u32 = 1;
while (it) |node| : (it = node.prev) {
assert(node.toData().value == (6 - index));
index += 1;
}
}
var first = list.popFirst(); // {2, 3, 4, 5}
var last = list.pop(); // {2, 3, 4}
list.remove(&three.link); // {2, 4}
assert(list.first.?.toData().value == 2);
assert(list.last.?.toData().value == 4);
assert(list.len == 2);
}