Skip to content

Commit

Permalink
working end to end finally
Browse files Browse the repository at this point in the history
  • Loading branch information
substack committed Mar 20, 2022
1 parent 4b55006 commit d321d32
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::{HashMap,BTreeMap};
use async_std::{
prelude::*,
stream::Stream,stream,channel,sync::{Arc,RwLock,Mutex},
task,task::{Context,Poll},pin::Pin,
task,task::{Waker,Context,Poll},pin::Pin,
};
use desert::ToBytes;
pub type Keypair = ([u8;32],[u8;64]);
Expand All @@ -20,6 +20,7 @@ struct LiveStream {
sender: channel::Sender<Post>,
receiver: channel::Receiver<Post>,
live_streams: Arc<RwLock<Vec<Self>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl LiveStream {
Expand All @@ -29,13 +30,14 @@ impl LiveStream {
live_streams: Arc<RwLock<Vec<Self>>>,
) -> Self {
let (sender,receiver) = channel::bounded(options.limit);
Self { id, options, sender, receiver, live_streams }
Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) }
}
pub fn send(&self, post: Post) {
pub async fn send(&mut self, post: Post) {
if let Err(_) = self.sender.try_send(post) {}
if let Some(waker) = self.waker.lock().await.as_ref() {
waker.wake_by_ref();
}
}
}
impl LiveStream {
pub fn matches(&self, post: &Post) -> bool {
if Some(&self.options.channel) != post.get_channel() { return false }
match (self.options.time_start, self.options.time_end) {
Expand All @@ -50,10 +52,28 @@ impl LiveStream {
impl Stream for LiveStream {
type Item = Result<Post,Error>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let r = futures::ready![Pin::new(&mut self.receiver.recv()).poll(ctx)].unwrap();
Poll::Ready(Some(Ok(r)))
let r = Pin::new(&mut self.receiver.recv()).poll(ctx);
match r {
Poll::Ready(Ok(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Ok(x)))
},
Poll::Ready(Err(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Err(x.into())))
},
Poll::Pending => {
let m_waker = self.waker.clone();
let waker = ctx.waker().clone();
task::block_on(async move { *m_waker.lock().await = Some(waker); });
Poll::Pending
},
}
}
}

impl Drop for LiveStream {
fn drop(&mut self) {
let live_streams = self.live_streams.clone();
Expand Down Expand Up @@ -170,9 +190,9 @@ impl Store for MemoryStore {
}
}
if let Some(senders) = self.live_streams.read().await.get(channel) {
for stream in senders.read().await.iter() {
for stream in senders.write().await.iter_mut() {
if stream.matches(&post) {
stream.send(post.clone());
stream.send(post.clone()).await;
}
}
}
Expand Down

0 comments on commit d321d32

Please sign in to comment.