Skip to content

Commit 5695b81

Browse files
committed
tide: add dataloader example
tide: update dataloader example, should return all keys HashMap in BatchFn tide: update dataloader example, support returns sql error tide: clean
1 parent cac74d5 commit 5695b81

File tree

4 files changed

+290
-1
lines changed

4 files changed

+290
-1
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@ node_modules
33
.idea
44
.DS_Store
55
Cargo.lock
6+
memory:
7+
memory:*
8+

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ members = [
2020

2121
"tide/starwars",
2222
"tide/token-from-header",
23-
]
23+
"tide/dataloader",
24+
]

tide/dataloader/Cargo.toml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "tide-dataloader"
3+
version = "0.1.0"
4+
authors = ["vkill <[email protected]>"]
5+
edition = "2018"
6+
7+
[dependencies]
8+
async-graphql = "1.11.0"
9+
async-graphql-tide = "1.2.0"
10+
tide = "0.8"
11+
async-std = "1.5.0"
12+
dataloader = "0.12.0"
13+
sqlx = { version = "0.3.5", features = ["sqlite"] }
14+
async-trait = "0.1.30"
15+
16+
[dev-dependencies]
17+
serde_json = "1.0.51"
18+
surf = "2.0.0-alpha.1"

tide/dataloader/src/main.rs

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
use async_graphql::http::playground_source;
2+
use async_graphql::{Context, EmptyMutation, EmptySubscription, FieldResult, Schema};
3+
use async_std::task;
4+
use async_trait::async_trait;
5+
use dataloader::cached::Loader;
6+
use dataloader::BatchFn;
7+
use sqlx::{sqlite::SqliteQueryAs, Pool, SqliteConnection};
8+
use std::collections::HashMap;
9+
use std::env;
10+
use std::result;
11+
use tide::{
12+
http::{headers, mime},
13+
Request, Response, StatusCode,
14+
};
15+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
16+
17+
#[derive(sqlx::FromRow, Clone)]
18+
pub struct Book {
19+
id: i32,
20+
name: String,
21+
author: String,
22+
}
23+
24+
#[async_graphql::Object]
25+
impl Book {
26+
async fn id(&self) -> &i32 {
27+
&self.id
28+
}
29+
30+
async fn name(&self) -> &str {
31+
&self.name
32+
}
33+
34+
async fn author(&self) -> &str {
35+
&self.author
36+
}
37+
}
38+
39+
#[derive(Clone)]
40+
enum BatchFnLoadError {
41+
NotFound,
42+
DBError(String),
43+
}
44+
45+
pub struct BookBatcher(Pool<SqliteConnection>);
46+
impl BookBatcher {
47+
fn new(sqlite_pool: Pool<SqliteConnection>) -> Self {
48+
Self(sqlite_pool)
49+
}
50+
}
51+
type BookBatcherLoadHashMapValue = result::Result<Book, BatchFnLoadError>;
52+
53+
#[async_trait]
54+
impl BatchFn<i32, BookBatcherLoadHashMapValue> for BookBatcher {
55+
async fn load(&self, keys: &[i32]) -> HashMap<i32, BookBatcherLoadHashMapValue> {
56+
println!("load book by batch {:?}", keys);
57+
58+
if keys.contains(&9) {
59+
return keys
60+
.iter()
61+
.map(|k| {
62+
(
63+
*k,
64+
Err(BatchFnLoadError::DBError("MOCK DBError".to_owned())),
65+
)
66+
})
67+
.collect();
68+
}
69+
70+
let stmt = format!(
71+
r#"SELECT id, name, author FROM books WHERE id in ({})"#,
72+
(0..keys.len())
73+
.map(|i| format!("${}", i + 1))
74+
.collect::<Vec<String>>()
75+
.join(",")
76+
);
77+
78+
let books: result::Result<Vec<Book>, sqlx::Error> = keys
79+
.iter()
80+
.fold(sqlx::query_as(&stmt), |q, key| q.bind(key))
81+
.fetch_all(&self.0)
82+
.await;
83+
84+
match books {
85+
Ok(books) => {
86+
let books_map = books.into_iter().map(|book| (book.id, Ok(book))).collect();
87+
88+
keys.iter().fold(
89+
books_map,
90+
|mut map: HashMap<i32, BookBatcherLoadHashMapValue>, key| {
91+
map.entry(*key).or_insert(Err(BatchFnLoadError::NotFound));
92+
map
93+
},
94+
)
95+
}
96+
Err(e) => keys
97+
.iter()
98+
.map(|k| (*k, Err(BatchFnLoadError::DBError(e.to_string()))))
99+
.collect(),
100+
}
101+
}
102+
}
103+
104+
struct QueryRoot;
105+
106+
#[async_graphql::Object]
107+
impl QueryRoot {
108+
async fn book(&self, ctx: &Context<'_>, id: i32) -> FieldResult<Option<Book>> {
109+
println!("pre load book by id {:?}", id);
110+
match ctx
111+
.data::<Loader<i32, BookBatcherLoadHashMapValue, BookBatcher>>()
112+
.load(id)
113+
.await
114+
{
115+
Ok(book) => Ok(Some(book)),
116+
Err(err) => match err {
117+
BatchFnLoadError::NotFound => Ok(None),
118+
BatchFnLoadError::DBError(db_err) => Err(db_err.into()),
119+
},
120+
}
121+
}
122+
}
123+
124+
struct AppState {
125+
schema: Schema<QueryRoot, EmptyMutation, EmptySubscription>,
126+
}
127+
128+
fn main() -> Result<()> {
129+
task::block_on(run())
130+
}
131+
132+
async fn run() -> Result<()> {
133+
let sqlite_pool: Pool<SqliteConnection> = Pool::new("sqlite::memory:").await?;
134+
135+
sqlx::query(
136+
r#"
137+
CREATE TABLE IF NOT EXISTS books (
138+
id INTEGER PRIMARY KEY NOT NULL,
139+
name TEXT NOT NULL,
140+
author TEXT NOT NULL
141+
);
142+
"#,
143+
)
144+
.execute(&sqlite_pool)
145+
.await?;
146+
147+
sqlx::query(
148+
r#"
149+
INSERT OR IGNORE INTO books (id, name, author)
150+
VALUES (1, 'name1', 'author1'), (2, 'name2', 'author2'), (3, 'name3', 'author3')
151+
;
152+
"#,
153+
)
154+
.execute(&sqlite_pool)
155+
.await?;
156+
157+
let book_loader = Loader::new(BookBatcher::new(sqlite_pool));
158+
159+
let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription)
160+
.data(book_loader)
161+
.finish();
162+
163+
let app_state = AppState { schema };
164+
let mut app = tide::with_state(app_state);
165+
166+
app.at("/").post(|req: Request<AppState>| async move {
167+
let schema = req.state().schema.clone();
168+
async_graphql_tide::graphql(req, schema, |query_builder| query_builder).await
169+
});
170+
app.at("/").get(|_| async move {
171+
let resp = Response::new(StatusCode::Ok)
172+
.body_string(playground_source("/", None))
173+
.set_header(headers::CONTENT_TYPE, mime::HTML.to_string());
174+
175+
Ok(resp)
176+
});
177+
178+
let listen_addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "localhost:8000".to_owned());
179+
println!("Playground: http://{}", listen_addr);
180+
app.listen(listen_addr).await?;
181+
182+
Ok(())
183+
}
184+
185+
#[cfg(test)]
186+
mod tests {
187+
use super::*;
188+
use async_std::prelude::*;
189+
use serde_json::{json, Value};
190+
use std::time::Duration;
191+
192+
#[test]
193+
fn sample() -> Result<()> {
194+
task::block_on(async {
195+
let listen_addr = find_listen_addr().await;
196+
env::set_var("LISTEN_ADDR", format!("{}", listen_addr));
197+
198+
let server: task::JoinHandle<Result<()>> = task::spawn(async move {
199+
run().await?;
200+
201+
Ok(())
202+
});
203+
204+
let client: task::JoinHandle<Result<()>> = task::spawn(async move {
205+
let listen_addr = env::var("LISTEN_ADDR").unwrap();
206+
207+
task::sleep(Duration::from_millis(300)).await;
208+
209+
//
210+
let string = surf::post(format!("http://{}", listen_addr))
211+
.body_string(
212+
r#"{"query":"{ book1: book(id: 1) {id, name, author} book2: book(id: 2) {id, name, author} book3: book(id: 3) {id, name, author} book4: book(id: 4) {id, name, author} }"}"#
213+
.to_owned(),
214+
)
215+
.set_header("Content-Type".parse().unwrap(), "application/json")
216+
.recv_string()
217+
.await?;
218+
println!("{}", string);
219+
220+
let v: Value = serde_json::from_str(&string)?;
221+
assert_eq!(
222+
v["data"]["book1"],
223+
json!({"id": 1, "name": "name1", "author": "author1"})
224+
);
225+
assert_eq!(
226+
v["data"]["book2"],
227+
json!({"id": 2, "name": "name2", "author": "author2"})
228+
);
229+
assert_eq!(
230+
v["data"]["book3"],
231+
json!({"id": 3, "name": "name3", "author": "author3"})
232+
);
233+
assert_eq!(v["data"]["book4"], json!(null));
234+
235+
//
236+
let string = surf::post(format!("http://{}", listen_addr))
237+
.body_string(
238+
r#"{"query":"{ book1: book(id: 1) {id, name, author} book4: book(id: 4) {id, name, author} book9: book(id: 9) {id, name, author} }"}"#
239+
.to_owned(),
240+
)
241+
.set_header("Content-Type".parse().unwrap(), "application/json")
242+
.recv_string()
243+
.await?;
244+
println!("{}", string);
245+
246+
let v: Value = serde_json::from_str(&string)?;
247+
let error = v["errors"].as_array().unwrap()[0].clone();
248+
assert_eq!(error["message"], json!("MOCK DBError"));
249+
assert_eq!(error["path"].to_string(), r#"["book9"]"#);
250+
251+
Ok(())
252+
});
253+
254+
server.race(client).await?;
255+
256+
Ok(())
257+
})
258+
}
259+
260+
async fn find_listen_addr() -> async_std::net::SocketAddr {
261+
async_std::net::TcpListener::bind("localhost:0")
262+
.await
263+
.unwrap()
264+
.local_addr()
265+
.unwrap()
266+
}
267+
}

0 commit comments

Comments
 (0)