feat(metrics): add progress metrics via collector (#17359)
* add query scan rows metrics

* fix build

* register the metrics

* track scan & write progress

* track spill progress

* refactor

* attach session manager to it

* update the finished query

* finish the metrics

* fix clippy

* fix header

* fix cargo fmt

* fix table
flaneur2020 authored Feb 15, 2025
1 parent 31d0fac commit 506cfd9
Showing 11 changed files with 241 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions src/common/base/src/base/progress.rs
@@ -24,6 +24,15 @@ pub struct ProgressValues {
    pub bytes: usize,
}

impl ProgressValues {
    pub fn add(&self, other: &ProgressValues) -> ProgressValues {
        ProgressValues {
            rows: self.rows + other.rows,
            bytes: self.bytes + other.bytes,
        }
    }
}

#[derive(Debug)]
pub struct Progress {
    rows: AtomicUsize,
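As an aside (not part of the commit), a minimal sketch of how the new `add` helper combines two progress snapshots; the `combine_example` function name and the literal values are illustrative. Note that `add` returns a fresh `ProgressValues` rather than mutating `self`, which is why the accumulation sites later in this commit reassign the result:

use databend_common_base::base::ProgressValues;

fn combine_example() {
    let scanned = ProgressValues { rows: 100, bytes: 4096 };
    let more = ProgressValues { rows: 50, bytes: 2048 };

    // `add` is non-mutating: it builds and returns a new ProgressValues.
    let total = scanned.add(&more);
    assert_eq!(total.rows, 150);
    assert_eq!(total.bytes, 6144);
}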
4 changes: 4 additions & 0 deletions src/common/base/src/runtime/metrics/registry.rs
@@ -110,6 +110,10 @@ impl GlobalRegistry {
        metric
    }

    pub fn register_collector(&self, collector: Box<dyn prometheus_client::collector::Collector>) {
        self.inner.lock().registry.register_collector(collector);
    }

    pub(crate) fn new_scoped_metric(&self, index: usize) -> impl Iterator<Item = ScopedMetric> {
        let global_registry = self.inner.lock();
        let mut scoped_metrics = Vec::with_capacity(global_registry.metrics.len() - index);
7 changes: 7 additions & 0 deletions src/common/metrics/src/metrics/interpreter.rs
@@ -39,6 +39,13 @@ const METRIC_QUERY_TOTAL_PARTITIONS: &str = "query_total_partitions";
const METRIC_QUERY_RESULT_ROWS: &str = "query_result_rows";
const METRIC_QUERY_RESULT_BYTES: &str = "query_result_bytes";

pub const METRIC_QUERY_SCAN_PROGRESS_ROWS: &str = "query_scan_progress_rows";
pub const METRIC_QUERY_SCAN_PROGRESS_BYTES: &str = "query_scan_progress_bytes";
pub const METRIC_QUERY_WRITE_PROGRESS_ROWS: &str = "query_write_progress_rows";
pub const METRIC_QUERY_WRITE_PROGRESS_BYTES: &str = "query_write_progress_bytes";
pub const METRIC_QUERY_SPILL_PROGRESS_ROWS: &str = "query_spill_progress_rows";
pub const METRIC_QUERY_SPILL_PROGRESS_BYTES: &str = "query_spill_progress_bytes";

pub static QUERY_START: LazyLock<FamilyCounter<VecLabels>> =
    LazyLock::new(|| register_counter_family(METRIC_QUERY_START));
pub static QUERY_SUCCESS: LazyLock<FamilyCounter<VecLabels>> =
2 changes: 2 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -92,6 +92,8 @@ pub struct ProcessInfo {
    /// storage metrics for persisted data reading.
    pub data_metrics: Option<StorageMetrics>,
    pub scan_progress_value: Option<ProgressValues>,
    pub write_progress_value: Option<ProgressValues>,
    pub spill_progress_value: Option<ProgressValues>,
    pub mysql_connection_id: Option<u32>,
    pub created_time: SystemTime,
    pub status_info: Option<String>,
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
@@ -150,6 +150,7 @@ paste = { workspace = true }
petgraph = { workspace = true }
pin-project-lite = { workspace = true }
poem = { workspace = true }
prometheus-client = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
recursive = { workspace = true }
10 changes: 10 additions & 0 deletions src/query/service/src/interpreters/interpreter.rs
@@ -187,6 +187,16 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
    let typ = session.get_type();
    if typ.is_user_session() {
        SessionManager::instance().status.write().query_finish(now);
        SessionManager::instance()
            .metrics_collector
            .track_finished_query(
                ctx.get_scan_progress_value(),
                ctx.get_write_progress_value(),
                ctx.get_join_spill_progress_value(),
                ctx.get_aggregate_spill_progress_value(),
                ctx.get_group_by_spill_progress_value(),
                ctx.get_window_partition_spill_progress_value(),
            );
    }

    if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
1 change: 1 addition & 0 deletions src/query/service/src/sessions/mod.rs
@@ -20,6 +20,7 @@ mod session;
mod session_ctx;
mod session_info;
mod session_mgr;
mod session_mgr_metrics;
mod session_mgr_status;
mod session_privilege_mgr;
mod session_status;
23 changes: 23 additions & 0 deletions src/query/service/src/sessions/session_info.rs
@@ -54,6 +54,8 @@ impl Session {
            memory_usage,
            data_metrics: Self::query_data_metrics(session_ctx),
            scan_progress_value: Self::query_scan_progress_value(session_ctx),
            write_progress_value: Self::query_write_progress_value(session_ctx),
            spill_progress_value: Self::query_spill_progress_value(session_ctx),
            mysql_connection_id: self.mysql_connection_id,
            created_time: Self::query_created_time(session_ctx),
            status_info: shared_query_context
@@ -105,6 +107,27 @@ impl Session {
            .map(|context_shared| context_shared.scan_progress.get_values())
    }

    fn query_write_progress_value(status: &SessionContext) -> Option<ProgressValues> {
        status
            .get_query_context_shared()
            .as_ref()
            .map(|context_shared| context_shared.write_progress.get_values())
    }

    fn query_spill_progress_value(status: &SessionContext) -> Option<ProgressValues> {
        status
            .get_query_context_shared()
            .as_ref()
            .map(|context_shared| {
                context_shared
                    .agg_spill_progress
                    .get_values()
                    .add(&context_shared.join_spill_progress.get_values())
                    .add(&context_shared.window_partition_spill_progress.get_values())
                    .add(&context_shared.group_by_spill_progress.get_values())
            })
    }

    fn query_created_time(status: &SessionContext) -> SystemTime {
        match status.get_query_context_shared() {
            None => SystemTime::now(),
15 changes: 12 additions & 3 deletions src/query/service/src/sessions/session_mgr.rs
@@ -24,6 +24,7 @@ use std::time::Duration;
use databend_common_base::base::tokio;
use databend_common_base::base::GlobalInstance;
use databend_common_base::base::SignalStream;
use databend_common_base::runtime::metrics::GLOBAL_METRICS_REGISTRY;
use databend_common_catalog::table_context::ProcessInfoState;
use databend_common_config::GlobalConfig;
use databend_common_config::InnerConfig;
@@ -38,6 +39,7 @@ use log::info;
use parking_lot::RwLock;

use crate::sessions::session::Session;
use crate::sessions::session_mgr_metrics::SessionManagerMetricsCollector;
use crate::sessions::ProcessInfo;
use crate::sessions::SessionContext;
use crate::sessions::SessionManagerStatus;
@@ -47,6 +49,7 @@ pub struct SessionManager {
    pub(in crate::sessions) max_sessions: usize,
    pub(in crate::sessions) active_sessions: Arc<RwLock<HashMap<String, Weak<Session>>>>,
    pub status: Arc<RwLock<SessionManagerStatus>>,
    pub metrics_collector: SessionManagerMetricsCollector,

    // When typ is MySQL, insert into this map, key is id, val is MySQL connection id.
    pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
@@ -55,20 +58,26 @@ pub struct SessionManager {

impl SessionManager {
    pub fn init(conf: &InnerConfig) -> Result<()> {
        GlobalInstance::set(Self::create(conf));
        let global_instance = Self::create(conf);
        GlobalInstance::set(global_instance.clone());
        GLOBAL_METRICS_REGISTRY
            .register_collector(Box::new(global_instance.metrics_collector.clone()));

        Ok(())
    }

    pub fn create(conf: &InnerConfig) -> Arc<SessionManager> {
        let max_sessions = conf.query.max_active_sessions as usize;
        Arc::new(SessionManager {
        let mgr = Arc::new(SessionManager {
            max_sessions,
            mysql_basic_conn_id: AtomicU32::new(9_u32.to_le()),
            status: Arc::new(RwLock::new(SessionManagerStatus::default())),
            mysql_conn_map: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))),
            active_sessions: Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))),
        })
            metrics_collector: SessionManagerMetricsCollector::new(),
        });
        mgr.metrics_collector.attach_session_manager(mgr.clone());
        mgr
    }

    pub fn instance() -> Arc<SessionManager> {
171 changes: 171 additions & 0 deletions src/query/service/src/sessions/session_mgr_metrics.rs
@@ -0,0 +1,171 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::ProgressValues;
use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_SCAN_PROGRESS_ROWS;
use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_SPILL_PROGRESS_ROWS;
use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_BYTES;
use databend_common_metrics::interpreter::METRIC_QUERY_WRITE_PROGRESS_ROWS;
use parking_lot::Mutex;
use prometheus_client::collector::Collector;
use prometheus_client::encoding::EncodeMetric;
use prometheus_client::metrics::counter::ConstCounter;

use crate::sessions::SessionManager;

/// [`SessionManagerMetricsCollector`] exports the scan/write/spill progress metrics of the
/// [`SessionManager`]'s running queries to Prometheus. To keep the exported counters from
/// ever decreasing, the progress values of finished queries are accumulated into running
/// totals as well.
#[derive(Clone)]
pub struct SessionManagerMetricsCollector {
    inner: Arc<Mutex<SessionManagerMetricsCollectorInner>>,
}

pub(crate) struct SessionManagerMetricsCollectorInner {
    session_mgr: Option<Arc<SessionManager>>,
    finished_scan_total: ProgressValues,
    finished_write_total: ProgressValues,
    finished_spill_total: ProgressValues,
}

impl SessionManagerMetricsCollector {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(SessionManagerMetricsCollectorInner {
                session_mgr: None,
                finished_scan_total: ProgressValues::default(),
                finished_write_total: ProgressValues::default(),
                finished_spill_total: ProgressValues::default(),
            })),
        }
    }

    pub fn attach_session_manager(&self, session_mgr: Arc<SessionManager>) {
        let mut guard = self.inner.lock();
        guard.session_mgr.replace(session_mgr);
    }

    pub fn track_finished_query(
        &self,
        scan: ProgressValues,
        write: ProgressValues,
        join_spill: ProgressValues,
        aggregate_spill: ProgressValues,
        group_by_spill: ProgressValues,
        window_partition_spill: ProgressValues,
    ) {
        let mut guard = self.inner.lock();
        guard.finished_scan_total = guard.finished_scan_total.add(&scan);
        guard.finished_write_total = guard.finished_write_total.add(&write);
        guard.finished_spill_total = guard
            .finished_spill_total
            .add(&join_spill)
            .add(&aggregate_spill)
            .add(&group_by_spill)
            .add(&window_partition_spill);
    }
}

impl Default for SessionManagerMetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for SessionManagerMetricsCollector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "SessionMetricsCollector")
    }
}

impl Collector for SessionManagerMetricsCollector {
    fn encode(
        &self,
        mut encoder: prometheus_client::encoding::DescriptorEncoder,
    ) -> Result<(), std::fmt::Error> {
        let processes = {
            match self.inner.lock().session_mgr.as_ref() {
                Some(mgr) => mgr.processes_info(),
                None => return Ok(()),
            }
        };

        let (mut scan_progress, mut write_progress, mut spill_progress) = {
            let guard = self.inner.lock();
            (
                guard.finished_scan_total.clone(),
                guard.finished_write_total.clone(),
                guard.finished_spill_total.clone(),
            )
        };
        for process in processes {
            if let Some(scan) = &process.scan_progress_value {
                scan_progress = scan_progress.add(scan);
            }
            if let Some(write) = &process.write_progress_value {
                write_progress = write_progress.add(write);
            }
            if let Some(spill) = &process.spill_progress_value {
                spill_progress = spill_progress.add(spill);
            }
        }

        let metrics = vec![
            (
                METRIC_QUERY_SCAN_PROGRESS_ROWS,
                scan_progress.rows as f64,
                "Total scan rows in progress.",
            ),
            (
                METRIC_QUERY_SCAN_PROGRESS_BYTES,
                scan_progress.bytes as f64,
                "Total scan bytes in progress.",
            ),
            (
                METRIC_QUERY_WRITE_PROGRESS_ROWS,
                write_progress.rows as f64,
                "Total write rows in progress.",
            ),
            (
                METRIC_QUERY_WRITE_PROGRESS_BYTES,
                write_progress.bytes as f64,
                "Total write bytes in progress.",
            ),
            (
                METRIC_QUERY_SPILL_PROGRESS_ROWS,
                spill_progress.rows as f64,
                "Total spill rows in progress.",
            ),
            (
                METRIC_QUERY_SPILL_PROGRESS_BYTES,
                spill_progress.bytes as f64,
                "Total spill bytes in progress.",
            ),
        ];

        for (name, value, help) in metrics {
            let counter = ConstCounter::new(value);
            let counter_encoder =
                encoder.encode_descriptor(name, help, None, counter.metric_type())?;
            counter.encode(counter_encoder)?;
        }

        Ok(())
    }
}
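For readers new to prometheus_client's collector API, here is a minimal, self-contained sketch (not part of the commit) of the same register-then-encode pattern the file above relies on; `DemoCollector`, `demo_scanned_rows`, and the constant value are illustrative names only. On a scrape, the collector registered by this commit exports the six query_*_progress_* counters through the same text exposition path.

use prometheus_client::collector::Collector;
use prometheus_client::encoding::DescriptorEncoder;
use prometheus_client::encoding::EncodeMetric;
use prometheus_client::metrics::counter::ConstCounter;
use prometheus_client::registry::Registry;

// A collector computes metric values lazily at scrape time instead of
// keeping pre-registered metric objects up to date.
#[derive(Debug)]
struct DemoCollector;

impl Collector for DemoCollector {
    fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
        // The commit's collector sums running-query progress with the
        // finished-query totals here; this sketch just exports a constant.
        let counter = ConstCounter::new(42u64);
        let metric_encoder = encoder.encode_descriptor(
            "demo_scanned_rows",
            "Rows scanned (illustrative).",
            None,
            counter.metric_type(),
        )?;
        counter.encode(metric_encoder)?;
        Ok(())
    }
}

fn main() -> Result<(), std::fmt::Error> {
    let mut registry = Registry::default();
    registry.register_collector(Box::new(DemoCollector));

    // Roughly what a /metrics scrape does: render the text exposition format.
    let mut output = String::new();
    prometheus_client::encoding::text::encode(&mut output, &registry)?;
    println!("{output}");
    Ok(())
}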
