Skip to content

Commit 6eaf8bf

Browse files
authored
feat(telemetry): add telemetry when checking license (#18371) (#18394)
Signed-off-by: tabVersion <[email protected]>
1 parent 4050a04 commit 6eaf8bf

File tree

15 files changed

+296
-157
lines changed

15 files changed

+296
-157
lines changed

Cargo.lock

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"src/common/fields-derive",
1111
"src/common/heap_profiling",
1212
"src/common/metrics",
13+
"src/common/telemetry_event",
1314
"src/compute",
1415
"src/connector",
1516
"src/connector/codec",
@@ -243,6 +244,7 @@ risingwave_udf = { path = "./src/expr/udf" }
243244
risingwave_variables = { path = "./src/utils/variables" }
244245
risingwave_java_binding = { path = "./src/java_binding" }
245246
risingwave_jni_core = { path = "src/jni_core" }
247+
risingwave_telemetry_event = { path = "./src/common/telemetry_event" }
246248
rw_futures_util = { path = "src/utils/futures_util" }
247249
rw_resource_util = { path = "src/utils/resource_util" }
248250
rw_iter_util = { path = "src/utils/iter_util" }

src/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ risingwave_common_secret = { path = "./secret" }
9090
risingwave_error = { workspace = true }
9191
risingwave_license = { workspace = true }
9292
risingwave_pb = { workspace = true }
93+
risingwave_telemetry_event = { workspace = true }
9394
rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
9495
rw_iter_util = { workspace = true }
9596
rw_resource_util = { workspace = true }

src/common/src/telemetry/mod.rs

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@ pub mod pb_compatible;
1717
pub mod report;
1818

1919
use std::env;
20-
use std::time::SystemTime;
2120

2221
use risingwave_pb::telemetry::PbTelemetryClusterType;
22+
pub use risingwave_telemetry_event::{
23+
current_timestamp, post_telemetry_report_pb, report_event_common, request_to_telemetry_event,
24+
TelemetryError, TelemetryResult,
25+
};
2326
use serde::{Deserialize, Serialize};
2427
use sysinfo::System;
25-
use thiserror_ext::AsReport;
2628

2729
use crate::util::env_var::env_var_is_true_or;
2830
use crate::util::resource_util::cpu::total_cpu_available;
2931
use crate::util::resource_util::memory::{system_memory_available_bytes, total_memory_used_bytes};
3032
use crate::RW_VERSION;
3133

34+
type Result<T> = core::result::Result<T, TelemetryError>;
35+
3236
pub const TELEMETRY_CLUSTER_TYPE: &str = "RW_TELEMETRY_TYPE";
3337
pub const TELEMETRY_CLUSTER_TYPE_HOSTED: &str = "hosted"; // hosted on RisingWave Cloud
3438
pub const TELEMETRY_CLUSTER_TYPE_KUBERNETES: &str = "kubernetes";
@@ -50,21 +54,13 @@ pub fn telemetry_cluster_type_from_env_var() -> PbTelemetryClusterType {
5054
}
5155

5256
/// Url of telemetry backend
53-
pub const TELEMETRY_REPORT_URL: &str = "https://telemetry.risingwave.dev/api/v2/report";
54-
57+
pub use risingwave_telemetry_event::TELEMETRY_REPORT_URL;
5558
/// Telemetry reporting interval in seconds, 6 hours
5659
pub const TELEMETRY_REPORT_INTERVAL: u64 = 6 * 60 * 60;
5760

5861
/// Environment Variable that is default to be true
5962
const TELEMETRY_ENV_ENABLE: &str = "ENABLE_TELEMETRY";
6063

61-
pub type TelemetryResult<T> = core::result::Result<T, TelemetryError>;
62-
63-
/// Telemetry errors are generally recoverable/ignorable. `String` is good enough.
64-
pub type TelemetryError = String;
65-
66-
type Result<T> = core::result::Result<T, TelemetryError>;
67-
6864
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
6965
pub enum TelemetryNodeType {
7066
Meta,
@@ -148,40 +144,12 @@ impl Default for SystemData {
148144
}
149145
}
150146

151-
/// Sends a `POST` request of the telemetry reporting to a URL.
152-
pub async fn post_telemetry_report_pb(url: &str, report_body: Vec<u8>) -> Result<()> {
153-
let client = reqwest::Client::new();
154-
let res = client
155-
.post(url)
156-
.header(reqwest::header::CONTENT_TYPE, "application/x-protobuf")
157-
.body(report_body)
158-
.send()
159-
.await
160-
.map_err(|err| format!("failed to send telemetry report, err: {}", err.as_report()))?;
161-
if res.status().is_success() {
162-
Ok(())
163-
} else {
164-
Err(format!(
165-
"telemetry response is error, url {}, status {}",
166-
url,
167-
res.status()
168-
))
169-
}
170-
}
171-
172147
/// check whether telemetry is enabled in environment variable
173148
pub fn telemetry_env_enabled() -> bool {
174149
// default to be true
175150
env_var_is_true_or(TELEMETRY_ENV_ENABLE, true)
176151
}
177152

178-
pub fn current_timestamp() -> u64 {
179-
SystemTime::now()
180-
.duration_since(SystemTime::UNIX_EPOCH)
181-
.expect("Clock might go backward")
182-
.as_secs()
183-
}
184-
185153
pub fn report_scarf_enabled() -> bool {
186154
telemetry_env_enabled()
187155
&& !matches!(

src/common/src/telemetry/report.rs

Lines changed: 4 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,18 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::{Arc, OnceLock};
15+
use std::sync::Arc;
1616

17-
use prost::Message;
18-
use risingwave_pb::telemetry::{
19-
EventMessage as PbEventMessage, PbTelemetryDatabaseObject,
20-
TelemetryEventStage as PbTelemetryEventStage,
17+
pub use risingwave_telemetry_event::{
18+
current_timestamp, post_telemetry_report_pb, TELEMETRY_REPORT_URL, TELEMETRY_TRACKING_ID,
2119
};
2220
use tokio::sync::oneshot::Sender;
2321
use tokio::task::JoinHandle;
2422
use tokio::time::{interval, Duration};
2523
use uuid::Uuid;
2624

27-
use super::{current_timestamp, Result, TELEMETRY_REPORT_INTERVAL, TELEMETRY_REPORT_URL};
25+
use super::{Result, TELEMETRY_REPORT_INTERVAL};
2826
use crate::telemetry::pb_compatible::TelemetryToProtobuf;
29-
use crate::telemetry::post_telemetry_report_pb;
3027

3128
#[async_trait::async_trait]
3229
pub trait TelemetryInfoFetcher {
@@ -47,8 +44,6 @@ pub trait TelemetryReportCreator {
4744
fn report_type(&self) -> &str;
4845
}
4946

50-
static TELEMETRY_TRACKING_ID: OnceLock<String> = OnceLock::new();
51-
5247
pub async fn start_telemetry_reporting<F, I>(
5348
info_fetcher: Arc<I>,
5449
report_creator: Arc<F>,
@@ -126,99 +121,3 @@ where
126121
});
127122
(join_handle, shutdown_tx)
128123
}
129-
130-
pub fn report_event_common(
131-
event_stage: PbTelemetryEventStage,
132-
event_name: &str,
133-
catalog_id: i64,
134-
connector_name: Option<String>,
135-
object: Option<PbTelemetryDatabaseObject>,
136-
attributes: Option<jsonbb::Value>, // any json string
137-
node: String,
138-
) {
139-
let event_tracking_id: String;
140-
if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() {
141-
event_tracking_id = tracking_id.to_string();
142-
} else {
143-
tracing::info!("Telemetry tracking_id is not set, event reporting disabled");
144-
return;
145-
}
146-
147-
request_to_telemetry_event(
148-
event_tracking_id,
149-
event_stage,
150-
event_name,
151-
catalog_id,
152-
connector_name,
153-
object,
154-
attributes,
155-
node,
156-
false,
157-
);
158-
}
159-
160-
fn request_to_telemetry_event(
161-
tracking_id: String,
162-
event_stage: PbTelemetryEventStage,
163-
event_name: &str,
164-
catalog_id: i64,
165-
connector_name: Option<String>,
166-
object: Option<PbTelemetryDatabaseObject>,
167-
attributes: Option<jsonbb::Value>, // any json string
168-
node: String,
169-
is_test: bool,
170-
) {
171-
let event = PbEventMessage {
172-
tracking_id,
173-
event_time_sec: current_timestamp(),
174-
event_stage: event_stage as i32,
175-
event_name: event_name.to_string(),
176-
connector_name,
177-
object: object.map(|c| c as i32),
178-
catalog_id,
179-
attributes: attributes.map(|a| a.to_string()),
180-
node,
181-
is_test,
182-
};
183-
let report_bytes = event.encode_to_vec();
184-
185-
tokio::spawn(async move {
186-
const TELEMETRY_EVENT_REPORT_TYPE: &str = "event";
187-
let url = (TELEMETRY_REPORT_URL.to_owned() + "/" + TELEMETRY_EVENT_REPORT_TYPE).to_owned();
188-
post_telemetry_report_pb(&url, report_bytes)
189-
.await
190-
.unwrap_or_else(|e| tracing::info!("{}", e))
191-
});
192-
}
193-
194-
#[cfg(test)]
195-
mod test {
196-
197-
use super::*;
198-
199-
#[ignore]
200-
#[tokio::test]
201-
async fn test_telemetry_report_event() {
202-
let event_stage = PbTelemetryEventStage::CreateStreamJob;
203-
let event_name = "test_feature";
204-
let catalog_id = 1;
205-
let connector_name = Some("test_connector".to_string());
206-
let object = Some(PbTelemetryDatabaseObject::Source);
207-
let attributes = None;
208-
let node = "test_node".to_string();
209-
210-
request_to_telemetry_event(
211-
"7d45669c-08c7-4571-ae3d-d3a3e70a2f7e".to_string(),
212-
event_stage,
213-
event_name,
214-
catalog_id,
215-
connector_name,
216-
object,
217-
attributes,
218-
node,
219-
true,
220-
);
221-
222-
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
223-
}
224-
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "risingwave_telemetry_event"
3+
version = { workspace = true }
4+
edition = { workspace = true }
5+
homepage = { workspace = true }
6+
keywords = { workspace = true }
7+
license = { workspace = true }
8+
repository = { workspace = true }
9+
10+
[package.metadata.cargo-machete]
11+
ignored = ["workspace-hack"]
12+
13+
[package.metadata.cargo-udeps.ignore]
14+
normal = ["workspace-hack"]
15+
16+
[dependencies]
17+
jsonbb = { workspace = true }
18+
prost = { workspace = true }
19+
reqwest = { version = "0.12.2", features = ["json"] }
20+
risingwave_pb = { workspace = true }
21+
thiserror = "1"
22+
thiserror-ext = { workspace = true }
23+
tokio = { version = "0.2", package = "madsim-tokio", features = [
24+
"rt",
25+
"rt-multi-thread",
26+
"sync",
27+
"macros",
28+
"time",
29+
"signal",
30+
] }
31+
tracing = "0.1"
32+
uuid = { version = "1", features = ["v4"] }

0 commit comments

Comments
 (0)