Skip to content

Commit

Permalink
Fake lightning wallet + mint core (#10)
Browse files Browse the repository at this point in the history
* fake lightning wallet
* nut04 mint resp
* some mint methods + types
  • Loading branch information
StringNick authored Sep 6, 2024
1 parent bc6171e commit 1074273
Show file tree
Hide file tree
Showing 9 changed files with 717 additions and 15 deletions.
9 changes: 9 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ pub fn build(b: *std.Build) !void {
// set a preferred release mode, allowing the user to decide how to optimize.
const optimize = b.standardOptimizeOption(.{});

// Channel module
const channel_m = b.addModule("channels", .{
.target = target,
.optimize = optimize,
.root_source_file = b.path("src/channels/channels.zig"),
});

// **************************************************************
// * HANDLE DEPENDENCY MODULES *
// **************************************************************
Expand Down Expand Up @@ -199,6 +206,8 @@ pub fn build(b: *std.Build) !void {
lib_unit_tests.root_module.addImport("bitcoin", bitcoin_zig.module("bitcoin"));
lib_unit_tests.root_module.addImport("base58", base58_module);

lib_unit_tests.root_module.addImport("channels", channel_m);

const run_lib_unit_tests = b.addRunArtifact(lib_unit_tests);

const test_step = b.step("test", "Run unit tests");
Expand Down
335 changes: 335 additions & 0 deletions src/channels/channels.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
const std = @import("std");

const ChanError = error{
Closed,
OutOfMemory,
NotImplemented,
DataCorruption,
};

pub fn Chan(comptime T: type) type {
return BufferedChan(T, 0);
}

pub fn BufferedChan(comptime T: type, comptime bufSize: u8) type {
return struct {
const Self = @This();
const bufType = [bufSize]?T;
buf: bufType = [_]?T{null} ** bufSize,
closed: bool = false,
mut: std.Thread.Mutex = std.Thread.Mutex{},
alloc: std.mem.Allocator = undefined,
recvQ: std.ArrayList(*Receiver) = undefined,
sendQ: std.ArrayList(*Sender) = undefined,

// represents a thread waiting on recv
const Receiver = struct {
mut: std.Thread.Mutex = std.Thread.Mutex{},
cond: std.Thread.Condition = std.Thread.Condition{},
data: ?T = null,

fn putDataAndSignal(self: *@This(), data: T) void {
defer self.cond.signal();
self.data = data;
}
};

// represents a thread waiting on send
const Sender = struct {
mut: std.Thread.Mutex = std.Thread.Mutex{},
cond: std.Thread.Condition = std.Thread.Condition{},
data: T,

fn getDataAndSignal(self: *@This()) T {
defer self.cond.signal();
return self.data;
}
};

pub fn init(alloc: std.mem.Allocator) Self {
return Self{
.alloc = alloc,
.recvQ = std.ArrayList(*Receiver).init(alloc),
.sendQ = std.ArrayList(*Sender).init(alloc),
};
}

pub fn deinit(self: *Self) void {
self.recvQ.deinit();
self.sendQ.deinit();
}

pub fn close(self: *Self) void {
self.closed = true;
}

pub fn capacity(self: *Self) u8 {
return self.buf.len;
}

pub fn debugBuf(self: *Self) void {
std.debug.print("{d} Buffer debug\n", .{std.time.milliTimestamp()});
for (self.buf, 0..) |item, i| {
if (item) |unwrapped| {
std.debug.print("[{d}] = {d}\n", .{ i, unwrapped });
}
}
}

pub fn len(self: *Self) u8 {
var i: u8 = 0;
for (self.buf) |item| {
if (item) |_| {
i += 1;
} else {
break;
}
}
return i;
}

pub fn send(self: *Self, data: T) ChanError!void {
if (self.closed) return ChanError.Closed;

self.mut.lock();
errdefer self.mut.unlock();

// case: receiver already waiting
// pull receiver (if any) and give it data. Signal receiver that it's done waiting.
if (self.recvQ.items.len > 0) {
defer self.mut.unlock();
var receiver: *Receiver = self.recvQ.orderedRemove(0);
receiver.putDataAndSignal(data);
return;
}

// case: room in buffer
const l = self.len();
if (l < self.capacity() and bufSize > 0) {
defer self.mut.unlock();

// insert into first null spot in buffer
self.buf[l] = data;
return;
}

// hold on sender queue. Receivers will signal when they take data.
var sender = Sender{ .data = data };

// prime condition
sender.mut.lock(); // cond.wait below will unlock it and wait until signal, then relock it
defer sender.mut.unlock(); // unlocks the relock

try self.sendQ.append(&sender); // make visible to other threads
self.mut.unlock(); // allow all other threads to proceed. This thread is done reading/writing

// now just wait for receiver to signal sender
sender.cond.wait(&sender.mut);
return;
}

pub fn recv(self: *Self) ChanError!T {
if (self.closed) return ChanError.Closed;
self.mut.lock();
errdefer self.mut.unlock();

// case: value in buffer
const l = self.len();
if (l > 0 and bufSize > 0) {
defer self.mut.unlock();
const val = self.buf[0] orelse return ChanError.DataCorruption;

// advance items in buffer
if (l > 1) {
for (self.buf[1..l], 0..l - 1) |item, i| {
self.buf[i] = item;
}
}
self.buf[l - 1] = null;

// top up buffer with a waiting sender, if any
if (self.sendQ.items.len > 0) {
var sender: *Sender = self.sendQ.orderedRemove(0);
const valFromSender: T = sender.getDataAndSignal();
self.buf[l - 1] = valFromSender;
}

return val;
}

// case: sender already waiting
// pull sender and take its data. Signal sender that it's done waiting.
if (self.sendQ.items.len > 0) {
defer self.mut.unlock();
var sender: *Sender = self.sendQ.orderedRemove(0);
const data: T = sender.getDataAndSignal();
return data;
}

// hold on receiver queue. Senders will signal when they take it.
var receiver = Receiver{};

// prime condition
receiver.mut.lock();
defer receiver.mut.unlock();

try self.recvQ.append(&receiver);
self.mut.unlock();

// now wait for sender to signal receiver
receiver.cond.wait(&receiver.mut);
// sender should have put data in .data
if (receiver.data) |data| {
return data;
} else {
return ChanError.DataCorruption;
}
}
};
}

test "unbufferedChan" {
// create channel of u8
const T = Chan(u8);
var chan = T.init(std.testing.allocator);
defer chan.deinit();

// spawn thread that immediately waits on channel
const thread = struct {
fn func(c: *T) !void {
const val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
}
};
const t = try std.Thread.spawn(.{}, thread.func, .{&chan});
defer t.join();

// let thread wait a bit before sending value
std.time.sleep(1_000_000_000);

const val: u8 = 10;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
}

test "bidirectional unbufferedChan" {
std.debug.print("\n", .{});

const T = Chan(u8);
var chan = T.init(std.testing.allocator);
defer chan.deinit();

const thread = struct {
fn func(c: *T) !void {
std.time.sleep(2_000_000_000);
const val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
std.time.sleep(1_000_000_000);
std.debug.print("{d} Thread Sending {d}\n", .{ std.time.milliTimestamp(), val + 1 });
try c.send(val + 1);
std.time.sleep(2_000_000_000);
std.debug.print("{d} Thread Sending {d}\n", .{ std.time.milliTimestamp(), val + 100 });
try c.send(val + 100);
std.debug.print("{d} Thread Exit\n", .{std.time.milliTimestamp()});
}
};

const t = try std.Thread.spawn(.{}, thread.func, .{&chan});
defer t.join();

std.time.sleep(1_000_000_000);
var val: u8 = 10;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
val = try chan.recv();
std.debug.print("{d} Main Received {d}\n", .{ std.time.milliTimestamp(), val });
val = try chan.recv();
std.debug.print("{d} Main Received {d}\n", .{ std.time.milliTimestamp(), val });
}

test "buffered Chan" {
std.debug.print("\n", .{});

const T = BufferedChan(u8, 3);
var chan = T.init(std.testing.allocator);
defer chan.deinit();

const thread = struct {
fn func(c: *T) !void {
std.time.sleep(2_000_000_000);
std.debug.print("{d} Thread Receiving\n", .{std.time.milliTimestamp()});
var val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
std.time.sleep(1_000_000_000);
std.debug.print("{d} Thread Receiving\n", .{std.time.milliTimestamp()});
val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
std.time.sleep(1_000_000_000);
std.debug.print("{d} Thread Receiving\n", .{std.time.milliTimestamp()});
val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
std.time.sleep(1_000_000_000);
std.debug.print("{d} Thread Receiving\n", .{std.time.milliTimestamp()});
val = try c.recv();
std.debug.print("{d} Thread Received {d}\n", .{ std.time.milliTimestamp(), val });
}
};

const t = try std.Thread.spawn(.{}, thread.func, .{&chan});
defer t.join();

std.time.sleep(1_000_000_000);
var val: u8 = 10;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
std.debug.print("{d} Main Sent {d}\n", .{ std.time.milliTimestamp(), val });

val = 11;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
std.debug.print("{d} Main Sent {d}\n", .{ std.time.milliTimestamp(), val });

val = 12;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
std.debug.print("{d} Main Sent {d}\n", .{ std.time.milliTimestamp(), val });

val = 13;
std.debug.print("{d} Main Sending {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);
std.debug.print("{d} Main Sent {d}\n", .{ std.time.milliTimestamp(), val });
}

test "chan of chan" {
std.debug.print("\n", .{});

const T = BufferedChan(u8, 3);
const TofT = Chan(T);
var chanOfChan = TofT.init(std.testing.allocator);
defer chanOfChan.deinit();

const thread = struct {
fn func(cOC: *TofT) !void {
std.time.sleep(2_000_000_000);
std.debug.print("{d} Thread Receiving\n", .{std.time.milliTimestamp()});
var c = try cOC.recv();
std.debug.print("{d} Thread Received chan of chan: {any}\n", .{ std.time.milliTimestamp(), cOC });
std.debug.print("{d} Thread pulling from chan buffer\n", .{std.time.milliTimestamp()});
var val = try c.recv(); // should have value on buffer
std.debug.print("{d} Thread received from chan: {d}\n", .{ std.time.milliTimestamp(), val });
}
};

const t = try std.Thread.spawn(.{}, thread.func, .{&chanOfChan});
defer t.join();

std.time.sleep(1_000_000_000);
var val: u8 = 10;
var chan = T.init(std.testing.allocator);
defer chan.deinit();
std.debug.print("{d} Main sending u8 to chan {d}\n", .{ std.time.milliTimestamp(), val });
try chan.send(val);

std.debug.print("{d} Main sending chan across chanOfChan\n", .{std.time.milliTimestamp()});
try chanOfChan.send(chan);
}
12 changes: 12 additions & 0 deletions src/core/lightning/lightning.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Mint Lightning
const root = @import("../../lib.zig");
const std = @import("std");

const Bolt11Invoice = root.lightning_invoices.Bolt11Invoice;
const Amount = root.core.amount.Amount;
const MeltQuoteState = root.core.nuts.nut05.QuoteState;
Expand All @@ -25,6 +27,12 @@ pub const PayInvoiceResponse = struct {
status: MeltQuoteState,
/// Totoal Amount Spent
total_spent: Amount,

pub fn deinit(self: PayInvoiceResponse, allocator: std.mem.Allocator) void {
allocator.free(self.payment_hash);

if (self.payment_preimage) |p| allocator.free(p);
}
};

/// Payment quote response
Expand All @@ -35,6 +43,10 @@ pub const PaymentQuoteResponse = struct {
amount: Amount,
/// Fee required for melt
fee: u64,

pub fn deinit(self: PaymentQuoteResponse, allocator: std.mem.Allocator) void {
allocator.free(self.request_lookup_id);
}
};

/// Ln backend settings
Expand Down
Loading

0 comments on commit 1074273

Please sign in to comment.