Skip to content

Commit e65c9dc

Browse files
authored
Merge pull request #150 from Raiden1411/async_adventures
2 parents f5a6535 + be88383 commit e65c9dc

File tree

13 files changed

+1145
-126
lines changed

13 files changed

+1145
-126
lines changed

build.zig

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,16 @@ fn addDependencies(
373373
.optimize = optimize,
374374
});
375375

376+
const aio = b.dependency("aio", .{
377+
.target = target,
378+
.optimize = optimize,
379+
});
380+
376381
mod.addImport("c_kzg_4844", c_kzg_4844_dep.module("c_kzg_4844"));
377382
mod.linkLibrary(c_kzg_4844_dep.artifact("c_kzg_4844"));
383+
384+
mod.addImport("aio", aio.module("aio"));
385+
mod.addImport("coro", aio.module("coro"));
378386
}
379387
/// Builds and runs the benchmarks
380388
fn buildBenchmark(

build.zig.zon

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
.c_kzg_4844 = .{
1414
.path = "./pkg/c-kzg-4844",
1515
},
16+
.aio = .{
17+
.url = "git+https://github.com/Cloudef/zig-aio.git#8d0507183396220a77f36e5182e5c91995d89449",
18+
.hash = "aio-0.0.0-776t3n5WBQASyyJ0vCpZxIX7U6NTVEvaWVf1-9sjcVYa",
19+
},
1620
},
1721
.paths = .{
1822
// This makes *all* files, recursively, included in this package. It is generally

examples/autobahn/autobahn.zig

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ const std = @import("std");
22
const clients = @import("zabi").clients;
33

44
const Allocator = std.mem.Allocator;
5-
const WebSocketClient = clients.WebSocketClient;
5+
const WebSocketClient = clients.blocking.WebSocketClient;
6+
const AsyncWebSocketClient = clients.non_blocking.AsyncWebSocketClient;
67

78
pub fn main() !void {
89
var gpa = std.heap.GeneralPurposeAllocator(.{}){};

src/clients/IPC.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const FeeHistory = transaction.FeeHistory;
4747
const Gwei = types.Gwei;
4848
const Hash = types.Hash;
4949
const Hex = types.Hex;
50-
const IpcReader = @import("ipc_reader.zig").IpcReader;
50+
const IpcReader = @import("blocking/IpcReader.zig");
5151
const JsonParsed = std.json.Parsed;
5252
const Log = log.Log;
5353
const LogRequest = log.LogRequest;

src/clients/WebSocket.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ const Uri = std.Uri;
7979
const Value = std.json.Value;
8080
const WatchLogsRequest = log.WatchLogsRequest;
8181
const Wei = types.Wei;
82-
const WsClient = @import("WebSocketClient.zig");
82+
const WsClient = @import("blocking/WebSocketClient.zig");
8383

8484
const WebSocketHandler = @This();
8585

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//! Socket reader that is expected to be reading socket messages that are json messages.
2+
//! Will only allocate more memory if required.
3+
//!
4+
//! Calling `deinit` will close the socket and clear the buffer.
5+
const aio = @import("aio");
6+
const coro = @import("coro");
7+
const std = @import("std");
8+
const testing = std.testing;
9+
10+
const Allocator = std.mem.Allocator;
11+
const Stream = std.net.Stream;
12+
const Buffer = std.fifo.LinearFifo(u8, .Dynamic);
13+
14+
const Self = @This();
15+
16+
/// Set of possible errors when reading from the socket.
17+
pub const ReadError = aio.Recv.Error || coro.io.Error ||
18+
Allocator.Error || error{Closed};
19+
20+
/// Set of possible error when writting to the stream.
21+
pub const WriteError = coro.io.Error || aio.Send.Error;
22+
23+
pub const InitError = coro.io.Error || aio.Connect.Error ||
24+
aio.Socket.Error || Allocator.Error;
25+
26+
/// LinearFifo that grows as needed.
27+
buffer: Buffer,
28+
/// Socket stream to read from the unix socket.
29+
fd: std.posix.socket_t,
30+
/// Amount of bytes to discard on a successfull read.
31+
overflow: usize,
32+
/// The tell reader if the stream is closed.
33+
closed: bool,
34+
35+
/// Sets the initial reader state in order to perform any necessary actions.
36+
///
37+
/// **Example**
38+
/// ```zig
39+
/// const stream = std.net.connectUnixSocket("/tmp/tmp.socket");
40+
///
41+
/// const ipc_reader = try AsyncIpcReader.init(std.heap.page_allocator, stream);
42+
/// defer ipc_reader.deinit();
43+
/// ```
44+
pub fn init(allocator: Allocator, path: []const u8) !@This() {
45+
var socket: std.posix.socket_t = undefined;
46+
47+
try coro.io.single(.socket, .{
48+
.domain = std.posix.AF.UNIX,
49+
.flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC,
50+
.protocol = std.posix.IPPROTO.IP,
51+
.out_socket = &socket,
52+
});
53+
54+
var addr = try std.net.Address.initUnix(path);
55+
try coro.io.single(.connect, .{
56+
.socket = socket,
57+
.addr = &addr.any,
58+
.addrlen = addr.getOsSockLen(),
59+
});
60+
61+
return .{
62+
.buffer = Buffer.init(allocator),
63+
.fd = socket,
64+
.overflow = 0,
65+
.closed = false,
66+
};
67+
}
68+
/// Frees the buffer and closes the stream.
69+
pub fn deinit(self: *@This()) void {
70+
if (@atomicRmw(bool, &self.closed, .Xchg, true, .acq_rel) == false) {
71+
self.buffer.deinit();
72+
coro.io.single(.close_socket, .{ .socket = self.fd }) catch {};
73+
}
74+
}
75+
/// "Reads" a json message and moves the necessary position members in order
76+
/// to have the necessary message.
77+
pub fn jsonMessage(self: *@This()) usize {
78+
self.buffer.discard(self.overflow);
79+
self.buffer.realign();
80+
81+
if (self.buffer.count <= 1) {
82+
self.overflow = 0;
83+
return 0;
84+
}
85+
86+
var depth: usize = 0;
87+
var index: usize = 0;
88+
while (index < self.buffer.buf.len) : (index += 1) {
89+
switch (self.buffer.buf[index]) {
90+
'{' => depth += 1,
91+
'}' => depth -= 1,
92+
else => {},
93+
}
94+
95+
// Check if we read a message or not.
96+
if (depth == 0) {
97+
self.overflow = index + 1;
98+
return index + 1;
99+
}
100+
}
101+
102+
self.overflow = 0;
103+
return 0;
104+
}
105+
/// Reads the bytes directly from the socket. Will allocate more memory as needed.
106+
pub fn read(self: *@This()) ReadError!void {
107+
const buffer = try self.buffer.writableWithSize(std.math.maxInt(u16));
108+
var read_bytes: usize = 0;
109+
110+
try coro.io.single(.recv, .{
111+
.socket = self.fd,
112+
.buffer = buffer,
113+
.out_read = &read_bytes,
114+
});
115+
116+
if (read_bytes == 0)
117+
return error.Closed;
118+
119+
self.buffer.update(read_bytes);
120+
}
121+
/// Reads one message from the socket stream.
122+
///
123+
/// Will only make the socket read request if the buffer is at max capacity.
124+
/// Will grow the buffer as needed.
125+
pub fn readMessage(self: *@This()) ReadError![]u8 {
126+
while (true) {
127+
const size = self.jsonMessage();
128+
129+
if (size == 0) {
130+
try self.read();
131+
continue;
132+
}
133+
134+
return @constCast(self.buffer.readableSliceOfLen(size));
135+
}
136+
}
137+
/// Writes a message to the socket stream.
138+
pub fn writeMessage(
139+
self: *@This(),
140+
message: []u8,
141+
) WriteError!void {
142+
return coro.io.single(.send, .{
143+
.socket = self.fd,
144+
.buffer = message,
145+
});
146+
}

0 commit comments

Comments
 (0)