1
1
import os
2
+ import sys
2
3
import threading
4
+ import time
3
5
import traceback
4
6
from concurrent .futures import Future , ThreadPoolExecutor
5
7
from datetime import datetime
@@ -170,7 +172,6 @@ def submit_job(self, service, program, event=None):
170
172
item (Event, optional): The event item to process. Defaults to None.
171
173
"""
172
174
log_message = f"Submitting service { service .__name__ } to be executed"
173
- item_id = None
174
175
# Content services dont provide an event.
175
176
if event :
176
177
log_message += f" with { event .log_message } "
@@ -186,6 +187,95 @@ def submit_job(self, service, program, event=None):
186
187
sse_manager .publish_event ("event_update" , self .get_event_updates ())
187
188
future .add_done_callback (lambda f :self ._process_future (f , service ))
188
189
190
+ # For debugging purposes we can monitor the execution time of the service. (comment out above and uncomment below)
191
+ # def submit_job(self, service, program, event=None):
192
+ # """
193
+ # Submits a job to be executed by the service.
194
+
195
+ # Args:
196
+ # service (type): The service class to execute.
197
+ # program (Program): The program containing the service.
198
+ # item (Event, optional): The event item to process. Defaults to None.
199
+ # """
200
+ # log_message = f"Submitting service {service.__name__} to be executed"
201
+ # if event:
202
+ # log_message += f" with {event.log_message}"
203
+ # logger.debug(log_message)
204
+
205
+ # cancellation_event = threading.Event()
206
+ # executor = self._find_or_create_executor(service)
207
+
208
+ # # Add start time to track execution duration
209
+ # start_time = datetime.now()
210
+
211
+ # def _monitor_execution(future):
212
+ # """Monitor execution time and log if taking too long"""
213
+ # while not future.done():
214
+ # execution_time = (datetime.now() - start_time).total_seconds()
215
+ # if execution_time > 180: # 3 minutes
216
+ # current_thread = None
217
+ # for thread in threading.enumerate():
218
+ # if thread.name.startswith(service.__name__) and not thread.name.endswith('_monitor'):
219
+ # current_thread = thread
220
+ # break
221
+
222
+ # if current_thread:
223
+ # # Get stack frames for the worker thread
224
+ # frames = sys._current_frames()
225
+ # thread_frame = None
226
+ # for thread_id, frame in frames.items():
227
+ # if thread_id == current_thread.ident:
228
+ # thread_frame = frame
229
+ # break
230
+
231
+ # if thread_frame:
232
+ # stack_trace = ''.join(traceback.format_stack(thread_frame))
233
+ # else:
234
+ # stack_trace = "Could not get stack trace for worker thread"
235
+ # else:
236
+ # stack_trace = "Could not find worker thread"
237
+
238
+ # logger.warning(
239
+ # f"Service {service.__name__} execution taking longer than 3 minutes!\n"
240
+ # f"Event: {event.log_message if event else 'No event'}\n"
241
+ # f"Execution time: {execution_time:.1f} seconds\n"
242
+ # f"Thread name: {current_thread.name if current_thread else 'Unknown'}\n"
243
+ # f"Thread alive: {current_thread.is_alive() if current_thread else 'Unknown'}\n"
244
+ # f"Stack trace:\n{stack_trace}"
245
+ # )
246
+
247
+ # # Cancel the future and kill the thread
248
+ # future.cancellation_event.set()
249
+ # future.cancel()
250
+ # if current_thread:
251
+ # logger.warning(f"Killing thread {current_thread.name} due to timeout")
252
+ # self._futures.remove(future)
253
+ # if event:
254
+ # self.remove_event_from_running(event)
255
+ # return # Exit the monitoring thread
256
+
257
+ # time.sleep(60) # Check every minute
258
+
259
+ # future = executor.submit(db_functions.run_thread_with_db_item,
260
+ # program.all_services[service].run,
261
+ # service, program, event, cancellation_event)
262
+
263
+ # # Start monitoring thread
264
+ # monitor_thread = threading.Thread(
265
+ # target=_monitor_execution,
266
+ # args=(future,),
267
+ # name=f"{service.__name__}_monitor",
268
+ # daemon=True
269
+ # )
270
+ # monitor_thread.start()
271
+
272
+ # future.cancellation_event = cancellation_event
273
+ # if event:
274
+ # future.event = event
275
+ # self._futures.append(future)
276
+ # sse_manager.publish_event("event_update", self.get_event_updates())
277
+ # future.add_done_callback(lambda f: self._process_future(f, service))
278
+
189
279
def cancel_job (self , item_id : str , suppress_logs = False ):
190
280
"""
191
281
Cancels a job associated with the given item.
0 commit comments