Skip to content

Commit

Permalink
🚧 init p2p tcp connections
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelStark committed Sep 3, 2024
1 parent 8798e20 commit 5afb2ca
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 19 deletions.
3 changes: 3 additions & 0 deletions src/config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub const Config = struct {
/// Data directory
datadir: []const u8,

seednode: []const u8,

/// Load the configuration from a file
///
/// # Arguments
Expand All @@ -44,6 +46,7 @@ pub const Config = struct {
.p2p_port = 8333,
.testnet = false,
.datadir = try allocator.dupe(u8, ".bitcoin"),
.seednode = "",
};

var buf: [1024]u8 = undefined;
Expand Down
1 change: 1 addition & 0 deletions src/mempool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ test "Mempool" {
.p2p_port = 8333,
.testnet = false,
.datadir = "/tmp/btczee",
.seednode = "",
};
var mempool = try Mempool.init(allocator, &config);
defer mempool.deinit();
Expand Down
97 changes: 78 additions & 19 deletions src/p2p.zig
Original file line number Diff line number Diff line change
@@ -1,44 +1,103 @@
const std = @import("std");
const net = std.net;
const Config = @import("config.zig").Config;
const Peer = @import("peer.zig").Peer;

/// P2P network handler.
///
/// The P2P network is responsible for handling the peer-to-peer network.
/// It is responsible for handling the network protocol, the block relay, and the node sync.
pub const P2P = struct {
allocator: std.mem.Allocator,
config: *const Config,
peers: std.ArrayList(*Peer),
listener: ?net.Server,

/// Initialize the P2P network
///
/// # Arguments
/// - `allocator`: Memory allocator
/// - `config`: Configuration
///
/// # Returns
/// - `P2P`: P2P network handler
pub fn init(allocator: std.mem.Allocator, config: *const Config) !P2P {
return P2P{
.allocator = allocator,
.config = config,
.peers = std.ArrayList(*Peer).init(allocator),
.listener = null,
};
}

/// Deinitialize the P2P network
///
/// # Arguments
/// - `self`: P2P network handler
pub fn deinit(self: *P2P) void {
// Clean up resources if needed
_ = self;
if (self.listener) |*l| l.deinit();
for (self.peers.items) |peer| {
peer.deinit();
}
self.peers.deinit();
}

/// Start the P2P network
///
/// # Arguments
/// - `self`: P2P network handler
pub fn start(self: *P2P) !void {
std.log.info("Starting P2P network on port {}", .{self.config.p2p_port});
// Implement P2P network initialization

// Initialize the listener
// const address = try net.Address.parseIp4("0.0.0.0", self.config.p2p_port);
// const stream = try net.tcpConnectToAddress(address);

// self.listener = net.Server{
// .listen_address = address,
// .stream = stream,
// };

// // Start accepting connections
// try self.acceptConnections();

// // Connect to seed nodes
// try self.connectToSeedNodes();
}

/// Accept incoming connections
fn acceptConnections(self: *P2P) !void {
while (true) {
const connection = self.listener.?.accept() catch |err| {
std.log.err("Failed to accept connection: {}", .{err});
continue;
};

// Handle the new connection in a separate thread
// TODO: Error handling
_ = try std.Thread.spawn(.{}, handleConnection, .{ self, connection });
}
}

/// Handle a new connection
fn handleConnection(self: *P2P, connection: net.Server.Connection) void {
const peer = Peer.init(self.allocator, connection) catch |err| {
std.log.err("Failed to initialize peer: {}", .{err});
connection.stream.close();
return;
};

self.peers.append(peer) catch |err| {
std.log.err("Failed to add peer: {}", .{err});
peer.deinit();
return;
};

peer.start() catch |err| {
std.log.err("Peer encountered an error: {}", .{err});
_ = self.peers.swapRemove(self.peers.items.len - 1);
peer.deinit();
};
}

/// Connect to seed nodes
fn connectToSeedNodes(self: *P2P) !void {
if (self.config.seednode.len == 0) {
return;
}

const address = try net.Address.parseIp4(self.config.seednode, 8333);
const stream = try net.tcpConnectToAddress(address);

const peer = try Peer.init(self.allocator, .{ .stream = stream, .address = address });
try self.peers.append(peer);

// Start the peer in a new thread
// TODO: Error handling
_ = try std.Thread.spawn(.{}, Peer.start, .{peer});
}
};
121 changes: 121 additions & 0 deletions src/peer.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
const std = @import("std");
const net = std.net;
const protocol = @import("protocol.zig");

/// Represents a peer connection in the Bitcoin network
pub const Peer = struct {
allocator: std.mem.Allocator,
stream: net.Stream,
address: net.Address,
version: ?protocol.VersionMessage,
last_seen: i64,
is_outbound: bool,

/// Initialize a new peer
pub fn init(allocator: std.mem.Allocator, connection: net.Server.Connection) !*Peer {
const peer = try allocator.create(Peer);
peer.* = .{
.allocator = allocator,
.stream = connection.stream,
.address = connection.address,
.version = null,
.last_seen = std.time.timestamp(),
.is_outbound = false,
};
return peer;
}

/// Clean up peer resources
pub fn deinit(self: *Peer) void {
self.stream.close();
self.allocator.destroy(self);
}

/// Start peer operations
pub fn start(self: *Peer) !void {
std.log.info("Starting peer connection with {}", .{self.address});

try self.sendVersionMessage();
try self.handleMessages();
}

/// Send version message to peer
fn sendVersionMessage(self: *Peer) !void {
const version_msg = protocol.VersionMessage{
.version = 70015,
.services = 1,
.timestamp = @intCast(std.time.timestamp()),
.addr_recv = protocol.NetworkAddress.init(self.address),
};

try self.sendMessage("version", version_msg);
}

/// Handle incoming messages from peer
fn handleMessages(self: *Peer) !void {
var buffer: [1024]u8 = undefined;

while (true) {
const bytes_read = try self.stream.read(&buffer);
if (bytes_read == 0) break; // Connection closed

// Mock message parsing
const message_type = self.parseMessageType(buffer[0..bytes_read]);
try self.handleMessage(message_type, buffer[0..bytes_read]);

self.last_seen = std.time.timestamp();
}
}

/// Mock function to parse message type
fn parseMessageType(self: *Peer, data: []const u8) []const u8 {
_ = self;
if (std.mem.startsWith(u8, data, "version")) {
return "version";
} else if (std.mem.startsWith(u8, data, "verack")) {
return "verack";
} else {
return "unknown";
}
}

/// Handle a specific message type
fn handleMessage(self: *Peer, message_type: []const u8, data: []const u8) !void {
if (std.mem.eql(u8, message_type, "version")) {
try self.handleVersionMessage(data);
} else if (std.mem.eql(u8, message_type, "verack")) {
try self.handleVerackMessage();
} else {
std.log.warn("Received unknown message type from peer", .{});
}
}

/// Handle version message
fn handleVersionMessage(self: *Peer, data: []const u8) !void {
_ = data; // In a real implementation, parse the version message

// Mock version message handling
self.version = protocol.VersionMessage{
.version = 70015,
.services = 1,
.timestamp = @intCast(std.time.timestamp()),
.addr_recv = protocol.NetworkAddress.init(self.address),
// ... other fields ...
};

try self.sendMessage("verack", {});
}

/// Handle verack message
fn handleVerackMessage(self: *Peer) !void {
std.log.info("Received verack from peer {}", .{self.address});
// In a real implementation, mark the connection as established
}

/// Send a message to the peer
fn sendMessage(self: *Peer, command: []const u8, message: anytype) !void {
_ = message;
// In a real implementation, serialize the message and send it
try self.stream.writer().print("{s}\n", .{command});
}
};
89 changes: 89 additions & 0 deletions src/protocol.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const std = @import("std");
const net = std.net;

/// Protocol version
pub const PROTOCOL_VERSION: u32 = 70015;

/// Network services
pub const ServiceFlags = struct {
pub const NODE_NETWORK: u64 = 1;
pub const NODE_GETUTXO: u64 = 2;
pub const NODE_BLOOM: u64 = 4;
pub const NODE_WITNESS: u64 = 8;
pub const NODE_NETWORK_LIMITED: u64 = 1024;
};

/// Command string length
pub const COMMAND_SIZE: usize = 12;

/// Magic bytes for mainnet
pub const MAGIC_BYTES: [4]u8 = .{ 0xF9, 0xBE, 0xB4, 0xD9 };

/// NetworkAddress represents a network address
pub const NetworkAddress = struct {
services: u64,
ip: [16]u8,
port: u16,

pub fn init(address: net.Address) NetworkAddress {
const result = NetworkAddress{
.services = ServiceFlags.NODE_NETWORK,
.ip = [_]u8{0} ** 16,
.port = address.getPort(),
};
// TODO: Handle untagged union properly (for IPv6)

return result;
}
};

/// VersionMessage represents the "version" message
pub const VersionMessage = struct {
version: i32,
services: u64,
timestamp: i64,
addr_recv: NetworkAddress,
addr_from: NetworkAddress = .{
.services = 0,
.ip = [_]u8{0} ** 16,
.port = 0,
},
nonce: u64 = 0,
user_agent: []const u8 = "",
start_height: i32 = 0,
relay: bool = false,
};

/// Header structure for all messages
pub const MessageHeader = struct {
magic: [4]u8,
command: [COMMAND_SIZE]u8,
length: u32,
checksum: u32,
};

/// Serialize a message to bytes
pub fn serializeMessage(allocator: std.mem.Allocator, command: []const u8, payload: anytype) ![]u8 {
_ = allocator;
_ = command;
_ = payload;
// In a real implementation, this would serialize the message
// For now, we'll just return a mock serialized message
return "serialized message";
}

/// Deserialize bytes to a message
pub fn deserializeMessage(allocator: std.mem.Allocator, bytes: []const u8) !void {
_ = allocator;
_ = bytes;
// In a real implementation, this would deserialize the message
// For now, we'll just do nothing
}

/// Calculate checksum for a message
pub fn calculateChecksum(data: []const u8) u32 {
_ = data;
// In a real implementation, this would calculate the checksum
// For now, we'll just return a mock checksum
return 0x12345678;
}

0 comments on commit 5afb2ca

Please sign in to comment.