diff --git a/.codespell-ignores b/.codespell-ignores index 446610f6aa9..c1bccebd1bb 100644 --- a/.codespell-ignores +++ b/.codespell-ignores @@ -3,3 +3,4 @@ usera nd studi referenc +livetime diff --git a/pyproject.toml b/pyproject.toml index 42f7ea9d9c7..ee2880dd6bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,6 +108,7 @@ ctapipe-process = "ctapipe.tools.process:main" ctapipe-merge = "ctapipe.tools.merge:main" ctapipe-optimize-event-selection = "ctapipe.tools.optimize_event_selection:main" ctapipe-compute-irf = "ctapipe.tools.compute_irf:main" +ctapipe-create-dl3 = "ctapipe.tools.create_dl3:main" ctapipe-fileinfo = "ctapipe.tools.fileinfo:main" ctapipe-quickstart = "ctapipe.tools.quickstart:main" ctapipe-calculate-pixel-statistics = "ctapipe.tools.calculate_pixel_stats:main" diff --git a/src/ctapipe/conftest.py b/src/ctapipe/conftest.py index f4555044f4d..c5f56335e01 100644 --- a/src/ctapipe/conftest.py +++ b/src/ctapipe/conftest.py @@ -783,7 +783,7 @@ def irf_event_loader_test_config(): "energy_reconstructor": "ExtraTreesRegressor", "geometry_reconstructor": "HillasReconstructor", "gammaness_classifier": "ExtraTreesClassifier", - "EventQualityQuery": { + "EventQualitySelection": { "quality_criteria": [ ( "multiplicity 4", @@ -816,9 +816,17 @@ def irf_events_table(): N2 = 100 N = N1 + N2 epp = EventPreprocessor() - tab = epp.make_empty_table() - - ids, bulk, unitless = tab.colnames[:2], tab.colnames[2:-2], tab.colnames[-2:] + keep_columns, _, _ = epp.get_columns_keep_rename_scheme(None, True) + tab = epp.make_empty_table(keep_columns) + + ids, bulk, unitless = [], [], [] + for c in tab.columns: + if "id" in c: + ids.append(c) + elif tab[c].unit == u.dimensionless_unscaled: + unitless.append(c) + else: + bulk.append(c) id_tab = QTable( data=np.zeros((N, len(ids)), dtype=np.uint64), diff --git a/src/ctapipe/irf/__init__.py b/src/ctapipe/irf/__init__.py index dbbd812e1c1..8270690b35a 100644 --- a/src/ctapipe/irf/__init__.py +++ b/src/ctapipe/irf/__init__.py @@ -18,6 +18,7 @@ check_bins_in_range, make_bins_per_decade, ) +from .cuts import EventQualitySelection, EventSelection from .irfs import ( BackgroundRate2dMaker, EffectiveArea2dMaker, @@ -56,4 +57,6 @@ "FLUX_UNIT", "check_bins_in_range", "make_bins_per_decade", + "EventSelection", + "EventQualitySelection", ] diff --git a/src/ctapipe/irf/cuts/__init__.py b/src/ctapipe/irf/cuts/__init__.py new file mode 100644 index 00000000000..2ade3dc5524 --- /dev/null +++ b/src/ctapipe/irf/cuts/__init__.py @@ -0,0 +1,4 @@ +from .quality_cuts import EventQualitySelection +from .selection_cuts import EventSelection + +__all__ = ["EventSelection", "EventQualitySelection"] diff --git a/src/ctapipe/irf/cuts/quality_cuts.py b/src/ctapipe/irf/cuts/quality_cuts.py new file mode 100644 index 00000000000..70eb18bafcd --- /dev/null +++ b/src/ctapipe/irf/cuts/quality_cuts.py @@ -0,0 +1,58 @@ +from astropy.table import Table + +from ...core import QualityQuery +from ...core.traits import List, Tuple, Unicode + + +class EventQualitySelection(QualityQuery): + """ + Event pre-selection quality criteria for IRF and DL3 computation with different defaults. + """ + + quality_criteria = List( + Tuple(Unicode(), Unicode()), + default_value=[ + ( + "multiplicity 4", + "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4", + ), + ("valid classifier", "RandomForestClassifier_is_valid"), + ("valid geom reco", "HillasReconstructor_is_valid"), + ("valid energy reco", "RandomForestRegressor_is_valid"), + ], + help=QualityQuery.quality_criteria.help, + ).tag(config=True) + + def calculate_selection(self, events: Table): + """ + Add the selection columns to the events, will only compute quality selection + + Parameters + ---------- + events: Table + The table containing the events on which selection need to be applied + + Returns + ------- + Table + events with selection columns added. + """ + return self.calculate_quality_selection(events) + + def calculate_quality_selection(self, events: Table): + """ + Add the selection columns to the events, will only compute quality selection + + Parameters + ---------- + events: Table + The table containing the events on which selection need to be applied + + Returns + ------- + Table + events with selection columns added. + """ + events["selected_quality"] = self.get_table_mask(events) + events["selected"] = events["selected_quality"] + return events diff --git a/src/ctapipe/irf/cuts/selection_cuts.py b/src/ctapipe/irf/cuts/selection_cuts.py new file mode 100644 index 00000000000..f37c3d1c3b9 --- /dev/null +++ b/src/ctapipe/irf/cuts/selection_cuts.py @@ -0,0 +1,88 @@ +import operator + +from astropy.table import Table +from pyirf.cuts import evaluate_binned_cut + +from ...core.traits import Path +from ..optimize.results import OptimizationResult +from .quality_cuts import EventQualitySelection + +__all__ = ["EventSelection"] + + +class EventSelection(EventQualitySelection): + """ + Event selection quality and gammaness criteria for IRF and DL3 + """ + + cuts_file = Path( + allow_none=False, + directory_ok=False, + exists=True, + help="Path to the cuts file to apply to the observation.", + ).tag(config=True) + + def __init__(self, config=None, parent=None, **kwargs): + super().__init__(config=config, parent=parent, **kwargs) + self.cuts = OptimizationResult.read(self.cuts_file) + + def calculate_selection(self, events: Table, apply_spatial_selection: bool = False): + """ + Add the selection columns to the events + + Parameters + ---------- + events: Table + The table containing the events on which selection need to be applied + apply_spatial_selection: bool + True if the theta cuts should be applied + + Returns + ------- + Table + events with selection columns added. + """ + events = self.calculate_quality_selection(events) + events = self.calculate_gamma_selection(events, apply_spatial_selection) + events["selected"] = events["selected_quality"] & events["selected_gamma"] + return events + + def calculate_gamma_selection( + self, events: Table, apply_spatial_selection: bool = False + ) -> Table: + """ + Add the selection columns to the events, will compute only gamma criteria + + Parameters + ---------- + events: Table + The table containing the events on which selection need to be applied + apply_spatial_selection: bool + True if the theta cuts should be applied + + Returns + ------- + Table + events with selection columns added. + """ + + events["selected_gh"] = evaluate_binned_cut( + events["gh_score"], + events["reco_energy"], + self.cuts.gh_cuts, + operator.ge, + ) + + if apply_spatial_selection: + events["selected_theta"] = evaluate_binned_cut( + events["theta"], + events["reco_energy"], + self.cuts.spatial_selection_table, + operator.le, + ) + events["selected_gamma"] = events["selected_theta"] & events["selected_gh"] + else: + events["selected_gamma"] = events["selected_gh"] + + events["selected"] = events["selected_gamma"] + return events diff --git a/src/ctapipe/irf/dl3.py b/src/ctapipe/irf/dl3.py new file mode 100644 index 00000000000..efe527f73e4 --- /dev/null +++ b/src/ctapipe/irf/dl3.py @@ -0,0 +1,734 @@ +from abc import abstractmethod +from datetime import UTC, datetime +from typing import Any, Dict, List, Tuple + +import astropy.units as u +import numpy as np +from astropy.coordinates import ICRS, AltAz, EarthLocation, SkyCoord +from astropy.io import fits +from astropy.io.fits import Header +from astropy.io.fits.hdu.base import ExtensionHDU +from astropy.table import QTable +from astropy.time import Time, TimeDelta + +from ctapipe.compat import COPY_IF_NEEDED +from ctapipe.coordinates.tests.test_coordinates import location +from ctapipe.core import Component +from ctapipe.core.traits import AstroTime, Bool +from ctapipe.version import version as ctapipe_version + + +class DL3_Format(Component): + """ + Base class for writing a DL3 file + """ + + overwrite = Bool( + default_value=False, + help="If true, allow to overwrite already existing output file", + ).tag(config=True) + + optional_dl3_columns = Bool( + default_value=False, help="If true add optional columns to produce file" + ).tag(config=True) + + raise_error_for_optional = Bool( + default_value=True, + help="If true will raise error in the case optional column are missing", + ).tag(config=True) + + reference_time = AstroTime( + default_value=Time("2018-01-01T00:00:00", scale="tai"), + help="The reference time that will be used in the FITS file", + ).tag(config=True) + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._obs_id = None + self._events = None + self._pointing = None + self._pointing_mode = None + self._gti = None + self._aeff = None + self._psf = None + self._edisp = None + self._bkg = None + self._dead_time_fraction = None + self._location = None + self._telescope_information = None + self._target_information = None + self._software_information = None + + @abstractmethod + def write_file(self, path): + pass + + @property + def obs_id(self) -> int: + return self._obs_id + + @obs_id.setter + def obs_id(self, obs_id: int): + """ + Parameters + ---------- + obs_id : int + Observation ID + """ + if self._obs_id is not None: + self.log.warning( + "Obs id for DL3 file was already set, replacing current obs id" + ) + self._obs_id = obs_id + + @property + def events(self) -> QTable: + return self._events + + @events.setter + def events(self, events: QTable): + """ + Parameters + ---------- + events : QTable + A table with a line for each event + """ + if self._events is not None: + self.log.warning( + "Events table for DL3 file was already set, replacing current event table" + ) + self._events = events + + @property + def pointing(self) -> List[Tuple[Time, SkyCoord]]: + return self._pointing + + @pointing.setter + def pointing(self, pointing: List[Tuple[Time, SkyCoord]]): + """ + Parameters + ---------- + pointing : QTable + A list with for each entry containing the time at which the coordinate where evaluated and the associated coordinates + """ + if self._pointing is not None: + self.log.warning( + "Pointing for DL3 file was already set, replacing current pointing" + ) + self._pointing = pointing + + @property + def pointing_mode(self) -> str: + return self._pointing_mode + + @pointing_mode.setter + def pointing_mode(self, pointing_mode: str): + """ + Parameters + ---------- + pointing_mode : str + The name of the pointing mode used for the observation + """ + if self._pointing_mode is not None: + self.log.warning( + "Pointing for DL3 file was already set, replacing current pointing" + ) + self._pointing_mode = pointing_mode + + @property + def gti(self) -> List[Tuple[Time, Time]]: + return self._gti + + @gti.setter + def gti(self, gti: List[Tuple[Time, Time]]): + """ + Parameters + ---------- + gti : QTable + A list with for each entry containing the time at which the coordinate where evaluated and + """ + if self._gti is not None: + self.log.warning("GTI for DL3 file was already set, replacing current gti") + self._gti = gti + + @property + def aeff(self) -> ExtensionHDU: + return self._aeff + + @aeff.setter + def aeff(self, aeff: ExtensionHDU): + """ + Parameters + ---------- + aeff : ExtensionHDU + The effective area HDU read from the fits file containing IRFs + """ + if self._aeff is not None: + self.log.warning( + "Effective area for DL3 file was already set, replacing current effective area" + ) + self._aeff = aeff + + @property + def psf(self) -> ExtensionHDU: + return self._psf + + @psf.setter + def psf(self, psf: ExtensionHDU): + """ + Parameters + ---------- + psf : ExtensionHDU + The PSF HDU read from the fits file containing IRFs + """ + if self._psf is not None: + self.log.warning("PSF for DL3 file was already set, replacing current PSF") + self._psf = psf + + @property + def edisp(self) -> ExtensionHDU: + return self._edisp + + @edisp.setter + def edisp(self, edisp: ExtensionHDU): + """ + Parameters + ---------- + edisp : ExtensionHDU + The EDISP HDU read from the fits file containing IRFs + """ + if self._edisp is not None: + self.log.warning( + "EDISP for DL3 file was already set, replacing current EDISP" + ) + self._edisp = edisp + + @property + def bkg(self) -> ExtensionHDU: + return self._bkg + + @bkg.setter + def bkg(self, bkg: ExtensionHDU): + """ + Parameters + ---------- + bkg : ExtensionHDU + The background HDU read from the fits file containing IRFs + """ + if self._bkg is not None: + self.log.warning( + "Background for DL3 file was already set, replacing current background" + ) + self._bkg = bkg + + @property + def location(self) -> EarthLocation: + return self._location + + @location.setter + def location(self, location: EarthLocation): + """ + Parameters + ---------- + location : EarthLocation + The location of the telescope + """ + if self._location is not None: + self.log.warning( + "Telescope location for DL3 file was already set, replacing current location" + ) + self._location = location + + @property + def dead_time_fraction(self) -> float: + return self._dead_time_fraction + + @dead_time_fraction.setter + def dead_time_fraction(self, dead_time_fraction: float): + """ + Parameters + ---------- + dead_time_fraction : float + The location of the telescope + """ + if self.dead_time_fraction is not None: + self.log.warning( + "Dead time fraction for DL3 file was already set, replacing current dead time fraction" + ) + self._dead_time_fraction = dead_time_fraction + + @property + def telescope_information(self) -> Dict[str, Any]: + return self._telescope_information + + @telescope_information.setter + def telescope_information(self, telescope_information: Dict[str, Any]): + """ + Parameters + ---------- + telescope_information : dict[str, any] + A dictionary containing general information about telescope with as key : organisation, array, subarray, telescope_list + """ + if self._telescope_information is not None: + self.log.warning( + "Telescope information for DL3 file was already set, replacing current information" + ) + self._telescope_information = telescope_information + + @property + def target_information(self) -> Dict[str, Any]: + return self._target_information + + @target_information.setter + def target_information(self, target_information: Dict[str, Any]): + """ + Parameters + ---------- + target_information : dict[str, any] + A dictionary containing general information about the targeted source with as key : observer, object_name, object_coordinate + """ + if self._target_information is not None: + self.log.warning( + "Target information for DL3 file was already set, replacing current target information" + ) + self._target_information = target_information + + @property + def software_information(self) -> Dict[str, Any]: + return self._software_information + + @software_information.setter + def software_information(self, software_information: Dict[str, Any]): + """ + Parameters + ---------- + software_information : dict[str, any] + A dictionary containing general information about the software used to produce the file with as key : analysis_version, calibration_version, dst_version + """ + if self._software_information is not None: + self.log.warning( + "Software information for DL3 file was already set, replacing current software information" + ) + self._software_information = software_information + + +class DL3_GADF(DL3_Format): + """ + Class to write DL3 in GADF format, subclass of DL3_Format + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.file_creation_time = datetime.now(tz=UTC) + self._reference_time = self.reference_time.tai + + def write_file(self, path): + """ + This function will write the new DL3 file + All the content associated with the file should have been specified previously, otherwise error will be raised + + Parameters + ---------- + path : str + The full path and filename of the new file to write + """ + self.file_creation_time = datetime.now(tz=UTC) + + hdu_dl3 = fits.HDUList( + [fits.PrimaryHDU(header=Header(self.get_hdu_header_base_format()))] + ) + hdu_dl3.append( + fits.BinTableHDU( + data=self.transform_events_columns_for_gadf_format(self.events), + name="EVENTS", + header=Header(self.get_hdu_header_events()), + ) + ) + hdu_dl3.append( + fits.BinTableHDU( + data=self.create_gti_table(), + name="GTI", + header=Header(self.get_hdu_header_gti()), + ) + ) + hdu_dl3.append( + fits.BinTableHDU( + data=self.create_pointing_table(), + name="POINTING", + header=Header(self.get_hdu_header_pointing()), + ) + ) + + if self.aeff is None: + raise ValueError("Missing effective area IRF") + hdu_dl3.append(self.aeff) + hdu_dl3[-1].header["OBS_ID"] = self.obs_id + if self.psf is None: + raise ValueError("Missing PSF IRF") + hdu_dl3.append(self.psf) + hdu_dl3[-1].header["OBS_ID"] = self.obs_id + if self.edisp is None: + raise ValueError("Missing EDISP IRF") + hdu_dl3.append(self.edisp) + hdu_dl3[-1].header["OBS_ID"] = self.obs_id + if self.bkg is not None: + hdu_dl3.append(self.bkg) + hdu_dl3[-1].header["OBS_ID"] = self.obs_id + + hdu_dl3.writeto(path, checksum=True, overwrite=self.overwrite) + + def get_hdu_header_base_format(self) -> Dict[str, Any]: + """ + Return the base information that should be included in all HDU of the final fits file + """ + return { + "HDUCLASS": "GADF", + "HDUVERS": "v0.3", + "HDUDOC": "https://gamma-astro-data-formats.readthedocs.io/en/v0.3/index.html", + "CREATOR": "ctapipe " + ctapipe_version, + "CREATED": self.file_creation_time.isoformat(), + } + + def get_hdu_header_base_time(self) -> Dict[str, Any]: + """ + Return the information about time parameters used in several HDU + """ + if self.gti is None: + raise ValueError("No available time information for the DL3 file") + if self.dead_time_fraction is None: + raise ValueError("No available dead time fraction for the DL3 file") + start_time = None + stop_time = None + ontime = TimeDelta(0.0 * u.s) + for gti_interval in self.gti: + ontime += gti_interval[1] - gti_interval[0] + start_time = ( + gti_interval[0] + if start_time is None + else min(start_time, gti_interval[0]) + ) + stop_time = ( + gti_interval[1] + if stop_time is None + else max(stop_time, gti_interval[1]) + ) + + return { + "MJDREFI": int(self._reference_time.mjd), + "MJDREFF": self._reference_time.mjd % 1, + "TIMEUNIT": "s", + "TIMEREF": "GEOCENTER", + "TIMESYS": "TAI", + "TSTART": (start_time.tai - self._reference_time).to_value(u.s), + "TSTOP": (stop_time.tai - self._reference_time).to_value(u.s), + "ONTIME": ontime.to_value(u.s), + "LIVETIME": ontime.to_value(u.s) * self.dead_time_fraction, + "DEADC": self.dead_time_fraction, + "TELAPSE": (stop_time.tai - start_time.tai).to_value(u.s), + "DATE-OBS": start_time.tai.fits, + "DATE-BEG": start_time.tai.fits, + "DATE-AVG": (start_time.tai + (stop_time.tai - start_time.tai) / 2.0).fits, + "DATE-END": stop_time.tai.fits, + } + + def get_hdu_header_base_observation_information( + self, obs_id_only: bool = False + ) -> Dict[str, Any]: + """ + Return generic information on the observation setting (id, target, ...) + + Parameters + ---------- + obs_id_only : bool + If true, will return a dict with as only information the obs_id + """ + if self.obs_id is None: + raise ValueError("Observation ID is missing.") + header = {"OBS_ID": self.obs_id} + if self.target_information is not None and not obs_id_only: + header["OBSERVER"] = self.target_information["observer"] + header["OBJECT"] = self.target_information["object_name"] + object_coordinate = self.target_information[ + "object_coordinate" + ].transform_to(ICRS()) + header["RA_OBJ"] = ( + "nan" + if np.isnan(object_coordinate.ra.to_value(u.deg)) + else object_coordinate.ra.to_value(u.deg) + ) + header["DEC_OBJ"] = ( + "nan" + if np.isnan(object_coordinate.dec.to_value(u.deg)) + else object_coordinate.dec.to_value(u.deg) + ) + return header + + def get_hdu_header_base_subarray_information(self) -> Dict[str, Any]: + """ + Return generic information on the array used for observations + """ + if self.telescope_information is None: + raise ValueError("Telescope information are missing.") + header = { + "ORIGIN": self.telescope_information["organisation"], + "TELESCOP": self.telescope_information["array"], + "INSTRUME": self.telescope_information["subarray"], + "TELLIST": str(self.telescope_information["telescope_list"]), + "N_TELS": len(self.telescope_information["telescope_list"]), + } + return header + + def get_hdu_header_base_software_information(self) -> Dict[str, Any]: + """ + Return information about the software versions used to process the observation + """ + header = {} + if self.software_information is not None: + header["DST_VER"] = self.software_information["dst_version"] + header["ANA_VER"] = self.software_information["analysis_version"] + header["CAL_VER"] = self.software_information["calibration_version"] + return header + + def get_hdu_header_base_pointing(self) -> Dict[str, Any]: + """ + Return information on the pointing during the observation + """ + if self.pointing is None: + raise ValueError("Pointing information are missing") + if self.pointing_mode is None: + raise ValueError("Pointing mode is missing") + if self.location is None: + raise ValueError("Telescope location information are missing") + + gti_table = self.create_gti_table() + delta_time_evaluation = [] + for i in range(len(gti_table)): + delta_time_evaluation += list( + np.linspace(gti_table["START"][i], gti_table["STOP"][i], 100) + ) + delta_time_evaluation = u.Quantity(delta_time_evaluation) + time_evaluation = self._reference_time + TimeDelta(delta_time_evaluation) + + pointing_table = self.create_pointing_table() + if self.pointing_mode == "TRACK": + obs_mode = "POINTING" + icrs_coordinate = SkyCoord( + ra=np.interp( + delta_time_evaluation, + xp=pointing_table["TIME"], + fp=pointing_table["RA_PNT"], + ), + dec=np.interp( + delta_time_evaluation, + xp=pointing_table["TIME"], + fp=pointing_table["DEC_PNT"], + ), + ) + altaz_coordinate = icrs_coordinate.transform_to( + AltAz(location=self.location, obstime=time_evaluation) + ) + elif self.pointing_mode == "DRIFT": + obs_mode = "DRIFT" + altaz_coordinate = AltAz( + alt=np.interp( + delta_time_evaluation, + xp=pointing_table["TIME"], + fp=pointing_table["ALT_PNT"], + ), + az=np.interp( + delta_time_evaluation, + xp=pointing_table["TIME"], + fp=pointing_table["AZ_PNT"], + ), + location=self.location, + obstime=time_evaluation, + ) + icrs_coordinate = altaz_coordinate.transform_to(ICRS()) + else: + raise ValueError("Unknown pointing mode") + + header = { + "RADECSYS": "ICRS", + "EQUINOX": 2000.0, + "OBS_MODE": obs_mode, + "RA_PNT": np.mean(icrs_coordinate.ra.to_value(u.deg)), + "DEC_PNT": np.mean(icrs_coordinate.dec.to_value(u.deg)), + "ALT_PNT": np.mean(altaz_coordinate.alt.to_value(u.deg)), + "AZ_PNT": np.mean(altaz_coordinate.az.to_value(u.deg)), + "GEOLON": self.location.lon.to_value(u.deg), + "GEOLAT": self.location.lat.to_value(u.deg), + "ALTITUDE": self.location.height.to_value(u.m), + "OBSGEO-X": self.location.x.to_value(u.m), + "OBSGEO-Y": self.location.y.to_value(u.m), + "OBSGEO-Z": self.location.z.to_value(u.m), + } + return header + + def get_hdu_header_events(self) -> Dict[str, Any]: + """ + The output dictionary contain all the necessary information that should be added to the header of the events HDU + """ + header = self.get_hdu_header_base_format() + header.update({"HDUCLAS1": "EVENTS"}) + header.update(self.get_hdu_header_base_time()) + header.update(self.get_hdu_header_base_pointing()) + header.update(self.get_hdu_header_base_observation_information()) + header.update(self.get_hdu_header_base_subarray_information()) + header.update(self.get_hdu_header_base_software_information()) + return header + + def get_hdu_header_gti(self) -> Dict[str, Any]: + """ + The output dictionary contain all the necessary information that should be added to the header of the GTI HDU + """ + header = self.get_hdu_header_base_format() + header.update({"HDUCLAS1": "GTI"}) + header.update(self.get_hdu_header_base_time()) + header.update( + self.get_hdu_header_base_observation_information(obs_id_only=True) + ) + return header + + def get_hdu_header_pointing(self) -> Dict[str, Any]: + """ + The output dictionary contain all the necessary information that should be added to the header of the pointing HDU + """ + header = self.get_hdu_header_base_format() + header.update({"HDUCLAS1": "POINTING"}) + header.update(self.get_hdu_header_base_pointing()) + header.update( + self.get_hdu_header_base_observation_information(obs_id_only=True) + ) + return header + + def transform_events_columns_for_gadf_format(self, events: QTable) -> QTable: + """ + Return an event table containing only the columns that should be added to the EVENTS HDU + It also rename all the columns to match the name expected in the GADF format + + Parameters + ---------- + events : QTable + The base events table to process + """ + rename_from = ["event_id", "time", "reco_ra", "reco_dec", "reco_energy"] + rename_to = ["EVENT_ID", "TIME", "RA", "DEC", "ENERGY"] + + if self.optional_dl3_columns: + rename_from_optional = [ + "multiplicity", + "reco_glon", + "reco_glat", + "reco_alt", + "reco_az", + "reco_fov_lon", + "reco_fov_lat", + "reco_source_fov_offset", + "reco_source_fov_position_angle", + "gh_score", + "reco_dir_uncert", + "reco_energy_uncert", + "reco_core_x", + "reco_core_y", + "reco_core_uncert", + "reco_h_max", + "reco_h_max_uncert", + "reco_x_max", + "reco_x_max_uncert", + ] + rename_to_optional = [ + "MULTIP", + "GLON", + "GLAT", + "ALT", + "AZ", + "DETX", + "DETY", + "THETA", + "PHI", + "GAMANESS", + "DIR_ERR", + "ENERGY_ERR", + "COREX", + "COREY", + "CORE_ERR", + "HMAX", + "HMAX_ERR", + "XMAX", + "XMAX_ERR", + ] + + if self.raise_error_for_optional: + for i, c in enumerate(rename_from_optional): + if c not in events.colnames: + self.log.warning( + f"Optional column {c} is missing from the events table." + ) + else: + rename_from.append(rename_from_optional[i]) + rename_to.append(rename_to_optional[i]) + + for c in rename_from: + if c not in events.colnames: + raise ValueError( + f"Required column {c} is missing from the events table." + ) + + renamed_events = QTable(events, copy=COPY_IF_NEEDED) + renamed_events.rename_columns(rename_from, rename_to) + renamed_events = renamed_events[rename_to] + return renamed_events + + def create_gti_table(self) -> QTable: + """ + Build a table that contains GTI information with the GADF names and format, to be concerted directly as a TableHDU + """ + table_structure = {"START": [], "STOP": []} + for gti_interval in self.gti: + table_structure["START"].append( + (gti_interval[0].tai - self._reference_time).to(u.s) + ) + table_structure["STOP"].append( + (gti_interval[1].tai - self._reference_time).to(u.s) + ) + + QTable(table_structure).sort("START") + for i in range(len(table_structure) - 1): + if table_structure["STOP"][i] > table_structure["START"][i + 1]: + self.log.warning("Overlapping GTI intervals") + break + + return QTable(table_structure) + + def create_pointing_table(self) -> QTable: + """ + Build a table that contains pointing information with the GADF names and format, to be concerted directly as a TableHDU + """ + if self.pointing is None: + raise ValueError("Pointing information are missing") + if self.location is None: + raise ValueError("Telescope location information are missing") + + table_structure = { + "TIME": [], + "RA_PNT": [], + "DEC_PNT": [], + "ALT_PNT": [], + "AZ_PNT": [], + } + + for pointing in self.pointing: + time = pointing[0] + pointing_icrs = pointing[1].transform_to(ICRS()) + pointing_altaz = pointing[1].transform_to( + AltAz(location=location, obstime=time) + ) + table_structure["TIME"].append((time.tai - self._reference_time).to(u.s)) + table_structure["RA_PNT"].append(pointing_icrs.ra.to(u.deg)) + table_structure["DEC_PNT"].append(pointing_icrs.dec.to(u.deg)) + table_structure["ALT_PNT"].append(pointing_altaz.alt.to(u.deg)) + table_structure["AZ_PNT"].append(pointing_altaz.az.to(u.deg)) + + table = QTable(table_structure) + table.sort("TIME") + return table diff --git a/src/ctapipe/irf/optimize/__init__.py b/src/ctapipe/irf/optimize/__init__.py new file mode 100644 index 00000000000..e1ccdd75b83 --- /dev/null +++ b/src/ctapipe/irf/optimize/__init__.py @@ -0,0 +1,15 @@ +from .algorithm import ( + GhPercentileCutCalculator, + PercentileCuts, + PointSourceSensitivityOptimizer, + ThetaPercentileCutCalculator, +) +from .results import OptimizationResult + +__all__ = [ + "OptimizationResult", + "GhPercentileCutCalculator", + "PercentileCuts", + "PointSourceSensitivityOptimizer", + "ThetaPercentileCutCalculator", +] diff --git a/src/ctapipe/irf/optimize.py b/src/ctapipe/irf/optimize/algorithm.py similarity index 68% rename from src/ctapipe/irf/optimize.py rename to src/ctapipe/irf/optimize/algorithm.py index fd8f9e9ad52..a117d4aea94 100644 --- a/src/ctapipe/irf/optimize.py +++ b/src/ctapipe/irf/optimize/algorithm.py @@ -2,147 +2,28 @@ import operator from abc import abstractmethod -from collections.abc import Sequence import astropy.units as u import numpy as np -from astropy.io import fits -from astropy.table import QTable, Table +from astropy.table import QTable from pyirf.cut_optimization import optimize_gh_cut from pyirf.cuts import calculate_percentile_cut, evaluate_binned_cut -from ..core import Component, QualityQuery -from ..core.traits import AstroQuantity, Float, Integer, Path -from .binning import DefaultRecoEnergyBins, ResultValidRange -from .preprocessing import EventQualityQuery +from ...core import Component +from ...core.traits import AstroQuantity, Float, Integer +from ..binning import DefaultRecoEnergyBins +from ..cuts import EventQualitySelection +from .results import OptimizationResult __all__ = [ "CutOptimizerBase", "GhPercentileCutCalculator", - "OptimizationResult", "PercentileCuts", "PointSourceSensitivityOptimizer", "ThetaPercentileCutCalculator", ] -class OptimizationResult: - """Result of an optimization of G/H and theta cuts or only G/H cuts.""" - - def __init__( - self, - valid_energy_min: u.Quantity, - valid_energy_max: u.Quantity, - valid_offset_min: u.Quantity, - valid_offset_max: u.Quantity, - gh_cuts: QTable, - clf_prefix: str, - spatial_selection_table: QTable | None = None, - quality_query: QualityQuery | Sequence | None = None, - ) -> None: - if quality_query: - if isinstance(quality_query, QualityQuery): - if len(quality_query.quality_criteria) == 0: - quality_query.quality_criteria = [ - (" ", " ") - ] # Ensures table serialises properly - - self.quality_query = quality_query - elif isinstance(quality_query, list): - self.quality_query = QualityQuery(quality_criteria=quality_query) - else: - self.quality_query = QualityQuery(quality_criteria=list(quality_query)) - else: - self.quality_query = QualityQuery(quality_criteria=[(" ", " ")]) - - self.valid_energy = ResultValidRange(min=valid_energy_min, max=valid_energy_max) - self.valid_offset = ResultValidRange(min=valid_offset_min, max=valid_offset_max) - self.gh_cuts = gh_cuts - self.clf_prefix = clf_prefix - self.spatial_selection_table = spatial_selection_table - - def __repr__(self): - if self.spatial_selection_table is not None: - return ( - f"" - ) - else: - return ( - f"" - ) - - def write(self, output_name: Path | str, overwrite: bool = False) -> None: - """Write an ``OptimizationResult`` to a file in FITS format.""" - - cut_expr_tab = Table( - rows=self.quality_query.quality_criteria, - names=["name", "cut_expr"], - dtype=[np.str_, np.str_], - ) - cut_expr_tab.meta["EXTNAME"] = "QUALITY_CUTS_EXPR" - - self.gh_cuts.meta["EXTNAME"] = "GH_CUTS" - self.gh_cuts.meta["CLFNAME"] = self.clf_prefix - - energy_lim_tab = QTable( - rows=[[self.valid_energy.min, self.valid_energy.max]], - names=["energy_min", "energy_max"], - ) - energy_lim_tab.meta["EXTNAME"] = "VALID_ENERGY" - - offset_lim_tab = QTable( - rows=[[self.valid_offset.min, self.valid_offset.max]], - names=["offset_min", "offset_max"], - ) - offset_lim_tab.meta["EXTNAME"] = "VALID_OFFSET" - - results = [cut_expr_tab, self.gh_cuts, energy_lim_tab, offset_lim_tab] - - if self.spatial_selection_table is not None: - self.spatial_selection_table.meta["EXTNAME"] = "RAD_MAX" - results.append(self.spatial_selection_table) - - # Overwrite if needed and allowed - results[0].write(output_name, format="fits", overwrite=overwrite) - - for table in results[1:]: - table.write(output_name, format="fits", append=True) - - @classmethod - def read(cls, file_name): - """Read an ``OptimizationResult`` from a file in FITS format.""" - - with fits.open(file_name) as hdul: - cut_expr_tab = Table.read(hdul[1]) - cut_expr_lst = [(name, expr) for name, expr in cut_expr_tab.iterrows()] - if (" ", " ") in cut_expr_lst: - cut_expr_lst.remove((" ", " ")) - - quality_query = QualityQuery(quality_criteria=cut_expr_lst) - gh_cuts = QTable.read(hdul[2]) - valid_energy = QTable.read(hdul[3]) - valid_offset = QTable.read(hdul[4]) - spatial_selection_table = QTable.read(hdul[5]) if len(hdul) > 5 else None - - return cls( - quality_query=quality_query, - valid_energy_min=valid_energy["energy_min"], - valid_energy_max=valid_energy["energy_max"], - valid_offset_min=valid_offset["offset_min"], - valid_offset_max=valid_offset["offset_max"], - gh_cuts=gh_cuts, - clf_prefix=gh_cuts.meta["CLFNAME"], - spatial_selection_table=spatial_selection_table, - ) - - class CutOptimizerBase(DefaultRecoEnergyBins): """Base class for cut optimization algorithms.""" @@ -168,7 +49,7 @@ def _check_events(self, events: dict[str, QTable]): def __call__( self, events: dict[str, QTable], - quality_query: EventQualityQuery, + quality_query: EventQualitySelection, clf_prefix: str, ) -> OptimizationResult: """ @@ -181,7 +62,7 @@ def __call__( Dictionary containing tables of events used for calculating cuts. This has to include "signal" events and can include "background" events. quality_query: ctapipe.irf.EventPreprocessor - ``ctapipe.core.QualityQuery`` subclass containing preselection + ``ctapipe.irf.cuts.EventQualitySelection`` subclass containing preselection criteria for events. clf_prefix: str Prefix of the output from the G/H classifier for which the @@ -305,7 +186,7 @@ def __init__(self, config=None, parent=None, **kwargs): def __call__( self, events: dict[str, QTable], - quality_query: EventQualityQuery, + quality_query: EventQualitySelection, clf_prefix: str, ) -> OptimizationResult: self._check_events(events) @@ -328,7 +209,7 @@ def __call__( ) result = OptimizationResult( - quality_query=quality_query, + quality_selection=quality_query, gh_cuts=gh_cuts, clf_prefix=clf_prefix, valid_energy_min=self.reco_energy_min, @@ -393,7 +274,7 @@ def __init__(self, config=None, parent=None, **kwargs): def __call__( self, events: dict[str, QTable], - quality_query: EventQualityQuery, + quality_query: EventQualitySelection, clf_prefix: str, ) -> OptimizationResult: self._check_events(events) @@ -454,7 +335,7 @@ def __call__( ) result = OptimizationResult( - quality_query=quality_query, + quality_selection=quality_query, gh_cuts=gh_cuts, clf_prefix=clf_prefix, valid_energy_min=valid_energy[0], diff --git a/src/ctapipe/irf/optimize/results.py b/src/ctapipe/irf/optimize/results.py new file mode 100644 index 00000000000..83b1968ac13 --- /dev/null +++ b/src/ctapipe/irf/optimize/results.py @@ -0,0 +1,133 @@ +from typing import Sequence + +import numpy as np +from astropy import units as u +from astropy.io import fits +from astropy.table import QTable, Table + +from ...core.traits import Path +from ...irf import ResultValidRange +from ..cuts.quality_cuts import EventQualitySelection + + +class OptimizationResult: + """Result of an optimization of G/H and theta cuts or only G/H cuts.""" + + def __init__( + self, + valid_energy_min: u.Quantity, + valid_energy_max: u.Quantity, + valid_offset_min: u.Quantity, + valid_offset_max: u.Quantity, + gh_cuts: QTable, + clf_prefix: str, + spatial_selection_table: QTable | None = None, + quality_selection: EventQualitySelection | Sequence | None = None, + ) -> None: + if quality_selection: + if isinstance(quality_selection, EventQualitySelection): + if len(quality_selection.quality_criteria) == 0: + quality_selection.quality_criteria = [ + (" ", " ") + ] # Ensures table serialises properly + + self.quality_selection = quality_selection + elif isinstance(quality_selection, list): + self.quality_selection = EventQualitySelection( + quality_criteria=quality_selection + ) + else: + self.quality_selection = EventQualitySelection( + quality_criteria=list(quality_selection) + ) + else: + self.quality_selection = EventQualitySelection( + quality_criteria=[(" ", " ")] + ) + + self.valid_energy = ResultValidRange(min=valid_energy_min, max=valid_energy_max) + self.valid_offset = ResultValidRange(min=valid_offset_min, max=valid_offset_max) + self.gh_cuts = gh_cuts + self.clf_prefix = clf_prefix + self.spatial_selection_table = spatial_selection_table + + def __repr__(self): + if self.spatial_selection_table is not None: + return ( + f"" + ) + else: + return ( + f"" + ) + + def write(self, output_name: Path | str, overwrite: bool = False) -> None: + """Write an ``OptimizationResult`` to a file in FITS format.""" + + cut_expr_tab = Table( + rows=self.quality_selection.quality_criteria, + names=["name", "cut_expr"], + dtype=[np.str_, np.str_], + ) + cut_expr_tab.meta["EXTNAME"] = "QUALITY_CUTS_EXPR" + + self.gh_cuts.meta["EXTNAME"] = "GH_CUTS" + self.gh_cuts.meta["CLFNAME"] = self.clf_prefix + + energy_lim_tab = QTable( + rows=[[self.valid_energy.min, self.valid_energy.max]], + names=["energy_min", "energy_max"], + ) + energy_lim_tab.meta["EXTNAME"] = "VALID_ENERGY" + + offset_lim_tab = QTable( + rows=[[self.valid_offset.min, self.valid_offset.max]], + names=["offset_min", "offset_max"], + ) + offset_lim_tab.meta["EXTNAME"] = "VALID_OFFSET" + + results = [cut_expr_tab, self.gh_cuts, energy_lim_tab, offset_lim_tab] + + if self.spatial_selection_table is not None: + self.spatial_selection_table.meta["EXTNAME"] = "RAD_MAX" + results.append(self.spatial_selection_table) + + # Overwrite if needed and allowed + results[0].write(output_name, format="fits", overwrite=overwrite) + + for table in results[1:]: + table.write(output_name, format="fits", append=True) + + @classmethod + def read(cls, file_name): + """Read an ``OptimizationResult`` from a file in FITS format.""" + + with fits.open(file_name) as hdul: + cut_expr_tab = Table.read(hdul[1]) + cut_expr_lst = [(name, expr) for name, expr in cut_expr_tab.iterrows()] + if (" ", " ") in cut_expr_lst: + cut_expr_lst.remove((" ", " ")) + + quality_query = EventQualitySelection(quality_criteria=cut_expr_lst) + gh_cuts = QTable.read(hdul[2]) + valid_energy = QTable.read(hdul[3]) + valid_offset = QTable.read(hdul[4]) + spatial_selection_table = QTable.read(hdul[5]) if len(hdul) > 5 else None + + return cls( + quality_selection=quality_query, + valid_energy_min=valid_energy["energy_min"], + valid_energy_max=valid_energy["energy_max"], + valid_offset_min=valid_offset["offset_min"], + valid_offset_max=valid_offset["offset_max"], + gh_cuts=gh_cuts, + clf_prefix=gh_cuts.meta["CLFNAME"], + spatial_selection_table=spatial_selection_table, + ) diff --git a/src/ctapipe/irf/tests/test_optimize.py b/src/ctapipe/irf/optimize/test/test_optimize_algorithm.py similarity index 57% rename from src/ctapipe/irf/tests/test_optimize.py rename to src/ctapipe/irf/optimize/test/test_optimize_algorithm.py index 366c7ca334a..e1126c17571 100644 --- a/src/ctapipe/irf/tests/test_optimize.py +++ b/src/ctapipe/irf/optimize/test/test_optimize_algorithm.py @@ -1,45 +1,9 @@ import astropy.units as u import numpy as np import pytest -from astropy.table import QTable -from ctapipe.core import QualityQuery, non_abstract_children -from ctapipe.irf.optimize import CutOptimizerBase - - -def test_optimization_result(tmp_path, irf_event_loader_test_config): - from ctapipe.irf import ( - EventPreprocessor, - OptimizationResult, - ResultValidRange, - ) - - result_path = tmp_path / "result.h5" - epp = EventPreprocessor(irf_event_loader_test_config) - gh_cuts = QTable( - data=[[0.2, 0.8, 1.5] * u.TeV, [0.8, 1.5, 10] * u.TeV, [0.82, 0.91, 0.88]], - names=["low", "high", "cut"], - ) - result = OptimizationResult( - quality_query=epp.quality_query, - gh_cuts=gh_cuts, - clf_prefix="ExtraTreesClassifier", - valid_energy_min=0.2 * u.TeV, - valid_energy_max=10 * u.TeV, - valid_offset_min=0 * u.deg, - valid_offset_max=np.inf * u.deg, - spatial_selection_table=None, - ) - result.write(result_path) - assert result_path.exists() - - loaded = OptimizationResult.read(result_path) - assert isinstance(loaded, OptimizationResult) - assert isinstance(loaded.quality_query, QualityQuery) - assert isinstance(loaded.valid_energy, ResultValidRange) - assert isinstance(loaded.valid_offset, ResultValidRange) - assert isinstance(loaded.gh_cuts, QTable) - assert loaded.clf_prefix == "ExtraTreesClassifier" +from ctapipe.core import non_abstract_children +from ctapipe.irf.optimize.algorithm import CutOptimizerBase def test_gh_percentile_cut_calculator(): @@ -90,28 +54,22 @@ def test_cut_optimizer( from ctapipe.irf import EventLoader, OptimizationResult, Spectra gamma_loader = EventLoader( - config=irf_event_loader_test_config, file=gamma_diffuse_full_reco_file, target_spectrum=Spectra.CRAB_HEGRA, + config=irf_event_loader_test_config, ) - gamma_events, _, _ = gamma_loader.load_preselected_events( - chunk_size=10000, - obs_time=u.Quantity(50, u.h), - ) + gamma_events = gamma_loader.load_preselected_events(chunk_size=10000) proton_loader = EventLoader( - config=irf_event_loader_test_config, file=proton_full_reco_file, target_spectrum=Spectra.IRFDOC_PROTON_SPECTRUM, + config=irf_event_loader_test_config, ) - proton_events, _, _ = proton_loader.load_preselected_events( - chunk_size=10000, - obs_time=u.Quantity(50, u.h), - ) + proton_events = proton_loader.load_preselected_events(chunk_size=10000) optimizer = Optimizer() result = optimizer( events={"signal": gamma_events, "background": proton_events}, - quality_query=gamma_loader.epp.quality_query, # identical qualityquery for all particle types + quality_query=gamma_loader.epp.event_selection, # identical qualityquery for all particle types clf_prefix="ExtraTreesClassifier", ) assert isinstance(result, OptimizationResult) diff --git a/src/ctapipe/irf/optimize/test/test_optimize_results.py b/src/ctapipe/irf/optimize/test/test_optimize_results.py new file mode 100644 index 00000000000..b9ec78bb9bf --- /dev/null +++ b/src/ctapipe/irf/optimize/test/test_optimize_results.py @@ -0,0 +1,40 @@ +import numpy as np +from astropy import units as u +from astropy.table import QTable + +from ctapipe.irf.cuts import EventQualitySelection + + +def test_optimization_result(tmp_path, irf_event_loader_test_config): + from ctapipe.irf import ( + EventPreprocessor, + OptimizationResult, + ResultValidRange, + ) + + result_path = tmp_path / "result.h5" + epp = EventPreprocessor(config=irf_event_loader_test_config) + gh_cuts = QTable( + data=[[0.2, 0.8, 1.5] * u.TeV, [0.8, 1.5, 10] * u.TeV, [0.82, 0.91, 0.88]], + names=["low", "high", "cut"], + ) + result = OptimizationResult( + quality_selection=epp.event_selection, + gh_cuts=gh_cuts, + clf_prefix="ExtraTreesClassifier", + valid_energy_min=0.2 * u.TeV, + valid_energy_max=10 * u.TeV, + valid_offset_min=0 * u.deg, + valid_offset_max=np.inf * u.deg, + spatial_selection_table=None, + ) + result.write(result_path) + assert result_path.exists() + + loaded = OptimizationResult.read(result_path) + assert isinstance(loaded, OptimizationResult) + assert isinstance(loaded.quality_selection, EventQualitySelection) + assert isinstance(loaded.valid_energy, ResultValidRange) + assert isinstance(loaded.valid_offset, ResultValidRange) + assert isinstance(loaded.gh_cuts, QTable) + assert loaded.clf_prefix == "ExtraTreesClassifier" diff --git a/src/ctapipe/irf/preprocessing.py b/src/ctapipe/irf/preprocessing.py index 21653693793..e722cd5bddf 100644 --- a/src/ctapipe/irf/preprocessing.py +++ b/src/ctapipe/irf/preprocessing.py @@ -1,11 +1,13 @@ """Module containing classes related to event loading and preprocessing""" from pathlib import Path +from typing import Any, Dict import astropy.units as u import numpy as np -from astropy.coordinates import AltAz, SkyCoord +from astropy.coordinates import ICRS, AltAz, EarthLocation, Galactic, SkyCoord from astropy.table import Column, QTable, Table, vstack +from astropy.time import Time, TimeDelta from pyirf.simulations import SimulatedEventsInfo from pyirf.spectral import ( DIFFUSE_FLUX_UNIT, @@ -13,44 +15,33 @@ PowerLaw, calculate_event_weights, ) -from pyirf.utils import calculate_source_fov_offset, calculate_theta +from pyirf.utils import ( + calculate_source_fov_offset, + calculate_source_fov_position_angle, + calculate_theta, +) from tables import NoSuchNodeError +from ..atmosphere import AtmosphereDensityProfile from ..compat import COPY_IF_NEEDED -from ..containers import CoordinateFrameType +from ..containers import CoordinateFrameType, PointingMode from ..coordinates import NominalFrame -from ..core import Component, QualityQuery -from ..core.traits import List, Tuple, Unicode -from ..io import TableLoader +from ..core import Component +from ..core.traits import Bool, Unicode +from ..io import EventSource, TableLoader +from ..version import version as ctapipe_version +from .cuts import EventQualitySelection, EventSelection from .spectra import SPECTRA, Spectra -__all__ = ["EventLoader", "EventPreprocessor", "EventQualityQuery"] +__all__ = ["EventLoader", "EventPreprocessor"] - -class EventQualityQuery(QualityQuery): - """ - Event pre-selection quality criteria for IRF computation with different defaults. - """ - - quality_criteria = List( - Tuple(Unicode(), Unicode()), - default_value=[ - ( - "multiplicity 4", - "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4", - ), - ("valid classifier", "RandomForestClassifier_is_valid"), - ("valid geom reco", "HillasReconstructor_is_valid"), - ("valid energy reco", "RandomForestRegressor_is_valid"), - ], - help=QualityQuery.quality_criteria.help, - ).tag(config=True) +from ..io.astropy_helpers import join_allow_empty class EventPreprocessor(Component): """Defines pre-selection cuts and the necessary renaming of columns.""" - classes = [EventQualityQuery] + classes = [EventQualitySelection, EventSelection] energy_reconstructor = Unicode( default_value="RandomForestRegressor", @@ -67,12 +58,167 @@ class EventPreprocessor(Component): help="Prefix of the classifier `_prediction` column", ).tag(config=True) - def __init__(self, config=None, parent=None, **kwargs): + irf_pre_processing = Bool( + default_value=True, + help="If true the pre processing assume the purpose is for IRF production, if false, for DL3 production", + ).tag(config=True) + + optional_dl3_columns = Bool( + default_value=False, help="If true add optional columns to produce file" + ).tag(config=True) + + raise_error_for_optional = Bool( + default_value=True, + help="If true will raise error in the case optional column are missing", + ).tag(config=True) + + def __init__( + self, quality_selection_only: bool = True, config=None, parent=None, **kwargs + ): super().__init__(config=config, parent=parent, **kwargs) - self.quality_query = EventQualityQuery(parent=self) + if quality_selection_only: + self.event_selection = EventQualitySelection(parent=self) + else: + self.event_selection = EventSelection(parent=self) + + def get_columns_keep_rename_scheme( + self, events: Table = None, already_derived: bool = False + ): + """ + Function to get the columns to keep, and the scheme for renaming columns + """ + keep_columns = [ + "obs_id", + "event_id", + ] + if self.irf_pre_processing: + keep_columns += [ + "true_energy", + "true_az", + "true_alt", + ] + else: + keep_columns += [ + "time", + ] + + if already_derived: + rename_from = [] + rename_to = [] + keep_columns += ["reco_energy", "reco_az", "reco_alt", "gh_score"] + + if self.irf_pre_processing: + keep_columns += [ + "pointing_az", + "pointing_alt", + "reco_fov_lat", + "reco_fov_lon", + "theta", + "true_source_fov_offset", + "reco_source_fov_offset", + "weight", + ] + else: + keep_columns += ["reco_ra", "reco_dec"] + if self.optional_dl3_columns: + keep_columns_optional = [ + "multiplicity", + "reco_glon", + "reco_glat", + "reco_fov_lat", + "reco_fov_lon", + "reco_source_fov_offset", + "reco_source_fov_position_angle", + "reco_energy_uncert", + "reco_dir_uncert", + "reco_core_x", + "reco_core_y", + "reco_core_uncert", + "reco_h_max", + "reco_h_max_uncert", + "reco_x_max", + "reco_x_max_uncert", + ] + + if self.raise_error_for_optional: + if events is None: + raise ValueError( + "Require events table to assess existence of optional columns" + ) + for c in keep_columns_optional: + if c not in events.colnames: + self.log.warning( + f"Optional column {c} is missing from the events table." + ) + else: + keep_columns.append(c) + else: + keep_columns += keep_columns_optional + + else: + if self.optional_dl3_columns: + keep_columns += [ + "tels_with_trigger", + ] + + rename_from = [ + f"{self.energy_reconstructor}_energy", + f"{self.geometry_reconstructor}_az", + f"{self.geometry_reconstructor}_alt", + f"{self.gammaness_classifier}_prediction", + "subarray_pointing_lat", + "subarray_pointing_lon", + ] + rename_to = [ + "reco_energy", + "reco_az", + "reco_alt", + "gh_score", + "pointing_alt", + "pointing_az", + ] + if self.optional_dl3_columns: + rename_from_optional = [ + f"{self.energy_reconstructor}_energy_uncert", + f"{self.geometry_reconstructor}_ang_distance_uncert", + f"{self.geometry_reconstructor}_core_x", + f"{self.geometry_reconstructor}_core_y", + f"{self.geometry_reconstructor}_core_uncert_x", + f"{self.geometry_reconstructor}_core_uncert_y", + f"{self.geometry_reconstructor}_h_max", + f"{self.geometry_reconstructor}_h_max_uncert", + ] + rename_to_optional = [ + "reco_energy_uncert", + "reco_dir_uncert", + "reco_core_x", + "reco_core_y", + "reco_core_uncert_x", + "reco_core_uncert_y", + "reco_h_max", + "reco_h_max_uncert", + ] + if not self.raise_error_for_optional: + if events is None: + raise ValueError( + "Require events table to assess existence of optional columns" + ) + for i, c in enumerate(rename_from_optional): + if c not in events.colnames: + self.log.warning( + f"Optional column {c} is missing from the DL2 file." + ) + else: + rename_from.append(rename_from_optional[i]) + rename_to.append(rename_to_optional[i]) + else: + rename_from += rename_from_optional + rename_to += rename_to_optional + + return keep_columns, rename_from, rename_to def normalise_column_names(self, events: Table) -> QTable: - if events["subarray_pointing_lat"].std() > 1e-3: + if self.irf_pre_processing and events["subarray_pointing_lat"].std() > 1e-3: raise NotImplementedError( "No support for making irfs from varying pointings yet" ) @@ -81,29 +227,9 @@ def normalise_column_names(self, events: Table) -> QTable: "At the moment only pointing in altaz is supported." ) - keep_columns = [ - "obs_id", - "event_id", - "true_energy", - "true_az", - "true_alt", - ] - rename_from = [ - f"{self.energy_reconstructor}_energy", - f"{self.geometry_reconstructor}_az", - f"{self.geometry_reconstructor}_alt", - f"{self.gammaness_classifier}_prediction", - "subarray_pointing_lat", - "subarray_pointing_lon", - ] - rename_to = [ - "reco_energy", - "reco_az", - "reco_alt", - "gh_score", - "pointing_alt", - "pointing_az", - ] + keep_columns, rename_from, rename_to = self.get_columns_keep_rename_scheme( + events + ) keep_columns.extend(rename_from) for c in keep_columns: if c not in events.colnames: @@ -112,11 +238,122 @@ def normalise_column_names(self, events: Table) -> QTable: f"Required column {c} is missing." ) - events = QTable(events[keep_columns], copy=COPY_IF_NEEDED) - events.rename_columns(rename_from, rename_to) + renamed_events = QTable(events, copy=COPY_IF_NEEDED) + renamed_events.rename_columns(rename_from, rename_to) + return renamed_events + + def keep_necessary_columns_only(self, events: Table) -> QTable: + keep_columns, _, _ = self.get_columns_keep_rename_scheme(events, True) + for c in keep_columns: + if c not in events.colnames: + raise ValueError( + f"Required column {c} is missing from the events table." + ) + + return QTable(events[keep_columns], copy=COPY_IF_NEEDED) + + def make_derived_columns( + self, + events: QTable, + location_subarray: EarthLocation, + atmosphere_density_profile: AtmosphereDensityProfile = None, + ) -> QTable: + """ + This function compute all the derived columns necessary either to irf production or DL3 file + """ + + events["gh_score"].unit = u.dimensionless_unscaled + + if self.irf_pre_processing: + frame_subarray = AltAz() + else: + frame_subarray = AltAz( + location=location_subarray, obstime=Time(events["time"]) + ) + reco = SkyCoord( + alt=events["reco_alt"], + az=events["reco_az"], + frame=frame_subarray, + ) + + if self.irf_pre_processing or self.optional_dl3_columns: + pointing = SkyCoord( + alt=events["pointing_alt"], + az=events["pointing_az"], + frame=frame_subarray, + ) + nominal = NominalFrame(origin=pointing) + reco_nominal = reco.transform_to(nominal) + events["reco_fov_lon"] = u.Quantity(-reco_nominal.fov_lon) # minus for GADF + events["reco_fov_lat"] = u.Quantity(reco_nominal.fov_lat) + events["reco_source_fov_offset"] = calculate_source_fov_offset( + events, prefix="reco" + ) + if not self.irf_pre_processing: + events[ + "reco_source_fov_position_angle" + ] = calculate_source_fov_position_angle(events, prefix="reco") + + if self.irf_pre_processing: + events["weight"] = ( + 1.0 * u.dimensionless_unscaled + ) # defer calculation of proper weights to later + events["theta"] = calculate_theta( + events, + assumed_source_az=events["true_az"], + assumed_source_alt=events["true_alt"], + ) + events["true_source_fov_offset"] = calculate_source_fov_offset( + events, prefix="true" + ) + else: + reco_icrs = reco.transform_to(ICRS()) + events["reco_ra"] = u.Quantity(reco_icrs.ra) + events["reco_dec"] = u.Quantity(reco_icrs.dec) + if self.optional_dl3_columns: + reco_gal = reco_icrs.transform_to(Galactic()) + events["reco_glon"] = u.Quantity(reco_gal.l) + events["reco_glat"] = u.Quantity(reco_gal.b) + + events["multiplicity"] = np.sum(events["tels_with_trigger"], axis=1) + + events["reco_core_uncert"] = np.sqrt( + events["reco_core_uncert_x"] ** 2 + + events["reco_core_uncert_y"] ** 2 + ) + + try: + events[ + "reco_x_max" + ] = atmosphere_density_profile.slant_depth_from_height( + events["reco_h_max"], np.pi * u.rad / 2 - events["reco_alt"] + ).to(u.g / u.cm / u.cm) + uncert_up = ( + atmosphere_density_profile.slant_depth_from_height( + events["reco_h_max"] + events["reco_h_max_uncert"], + np.pi * u.rad / 2 - events["reco_alt"], + ) + - events["reco_x_max"] + ) + uncert_down = events[ + "reco_x_max" + ] - atmosphere_density_profile.slant_depth_from_height( + events["reco_h_max"] + events["reco_h_max_uncert"], + np.pi * u.rad / 2 - events["reco_alt"], + ) + events["reco_x_max_uncert"] = ((uncert_up + uncert_down) / 2.0).to( + u.g / u.cm / u.cm + ) + except Exception as e: + if self.raise_error_for_optional: + raise e + else: + events["reco_x_max"] = np.nan * u.g / u.cm / u.cm + events["reco_x_max_uncert"] = np.nan * u.g / u.cm / u.cm + return events - def make_empty_table(self) -> QTable: + def make_empty_table(self, columns_to_use: list[str]) -> QTable: """ This function defines the columns later functions expect to be present in the event table. @@ -124,6 +361,11 @@ def make_empty_table(self) -> QTable: columns = [ Column(name="obs_id", dtype=np.uint64, description="Observation block ID"), Column(name="event_id", dtype=np.uint64, description="Array event ID"), + Time( + val=[], + scale="tai", + format="mjd", + ), Column( name="true_energy", unit=u.TeV, @@ -144,6 +386,11 @@ def make_empty_table(self) -> QTable: unit=u.TeV, description="Reconstructed energy", ), + Column( + name="reco_energy_uncert", + unit=u.TeV, + description="Uncertainty on the reconstructed energy", + ), Column( name="reco_az", unit=u.deg, @@ -154,6 +401,31 @@ def make_empty_table(self) -> QTable: unit=u.deg, description="Reconstructed altitude", ), + Column( + name="reco_dir_uncert", + unit=u.deg, + description="Uncertainty on the reconstructed direction", + ), + Column( + name="reco_ra", + unit=u.deg, + description="Reconstructed direction, Right Ascension (ICRS)", + ), + Column( + name="reco_dec", + unit=u.deg, + description="Reconstructed direction, Declination (ICRS)", + ), + Column( + name="reco_glon", + unit=u.deg, + description="Reconstructed direction, galactic longitude", + ), + Column( + name="reco_glat", + unit=u.deg, + description="Reconstructed direction, galactic latitude", + ), Column( name="reco_fov_lat", unit=u.deg, @@ -176,11 +448,21 @@ def make_empty_table(self) -> QTable: unit=u.deg, description="Simulated angular offset from pointing direction", ), + Column( + name="true_source_fov_position_angle", + unit=u.deg, + description="Simulated angular position angle from pointing direction", + ), Column( name="reco_source_fov_offset", unit=u.deg, description="Reconstructed angular offset from pointing direction", ), + Column( + name="reco_source_fov_position_angle", + unit=u.deg, + description="Reconstructed angular position angle from pointing direction", + ), Column( name="gh_score", unit=u.dimensionless_unscaled, @@ -193,9 +475,71 @@ def make_empty_table(self) -> QTable: unit=u.dimensionless_unscaled, description="Event weight", ), + Column( + name="multiplicity", + description="Number of telescopes used for the reconstruction", + ), + Column( + name="reco_core_x", + unit=u.m, + description="Reconstructed position of the core of the shower, x coordinate", + ), + Column( + name="reco_core_y", + unit=u.m, + description="Reconstructed position of the core of the shower, y coordinate", + ), + Column( + name="reco_core_uncert", + unit=u.m, + description="Uncertainty on the reconstructed position of the core of the shower", + ), + Column( + name="reco_h_max", + unit=u.m, + description="Reconstructed altitude of the maximum of the shower", + ), + Column( + name="reco_h_max_uncert", + unit=u.m, + description="Uncertainty on the reconstructed altitude of the maximum of the shower", + ), + Column( + name="reco_x_max", + unit=u.g / u.cm / u.cm, + description="Reconstructed maximum of the shower", + ), + Column( + name="reco_x_max_uncert", + unit=u.g / u.cm / u.cm, + description="Uncertainty on the maximum of the shower", + ), ] - return QTable(columns) + # Rearrange in a dict, easier for searching after + columns_dict = {} + for i in range(len(columns)): + if type(columns[i]) is Time: + columns_dict["time"] = columns[i] + else: + columns_dict[columns[i].name] = columns[i] + + # Select only the necessary columns + columns_for_keep = [] + index_time_column = -1 + for i, c in enumerate(columns_to_use): + if c in columns_dict.keys(): + columns_for_keep.append(columns_dict[c]) + if c == "time": + index_time_column = i + else: + raise ValueError(f"Missing columns definition for {c}") + + empty_table = QTable(columns_for_keep) + if index_time_column >= 0: + empty_table.rename_column("col" + str(index_time_column), "time") + + return empty_table class EventLoader(Component): @@ -206,93 +550,238 @@ class EventLoader(Component): classes = [EventPreprocessor] - def __init__(self, file: Path, target_spectrum: Spectra, **kwargs): + def __init__( + self, + file: Path, + target_spectrum: Spectra = None, + quality_selection_only: bool = True, + **kwargs, + ): super().__init__(**kwargs) - self.epp = EventPreprocessor(parent=self) - self.target_spectrum = SPECTRA[target_spectrum] + self.epp = EventPreprocessor( + quality_selection_only=quality_selection_only, parent=self + ) + if target_spectrum is not None: + self.target_spectrum = SPECTRA[target_spectrum] + else: + self.target_spectrum = None self.file = file - - def load_preselected_events( - self, chunk_size: int, obs_time: u.Quantity - ) -> tuple[QTable, int, dict]: - opts = dict(dl2=True, simulated=True, observation_info=True) - with TableLoader(self.file, parent=self, **opts) as load: - header = self.epp.make_empty_table() - sim_info, spectrum = self.get_simulation_information(load, obs_time) - meta = {"sim_info": sim_info, "spectrum": spectrum} + self.opts_loader = dict(dl2=True, simulated=True, observation_info=True) + + def load_preselected_events(self, chunk_size: int) -> tuple[QTable, int, dict]: + # Load atmosphere density profile + atmosphere_density_profile = None + if self.epp.optional_dl3_columns: + try: + with EventSource(self.file) as source: + atmosphere_density_profile = source.atmosphere_density_profile + except Exception: + self.log.warning("Unable to read atmosphere density profile") + + # Load event + with TableLoader(self.file, parent=self, **self.opts_loader) as load: + keep_columns, _, _ = self.epp.get_columns_keep_rename_scheme(None, True) + header = self.epp.make_empty_table(keep_columns) bits = [header] - n_raw_events = 0 - for _, _, events in load.read_subarray_events_chunked(chunk_size, **opts): - selected = events[self.epp.quality_query.get_table_mask(events)] - selected = self.epp.normalise_column_names(selected) - selected = self.make_derived_columns(selected) + for _, _, events in load.read_subarray_events_chunked( + chunk_size, **self.opts_loader + ): + events = self.epp.normalise_column_names(events) + + events = self.epp.make_derived_columns( + events, load.subarray.reference_location, atmosphere_density_profile + ) + events = self.epp.event_selection.calculate_selection(events) + selected = events[events["selected"]] + selected = self.epp.keep_necessary_columns_only(selected) bits.append(selected) - n_raw_events += len(events) bits.append(header) # Putting it last ensures the correct metadata is used table = vstack(bits, join_type="exact", metadata_conflicts="silent") - return table, n_raw_events, meta + return table - def get_simulation_information( - self, loader: TableLoader, obs_time: u.Quantity - ) -> tuple[SimulatedEventsInfo, PowerLaw]: - sim = loader.read_simulation_configuration() - try: - show = loader.read_shower_distribution() - except NoSuchNodeError: - # Fall back to using the run header - show = Table([sim["n_showers"]], names=["n_entries"], dtype=[np.int64]) - - for itm in ["spectral_index", "energy_range_min", "energy_range_max"]: - if len(np.unique(sim[itm])) > 1: - raise NotImplementedError( - f"Unsupported: '{itm}' differs across simulation runs" + def get_observation_information(self) -> Dict[str, Any]: + """ + Retrieve all useful information on the observation for DL3 production + """ + with TableLoader(self.file, parent=self, **self.opts_loader) as load: + # Extract telescope location + meta = {} + meta["location"] = load.subarray.reference_location + + # Read observation information + obs_info_table = load.read_observation_information() + schedule_info_table = load.read_scheduling_blocks() + obs_all_info_table = join_allow_empty( + obs_info_table, schedule_info_table, "sb_id", "inner" + ) + if len(obs_all_info_table) == 0: + self.log.error("No observation information available in the DL2 file") + raise ValueError("Missing observation information in the DL2 file") + elif len(obs_all_info_table) != len(obs_info_table): + self.log.error( + "Scheduling blocks are missing for some observation block" + ) + raise ValueError( + "Scheduling blocks are missing for some observation block" ) - sim_info = SimulatedEventsInfo( - n_showers=show["n_entries"].sum(), - energy_min=sim["energy_range_min"].quantity[0], - energy_max=sim["energy_range_max"].quantity[0], - max_impact=sim["max_scatter_range"].quantity[0], - spectral_index=sim["spectral_index"][0], - viewcone_max=sim["max_viewcone_radius"].quantity[0], - viewcone_min=sim["min_viewcone_radius"].quantity[0], - ) + # Check for obs ids + if len(np.unique(obs_all_info_table["obs_id"])): + self.log.warning( + "Multiple observations included in the DL2 file, only the id of the first observation will be reported in the DL3 file, data from all observations will be included" + ) + meta["obs_id"] = np.unique(obs_all_info_table["obs_id"])[0] + + # Extract GTI + list_gti = [] + mask_nan = np.isnan(obs_all_info_table["actual_duration"]) + if np.sum(mask_nan) > 0: + self.log.warning("Duration of the run is nan, replaced with zero") + obs_all_info_table["actual_duration"][mask_nan] = 0.0 * u.s + for i in range(len(obs_all_info_table)): + start_time = Time(obs_all_info_table["actual_start_time"][i]).tai + stop_time = start_time + TimeDelta( + obs_all_info_table["actual_duration"][i] + * obs_all_info_table["actual_duration"].unit + ) + list_gti.append((start_time, stop_time)) + meta["gti"] = list_gti - return sim_info, PowerLaw.from_simulation(sim_info, obstime=obs_time) + # Dead time fraction + self.log.warning( + "Dead time fraction is not read from the file, a default value of 1 is used" + ) + meta["dead_time_fraction"] = 1.0 + + # Extract pointing information + if len(np.unique(obs_all_info_table["pointing_mode"])) != 1: + self.log.error("Multiple pointing mode are not supported") + raise ValueError("Multiple pointing mode are not supported") + meta["pointing"] = {} + meta["pointing"]["pointing_mode"] = PointingMode( + obs_all_info_table["pointing_mode"][0] + ).name + list_pointing = [] + for i in range(len(obs_all_info_table)): + if ( + CoordinateFrameType( + obs_all_info_table["subarray_pointing_frame"][i] + ).name + == "ALTAZ" + ): + pointing_start = AltAz( + alt=obs_all_info_table["subarray_pointing_lat"][i] + * obs_all_info_table["subarray_pointing_lat"].unit, + az=obs_all_info_table["subarray_pointing_lon"][i] + * obs_all_info_table["subarray_pointing_lon"].unit, + location=meta["location"], + obstime=meta["gti"][i][0], + ) + pointing_stop = AltAz( + alt=obs_all_info_table["subarray_pointing_lat"][i] + * obs_all_info_table["subarray_pointing_lat"].unit, + az=obs_all_info_table["subarray_pointing_lon"][i] + * obs_all_info_table["subarray_pointing_lon"].unit, + location=meta["location"], + obstime=meta["gti"][i][1], + ) + elif ( + CoordinateFrameType( + obs_all_info_table["subarray_pointing_frame"][i] + ).name + == "ICRS" + ): + pointing_start = ICRS( + dec=obs_all_info_table["subarray_pointing_lat"][i] + * obs_all_info_table["subarray_pointing_lat"].unit, + ra=obs_all_info_table["subarray_pointing_lon"][i] + * obs_all_info_table["subarray_pointing_lon"].unit, + ) + pointing_stop = pointing_start + elif ( + CoordinateFrameType( + obs_all_info_table["subarray_pointing_frame"][i] + ).name + == "GALACTIC" + ): + pointing_start = Galactic( + b=obs_all_info_table["subarray_pointing_lat"][i] + * obs_all_info_table["subarray_pointing_lat"].unit, + l=obs_all_info_table["subarray_pointing_lon"][i] + * obs_all_info_table["subarray_pointing_lon"].unit, + ) + pointing_stop = pointing_start + list_pointing.append((meta["gti"][i][0], pointing_start)) + list_pointing.append((meta["gti"][i][1], pointing_stop)) + meta["pointing"]["pointing_list"] = list_pointing + + # Telescope information + self.log.warning( + "Name of organisation, array and subarray are not read from the file and are default value" + ) + meta["telescope_information"] = { + "organisation": "UNKNOWN", + "array": "UNKNOWN", + "subarray": "UNKNOWN", + "telescope_list": np.array( + load.subarray.get_tel_ids(load.subarray.tel) + ), + } + + # Target information + self.log.warning( + "Name of the target, coordinates and observer are not read from the file and are default value" + ) + meta["target"] = { + "observer": "UNKNOWN", + "object_name": "UNKNOWN", + "object_coordinate": ICRS(np.nan * u.deg, np.nan * u.deg), + } + + # Software information + self.log.warning("Software version are unknown") + meta["software_version"] = { + "analysis_version": "ctapipe " + ctapipe_version, + "calibration_version": "UNKNOWN", + "dst_version": "UNKNOWN", + } + + return meta - def make_derived_columns(self, events: QTable) -> QTable: - events["weight"] = ( - 1.0 * u.dimensionless_unscaled - ) # defer calculation of proper weights to later - events["gh_score"].unit = u.dimensionless_unscaled - events["theta"] = calculate_theta( - events, - assumed_source_az=events["true_az"], - assumed_source_alt=events["true_alt"], - ) - events["true_source_fov_offset"] = calculate_source_fov_offset( - events, prefix="true" - ) - events["reco_source_fov_offset"] = calculate_source_fov_offset( - events, prefix="reco" - ) + def get_simulation_information( + self, obs_time: u.Quantity + ) -> tuple[SimulatedEventsInfo, PowerLaw]: + with TableLoader(self.file, parent=self, **self.opts_loader) as load: + sim = load.read_simulation_configuration() + try: + show = load.read_shower_distribution() + except NoSuchNodeError: + # Fall back to using the run header + show = Table([sim["n_showers"]], names=["n_entries"], dtype=[np.int64]) + + for itm in ["spectral_index", "energy_range_min", "energy_range_max"]: + if len(np.unique(sim[itm])) > 1: + raise NotImplementedError( + f"Unsupported: '{itm}' differs across simulation runs" + ) + + sim_info = SimulatedEventsInfo( + n_showers=show["n_entries"].sum(), + energy_min=sim["energy_range_min"].quantity[0], + energy_max=sim["energy_range_max"].quantity[0], + max_impact=sim["max_scatter_range"].quantity[0], + spectral_index=sim["spectral_index"][0], + viewcone_max=sim["max_viewcone_radius"].quantity[0], + viewcone_min=sim["min_viewcone_radius"].quantity[0], + ) - altaz = AltAz() - pointing = SkyCoord( - alt=events["pointing_alt"], az=events["pointing_az"], frame=altaz - ) - reco = SkyCoord( - alt=events["reco_alt"], - az=events["reco_az"], - frame=altaz, - ) - nominal = NominalFrame(origin=pointing) - reco_nominal = reco.transform_to(nominal) - events["reco_fov_lon"] = u.Quantity(-reco_nominal.fov_lon) # minus for GADF - events["reco_fov_lat"] = u.Quantity(reco_nominal.fov_lat) - return events + meta = { + "sim_info": sim_info, + "spectrum": PowerLaw.from_simulation(sim_info, obstime=obs_time), + } + return meta def make_event_weights( self, @@ -301,6 +790,11 @@ def make_event_weights( kind: str, fov_offset_bins: u.Quantity | None = None, ) -> QTable: + if self.target_spectrum is None: + raise Exception( + "No spectrum is defined, need a spectrum for events weighting" + ) + if ( kind == "gammas" and self.target_spectrum.normalization.unit.is_equivalent( diff --git a/src/ctapipe/irf/tests/test_benchmarks.py b/src/ctapipe/irf/tests/test_benchmarks.py index 8264003c9ee..4dbd2f6a37f 100644 --- a/src/ctapipe/irf/tests/test_benchmarks.py +++ b/src/ctapipe/irf/tests/test_benchmarks.py @@ -87,23 +87,17 @@ def test_make_2d_sensitivity( from ctapipe.irf.tests.test_irfs import _check_boundaries_in_hdu gamma_loader = EventLoader( - config=irf_event_loader_test_config, file=gamma_diffuse_full_reco_file, target_spectrum=Spectra.CRAB_HEGRA, + config=irf_event_loader_test_config, ) - gamma_events, _, _ = gamma_loader.load_preselected_events( - chunk_size=10000, - obs_time=u.Quantity(50, u.h), - ) + gamma_events = gamma_loader.load_preselected_events(chunk_size=10000) proton_loader = EventLoader( - config=irf_event_loader_test_config, file=proton_full_reco_file, target_spectrum=Spectra.IRFDOC_PROTON_SPECTRUM, + config=irf_event_loader_test_config, ) - proton_events, _, _ = proton_loader.load_preselected_events( - chunk_size=10000, - obs_time=u.Quantity(50, u.h), - ) + proton_events = proton_loader.load_preselected_events(chunk_size=10000) sens_maker = Sensitivity2dMaker( fov_offset_n_bins=3, diff --git a/src/ctapipe/irf/tests/test_preprocessing.py b/src/ctapipe/irf/tests/test_preprocessing.py index 71cedc22ade..8daa90ebe23 100644 --- a/src/ctapipe/irf/tests/test_preprocessing.py +++ b/src/ctapipe/irf/tests/test_preprocessing.py @@ -67,14 +67,13 @@ def test_event_loader(gamma_diffuse_full_reco_file, irf_event_loader_test_config from ctapipe.irf import EventLoader, Spectra loader = EventLoader( - config=irf_event_loader_test_config, file=gamma_diffuse_full_reco_file, target_spectrum=Spectra.CRAB_HEGRA, + config=irf_event_loader_test_config, ) - events, count, meta = loader.load_preselected_events( - chunk_size=10000, - obs_time=u.Quantity(50, u.h), - ) + events = loader.load_preselected_events(chunk_size=10000) + count = len(events) + meta = loader.get_simulation_information(obs_time=u.Quantity(50, u.h)) columns = [ "obs_id", diff --git a/src/ctapipe/resources/compute_irf.yaml b/src/ctapipe/resources/compute_irf.yaml index 806f73d160b..2fed8528371 100644 --- a/src/ctapipe/resources/compute_irf.yaml +++ b/src/ctapipe/resources/compute_irf.yaml @@ -23,7 +23,7 @@ EventPreprocessor: geometry_reconstructor: "HillasReconstructor" gammaness_classifier: "RandomForestClassifier" - EventQualityQuery: + EventQualitySelection: quality_criteria: - ["multiplicity 4", "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4"] - ["valid classifier", "RandomForestClassifier_is_valid"] diff --git a/src/ctapipe/resources/optimize_cuts.yaml b/src/ctapipe/resources/optimize_cuts.yaml index 2639b1f8373..056b17ad238 100644 --- a/src/ctapipe/resources/optimize_cuts.yaml +++ b/src/ctapipe/resources/optimize_cuts.yaml @@ -17,7 +17,7 @@ EventPreprocessor: geometry_reconstructor: "HillasReconstructor" gammaness_classifier: "RandomForestClassifier" - EventQualityQuery: + EventQualitySelection: quality_criteria: - ["multiplicity 4", "np.count_nonzero(HillasReconstructor_telescopes,axis=1) >= 4"] - ["valid classifier", "RandomForestClassifier_is_valid"] diff --git a/src/ctapipe/tools/compute_irf.py b/src/ctapipe/tools/compute_irf.py index f9465293ec7..740125b91a8 100644 --- a/src/ctapipe/tools/compute_irf.py +++ b/src/ctapipe/tools/compute_irf.py @@ -21,6 +21,7 @@ from ..core.traits import AstroQuantity, Bool, Integer, classes_with_traits, flag from ..irf import ( EventLoader, + EventPreprocessor, OptimizationResult, Spectra, check_bins_in_range, @@ -30,13 +31,13 @@ EnergyBiasResolutionMakerBase, SensitivityMakerBase, ) +from ..irf.cuts import EventQualitySelection from ..irf.irfs import ( BackgroundRateMakerBase, EffectiveAreaMakerBase, EnergyDispersionMakerBase, PSFMakerBase, ) -from ..irf.preprocessing import EventQualityQuery __all__ = ["IrfTool"] @@ -231,6 +232,10 @@ def setup(self): """ Initialize components from config and load g/h (and theta) cuts. """ + + # Force the preprocessing for IRF + EventPreprocessor.irf_pre_processing = True + self.opt_result = OptimizationResult.read(self.cuts_file) if ( self.spatial_selection_applied @@ -471,34 +476,34 @@ def start(self): ) if ( - loader.epp.quality_query.quality_criteria - != self.opt_result.quality_query.quality_criteria + loader.epp.event_selection.quality_criteria + != self.opt_result.quality_selection.quality_criteria ): self.log.warning( "Quality criteria are different from quality criteria used for " "calculating g/h / theta cuts. Provided quality criteria:\n%s. " "\nUsing the same quality criteria as g/h / theta cuts:\n%s. " % ( - loader.epp.quality_query.to_table(functions=True)[ + loader.epp.event_selection.to_table(functions=True)[ "criteria", "func" ], - self.opt_result.quality_query.to_table(functions=True)[ + self.opt_result.quality_selection.to_table(functions=True)[ "criteria", "func" ], ) ) - loader.epp.quality_query = EventQualityQuery( + loader.epp.event_selection = EventQualitySelection( parent=loader, - quality_criteria=self.opt_result.quality_query.quality_criteria, + quality_criteria=self.opt_result.quality_selection.quality_criteria, ) self.log.debug( "%s Quality criteria: %s" - % (particle_type, loader.epp.quality_query.quality_criteria) - ) - events, count, meta = loader.load_preselected_events( - self.chunk_size, self.obs_time + % (particle_type, loader.epp.event_selection.quality_criteria) ) + events = loader.load_preselected_events(self.chunk_size) + count = len(events) + meta = loader.get_simulation_information(obs_time=u.Quantity(50, u.h)) # Only calculate event weights if background or sensitivity should be calculated. if self.do_background: # Sensitivity is only calculated, if do_background is true diff --git a/src/ctapipe/tools/create_dl3.py b/src/ctapipe/tools/create_dl3.py new file mode 100644 index 00000000000..e8933826d98 --- /dev/null +++ b/src/ctapipe/tools/create_dl3.py @@ -0,0 +1,153 @@ +from astropy.io import fits + +from ctapipe.core import Tool, traits +from ctapipe.core.traits import Bool, Integer, classes_with_traits, flag + +from ..irf import EventLoader, EventPreprocessor +from ..irf.cuts import EventSelection +from ..irf.dl3 import DL3_GADF, DL3_Format + +__all__ = ["DL3Tool"] + + +class DL3Tool(Tool): + name = "ctapipe-create-dl3" + description = "Create DL3 file from DL2 observation file" + + dl2_file = traits.Path( + allow_none=False, + directory_ok=False, + exists=True, + help="DL2 input filename and path.", + ).tag(config=True) + + output_file = traits.Path( + allow_none=False, + directory_ok=False, + help="Output file", + ).tag(config=True) + + irfs_file = traits.Path( + allow_none=False, + directory_ok=False, + exists=True, + help="Path to the IRFs describing the observation", + ).tag(config=True) + + chunk_size = Integer( + default_value=100000, + allow_none=True, + help="How many subarray events to load at once while selecting.", + ).tag(config=True) + + optional_dl3_columns = Bool( + default_value=False, help="If true add optional columns to produce file" + ).tag(config=True) + + raise_error_for_optional = Bool( + default_value=True, + help="If true will raise error in the case optional column are missing", + ).tag(config=True) + + # Which classes are registered for configuration + classes = ( + [ + EventLoader, + ] + + classes_with_traits(EventSelection) + + classes_with_traits(EventPreprocessor) + ) + + aliases = { + "cuts": "EventSelection.cuts_file", + "dl2-file": "DL3Tool.dl2_file", + "irfs-file": "DL3Tool.irfs_file", + "output": "DL3Tool.output_file", + "chunk-size": "DL3Tool.chunk_size", + } + + flags = { + **flag( + "optional-columns", + "DL3Tool.optional_dl3_columns", + "Add optional columns for events in the DL3 file", + "Do not add optional column for events in the DL3 file", + ), + **flag( + "raise-error-for-optional", + "DL3Tool.raise_error_for_optional", + "Raise an error if an optional column is missing", + "Only display a warning if an optional column is missing, it will lead to optional columns missing in the DL3 file", + ), + } + + def setup(self): + """ + Initialize components from config and load g/h (and theta) cuts. + """ + + # Setting preprocessing for DL3 + EventPreprocessor.irf_pre_processing = False + EventPreprocessor.optional_dl3_columns = self.optional_dl3_columns + EventPreprocessor.raise_error_for_optional = self.raise_error_for_optional + + # Setting the GADF format object + DL3_Format.optional_dl3_columns = self.optional_dl3_columns + DL3_Format.raise_error_for_optional = self.raise_error_for_optional + DL3_Format.overwrite = self.overwrite + + self.dl3_format = DL3_GADF() + + def start(self): + self.log.info("Loading events from DL2") + self.event_loader = EventLoader( + parent=self, file=self.dl2_file, quality_selection_only=False + ) + self.dl3_format.events = self.event_loader.load_preselected_events( + self.chunk_size + ) + meta = self.event_loader.get_observation_information() + self.dl3_format.obs_id = meta["obs_id"] + self.dl3_format.pointing = meta["pointing"]["pointing_list"] + self.dl3_format.pointing_mode = meta["pointing"]["pointing_mode"] + self.dl3_format.gti = meta["gti"] + self.dl3_format.dead_time_fraction = meta["dead_time_fraction"] + + self.dl3_format.location = meta["location"] + self.dl3_format.telescope_information = meta["telescope_information"] + self.dl3_format.target_information = meta["target"] + self.dl3_format.software_information = meta["software_version"] + + self.log.info("Loading IRFs") + hdu_irfs = fits.open(self.irfs_file, checksum=True) + for i in range(1, len(hdu_irfs)): + if "HDUCLAS2" in hdu_irfs[i].header.keys(): + if hdu_irfs[i].header["HDUCLAS2"] == "EFF_AREA": + if self.dl3_format.aeff is None: + self.dl3_format.aeff = hdu_irfs[i] + elif "EXTNAME" in hdu_irfs[i].header and not ( + "PROTONS" in hdu_irfs[i].header["EXTNAME"] + or "ELECTRONS" in hdu_irfs[i].header["EXTNAME"] + ): + self.dl3_format.aeff = hdu_irfs[i] + elif hdu_irfs[i].header["HDUCLAS2"] == "EDISP": + self.dl3_format.edisp = hdu_irfs[i] + elif hdu_irfs[i].header["HDUCLAS2"] == "PSF": + self.dl3_format.psf = hdu_irfs[i] + elif hdu_irfs[i].header["HDUCLAS2"] == "BKG": + self.dl3_format.bkg = hdu_irfs[i] + + self.log.info("Writing DL3 File") + self.dl3_format.write_file(self.output_file) + + def finish(self): + pass + + +def main(): + tool = DL3Tool() + tool.run() + + +if __name__ == "main": + main() diff --git a/src/ctapipe/tools/optimize_event_selection.py b/src/ctapipe/tools/optimize_event_selection.py index e2066fa77e1..71cd5e08476 100644 --- a/src/ctapipe/tools/optimize_event_selection.py +++ b/src/ctapipe/tools/optimize_event_selection.py @@ -3,10 +3,11 @@ import astropy.units as u from astropy.table import vstack +from ctapipe.irf.optimize.algorithm import CutOptimizerBase + from ..core import Provenance, Tool, traits from ..core.traits import AstroQuantity, Integer, classes_with_traits -from ..irf import EventLoader, Spectra -from ..irf.optimize import CutOptimizerBase +from ..irf import EventLoader, EventPreprocessor, Spectra __all__ = ["EventSelectionOptimizer"] @@ -108,6 +109,10 @@ def setup(self): """ Initialize components from config. """ + + # Force the preprocessing for IRF + EventPreprocessor.irf_pre_processing = True + self.optimizer = CutOptimizerBase.from_name( self.optimization_algorithm, parent=self ) @@ -147,9 +152,9 @@ def start(self): """ reduced_events = dict() for particle_type, loader in self.event_loaders.items(): - events, count, meta = loader.load_preselected_events( - self.chunk_size, self.obs_time - ) + events = loader.load_preselected_events(self.chunk_size) + count = len(events) + meta = loader.get_simulation_information(obs_time=u.Quantity(50, u.h)) if self.optimizer.needs_background: events = loader.make_event_weights( events, @@ -205,7 +210,7 @@ def start(self): self.result = self.optimizer( events={"signal": self.signal_events, "background": self.background_events}, # identical quality_query for all particle types - quality_query=self.event_loaders["gammas"].epp.quality_query, + quality_query=self.event_loaders["gammas"].epp.event_selection, clf_prefix=self.event_loaders["gammas"].epp.gammaness_classifier, ) diff --git a/src/ctapipe/tools/process.py b/src/ctapipe/tools/process.py index 7ae3d86e52e..fd5a034d7e7 100644 --- a/src/ctapipe/tools/process.py +++ b/src/ctapipe/tools/process.py @@ -265,7 +265,7 @@ def _write_processing_statistics(self): reconstructor_names, reconstructors ): write_table( - reconstructor.quality_query.to_table(functions=True), + reconstructor.quality_selection.to_table(functions=True), self.write.output_path, f"/dl2/service/tel_event_statistics/{reconstructor_name}", append=True, diff --git a/src/ctapipe/tools/tests/test_compute_irf.py b/src/ctapipe/tools/tests/test_compute_irf.py index acd35a50aaf..d6d838dbd63 100644 --- a/src/ctapipe/tools/tests/test_compute_irf.py +++ b/src/ctapipe/tools/tests/test_compute_irf.py @@ -244,7 +244,7 @@ def test_irf_tool_wrong_cuts( "energy_reconstructor": "ExtraTreesRegressor", "geometry_reconstructor": "HillasReconstructor", "gammaness_classifier": "ExtraTreesClassifier", - "EventQualityQuery": { + "EventQualitySelection": { "quality_criteria": [ # No criteria for minimum event multiplicity ("valid classifier", "ExtraTreesClassifier_is_valid"), diff --git a/src/ctapipe/tools/tests/test_optimize_event_selection.py b/src/ctapipe/tools/tests/test_optimize_event_selection.py index cb787653a0a..6a581cf5dee 100644 --- a/src/ctapipe/tools/tests/test_optimize_event_selection.py +++ b/src/ctapipe/tools/tests/test_optimize_event_selection.py @@ -4,7 +4,8 @@ import pytest from astropy.table import QTable -from ctapipe.core import QualityQuery, run_tool +from ctapipe.core import run_tool +from ctapipe.irf.cuts import EventQualitySelection pytest.importorskip("pyirf") @@ -36,7 +37,7 @@ def test_cuts_optimization( result = OptimizationResult.read(output_path) assert isinstance(result, OptimizationResult) - assert isinstance(result.quality_query, QualityQuery) + assert isinstance(result.quality_selection, EventQualitySelection) assert isinstance(result.valid_energy, ResultValidRange) assert isinstance(result.valid_offset, ResultValidRange) assert isinstance(result.gh_cuts, QTable)