Skip to content

Commit

Permalink
Merge branch 'tickets/DM-47591'
Browse files Browse the repository at this point in the history
  • Loading branch information
kfindeisen committed Nov 19, 2024
2 parents 5e87ac2 + b8a61bb commit 7ace24a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 23 deletions.
27 changes: 27 additions & 0 deletions python/activator/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt",
"InvalidVisitError", "IgnorableVisit",
"InvalidPipelineError", "NoGoodPipelinesError",
"PipelinePreExecutionError", "PipelineExecutionError",
]


Expand Down Expand Up @@ -107,3 +109,28 @@ class IgnorableVisit(ValueError):
activator.visit.SummitVisit
activator.visit.FannedOutVisit
"""


class InvalidPipelineError(ValueError):
    """Raised when a pipeline definition cannot be loaded or configured.

    A ``ValueError`` subclass so that callers treating a bad pipeline as
    invalid input continue to work.
    """


class NoGoodPipelinesError(RuntimeError):
    """Raised when every configured pipeline failed to apply to the data.

    Signals that graph generation did not succeed for any candidate
    pipeline, so no processing could even be attempted.
    """


class PipelinePreExecutionError(RuntimeError):
    """Raised when preparing a pipeline for execution fails.

    Typically raised with ``raise ... from err`` so the underlying
    pre-execution failure is preserved as the chained cause.
    """


class PipelineExecutionError(RuntimeError):
    """Raised when a pipeline run was attempted but did not complete.

    Typically raised with ``raise ... from err`` so the underlying
    execution failure is preserved as the chained cause.
    """
72 changes: 49 additions & 23 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@

from .caching import DatasetCache
from .config import PipelinesConfig
from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError
from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError, \
InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError
from .visit import FannedOutVisit
from .timer import enforce_schema, time_this_to_bundle

Expand Down Expand Up @@ -560,7 +561,12 @@ def prep_butler(self) -> None:
self._transfer_data(all_datasets, calib_datasets)

with time_this_to_bundle(bundle, action_id, "prep_butlerPreprocessTime"):
self._run_preprocessing()
try:
self._run_preprocessing()
except NoGoodPipelinesError:
_log.exception("Preprocessing pipelines not runnable, trying main pipelines anyway.")
except (PipelinePreExecutionError, PipelineExecutionError):
_log.exception("Preprocessing pipeline failed, trying main pipelines anyway.")

# IMPORTANT: do not remove or rename entries in this list. New entries can be added as needed.
enforce_schema(bundle, {action_id: ["prep_butlerTotalTime",
Expand Down Expand Up @@ -1226,17 +1232,21 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
Raises
------
RuntimeError
Raised if any pipeline could not be loaded/configured, or if graph
generation failed for all pipelines.
TODO: could be a good case for a custom exception here.
activator.exception.InvalidPipelineError
Raised if any pipeline could not be loaded/configured.
activator.exception.NoGoodPipelinesError
Raised if graph generation failed for all pipelines.
activator.exception.PipelinePreExecutionError
Raised if pipeline execution was attempted but pre-execution failed.
activator.exception.PipelineExecutionError
Raised if pipeline execution was attempted but failed.
"""
# Try pipelines in order until one works.
for pipeline_file in pipelines:
try:
pipeline = self._prep_pipeline(pipeline_file)
except FileNotFoundError as e:
raise RuntimeError from e
raise InvalidPipelineError(f"Could not load {pipeline_file}.") from e
init_output_run = self._get_init_output_run(pipeline_file, self._day_obs)
output_run = self._get_output_run(pipeline_file, self._day_obs)
exec_butler = Butler(butler=self.butler,
Expand Down Expand Up @@ -1265,10 +1275,13 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
# Past this point, partial execution creates datasets.
# Don't retry -- either fail (raise) or break.

# If this is a fresh (local) repo, then types like calexp,
# *Diff_diaSrcTable, etc. have not been registered.
# TODO: after DM-38041, move pre-execution to one-time repo setup.
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
try:
# If this is a fresh (local) repo, then types like calexp,
# *Diff_diaSrcTable, etc. have not been registered.
# TODO: after DM-38041, move pre-execution to one-time repo setup.
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
except Exception as e:
raise PipelinePreExecutionError(f"PreExecInit failed for {pipeline_file}.") from e
_log.info(f"Running '{pipeline_file}' on {data_ids}")
try:
with lsst.utils.timer.time_this(
Expand All @@ -1279,13 +1292,14 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label):
)
_log.info(f"{label.capitalize()} pipeline successfully run.")
return init_output_run, output_run
except Exception as e:
raise PipelineExecutionError(f"Execution failed for {pipeline_file}.") from e
finally:
# Refresh so that registry queries know the processed products.
self.butler.registry.refresh()
break
else:
# TODO: a good place for a custom exception?
raise RuntimeError(f"No {label} pipeline graph could be built.")
raise NoGoodPipelinesError(f"No {label} pipeline graph could be built.")

def _run_preprocessing(self) -> None:
"""Preprocess a visit ahead of incoming image(s).
Expand All @@ -1295,10 +1309,14 @@ def _run_preprocessing(self) -> None:
Raises
------
RuntimeError
Raised if any pipeline could not be loaded/configured, or if graph
generation failed for all pipelines.
TODO: could be a good case for a custom exception here.
activator.exception.InvalidPipelineError
Raised if any pipeline could not be loaded/configured.
activator.exception.NoGoodPipelinesError
Raised if graph generation failed for all pipelines.
activator.exception.PipelinePreExecutionError
Raised if pipeline execution was attempted but pre-execution failed.
activator.exception.PipelineExecutionError
Raised if pipeline execution was attempted but failed.
"""
pipeline_files = self._get_pre_pipeline_files()
if not pipeline_files:
Expand Down Expand Up @@ -1357,16 +1375,22 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
Raises
------
RuntimeError
Raised if any pipeline could not be loaded/configured, or if graph
generation failed for all pipelines.
TODO: could be a good case for a custom exception here.
NonRetriableError
activator.exception.InvalidPipelineError
Raised if any pipeline could not be loaded/configured.
activator.exception.NoGoodPipelinesError
Raised if graph generation failed for all pipelines.
activator.exception.PipelinePreExecutionError
Raised if pipeline execution was attempted but pre-execution failed.
activator.exception.PipelineExecutionError
Raised if pipeline execution was attempted but failed, and neither
`~activator.exception.NonRetriableError` nor
`~activator.exception.RetriableError` apply.
activator.exception.NonRetriableError
Raised if external resources (such as the APDB or alert stream)
may have been left in a state that makes it unsafe to retry
failures. This exception is always chained to another exception
representing the original error.
RetriableError
activator.exception.RetriableError
Raised if the conditions for NonRetriableError are not met, *and*
the pipeline fails in a way that is expected to be transient. This
exception is always chained to another exception representing the
Expand Down Expand Up @@ -1409,6 +1433,8 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
raise NonRetriableError("APDB modified") from e
else:
raise RetriableError("External interrupt") from e
# Catch Exception just in case there's a surprise -- raising
# NonRetriableError on *all* irrevocable changes is important.
except Exception as e:
try:
state_changed = self._check_permanent_changes(where)
Expand Down

0 comments on commit 7ace24a

Please sign in to comment.