Skip to content

Commit ed11ba4

Browse files
committed
implement Connection trait
1 parent 5908fb0 commit ed11ba4

11 files changed

+273
-97
lines changed

postgres-macros/tests/fail-nightly/enum_extra_variant.stderr

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
44
26 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
55
| -^^^^^
66
| ||
7-
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
7+
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
88
| help: remove the `.await`
99
|
1010
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"moderator">, EnumVariant<"user">)>, "role">)>>` is implemented for `Vec<User>`

postgres-macros/tests/fail-nightly/enum_missing_variant.stderr

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
44
20 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
55
| -^^^^^
66
| ||
7-
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
7+
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
88
| help: remove the `.await`
99
|
1010
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"user">,)>, "role">)>>` is implemented for `Vec<User>`

postgres-macros/tests/fail-nightly/enum_variant_mismatch.stderr

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, "id">,
44
23 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
55
| -^^^^^
66
| ||
7-
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`
7+
| |the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>>` is not implemented for `Vec<User>`, which is required by `Sql<'_, Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"admin">, EnumVariant<"user">)>, "role">)>, _>: IntoFuture`
88
| help: remove the `.await`
99
|
1010
= help: the trait `Query<Struct<(StructColumn<i64, "id">, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<"moderator">, EnumVariant<"user">)>, "role">)>>` is implemented for `Vec<User>`

postgres-macros/tests/fail-stable/enum_extra_variant.stderr

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
22
--> tests/fail-stable/enum_extra_variant.rs:26:59
33
|
44
26 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
5-
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
6-
| |
5+
| -^^^^^
6+
| ||
7+
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
78
| help: remove the `.await`
89
|
9-
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
10+
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<16036746858103170191>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
1011
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`

postgres-macros/tests/fail-stable/enum_missing_variant.stderr

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
22
--> tests/fail-stable/enum_missing_variant.rs:20:59
33
|
44
20 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
5-
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
6-
| |
5+
| -^^^^^
6+
| ||
7+
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
78
| help: remove the `.await`
89
|
9-
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
10+
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<10465144470622129318>,)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
1011
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`

postgres-macros/tests/fail-stable/enum_variant_mismatch.stderr

+4-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ error[E0277]: the trait bound `Vec<User>: Query<Struct<(StructColumn<i64, 689821
22
--> tests/fail-stable/enum_variant_mismatch.rs:23:59
33
|
44
23 | let _: Vec<User> = sql!("SELECT id, role FROM users").await.unwrap();
5-
| -^^^^^ the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
6-
| |
5+
| -^^^^^
6+
| ||
7+
| |the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<5755620910692865178>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is not implemented for `Vec<User>`
78
| help: remove the `.await`
89
|
9-
= help: the trait `Query<Struct<Cols>>` is implemented for `Vec<T>`
10+
= help: the trait `Query<Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<sqlm_postgres::types::Enum<(EnumVariant<16036746858103170191>, EnumVariant<10465144470622129318>)>, 18137070463969723500>)>>` is implemented for `Vec<User>`
1011
= note: required for `Sql<'_, Struct<(StructColumn<i64, 6898215271518772730>, StructColumn<Enum<...>, 18137070463969723500>)>, ...>` to implement `IntoFuture`

postgres/src/connection.rs

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// TODO: remove once Rust's async lifetime in trait story got improved
2+
#![allow(clippy::manual_async_fn)]
3+
4+
use std::future::Future;
5+
6+
use deadpool_postgres::GenericClient;
7+
use tokio_postgres::types::ToSql;
8+
use tokio_postgres::Row;
9+
10+
use crate::Error;
11+
12+
pub trait Connection: Send + Sync {
13+
fn query_one<'a>(
14+
&'a self,
15+
query: &'a str,
16+
parameters: &'a [&'a (dyn ToSql + Sync)],
17+
) -> impl Future<Output = Result<Row, Error>> + Send + 'a;
18+
19+
fn query_opt<'a>(
20+
&'a self,
21+
query: &'a str,
22+
parameters: &'a [&'a (dyn ToSql + Sync)],
23+
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a;
24+
25+
fn query<'a>(
26+
&'a self,
27+
query: &'a str,
28+
parameters: &'a [&'a (dyn ToSql + Sync)],
29+
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a;
30+
31+
fn execute<'a>(
32+
&'a self,
33+
query: &'a str,
34+
parameters: &'a [&'a (dyn ToSql + Sync)],
35+
) -> impl Future<Output = Result<(), Error>> + Send + 'a;
36+
}
37+
38+
impl Connection for deadpool_postgres::Client {
39+
fn query_one<'a>(
40+
&'a self,
41+
query: &'a str,
42+
parameters: &'a [&'a (dyn ToSql + Sync)],
43+
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
44+
async move {
45+
let stmt = self.prepare_cached(query).await?;
46+
Ok(tokio_postgres::Client::query_one(self, &stmt, parameters).await?)
47+
}
48+
}
49+
50+
fn query_opt<'a>(
51+
&'a self,
52+
query: &'a str,
53+
parameters: &'a [&'a (dyn ToSql + Sync)],
54+
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
55+
async move {
56+
let stmt = self.prepare_cached(query).await?;
57+
Ok(tokio_postgres::Client::query_opt(self, &stmt, parameters).await?)
58+
}
59+
}
60+
61+
fn query<'a>(
62+
&'a self,
63+
query: &'a str,
64+
parameters: &'a [&'a (dyn ToSql + Sync)],
65+
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
66+
async move {
67+
let stmt = self.prepare_cached(query).await?;
68+
Ok(tokio_postgres::Client::query(self, &stmt, parameters).await?)
69+
}
70+
}
71+
72+
fn execute<'a>(
73+
&'a self,
74+
query: &'a str,
75+
parameters: &'a [&'a (dyn ToSql + Sync)],
76+
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
77+
async move {
78+
let stmt = self.prepare_cached(query).await?;
79+
tokio_postgres::Client::execute(self, &stmt, parameters).await?;
80+
Ok(())
81+
}
82+
}
83+
}
84+
85+
impl<'t> Connection for deadpool_postgres::Transaction<'t> {
86+
fn query_one<'a>(
87+
&'a self,
88+
query: &'a str,
89+
parameters: &'a [&'a (dyn ToSql + Sync)],
90+
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
91+
async move {
92+
let stmt = self.prepare_cached(query).await?;
93+
Ok(tokio_postgres::Transaction::query_one(self, &stmt, parameters).await?)
94+
}
95+
}
96+
97+
fn query_opt<'a>(
98+
&'a self,
99+
query: &'a str,
100+
parameters: &'a [&'a (dyn ToSql + Sync)],
101+
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
102+
async move {
103+
let stmt = self.prepare_cached(query).await?;
104+
Ok(tokio_postgres::Transaction::query_opt(self, &stmt, parameters).await?)
105+
}
106+
}
107+
108+
fn query<'a>(
109+
&'a self,
110+
query: &'a str,
111+
parameters: &'a [&'a (dyn ToSql + Sync)],
112+
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
113+
async move {
114+
let stmt = self.prepare_cached(query).await?;
115+
Ok(tokio_postgres::Transaction::query(self, &stmt, parameters).await?)
116+
}
117+
}
118+
119+
fn execute<'a>(
120+
&'a self,
121+
query: &'a str,
122+
parameters: &'a [&'a (dyn ToSql + Sync)],
123+
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
124+
async move {
125+
let stmt = self.prepare_cached(query).await?;
126+
tokio_postgres::Transaction::execute(self, &stmt, parameters).await?;
127+
Ok(())
128+
}
129+
}
130+
}
131+
132+
impl<'b, C> Connection for &'b C
133+
where
134+
C: Connection,
135+
{
136+
fn query_one<'a>(
137+
&'a self,
138+
query: &'a str,
139+
parameters: &'a [&'a (dyn ToSql + Sync)],
140+
) -> impl Future<Output = Result<Row, Error>> + Send + 'a {
141+
(*self).query_one(query, parameters)
142+
}
143+
144+
fn query_opt<'a>(
145+
&'a self,
146+
query: &'a str,
147+
parameters: &'a [&'a (dyn ToSql + Sync)],
148+
) -> impl Future<Output = Result<Option<Row>, Error>> + Send + 'a {
149+
(*self).query_opt(query, parameters)
150+
}
151+
152+
fn query<'a>(
153+
&'a self,
154+
query: &'a str,
155+
parameters: &'a [&'a (dyn ToSql + Sync)],
156+
) -> impl Future<Output = Result<Vec<Row>, Error>> + Send + 'a {
157+
(*self).query(query, parameters)
158+
}
159+
160+
fn execute<'a>(
161+
&'a self,
162+
query: &'a str,
163+
parameters: &'a [&'a (dyn ToSql + Sync)],
164+
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
165+
(*self).execute(query, parameters)
166+
}
167+
}

postgres/src/future.rs

+61-6
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,33 @@ where
1919
type IntoFuture = SqlFuture<'a, T>;
2020

2121
fn into_future(self) -> Self::IntoFuture {
22+
SqlFuture::new(self)
23+
}
24+
}
25+
26+
pub struct SqlFuture<'a, T> {
27+
future: Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>,
28+
marker: PhantomData<&'a ()>,
29+
}
30+
31+
impl<'a, T> SqlFuture<'a, T> {
32+
pub fn new<Cols>(sql: Sql<'a, Cols, T>) -> Self
33+
where
34+
T: Query<Cols> + Send + Sync + 'a,
35+
Cols: Send + Sync + 'a,
36+
{
2237
let span =
23-
tracing::debug_span!("sql query", query = self.query, parameters = ?self.parameters);
38+
tracing::debug_span!("sql query", query = sql.query, parameters = ?sql.parameters);
2439
let start = Instant::now();
2540

2641
SqlFuture {
2742
future: Box::pin(
43+
// Note: changes here must be applied to `with_connection` below too!
2844
async move {
2945
let mut i = 1;
3046
loop {
31-
match T::query(&self).await {
47+
let conn = super::connect().await?;
48+
match T::query(&sql, &conn).await {
3249
Ok(r) => {
3350
let elapsed = start.elapsed();
3451
tracing::trace!(?elapsed, "sql query finished");
@@ -55,11 +72,49 @@ where
5572
marker: PhantomData,
5673
}
5774
}
58-
}
5975

60-
pub struct SqlFuture<'a, T> {
61-
future: Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>,
62-
marker: PhantomData<&'a ()>,
76+
pub fn with_connection<Cols>(sql: Sql<'a, Cols, T>, conn: impl super::Connection + 'a) -> Self
77+
where
78+
T: Query<Cols> + Send + Sync + 'a,
79+
Cols: Send + Sync + 'a,
80+
{
81+
let span =
82+
tracing::debug_span!("sql query", query = sql.query, parameters = ?sql.parameters);
83+
let start = Instant::now();
84+
85+
SqlFuture {
86+
future: Box::pin(
87+
// Note: changes here must be applied to `bew` above too!
88+
async move {
89+
let mut i = 1;
90+
loop {
91+
match T::query(&sql, &conn).await {
92+
Ok(r) => {
93+
let elapsed = start.elapsed();
94+
tracing::trace!(?elapsed, "sql query finished");
95+
return Ok(r);
96+
}
97+
Err(Error {
98+
kind: ErrorKind::Postgres(err),
99+
..
100+
}) if err.is_closed() && i <= 5 => {
101+
// retry pool size + 1 times if connection is closed (might have
102+
// received a closed one from the connection pool)
103+
i += 1;
104+
tracing::trace!("retry due to connection closed error");
105+
continue;
106+
}
107+
Err(err) => {
108+
return Err(err);
109+
}
110+
}
111+
}
112+
}
113+
.instrument(span),
114+
),
115+
marker: PhantomData,
116+
}
117+
}
63118
}
64119

65120
impl<'a, T> Future for SqlFuture<'a, T> {

0 commit comments

Comments
 (0)