From 3fddf6467decebf023f426f464fc499431d89947 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 15 Nov 2024 11:04:11 -0800 Subject: [PATCH 1/2] Create unique exceptions for pipeline execution failures. These exceptions let us distinguish failures in loading or managing pipelines from failures in task execution. --- python/activator/exception.py | 27 ++++++++++ python/activator/middleware_interface.py | 65 ++++++++++++++++-------- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/python/activator/exception.py b/python/activator/exception.py index 9fc173c8..3f5f6786 100644 --- a/python/activator/exception.py +++ b/python/activator/exception.py @@ -22,6 +22,8 @@ __all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt", "InvalidVisitError", "IgnorableVisit", + "InvalidPipelineError", "NoGoodPipelinesError", + "PipelinePreExecutionError", "PipelineExecutionError", ] @@ -107,3 +109,28 @@ class IgnorableVisit(ValueError): activator.visit.SummitVisit activator.visit.FannedOutVisit """ + + +class InvalidPipelineError(ValueError): + """Exception raised if a pipeline cannot be loaded or configured. + """ + + +class NoGoodPipelinesError(RuntimeError): + """Exception raised if none of the configured pipelines could be run on + the data. + """ + + +class PipelinePreExecutionError(RuntimeError): + """Exception raised if a pipeline could not be prepared for execution. + + Usually chained to an internal exception. + """ + + +class PipelineExecutionError(RuntimeError): + """Exception raised if a pipeline attempted to run, but failed. + + Usually chained to an internal exception. 
+ """ diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index a8163971..599ea576 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -52,7 +52,8 @@ from .caching import DatasetCache from .config import PipelinesConfig -from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError +from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError, \ + InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError from .visit import FannedOutVisit from .timer import enforce_schema, time_this_to_bundle @@ -1226,17 +1227,21 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label): Raises ------ - RuntimeError - Raised if any pipeline could not be loaded/configured, or if graph - generation failed for all pipelines. - TODO: could be a good case for a custom exception here. + activator.exception.InvalidPipelineError + Raised if any pipeline could not be loaded/configured. + activator.exception.NoGoodPipelinesError + Raised if graph generation failed for all pipelines. + activator.exception.PipelinePreExecutionError + Raised if pipeline execution was attempted but pre-execution failed. + activator.exception.PipelineExecutionError + Raised if pipeline execution was attempted but failed. """ # Try pipelines in order until one works. for pipeline_file in pipelines: try: pipeline = self._prep_pipeline(pipeline_file) except FileNotFoundError as e: - raise RuntimeError from e + raise InvalidPipelineError(f"Could not load {pipeline_file}.") from e init_output_run = self._get_init_output_run(pipeline_file, self._day_obs) output_run = self._get_output_run(pipeline_file, self._day_obs) exec_butler = Butler(butler=self.butler, @@ -1265,10 +1270,13 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label): # Past this point, partial execution creates datasets. 
# Don't retry -- either fail (raise) or break. - # If this is a fresh (local) repo, then types like calexp, - # *Diff_diaSrcTable, etc. have not been registered. - # TODO: after DM-38041, move pre-execution to one-time repo setup. - executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) + try: + # If this is a fresh (local) repo, then types like calexp, + # *Diff_diaSrcTable, etc. have not been registered. + # TODO: after DM-38041, move pre-execution to one-time repo setup. + executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) + except Exception as e: + raise PipelinePreExecutionError(f"PreExecInit failed for {pipeline_file}.") from e _log.info(f"Running '{pipeline_file}' on {data_ids}") try: with lsst.utils.timer.time_this( @@ -1279,13 +1287,14 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label): ) _log.info(f"{label.capitalize()} pipeline successfully run.") return init_output_run, output_run + except Exception as e: + raise PipelineExecutionError(f"Execution failed for {pipeline_file}.") from e finally: # Refresh so that registry queries know the processed products. self.butler.registry.refresh() break else: - # TODO: a good place for a custom exception? - raise RuntimeError(f"No {label} pipeline graph could be built.") + raise NoGoodPipelinesError(f"No {label} pipeline graph could be built.") def _run_preprocessing(self) -> None: """Preprocess a visit ahead of incoming image(s). @@ -1295,10 +1304,14 @@ def _run_preprocessing(self) -> None: Raises ------ - RuntimeError - Raised if any pipeline could not be loaded/configured, or if graph - generation failed for all pipelines. - TODO: could be a good case for a custom exception here. + activator.exception.InvalidPipelineError + Raised if any pipeline could not be loaded/configured. + activator.exception.NoGoodPipelinesError + Raised if graph generation failed for all pipelines. 
+ activator.exception.PipelinePreExecutionError + Raised if pipeline execution was attempted but pre-execution failed. + activator.exception.PipelineExecutionError + Raised if pipeline execution was attempted but failed. """ pipeline_files = self._get_pre_pipeline_files() if not pipeline_files: @@ -1357,16 +1370,22 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: Raises ------ - RuntimeError - Raised if any pipeline could not be loaded/configured, or if graph - generation failed for all pipelines. - TODO: could be a good case for a custom exception here. - NonRetriableError + activator.exception.InvalidPipelineError + Raised if any pipeline could not be loaded/configured. + activator.exception.NoGoodPipelinesError + Raised if graph generation failed for all pipelines. + activator.exception.PipelinePreExecutionError + Raised if pipeline execution was attempted but pre-execution failed. + activator.exception.PipelineExecutionError + Raised if pipeline execution was attempted but failed, and neither + `~activator.exception.NonRetriableError` nor + `~activator.exception.RetriableError` applies. + activator.exception.NonRetriableError Raised if external resources (such as the APDB or alert stream) may have been left in a state that makes it unsafe to retry failures. This exception is always chained to another exception representing the original error. - RetriableError + activator.exception.RetriableError Raised if the conditions for NonRetriableError are not met, *and* the pipeline fails in a way that is expected to be transient. This exception is always chained to another exception representing the @@ -1409,6 +1428,8 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: raise NonRetriableError("APDB modified") from e else: raise RetriableError("External interrupt") from e + # Catch Exception just in case there's a surprise -- raising + # NonRetriableError on *all* irrevocable changes is important.
except Exception as e: try: state_changed = self._check_permanent_changes(where) From b8a61bbec9aa99e3a89f2726c0779c32b8e43673 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 15 Nov 2024 11:24:06 -0800 Subject: [PATCH 2/2] Catch pipeline failures in preprocessing. It may be possible to run main pipelines even if preprocessing fails or cannot be attempted; defer that decision to quantum graph generation. --- python/activator/middleware_interface.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 599ea576..c49cc559 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -561,7 +561,12 @@ def prep_butler(self) -> None: self._transfer_data(all_datasets, calib_datasets) with time_this_to_bundle(bundle, action_id, "prep_butlerPreprocessTime"): - self._run_preprocessing() + try: + self._run_preprocessing() + except NoGoodPipelinesError: + _log.exception("Preprocessing pipelines not runnable, trying main pipelines anyway.") + except (PipelinePreExecutionError, PipelineExecutionError): + _log.exception("Preprocessing pipeline failed, trying main pipelines anyway.") # IMPORTANT: do not remove or rename entries in this list. New entries can be added as needed. enforce_schema(bundle, {action_id: ["prep_butlerTotalTime",