-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactix-demo.rs
129 lines (114 loc) · 3.39 KB
/
actix-demo.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
//! To configure Prometheus to send samples to this binary, add the following to your prometheus.yml:
//!
//! ```yml
//! remote_write:
//! - url: "http://localhost:9201/api/write"
//!
//! remote_read:
//! - url: "http://localhost:9201/api/read"
//! ```
use async_trait::async_trait;
use std::{
future::{ready, Ready},
sync::Arc,
};
use actix_web::{web, App, FromRequest, HttpServer};
use prom_remote_api::{
types::{
Error, Label, Query, QueryResult, RemoteStorage, RemoteStorageRef, Result, Sample,
TimeSeries, WriteRequest,
},
web::actix::{read, write},
};
#[derive(Clone, Copy)]
struct MockStorage;
fn generate_samples(start_ms: i64, end_ms: i64, step_ms: i64) -> Vec<Sample> {
// instant query
if step_ms == 0 {
return vec![Sample {
value: 1.0,
timestamp: start_ms,
}];
}
// range query
(start_ms..end_ms)
.step_by(step_ms as usize)
.enumerate()
.map(|(i, timestamp)| Sample {
value: 1.0 + i as f64,
timestamp,
})
.collect()
}
#[async_trait]
impl RemoteStorage for MockStorage {
type Err = Error;
type Context = RequestContext;
async fn write(&self, ctx: Self::Context, req: WriteRequest) -> Result<()> {
let user = ctx.user;
println!("mock write, user:{user}, req:{req:?}");
Ok(())
}
async fn process_query(&self, ctx: &Self::Context, query: Query) -> Result<QueryResult> {
let user = &ctx.user;
println!("mock read, user:{user}, req:{query:?}");
Ok(QueryResult {
timeseries: vec![TimeSeries {
labels: vec![
Label {
name: "job".to_string(),
value: "mock-remote".to_string(),
},
Label {
name: "instance".to_string(),
value: "127.0.0.1:9201".to_string(),
},
Label {
name: "__name__".to_string(),
value: "up".to_string(),
},
],
samples: generate_samples(
query.start_timestamp_ms,
query.end_timestamp_ms,
query
.hints
.as_ref()
.map(|hint| hint.step_ms)
.unwrap_or(1000),
),
..Default::default()
}],
})
}
}
struct RequestContext {
user: String,
}
impl FromRequest for RequestContext {
type Error = Error;
type Future = Ready<Result<Self>>;
fn from_request(
_req: &actix_web::HttpRequest,
_payload: &mut actix_web::dev::Payload,
) -> Self::Future {
ready(Ok(RequestContext {
user: "foo".to_string(),
}))
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let port = 9201;
println!("Listen on {port}...");
let storage: RemoteStorageRef<_, _> = Arc::new(MockStorage);
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(storage.clone()))
.route("/api/read", web::post().to(read::<RequestContext, Error>))
.route("/api/write", web::post().to(write::<RequestContext, Error>))
})
.bind(("127.0.0.1", port))?
.run()
.await
}