Skip to content

Commit

Permalink
Merge pull request #8 from OpenArchive/feature/dht-schema-group-methods
Browse files Browse the repository at this point in the history
Store route blob ID on the DHT and manage route IDs
  • Loading branch information
RangerMauve authored Oct 1, 2024
2 parents c3b0549 + 071b406 commit 2652c55
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 27 deletions.
56 changes: 38 additions & 18 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ use std::mem;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc::{self, Receiver}, oneshot, broadcast};
use tracing::info;
use veilid_core::{
api_startup_config, vld0_generate_keypair, CryptoKey, CryptoSystem, CryptoSystemVLD0,
CryptoTyped, DHTSchema, KeyPair, ProtectedStore, RoutingContext, SharedSecret, UpdateCallback,
VeilidAPI, VeilidConfigInner, VeilidUpdate, CRYPTO_KIND_VLD0, TypedKey
VeilidAPI, VeilidConfigInner, VeilidUpdate, CRYPTO_KIND_VLD0, TypedKey, VeilidConfigProtectedStore
};
use xdg::BaseDirectories;

pub struct Backend {
path: PathBuf,
port: u16,
veilid_api: Option<VeilidAPI>,
update_rx: Option<broadcast::Receiver<VeilidUpdate>>,
groups: HashMap<CryptoKey, Box<Group>>,
repos: HashMap<CryptoKey, Box<Repo>>,
}
Expand All @@ -31,6 +32,7 @@ impl Backend {
path: base_path.to_path_buf(),
port,
veilid_api: None,
update_rx: None,
groups: HashMap::new(),
repos: HashMap::new(),
})
Expand All @@ -51,22 +53,15 @@ impl Backend {
)
})?;

let (tx, mut rx) = mpsc::channel(1);
let (update_tx, update_rx) = broadcast::channel::<VeilidUpdate>(32);

let update_callback: UpdateCallback = Arc::new(move |update| {
// Else handle update for something
// info!("Received update: {:?}", update);
if let VeilidUpdate::Attachment(attachment_state) = &update {
if attachment_state.public_internet_ready {
println!("Public internet ready!");
let tx = tx.clone();
tokio::spawn(async move {
if tx.send(()).await.is_err() {
println!("receiver dropped");
}
});
let update_tx = update_tx.clone();
tokio::spawn(async move {
if let Err(e) = update_tx.send(update) {
println!("Failed to send update: {}", e);
}
}
});
});

let xdg_dirs = BaseDirectories::with_prefix("save-dweb-backend")?;
Expand Down Expand Up @@ -111,7 +106,12 @@ impl Backend {

println!("Waiting for network ready state");

rx.recv().await.expect("Unable to wait for veilid init");
self.update_rx = Some(update_rx);

// Wait for network ready state
if let Some(rx) = &self.update_rx {
self.wait_for_network(rx.resubscribe()).await?;
}

Ok(())
}
Expand All @@ -129,13 +129,25 @@ impl Backend {
Ok(())
}

async fn wait_for_network(&self, mut update_rx: broadcast::Receiver<VeilidUpdate>) -> Result<()> {
while let Ok(update) = update_rx.recv().await {
if let VeilidUpdate::Attachment(attachment_state) = update {
if attachment_state.public_internet_ready {
println!("Public internet ready!");
break;
}
}
}
Ok(())
}

pub async fn create_group(&mut self) -> Result<Group> {
let veilid = self
.veilid_api
.as_ref()
.ok_or_else(|| anyhow!("Veilid API is not initialized"))?;
let routing_context = veilid.routing_context()?;
let schema = DHTSchema::dflt(1)?;
let schema = DHTSchema::dflt(3)?;
let kind = Some(CRYPTO_KIND_VLD0);

let dht_record = routing_context.create_dht_record(schema, kind).await?;
Expand Down Expand Up @@ -234,7 +246,7 @@ impl Backend {
.as_ref()
.ok_or_else(|| anyhow!("Veilid API is not initialized"))?;
let routing_context = veilid.routing_context()?;
let schema = DHTSchema::dflt(1)?;
let schema = DHTSchema::dflt(3)?;
let kind = Some(CRYPTO_KIND_VLD0);

let dht_record = routing_context.create_dht_record(schema, kind).await?;
Expand Down Expand Up @@ -305,4 +317,12 @@ impl Backend {

Ok(Box::new(repo))
}

pub fn subscribe_updates(&self) -> Option<broadcast::Receiver<VeilidUpdate>> {
self.update_rx.as_ref().map(|rx| rx.resubscribe())
}

pub fn get_veilid_api(&self) -> Option<&VeilidAPI> {
self.veilid_api.as_ref()
}
}
69 changes: 68 additions & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#![allow(async_fn_in_trait)]
#![allow(clippy::async_yields_async)]

use crate::constants::ROUTE_ID_DHT_KEY;
use serde::{Serialize, Deserialize};
use eyre::{Result, anyhow};
use std::sync::Arc;
use veilid_core::{
CryptoKey, SharedSecret, CryptoTyped, DHTRecordDescriptor, RoutingContext, CryptoSystemVLD0,
ProtectedStore, Nonce, CRYPTO_KIND_VLD0, CryptoSystem
ProtectedStore, Nonce, CRYPTO_KIND_VLD0, CryptoSystem, KeyPair, VeilidAPI
};

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -40,6 +41,16 @@ pub trait DHTEntity {
fn get_dht_record(&self) -> DHTRecordDescriptor;
fn get_secret_key(&self) -> Option<CryptoKey>;

// Default method to get the owner key
fn owner_key(&self) -> CryptoKey {
self.get_dht_record().owner().clone()
}

// Default method to get the owner secret
fn owner_secret(&self) -> Option<CryptoKey> {
self.get_dht_record().owner_secret().cloned()
}

fn encrypt_aead(&self, data: &[u8], associated_data: Option<&[u8]>) -> Result<Vec<u8>> {
let nonce = self.get_crypto_system().random_nonce();
let mut buffer = Vec::with_capacity(nonce.as_slice().len() + data.len());
Expand Down Expand Up @@ -90,6 +101,62 @@ pub trait DHTEntity {
Ok(())
}

async fn store_route_id_in_dht(
&self,
route_id_blob: Vec<u8>,
) -> Result<()> {
let routing_context = &self.get_routing_context();
let dht_record = self.get_dht_record();
routing_context.set_dht_value(
dht_record.key().clone(),
ROUTE_ID_DHT_KEY,
route_id_blob,
None,
)
.await
.map_err(|e| anyhow!("Failed to store route ID blob in DHT: {}", e))?;

Ok(())
}

async fn get_route_id_from_dht(&self, subkey: u32) -> Result<Vec<u8>> {
let routing_context = &self.get_routing_context();

// Use the existing DHT record
let dht_record = self.get_dht_record();

// Get the stored route ID blob at subkey
let stored_blob = routing_context
.get_dht_value(dht_record.key().clone(), ROUTE_ID_DHT_KEY, false)
.await?
.ok_or_else(|| anyhow!("Route ID blob not found in DHT"))?;

Ok(stored_blob.data().to_vec())
}


// Send an AppMessage to the repo owner using the stored route ID blob
async fn send_message_to_owner(
&self,
veilid: &VeilidAPI,
message: Vec<u8>,
subkey: u32,
) -> Result<()> {
let routing_context = &self.get_routing_context();

// Retrieve the route ID blob from DHT
let route_id_blob = self.get_route_id_from_dht(subkey).await?;

// Import the route using the blob via VeilidAPI
let route_id = veilid.import_remote_private_route(route_id_blob)?;

// Send an AppMessage to the repo owner using the imported route ID
routing_context
.app_message(veilid_core::Target::PrivateRoute(route_id), message)
.await?;

Ok(())
}

fn get_write_key(&self) -> Option<CryptoKey> {
unimplemented!("WIP")
Expand Down
3 changes: 2 additions & 1 deletion src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub const TEST_GROUP_NAME: &str = "Test Group";
pub const UNABLE_TO_STORE_KEYPAIR: &str = "Unable to store keypair";
pub const FAILED_TO_LOAD_KEYPAIR: &str = "Failed to load keypair";
pub const KEYPAIR_NOT_FOUND: &str = "Keypair not found";
pub const FAILED_TO_DESERIALIZE_KEYPAIR: &str = "Failed to deserialize keypair";
pub const FAILED_TO_DESERIALIZE_KEYPAIR: &str = "Failed to deserialize keypair";
pub const ROUTE_ID_DHT_KEY: u32 = 2;
1 change: 1 addition & 0 deletions src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl Group {
Err(anyhow!("Repo not found"))
}
}

}

impl DHTEntity for Group {
Expand Down
64 changes: 57 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ pub mod backend;
pub mod common;
pub mod constants;

use crate::constants::{GROUP_NOT_FOUND, UNABLE_TO_SET_GROUP_NAME, UNABLE_TO_GET_GROUP_NAME, TEST_GROUP_NAME, UNABLE_TO_STORE_KEYPAIR, FAILED_TO_LOAD_KEYPAIR, KEYPAIR_NOT_FOUND, FAILED_TO_DESERIALIZE_KEYPAIR};
use crate::constants::{GROUP_NOT_FOUND, UNABLE_TO_SET_GROUP_NAME, UNABLE_TO_GET_GROUP_NAME, TEST_GROUP_NAME, UNABLE_TO_STORE_KEYPAIR, FAILED_TO_LOAD_KEYPAIR, KEYPAIR_NOT_FOUND, FAILED_TO_DESERIALIZE_KEYPAIR, ROUTE_ID_DHT_KEY};

use crate::backend::Backend;
use crate::common::{CommonKeypair, DHTEntity};
use veilid_core::{
vld0_generate_keypair, TypedKey, CRYPTO_KIND_VLD0
vld0_generate_keypair, TypedKey, CRYPTO_KIND_VLD0, VeilidUpdate, VALID_CRYPTO_KINDS
};

#[cfg(test)]
mod tests {
use super::*;
use tokio::fs;
use tokio::sync::mpsc;
use tmpdir::TmpDir;

#[tokio::test]
Expand All @@ -37,7 +38,7 @@ mod tests {
backend.stop().await.expect("Unable to stop");

backend.start().await.expect("Unable to restart");
let loaded_group = backend.get_group(TypedKey::new(CRYPTO_KIND_VLD0, group.id())).await.expect(GROUP_NOT_FOUND);
let mut loaded_group = backend.get_group(TypedKey::new(CRYPTO_KIND_VLD0, group.id())).await.expect(GROUP_NOT_FOUND);

let protected_store = backend.get_protected_store().unwrap();
let keypair_data = protected_store.load_user_secret(group.id().to_string())
Expand All @@ -48,16 +49,14 @@ mod tests {

// Check that the id matches group.id()
assert_eq!(retrieved_keypair.id, group.id());

// Check that the public_key matches the owner public key from the DHT record
assert_eq!(retrieved_keypair.public_key, loaded_group.get_dht_record().owner().clone());

// Check that the secret and encryption keys match
assert_eq!(retrieved_keypair.secret_key, group.get_secret_key());
assert_eq!(retrieved_keypair.encryption_key, group.get_encryption_key());

let mut loaded_group = backend.get_group(TypedKey::new(CRYPTO_KIND_VLD0, group.id())).await.expect(GROUP_NOT_FOUND);

// Check if we can get group name
let group_name = loaded_group.get_name().await.expect(UNABLE_TO_GET_GROUP_NAME);
assert_eq!(group_name, TEST_GROUP_NAME);
Expand All @@ -76,7 +75,7 @@ mod tests {
assert_eq!(name, repo_name);

// Add repo to group
loaded_group.add_repo(repo).await.expect("Unable to add repo to group");
loaded_group.add_repo(repo.clone()).await.expect("Unable to add repo to group");

// List known repos
let repos = loaded_group.list_repos().await;
Expand All @@ -89,6 +88,57 @@ mod tests {
let retrieved_name = loaded_repo.get_name().await.expect("Unable to get repo name after restart");
assert_eq!(retrieved_name, repo_name);

// Get the update receiver from the backend
let update_rx = backend.subscribe_updates().expect("Failed to subscribe to updates");

// Set up a channel to receive AppMessage updates
let (message_tx, mut message_rx) = mpsc::channel(1);

// Spawn a task to listen for updates
tokio::spawn(async move {
let mut rx = update_rx.resubscribe();
while let Ok(update) = rx.recv().await {
if let VeilidUpdate::AppMessage(app_message) = update {
// Optionally, filter by route_id or other criteria
message_tx.send(app_message).await.unwrap();
}
}
});

// Get VeilidAPI instance from backend
let veilid_api = backend.get_veilid_api().expect("Failed to get VeilidAPI instance");

// Create a new private route
let (route_id, route_id_blob) = veilid_api
.new_custom_private_route(
&VALID_CRYPTO_KINDS,
veilid_core::Stability::Reliable,
veilid_core::Sequencing::PreferOrdered,
)
.await
.expect("Failed to create route");

// Store the route_id_blob in DHT
loaded_repo
.store_route_id_in_dht(route_id_blob.clone())
.await
.expect("Failed to store route ID blob in DHT");

// Define the message to send
let message = b"Test Message to Repo Owner".to_vec();

// Send the message
loaded_repo
.send_message_to_owner(veilid_api, message.clone(), ROUTE_ID_DHT_KEY)
.await
.expect("Failed to send message to repo owner");

// Receive the message
let received_app_message = message_rx.recv().await.expect("Failed to receive message");

// Verify the message
assert_eq!(received_app_message.message(), message.as_slice());

backend.stop().await.expect("Unable to stop");
}

Expand Down

0 comments on commit 2652c55

Please sign in to comment.