Add a queue implementation

This commit is contained in:
Lyssieth 2024-12-10 04:06:01 +02:00
parent 058d555193
commit 63bbd536fd
Signed by untrusted user who does not match committer: lyssieth
GPG key ID: 200268854934CFAB
2 changed files with 113 additions and 0 deletions

109
src/util/queue.zig Normal file
View file

@ -0,0 +1,109 @@
const std = @import("std");
const Atomic = std.atomic.Value;
pub fn MPSCQueue(comptime T: type) type {
return struct {
const Self = @This();
buffer: std.ArrayList(T),
head: Atomic(usize),
tail: Atomic(usize),
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator) Self {
return .{
.buffer = std.ArrayList(T).init(allocator),
.head = Atomic(usize).init(0),
.tail = Atomic(usize).init(0),
.allocator = allocator,
};
}
pub fn deinit(self: *Self) void {
self.buffer.deinit();
}
pub fn push(self: *Self, item: T) !void {
const tail = self.tail.load(.monotonic);
// Ensure capacity
if (tail >= self.buffer.items.len) {
const new_capacity = if (self.buffer.items.len == 0) 8 else self.buffer.items.len * 2;
try self.buffer.resize(new_capacity);
}
// Store item and update tail
self.buffer.items[tail] = item;
self.tail.store(tail + 1, .release);
}
pub fn pop(self: *Self) ?T {
while (true) {
const head = self.head.load(.acquire);
const tail = self.tail.load(.acquire);
if (head >= tail) {
return null;
}
const item = self.buffer.items[head];
if (self.head.cmpxchgStrong(head, head + 1, .acq_rel, .monotonic)) |_| {
continue;
}
return item;
}
}
pub fn isEmpty(self: *Self) bool {
const head = self.head.load(.acquire);
const tail = self.tail.load(.acquire);
return head >= tail;
}
};
}
const t = std.testing;
test "MPSC Queue basic operations" {
var queue = MPSCQueue(i32).init(t.allocator);
defer queue.deinit();
try queue.push(1);
try queue.push(2);
try queue.push(3);
try t.expect(queue.pop().? == 1);
try t.expect(queue.pop().? == 2);
try t.expect(queue.pop().? == 3);
try t.expect(queue.pop() == null);
}
const expected = [_]i32{ 1, 2, 3, 4, 5 };
fn threadOne(queue: *MPSCQueue(i32)) !void {
var i: usize = 0;
while (queue.pop()) |item| : (i += 1) {
try t.expectEqual(expected[i], item);
}
}
fn threadTwo(queue: *MPSCQueue(i32)) !void {
for (expected) |item| {
try queue.push(item);
}
}
test "MPSC Threaded" {
const Thread = std.Thread;
var q = MPSCQueue(i32).init(t.allocator);
defer q.deinit();
const t1 = try Thread.spawn(.{ .allocator = t.allocator }, threadOne, .{&q});
const t2 = try Thread.spawn(.{ .allocator = t.allocator }, threadTwo, .{&q});
t1.join();
t2.join();
}

View file

@ -1,6 +1,9 @@
const str = @import("./smartString.zig"); const str = @import("./smartString.zig");
pub const SmartString = str.SmartString; pub const SmartString = str.SmartString;
const queue = @import("./queue.zig");
pub const Queue = queue.Queue;
pub const niceTypeName = @import("./niceTypeName.zig").niceTypeName; pub const niceTypeName = @import("./niceTypeName.zig").niceTypeName;
comptime { comptime {
@ -9,5 +12,6 @@ comptime {
if (builtin.is_test) { if (builtin.is_test) {
std.mem.doNotOptimizeAway(str); std.mem.doNotOptimizeAway(str);
std.mem.doNotOptimizeAway(queue);
} }
} }