From 2291cc3ad59a3464b69cace5c06e02c4cff7346e Mon Sep 17 00:00:00 2001 From: substack Date: Mon, 21 Mar 2022 22:54:31 -1000 Subject: [PATCH] move stream code out of store --- src/cable.rs | 8 ----- src/lib.rs | 2 ++ src/store.rs | 84 +++------------------------------------------------ src/stream.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 87 deletions(-) delete mode 100644 src/cable.rs create mode 100644 src/stream.rs diff --git a/src/cable.rs b/src/cable.rs deleted file mode 100644 index 1011e68..0000000 --- a/src/cable.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub trait Storage { -} - -pub struct Cable { -} - -impl Cable { -} diff --git a/src/lib.rs b/src/lib.rs index 4891cee..7c6ae7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,8 @@ mod store; pub use store::*; mod error; pub use error::*; +mod stream; +pub use stream::*; use length_prefixed_stream::{decode_with_options,DecodeOptions}; #[derive(Clone,Debug,PartialEq)] diff --git a/src/store.rs b/src/store.rs index 9594fb3..bcf30fb 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,88 +1,14 @@ -use crate::{Error,Post,PostBody,Channel,Hash,Payload,ChannelOptions}; +use crate::{ + Error,Post,PostBody,Channel,Hash,Payload,ChannelOptions, + stream::{LiveStream,PostStream,HashStream}, +}; use sodiumoxide::crypto; use std::convert::TryInto; use std::collections::{HashMap,BTreeMap}; -use async_std::{ - prelude::*, - stream::Stream,stream,channel,sync::{Arc,RwLock,Mutex}, - task,task::{Waker,Context,Poll},pin::Pin, -}; +use async_std::{prelude::*,task,stream,sync::{Arc,RwLock,Mutex}}; use desert::ToBytes; pub type Keypair = ([u8;32],[u8;64]); pub type GetPostOptions = ChannelOptions; -pub type PostStream<'a> = Box>+Unpin+Send+'a>; -pub type HashStream<'a> = Box>+Unpin+Send+'a>; - -#[derive(Clone)] -struct LiveStream { - id: usize, - options: ChannelOptions, - sender: channel::Sender, - receiver: channel::Receiver, - live_streams: Arc>>, - waker: Arc>>, -} - -impl LiveStream { - pub fn new( - id: usize, - options: ChannelOptions, - live_streams: Arc>>, - ) -> Self { - let (sender,receiver) = channel::bounded(options.limit); - Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) } - } - pub async fn send(&mut self, post: Post) { - if let Err(_) = self.sender.try_send(post) {} - if let Some(waker) = self.waker.lock().await.as_ref() { - waker.wake_by_ref(); - } - } - pub fn matches(&self, post: &Post) -> bool { - if Some(&self.options.channel) != post.get_channel() { return false } - match (self.options.time_start, self.options.time_end) { - (0,0) => true, - (0,end) => post.get_timestamp().map(|t| t <= end).unwrap_or(false), - (start,0) => post.get_timestamp().map(|t| start <= t).unwrap_or(false), - (start,end) => post.get_timestamp().map(|t| start <= t && t <= end).unwrap_or(false), - } - } -} - -impl Stream for LiveStream { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - let r = Pin::new(&mut self.receiver.recv()).poll(ctx); - match r { - Poll::Ready(Ok(x)) => { - let m_waker = self.waker.clone(); - task::block_on(async move { *m_waker.lock().await = None; }); - Poll::Ready(Some(Ok(x))) - }, - Poll::Ready(Err(x)) => { - let m_waker = self.waker.clone(); - task::block_on(async move { *m_waker.lock().await = None; }); - Poll::Ready(Some(Err(x.into()))) - }, - Poll::Pending => { - let m_waker = self.waker.clone(); - let waker = ctx.waker().clone(); - task::block_on(async move { *m_waker.lock().await = Some(waker); }); - Poll::Pending - }, - } - } -} - -impl Drop for LiveStream { - fn drop(&mut self) { - let live_streams = self.live_streams.clone(); - let id = self.id; - task::block_on(async move { - live_streams.write().await.drain_filter(|s| s.id == id); - }); - } -} #[async_trait::async_trait] pub trait Store: Clone+Send+Sync+Unpin+'static { diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..a1c6d3c --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,80 @@ +use crate::{Error,ChannelOptions,Post,Hash}; +use async_std::{ + prelude::*, + task,channel,stream::Stream,pin::Pin, + task::{Waker,Context,Poll},sync::{Arc,RwLock,Mutex}, +}; + +pub type PostStream<'a> = Box>+Unpin+Send+'a>; +pub type HashStream<'a> = Box>+Unpin+Send+'a>; + +#[derive(Clone)] +pub struct LiveStream { + id: usize, + options: ChannelOptions, + sender: channel::Sender, + receiver: channel::Receiver, + live_streams: Arc>>, + waker: Arc>>, +} + +impl LiveStream { + pub fn new( + id: usize, + options: ChannelOptions, + live_streams: Arc>>, + ) -> Self { + let (sender,receiver) = channel::bounded(options.limit); + Self { id, options, sender, receiver, live_streams, waker: Arc::new(Mutex::new(None)) } + } + pub async fn send(&mut self, post: Post) { + if let Err(_) = self.sender.try_send(post) {} + if let Some(waker) = self.waker.lock().await.as_ref() { + waker.wake_by_ref(); + } + } + pub fn matches(&self, post: &Post) -> bool { + if Some(&self.options.channel) != post.get_channel() { return false } + match (self.options.time_start, self.options.time_end) { + (0,0) => true, + (0,end) => post.get_timestamp().map(|t| t <= end).unwrap_or(false), + (start,0) => post.get_timestamp().map(|t| start <= t).unwrap_or(false), + (start,end) => post.get_timestamp().map(|t| start <= t && t <= end).unwrap_or(false), + } + } +} + +impl Stream for LiveStream { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + let r = Pin::new(&mut self.receiver.recv()).poll(ctx); + match r { + Poll::Ready(Ok(x)) => { + let m_waker = self.waker.clone(); + task::block_on(async move { *m_waker.lock().await = None; }); + Poll::Ready(Some(Ok(x))) + }, + Poll::Ready(Err(x)) => { + let m_waker = self.waker.clone(); + task::block_on(async move { *m_waker.lock().await = None; }); + Poll::Ready(Some(Err(x.into()))) + }, + Poll::Pending => { + let m_waker = self.waker.clone(); + let waker = ctx.waker().clone(); + task::block_on(async move { *m_waker.lock().await = Some(waker); }); + Poll::Pending + }, + } + } +} + +impl Drop for LiveStream { + fn drop(&mut self) { + let live_streams = self.live_streams.clone(); + let id = self.id; + task::block_on(async move { + live_streams.write().await.drain_filter(|s| s.id == id); + }); + } +}