Skip to content

Commit d8c5c6f

Browse files
authored
refactor: use new (1.58) format strings with catpured args (#3)
1 parent 2a96915 commit d8c5c6f

File tree

5 files changed

+69
-71
lines changed

5 files changed

+69
-71
lines changed

examples/simple.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ struct Message {
1313
#[tokio::main]
1414
async fn main() {
1515
if let Err(e) = run().await {
16-
eprintln!("ERRORx: {}", e);
16+
eprintln!("ERROR: {e}");
1717
if let Some(e) = e.source() {
18-
eprintln!("SOURCE: {}", e);
18+
eprintln!("SOURCE: {e}");
1919
}
2020
}
2121
}
@@ -32,7 +32,9 @@ async fn run() -> Result<(), Error> {
3232

3333
for envelope in envelopes {
3434
let envelope = envelope?;
35-
println!("Message text: {}", envelope.message.text);
35+
36+
let text = envelope.message.text;
37+
println!("Message text: {text}");
3638

3739
pub_sub_client
3840
.acknowledge(SUBSCRIPTION, vec![&envelope.ack_id], None)

examples/transform_versioned.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::anyhow;
2-
use pub_sub_client::{Error, PubSubClient, ReceivedMessage};
2+
use pub_sub_client::{Error, MessageEnvelope, PubSubClient, ReceivedMessage};
33
use serde::Deserialize;
44
use serde_json::{json, Value};
55
use std::error::Error as _;
@@ -17,9 +17,9 @@ enum Message {
1717
#[tokio::main]
1818
async fn main() {
1919
if let Err(e) = run().await {
20-
eprintln!("ERROR: {}", e);
20+
eprintln!("ERROR: {e}");
2121
if let Some(e) = e.source() {
22-
eprintln!("SOURCE: {}", e);
22+
eprintln!("SOURCE: {e}");
2323
}
2424
}
2525
}
@@ -40,14 +40,17 @@ async fn run() -> Result<(), Error> {
4040
.await?;
4141

4242
for msg_envelope in msg_envelopes {
43-
let m = msg_envelope?;
44-
println!(
45-
"id: {}, message: {:?}, delivery_attempt: {}",
46-
m.id, m.message, m.delivery_attempt
47-
);
43+
let MessageEnvelope {
44+
id,
45+
ack_id,
46+
attributes: _,
47+
message,
48+
delivery_attempt,
49+
} = msg_envelope?;
50+
println!("id: {id}, message: {message:?}, delivery_attempt: {delivery_attempt}");
4851

4952
pub_sub_client
50-
.acknowledge(SUBSCRIPTION, vec![&m.ack_id], Some(Duration::from_secs(10)))
53+
.acknowledge(SUBSCRIPTION, vec![&ack_id], Some(Duration::from_secs(10)))
5154
.await?;
5255
println!("Successfully acknowledged");
5356
}
@@ -80,6 +83,6 @@ fn transform(
8083
Ok(value)
8184
}
8285
"v2" => Ok(value),
83-
unknown => Err(anyhow!("Unknow version `{}`", unknown).into()),
86+
unknown => Err(anyhow!("Unknow version `{unknown}`").into()),
8487
}
8588
}

src/lib.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use goauth::auth::JwtClaims;
88
use goauth::credentials::Credentials;
99
use goauth::fetcher::TokenFetcher;
1010
use goauth::scopes::Scope;
11+
use reqwest::Response;
12+
use serde::Serialize;
1113
use smpl_jwt::Jwt;
1214
use std::env;
1315
use std::time::Duration;
@@ -34,10 +36,7 @@ impl PubSubClient {
3436
pub fn new(key_path: &str, refresh_buffer: Duration) -> Result<Self, Error> {
3537
let credentials =
3638
Credentials::from_file(key_path).map_err(|source| Error::Initialization {
37-
reason: format!(
38-
"Missing or malformed service account key file at `{}`",
39-
key_path
40-
),
39+
reason: format!("Missing or malformed service account key at `{key_path}`"),
4140
source,
4241
})?;
4342

@@ -52,10 +51,7 @@ impl PubSubClient {
5251
credentials
5352
.rsa_key()
5453
.map_err(|source| Error::Initialization {
55-
reason: format!(
56-
"Malformed private key as part of service account key file at `{}`",
57-
key_path
58-
),
54+
reason: format!("Malformed private key in service account key at `{key_path}`"),
5955
source,
6056
})?,
6157
None,
@@ -74,6 +70,32 @@ impl PubSubClient {
7470
reqwest_client: reqwest::Client::new(),
7571
})
7672
}
73+
74+
async fn send_request<R: Serialize>(
75+
&self,
76+
url: &str,
77+
request: &R,
78+
timeout: Option<Duration>,
79+
) -> Result<Response, Error> {
80+
let token = self
81+
.token_fetcher
82+
.fetch_token()
83+
.await
84+
.map_err(|source| Error::TokenFetch { source })?;
85+
86+
let request = self
87+
.reqwest_client
88+
.post(url)
89+
.bearer_auth(token.access_token())
90+
.json(request);
91+
let request = timeout.into_iter().fold(request, |r, t| r.timeout(t));
92+
93+
let response = request
94+
.send()
95+
.await
96+
.map_err(|source| Error::HttpServiceCommunication { source })?;
97+
Ok(response)
98+
}
7799
}
78100

79101
#[cfg(test)]
@@ -103,10 +125,7 @@ mod tests {
103125
reason: _,
104126
source: _,
105127
} => (),
106-
other => panic!(
107-
"Expected Error::InvalidServiceAccountKey, but was {}",
108-
other
109-
),
128+
other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
110129
}
111130
}
112131

@@ -119,10 +138,7 @@ mod tests {
119138
reason: _,
120139
source: _,
121140
} => (),
122-
other => panic!(
123-
"Expected Error::InvalidServiceAccountKey, but was {}",
124-
other
125-
),
141+
other => panic!("Expected Error::InvalidServiceAccountKey, but was `{other}`"),
126142
}
127143
}
128144

@@ -135,7 +151,7 @@ mod tests {
135151
reason: _,
136152
source: _,
137153
} => (),
138-
other => panic!("Expected Error::InvalidPrivateKey, but was {}", other),
154+
other => panic!("Expected Error::InvalidPrivateKey, but was `{other}`"),
139155
}
140156
}
141157
}

src/subscriber/mod.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,10 @@ impl PubSubClient {
9090
max_messages: u32,
9191
timeout: Option<Duration>,
9292
) -> Result<Vec<ReceivedMessage>, Error> {
93-
let url = format!(
94-
"{}/v1/projects/{}/subscriptions/{}:pull",
95-
self.base_url, self.project_id, subscription_id
96-
);
9793
let request = PullRequest { max_messages };
98-
let response = self.send_request(&url, &request, timeout).await?;
94+
let response = self
95+
.send_request(&self.url(subscription_id, "pull"), &request, timeout)
96+
.await?;
9997

10098
if !response.status().is_success() {
10199
return Err(unexpected_http_status_code(response).await);
@@ -117,12 +115,10 @@ impl PubSubClient {
117115
ack_ids: Vec<&str>,
118116
timeout: Option<Duration>,
119117
) -> Result<(), Error> {
120-
let url = format!(
121-
"{}/v1/projects/{}/subscriptions/{}:acknowledge",
122-
self.base_url, self.project_id, subscription_id
123-
);
124118
let request = AcknowledgeRequest { ack_ids };
125-
let response = self.send_request(&url, &request, timeout).await?;
119+
let response = self
120+
.send_request(&self.url(subscription_id, "acknowledge"), &request, timeout)
121+
.await?;
126122

127123
if !response.status().is_success() {
128124
return Err(unexpected_http_status_code(response).await);
@@ -131,30 +127,10 @@ impl PubSubClient {
131127
Ok(())
132128
}
133129

134-
async fn send_request<R: Serialize>(
135-
&self,
136-
url: &str,
137-
request: &R,
138-
timeout: Option<Duration>,
139-
) -> Result<Response, Error> {
140-
let token = self
141-
.token_fetcher
142-
.fetch_token()
143-
.await
144-
.map_err(|source| Error::TokenFetch { source })?;
145-
146-
let request = self
147-
.reqwest_client
148-
.post(url)
149-
.bearer_auth(token.access_token())
150-
.json(request);
151-
let request = timeout.into_iter().fold(request, |r, t| r.timeout(t));
152-
153-
let response = request
154-
.send()
155-
.await
156-
.map_err(|source| Error::HttpServiceCommunication { source })?;
157-
Ok(response)
130+
fn url(&self, subscription_id: &str, action: &str) -> String {
131+
let base_url = &self.base_url;
132+
let project_id = &self.project_id;
133+
format!("{base_url}/v1/projects/{project_id}/subscriptions/{subscription_id}:{action}")
158134
}
159135
}
160136

@@ -303,7 +279,7 @@ mod tests {
303279
Ok(value)
304280
}
305281
"v2" => Ok(value),
306-
unknown => Err(anyhow!("Unknow version `{}`", unknown).into()),
282+
unknown => Err(anyhow!("Unknow version `{unknown}`").into()),
307283
}
308284
}
309285
}

tests/test.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,17 @@ async fn test() {
2323
// Set up testcontainers
2424
let docker_cli = Cli::default();
2525
let node = docker_cli.run(CloudSdk::pubsub());
26-
let base_url = format!("http://localhost:{}", node.get_host_port(PUBSUB_PORT));
27-
let topic_name = format!("projects/{}/topics/{}", PROJECT_ID, TOPIC_ID);
28-
let subscription_name = format!("projects/{}/subscriptions/{}", PROJECT_ID, SUBSCRIPTION_ID);
26+
let pubsub_port = node.get_host_port(PUBSUB_PORT);
27+
let base_url = format!("http://localhost:{pubsub_port}");
28+
let topic_name = format!("projects/{PROJECT_ID}/topics/{TOPIC_ID}");
29+
let subscription_name = format!("projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}");
2930

3031
// We interact with Pub/Sub via HTTP
3132
let reqwest_client = Client::new();
3233

3334
// Create topic
3435
let response = reqwest_client
35-
.put(format!("{}/v1/{}", base_url, topic_name))
36+
.put(format!("{base_url}/v1/{topic_name}"))
3637
.send()
3738
.await;
3839
assert!(response.is_ok());
@@ -41,7 +42,7 @@ async fn test() {
4142

4243
// Create subscription
4344
let response = reqwest_client
44-
.put(format!("{}/v1/{}", base_url, subscription_name))
45+
.put(format!("{base_url}/v1/{subscription_name}"))
4546
.json(&json!({ "topic": topic_name }))
4647
.send()
4748
.await;
@@ -53,7 +54,7 @@ async fn test() {
5354
let foo = base64::encode(json!({ "Foo": { "text": TEXT } }).to_string());
5455
let bar = base64::encode(json!({ "Bar": { "text": TEXT } }).to_string());
5556
let response = reqwest_client
56-
.post(format!("{}/v1/{}:publish", base_url, topic_name))
57+
.post(format!("{base_url}/v1/{topic_name}:publish"))
5758
.json(&json!(
5859
{
5960
"messages": [

0 commit comments

Comments
 (0)