@@ -27,7 +27,9 @@ pub struct Dispatcher {
27
27
receiver : tokio_stream:: wrappers:: UnboundedReceiverStream < DispatcherEvent > ,
28
28
}
29
29
impl Dispatcher {
30
- pub fn new ( streams_table : Arc < RwLock < pipeless:: config:: streams:: StreamsTable > > ) -> Self {
30
+ pub fn new (
31
+ streams_table : Arc < RwLock < pipeless:: config:: streams:: StreamsTable > > ,
32
+ ) -> Self {
31
33
let ( sender, receiver) = tokio:: sync:: mpsc:: unbounded_channel :: < DispatcherEvent > ( ) ;
32
34
Self {
33
35
sender,
@@ -36,7 +38,6 @@ impl Dispatcher {
36
38
) ,
37
39
streams_table
38
40
}
39
-
40
41
}
41
42
42
43
pub fn get_sender ( & self ) -> tokio:: sync:: mpsc:: UnboundedSender < DispatcherEvent > {
@@ -66,7 +67,7 @@ impl Dispatcher {
66
67
67
68
pub fn start (
68
69
dispatcher : Dispatcher ,
69
- frame_path_executor_arc : Arc < RwLock < pipeless:: stages:: path:: FramePathExecutor > >
70
+ frame_path_executor_arc : Arc < RwLock < pipeless:: stages:: path:: FramePathExecutor > > ,
70
71
) {
71
72
let running_managers: Arc < RwLock < HashMap < uuid:: Uuid , pipeless:: pipeline:: Manager > > > = Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ;
72
73
let frame_path_executor_arc = frame_path_executor_arc. clone ( ) ;
@@ -152,6 +153,7 @@ pub fn start(
152
153
new_manager. get_pipeline_id ( ) . await
153
154
) {
154
155
error ! ( "Error adding new stream to the streams config table: {}" , err) ;
156
+ pipeless:: event_exporters:: events:: export_stream_start_error_event ( entry. get_id ( ) ) . await ;
155
157
}
156
158
let mut managers_map_guard = running_managers. write ( ) . await ;
157
159
managers_map_guard. insert ( new_manager. get_pipeline_id ( ) . await , new_manager) ;
@@ -160,6 +162,7 @@ pub fn start(
160
162
error ! ( "Unable to create new pipeline: {}. Rolling back streams configuration." , err. to_string( ) ) ;
161
163
let removed = streams_table_guard. remove ( entry. get_id ( ) ) ;
162
164
if removed. is_none ( ) { warn ! ( "Error rolling back table, entry not found." ) } ;
165
+ pipeless:: event_exporters:: events:: export_stream_start_error_event ( entry. get_id ( ) ) . await ;
163
166
}
164
167
}
165
168
} ,
@@ -195,50 +198,59 @@ pub fn start(
195
198
}
196
199
}
197
200
DispatcherEvent :: PipelineFinished ( pipeline_id, finish_state) => {
198
- let mut table_write_guard = streams_table. write ( ) . await ;
199
- let stream_entry_option = table_write_guard. find_by_pipeline_id_mut ( pipeline_id) ;
200
- if let Some ( entry) = stream_entry_option {
201
- // Remove the pipeline from the stream entry since it finished
202
- entry. unassign_pipeline ( ) ;
203
-
204
- // Update the target state of the stream based on the restart policy
205
- match entry. get_restart_policy ( ) {
206
- pipeless:: config:: streams:: RestartPolicy :: Never => {
207
- match finish_state {
208
- pipeless:: pipeline:: PipelineEndReason :: Completed => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Completed ) ,
209
- pipeless:: pipeline:: PipelineEndReason :: Error => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Error ) ,
210
- pipeless:: pipeline:: PipelineEndReason :: Updated => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ,
211
- }
212
- } ,
213
- pipeless:: config:: streams:: RestartPolicy :: Always => {
214
- entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ;
215
- } ,
216
- pipeless:: config:: streams:: RestartPolicy :: OnError => {
217
- if finish_state == pipeless:: pipeline:: PipelineEndReason :: Error {
218
- entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ;
219
- } else {
220
- entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Error ) ;
221
- }
222
- } ,
223
- pipeless:: config:: streams:: RestartPolicy :: OnEos => {
224
- if finish_state == pipeless:: pipeline:: PipelineEndReason :: Completed {
201
+ let mut stream_uuid: Option < uuid:: Uuid > = None ;
202
+ { // context to release the write lock
203
+ let mut table_write_guard = streams_table. write ( ) . await ;
204
+ let stream_entry_option = table_write_guard. find_by_pipeline_id_mut ( pipeline_id) ;
205
+ if let Some ( entry) = stream_entry_option {
206
+ stream_uuid = Some ( entry. get_id ( ) ) ;
207
+ // Remove the pipeline from the stream entry since it finished
208
+ entry. unassign_pipeline ( ) ;
209
+
210
+ // Update the target state of the stream based on the restart policy
211
+ match entry. get_restart_policy ( ) {
212
+ pipeless:: config:: streams:: RestartPolicy :: Never => {
213
+ match finish_state {
214
+ pipeless:: pipeline:: PipelineEndReason :: Completed => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Completed ) ,
215
+ pipeless:: pipeline:: PipelineEndReason :: Error => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Error ) ,
216
+ pipeless:: pipeline:: PipelineEndReason :: Updated => entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ,
217
+ }
218
+ } ,
219
+ pipeless:: config:: streams:: RestartPolicy :: Always => {
225
220
entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ;
226
- } else {
227
- entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Completed ) ;
228
- }
229
- } ,
230
- }
221
+ } ,
222
+ pipeless:: config:: streams:: RestartPolicy :: OnError => {
223
+ if finish_state == pipeless:: pipeline:: PipelineEndReason :: Error {
224
+ entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ;
225
+ } else {
226
+ entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Error ) ;
227
+ }
228
+ } ,
229
+ pipeless:: config:: streams:: RestartPolicy :: OnEos => {
230
+ if finish_state == pipeless:: pipeline:: PipelineEndReason :: Completed {
231
+ entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Running ) ;
232
+ } else {
233
+ entry. set_target_state ( pipeless:: config:: streams:: StreamEntryState :: Completed ) ;
234
+ }
235
+ } ,
236
+ }
231
237
232
- // Create new event since we have modified the streams config table
233
- if let Err ( err) = dispatcher_sender. send ( DispatcherEvent :: TableChange ) {
234
- warn ! ( "Unable to send dispatcher event for streams table changed. Error: {}" , err. to_string( ) ) ;
238
+ // Create new event since we have modified the streams config table
239
+ if let Err ( err) = dispatcher_sender. send ( DispatcherEvent :: TableChange ) {
240
+ warn ! ( "Unable to send dispatcher event for streams table changed. Error: {}" , err. to_string( ) ) ;
241
+ }
242
+ } else {
243
+ warn ! ( "
244
+ Unable to unassign pipeline for stream. Stream entry not found.
245
+ Pipeline id: {}
246
+ " , pipeline_id) ;
235
247
}
236
- } else {
237
- warn ! ( "
238
- Unable to unassign pipeline for stream. Stream entry not found.
239
- Pipeline id: {}
240
- " , pipeline_id) ;
241
248
}
249
+
250
+ pipeless:: event_exporters:: events:: export_stream_finished_event (
251
+ stream_uuid. unwrap_or_default ( ) ,
252
+ finish_state. to_string ( ) . as_str ( )
253
+ ) . await ;
242
254
}
243
255
}
244
256
}
0 commit comments