diff --git a/src/util/queue.zig b/src/util/queue.zig new file mode 100644 index 0000000..4e1e7fc --- /dev/null +++ b/src/util/queue.zig @@ -0,0 +1,109 @@ +const std = @import("std"); +const Atomic = std.atomic.Value; + +pub fn MPSCQueue(comptime T: type) type { + return struct { + const Self = @This(); + + buffer: std.ArrayList(T), + head: Atomic(usize), + tail: Atomic(usize), + allocator: std.mem.Allocator, + + pub fn init(allocator: std.mem.Allocator) Self { + return .{ + .buffer = std.ArrayList(T).init(allocator), + .head = Atomic(usize).init(0), + .tail = Atomic(usize).init(0), + .allocator = allocator, + }; + } + + pub fn deinit(self: *Self) void { + self.buffer.deinit(); + } + + pub fn push(self: *Self, item: T) !void { + const tail = self.tail.load(.monotonic); + + // Ensure capacity + if (tail >= self.buffer.items.len) { + const new_capacity = if (self.buffer.items.len == 0) 8 else self.buffer.items.len * 2; + try self.buffer.resize(new_capacity); + } + + // Store item and update tail + self.buffer.items[tail] = item; + self.tail.store(tail + 1, .release); + } + + pub fn pop(self: *Self) ?T { + while (true) { + const head = self.head.load(.acquire); + const tail = self.tail.load(.acquire); + + if (head >= tail) { + return null; + } + + const item = self.buffer.items[head]; + + if (self.head.cmpxchgStrong(head, head + 1, .acq_rel, .monotonic)) |_| { + continue; + } + + return item; + } + } + + pub fn isEmpty(self: *Self) bool { + const head = self.head.load(.acquire); + const tail = self.tail.load(.acquire); + return head >= tail; + } + }; +} + +const t = std.testing; + +test "MPSC Queue basic operations" { + var queue = MPSCQueue(i32).init(t.allocator); + defer queue.deinit(); + + try queue.push(1); + try queue.push(2); + try queue.push(3); + + try t.expect(queue.pop().? == 1); + try t.expect(queue.pop().? == 2); + try t.expect(queue.pop().? == 3); + try t.expect(queue.pop() == null); +} + +const expected = [_]i32{ 1, 2, 3, 4, 5 }; + +fn threadOne(queue: *MPSCQueue(i32)) !void { + var i: usize = 0; + while (queue.pop()) |item| : (i += 1) { + try t.expectEqual(expected[i], item); + } +} + +fn threadTwo(queue: *MPSCQueue(i32)) !void { + for (expected) |item| { + try queue.push(item); + } +} + +test "MPSC Threaded" { + const Thread = std.Thread; + + var q = MPSCQueue(i32).init(t.allocator); + defer q.deinit(); + + const t1 = try Thread.spawn(.{ .allocator = t.allocator }, threadOne, .{&q}); + const t2 = try Thread.spawn(.{ .allocator = t.allocator }, threadTwo, .{&q}); + + t1.join(); + t2.join(); +} diff --git a/src/util/utils.zig b/src/util/utils.zig index 1b21ec0..975793e 100644 --- a/src/util/utils.zig +++ b/src/util/utils.zig @@ -1,6 +1,9 @@ const str = @import("./smartString.zig"); pub const SmartString = str.SmartString; +const queue = @import("./queue.zig"); +pub const Queue = queue.Queue; + pub const niceTypeName = @import("./niceTypeName.zig").niceTypeName; comptime { @@ -9,5 +12,6 @@ comptime { if (builtin.is_test) { std.mem.doNotOptimizeAway(str); + std.mem.doNotOptimizeAway(queue); } }