From ff6aba80b3ce15bb3445b1154d49fc2ddae1467a Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Wed, 9 Oct 2024 09:52:28 -0700 Subject: [PATCH] WIP: Add dispatcher and task modules --- src/app.rs | 25 ++++++++++- src/dispatcher.rs | 67 +++++++++++++++++++++++++++++ src/lib.rs | 2 + src/task.rs | 105 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 src/dispatcher.rs create mode 100644 src/task.rs diff --git a/src/app.rs b/src/app.rs index 19177efb..0eb03243 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,6 +4,7 @@ use crate::{ app_style::user_requested_visuals_change, args::Args, column::Columns, + dispatcher::{self, HandlerTable}, draft::Drafts, error::{Error, FilterError}, filter::{self, FilterState}, @@ -16,6 +17,7 @@ use crate::{ notes_holder::NotesHolderStorage, profile::Profile, subscriptions::{SubKind, Subscriptions}, + task, thread::Thread, timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, ui::{self, DesktopSidePanel}, @@ -32,6 +34,7 @@ use egui_extras::{Size, StripBuilder}; use nostrdb::{Config, Filter, Ndb, Note, Transaction}; +use futures::SinkExt; use std::collections::HashMap; use std::path::Path; use std::time::Duration; @@ -61,6 +64,7 @@ pub struct Damus { pub img_cache: ImageCache, pub accounts: AccountManager, pub subscriptions: Subscriptions, + pub dispatch: HandlerTable, frame_history: crate::frame_history::FrameHistory, @@ -472,6 +476,11 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { .insert("unknownids".to_string(), SubKind::OneShot); setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns) .expect("home subscription failed"); + + let damusref = damus.reference(); + tokio::spawn(async move { + task::setup_user_relays(damusref).await; + }); } DamusState::NewTimelineSub(new_timeline_id) => { @@ -511,7 +520,7 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { damus.columns.attempt_perform_deletion_request(); } -fn process_event(damus: &mut Damus, _subid: &str, event: &str) { +fn process_event(damus: &mut Damus, subid: &str, event: &str) { #[cfg(feature = "profiling")] puffin::profile_function!(); @@ -519,6 +528,18 @@ fn process_event(damus: &mut Damus, _subid: &str, event: &str) { if let Err(_err) = damus.ndb.process_event(event) { error!("error processing event {}", event); } + + // Notify waiting subscribers that a pool event has happened + if let Some(handler) = damus.dispatch.get(subid) { + let mut handler_clone = handler.clone(); + tokio::spawn(async move { + handler_clone + .sender + .send(dispatcher::Event::Pool) + .await + .ok(); + }); + } } fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { @@ -726,6 +747,7 @@ impl Damus { debug, unknown_ids: UnknownIds::default(), subscriptions: Subscriptions::default(), + dispatch: HandlerTable::default(), since_optimize: parsed_args.since_optimize, threads: NotesHolderStorage::default(), profiles: NotesHolderStorage::default(), @@ -822,6 +844,7 @@ impl Damus { debug, unknown_ids: UnknownIds::default(), subscriptions: Subscriptions::default(), + dispatch: HandlerTable::default(), since_optimize: true, threads: NotesHolderStorage::default(), profiles: NotesHolderStorage::default(), diff --git a/src/dispatcher.rs b/src/dispatcher.rs new file mode 100644 index 00000000..c85d6fbf --- /dev/null +++ b/src/dispatcher.rs @@ -0,0 +1,67 @@ +use futures::channel::mpsc; +use std::collections::HashMap; +use std::error::Error; +use std::fmt; +use uuid::Uuid; + +use nostrdb::Filter; + +use crate::Damus; + +#[allow(dead_code)] // until InternalError is used +#[derive(Debug)] +pub enum DispatcherError { + InternalError(String), +} + +impl fmt::Display for DispatcherError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DispatcherError::InternalError(msg) => write!(f, "Internal error: {}", msg), + } + } +} + +impl Error for DispatcherError {} + +pub type DispatcherResult = Result; + +#[derive(Debug)] +pub enum Event { + Pool, +} + +/// Used by the relay code to dispatch events to a waiting handlers +#[derive(Debug, Clone)] +pub struct SubscriptionHandler { + pub sender: mpsc::Sender, +} + +/// Maps subscription id to handler for the subscription +pub type HandlerTable = HashMap; + +/// Used by async tasks to receive events +#[allow(dead_code)] // until id is read +#[derive(Debug)] +pub struct Subscription { + pub id: String, + pub receiver: mpsc::Receiver, +} + +pub fn subscribe( + damus: &mut Damus, + filters: &[Filter], + bufsz: usize, +) -> DispatcherResult { + let (sender, receiver) = mpsc::channel::(bufsz); + let id = Uuid::new_v4().to_string(); + damus + .dispatch + .insert(id.clone(), SubscriptionHandler { sender }); + damus.pool.subscribe(id.clone(), filters.into()); + Ok(Subscription { id, receiver }) +} + +pub fn _unsubscribe(_sub: Subscription) -> DispatcherResult<()> { + unimplemented!() +} diff --git a/src/lib.rs b/src/lib.rs index f0dc98d7..dac2344f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod app_style; mod args; mod colors; mod column; +mod dispatcher; mod draft; mod filter; mod fonts; @@ -32,6 +33,7 @@ pub mod relay_pool_manager; mod result; mod route; mod subscriptions; +mod task; mod test_data; mod thread; mod time; diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 00000000..b7c556d5 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,105 @@ +use futures::stream::StreamExt; +use tracing::{debug, error}; + +use enostr::RelayPool; +use nostrdb::{Filter, Ndb, Transaction}; + +use crate::dispatcher; +use crate::note::NoteRef; +use crate::{with_mut_damus, DamusRef}; + +pub async fn setup_user_relays(damusref: DamusRef) { + debug!("do_setup_user_relays starting"); + + let filter = with_mut_damus(&damusref, |damus| { + debug!("setup_user_relays: acquired damus for filter"); + + let account = damus + .accounts + .get_selected_account() + .as_ref() + .map(|a| a.pubkey.bytes()) + .expect("selected account"); + + // NIP-65 + Filter::new() + .authors([account]) + .kinds([10002]) + .limit(1) + .build() + }); + + let mut sub = with_mut_damus(&damusref, |damus| { + debug!("setup_user_relays: acquired damus for query + subscribe"); + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!("setup_user_relays: query #1 relays: {:#?}", relays); + add_relays(&mut damus.pool, relays); + + // Add a relay subscription to the pool + dispatcher::subscribe(damus, &[filter.clone()], 10).expect("subscribe") + }); + debug!("setup_user_relays: sub {}", sub.id); + + loop { + match sub.receiver.next().await { + Some(ev) => { + debug!("setup_user_relays: saw {:?}", ev); + with_mut_damus(&damusref, |damus| { + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!("setup_user_relays: query #2 relays: {:#?}", relays); + add_relays(&mut damus.pool, relays); + }) + } + None => { + debug!("setup_user_relays: saw None"); + break; + } + } + } + + debug!("do_setup_user_relays finished"); +} + +fn _query_note_json(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .map(|n| n.json().unwrap()) + .collect() +} + +fn query_nip65_relays(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .flat_map(|n| { + n.tags() + .iter() + .filter_map(|ti| ti.get_unchecked(1).variant().str()) + .map(|s| s.to_string()) + }) + .collect() +} + +fn add_relays(pool: &mut RelayPool, relays: Vec) { + let wakeup = move || { + // FIXME - how do we repaint? + }; + for relay in relays { + if let Err(e) = pool.add_url(relay, wakeup.clone()) { + error!("{:?}", e) + } + } +}