Merge pull request #963 from zig-lang/atomic-stack-and-queue

Atomic stack and queue
master
Andrew Kelley 2018-04-29 12:29:40 -04:00 committed by GitHub
commit f37e79e720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 527 additions and 90 deletions

View File

@ -415,6 +415,9 @@ set(ZIG_CPP_SOURCES
set(ZIG_STD_FILES set(ZIG_STD_FILES
"array_list.zig" "array_list.zig"
"atomic/index.zig"
"atomic/stack.zig"
"atomic/queue.zig"
"base64.zig" "base64.zig"
"buf_map.zig" "buf_map.zig"
"buf_set.zig" "buf_set.zig"

View File

@ -18184,6 +18184,11 @@ static TypeTableEntry *ir_analyze_instruction_atomic_rmw(IrAnalyze *ira, IrInstr
} else { } else {
if (!ir_resolve_atomic_order(ira, instruction->ordering->other, &ordering)) if (!ir_resolve_atomic_order(ira, instruction->ordering->other, &ordering))
return ira->codegen->builtin_types.entry_invalid; return ira->codegen->builtin_types.entry_invalid;
if (ordering == AtomicOrderUnordered) {
ir_add_error(ira, instruction->ordering,
buf_sprintf("@atomicRmw atomic ordering must not be Unordered"));
return ira->codegen->builtin_types.entry_invalid;
}
} }
if (instr_is_comptime(casted_operand) && instr_is_comptime(casted_ptr) && casted_ptr->value.data.x_ptr.mut == ConstPtrMutComptimeVar) if (instr_is_comptime(casted_operand) && instr_is_comptime(casted_ptr) && casted_ptr->value.data.x_ptr.mut == ConstPtrMutComptimeVar)

7
std/atomic/index.zig Normal file
View File

@ -0,0 +1,7 @@
pub const Stack = @import("stack.zig").Stack;
pub const Queue = @import("queue.zig").Queue;
test "std.atomic" {
_ = @import("stack.zig").Stack;
_ = @import("queue.zig").Queue;
}

120
std/atomic/queue.zig Normal file
View File

@ -0,0 +1,120 @@
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
const AtomicRmwOp = builtin.AtomicRmwOp;
/// Many reader, many writer, non-allocating, thread-safe, lock-free
pub fn Queue(comptime T: type) type {
return struct {
head: &Node,
tail: &Node,
root: Node,
pub const Self = this;
pub const Node = struct {
next: ?&Node,
data: T,
};
// TODO: well defined copy elision: https://github.com/zig-lang/zig/issues/287
pub fn init(self: &Self) void {
self.root.next = null;
self.head = &self.root;
self.tail = &self.root;
}
pub fn put(self: &Self, node: &Node) void {
node.next = null;
const tail = @atomicRmw(&Node, &self.tail, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
_ = @atomicRmw(?&Node, &tail.next, AtomicRmwOp.Xchg, node, AtomicOrder.SeqCst);
}
pub fn get(self: &Self) ?&Node {
var head = @atomicLoad(&Node, &self.head, AtomicOrder.Acquire);
while (true) {
const node = head.next ?? return null;
head = @cmpxchgWeak(&Node, &self.head, head, node, AtomicOrder.Release, AtomicOrder.Acquire) ?? return node;
}
}
};
}
const std = @import("std");
const Context = struct {
allocator: &std.mem.Allocator,
queue: &Queue(i32),
put_sum: isize,
get_sum: isize,
get_count: usize,
puts_done: u8, // TODO make this a bool
};
const puts_per_thread = 10000;
const put_thread_count = 3;
test "std.atomic.queue" {
var direct_allocator = std.heap.DirectAllocator.init();
defer direct_allocator.deinit();
var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024);
defer direct_allocator.allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
var a = &fixed_buffer_allocator.allocator;
var queue: Queue(i32) = undefined;
queue.init();
var context = Context {
.allocator = a,
.queue = &queue,
.put_sum = 0,
.get_sum = 0,
.puts_done = 0,
.get_count = 0,
};
var putters: [put_thread_count]&std.os.Thread = undefined;
for (putters) |*t| {
*t = try std.os.spawnThread(&context, startPuts);
}
var getters: [put_thread_count]&std.os.Thread = undefined;
for (getters) |*t| {
*t = try std.os.spawnThread(&context, startGets);
}
for (putters) |t| t.wait();
_ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
for (getters) |t| t.wait();
std.debug.assert(context.put_sum == context.get_sum);
std.debug.assert(context.get_count == puts_per_thread * put_thread_count);
}
fn startPuts(ctx: &Context) u8 {
var put_count: usize = puts_per_thread;
var r = std.rand.DefaultPrng.init(0xdeadbeef);
while (put_count != 0) : (put_count -= 1) {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
const x = @bitCast(i32, r.random.scalar(u32));
const node = ctx.allocator.create(Queue(i32).Node) catch unreachable;
node.data = x;
ctx.queue.put(node);
_ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst);
}
return 0;
}
fn startGets(ctx: &Context) u8 {
while (true) {
while (ctx.queue.get()) |node| {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
_ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
}
if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) {
break;
}
}
return 0;
}

126
std/atomic/stack.zig Normal file
View File

@ -0,0 +1,126 @@
const builtin = @import("builtin");
const AtomicOrder = builtin.AtomicOrder;
/// Many reader, many writer, non-allocating, thread-safe, lock-free
pub fn Stack(comptime T: type) type {
return struct {
root: ?&Node,
pub const Self = this;
pub const Node = struct {
next: ?&Node,
data: T,
};
pub fn init() Self {
return Self {
.root = null,
};
}
/// push operation, but only if you are the first item in the stack. if you did not succeed in
/// being the first item in the stack, returns the other item that was there.
pub fn pushFirst(self: &Self, node: &Node) ?&Node {
node.next = null;
return @cmpxchgStrong(?&Node, &self.root, null, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst);
}
pub fn push(self: &Self, node: &Node) void {
var root = @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst);
while (true) {
node.next = root;
root = @cmpxchgWeak(?&Node, &self.root, root, node, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? break;
}
}
pub fn pop(self: &Self) ?&Node {
var root = @atomicLoad(?&Node, &self.root, AtomicOrder.Acquire);
while (true) {
root = @cmpxchgWeak(?&Node, &self.root, root, (root ?? return null).next, AtomicOrder.SeqCst, AtomicOrder.SeqCst) ?? return root;
}
}
pub fn isEmpty(self: &Self) bool {
return @atomicLoad(?&Node, &self.root, AtomicOrder.SeqCst) == null;
}
};
}
const std = @import("std");
const Context = struct {
allocator: &std.mem.Allocator,
stack: &Stack(i32),
put_sum: isize,
get_sum: isize,
get_count: usize,
puts_done: u8, // TODO make this a bool
};
const puts_per_thread = 1000;
const put_thread_count = 3;
test "std.atomic.stack" {
var direct_allocator = std.heap.DirectAllocator.init();
defer direct_allocator.deinit();
var plenty_of_memory = try direct_allocator.allocator.alloc(u8, 64 * 1024 * 1024);
defer direct_allocator.allocator.free(plenty_of_memory);
var fixed_buffer_allocator = std.heap.ThreadSafeFixedBufferAllocator.init(plenty_of_memory);
var a = &fixed_buffer_allocator.allocator;
var stack = Stack(i32).init();
var context = Context {
.allocator = a,
.stack = &stack,
.put_sum = 0,
.get_sum = 0,
.puts_done = 0,
.get_count = 0,
};
var putters: [put_thread_count]&std.os.Thread = undefined;
for (putters) |*t| {
*t = try std.os.spawnThread(&context, startPuts);
}
var getters: [put_thread_count]&std.os.Thread = undefined;
for (getters) |*t| {
*t = try std.os.spawnThread(&context, startGets);
}
for (putters) |t| t.wait();
_ = @atomicRmw(u8, &context.puts_done, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
for (getters) |t| t.wait();
std.debug.assert(context.put_sum == context.get_sum);
std.debug.assert(context.get_count == puts_per_thread * put_thread_count);
}
fn startPuts(ctx: &Context) u8 {
var put_count: usize = puts_per_thread;
var r = std.rand.DefaultPrng.init(0xdeadbeef);
while (put_count != 0) : (put_count -= 1) {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
const x = @bitCast(i32, r.random.scalar(u32));
const node = ctx.allocator.create(Stack(i32).Node) catch unreachable;
node.data = x;
ctx.stack.push(node);
_ = @atomicRmw(isize, &ctx.put_sum, builtin.AtomicRmwOp.Add, x, AtomicOrder.SeqCst);
}
return 0;
}
fn startGets(ctx: &Context) u8 {
while (true) {
while (ctx.stack.pop()) |node| {
std.os.time.sleep(0, 1); // let the os scheduler be our fuzz
_ = @atomicRmw(isize, &ctx.get_sum, builtin.AtomicRmwOp.Add, node.data, builtin.AtomicOrder.SeqCst);
_ = @atomicRmw(usize, &ctx.get_count, builtin.AtomicRmwOp.Add, 1, builtin.AtomicOrder.SeqCst);
}
if (@atomicLoad(u8, &ctx.puts_done, builtin.AtomicOrder.SeqCst) == 1) {
break;
}
}
return 0;
}

View File

@ -81,3 +81,8 @@ pub const sockaddr = extern struct {
}; };
pub const sa_family_t = u8; pub const sa_family_t = u8;
pub const pthread_attr_t = extern struct {
__sig: c_long,
__opaque: [56]u8,
};

View File

@ -53,3 +53,13 @@ pub extern "c" fn malloc(usize) ?&c_void;
pub extern "c" fn realloc(&c_void, usize) ?&c_void; pub extern "c" fn realloc(&c_void, usize) ?&c_void;
pub extern "c" fn free(&c_void) void; pub extern "c" fn free(&c_void) void;
pub extern "c" fn posix_memalign(memptr: &&c_void, alignment: usize, size: usize) c_int; pub extern "c" fn posix_memalign(memptr: &&c_void, alignment: usize, size: usize) c_int;
pub extern "pthread" fn pthread_create(noalias newthread: &pthread_t,
noalias attr: ?&const pthread_attr_t, start_routine: extern fn(?&c_void) ?&c_void,
noalias arg: ?&c_void) c_int;
pub extern "pthread" fn pthread_attr_init(attr: &pthread_attr_t) c_int;
pub extern "pthread" fn pthread_attr_setstack(attr: &pthread_attr_t, stackaddr: &c_void, stacksize: usize) c_int;
pub extern "pthread" fn pthread_attr_destroy(attr: &pthread_attr_t) c_int;
pub extern "pthread" fn pthread_join(thread: pthread_t, arg_return: ?&?&c_void) c_int;
pub const pthread_t = &@OpaqueType();

View File

@ -3,3 +3,8 @@ pub use @import("../os/linux/errno.zig");
pub extern "c" fn getrandom(buf_ptr: &u8, buf_len: usize, flags: c_uint) c_int; pub extern "c" fn getrandom(buf_ptr: &u8, buf_len: usize, flags: c_uint) c_int;
extern "c" fn __errno_location() &c_int; extern "c" fn __errno_location() &c_int;
pub const _errno = __errno_location; pub const _errno = __errno_location;
pub const pthread_attr_t = extern struct {
__size: [56]u8,
__align: c_long,
};

View File

@ -47,13 +47,6 @@ pub const DirectAllocator = struct {
const HeapHandle = if (builtin.os == Os.windows) os.windows.HANDLE else void; const HeapHandle = if (builtin.os == Os.windows) os.windows.HANDLE else void;
//pub const canary_bytes = []u8 {48, 239, 128, 46, 18, 49, 147, 9, 195, 59, 203, 3, 245, 54, 9, 122};
//pub const want_safety = switch (builtin.mode) {
// builtin.Mode.Debug => true,
// builtin.Mode.ReleaseSafe => true,
// else => false,
//};
pub fn init() DirectAllocator { pub fn init() DirectAllocator {
return DirectAllocator { return DirectAllocator {
.allocator = Allocator { .allocator = Allocator {
@ -98,7 +91,7 @@ pub const DirectAllocator = struct {
const unused_start = addr; const unused_start = addr;
const unused_len = aligned_addr - 1 - unused_start; const unused_len = aligned_addr - 1 - unused_start;
var err = p.munmap(@intToPtr(&u8, unused_start), unused_len); var err = p.munmap(unused_start, unused_len);
debug.assert(p.getErrno(err) == 0); debug.assert(p.getErrno(err) == 0);
//It is impossible that there is an unoccupied page at the top of our //It is impossible that there is an unoccupied page at the top of our
@ -139,7 +132,7 @@ pub const DirectAllocator = struct {
const rem = @rem(new_addr_end, os.page_size); const rem = @rem(new_addr_end, os.page_size);
const new_addr_end_rounded = new_addr_end + if (rem == 0) 0 else (os.page_size - rem); const new_addr_end_rounded = new_addr_end + if (rem == 0) 0 else (os.page_size - rem);
if (old_addr_end > new_addr_end_rounded) { if (old_addr_end > new_addr_end_rounded) {
_ = os.posix.munmap(@intToPtr(&u8, new_addr_end_rounded), old_addr_end - new_addr_end_rounded); _ = os.posix.munmap(new_addr_end_rounded, old_addr_end - new_addr_end_rounded);
} }
return old_mem[0..new_size]; return old_mem[0..new_size];
} }
@ -177,7 +170,7 @@ pub const DirectAllocator = struct {
switch (builtin.os) { switch (builtin.os) {
Os.linux, Os.macosx, Os.ios => { Os.linux, Os.macosx, Os.ios => {
_ = os.posix.munmap(bytes.ptr, bytes.len); _ = os.posix.munmap(@ptrToInt(bytes.ptr), bytes.len);
}, },
Os.windows => { Os.windows => {
const record_addr = @ptrToInt(bytes.ptr) + bytes.len; const record_addr = @ptrToInt(bytes.ptr) + bytes.len;
@ -298,7 +291,7 @@ pub const FixedBufferAllocator = struct {
fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 { fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 {
const self = @fieldParentPtr(FixedBufferAllocator, "allocator", allocator); const self = @fieldParentPtr(FixedBufferAllocator, "allocator", allocator);
const addr = @ptrToInt(&self.buffer[self.end_index]); const addr = @ptrToInt(self.buffer.ptr) + self.end_index;
const rem = @rem(addr, alignment); const rem = @rem(addr, alignment);
const march_forward_bytes = if (rem == 0) 0 else (alignment - rem); const march_forward_bytes = if (rem == 0) 0 else (alignment - rem);
const adjusted_index = self.end_index + march_forward_bytes; const adjusted_index = self.end_index + march_forward_bytes;
@ -325,6 +318,54 @@ pub const FixedBufferAllocator = struct {
fn free(allocator: &Allocator, bytes: []u8) void { } fn free(allocator: &Allocator, bytes: []u8) void { }
}; };
/// lock free
pub const ThreadSafeFixedBufferAllocator = struct {
allocator: Allocator,
end_index: usize,
buffer: []u8,
pub fn init(buffer: []u8) ThreadSafeFixedBufferAllocator {
return ThreadSafeFixedBufferAllocator {
.allocator = Allocator {
.allocFn = alloc,
.reallocFn = realloc,
.freeFn = free,
},
.buffer = buffer,
.end_index = 0,
};
}
fn alloc(allocator: &Allocator, n: usize, alignment: u29) ![]u8 {
const self = @fieldParentPtr(ThreadSafeFixedBufferAllocator, "allocator", allocator);
var end_index = @atomicLoad(usize, &self.end_index, builtin.AtomicOrder.SeqCst);
while (true) {
const addr = @ptrToInt(self.buffer.ptr) + end_index;
const rem = @rem(addr, alignment);
const march_forward_bytes = if (rem == 0) 0 else (alignment - rem);
const adjusted_index = end_index + march_forward_bytes;
const new_end_index = adjusted_index + n;
if (new_end_index > self.buffer.len) {
return error.OutOfMemory;
}
end_index = @cmpxchgWeak(usize, &self.end_index, end_index, new_end_index,
builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) ?? return self.buffer[adjusted_index .. new_end_index];
}
}
fn realloc(allocator: &Allocator, old_mem: []u8, new_size: usize, alignment: u29) ![]u8 {
if (new_size <= old_mem.len) {
return old_mem[0..new_size];
} else {
const result = try alloc(allocator, new_size, alignment);
mem.copy(u8, result, old_mem);
return result;
}
}
fn free(allocator: &Allocator, bytes: []u8) void { }
};
test "c_allocator" { test "c_allocator" {
@ -363,6 +404,13 @@ test "FixedBufferAllocator" {
try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator); try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator);
} }
test "ThreadSafeFixedBufferAllocator" {
var fixed_buffer_allocator = ThreadSafeFixedBufferAllocator.init(test_fixed_buffer_allocator_memory[0..]);
try testAllocator(&fixed_buffer_allocator.allocator);
try testAllocatorLargeAlignment(&fixed_buffer_allocator.allocator);
}
fn testAllocator(allocator: &mem.Allocator) !void { fn testAllocator(allocator: &mem.Allocator) !void {
var slice = try allocator.alloc(&i32, 100); var slice = try allocator.alloc(&i32, 100);

View File

@ -8,6 +8,7 @@ pub const HashMap = @import("hash_map.zig").HashMap;
pub const LinkedList = @import("linked_list.zig").LinkedList; pub const LinkedList = @import("linked_list.zig").LinkedList;
pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList; pub const IntrusiveLinkedList = @import("linked_list.zig").IntrusiveLinkedList;
pub const atomic = @import("atomic/index.zig");
pub const base64 = @import("base64.zig"); pub const base64 = @import("base64.zig");
pub const build = @import("build.zig"); pub const build = @import("build.zig");
pub const c = @import("c/index.zig"); pub const c = @import("c/index.zig");
@ -34,6 +35,7 @@ pub const zig = @import("zig/index.zig");
test "std" { test "std" {
// run tests from these // run tests from these
_ = @import("atomic/index.zig");
_ = @import("array_list.zig"); _ = @import("array_list.zig");
_ = @import("buf_map.zig"); _ = @import("buf_map.zig");
_ = @import("buf_set.zig"); _ = @import("buf_set.zig");

View File

@ -32,6 +32,7 @@ pub const Allocator = struct {
freeFn: fn (self: &Allocator, old_mem: []u8) void, freeFn: fn (self: &Allocator, old_mem: []u8) void,
fn create(self: &Allocator, comptime T: type) !&T { fn create(self: &Allocator, comptime T: type) !&T {
if (@sizeOf(T) == 0) return &{};
const slice = try self.alloc(T, 1); const slice = try self.alloc(T, 1);
return &slice[0]; return &slice[0];
} }

View File

@ -184,7 +184,7 @@ pub fn write(fd: i32, buf: &const u8, nbyte: usize) usize {
return errnoWrap(c.write(fd, @ptrCast(&const c_void, buf), nbyte)); return errnoWrap(c.write(fd, @ptrCast(&const c_void, buf), nbyte));
} }
pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32,
offset: isize) usize offset: isize) usize
{ {
const ptr_result = c.mmap(@ptrCast(&c_void, address), length, const ptr_result = c.mmap(@ptrCast(&c_void, address), length,
@ -193,8 +193,8 @@ pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32,
return errnoWrap(isize_result); return errnoWrap(isize_result);
} }
pub fn munmap(address: &u8, length: usize) usize { pub fn munmap(address: usize, length: usize) usize {
return errnoWrap(c.munmap(@ptrCast(&c_void, address), length)); return errnoWrap(c.munmap(@intToPtr(&c_void, address), length));
} }
pub fn unlink(path: &const u8) usize { pub fn unlink(path: &const u8) usize {
@ -341,4 +341,4 @@ pub const timeval = c.timeval;
pub const mach_timebase_info_data = c.mach_timebase_info_data; pub const mach_timebase_info_data = c.mach_timebase_info_data;
pub const mach_absolute_time = c.mach_absolute_time; pub const mach_absolute_time = c.mach_absolute_time;
pub const mach_timebase_info = c.mach_timebase_info; pub const mach_timebase_info = c.mach_timebase_info;

View File

@ -2,6 +2,10 @@ const std = @import("../index.zig");
const builtin = @import("builtin"); const builtin = @import("builtin");
const Os = builtin.Os; const Os = builtin.Os;
const is_windows = builtin.os == Os.windows; const is_windows = builtin.os == Os.windows;
const is_posix = switch (builtin.os) {
builtin.Os.linux, builtin.Os.macosx => true,
else => false,
};
const os = this; const os = this;
test "std.os" { test "std.os" {
@ -2343,24 +2347,58 @@ pub fn posixGetSockOptConnectError(sockfd: i32) PosixConnectError!void {
} }
pub const Thread = struct { pub const Thread = struct {
pid: i32, data: Data,
allocator: ?&mem.Allocator,
stack: []u8, pub const use_pthreads = is_posix and builtin.link_libc;
const Data = if (use_pthreads) struct {
handle: c.pthread_t,
stack_addr: usize,
stack_len: usize,
} else switch (builtin.os) {
builtin.Os.linux => struct {
pid: i32,
stack_addr: usize,
stack_len: usize,
},
builtin.Os.windows => struct {
handle: windows.HANDLE,
alloc_start: &c_void,
heap_handle: windows.HANDLE,
},
else => @compileError("Unsupported OS"),
};
pub fn wait(self: &const Thread) void { pub fn wait(self: &const Thread) void {
while (true) { if (use_pthreads) {
const pid_value = @atomicLoad(i32, &self.pid, builtin.AtomicOrder.SeqCst); const err = c.pthread_join(self.data.handle, null);
if (pid_value == 0) break; switch (err) {
const rc = linux.futex_wait(@ptrToInt(&self.pid), linux.FUTEX_WAIT, pid_value, null); 0 => {},
switch (linux.getErrno(rc)) { posix.EINVAL => unreachable,
0 => continue, posix.ESRCH => unreachable,
posix.EINTR => continue, posix.EDEADLK => unreachable,
posix.EAGAIN => continue,
else => unreachable, else => unreachable,
} }
} assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0);
if (self.allocator) |a| { } else switch (builtin.os) {
a.free(self.stack); builtin.Os.linux => {
while (true) {
const pid_value = @atomicLoad(i32, &self.data.pid, builtin.AtomicOrder.SeqCst);
if (pid_value == 0) break;
const rc = linux.futex_wait(@ptrToInt(&self.data.pid), linux.FUTEX_WAIT, pid_value, null);
switch (linux.getErrno(rc)) {
0 => continue,
posix.EINTR => continue,
posix.EAGAIN => continue,
else => unreachable,
}
}
assert(posix.munmap(self.data.stack_addr, self.data.stack_len) == 0);
},
builtin.Os.windows => {
assert(windows.WaitForSingleObject(self.data.handle, windows.INFINITE) == windows.WAIT_OBJECT_0);
assert(windows.HeapFree(self.data.heap_handle, 0, self.data.alloc_start) != 0);
},
else => @compileError("Unsupported OS"),
} }
} }
}; };
@ -2385,38 +2423,94 @@ pub const SpawnThreadError = error {
/// be copied. /// be copied.
SystemResources, SystemResources,
/// Not enough userland memory to spawn the thread.
OutOfMemory,
Unexpected, Unexpected,
}; };
pub const SpawnThreadAllocatorError = SpawnThreadError || error{OutOfMemory};
/// caller must call wait on the returned thread /// caller must call wait on the returned thread
/// fn startFn(@typeOf(context)) T /// fn startFn(@typeOf(context)) T
/// where T is u8, noreturn, void, or !void /// where T is u8, noreturn, void, or !void
pub fn spawnThreadAllocator(allocator: &mem.Allocator, context: var, comptime startFn: var) SpawnThreadAllocatorError!&Thread { /// caller must call wait on the returned thread
pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!&Thread {
// TODO compile-time call graph analysis to determine stack upper bound // TODO compile-time call graph analysis to determine stack upper bound
// https://github.com/zig-lang/zig/issues/157 // https://github.com/zig-lang/zig/issues/157
const default_stack_size = 8 * 1024 * 1024; const default_stack_size = 8 * 1024 * 1024;
const stack_bytes = try allocator.alloc(u8, default_stack_size);
const thread = try spawnThread(stack_bytes, context, startFn);
thread.allocator = allocator;
return thread;
}
/// stack must be big enough to store one Thread and one @typeOf(context), each with default alignment, at the end
/// fn startFn(@typeOf(context)) T
/// where T is u8, noreturn, void, or !void
/// caller must call wait on the returned thread
pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThreadError!&Thread {
const Context = @typeOf(context); const Context = @typeOf(context);
comptime assert(@ArgType(@typeOf(startFn), 0) == Context); comptime assert(@ArgType(@typeOf(startFn), 0) == Context);
var stack_end: usize = @ptrToInt(stack.ptr) + stack.len; if (builtin.os == builtin.Os.windows) {
const WinThread = struct {
const OuterContext = struct {
thread: Thread,
inner: Context,
};
extern fn threadMain(arg: windows.LPVOID) windows.DWORD {
if (@sizeOf(Context) == 0) {
return startFn({});
} else {
return startFn(*@ptrCast(&Context, @alignCast(@alignOf(Context), arg)));
}
}
};
const heap_handle = windows.GetProcessHeap() ?? return SpawnThreadError.OutOfMemory;
const byte_count = @alignOf(WinThread.OuterContext) + @sizeOf(WinThread.OuterContext);
const bytes_ptr = windows.HeapAlloc(heap_handle, 0, byte_count) ?? return SpawnThreadError.OutOfMemory;
errdefer assert(windows.HeapFree(heap_handle, 0, bytes_ptr) != 0);
const bytes = @ptrCast(&u8, bytes_ptr)[0..byte_count];
const outer_context = std.heap.FixedBufferAllocator.init(bytes).allocator.create(WinThread.OuterContext) catch unreachable;
outer_context.inner = context;
outer_context.thread.data.heap_handle = heap_handle;
outer_context.thread.data.alloc_start = bytes_ptr;
const parameter = if (@sizeOf(Context) == 0) null else @ptrCast(&c_void, &outer_context.inner);
outer_context.thread.data.handle = windows.CreateThread(null, default_stack_size, WinThread.threadMain,
parameter, 0, null) ??
{
const err = windows.GetLastError();
return switch (err) {
else => os.unexpectedErrorWindows(err),
};
};
return &outer_context.thread;
}
const MainFuncs = struct {
extern fn linuxThreadMain(ctx_addr: usize) u8 {
if (@sizeOf(Context) == 0) {
return startFn({});
} else {
return startFn(*@intToPtr(&const Context, ctx_addr));
}
}
extern fn posixThreadMain(ctx: ?&c_void) ?&c_void {
if (@sizeOf(Context) == 0) {
_ = startFn({});
return null;
} else {
_ = startFn(*@ptrCast(&const Context, @alignCast(@alignOf(Context), ctx)));
return null;
}
}
};
const MAP_GROWSDOWN = if (builtin.os == builtin.Os.linux) linux.MAP_GROWSDOWN else 0;
const mmap_len = default_stack_size;
const stack_addr = posix.mmap(null, mmap_len, posix.PROT_READ|posix.PROT_WRITE,
posix.MAP_PRIVATE|posix.MAP_ANONYMOUS|MAP_GROWSDOWN, -1, 0);
if (stack_addr == posix.MAP_FAILED) return error.OutOfMemory;
errdefer assert(posix.munmap(stack_addr, mmap_len) == 0);
var stack_end: usize = stack_addr + mmap_len;
var arg: usize = undefined; var arg: usize = undefined;
if (@sizeOf(Context) != 0) { if (@sizeOf(Context) != 0) {
stack_end -= @sizeOf(Context); stack_end -= @sizeOf(Context);
stack_end -= stack_end % @alignOf(Context); stack_end -= stack_end % @alignOf(Context);
assert(stack_end >= @ptrToInt(stack.ptr)); assert(stack_end >= stack_addr);
const context_ptr = @alignCast(@alignOf(Context), @intToPtr(&Context, stack_end)); const context_ptr = @alignCast(@alignOf(Context), @intToPtr(&Context, stack_end));
*context_ptr = context; *context_ptr = context;
arg = stack_end; arg = stack_end;
@ -2424,36 +2518,53 @@ pub fn spawnThread(stack: []u8, context: var, comptime startFn: var) SpawnThread
stack_end -= @sizeOf(Thread); stack_end -= @sizeOf(Thread);
stack_end -= stack_end % @alignOf(Thread); stack_end -= stack_end % @alignOf(Thread);
assert(stack_end >= @ptrToInt(stack.ptr)); assert(stack_end >= stack_addr);
const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end)); const thread_ptr = @alignCast(@alignOf(Thread), @intToPtr(&Thread, stack_end));
thread_ptr.stack = stack;
thread_ptr.allocator = null;
const threadMain = struct { thread_ptr.data.stack_addr = stack_addr;
extern fn threadMain(ctx_addr: usize) u8 { thread_ptr.data.stack_len = mmap_len;
if (@sizeOf(Context) == 0) {
return startFn({}); if (builtin.os == builtin.Os.windows) {
} else { // use windows API directly
return startFn(*@intToPtr(&const Context, ctx_addr)); @compileError("TODO support spawnThread for Windows");
} } else if (Thread.use_pthreads) {
// use pthreads
var attr: c.pthread_attr_t = undefined;
if (c.pthread_attr_init(&attr) != 0) return SpawnThreadError.SystemResources;
defer assert(c.pthread_attr_destroy(&attr) == 0);
// align to page
stack_end -= stack_end % os.page_size;
assert(c.pthread_attr_setstack(&attr, @intToPtr(&c_void, stack_addr), stack_end - stack_addr) == 0);
const err = c.pthread_create(&thread_ptr.data.handle, &attr, MainFuncs.posixThreadMain, @intToPtr(&c_void, arg));
switch (err) {
0 => return thread_ptr,
posix.EAGAIN => return SpawnThreadError.SystemResources,
posix.EPERM => unreachable,
posix.EINVAL => unreachable,
else => return unexpectedErrorPosix(usize(err)),
} }
}.threadMain; } else if (builtin.os == builtin.Os.linux) {
// use linux API directly
const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND const flags = posix.CLONE_VM | posix.CLONE_FS | posix.CLONE_FILES | posix.CLONE_SIGHAND
| posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS | posix.CLONE_THREAD | posix.CLONE_SYSVSEM // | posix.CLONE_SETTLS
| posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED; | posix.CLONE_PARENT_SETTID | posix.CLONE_CHILD_CLEARTID | posix.CLONE_DETACHED;
const newtls: usize = 0; const newtls: usize = 0;
const rc = posix.clone(threadMain, stack_end, flags, arg, &thread_ptr.pid, newtls, &thread_ptr.pid); const rc = posix.clone(MainFuncs.linuxThreadMain, stack_end, flags, arg, &thread_ptr.data.pid, newtls, &thread_ptr.data.pid);
const err = posix.getErrno(rc); const err = posix.getErrno(rc);
switch (err) { switch (err) {
0 => return thread_ptr, 0 => return thread_ptr,
posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded, posix.EAGAIN => return SpawnThreadError.ThreadQuotaExceeded,
posix.EINVAL => unreachable, posix.EINVAL => unreachable,
posix.ENOMEM => return SpawnThreadError.SystemResources, posix.ENOMEM => return SpawnThreadError.SystemResources,
posix.ENOSPC => unreachable, posix.ENOSPC => unreachable,
posix.EPERM => unreachable, posix.EPERM => unreachable,
posix.EUSERS => unreachable, posix.EUSERS => unreachable,
else => return unexpectedErrorPosix(err), else => return unexpectedErrorPosix(err),
}
} else {
@compileError("Unsupported OS");
} }
} }

View File

@ -706,13 +706,13 @@ pub fn umount2(special: &const u8, flags: u32) usize {
return syscall2(SYS_umount2, @ptrToInt(special), flags); return syscall2(SYS_umount2, @ptrToInt(special), flags);
} }
pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: usize, fd: i32, offset: isize) usize { pub fn mmap(address: ?&u8, length: usize, prot: usize, flags: u32, fd: i32, offset: isize) usize {
return syscall6(SYS_mmap, @ptrToInt(address), length, prot, flags, usize(fd), return syscall6(SYS_mmap, @ptrToInt(address), length, prot, flags, usize(fd),
@bitCast(usize, offset)); @bitCast(usize, offset));
} }
pub fn munmap(address: &u8, length: usize) usize { pub fn munmap(address: usize, length: usize) usize {
return syscall2(SYS_munmap, @ptrToInt(address), length); return syscall2(SYS_munmap, address, length);
} }
pub fn read(fd: i32, buf: &u8, count: usize) usize { pub fn read(fd: i32, buf: &u8, count: usize) usize {

View File

@ -44,24 +44,12 @@ test "access file" {
} }
test "spawn threads" { test "spawn threads" {
if (builtin.os != builtin.Os.linux) {
// TODO implement threads on macos and windows
return;
}
var direct_allocator = std.heap.DirectAllocator.init();
defer direct_allocator.deinit();
var shared_ctx: i32 = 1; var shared_ctx: i32 = 1;
const thread1 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, {}, start1); const thread1 = try std.os.spawnThread({}, start1);
const thread4 = try std.os.spawnThreadAllocator(&direct_allocator.allocator, &shared_ctx, start2); const thread2 = try std.os.spawnThread(&shared_ctx, start2);
const thread3 = try std.os.spawnThread(&shared_ctx, start2);
var stack1: [1024]u8 = undefined; const thread4 = try std.os.spawnThread(&shared_ctx, start2);
var stack2: [1024]u8 = undefined;
const thread2 = try std.os.spawnThread(stack1[0..], &shared_ctx, start2);
const thread3 = try std.os.spawnThread(stack2[0..], &shared_ctx, start2);
thread1.wait(); thread1.wait();
thread2.wait(); thread2.wait();

View File

@ -28,6 +28,9 @@ pub extern "kernel32" stdcallcc fn CreateProcessA(lpApplicationName: ?LPCSTR, lp
pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(lpSymlinkFileName: LPCSTR, lpTargetFileName: LPCSTR, pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(lpSymlinkFileName: LPCSTR, lpTargetFileName: LPCSTR,
dwFlags: DWORD) BOOLEAN; dwFlags: DWORD) BOOLEAN;
pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE;
pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL; pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL;
pub extern "kernel32" stdcallcc fn ExitProcess(exit_code: UINT) noreturn; pub extern "kernel32" stdcallcc fn ExitProcess(exit_code: UINT) noreturn;
@ -318,6 +321,9 @@ pub const HEAP_CREATE_ENABLE_EXECUTE = 0x00040000;
pub const HEAP_GENERATE_EXCEPTIONS = 0x00000004; pub const HEAP_GENERATE_EXCEPTIONS = 0x00000004;
pub const HEAP_NO_SERIALIZE = 0x00000001; pub const HEAP_NO_SERIALIZE = 0x00000001;
pub const PTHREAD_START_ROUTINE = extern fn(LPVOID) DWORD;
pub const LPTHREAD_START_ROUTINE = PTHREAD_START_ROUTINE;
test "import" { test "import" {
_ = @import("util.zig"); _ = @import("util.zig");
} }