diff --git a/src/dataflow-types/src/client.rs b/src/dataflow-types/src/client.rs index a92e2e97475f0..07ee3be3df3b2 100644 --- a/src/dataflow-types/src/client.rs +++ b/src/dataflow-types/src/client.rs @@ -729,15 +729,17 @@ pub trait GenericClient: fmt::Debug + Send { where R: Send + 'a, { - Box::pin(async_stream::stream! { + Box::pin(async_stream::stream!({ loop { match self.recv().await { Ok(Some(response)) => yield Ok(response), Err(error) => yield Err(error), - Ok(None) => { return; } + Ok(None) => { + return; + } } } - }) + })) } } diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 513742ffaf73f..5b7219f85e409 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -241,13 +241,13 @@ where { /// Convert listener into futures::Stream pub fn into_stream(mut self) -> impl Stream> { - async_stream::stream! { - loop{ + async_stream::stream!({ + loop { for msg in self.next().await { yield msg; } } - } + }) } /// Attempt to pull out the next values of this subscription. diff --git a/src/storage/src/source/mod.rs b/src/storage/src/source/mod.rs index 6ee3a7347fc0b..ca0a81ee71d49 100644 --- a/src/storage/src/source/mod.rs +++ b/src/storage/src/source/mod.rs @@ -445,11 +445,11 @@ pub trait SourceReader { where Self: Sized + 'a, { - Box::pin(async_stream::stream! { + Box::pin(async_stream::stream!({ while let Some(msg) = self.next(timestamp_frequency).await { yield msg; } - }) + })) } } @@ -950,8 +950,16 @@ where let sync_activator = scope.sync_activator_for(&info.address[..]); let base_metrics = base_metrics.clone(); let source_connector = source_connector.clone(); - let mut source_reader = Box::pin(async_stream::stream! { - let mut timestamper = match ReclockOperator::new(name.clone(), storage_metadata, now, timestamp_frequency.clone(), as_of.clone()).await { + let mut source_reader = Box::pin(async_stream::stream!({ + let mut timestamper = match ReclockOperator::new( + name.clone(), + storage_metadata, + now, + timestamp_frequency.clone(), + as_of.clone(), + ) + .await + { Ok(t) => t, Err(e) => { error!("Failed to create source {} timestamper: {:#}", name, e); @@ -1037,7 +1045,7 @@ where } } } - }); + })); let activator = scope.activator_for(&info.address[..]); move |cap, output| { diff --git a/src/storage/src/source/persist_source.rs b/src/storage/src/source/persist_source.rs index b388e3c11862d..d5688ec7dfb44 100644 --- a/src/storage/src/source/persist_source.rs +++ b/src/storage/src/source/persist_source.rs @@ -60,7 +60,7 @@ where // This is a generator that sets up an async `Stream` that can be continously polled to get the // values that are `yield`-ed from it's body. - let async_stream = async_stream::try_stream! { + let async_stream = async_stream::try_stream!({ // We are reading only from worker 0. We can split the work of reading from the snapshot to // multiple workers, but someone has to distribute the splits. Also, in the glorious // STORAGE future, we will use multiple persist shards to back a STORAGE collection. Then, @@ -112,7 +112,7 @@ where yield event; } } - }; + }); let mut pinned_stream = Box::pin(async_stream);