Skip to content

Commit ac4d79b

Browse files
committed
chore: fix formatting on stream! and try_stream! macros
tokio-rs/async-stream#68 pointed out we can use the `stream!({ ... })` macro syntax to force rustfmt to format the inner code block.
1 parent 0284620 commit ac4d79b

File tree

12 files changed

+177
-100
lines changed

12 files changed

+177
-100
lines changed

backend/api/src/api/v1/gql/subscription/chat.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,30 @@ impl ChatSubscription {
4949
.await
5050
.map_err_gql("failed to subscribe to chat messages")?;
5151

52-
Ok(stream! {
52+
Ok(stream!({
5353
yield Ok(welcome_message);
5454
while let Ok(message) = message_stream.recv().await {
5555
let event = pb::scuffle::events::ChatMessage::decode(
56-
message.as_bytes().map_err_gql("invalid redis value type")?
57-
).map_err_gql("failed to decode chat message")?;
56+
message.as_bytes().map_err_gql("invalid redis value type")?,
57+
)
58+
.map_err_gql("failed to decode chat message")?;
5859

5960
yield Ok(ChatMessage {
60-
id: Uuid::parse_str(&event.id).map_err_gql("failed to parse chat message id")?,
61-
author_id: Uuid::parse_str(&event.author_id).map_err_gql("failed to parse chat message author id")?,
62-
channel_id: Uuid::parse_str(&event.channel_id).map_err_gql("failed to parse chat message channel id")?,
61+
id: Uuid::parse_str(&event.id)
62+
.map_err_gql("failed to parse chat message id")?,
63+
author_id: Uuid::parse_str(&event.author_id)
64+
.map_err_gql("failed to parse chat message author id")?,
65+
channel_id: Uuid::parse_str(&event.channel_id)
66+
.map_err_gql("failed to parse chat message channel id")?,
6367
content: event.content,
64-
created_at: Utc.timestamp_opt(event.created_at, 0).single().map_err_gql("failed to parse chat message created at")?.into(),
68+
created_at: Utc
69+
.timestamp_opt(event.created_at, 0)
70+
.single()
71+
.map_err_gql("failed to parse chat message created at")?
72+
.into(),
6573
r#type: MessageType::User,
6674
});
6775
}
68-
})
76+
}))
6977
}
7078
}

backend/api/src/api/v1/gql/subscription/user.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,17 @@ impl UserSubscription {
3939
.await
4040
.map_err_gql("failed to subscribe to user display name")?;
4141

42-
Ok(async_stream::stream! {
42+
Ok(async_stream::stream!({
4343
yield Ok(DisplayNameStream {
4444
display_name: user.display_name.clone(),
4545
username: user.username.clone(),
4646
});
4747

4848
while let Ok(message) = subscription.recv().await {
4949
let event = pb::scuffle::events::UserDisplayName::decode(
50-
message.as_bytes().map_err_gql("invalid redis value")?
51-
).map_err_gql("failed to decode user display name")?;
50+
message.as_bytes().map_err_gql("invalid redis value")?,
51+
)
52+
.map_err_gql("failed to decode user display name")?;
5253

5354
if let Some(username) = event.username {
5455
user.username = username;
@@ -63,6 +64,6 @@ impl UserSubscription {
6364
username: user.username.clone(),
6465
});
6566
}
66-
})
67+
}))
6768
}
6869
}

backend/api/src/grpc/health.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,24 @@ impl health_server::Health for HealthServer {
5353
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
5454
let global = self.global.clone();
5555

56-
let output = try_stream! {
56+
let output = try_stream!({
5757
loop {
5858
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5959

60-
let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
60+
let serving = global
61+
.upgrade()
62+
.map(|g| !g.ctx.is_done())
63+
.unwrap_or_default();
6164

6265
yield HealthCheckResponse {
63-
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
66+
status: if serving {
67+
ServingStatus::Serving.into()
68+
} else {
69+
ServingStatus::NotServing.into()
70+
},
6471
};
6572
}
66-
};
73+
});
6774

6875
Ok(Response::new(Box::pin(output)))
6976
}

common/src/rmq.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,35 +127,35 @@ impl ConnectionPool {
127127
let queue_name = queue_name.to_string();
128128
let connection_name = connection_name.to_string();
129129

130-
stream! {
130+
stream!({
131131
'connection_loop: loop {
132132
let channel = self.aquire().await?;
133-
let mut consumer = channel.basic_consume(&queue_name, &connection_name, options, table.clone()).await?;
133+
let mut consumer = channel
134+
.basic_consume(&queue_name, &connection_name, options, table.clone())
135+
.await?;
134136
loop {
135137
let m = consumer.next().await;
136138
match m {
137139
Some(Ok(m)) => {
138140
yield Ok(m);
139-
},
140-
Some(Err(e)) => {
141-
match e {
142-
lapin::Error::IOError(e) => {
143-
if e.kind() == std::io::ErrorKind::ConnectionReset {
144-
continue 'connection_loop;
145-
}
146-
},
147-
_ => {
148-
yield Err(anyhow!("failed to get message: {}", e));
141+
}
142+
Some(Err(e)) => match e {
143+
lapin::Error::IOError(e) => {
144+
if e.kind() == std::io::ErrorKind::ConnectionReset {
145+
continue 'connection_loop;
149146
}
150147
}
148+
_ => {
149+
yield Err(anyhow!("failed to get message: {}", e));
150+
}
151151
},
152152
None => {
153153
continue 'connection_loop;
154-
},
154+
}
155155
}
156156
}
157157
}
158-
}
158+
})
159159
}
160160

161161
pub async fn aquire(&self) -> Result<Channel> {

video/edge/src/grpc/health.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
4949
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
5050
let global = self.global.clone();
5151

52-
let output = try_stream! {
52+
let output = try_stream!({
5353
loop {
5454
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5555

56-
let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
56+
let serving = global
57+
.upgrade()
58+
.map(|g| !g.ctx.is_done())
59+
.unwrap_or_default();
5760

5861
yield HealthCheckResponse {
59-
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
62+
status: if serving {
63+
ServingStatus::Serving.into()
64+
} else {
65+
ServingStatus::NotServing.into()
66+
},
6067
};
6168
}
62-
};
69+
});
6370

6471
Ok(Response::new(Box::pin(output)))
6572
}

video/ingest/src/grpc/health.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
4949
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
5050
let global = self.global.clone();
5151

52-
let output = try_stream! {
52+
let output = try_stream!({
5353
loop {
5454
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5555

56-
let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
56+
let serving = global
57+
.upgrade()
58+
.map(|g| !g.ctx.is_done())
59+
.unwrap_or_default();
5760

5861
yield HealthCheckResponse {
59-
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
62+
status: if serving {
63+
ServingStatus::Serving.into()
64+
} else {
65+
ServingStatus::NotServing.into()
66+
},
6067
};
6168
}
62-
};
69+
});
6370

6471
Ok(Response::new(Box::pin(output)))
6572
}

video/ingest/src/grpc/ingest.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -68,39 +68,37 @@ impl ingest_server::Ingest for IngestServer {
6868
return Err(Status::not_found("Stream not found"));
6969
}
7070

71-
let output = try_stream! {
71+
let output = try_stream!({
7272
while let Some(event) = channel_rx.recv().await {
7373
let event = match event {
74-
WatchStreamEvent::InitSegment(data) => {
75-
WatchStreamResponse {
76-
data: Some(watch_stream_response::Data::InitSegment(data)),
77-
}
74+
WatchStreamEvent::InitSegment(data) => WatchStreamResponse {
75+
data: Some(watch_stream_response::Data::InitSegment(data)),
7876
},
79-
WatchStreamEvent::MediaSegment(ms) => {
80-
WatchStreamResponse {
81-
data: Some(watch_stream_response::Data::MediaSegment(
82-
watch_stream_response::MediaSegment {
83-
data: ms.data,
84-
keyframe: ms.keyframe,
85-
timestamp: ms.timestamp,
86-
data_type: match ms.ty {
87-
transmuxer::MediaType::Audio => watch_stream_response::media_segment::DataType::Audio.into(),
88-
transmuxer::MediaType::Video => watch_stream_response::media_segment::DataType::Video.into(),
77+
WatchStreamEvent::MediaSegment(ms) => WatchStreamResponse {
78+
data: Some(watch_stream_response::Data::MediaSegment(
79+
watch_stream_response::MediaSegment {
80+
data: ms.data,
81+
keyframe: ms.keyframe,
82+
timestamp: ms.timestamp,
83+
data_type: match ms.ty {
84+
transmuxer::MediaType::Audio => {
85+
watch_stream_response::media_segment::DataType::Audio.into()
8986
}
90-
}
91-
)),
92-
}
93-
}
94-
WatchStreamEvent::ShuttingDown(stream_shutdown) => {
95-
WatchStreamResponse {
96-
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
97-
}
98-
}
87+
transmuxer::MediaType::Video => {
88+
watch_stream_response::media_segment::DataType::Video.into()
89+
}
90+
},
91+
},
92+
)),
93+
},
94+
WatchStreamEvent::ShuttingDown(stream_shutdown) => WatchStreamResponse {
95+
data: Some(watch_stream_response::Data::ShuttingDown(stream_shutdown)),
96+
},
9997
};
10098

10199
yield event;
102100
}
103-
};
101+
});
104102

105103
Ok(Response::new(Box::pin(output)))
106104
}

video/ingest/src/tests/ingest.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,13 @@ impl TestState {
275275

276276
let stream = {
277277
let global = global.clone();
278-
stream! {
279-
let mut stream = pin!(global.rmq.basic_consume(global.config.transcoder.events_subject.clone(), "", Default::default(), Default::default()));
278+
stream!({
279+
let mut stream = pin!(global.rmq.basic_consume(
280+
global.config.transcoder.events_subject.clone(),
281+
"",
282+
Default::default(),
283+
Default::default()
284+
));
280285
loop {
281286
select! {
282287
message = stream.next() => {
@@ -288,7 +293,7 @@ impl TestState {
288293
}
289294
}
290295
}
291-
}
296+
})
292297
};
293298

294299
Self {

video/transcoder/src/grpc/health.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,24 @@ impl health_server::Health for HealthServer {
4949
async fn watch(&self, _: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>> {
5050
let global = self.global.clone();
5151

52-
let output = try_stream! {
52+
let output = try_stream!({
5353
loop {
5454
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
5555

56-
let serving = global.upgrade().map(|g| !g.ctx.is_done()).unwrap_or_default();
56+
let serving = global
57+
.upgrade()
58+
.map(|g| !g.ctx.is_done())
59+
.unwrap_or_default();
5760

5861
yield HealthCheckResponse {
59-
status: if serving { ServingStatus::Serving.into() } else { ServingStatus::NotServing.into() },
62+
status: if serving {
63+
ServingStatus::Serving.into()
64+
} else {
65+
ServingStatus::NotServing.into()
66+
},
6067
};
6168
}
62-
};
69+
});
6370

6471
Ok(Response::new(Box::pin(output)))
6572
}

video/transcoder/src/transcoder/job/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ fn report_to_ingest(
250250
mut client: IngestClient<Channel>,
251251
mut channel: mpsc::Receiver<TranscoderEventRequest>,
252252
) -> impl Stream<Item = Result<()>> + Send + 'static {
253-
stream! {
253+
stream!({
254254
loop {
255255
select! {
256256
msg = channel.recv() => {
@@ -276,7 +276,7 @@ fn report_to_ingest(
276276
}
277277
}
278278
}
279-
}
279+
})
280280
}
281281

282282
impl Job {

0 commit comments

Comments
 (0)