Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use user's relay list #346

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
edition = "2021"
46 changes: 24 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ serde_derive = "1"
serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence
tracing = "0.1.40"
#wasm-bindgen = "0.2.83"
nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" }
#nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" }
#nostrdb = { path = "/Users/jb55/dev/github/damus-io/nostrdb-rs" }
nostrdb = { path = "../nostrdb-rs" }
#nostrdb = { git = "https://github.com/ksedgwic/nostrdb-rs.git", rev = "3c3ac0cba199a2594cdf6785209bfdcc52ec64c4" }
#nostrdb = "0.3.4"
enostr = { path = "enostr" }
serde_json = "1.0.89"
Expand All @@ -43,7 +45,8 @@ strum_macros = "0.26"
bitflags = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] }
indexmap = "2.6.0"

futures = "0.3.31"
once_cell = "1.20.0"

[target.'cfg(target_os = "macos")'.dependencies]
security-framework = "2.11.0"
Expand Down
5 changes: 4 additions & 1 deletion enostr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ serde_derive = "1"
serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence
serde_json = "1.0.89"
nostr = { version = "0.30.0" }
nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" }
#nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "9bbafd8a2e904b77a51e7cfca71eb5bb5650e829" }
nostrdb = { path = "/home/user/bonsai/nostrdb-rs" }
#nostrdb = { git = "https://github.com/ksedgwic/nostrdb-rs.git", rev = "3c3ac0cba199a2594cdf6785209bfdcc52ec64c4" }
hex = "0.4.3"
tracing = "0.1.40"
env_logger = "0.11.1"
url = "2.5.2"
12 changes: 11 additions & 1 deletion enostr/src/relay/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ impl<'a> RelayMessage<'a> {
// Event
// Relay response format: ["EVENT", <subscription id>, <event JSON>]
if &msg[0..=7] == "[\"EVENT\"" {
return Ok(Self::event(msg, "fixme"));
let mut start = 9;
while let Some(&b' ') = msg.as_bytes().get(start) {
start += 1; // Move past optional spaces
}
if let Some(comma_index) = msg[start..].find(',') {
let subid_end = start + comma_index;
let subid = &msg[start..subid_end].trim().trim_matches('"');
return Ok(Self::event(msg, subid));
} else {
return Ok(Self::event(msg, "fixme"));
}
}

// EOSE (NIP-15)
Expand Down
74 changes: 72 additions & 2 deletions enostr/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ use crate::relay::{Relay, RelayStatus};
use crate::{ClientMessage, Result};
use nostrdb::Filter;

use std::collections::HashMap;
use std::collections::HashSet;
use std::time::{Duration, Instant};

use url::Url;

#[cfg(not(target_arch = "wasm32"))]
use ewebsock::{WsEvent, WsMessage};

#[cfg(not(target_arch = "wasm32"))]
use tracing::{debug, error};
use tracing::{debug, error, info};

#[derive(Debug)]
pub struct PoolEvent<'a> {
Expand Down Expand Up @@ -40,6 +44,7 @@ impl PoolRelay {

pub struct RelayPool {
pub relays: Vec<PoolRelay>,
pub subs: HashMap<String, Vec<Filter>>,
pub ping_rate: Duration,
}

Expand All @@ -54,6 +59,7 @@ impl RelayPool {
pub fn new() -> RelayPool {
RelayPool {
relays: vec![],
subs: HashMap::new(),
ping_rate: Duration::from_secs(25),
}
}
Expand Down Expand Up @@ -83,9 +89,11 @@ impl RelayPool {
for relay in &mut self.relays {
relay.relay.send(&ClientMessage::close(subid.clone()));
}
self.subs.remove(&subid);
}

pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
self.subs.insert(subid.clone(), filter.clone());
for relay in &mut self.relays {
relay.relay.subscribe(subid.clone(), filter.clone());
}
Expand Down Expand Up @@ -152,14 +160,76 @@ impl RelayPool {
url: String,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
let url = Self::canonicalize_url(&url);
// Check if the URL already exists in the pool.
if self.has(&url) {
return Ok(());
}
let relay = Relay::new(url, wakeup)?;
let pool_relay = PoolRelay::new(relay);
let mut pool_relay = PoolRelay::new(relay);

// Add all of the existing subscriptions to the new relay
for (subid, filters) in &self.subs {
pool_relay.relay.subscribe(subid.clone(), filters.clone());
}

self.relays.push(pool_relay);

Ok(())
}

// Add and remove relays to match the provided list
pub fn set_relays(
&mut self,
urls: &Vec<String>,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
// Canonicalize the new URLs.
let new_urls = urls
.iter()
.map(|u| Self::canonicalize_url(u))
.collect::<HashSet<_>>();

// Get the old URLs from the existing relays.
let old_urls = self
.relays
.iter()
.map(|pr| pr.relay.url.clone())
.collect::<HashSet<_>>();

debug!("old relays: {:?}", old_urls);
debug!("new relays: {:?}", new_urls);

if new_urls.len() == 0 {
info!("bootstrapping, not clearing the relay list ...");
return Ok(());
}

// Remove the relays that are in old_urls but not in new_urls.
let to_remove: HashSet<_> = old_urls.difference(&new_urls).cloned().collect();
self.relays.retain(|pr| !to_remove.contains(&pr.relay.url));

// FIXME - how do we close connections the removed relays?

// Add the relays that are in new_urls but not in old_urls.
let to_add: HashSet<_> = new_urls.difference(&old_urls).cloned().collect();
for url in to_add {
if let Err(e) = self.add_url(url.clone(), wakeup.clone()) {
error!("Failed to add relay with URL {}: {:?}", url, e);
}
}

Ok(())
}

// standardize the format (ie, trailing slashes) to avoid dups
fn canonicalize_url(url: &String) -> String {
match Url::parse(&url) {
Ok(parsed_url) => parsed_url.to_string(),
Err(_) => url.clone(), // If parsing fails, return the original URL.
}
}

/// Attempts to receive a pool event from a list of relays. The
/// function searches each relay in the list in order, attempting to
/// receive a message from each. If a message is received, return it.
Expand Down
Loading
Loading