Skip to content

Commit c225d86

Browse files
committed
format async-stream
1 parent 1c95a4b commit c225d86

File tree

4 files changed

+23
-13
lines changed

4 files changed

+23
-13
lines changed

src/dataflow-types/src/client.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -729,15 +729,17 @@ pub trait GenericClient<C, R>: fmt::Debug + Send {
729729
where
730730
R: Send + 'a,
731731
{
732-
Box::pin(async_stream::stream! {
732+
Box::pin(async_stream::stream!({
733733
loop {
734734
match self.recv().await {
735735
Ok(Some(response)) => yield Ok(response),
736736
Err(error) => yield Err(error),
737-
Ok(None) => { return; }
737+
Ok(None) => {
738+
return;
739+
}
738740
}
739741
}
740-
})
742+
}))
741743
}
742744
}
743745

src/persist-client/src/read.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,13 @@ where
241241
{
242242
/// Convert listener into futures::Stream
243243
pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<K, V, T, D>> {
244-
async_stream::stream! {
245-
loop{
244+
async_stream::stream!({
245+
loop {
246246
for msg in self.next().await {
247247
yield msg;
248248
}
249249
}
250-
}
250+
})
251251
}
252252

253253
/// Attempt to pull out the next values of this subscription.

src/storage/src/source/mod.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,11 @@ pub trait SourceReader {
445445
where
446446
Self: Sized + 'a,
447447
{
448-
Box::pin(async_stream::stream! {
448+
Box::pin(async_stream::stream!({
449449
while let Some(msg) = self.next(timestamp_frequency).await {
450450
yield msg;
451451
}
452-
})
452+
}))
453453
}
454454
}
455455

@@ -950,8 +950,16 @@ where
950950
let sync_activator = scope.sync_activator_for(&info.address[..]);
951951
let base_metrics = base_metrics.clone();
952952
let source_connector = source_connector.clone();
953-
let mut source_reader = Box::pin(async_stream::stream! {
954-
let mut timestamper = match ReclockOperator::new(name.clone(), storage_metadata, now, timestamp_frequency.clone(), as_of.clone()).await {
953+
let mut source_reader = Box::pin(async_stream::stream!({
954+
let mut timestamper = match ReclockOperator::new(
955+
name.clone(),
956+
storage_metadata,
957+
now,
958+
timestamp_frequency.clone(),
959+
as_of.clone(),
960+
)
961+
.await
962+
{
955963
Ok(t) => t,
956964
Err(e) => {
957965
error!("Failed to create source {} timestamper: {:#}", name, e);
@@ -1037,7 +1045,7 @@ where
10371045
}
10381046
}
10391047
}
1040-
});
1048+
}));
10411049

10421050
let activator = scope.activator_for(&info.address[..]);
10431051
move |cap, output| {

src/storage/src/source/persist_source.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ where
6060

6161
// This is a generator that sets up an async `Stream` that can be continously polled to get the
6262
// values that are `yield`-ed from it's body.
63-
let async_stream = async_stream::try_stream! {
63+
let async_stream = async_stream::try_stream!({
6464
// We are reading only from worker 0. We can split the work of reading from the snapshot to
6565
// multiple workers, but someone has to distribute the splits. Also, in the glorious
6666
// STORAGE future, we will use multiple persist shards to back a STORAGE collection. Then,
@@ -112,7 +112,7 @@ where
112112
yield event;
113113
}
114114
}
115-
};
115+
});
116116

117117
let mut pinned_stream = Box::pin(async_stream);
118118

0 commit comments

Comments
 (0)