-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathcontroller.rs
313 lines (286 loc) · 11.4 KB
/
controller.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
use crate::{telemetry, Error, Metrics, Result};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
client::Client,
runtime::{
controller::{Action, Controller},
events::{Event, EventType, Recorder, Reporter},
finalizer::{finalizer, Event as Finalizer},
watcher::Config,
},
CustomResource, Resource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::*;
/// Finalizer name registered on `Document` objects; while present it blocks
/// deletion until `Document::cleanup` has run (managed by the `finalizer` helper).
pub static DOCUMENT_FINALIZER: &str = "documents.kube.rs";
/// Generate the Kubernetes wrapper struct `Document` from our Spec and Status struct
///
/// This provides a hook for generating the CRD yaml (in crdgen.rs)
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Default))]
#[kube(kind = "Document", group = "kube.rs", version = "v1", namespaced)]
#[kube(status = "DocumentStatus", shortname = "doc")]
pub struct DocumentSpec {
    /// Title of the document
    pub title: String,
    /// When true, the reconciler records the document as hidden in its status
    pub hide: bool,
    /// Free-form document body
    pub content: String,
}
/// The status object of `Document`
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct DocumentStatus {
    /// Last observed value of `DocumentSpec::hide`, written by the reconciler
    pub hidden: bool,
}
impl Document {
    /// Whether the last recorded status already marked this document hidden.
    /// A missing status object is treated as "not hidden".
    fn was_hidden(&self) -> bool {
        match &self.status {
            Some(status) => status.hidden,
            None => false,
        }
    }
}
/// Context for our reconciler, shared by every reconcile invocation.
#[derive(Clone)]
pub struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Diagnostics read by the web server (written on each reconcile)
    pub diagnostics: Arc<RwLock<Diagnostics>>,
    /// Prometheus metrics
    pub metrics: Arc<Metrics>,
}
#[instrument(skip(ctx, doc), fields(trace_id))]
async fn reconcile(doc: Arc<Document>, ctx: Arc<Context>) -> Result<Action> {
let trace_id = telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));
let _timer = ctx.metrics.app.reconcile.count_and_measure();
ctx.diagnostics.write().await.last_event = Utc::now();
let ns = doc.namespace().unwrap(); // doc is namespace scoped
let docs: Api<Document> = Api::namespaced(ctx.client.clone(), &ns);
info!("Reconciling Document \"{}\" in {}", doc.name_any(), ns);
finalizer(&docs, DOCUMENT_FINALIZER, doc, |event| async {
match event {
Finalizer::Apply(doc) => doc.reconcile(ctx.clone()).await,
Finalizer::Cleanup(doc) => doc.cleanup(ctx.clone()).await,
}
})
.await
.map_err(|e| Error::FinalizerError(Box::new(e)))
}
/// Requeue policy invoked whenever `reconcile` returns an error.
fn error_policy(doc: Arc<Document>, error: &Error, ctx: Arc<Context>) -> Action {
    warn!("reconcile failed: {:?}", error);
    // Record the failure so it is visible on the metrics endpoint
    ctx.metrics.app.reconcile.set_failure(doc.as_ref(), error);
    // Back off for five minutes before retrying this object
    Action::requeue(Duration::from_secs(300))
}
impl Document {
    /// Apply-side reconcile (for non-finalizer related changes).
    ///
    /// Publishes a `HideRequested` event on the not-hidden -> hide transition,
    /// then server-side-applies a status patch mirroring `spec.hide` into
    /// `status.hidden`, and requeues after 5 minutes.
    async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action> {
        let client = ctx.client.clone();
        let recorder = ctx.diagnostics.read().await.recorder(client.clone(), self);
        // Safe: Document is namespace-scoped, so namespace() is always Some
        let ns = self.namespace().unwrap();
        let name = self.name_any();
        let docs: Api<Document> = Api::namespaced(client, &ns);
        let should_hide = self.spec.hide;
        // send an event once per hide (only on the false -> true transition)
        if !self.was_hidden() && should_hide {
            recorder
                .publish(Event {
                    type_: EventType::Normal,
                    reason: "HideRequested".into(),
                    note: Some(format!("Hiding `{name}`")),
                    action: "Hiding".into(),
                    secondary: None,
                })
                .await
                .map_err(Error::KubeError)?;
        }
        if name == "illegal" {
            return Err(Error::IllegalDocument); // error names show up in metrics
        }
        // always overwrite status object with what we saw
        let new_status = Patch::Apply(json!({
            "apiVersion": "kube.rs/v1",
            "kind": "Document",
            "status": DocumentStatus {
                hidden: should_hide,
            }
        }));
        // Server-side apply as field manager "cntrlr"; force() resolves conflicts in our favor
        let ps = PatchParams::apply("cntrlr").force();
        let _o = docs
            .patch_status(&name, &ps, &new_status)
            .await
            .map_err(Error::KubeError)?;
        // If no events were received, check back every 5 minutes
        Ok(Action::requeue(Duration::from_secs(5 * 60)))
    }

    /// Finalizer cleanup (the object was deleted, ensure nothing is orphaned).
    ///
    /// Document owns no external resources, so this only records a
    /// `DeleteRequested` event; the finalizer helper then removes our finalizer.
    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
        let recorder = ctx.diagnostics.read().await.recorder(ctx.client.clone(), self);
        // Document doesn't have any real cleanup, so we just publish an event
        recorder
            .publish(Event {
                type_: EventType::Normal,
                reason: "DeleteRequested".into(),
                note: Some(format!("Delete `{}`", self.name_any())),
                action: "Deleting".into(),
                secondary: None,
            })
            .await
            .map_err(Error::KubeError)?;
        // No requeue needed: the object is going away
        Ok(Action::await_change())
    }
}
/// Diagnostics to be exposed by the web server
///
/// Note: the previous `#[serde(deserialize_with = "from_ts")]` attribute on
/// `last_event` was dead code — this struct only derives `Serialize`, the
/// attribute applies solely to `Deserialize`, and no `from_ts` function exists
/// in this module — so it has been removed.
#[derive(Clone, Serialize)]
pub struct Diagnostics {
    /// Timestamp of the most recent reconcile event
    pub last_event: DateTime<Utc>,
    /// Event reporter identity; excluded from the serialized diagnostics output
    #[serde(skip)]
    pub reporter: Reporter,
}
impl Default for Diagnostics {
fn default() -> Self {
Self {
last_event: Utc::now(),
reporter: "doc-controller".into(),
}
}
}
impl Diagnostics {
    /// Build an event `Recorder` that attributes events to the given `Document`.
    fn recorder(&self, client: Client, doc: &Document) -> Recorder {
        let reference = doc.object_ref(&());
        Recorder::new(client, self.reporter.clone(), reference)
    }
}
/// State shared between the controller and the web server
///
/// Cheap to clone: both fields are `Arc`s, so clones share the same data.
#[derive(Clone, Default)]
pub struct State {
    /// Diagnostics populated by the reconciler
    diagnostics: Arc<RwLock<Diagnostics>>,
    /// Metrics and encoder
    metrics: Arc<Metrics>,
}
/// State wrapper around the controller outputs for the web server
impl State {
    /// Metrics getter
    pub fn metrics(&self) -> Arc<Metrics> {
        Arc::clone(&self.metrics)
    }

    /// State getter
    pub async fn diagnostics(&self) -> Diagnostics {
        self.diagnostics.read().await.clone()
    }

    /// Create a Controller Context that can update State
    pub fn to_context(&self, client: Client) -> Arc<Context> {
        let context = Context {
            client,
            metrics: Arc::clone(&self.metrics),
            diagnostics: Arc::clone(&self.diagnostics),
        };
        Arc::new(context)
    }
}
/// Initialize the controller and shared state (given the crd is installed)
///
/// Probes the API server for the `Document` CRD first and exits the process
/// with a hint if it is missing, then drives the controller until shutdown.
pub async fn run(state: State) {
    let client = Client::try_default().await.expect("failed to create kube Client");
    let docs = Api::<Document>::all(client.clone());
    // Fail fast with installation instructions if the CRD is not present
    let probe = docs.list(&ListParams::default().limit(1)).await;
    if let Err(e) = probe {
        error!("CRD is not queryable; {e:?}. Is the CRD installed?");
        info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
        std::process::exit(1);
    }
    Controller::new(docs, Config::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, state.to_context(client))
        .filter_map(|x| async move { x.ok() })
        .for_each(|_| futures::future::ready(()))
        .await;
}
// Mock tests relying on fixtures.rs and its primitive apiserver mocks
#[cfg(test)]
mod test {
    use super::{error_policy, reconcile, Context, Document};
    use crate::fixtures::{timeout_after_1s, Scenario};
    use std::sync::Arc;

    /// A bare document must first receive our finalizer before anything else.
    #[tokio::test]
    async fn documents_without_finalizer_gets_a_finalizer() {
        let (testctx, fakeserver) = Context::test();
        let doc = Document::test();
        let mocksrv = fakeserver.run(Scenario::FinalizerCreation(doc.clone()));
        reconcile(Arc::new(doc), testctx).await.expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    /// A finalized document always gets a status patch on reconcile.
    #[tokio::test]
    async fn finalized_doc_causes_status_patch() {
        let (testctx, fakeserver) = Context::test();
        let doc = Document::test().finalized();
        let mocksrv = fakeserver.run(Scenario::StatusPatch(doc.clone()));
        reconcile(Arc::new(doc), testctx).await.expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    /// Flipping `spec.hide` publishes a HideRequested event before the status patch.
    #[tokio::test]
    async fn finalized_doc_with_hide_causes_event_and_hide_patch() {
        let (testctx, fakeserver) = Context::test();
        let doc = Document::test().finalized().needs_hide();
        let scenario = Scenario::EventPublishThenStatusPatch("HideRequested".into(), doc.clone());
        let mocksrv = fakeserver.run(scenario);
        reconcile(Arc::new(doc), testctx).await.expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    /// A deletion timestamp routes the reconcile through the cleanup path.
    #[tokio::test]
    async fn finalized_doc_with_delete_timestamp_causes_delete() {
        let (testctx, fakeserver) = Context::test();
        let doc = Document::test().finalized().needs_delete();
        let mocksrv = fakeserver.run(Scenario::Cleanup("DeleteRequested".into(), doc.clone()));
        reconcile(Arc::new(doc), testctx).await.expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    /// The reserved name "illegal" fails reconcile and bumps the failure metric.
    #[tokio::test]
    async fn illegal_doc_reconcile_errors_which_bumps_failure_metric() {
        let (testctx, fakeserver) = Context::test();
        let doc = Arc::new(Document::illegal().finalized());
        let mocksrv = fakeserver.run(Scenario::RadioSilence);
        let res = reconcile(doc.clone(), testctx.clone()).await;
        timeout_after_1s(mocksrv).await;
        assert!(res.is_err(), "apply reconciler fails on illegal doc");
        let err = res.unwrap_err();
        assert!(err.to_string().contains("IllegalDocument"));
        // calling error policy with the reconciler error should cause the correct metric to be set
        error_policy(doc.clone(), &err, testctx.clone());
        let metrics = &testctx.metrics.app.reconcile;
        let failures = metrics.get_failures("illegal", "finalizererror(applyfailed(illegaldocument))");
        assert_eq!(failures, 1);
    }

    // Integration test without mocks
    use kube::api::{Api, ListParams, Patch, PatchParams};

    #[tokio::test]
    #[ignore = "uses k8s current-context"]
    async fn integration_reconcile_should_set_status_and_send_event() {
        let client = kube::Client::try_default().await.unwrap();
        let ctx = super::State::default().to_context(client.clone());

        // create a test doc
        let doc = Document::test().finalized().needs_hide();
        let docs: Api<Document> = Api::namespaced(client.clone(), "default");
        let ssapply = PatchParams::apply("ctrltest");
        let patch = Patch::Apply(doc.clone());
        docs.patch("test", &ssapply, &patch).await.unwrap();

        // reconcile it (as if it was just applied to the cluster like this)
        reconcile(Arc::new(doc), ctx).await.unwrap();

        // verify side-effects happened
        let output = docs.get_status("test").await.unwrap();
        assert!(output.status.is_some());

        // verify hide event was found
        let events: Api<k8s_openapi::api::core::v1::Event> = Api::all(client.clone());
        let opts = ListParams::default().fields("involvedObject.kind=Document,involvedObject.name=test");
        let event = events
            .list(&opts)
            .await
            .unwrap()
            .into_iter()
            .filter(|e| e.reason.as_deref() == Some("HideRequested"))
            .last()
            .unwrap();
        // NOTE: previously `dbg!("got ev: {:?}", &event)` — dbg! takes expressions,
        // not a format string, so that printed the literal string as its own value.
        dbg!(&event);
        assert_eq!(event.action.as_deref(), Some("Hiding"));
    }
}