diff --git a/src/store.rs b/src/store.rs index 8c84a90..9594fb3 100644 --- a/src/store.rs +++ b/src/store.rs @@ -5,7 +5,7 @@ use std::collections::{HashMap,BTreeMap}; use async_std::{ prelude::*, stream::Stream,stream,channel,sync::{Arc,RwLock,Mutex}, - task,task::{Context,Poll},pin::Pin, + task,task::{Waker,Context,Poll},pin::Pin, }; use desert::ToBytes; pub type Keypair = ([u8;32],[u8;64]); @@ -20,6 +20,7 @@ struct LiveStream { sender: channel::Sender, receiver: channel::Receiver, live_streams: Arc>>, + waker: Arc>>, } impl LiveStream { @@ -29,13 +30,14 @@ impl LiveStream { live_streams: Arc>>, ) -> Self { let (sender,receiver) = channel::bounded(options.limit); - Self { id, options, sender, receiver, live_streams } + Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) } } - pub fn send(&self, post: Post) { + pub async fn send(&mut self, post: Post) { if let Err(_) = self.sender.try_send(post) {} + if let Some(waker) = self.waker.lock().await.as_ref() { + waker.wake_by_ref(); + } } -} -impl LiveStream { pub fn matches(&self, post: &Post) -> bool { if Some(&self.options.channel) != post.get_channel() { return false } match (self.options.time_start, self.options.time_end) { @@ -50,10 +52,28 @@ impl LiveStream { impl Stream for LiveStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - let r = futures::ready![Pin::new(&mut self.receiver.recv()).poll(ctx)].unwrap(); - Poll::Ready(Some(Ok(r))) + let r = Pin::new(&mut self.receiver.recv()).poll(ctx); + match r { + Poll::Ready(Ok(x)) => { + let m_waker = self.waker.clone(); + task::block_on(async move { *m_waker.lock().await = None; }); + Poll::Ready(Some(Ok(x))) + }, + Poll::Ready(Err(x)) => { + let m_waker = self.waker.clone(); + task::block_on(async move { *m_waker.lock().await = None; }); + Poll::Ready(Some(Err(x.into()))) + }, + Poll::Pending => { + let m_waker = self.waker.clone(); + let waker = ctx.waker().clone(); + task::block_on(async move { *m_waker.lock().await = Some(waker); }); + Poll::Pending + }, + } } } + impl Drop for LiveStream { fn drop(&mut self) { let live_streams = self.live_streams.clone(); @@ -170,9 +190,9 @@ impl Store for MemoryStore { } } if let Some(senders) = self.live_streams.read().await.get(channel) { - for stream in senders.read().await.iter() { + for stream in senders.write().await.iter_mut() { if stream.matches(&post) { - stream.send(post.clone()); + stream.send(post.clone()).await; } } }