Skip to content

Commit 5ab543f

Browse files
author
Marc-Andre Giroux
committed
poc: PooledPlanner
1 parent 35052d8 commit 5ab543f

File tree

4 files changed

+364
-0
lines changed

4 files changed

+364
-0
lines changed

federation-2/router-bridge/src/error.rs

+5
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,9 @@ pub enum Error {
3636
/// The deno response id we tried to deserialize.
3737
id: String,
3838
},
39+
/// An uncaught error was raised when invoking a custom script.
40+
///
41+
/// This contains the script invocation error message.
42+
#[error("internal error: `{0}`")]
43+
Internal(String),
3944
}

federation-2/router-bridge/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ pub mod error;
1010
pub mod introspect;
1111
mod js;
1212
pub mod planner;
13+
mod pool;
1314
mod worker;

federation-2/router-bridge/src/planner.rs

+231
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::fmt::Debug;
66
use std::fmt::Display;
77
use std::fmt::Formatter;
88
use std::marker::PhantomData;
9+
use std::num::NonZeroUsize;
910
use std::sync::Arc;
1011

1112
use serde::de::DeserializeOwned;
@@ -14,6 +15,7 @@ use serde::Serialize;
1415
use thiserror::Error;
1516

1617
use crate::introspect::IntrospectionResponse;
18+
use crate::pool::JsWorkerPool;
1719
use crate::worker::JsWorker;
1820

1921
// ------------------------------------
@@ -398,6 +400,235 @@ where
398400
}
399401
}
400402

403+
/// A Deno worker backed query Planner,
404+
/// using a pool of JsRuntimes load balanced
405+
/// using Power of Two Choices.
406+
pub struct PooledPlanner<T>
407+
where
408+
T: DeserializeOwned + Send + Debug + 'static,
409+
{
410+
pool: Arc<JsWorkerPool>,
411+
schema_id: u64,
412+
t: PhantomData<T>,
413+
}
414+
415+
impl<T> Debug for PooledPlanner<T>
416+
where
417+
T: DeserializeOwned + Send + Debug + 'static,
418+
{
419+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
420+
f.debug_struct("PooledPlanner")
421+
.field("schema_id", &self.schema_id)
422+
.finish()
423+
}
424+
}
425+
426+
impl<T> PooledPlanner<T>
427+
where
428+
T: DeserializeOwned + Send + Debug + 'static,
429+
{
430+
/// Instantiate a `Planner` from a schema string
431+
pub async fn new(
432+
schema: String,
433+
config: QueryPlannerConfig,
434+
pool_size: NonZeroUsize,
435+
) -> Result<Self, Vec<PlannerError>> {
436+
let schema_id: u64 = rand::random();
437+
438+
let pool = JsWorkerPool::new(include_str!("../bundled/plan_worker.js"), pool_size);
439+
440+
let workers_are_setup = pool
441+
.broadcast_request::<PlanCmd, BridgeSetupResult<serde_json::Value>>(
442+
PlanCmd::UpdateSchema {
443+
schema,
444+
config,
445+
schema_id,
446+
},
447+
)
448+
.await
449+
.map_err(|e| {
450+
vec![WorkerError {
451+
name: Some("planner setup error".to_string()),
452+
message: Some(e.to_string()),
453+
stack: None,
454+
extensions: None,
455+
locations: Default::default(),
456+
}
457+
.into()]
458+
});
459+
460+
// Both cases below the mean schema update failed.
461+
// We need to pay attention here.
462+
// returning early will drop the worker, which will join the jsruntime thread.
463+
// however the event loop will run for ever. We need to let the worker know it needs to exit,
464+
// before we drop the worker
465+
match workers_are_setup {
466+
Err(setup_error) => {
467+
let _ = pool
468+
.broadcast_request::<PlanCmd, serde_json::Value>(PlanCmd::Exit { schema_id })
469+
.await;
470+
return Err(setup_error);
471+
}
472+
Ok(responses) => {
473+
for r in responses {
474+
if let Some(error) = r.errors {
475+
let _ = pool.broadcast_send(None, PlanCmd::Exit { schema_id }).await;
476+
return Err(error);
477+
}
478+
}
479+
}
480+
}
481+
482+
let pool = Arc::new(pool);
483+
484+
Ok(Self {
485+
pool,
486+
schema_id,
487+
t: PhantomData,
488+
})
489+
}
490+
491+
/// Update `Planner` from a schema string
492+
pub async fn update(
493+
&self,
494+
schema: String,
495+
config: QueryPlannerConfig,
496+
) -> Result<Self, Vec<PlannerError>> {
497+
let schema_id: u64 = rand::random();
498+
499+
let workers_are_setup = self
500+
.pool
501+
.broadcast_request::<PlanCmd, BridgeSetupResult<serde_json::Value>>(
502+
PlanCmd::UpdateSchema {
503+
schema,
504+
config,
505+
schema_id,
506+
},
507+
)
508+
.await
509+
.map_err(|e| {
510+
vec![WorkerError {
511+
name: Some("planner setup error".to_string()),
512+
message: Some(e.to_string()),
513+
stack: None,
514+
extensions: None,
515+
locations: Default::default(),
516+
}
517+
.into()]
518+
});
519+
520+
// If the update failed, we keep the existing schema in place
521+
match workers_are_setup {
522+
Err(setup_error) => {
523+
return Err(setup_error);
524+
}
525+
Ok(responses) => {
526+
for r in responses {
527+
if let Some(error) = r.errors {
528+
let _ = self
529+
.pool
530+
.broadcast_send(None, PlanCmd::Exit { schema_id })
531+
.await;
532+
return Err(error);
533+
}
534+
}
535+
}
536+
}
537+
538+
Ok(Self {
539+
pool: self.pool.clone(),
540+
schema_id,
541+
t: PhantomData,
542+
})
543+
}
544+
545+
/// Plan a query against an instantiated query planner
546+
pub async fn plan(
547+
&self,
548+
query: String,
549+
operation_name: Option<String>,
550+
options: PlanOptions,
551+
) -> Result<PlanResult<T>, crate::error::Error> {
552+
self.pool
553+
.request(PlanCmd::Plan {
554+
query,
555+
operation_name,
556+
schema_id: self.schema_id,
557+
options,
558+
})
559+
.await
560+
}
561+
562+
/// Generate the API schema from the current schema
563+
pub async fn api_schema(&self) -> Result<ApiSchema, crate::error::Error> {
564+
self.pool
565+
.request(PlanCmd::ApiSchema {
566+
schema_id: self.schema_id,
567+
})
568+
.await
569+
}
570+
571+
/// Generate the introspection response for this query
572+
pub async fn introspect(
573+
&self,
574+
query: String,
575+
) -> Result<IntrospectionResponse, crate::error::Error> {
576+
self.pool
577+
.request(PlanCmd::Introspect {
578+
query,
579+
schema_id: self.schema_id,
580+
})
581+
.await
582+
}
583+
584+
/// Get the operation signature for a query
585+
pub async fn operation_signature(
586+
&self,
587+
query: String,
588+
operation_name: Option<String>,
589+
) -> Result<String, crate::error::Error> {
590+
self.pool
591+
.request(PlanCmd::Signature {
592+
query,
593+
operation_name,
594+
schema_id: self.schema_id,
595+
})
596+
.await
597+
}
598+
599+
/// Extract the subgraph schemas from the supergraph schema
600+
pub async fn subgraphs(&self) -> Result<HashMap<String, String>, crate::error::Error> {
601+
self.pool
602+
.request(PlanCmd::Subgraphs {
603+
schema_id: self.schema_id,
604+
})
605+
.await
606+
}
607+
}
608+
609+
impl<T> Drop for PooledPlanner<T>
610+
where
611+
T: DeserializeOwned + Send + Debug + 'static,
612+
{
613+
fn drop(&mut self) {
614+
// Send a PlanCmd::Exit signal
615+
let pool_clone = self.pool.clone();
616+
let schema_id = self.schema_id;
617+
let _ = std::thread::spawn(move || {
618+
let runtime = tokio::runtime::Builder::new_current_thread()
619+
.build()
620+
.unwrap();
621+
622+
let _ = runtime.block_on(async move {
623+
pool_clone
624+
.broadcast_send(None, PlanCmd::Exit { schema_id })
625+
.await
626+
});
627+
})
628+
.join();
629+
}
630+
}
631+
401632
/// A Deno worker backed query Planner.
402633
403634
pub struct Planner<T>
+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use rand::Rng;
2+
use serde::de::DeserializeOwned;
3+
use serde::Serialize;
4+
use std::fmt::Debug;
5+
use std::sync::atomic::Ordering;
6+
use std::{num::NonZeroUsize, sync::atomic::AtomicUsize};
7+
8+
use std::sync::Arc;
9+
use tokio::task::JoinSet;
10+
11+
use crate::{error::Error, worker::JsWorker};
12+
13+
pub(crate) struct JsWorkerPool {
14+
workers: Vec<Arc<JsWorker>>,
15+
pending_requests: Vec<AtomicUsize>,
16+
}
17+
18+
impl JsWorkerPool {
19+
pub(crate) fn new(worker_source_code: &'static str, size: NonZeroUsize) -> Self {
20+
let workers: Vec<Arc<JsWorker>> = (0..size.into())
21+
.map(|_| Arc::new(JsWorker::new(worker_source_code)))
22+
.collect();
23+
24+
let pending_requests: Vec<AtomicUsize> =
25+
(0..size.into()).map(|_| AtomicUsize::new(0)).collect();
26+
27+
Self {
28+
workers,
29+
pending_requests,
30+
}
31+
}
32+
33+
pub(crate) async fn request<Request, Response>(
34+
&self,
35+
command: Request,
36+
) -> Result<Response, Error>
37+
where
38+
Request: std::hash::Hash + Serialize + Send + Debug + 'static,
39+
Response: DeserializeOwned + Send + Debug + 'static,
40+
{
41+
let (i, worker) = self.choice_of_two();
42+
43+
self.pending_requests[i].fetch_add(1, Ordering::SeqCst);
44+
let result = worker.request(command).await;
45+
self.pending_requests[i].fetch_add(1, Ordering::SeqCst);
46+
47+
result
48+
}
49+
50+
pub(crate) async fn broadcast_request<Request, Response>(
51+
&self,
52+
command: Request,
53+
) -> Result<Vec<Response>, Error>
54+
where
55+
Request: std::hash::Hash + Serialize + Send + Debug + Clone + 'static,
56+
Response: DeserializeOwned + Send + Debug + 'static,
57+
{
58+
let mut join_set = JoinSet::new();
59+
60+
#[allow(clippy::unnecessary_to_owned)]
61+
for worker in self.workers.iter().cloned() {
62+
let command_clone = command.clone();
63+
64+
join_set.spawn(async move { worker.request(command_clone).await });
65+
}
66+
67+
let mut responses = Vec::new();
68+
69+
while let Some(result) = join_set.join_next().await {
70+
let response = result.map_err(|_e| Error::Internal("could not join spawned task".into()))?;
71+
responses.push(response?);
72+
}
73+
74+
Ok(responses)
75+
}
76+
77+
pub(crate) async fn broadcast_send<Request>(
78+
&self,
79+
id_opt: Option<String>,
80+
request: Request,
81+
) -> Result<(), Error>
82+
where
83+
Request: std::hash::Hash + Serialize + Send + Debug + Clone + 'static,
84+
{
85+
let mut join_set = JoinSet::new();
86+
87+
#[allow(clippy::unnecessary_to_owned)]
88+
for worker in self.workers.iter().cloned() {
89+
let request_clone = request.clone();
90+
let id_opt_clone = id_opt.clone();
91+
92+
join_set.spawn(async move { worker.send(id_opt_clone, request_clone).await });
93+
}
94+
95+
let mut results = Vec::new();
96+
97+
while let Some(result) = join_set.join_next().await {
98+
let result = result.map_err(|_e| Error::Internal("could not join spawned task".into()))?;
99+
results.push(result?);
100+
}
101+
102+
Ok(())
103+
}
104+
105+
fn choice_of_two(&self) -> (usize, &JsWorker) {
106+
let mut rng = rand::thread_rng();
107+
108+
let len = self.workers.len();
109+
110+
let index1 = rng.gen_range(0..len);
111+
let mut index2 = rng.gen_range(0..len);
112+
while index2 == index1 {
113+
index2 = rng.gen_range(0..len);
114+
}
115+
116+
let index1_load = &self.pending_requests[index1].load(Ordering::SeqCst);
117+
let index2_load = &self.pending_requests[index2].load(Ordering::SeqCst);
118+
119+
let choice = if index1_load < index2_load {
120+
index1
121+
} else {
122+
index2
123+
};
124+
125+
(choice, &self.workers[choice])
126+
}
127+
}

0 commit comments

Comments
 (0)