(breaking) improve and simplify fixed buffer streams API

This commit is contained in:
Andrew Kelley 2020-03-10 16:31:04 -04:00
parent ba0e3be5cf
commit b6fbd524f1
No known key found for this signature in database
GPG Key ID: 7C5F548F728501A9
9 changed files with 218 additions and 231 deletions

View File

@ -95,18 +95,18 @@ pub fn getStdIn() File {
pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream;
pub const InStream = @import("io/in_stream.zig").InStream;
pub const OutStream = @import("io/out_stream.zig").OutStream;
pub const FixedBufferInStream = @import("io/fixed_buffer_stream.zig").FixedBufferInStream;
pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAtomicFile;
pub const BufferedOutStream = @import("io/buffered_out_stream.zig").BufferedOutStream;
pub const BufferedOutStreamCustom = @import("io/buffered_out_stream.zig").BufferedOutStreamCustom;
pub const bufferedOutStream = @import("io/buffered_out_stream.zig").bufferedOutStream;
pub const CountingOutStream = @import("io/counting_out_stream.zig").CountingOutStream;
pub const BufferedInStream = @import("io/buffered_in_stream.zig").BufferedInStream;
pub const bufferedInStream = @import("io/buffered_in_stream.zig").bufferedInStream;
pub fn fixedBufferStream(bytes: []const u8) FixedBufferInStream {
return (FixedBufferInStream{ .bytes = bytes, .pos = 0 });
}
pub const FixedBufferStream = @import("io/fixed_buffer_stream.zig").FixedBufferStream;
pub const fixedBufferStream = @import("io/fixed_buffer_stream.zig").fixedBufferStream;
pub const CountingOutStream = @import("io/counting_out_stream.zig").CountingOutStream;
pub fn cOutStream(c_file: *std.c.FILE) COutStream {
return .{ .context = c_file };
@ -144,92 +144,6 @@ pub fn readFileAlloc(allocator: *mem.Allocator, path: []const u8) ![]u8 {
return fs.cwd().readFileAlloc(allocator, path, math.maxInt(usize));
}
pub fn BufferedInStream(comptime Error: type) type {
return BufferedInStreamCustom(mem.page_size, Error);
}
pub fn BufferedInStreamCustom(comptime buffer_size: usize, comptime Error: type) type {
return struct {
const Self = @This();
const Stream = InStream(Error);
stream: Stream,
unbuffered_in_stream: *Stream,
const FifoType = std.fifo.LinearFifo(u8, std.fifo.LinearFifoBufferType{ .Static = buffer_size });
fifo: FifoType,
pub fn init(unbuffered_in_stream: *Stream) Self {
return Self{
.unbuffered_in_stream = unbuffered_in_stream,
.fifo = FifoType.init(),
.stream = Stream{ .readFn = readFn },
};
}
fn readFn(in_stream: *Stream, dest: []u8) !usize {
const self = @fieldParentPtr(Self, "stream", in_stream);
var dest_index: usize = 0;
while (dest_index < dest.len) {
const written = self.fifo.read(dest[dest_index..]);
if (written == 0) {
// fifo empty, fill it
const writable = self.fifo.writableSlice(0);
assert(writable.len > 0);
const n = try self.unbuffered_in_stream.read(writable);
if (n == 0) {
// reading from the unbuffered stream returned nothing
// so we have nothing left to read.
return dest_index;
}
self.fifo.update(n);
}
dest_index += written;
}
return dest.len;
}
};
}
test "io.BufferedInStream" {
const OneByteReadInStream = struct {
const Error = error{NoError};
const Stream = InStream(Error);
stream: Stream,
str: []const u8,
curr: usize,
fn init(str: []const u8) @This() {
return @This(){
.stream = Stream{ .readFn = readFn },
.str = str,
.curr = 0,
};
}
fn readFn(in_stream: *Stream, dest: []u8) Error!usize {
const self = @fieldParentPtr(@This(), "stream", in_stream);
if (self.str.len <= self.curr or dest.len == 0)
return 0;
dest[0] = self.str[self.curr];
self.curr += 1;
return 1;
}
};
const str = "This is a test";
var one_byte_stream = OneByteReadInStream.init(str);
var buf_in_stream = BufferedInStream(OneByteReadInStream.Error).init(&one_byte_stream.stream);
const stream = &buf_in_stream.stream;
const res = try stream.readAllAlloc(testing.allocator, str.len + 1);
defer testing.allocator.free(res);
testing.expectEqualSlices(u8, str, res);
}
/// Creates a stream which supports 'un-reading' data, so that it can be read again.
/// This makes look-ahead style parsing much easier.
pub fn PeekStream(comptime buffer_type: std.fifo.LinearFifoBufferType, comptime InStreamError: type) type {
@ -473,64 +387,6 @@ pub fn BitInStream(endian: builtin.Endian, comptime Error: type) type {
};
}
/// This is a simple OutStream that writes to a fixed buffer. If the returned number
/// of bytes written is less than requested, the buffer is full.
/// Returns error.OutOfMemory when no bytes would be written.
pub const SliceOutStream = struct {
pub const Error = error{OutOfMemory};
pub const Stream = OutStream(Error);
stream: Stream,
pos: usize,
slice: []u8,
pub fn init(slice: []u8) SliceOutStream {
return SliceOutStream{
.slice = slice,
.pos = 0,
.stream = Stream{ .writeFn = writeFn },
};
}
pub fn getWritten(self: *const SliceOutStream) []const u8 {
return self.slice[0..self.pos];
}
pub fn reset(self: *SliceOutStream) void {
self.pos = 0;
}
fn writeFn(out_stream: *Stream, bytes: []const u8) Error!usize {
const self = @fieldParentPtr(SliceOutStream, "stream", out_stream);
if (bytes.len == 0) return 0;
assert(self.pos <= self.slice.len);
const n = if (self.pos + bytes.len <= self.slice.len)
bytes.len
else
self.slice.len - self.pos;
std.mem.copy(u8, self.slice[self.pos .. self.pos + n], bytes[0..n]);
self.pos += n;
if (n == 0) return error.OutOfMemory;
return n;
}
};
test "io.SliceOutStream" {
var buf: [255]u8 = undefined;
var slice_stream = SliceOutStream.init(buf[0..]);
const stream = &slice_stream.stream;
try stream.print("{}{}!", .{ "Hello", "World" });
testing.expectEqualSlices(u8, "HelloWorld!", slice_stream.getWritten());
}
/// An OutStream that doesn't write to anything.
pub const null_out_stream = @as(NullOutStream, .{ .context = {} });

View File

@ -10,7 +10,7 @@ pub const BufferedAtomicFile = struct {
allocator: *mem.Allocator,
pub const buffer_size = 4096;
pub const BufferedOutStream = std.io.BufferedOutStreamCustom(buffer_size, File.OutStream);
pub const BufferedOutStream = std.io.BufferedOutStream(buffer_size, File.OutStream);
pub const OutStream = std.io.OutStream(*BufferedOutStream, BufferedOutStream.Error, BufferedOutStream.write);
/// TODO when https://github.com/ziglang/zig/issues/2761 is solved
@ -29,7 +29,7 @@ pub const BufferedAtomicFile = struct {
errdefer self.atomic_file.deinit();
self.file_stream = self.atomic_file.file.outStream();
self.buffered_stream = std.io.bufferedOutStream(buffer_size, self.file_stream);
self.buffered_stream = .{ .unbuffered_out_stream = self.file_stream };
return self;
}

View File

@ -0,0 +1,84 @@
const std = @import("../std.zig");
const io = std.io;
pub fn BufferedInStream(comptime buffer_size: usize, comptime InStreamType) type {
return struct {
unbuffered_in_stream: InStreamType,
fifo: FifoType = FifoType.init(),
pub const Error = InStreamType.Error;
pub const InStream = io.InStream(*Self, Error, read);
const Self = @This();
const FifoType = std.fifo.LinearFifo(u8, std.fifo.LinearFifoBufferType{ .Static = buffer_size });
pub fn read(self: *Self, dest: []u8) Error!usize {
var dest_index: usize = 0;
while (dest_index < dest.len) {
const written = self.fifo.read(dest[dest_index..]);
if (written == 0) {
// fifo empty, fill it
const writable = self.fifo.writableSlice(0);
assert(writable.len > 0);
const n = try self.unbuffered_in_stream.read(writable);
if (n == 0) {
// reading from the unbuffered stream returned nothing
// so we have nothing left to read.
return dest_index;
}
self.fifo.update(n);
}
dest_index += written;
}
return dest.len;
}
pub fn inStream(self: *Self) InStream {
return .{ .context = self };
}
};
}
pub fn bufferedInStream(underlying_stream: var) BufferedInStream(4096, @TypeOf(underlying_stream)) {
return .{ .unbuffered_in_stream = underlying_stream };
}
test "io.BufferedInStream" {
const OneByteReadInStream = struct {
str: []const u8,
curr: usize,
const Error = error{NoError};
const Self = @This();
const InStream = io.InStream(*Self, Error, read);
fn init(str: []const u8) Self {
return Self{
.str = str,
.curr = 0,
};
}
fn read(self: *Self, dest: []u8) Error!usize {
if (self.str.len <= self.curr or dest.len == 0)
return 0;
dest[0] = self.str[self.curr];
self.curr += 1;
return 1;
}
fn inStream(self: *Self) InStream {
return .{ .context = self };
}
};
const str = "This is a test";
var one_byte_stream = OneByteReadInStream.init(str);
var buf_in_stream = bufferedInStream(one_byte_stream.inStream());
const stream = buf_in_stream.inStream();
const res = try stream.readAllAlloc(testing.allocator, str.len + 1);
defer testing.allocator.free(res);
testing.expectEqualSlices(u8, str, res);
}

View File

@ -1,14 +1,10 @@
const std = @import("../std.zig");
const io = std.io;
pub fn BufferedOutStream(comptime OutStreamType: type) type {
return BufferedOutStreamCustom(4096, OutStreamType);
}
pub fn BufferedOutStreamCustom(comptime buffer_size: usize, comptime OutStreamType: type) type {
pub fn BufferedOutStream(comptime buffer_size: usize, comptime OutStreamType: type) type {
return struct {
unbuffered_out_stream: OutStreamType,
fifo: FifoType,
fifo: FifoType = FifoType.init(),
pub const Error = OutStreamType.Error;
pub const OutStream = io.OutStream(*Self, Error, write);
@ -16,13 +12,6 @@ pub fn BufferedOutStreamCustom(comptime buffer_size: usize, comptime OutStreamTy
const Self = @This();
const FifoType = std.fifo.LinearFifo(u8, std.fifo.LinearFifoBufferType{ .Static = buffer_size });
pub fn init(unbuffered_out_stream: OutStreamType) Self {
return Self{
.unbuffered_out_stream = unbuffered_out_stream,
.fifo = FifoType.init(),
};
}
pub fn flush(self: *Self) !void {
while (true) {
const slice = self.fifo.readableSlice(0);
@ -47,10 +36,6 @@ pub fn BufferedOutStreamCustom(comptime buffer_size: usize, comptime OutStreamTy
};
}
pub fn bufferedOutStream(
comptime buffer_size: usize,
underlying_stream: var,
) BufferedOutStreamCustom(buffer_size, @TypeOf(underlying_stream)) {
return BufferedOutStreamCustom(buffer_size, @TypeOf(underlying_stream)).init(underlying_stream);
pub fn bufferedOutStream(underlying_stream: var) BufferedOutStream(4096, @TypeOf(underlying_stream)) {
return .{ .unbuffered_out_stream = underlying_stream };
}

View File

@ -1,66 +1,129 @@
const std = @import("../std.zig");
const io = std.io;
const testing = std.testing;
pub const FixedBufferInStream = struct {
bytes: []const u8,
pos: usize,
/// This turns a slice into an `io.OutStream`, `io.InStream`, or `io.SeekableStream`.
/// If the supplied slice is const, then `io.OutStream` is not available.
pub fn FixedBufferStream(comptime Buffer: type) type {
return struct {
/// `Buffer` is either a `[]u8` or `[]const u8`.
buffer: Buffer,
pos: usize,
pub const SeekError = error{EndOfStream};
pub const GetSeekPosError = error{};
pub const ReadError = error{EndOfStream};
pub const WriteError = error{OutOfMemory};
pub const SeekError = error{EndOfStream};
pub const GetSeekPosError = error{};
pub const InStream = io.InStream(*FixedBufferInStream, error{}, read);
pub const InStream = io.InStream(*Self, ReadError, read);
pub const OutStream = io.OutStream(*Self, WriteError, write);
pub fn inStream(self: *FixedBufferInStream) InStream {
return .{ .context = self };
}
pub const SeekableStream = io.SeekableStream(
*Self,
SeekError,
GetSeekPosError,
seekTo,
seekBy,
getPos,
getEndPos,
);
pub const SeekableStream = io.SeekableStream(
*FixedBufferInStream,
SeekError,
GetSeekPosError,
seekTo,
seekBy,
getPos,
getEndPos,
);
const Self = @This();
pub fn seekableStream(self: *FixedBufferInStream) SeekableStream {
return .{ .context = self };
}
pub fn read(self: *FixedBufferInStream, dest: []u8) error{}!usize {
const size = std.math.min(dest.len, self.bytes.len - self.pos);
const end = self.pos + size;
std.mem.copy(u8, dest[0..size], self.bytes[self.pos..end]);
self.pos = end;
return size;
}
pub fn seekTo(self: *FixedBufferInStream, pos: u64) SeekError!void {
const usize_pos = std.math.cast(usize, pos) catch return error.EndOfStream;
if (usize_pos > self.bytes.len) return error.EndOfStream;
self.pos = usize_pos;
}
pub fn seekBy(self: *FixedBufferInStream, amt: i64) SeekError!void {
if (amt < 0) {
const abs_amt = std.math.cast(usize, -amt) catch return error.EndOfStream;
if (abs_amt > self.pos) return error.EndOfStream;
self.pos -= abs_amt;
} else {
const usize_amt = std.math.cast(usize, amt) catch return error.EndOfStream;
if (self.pos + usize_amt > self.bytes.len) return error.EndOfStream;
self.pos += usize_amt;
pub fn inStream(self: *Self) InStream {
return .{ .context = self };
}
}
pub fn getEndPos(self: *FixedBufferInStream) GetSeekPosError!u64 {
return self.bytes.len;
}
pub fn outStream(self: *Self) OutStream {
return .{ .context = self };
}
pub fn getPos(self: *FixedBufferInStream) GetSeekPosError!u64 {
return self.pos;
}
};
pub fn seekableStream(self: *Self) SeekableStream {
return .{ .context = self };
}
pub fn read(self: *Self, dest: []u8) ReadError!usize {
const size = std.math.min(dest.len, self.buffer.len - self.pos);
const end = self.pos + size;
std.mem.copy(u8, dest[0..size], self.buffer[self.pos..end]);
self.pos = end;
if (size == 0) return error.EndOfStream;
return size;
}
/// If the returned number of bytes written is less than requested, the
/// buffer is full. Returns `error.OutOfMemory` when no bytes would be written.
pub fn write(self: *Self, bytes: []const u8) WriteError!usize {
if (bytes.len == 0) return 0;
assert(self.pos <= self.buffer.len);
const n = if (self.pos + bytes.len <= self.buffer.len)
bytes.len
else
self.buffer.len - self.pos;
std.mem.copy(u8, self.buffer[self.pos .. self.pos + n], bytes[0..n]);
self.pos += n;
if (n == 0) return error.OutOfMemory;
return n;
}
pub fn seekTo(self: *Self, pos: u64) SeekError!void {
const usize_pos = std.math.cast(usize, pos) catch return error.EndOfStream;
if (usize_pos > self.buffer.len) return error.EndOfStream;
self.pos = usize_pos;
}
pub fn seekBy(self: *Self, amt: i64) SeekError!void {
if (amt < 0) {
const abs_amt = std.math.cast(usize, -amt) catch return error.EndOfStream;
if (abs_amt > self.pos) return error.EndOfStream;
self.pos -= abs_amt;
} else {
const usize_amt = std.math.cast(usize, amt) catch return error.EndOfStream;
if (self.pos + usize_amt > self.buffer.len) return error.EndOfStream;
self.pos += usize_amt;
}
}
pub fn getEndPos(self: *Self) GetSeekPosError!u64 {
return self.buffer.len;
}
pub fn getPos(self: *Self) GetSeekPosError!u64 {
return self.pos;
}
pub fn getWritten(self: Self) []const u8 {
return self.slice[0..self.pos];
}
pub fn reset(self: *Self) void {
self.pos = 0;
}
};
}
pub fn fixedBufferStream(buffer: var) FixedBufferStream(NonSentinelSpan(@TypeOf(buffer))) {
return .{ .buffer = std.mem.span(buffer), .pos = 0 };
}
fn NonSentinelSpan(comptime T: type) type {
var ptr_info = @typeInfo(std.mem.Span(T)).Pointer;
ptr_info.sentinel = null;
return @Type(std.builtin.TypeInfo{ .Pointer = ptr_info });
}
test "FixedBufferStream" {
var buf: [255]u8 = undefined;
var fbs = fixedBufferStream(&buf);
const stream = fbs.outStream();
try stream.print("{}{}!", .{ "Hello", "World" });
testing.expectEqualSlices(u8, "HelloWorld!", fbs.getWritten());
}

View File

@ -2107,8 +2107,8 @@ test "import more json tests" {
test "write json then parse it" {
var out_buffer: [1000]u8 = undefined;
var slice_out_stream = std.io.SliceOutStream.init(&out_buffer);
const out_stream = &slice_out_stream.stream;
var fixed_buffer_stream = std.io.fixedBufferStream(&out_buffer);
const out_stream = fixed_buffer_stream.outStream();
var jw = WriteStream(@TypeOf(out_stream).Child, 4).init(out_stream);
try jw.beginObject();

View File

@ -177,7 +177,7 @@ pub const Progress = struct {
pub fn log(self: *Progress, comptime format: []const u8, args: var) void {
const file = self.terminal orelse return;
self.refresh();
file.outStream().stream.print(format, args) catch {
file.outStream().print(format, args) catch {
self.terminal = null;
return;
};

View File

@ -5,7 +5,6 @@ pub const BloomFilter = @import("bloom_filter.zig").BloomFilter;
pub const BufMap = @import("buf_map.zig").BufMap;
pub const BufSet = @import("buf_set.zig").BufSet;
pub const Buffer = @import("buffer.zig").Buffer;
pub const BufferOutStream = @import("io.zig").BufferOutStream;
pub const ChildProcess = @import("child_process.zig").ChildProcess;
pub const DynLib = @import("dynamic_library.zig").DynLib;
pub const HashMap = @import("hash_map.zig").HashMap;

View File

@ -93,7 +93,7 @@ pub fn cmdTargets(
};
defer allocator.free(available_glibcs);
var bos = io.bufferedOutStream(4096, stdout);
var bos = io.bufferedOutStream(stdout);
const bos_stream = bos.outStream();
var jws = std.json.WriteStream(@TypeOf(bos_stream), 6).init(bos_stream);