Skip to content

Commit 9150a0c

Browse files
feat: add new "dashboard" metrics subscriber (#4527)
## Changes Made - Adds a new "dashboard" metrics subscriber that makes a REST call to `DAFT_DASHBOARD_METRICS_URL` with the provided metrics information. - Small internal refactoring to split `runtime_stats` into a module and submodules. ## Related Issues <!-- Link to related GitHub issues, e.g., "Closes #123" --> ## Checklist - [ ] Documented in API Docs (if applicable) - [ ] Documented in User Guide (if applicable) - [ ] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation - [ ] Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)
1 parent e984a22 commit 9150a0c

File tree

16 files changed

+1213
-430
lines changed

16 files changed

+1213
-430
lines changed

Cargo.lock

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/tracing/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ pub fn init_opentelemetry_providers() {
4848
}
4949

5050
pub fn flush_opentelemetry_providers() {
51-
flush_oltp_metrics_provider();
5251
flush_oltp_tracer_provider();
5352
}
5453

@@ -77,7 +76,7 @@ async fn init_otlp_metrics_provider(otlp_endpoint: &str) {
7776
*mg = Some(metrics_provider);
7877
}
7978

80-
fn flush_oltp_metrics_provider() {
79+
pub fn flush_oltp_metrics_provider() {
8180
let mg = GLOBAL_METER_PROVIDER.lock().unwrap();
8281
if let Some(meter_provider) = mg.as_ref() {
8382
if let Err(e) = meter_provider.force_flush() {

src/daft-local-execution/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ parking_lot = {workspace = true}
3636
pin-project = "1"
3737
pyo3 = {workspace = true, optional = true}
3838
pyo3-async-runtimes = {workspace = true, optional = true}
39+
reqwest = {version = "0.12.19", default-features = false}
3940
snafu = {workspace = true}
4041
tokio = {workspace = true}
4142
tokio-util = {workspace = true}

src/daft-local-execution/src/intermediate_ops/intermediate_op.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use crate::{
1717
pipeline::{NodeInfo, PipelineNode, RuntimeContext},
1818
progress_bar::ProgressBarColor,
1919
resource_manager::MemoryManager,
20-
runtime_stats::{CountingReceiver, CountingSender, RuntimeStatsContext},
20+
runtime_stats::{
21+
CountingReceiver, CountingSender, RuntimeStatsContext, RuntimeStatsEventHandler,
22+
},
2123
ExecutionRuntimeContext, ExecutionTaskSpawner, OperatorOutput, PipelineExecutionSnafu,
2224
};
2325

@@ -124,12 +126,18 @@ impl IntermediateNode {
124126
receiver: Receiver<Arc<MicroPartition>>,
125127
sender: Sender<Arc<MicroPartition>>,
126128
rt_context: Arc<RuntimeStatsContext>,
129+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
127130
memory_manager: Arc<MemoryManager>,
128131
) -> DaftResult<()> {
129132
let span = info_span!("IntermediateOp::execute");
130133
let compute_runtime = get_compute_runtime();
131-
let task_spawner =
132-
ExecutionTaskSpawner::new(compute_runtime, memory_manager, rt_context, span);
134+
let task_spawner = ExecutionTaskSpawner::new(
135+
compute_runtime,
136+
memory_manager,
137+
rt_context,
138+
rt_stats_handler,
139+
span,
140+
);
133141
let mut state = op.make_state()?;
134142
while let Some(morsel) = receiver.recv().await {
135143
loop {
@@ -172,6 +180,7 @@ impl IntermediateNode {
172180
input_receiver,
173181
output_sender,
174182
self.runtime_stats.clone(),
183+
runtime_handle.runtime_stats_handler(),
175184
memory_manager.clone(),
176185
),
177186
self.intermediate_op.name(),
@@ -236,21 +245,27 @@ impl PipelineNode for IntermediateNode {
236245
true,
237246
self.runtime_stats.clone(),
238247
);
248+
239249
for child in &self.children {
240250
let child_result_receiver = child.start(maintain_order, runtime_handle)?;
241251
child_result_receivers.push(CountingReceiver::new(
242252
child_result_receiver,
243253
self.runtime_stats.clone(),
244254
progress_bar.clone(),
255+
runtime_handle.runtime_stats_handler(),
245256
));
246257
}
247258
let op = self.intermediate_op.clone();
248259
let num_workers = op.max_concurrency().context(PipelineExecutionSnafu {
249260
node_name: self.name(),
250261
})?;
251262
let (destination_sender, destination_receiver) = create_channel(0);
252-
let counting_sender =
253-
CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar);
263+
let counting_sender = CountingSender::new(
264+
destination_sender,
265+
self.runtime_stats.clone(),
266+
progress_bar,
267+
runtime_handle.runtime_stats_handler(),
268+
);
254269

255270
let dispatch_spawner = self
256271
.intermediate_op

src/daft-local-execution/src/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ pub(crate) struct ExecutionRuntimeContext {
130130
default_morsel_size: usize,
131131
memory_manager: Arc<MemoryManager>,
132132
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
133+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
133134
}
134135

135136
impl ExecutionRuntimeContext {
@@ -138,12 +139,14 @@ impl ExecutionRuntimeContext {
138139
default_morsel_size: usize,
139140
memory_manager: Arc<MemoryManager>,
140141
progress_bar_manager: Option<Arc<dyn ProgressBarManager>>,
142+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
141143
) -> Self {
142144
Self {
143145
worker_set: TaskSet::new(),
144146
default_morsel_size,
145147
memory_manager,
146148
progress_bar_manager,
149+
rt_stats_handler,
147150
}
148151
}
149152
pub fn spawn_local(
@@ -196,6 +199,11 @@ impl ExecutionRuntimeContext {
196199
pub(crate) fn memory_manager(&self) -> Arc<MemoryManager> {
197200
self.memory_manager.clone()
198201
}
202+
203+
#[must_use]
204+
pub(crate) fn runtime_stats_handler(&self) -> Arc<RuntimeStatsEventHandler> {
205+
self.rt_stats_handler.clone()
206+
}
199207
}
200208

201209
impl Drop for ExecutionRuntimeContext {
@@ -210,6 +218,7 @@ pub(crate) struct ExecutionTaskSpawner {
210218
runtime_ref: RuntimeRef,
211219
memory_manager: Arc<MemoryManager>,
212220
runtime_context: Arc<RuntimeStatsContext>,
221+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
213222
outer_span: tracing::Span,
214223
}
215224

@@ -218,12 +227,14 @@ impl ExecutionTaskSpawner {
218227
runtime_ref: RuntimeRef,
219228
memory_manager: Arc<MemoryManager>,
220229
runtime_context: Arc<RuntimeStatsContext>,
230+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
221231
span: tracing::Span,
222232
) -> Self {
223233
Self {
224234
runtime_ref,
225235
memory_manager,
226236
runtime_context,
237+
rt_stats_handler,
227238
outer_span: span,
228239
}
229240
}
@@ -242,6 +253,7 @@ impl ExecutionTaskSpawner {
242253
let timed_fut = TimedFuture::new(
243254
instrumented,
244255
self.runtime_context.clone(),
256+
self.rt_stats_handler.clone(),
245257
self.outer_span.clone(),
246258
);
247259
let memory_manager = self.memory_manager.clone();
@@ -260,6 +272,7 @@ impl ExecutionTaskSpawner {
260272
let timed_fut = TimedFuture::new(
261273
instrumented,
262274
self.runtime_context.clone(),
275+
self.rt_stats_handler.clone(),
263276
self.outer_span.clone(),
264277
);
265278
self.runtime_ref.spawn(timed_fut)
@@ -269,6 +282,8 @@ impl ExecutionTaskSpawner {
269282
#[cfg(feature = "python")]
270283
use pyo3::prelude::*;
271284

285+
use crate::runtime_stats::RuntimeStatsEventHandler;
286+
272287
#[derive(Debug, Snafu)]
273288
pub enum Error {
274289
#[snafu(display("Error joining spawned task: {}", source))]

src/daft-local-execution/src/progress_bar.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,24 @@ impl OperatorProgressBar {
7373
return false;
7474
}
7575

76-
let prev = self.last_update.load(Ordering::Acquire);
77-
let elapsed = (now - self.start_time).as_nanos() as u64;
78-
let diff = elapsed.saturating_sub(prev);
79-
80-
// Fast path - check if enough time has passed
81-
if diff < Self::UPDATE_INTERVAL {
82-
return false;
76+
{
77+
{
78+
let prev = self.last_update.load(Ordering::Acquire);
79+
let elapsed = (now - self.start_time).as_nanos() as u64;
80+
let diff = elapsed.saturating_sub(prev);
81+
82+
// Fast path - check if enough time has passed
83+
if diff < Self::UPDATE_INTERVAL {
84+
return false;
85+
}
86+
87+
// Only calculate remainder if we're actually going to update
88+
let remainder = diff % Self::UPDATE_INTERVAL;
89+
self.last_update
90+
.store(elapsed - remainder, Ordering::Release);
91+
true
92+
}
8393
}
84-
85-
// Only calculate remainder if we're actually going to update
86-
let remainder = diff % Self::UPDATE_INTERVAL;
87-
self.last_update
88-
.store(elapsed - remainder, Ordering::Release);
89-
true
9094
}
9195

9296
pub fn render(&self) {

src/daft-local-execution/src/run.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::{
3737
},
3838
progress_bar::{make_progress_bar_manager, ProgressBarManager},
3939
resource_manager::get_or_init_memory_manager,
40+
runtime_stats::RuntimeStatsEventHandler,
4041
ExecutionRuntimeContext,
4142
};
4243

@@ -240,6 +241,7 @@ pub struct NativeExecutor {
240241
cancel: CancellationToken,
241242
runtime: Option<Arc<tokio::runtime::Runtime>>,
242243
pb_manager: Option<Arc<dyn ProgressBarManager>>,
244+
rt_stats_handler: Arc<RuntimeStatsEventHandler>,
243245
enable_explain_analyze: bool,
244246
}
245247

@@ -248,8 +250,10 @@ impl Default for NativeExecutor {
248250
Self {
249251
cancel: CancellationToken::new(),
250252
runtime: None,
253+
// todo: make the progress bar another subscriber instance
251254
pb_manager: should_enable_progress_bar().then(make_progress_bar_manager),
252255
enable_explain_analyze: should_enable_explain_analyze(),
256+
rt_stats_handler: Arc::new(RuntimeStatsEventHandler::new()),
253257
}
254258
}
255259
}
@@ -290,6 +294,7 @@ impl NativeExecutor {
290294

291295
let rt = self.runtime.clone();
292296
let pb_manager = self.pb_manager.clone();
297+
let stats_handler = self.rt_stats_handler.clone();
293298
let enable_explain_analyze = self.enable_explain_analyze;
294299
// todo: split this into a run and run_async method
295300
// the run_async should spawn a task instead of a thread like this
@@ -308,6 +313,7 @@ impl NativeExecutor {
308313
cfg.default_morsel_size,
309314
memory_manager.clone(),
310315
pb_manager,
316+
stats_handler.clone(),
311317
);
312318
let receiver = pipeline.start(true, &mut runtime_handle)?;
313319

@@ -331,7 +337,7 @@ impl NativeExecutor {
331337

332338
let local_set = tokio::task::LocalSet::new();
333339
local_set.block_on(&runtime, async {
334-
tokio::select! {
340+
let result = tokio::select! {
335341
biased;
336342
() = cancel.cancelled() => {
337343
log::info!("Execution engine cancelled");
@@ -342,7 +348,14 @@ impl NativeExecutor {
342348
Ok(())
343349
}
344350
result = execution_task => result,
351+
};
352+
353+
// Flush remaining stats events
354+
if let Err(e) = stats_handler.flush().await {
355+
log::warn!("Failed to flush runtime stats: {}", e);
345356
}
357+
358+
result
346359
})?;
347360
if enable_explain_analyze {
348361
let curr_ms = SystemTime::now()

0 commit comments

Comments
 (0)