Skip to content

Commit

Permalink
move stream code out of store
Browse files Browse the repository at this point in the history
  • Loading branch information
substack committed Mar 22, 2022
1 parent d321d32 commit 2291cc3
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 87 deletions.
8 changes: 0 additions & 8 deletions src/cable.rs

This file was deleted.

2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ mod store;
pub use store::*;
mod error;
pub use error::*;
mod stream;
pub use stream::*;
use length_prefixed_stream::{decode_with_options,DecodeOptions};

#[derive(Clone,Debug,PartialEq)]
Expand Down
84 changes: 5 additions & 79 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,88 +1,14 @@
use crate::{Error,Post,PostBody,Channel,Hash,Payload,ChannelOptions};
use crate::{
Error,Post,PostBody,Channel,Hash,Payload,ChannelOptions,
stream::{LiveStream,PostStream,HashStream},
};
use sodiumoxide::crypto;
use std::convert::TryInto;
use std::collections::{HashMap,BTreeMap};
use async_std::{
prelude::*,
stream::Stream,stream,channel,sync::{Arc,RwLock,Mutex},
task,task::{Waker,Context,Poll},pin::Pin,
};
use async_std::{prelude::*,task,stream,sync::{Arc,RwLock,Mutex}};
use desert::ToBytes;
pub type Keypair = ([u8;32],[u8;64]);
pub type GetPostOptions = ChannelOptions;
pub type PostStream<'a> = Box<dyn Stream<Item=Result<Post,Error>>+Unpin+Send+'a>;
pub type HashStream<'a> = Box<dyn Stream<Item=Result<Hash,Error>>+Unpin+Send+'a>;

#[derive(Clone)]
struct LiveStream {
id: usize,
options: ChannelOptions,
sender: channel::Sender<Post>,
receiver: channel::Receiver<Post>,
live_streams: Arc<RwLock<Vec<Self>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl LiveStream {
pub fn new(
id: usize,
options: ChannelOptions,
live_streams: Arc<RwLock<Vec<Self>>>,
) -> Self {
let (sender,receiver) = channel::bounded(options.limit);
Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) }
}
pub async fn send(&mut self, post: Post) {
if let Err(_) = self.sender.try_send(post) {}
if let Some(waker) = self.waker.lock().await.as_ref() {
waker.wake_by_ref();
}
}
pub fn matches(&self, post: &Post) -> bool {
if Some(&self.options.channel) != post.get_channel() { return false }
match (self.options.time_start, self.options.time_end) {
(0,0) => true,
(0,end) => post.get_timestamp().map(|t| t <= end).unwrap_or(false),
(start,0) => post.get_timestamp().map(|t| start <= t).unwrap_or(false),
(start,end) => post.get_timestamp().map(|t| start <= t && t <= end).unwrap_or(false),
}
}
}

impl Stream for LiveStream {
type Item = Result<Post,Error>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let r = Pin::new(&mut self.receiver.recv()).poll(ctx);
match r {
Poll::Ready(Ok(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Ok(x)))
},
Poll::Ready(Err(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Err(x.into())))
},
Poll::Pending => {
let m_waker = self.waker.clone();
let waker = ctx.waker().clone();
task::block_on(async move { *m_waker.lock().await = Some(waker); });
Poll::Pending
},
}
}
}

impl Drop for LiveStream {
fn drop(&mut self) {
let live_streams = self.live_streams.clone();
let id = self.id;
task::block_on(async move {
live_streams.write().await.drain_filter(|s| s.id == id);
});
}
}

#[async_trait::async_trait]
pub trait Store: Clone+Send+Sync+Unpin+'static {
Expand Down
80 changes: 80 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::{Error,ChannelOptions,Post,Hash};
use async_std::{
prelude::*,
task,channel,stream::Stream,pin::Pin,
task::{Waker,Context,Poll},sync::{Arc,RwLock,Mutex},
};

pub type PostStream<'a> = Box<dyn Stream<Item=Result<Post,Error>>+Unpin+Send+'a>;
pub type HashStream<'a> = Box<dyn Stream<Item=Result<Hash,Error>>+Unpin+Send+'a>;

#[derive(Clone)]
pub struct LiveStream {
id: usize,
options: ChannelOptions,
sender: channel::Sender<Post>,
receiver: channel::Receiver<Post>,
live_streams: Arc<RwLock<Vec<Self>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl LiveStream {
pub fn new(
id: usize,
options: ChannelOptions,
live_streams: Arc<RwLock<Vec<Self>>>,
) -> Self {
let (sender,receiver) = channel::bounded(options.limit);
Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) }
}
pub async fn send(&mut self, post: Post) {
if let Err(_) = self.sender.try_send(post) {}
if let Some(waker) = self.waker.lock().await.as_ref() {
waker.wake_by_ref();
}
}
pub fn matches(&self, post: &Post) -> bool {
if Some(&self.options.channel) != post.get_channel() { return false }
match (self.options.time_start, self.options.time_end) {
(0,0) => true,
(0,end) => post.get_timestamp().map(|t| t <= end).unwrap_or(false),
(start,0) => post.get_timestamp().map(|t| start <= t).unwrap_or(false),
(start,end) => post.get_timestamp().map(|t| start <= t && t <= end).unwrap_or(false),
}
}
}

impl Stream for LiveStream {
type Item = Result<Post,Error>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let r = Pin::new(&mut self.receiver.recv()).poll(ctx);
match r {
Poll::Ready(Ok(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Ok(x)))
},
Poll::Ready(Err(x)) => {
let m_waker = self.waker.clone();
task::block_on(async move { *m_waker.lock().await = None; });
Poll::Ready(Some(Err(x.into())))
},
Poll::Pending => {
let m_waker = self.waker.clone();
let waker = ctx.waker().clone();
task::block_on(async move { *m_waker.lock().await = Some(waker); });
Poll::Pending
},
}
}
}

impl Drop for LiveStream {
fn drop(&mut self) {
let live_streams = self.live_streams.clone();
let id = self.id;
task::block_on(async move {
live_streams.write().await.drain_filter(|s| s.id == id);
});
}
}

0 comments on commit 2291cc3

Please sign in to comment.