Skip to content

Commit ce1b28c

Browse files
committed
Make event exporter global
Signed-off-by: Miguel A. Cabrera Minagorri <[email protected]>
1 parent ccb5a3d commit ce1b28c

File tree

5 files changed

+74
-18
lines changed

5 files changed

+74
-18
lines changed

Diff for: pipeless/src/cli/start.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) {
4040
} else {
4141
pipeless::event_exporters::EventExporter::new_none_exporter()
4242
};
43+
{ // Context to lock the global event exporter in order to set it
44+
let mut e_exp = pipeless::event_exporters::EVENT_EXPORTER.lock().await;
45+
*e_exp = event_exporter;
46+
}
4347

4448
let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new()));
4549
let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone());
4650
let dispatcher_sender = dispatcher.get_sender().clone();
47-
pipeless::dispatcher::start(dispatcher, frame_path_executor, event_exporter);
51+
pipeless::dispatcher::start(dispatcher, frame_path_executor);
4852

4953
// Use the REST adapter to manage streams
5054
let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone());

Diff for: pipeless/src/dispatcher.rs

+6-16
Original file line numberDiff line numberDiff line change
@@ -68,25 +68,21 @@ impl Dispatcher {
6868
pub fn start(
6969
dispatcher: Dispatcher,
7070
frame_path_executor_arc: Arc<RwLock<pipeless::stages::path::FramePathExecutor>>,
71-
event_exporter: pipeless::event_exporters::EventExporter,
7271
) {
7372
let running_managers: Arc<RwLock<HashMap<uuid::Uuid, pipeless::pipeline::Manager>>> = Arc::new(RwLock::new(HashMap::new()));
7473
let frame_path_executor_arc = frame_path_executor_arc.clone();
75-
let event_exporter_arc = Arc::new(tokio::sync::Mutex::new(event_exporter));
7674

7775
tokio::spawn(async move {
7876
let running_managers = running_managers.clone();
7977
let dispatcher_sender = dispatcher.get_sender().clone();
8078
let streams_table = dispatcher.get_streams_table().clone();
81-
let event_exporter_arc = event_exporter_arc.clone();
8279
// Process events forever
8380
let concurrent_limit = 3;
8481
dispatcher.process_events(concurrent_limit, move |event, _end_signal| {
8582
let frame_path_executor_arc = frame_path_executor_arc.clone();
8683
let running_managers = running_managers.clone();
8784
let dispatcher_sender = dispatcher_sender.clone();
8885
let streams_table = streams_table.clone();
89-
let event_exporter_arc = event_exporter_arc.clone();
9086
async move {
9187
match event {
9288
DispatcherEvent::TableChange => {
@@ -157,6 +153,7 @@ pub fn start(
157153
new_manager.get_pipeline_id().await
158154
) {
159155
error!("Error adding new stream to the streams config table: {}", err);
156+
pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await;
160157
}
161158
let mut managers_map_guard = running_managers.write().await;
162159
managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
@@ -165,6 +162,7 @@ pub fn start(
165162
error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string());
166163
let removed = streams_table_guard.remove(entry.get_id());
167164
if removed.is_none() { warn!("Error rolling back table, entry not found.") };
165+
pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await;
168166
}
169167
}
170168
},
@@ -249,18 +247,10 @@ pub fn start(
249247
}
250248
}
251249

252-
// Export the event
253-
let ext_event: serde_json::Value = serde_json::json!({
254-
"type": "StreamFinished",
255-
"end_state": finish_state.to_string(),
256-
"stream_uuid": stream_uuid.unwrap_or_default(),
257-
});
258-
let ext_event_json_str = serde_json::to_string(&ext_event);
259-
if let Ok(json_str) = ext_event_json_str {
260-
event_exporter_arc.lock().await.publish(&json_str).await;
261-
} else {
262-
warn!("Error serializing event to JSON string, skipping external publishing");
263-
}
250+
pipeless::event_exporters::events::export_stream_finished_event(
251+
stream_uuid.unwrap_or_default(),
252+
finish_state.to_string().as_str()
253+
).await;
264254
}
265255
}
266256
}

Diff for: pipeless/src/event_exporters/events.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::fmt;
2+
use log::warn;
3+
4+
pub enum EventType {
5+
StreamStartError,
6+
StreamFinished,
7+
}
8+
impl fmt::Display for EventType {
9+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
10+
match self {
11+
EventType::StreamStartError => write!(f, "StreamStartError"),
12+
EventType::StreamFinished => write!(f, "StreamFinished"),
13+
}
14+
}
15+
}
16+
17+
/*
18+
* Exports a stream finished event to the external event exporter when it is enabled
19+
*/
20+
pub async fn export_stream_finished_event(stream_uuid: uuid::Uuid, stream_end_state: &str) {
21+
let ext_event: serde_json::Value = serde_json::json!({
22+
"type": EventType::StreamFinished.to_string(),
23+
"end_state": stream_end_state,
24+
"stream_uuid": stream_uuid.to_string(),
25+
});
26+
let ext_event_json_str = serde_json::to_string(&ext_event);
27+
if let Ok(json_str) = ext_event_json_str {
28+
super::EVENT_EXPORTER.lock().await.publish(&json_str).await;
29+
} else {
30+
warn!("Error serializing event to JSON string, skipping external publishing");
31+
}
32+
}
33+
34+
/*
35+
* Exports a stream start error event to the external event exporter when it is enabled
36+
*/
37+
pub async fn export_stream_start_error_event(stream_uuid: uuid::Uuid) {
38+
let ext_event: serde_json::Value = serde_json::json!({
39+
"type": EventType::StreamStartError.to_string(),
40+
"end_state": "error",
41+
"stream_uuid": stream_uuid.to_string(),
42+
});
43+
let ext_event_json_str = serde_json::to_string(&ext_event);
44+
if let Ok(json_str) = ext_event_json_str {
45+
super::EVENT_EXPORTER.lock().await.publish(&json_str).await;
46+
} else {
47+
warn!("Error serializing event to JSON string, skipping external publishing");
48+
}
49+
}

Diff for: pipeless/src/event_exporters/mod.rs

+13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
use std::sync::Arc;
2+
13
use log::warn;
24
use redis::AsyncCommands;
5+
use lazy_static::lazy_static;
6+
use tokio::sync::Mutex;
7+
8+
pub mod events;
39

410
pub enum EventExporterEnum {
511
Redis(Redis),
@@ -51,3 +57,10 @@ impl Redis {
5157
}
5258
}
5359
}
60+
61+
// Create global variable to access the event exporter from any point of the code
62+
// It uses an Arc to be shared among threads and a Mutex since the connection is updated on every push
63+
lazy_static! {
64+
pub static ref EVENT_EXPORTER: Arc<Mutex<EventExporter>> =
65+
Arc::new(Mutex::new(EventExporter::new_none_exporter()));
66+
}

Diff for: pipeless/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ fn main() {
124124

125125
match &cli.command {
126126
Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template),
127-
Some(Commands::Start { project_dir , export_redis_events }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_redis_events),
127+
Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis),
128128
Some(Commands::Add { command }) => {
129129
match &command {
130130
Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy),

0 commit comments

Comments
 (0)