forked from grafbase/grafbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgateway_nanny.rs
136 lines (112 loc) · 4.1 KB
/
gateway_nanny.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use std::collections::HashMap;
use std::sync::Arc;
use crate::ConfigWatcher;
use super::bus::{EngineSender, GraphWatcher};
use engine_v2::Engine;
use futures_concurrency::stream::Merge;
use futures_util::{future::BoxFuture, stream::BoxStream, FutureExt as _, StreamExt};
use runtime::rate_limiting::KeyedRateLimitConfig;
use runtime_local::rate_limiting::in_memory::key_based::InMemoryRateLimiter;
use tokio::sync::mpsc;
use tokio_stream::wrappers::WatchStream;
/// The `EngineNanny` looks after the engine (still called the "gateway" in this file's log
/// messages and helper names): whenever the graph or the config watcher reports an update,
/// `handler` builds a new engine and publishes it on the engine channel.
pub(crate) struct EngineNanny {
/// Watch receiver for the current graph; the watched value is optional and `handler`
/// only builds an engine config when it is `Some`.
graph: GraphWatcher,
/// Watch receiver for the current config, borrowed each time an engine config is built.
config: ConfigWatcher,
/// Channel on which every freshly built (optional) engine is published.
sender: EngineSender,
}
impl EngineNanny {
    /// Bundles the graph watcher, the config watcher and the channel on which rebuilt
    /// engines are published.
    pub fn new(graph: GraphWatcher, config: ConfigWatcher, sender: EngineSender) -> Self {
        Self { graph, config, sender }
    }

    /// Runs the nanny loop.
    ///
    /// Graph and config updates are merged into a single stream of [`NannyMessage`]s. For
    /// every message the current graph and config are read from their watchers, an engine is
    /// built with [`new_gateway`] and the result (possibly `None`) is sent on `sender`.
    /// A failed send is logged and does not stop the loop; the loop only ends once the merged
    /// stream yields `None`.
    pub async fn handler(self) {
        log::trace!("starting the gateway nanny");

        // Both streams are boxed to the same type so they can be merged as an array.
        let graph_updates: BoxStream<'static, NannyMessage> =
            Box::pin(WatchStream::new(self.graph.clone()).map(|_| NannyMessage::GraphUpdated));
        let config_updates: BoxStream<'static, NannyMessage> =
            Box::pin(WatchStream::new(self.config.clone()).map(|_| NannyMessage::ConfigUpdated));
        let mut updates = [graph_updates, config_updates].merge();

        while let Some(message) = updates.next().await {
            log::trace!("nanny received a {message:?}");

            // The message only signals that something changed; the actual values are always
            // read from the watchers so the engine is built from the latest graph and config.
            let latest_graph = self.graph.borrow().clone();
            let config =
                latest_graph.map(|graph| engine_config_builder::build_config(&self.config.borrow(), graph));

            let gateway = new_gateway(config).await;
            if let Err(error) = self.sender.send(gateway) {
                log::error!("Couldn't publish new gateway: {error:?}");
            }
        }
    }
}
pub(super) async fn new_gateway(config: Option<engine_v2::VersionedConfig>) -> Option<Arc<Engine<CliRuntime>>> {
let config = config?.into_latest();
let rate_limiting_configs = config
.as_keyed_rate_limit_config()
.into_iter()
.map(|(k, v)| {
(
k.to_string(),
runtime::rate_limiting::GraphRateLimit {
limit: v.limit,
duration: v.duration,
},
)
})
.collect::<HashMap<_, _>>();
let (_, rx) = mpsc::channel(100);
let runtime = CliRuntime {
fetcher: runtime_local::NativeFetcher::runtime_fetcher(),
trusted_documents: runtime::trusted_documents_client::Client::new(
runtime_noop::trusted_documents::NoopTrustedDocuments,
),
kv: runtime_local::InMemoryKvStore::runtime(),
meter: grafbase_telemetry::metrics::meter_from_global_provider(),
rate_limiter: InMemoryRateLimiter::runtime(KeyedRateLimitConfig { rate_limiting_configs }, rx),
};
let schema = config.try_into().ok()?;
let engine = Engine::new(Arc::new(schema), None, runtime).await;
Some(Arc::new(engine))
}
/// The `engine_v2::Runtime` implementation handed to engines built by `new_gateway`.
pub struct CliRuntime {
/// Fetcher exposed through `Runtime::fetcher`; `new_gateway` uses the native fetcher.
fetcher: runtime::fetch::Fetcher,
/// Trusted documents client; `new_gateway` wires in the no-op implementation.
trusted_documents: runtime::trusted_documents_client::Client,
/// Key-value store; `new_gateway` uses the in-memory store.
kv: runtime::kv::KvStore,
/// Metrics meter, obtained from the global provider in `new_gateway`.
meter: grafbase_telemetry::otel::opentelemetry::metrics::Meter,
/// Rate limiter; `new_gateway` builds an in-memory one from the keyed rate limit config.
rate_limiter: runtime::rate_limiting::RateLimiter,
}
/// Runtime wiring for the CLI: every accessor hands out the component stored on the struct,
/// while hooks and the cache factory are the unit type.
impl engine_v2::Runtime for CliRuntime {
    type Hooks = ();
    type CacheFactory = ();

    /// No hooks are configured; the unit value stands in.
    fn hooks(&self) -> &Self::Hooks {
        &()
    }

    /// No cache factory is configured; the unit value stands in.
    fn cache_factory(&self) -> &Self::CacheFactory {
        &()
    }

    /// Returns the stored fetcher.
    fn fetcher(&self) -> &runtime::fetch::Fetcher {
        &self.fetcher
    }

    /// Returns the stored trusted documents client.
    fn trusted_documents(&self) -> &runtime::trusted_documents_client::Client {
        &self.trusted_documents
    }

    /// Returns the stored key-value store.
    fn kv(&self) -> &runtime::kv::KvStore {
        &self.kv
    }

    /// Returns the stored metrics meter.
    fn meter(&self) -> &grafbase_telemetry::otel::opentelemetry::metrics::Meter {
        &self.meter
    }

    /// Returns the stored rate limiter.
    fn rate_limiter(&self) -> &runtime::rate_limiting::RateLimiter {
        &self.rate_limiter
    }

    /// Sleeps for `duration` using the tokio timer, as a boxed future.
    fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()> {
        let timer = tokio::time::sleep(duration);
        timer.boxed()
    }
}
/// Wake-up signals for the nanny loop. The variants carry no payload: the handler re-reads
/// the graph and config from their watchers and only logs which kind of update arrived.
#[derive(Debug)]
enum NannyMessage {
/// Produced for each item yielded by the graph watch stream.
GraphUpdated,
/// Produced for each item yielded by the config watch stream.
ConfigUpdated,
}