From 14a7638ce067441b26a4b1a1ef84c52af4670882 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 15:44:22 +0100 Subject: [PATCH 01/22] Add component for aggregating dl1 features --- src/ctapipe/containers.py | 9 +- src/ctapipe/image/__init__.py | 3 +- src/ctapipe/image/statistics.py | 145 +++++++++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 4 deletions(-) diff --git a/src/ctapipe/containers.py b/src/ctapipe/containers.py index 1a76d141a1d..60009aba96c 100644 --- a/src/ctapipe/containers.py +++ b/src/ctapipe/containers.py @@ -410,13 +410,16 @@ class MorphologyContainer(Container): n_large_islands = Field(-1, "Number of > 50 pixel islands") -class StatisticsContainer(Container): +class BaseStatisticsContainer(Container): """Store descriptive statistics""" max = Field(np.float32(nan), "value of pixel with maximum intensity") min = Field(np.float32(nan), "value of pixel with minimum intensity") mean = Field(np.float32(nan), "mean intensity") std = Field(np.float32(nan), "standard deviation of intensity") + + +class StatisticsContainer(BaseStatisticsContainer): skewness = Field(nan, "skewness of intensity") kurtosis = Field(nan, "kurtosis of intensity") @@ -522,6 +525,10 @@ class DL1Container(Container): default_factory=partial(Map, DL1CameraContainer), description="map of tel_id to DL1CameraContainer", ) + aggregate = Field( + default_factory=partial(Map, BaseStatisticsContainer), + description="map of image parameter to aggregation statistics", + ) class DL1CameraCalibrationContainer(Container): diff --git a/src/ctapipe/image/__init__.py b/src/ctapipe/image/__init__.py index 61799ac01a5..04d083dad21 100644 --- a/src/ctapipe/image/__init__.py +++ b/src/ctapipe/image/__init__.py @@ -60,7 +60,7 @@ neg_log_likelihood_numeric, ) from .reducer import DataVolumeReducer, NullDataVolumeReducer, TailCutsDataVolumeReducer -from .statistics import descriptive_statistics +from .statistics import FeatureAggregator, descriptive_statistics from .timing import timing_parameters __all__ = [ @@ -119,4 +119,5 @@ "TailCutsDataVolumeReducer", "InvalidPixelHandler", "NeighborAverage", + "FeatureAggregator", ] diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py index ee8ddbd8aed..cd50b2419a5 100644 --- a/src/ctapipe/image/statistics.py +++ b/src/ctapipe/image/statistics.py @@ -1,9 +1,19 @@ +import astropy.units as u import numpy as np +from astropy.stats import circmean, circstd +from astropy.table import Table from numba import njit -from ..containers import StatisticsContainer +from ..containers import ( + ArrayEventContainer, + BaseStatisticsContainer, + StatisticsContainer, +) +from ..core import Component +from ..core.traits import List, Tuple, Unicode +from ..vectorization import max_ufunc, min_ufunc, weighted_mean_ufunc -__all__ = ["descriptive_statistics", "skewness", "kurtosis"] +__all__ = ["descriptive_statistics", "skewness", "kurtosis", "FeatureAggregator"] @njit(cache=True) @@ -92,3 +102,134 @@ def descriptive_statistics( skewness=skewness(values, mean=mean, std=std), kurtosis=kurtosis(values, mean=mean, std=std), ) + + +class FeatureAggregator(Component): + """Array-event-wise aggregation of image parameters.""" + + image_parameters = List( + Tuple(Unicode(), Unicode()), + default_value=[], + help=( + "List of 2-Tuples of Strings: ('prefix', 'feature'). " + "The image parameter to be aggregated is 'prefix_feature'." + ), + ).tag(config=True) + + def __call__(self, event: ArrayEventContainer) -> None: + """Fill event container with aggregated image parameters.""" + for prefix, feature in self.image_parameters: + values = [] + unit = None + for tel_id in event.dl1.tel.keys(): + value = event.dl1.tel[tel_id].parameters[prefix][feature] + if isinstance(value, u.Quantity): + if not unit: + unit = value.unit + value = value.to_value(unit) + + valid = value >= 0 if prefix == "morphology" else ~np.isnan(value) + if valid: + values.append(value) + + if len(values) > 0: + if feature.endswith(("psi", "phi")): + mean = circmean( + u.Quantity(values, unit, copy=False).to_value(u.rad) + ) + std = circstd(u.Quantity(values, unit, copy=False).to_value(u.rad)) + else: + mean = np.mean(values) + std = np.std(values) + + # Use the same dtype for all columns, independent of the dtype + # of the aggregated image parameter, since `_mean` and `_std` + # requiere floats anyway. + max = np.float64(np.max(values)) + min = np.float64(np.min(values)) + else: + max = np.nan + min = np.nan + mean = np.nan + std = np.nan + + if unit: + max = u.Quantity(max, unit, copy=False) + min = u.Quantity(min, unit, copy=False) + if feature.endswith(("psi", "phi")): + mean = u.Quantity(mean, u.rad, copy=False).to(unit) + std = u.Quantity(std, u.rad, copy=False).to(unit) + else: + mean = u.Quantity(mean, unit, copy=False) + std = u.Quantity(std, unit, copy=False) + + event.dl1.aggregate[prefix + "_" + feature] = BaseStatisticsContainer( + max=max, min=min, mean=mean, std=std, prefix=prefix + "_" + feature + ) + + def aggregate_table(self, mono_parameters: Table) -> dict[str, Table]: + """ + Construct table containing aggregated image parameters + from table of telescope events. + """ + agg_tables = {} + for prefix, feature in self.image_parameters: + col = prefix + "_" + feature + unit = mono_parameters[col].quantity.unit + if prefix == "morphology": + valid = mono_parameters[col] >= 0 + else: + valid = ~np.isnan(mono_parameters[col]) + + valid_parameters = mono_parameters[valid] + array_events, indices, multiplicity = np.unique( + mono_parameters["obs_id", "event_id"], + return_inverse=True, + return_counts=True, + ) + agg_table = Table(array_events) + for colname in ("obs_id", "event_id"): + agg_table[colname].description = mono_parameters[colname].description + + n_array_events = len(array_events) + if len(valid_parameters) > 0: + mono_column = valid_parameters[col] + means = weighted_mean_ufunc( + mono_column, + np.array([1]), + n_array_events, + indices[valid], + ) + # FIXME: This has the same problem of strange NaNs as + # e.g. "energy_uncert" generated by the StereoMeanCombiner! + # Output is also incorrect, but I don't understand why... + vars = weighted_mean_ufunc( + (mono_column - np.repeat(means, multiplicity)[valid]) ** 2, + np.array([1]), + n_array_events, + indices[valid], + ) + max = max_ufunc( + mono_column, + n_array_events, + indices[valid], + ) + min = min_ufunc( + mono_column, + n_array_events, + indices[valid], + ) + else: + means = np.full(n_array_events, np.nan) + vars = np.full(n_array_events, np.nan) + max = np.full(n_array_events, np.nan) + min = np.full(n_array_events, np.nan) + + agg_table[col + "_max"] = u.Quantity(max, unit, copy=False) + agg_table[col + "_min"] = u.Quantity(min, unit, copy=False) + agg_table[col + "_mean"] = u.Quantity(means, unit, copy=False) + agg_table[col + "_std"] = u.Quantity(np.sqrt(vars), unit, copy=False) + + agg_tables[col] = agg_table + + return agg_tables From 4874ce4b939f9f6e29c586fb03f35cfa9309d90e Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 15:48:07 +0100 Subject: [PATCH 02/22] Refactor numpy vectorization functions into own module --- src/ctapipe/reco/stereo_combination.py | 45 ++------- src/ctapipe/vectorization/__init__.py | 3 + src/ctapipe/vectorization/aggregate.py | 128 +++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 38 deletions(-) create mode 100644 src/ctapipe/vectorization/__init__.py create mode 100644 src/ctapipe/vectorization/aggregate.py diff --git a/src/ctapipe/reco/stereo_combination.py b/src/ctapipe/reco/stereo_combination.py index c7dbd00b910..881cd50302b 100644 --- a/src/ctapipe/reco/stereo_combination.py +++ b/src/ctapipe/reco/stereo_combination.py @@ -16,6 +16,7 @@ ReconstructedEnergyContainer, ReconstructedGeometryContainer, ) +from ..vectorization import weighted_mean_ufunc from .utils import add_defaults_and_meta _containers = { @@ -30,38 +31,6 @@ ] -def _grouped_add(tel_data, n_array_events, indices): - """ - Calculate the group-wise sum for each array event over the - corresponding telescope events. ``indices`` is an array - that gives the index of the subarray event for each telescope event, - returned by - ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` - """ - combined_values = np.zeros(n_array_events) - np.add.at(combined_values, indices, tel_data) - return combined_values - - -def _weighted_mean_ufunc(tel_values, weights, n_array_events, indices): - # avoid numerical problems by very large or small weights - weights = weights / weights.max() - sum_prediction = _grouped_add( - tel_values * weights, - n_array_events, - indices, - ) - sum_of_weights = _grouped_add( - weights, - n_array_events, - indices, - ) - mean = np.full(n_array_events, np.nan) - valid = sum_of_weights > 0 - mean[valid] = sum_prediction[valid] / sum_of_weights[valid] - return mean - - class StereoCombiner(Component): """ Base Class for algorithms combining telescope-wise predictions to common prediction. @@ -309,7 +278,7 @@ def predict_table(self, mono_predictions: Table) -> Table: if self.property is ReconstructionProperty.PARTICLE_TYPE: if len(valid_predictions) > 0: mono_predictions = valid_predictions[f"{prefix}_prediction"] - stereo_predictions = _weighted_mean_ufunc( + stereo_predictions = weighted_mean_ufunc( mono_predictions, weights, n_array_events, indices[valid] ) else: @@ -328,13 +297,13 @@ def predict_table(self, mono_predictions: Table) -> Table: if self.log_target: mono_energies = np.log(mono_energies) - stereo_energy = _weighted_mean_ufunc( + stereo_energy = weighted_mean_ufunc( mono_energies, weights, n_array_events, indices[valid], ) - variance = _weighted_mean_ufunc( + variance = weighted_mean_ufunc( (mono_energies - np.repeat(stereo_energy, multiplicity)[valid]) ** 2, weights, @@ -369,13 +338,13 @@ def predict_table(self, mono_predictions: Table) -> Table: # https://en.wikipedia.org/wiki/Von_Mises%E2%80%93Fisher_distribution#Mean_direction mono_x, mono_y, mono_z = coord.cartesian.get_xyz() - stereo_x = _weighted_mean_ufunc( + stereo_x = weighted_mean_ufunc( mono_x, weights, n_array_events, indices[valid] ) - stereo_y = _weighted_mean_ufunc( + stereo_y = weighted_mean_ufunc( mono_y, weights, n_array_events, indices[valid] ) - stereo_z = _weighted_mean_ufunc( + stereo_z = weighted_mean_ufunc( mono_z, weights, n_array_events, indices[valid] ) diff --git a/src/ctapipe/vectorization/__init__.py b/src/ctapipe/vectorization/__init__.py new file mode 100644 index 00000000000..149b6597f50 --- /dev/null +++ b/src/ctapipe/vectorization/__init__.py @@ -0,0 +1,3 @@ +from .aggregate import max_ufunc, min_ufunc, weighted_mean_ufunc + +__all__ = ["weighted_mean_ufunc", "max_ufunc", "min_ufunc"] diff --git a/src/ctapipe/vectorization/aggregate.py b/src/ctapipe/vectorization/aggregate.py new file mode 100644 index 00000000000..23ade314591 --- /dev/null +++ b/src/ctapipe/vectorization/aggregate.py @@ -0,0 +1,128 @@ +import numpy as np + +__all__ = ["weighted_mean_ufunc", "max_ufunc", "min_ufunc"] + + +def _grouped_add(tel_data, n_array_events, indices): + """ + Calculate the group-wise sum for each array event over the + corresponding telescope events. ``indices`` is an array + that gives the index of the subarray event for each telescope event, + returned by + ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + """ + combined_values = np.zeros(n_array_events) + np.add.at(combined_values, indices, tel_data) + return combined_values + + +def weighted_mean_ufunc(tel_values, weights, n_array_events, indices): + """ + Calculate the weighted mean for each array event over the + corresponding telescope events. + + Parameters + ---------- + tel_values: np.ndarray + values for each telescope event + weights: np.ndarray + weights used for averaging + n_array_events: int + number of array events with corresponding telescope events in ``tel_values`` + indices: np.ndarray + index of the subarray event for each telescope event, returned by + ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + + Returns + ------- + array: np.ndarray + weighted mean for each array event + """ + # avoid numerical problems by very large or small weights + weights = weights / weights.max() + sum_prediction = _grouped_add( + tel_values * weights, + n_array_events, + indices, + ) + sum_of_weights = _grouped_add( + weights, + n_array_events, + indices, + ) + mean = np.full(n_array_events, np.nan) + valid = sum_of_weights > 0 + mean[valid] = sum_prediction[valid] / sum_of_weights[valid] + return mean + + +def max_ufunc(tel_values, n_array_events, indices): + """ + Find the maximum value for each array event from the + corresponding telescope events. + + Parameters + ---------- + tel_values: np.ndarray + values for each telescope event + n_array_events: int + number of array events with corresponding telescope events in ``tel_values`` + indices: np.ndarray + index of the subarray event for each telescope event, returned by + ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + + Returns + ------- + array: np.ndarray + maximum value for each array event + """ + if np.issubdtype(tel_values[0], np.integer): + fillvalue = np.iinfo(tel_values.dtype).min + elif np.issubdtype(tel_values[0], np.floating): + fillvalue = np.finfo(tel_values.dtype).min + else: + raise ValueError("Non-numerical dtypes are not supported") + + max_values = np.full(n_array_events, fillvalue) + np.maximum.at(max_values, indices, tel_values) + + result = np.full(n_array_events, np.nan) + valid = max_values > fillvalue + result[valid] = max_values[valid] + return result + + +def min_ufunc(tel_values, n_array_events, indices): + """ + Find the minimum value for each array event from the + corresponding telescope events. + + Parameters + ---------- + tel_values: np.ndarray + values for each telescope event + n_array_events: int + number of array events with corresponding telescope events in ``tel_values`` + indices: np.ndarray + index of the subarray event for each telescope event, returned by + ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + + Returns + ------- + array: np.ndarray + minimum value for each array event + """ + if np.issubdtype(tel_values[0], np.integer): + fillvalue = np.iinfo(tel_values.dtype).max + elif np.issubdtype(tel_values[0], np.floating): + fillvalue = np.finfo(tel_values.dtype).max + else: + raise ValueError("Non-numerical dtypes are not supported") + + min_values = np.full(n_array_events, fillvalue) + np.minimum.at(min_values, indices, tel_values) + + result = np.full(n_array_events, np.nan) + valid = min_values < fillvalue + result[valid] = min_values[valid] + return result From 81f61045efcf3ecb03725b49dc528d9f833096b6 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 15:52:40 +0100 Subject: [PATCH 03/22] Add dl1 aggregates to io; add separete tool; add it to ctapipe-process --- pyproject.toml | 1 + src/ctapipe/io/datawriter.py | 18 +++ src/ctapipe/io/tableloader.py | 47 +++++-- src/ctapipe/tools/aggregate_features.py | 175 ++++++++++++++++++++++++ src/ctapipe/tools/process.py | 19 ++- 5 files changed, 244 insertions(+), 16 deletions(-) create mode 100644 src/ctapipe/tools/aggregate_features.py diff --git a/pyproject.toml b/pyproject.toml index c0b8e248540..9791fc71113 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,7 @@ ctapipe-train-energy-regressor = "ctapipe.tools.train_energy_regressor:main" ctapipe-train-particle-classifier = "ctapipe.tools.train_particle_classifier:main" ctapipe-train-disp-reconstructor = "ctapipe.tools.train_disp_reconstructor:main" ctapipe-apply-models = "ctapipe.tools.apply_models:main" +ctapipe-aggregate-image-parameters = "ctapipe.tools.aggregate_features:main" [project.entry-points.ctapipe_io] HDF5EventSource = "ctapipe.io.hdf5eventsource:HDF5EventSource" diff --git a/src/ctapipe/io/datawriter.py b/src/ctapipe/io/datawriter.py index eb353d43620..44ecad651c2 100644 --- a/src/ctapipe/io/datawriter.py +++ b/src/ctapipe/io/datawriter.py @@ -206,6 +206,11 @@ class DataWriter(Component): help="Store muon parameters if available", default_value=False ).tag(config=True) + write_dl1_aggregates = Bool( + help="Store array-event-wise aggregated DL1 image parameters if available", + default_value=False, + ).tag(config=True) + compression_level = Int( help="compression level, 0=None, 9=maximum", default_value=5, min=0, max=9 ).tag(config=True) @@ -338,6 +343,9 @@ def __call__(self, event: ArrayEventContainer): if self.write_muon_parameters: self._write_muon_telescope_events(event) + if self.write_dl1_aggregates: + self._write_dl1_aggregates(event) + def _write_constant_pointing(self, event): """ Write pointing configuration from event data assuming fixed pointing over the OB. @@ -723,6 +731,16 @@ def _write_muon_telescope_events(self, event: ArrayEventContainer): [tel_index, muon.ring, muon.parameters, muon.efficiency], ) + def _write_dl1_aggregates(self, event: ArrayEventContainer): + """ + Write array-event-wise aggregated DL1 image parameters. + """ + for feature_name, container in event.dl1.aggregate.items(): + self._writer.write( + table_name=f"dl1/event/aggregate/{feature_name}", + containers=[event.index, container], + ) + def _write_dl2_telescope_events(self, event: ArrayEventContainer): """ write per-telescope DL2 shower information. diff --git a/src/ctapipe/io/tableloader.py b/src/ctapipe/io/tableloader.py index 36a954fb7de..1576b7fa301 100644 --- a/src/ctapipe/io/tableloader.py +++ b/src/ctapipe/io/tableloader.py @@ -22,6 +22,7 @@ IMAGES_GROUP = "/dl1/event/telescope/images" MUON_GROUP = "/dl1/event/telescope/muon" TRIGGER_TABLE = "/dl1/event/subarray/trigger" +PARAMETER_AGGS_GROUP = "/dl1/event/aggregate" SHOWER_TABLE = "/simulation/event/subarray/shower" TRUE_IMAGES_GROUP = "/simulation/event/telescope/images" TRUE_PARAMETERS_GROUP = "/simulation/event/telescope/parameters" @@ -179,6 +180,9 @@ class TableLoader(Component): config=True ) dl1_muons = traits.Bool(False, help="load muon ring parameters").tag(config=True) + dl1_aggregates = traits.Bool( + False, help="load array-event-wise aggregated image parameters" + ).tag(config=True) dl2 = traits.Bool(True, help="load available dl2 stereo parameters").tag( config=True @@ -280,6 +284,7 @@ def _check_args(self, **kwargs): "dl1_parameters": PARAMETERS_GROUP, "dl1_images": IMAGES_GROUP, "dl1_muons": MUON_GROUP, + "dl1_aggregates": PARAMETER_AGGS_GROUP, "true_parameters": TRUE_PARAMETERS_GROUP, "true_images": TRUE_IMAGES_GROUP, "observation_info": OBSERVATION_TABLE, @@ -380,6 +385,7 @@ def read_subarray_events( self, start=None, stop=None, + dl1_aggregates=None, dl2=None, simulated=None, observation_info=None, @@ -390,6 +396,8 @@ def read_subarray_events( Parameters ---------- + dl1_aggregates: bool + load available aggregated dl1 image parameters dl2: bool load available dl2 stereo parameters simulated: bool @@ -407,10 +415,12 @@ def read_subarray_events( Table with primary index columns "obs_id" and "event_id". """ updated_args = self._check_args( + dl1_aggregates=dl1_aggregates, dl2=dl2, simulated=simulated, observation_info=observation_info, ) + dl1_aggregates = updated_args["dl1_aggregates"] dl2 = updated_args["dl2"] simulated = updated_args["simulated"] observation_info = updated_args["observation_info"] @@ -423,20 +433,29 @@ def read_subarray_events( showers = read_table(self.h5file, SHOWER_TABLE, start=start, stop=stop) table = _merge_subarray_tables(table, showers) - if dl2: - if DL2_SUBARRAY_GROUP in self.h5file: - for group_name in self.h5file.root[DL2_SUBARRAY_GROUP]._v_children: - group_path = f"{DL2_SUBARRAY_GROUP}/{group_name}" - group = self.h5file.root[group_path] - - for algorithm in group._v_children: - dl2 = read_table( - self.h5file, - f"{group_path}/{algorithm}", - start=start, - stop=stop, - ) - table = _merge_subarray_tables(table, dl2) + if dl2 and DL2_SUBARRAY_GROUP in self.h5file: + for group_name in self.h5file.root[DL2_SUBARRAY_GROUP]._v_children: + group_path = f"{DL2_SUBARRAY_GROUP}/{group_name}" + group = self.h5file.root[group_path] + + for algorithm in group._v_children: + dl2 = read_table( + self.h5file, + f"{group_path}/{algorithm}", + start=start, + stop=stop, + ) + table = _merge_subarray_tables(table, dl2) + + if dl1_aggregates and PARAMETER_AGGS_GROUP in self.h5file: + for group_name in self.h5file.root[PARAMETER_AGGS_GROUP]._v_children: + aggs = read_table( + self.h5file, + f"{PARAMETER_AGGS_GROUP}/{group_name}", + start=start, + stop=stop, + ) + table = _merge_subarray_tables(table, aggs) if observation_info: table = self._join_observation_info(table) diff --git a/src/ctapipe/tools/aggregate_features.py b/src/ctapipe/tools/aggregate_features.py new file mode 100644 index 00000000000..ab4e806e53f --- /dev/null +++ b/src/ctapipe/tools/aggregate_features.py @@ -0,0 +1,175 @@ +""" +Tool to aggregate DL1 image parameters array-event-wise. +""" +import numpy as np +import tables +from tqdm.auto import tqdm + +from ..core import Tool, ToolConfigurationError +from ..core.traits import Bool, Integer, Path, flag +from ..image import FeatureAggregator +from ..io import HDF5Merger, TableLoader, read_table, write_table +from ..io.tableloader import _join_subarray_events + +__all__ = ["AggregateFeatures"] + + +class AggregateFeatures(Tool): + """ + Aggregate DL1 image parameters array-event-wise. + + This tool calculates the maximal, minimal, and mean value, + as well as the standart deviation for any given DL1 image parameter + for all array events given as input. + """ + + name = "ctapipe-aggregate-image-parameters" + description = __doc__ + examples = """ + ctapipe-aggregate-image-parameters \\ + --input gamma.dl1.h5 \\ + --output gamma_incl_agg.dl1.h5 + """ + + input_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + exists=True, + help="Input file containing DL1 image parameters", + ).tag(config=True) + + output_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + help="Output file", + ).tag(config=True) + + chunk_size = Integer( + default_value=100000, + allow_none=True, + help="How many array events to load at once for making predictions", + ).tag(config=True) + + progress_bar = Bool( + help="Show progress bar during processing", + default_value=True, + ).tag(config=True) + + aliases = { + ("i", "input"): "AggregateFeatures.input_path", + ("o", "output"): "AggregateFeatures.output_path", + "chunk-size": "AggregateFeatures.chunk_size", + } + + flags = { + **flag( + "progress", + "AggregateFeatures.progress_bar", + "show a progress bar during event processing", + "don't show a progress bar during event processing", + ), + **flag( + "dl1-parameters", + "HDF5Merger.dl1_parameters", + "Include dl1 parameters in output", + "Exclude dl1 parameters in output", + ), + **flag( + "dl1-images", + "HDF5Merger.dl1_images", + "Include dl1 images in output", + "Exclude dl1 images in output", + ), + **flag( + "true-parameters", + "HDF5Merger.true_parameters", + "Include true parameters in output", + "Exclude true parameters in output", + ), + **flag( + "true-images", + "HDF5Merger.true_images", + "Include true images in output", + "Exclude true images in output", + ), + "overwrite": ( + { + "HDF5Merger": {"overwrite": True}, + "AggregateFeatures": {"overwrite": True}, + }, + "Overwrite output file if it exists", + ), + } + + classes = [TableLoader, FeatureAggregator] + + def setup(self): + """Initilize components form config.""" + self.check_output(self.output_path) + self.log.info("Copying to output destination.") + with HDF5Merger(self.output_path, parent=self) as merger: + merger(self.input_path) + + self.h5file = self.enter_context(tables.open_file(self.output_path, mode="r+")) + self.loader = self.enter_context( + TableLoader( + self.input_path, + parent=self, + ) + ) + self.aggregator = FeatureAggregator(parent=self) + if len(self.aggregator.image_parameters) == 0: + raise ToolConfigurationError( + "No image parameters to aggregate are specified." + ) + + def start(self): + """Aggregate DL1 image parameters for input tables.""" + chunk_iterator = self.loader.read_telescope_events_chunked( + self.chunk_size, + dl2=False, + simulated=False, + true_parameters=False, + ) + bar = tqdm( + chunk_iterator, + desc="Aggregating parameters", + unit=" Array Events", + total=chunk_iterator.n_total, + disable=not self.progress_bar, + ) + with bar: + for chunk, (start, stop, table) in enumerate(chunk_iterator): + self.log.debug("Aggregating for chunk %d", chunk) + agg_tables = self.aggregator.aggregate_table(table) + + # to ensure events are stored in the correct order, + # we resort to trigger table order + trigger = read_table( + self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop + )[["obs_id", "event_id"]] + trigger["__sort_index__"] = np.arange(len(trigger)) + + for colname, agg_table in agg_tables.items(): + agg_table = _join_subarray_events(trigger, agg_table) + agg_table.sort("__sort_index__") + del agg_table["__sort_index__"] + + write_table( + agg_table, + self.output_path, + f"/dl1/event/aggregate/{colname}", + append=True, + ) + + bar.update(stop - start) + + +def main(): + AggregateFeatures().run() + + +if __name__ == "__main__": + main() diff --git a/src/ctapipe/tools/process.py b/src/ctapipe/tools/process.py index dc0123c5d08..42187048546 100644 --- a/src/ctapipe/tools/process.py +++ b/src/ctapipe/tools/process.py @@ -7,9 +7,9 @@ from tqdm.auto import tqdm from ..calib import CameraCalibrator, GainSelector -from ..core import QualityQuery, Tool +from ..core import QualityQuery, Tool, ToolConfigurationError from ..core.traits import Bool, classes_with_traits, flag -from ..image import ImageCleaner, ImageModifier, ImageProcessor +from ..image import FeatureAggregator, ImageCleaner, ImageModifier, ImageProcessor from ..image.extractor import ImageExtractor from ..image.muon import MuonProcessor from ..instrument import SoftwareTrigger @@ -137,6 +137,12 @@ class ProcessorTool(Tool): "store DL1/Event/Telescope muon parameters in output", "don't store DL1/Event/Telescope muon parameters in output", ), + **flag( + "aggregate-dl1-image-parameters", + "DataWriter.write_dl1_aggregates", + "store array-event-wise aggregated DL1 image parameters in output", + "don't store array-event-wise aggregated DL1 image parameters in output", + ), "camera-frame": ( {"ImageProcessor": {"use_telescope_frame": False}}, "Use camera frame for image parameters instead of telescope frame", @@ -153,6 +159,7 @@ class ProcessorTool(Tool): metadata.Instrument, metadata.Contact, SoftwareTrigger, + FeatureAggregator, ] + classes_with_traits(EventSource) + classes_with_traits(ImageCleaner) @@ -184,6 +191,7 @@ def setup(self): self.calibrate = CameraCalibrator(parent=self, subarray=subarray) self.process_images = ImageProcessor(subarray=subarray, parent=self) self.process_shower = ShowerProcessor(subarray=subarray, parent=self) + self.aggregate = FeatureAggregator(parent=self) self.write = self.enter_context( DataWriter(event_source=self.event_source, parent=self) ) @@ -319,6 +327,13 @@ def start(self): if self.should_compute_dl2: self.process_shower(event) + if self.write.write_dl1_aggregates: + if len(self.aggregate.image_parameters) == 0: + raise ToolConfigurationError( + "No DL1 image parameters to aggregate are specified." + ) + self.aggregate(event) + self.write(event) def finish(self): From d7b86502aa3a309d8227a06d758dd32486907182 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 16:13:23 +0100 Subject: [PATCH 04/22] Add changelog --- docs/changes/2497.feature.rst | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changes/2497.feature.rst diff --git a/docs/changes/2497.feature.rst b/docs/changes/2497.feature.rst new file mode 100644 index 00000000000..f9af5ecbb85 --- /dev/null +++ b/docs/changes/2497.feature.rst @@ -0,0 +1,5 @@ +Add the ``FeatureAggregator`` component and the ``ctapipe-aggregate-image-parameters`` tool +for aggregating the telescope-wise dl1 image parameters for each array event. +This functionality is also added to ``ctapipe-process``. + +Refactor helper functions for vectorized numpy calculations into new ``ctapipe.vectorization`` module. From 7072b100a9f6d89e4167a1ef713440a9f76dba12 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 16:15:51 +0100 Subject: [PATCH 05/22] Add BaseStatisticsContainer to __all__ --- src/ctapipe/containers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ctapipe/containers.py b/src/ctapipe/containers.py index 60009aba96c..dc906e34e66 100644 --- a/src/ctapipe/containers.py +++ b/src/ctapipe/containers.py @@ -55,6 +55,7 @@ "TimingParametersContainer", "TriggerContainer", "WaveformCalibrationContainer", + "BaseStatisticsContainer", "StatisticsContainer", "IntensityStatisticsContainer", "PeakTimeStatisticsContainer", From c49cd532c65829ae3d9fd8e7d4fe653063bb744b Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Fri, 19 Jan 2024 16:29:26 +0100 Subject: [PATCH 06/22] Add module docstrings; do not overwrite python built-ins --- src/ctapipe/image/statistics.py | 37 +++++++++++++++----------- src/ctapipe/vectorization/aggregate.py | 1 + 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py index cd50b2419a5..d31b11c8a1c 100644 --- a/src/ctapipe/image/statistics.py +++ b/src/ctapipe/image/statistics.py @@ -1,3 +1,4 @@ +"""Calculating statistics of image parameters.""" import astropy.units as u import numpy as np from astropy.stats import circmean, circstd @@ -145,17 +146,17 @@ def __call__(self, event: ArrayEventContainer) -> None: # Use the same dtype for all columns, independent of the dtype # of the aggregated image parameter, since `_mean` and `_std` # requiere floats anyway. - max = np.float64(np.max(values)) - min = np.float64(np.min(values)) + max_value = np.float64(np.max(values)) + min_value = np.float64(np.min(values)) else: - max = np.nan - min = np.nan + max_value = np.nan + min_value = np.nan mean = np.nan std = np.nan if unit: - max = u.Quantity(max, unit, copy=False) - min = u.Quantity(min, unit, copy=False) + max_value = u.Quantity(max_value, unit, copy=False) + min_value = u.Quantity(min_value, unit, copy=False) if feature.endswith(("psi", "phi")): mean = u.Quantity(mean, u.rad, copy=False).to(unit) std = u.Quantity(std, u.rad, copy=False).to(unit) @@ -164,7 +165,11 @@ def __call__(self, event: ArrayEventContainer) -> None: std = u.Quantity(std, unit, copy=False) event.dl1.aggregate[prefix + "_" + feature] = BaseStatisticsContainer( - max=max, min=min, mean=mean, std=std, prefix=prefix + "_" + feature + max=max_value, + min=min_value, + mean=mean, + std=std, + prefix=prefix + "_" + feature, ) def aggregate_table(self, mono_parameters: Table) -> dict[str, Table]: @@ -203,32 +208,32 @@ def aggregate_table(self, mono_parameters: Table) -> dict[str, Table]: # FIXME: This has the same problem of strange NaNs as # e.g. "energy_uncert" generated by the StereoMeanCombiner! # Output is also incorrect, but I don't understand why... - vars = weighted_mean_ufunc( + variance = weighted_mean_ufunc( (mono_column - np.repeat(means, multiplicity)[valid]) ** 2, np.array([1]), n_array_events, indices[valid], ) - max = max_ufunc( + max_values = max_ufunc( mono_column, n_array_events, indices[valid], ) - min = min_ufunc( + min_values = min_ufunc( mono_column, n_array_events, indices[valid], ) else: means = np.full(n_array_events, np.nan) - vars = np.full(n_array_events, np.nan) - max = np.full(n_array_events, np.nan) - min = np.full(n_array_events, np.nan) + variance = np.full(n_array_events, np.nan) + max_values = np.full(n_array_events, np.nan) + min_values = np.full(n_array_events, np.nan) - agg_table[col + "_max"] = u.Quantity(max, unit, copy=False) - agg_table[col + "_min"] = u.Quantity(min, unit, copy=False) + agg_table[col + "_max"] = u.Quantity(max_values, unit, copy=False) + agg_table[col + "_min"] = u.Quantity(min_values, unit, copy=False) agg_table[col + "_mean"] = u.Quantity(means, unit, copy=False) - agg_table[col + "_std"] = u.Quantity(np.sqrt(vars), unit, copy=False) + agg_table[col + "_std"] = u.Quantity(np.sqrt(variance), unit, copy=False) agg_tables[col] = agg_table diff --git a/src/ctapipe/vectorization/aggregate.py b/src/ctapipe/vectorization/aggregate.py index 23ade314591..2027b61790c 100644 --- a/src/ctapipe/vectorization/aggregate.py +++ b/src/ctapipe/vectorization/aggregate.py @@ -1,3 +1,4 @@ +"""Helper functions for vectorizing numpy operations.""" import numpy as np __all__ = ["weighted_mean_ufunc", "max_ufunc", "min_ufunc"] From 07c3bdae7edd833e33d431843bbc6245992b8e0e Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Tue, 23 Jan 2024 14:07:54 +0100 Subject: [PATCH 07/22] Update TableLoader --- src/ctapipe/io/tableloader.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/ctapipe/io/tableloader.py b/src/ctapipe/io/tableloader.py index 1576b7fa301..deb947b3356 100644 --- a/src/ctapipe/io/tableloader.py +++ b/src/ctapipe/io/tableloader.py @@ -22,7 +22,7 @@ IMAGES_GROUP = "/dl1/event/telescope/images" MUON_GROUP = "/dl1/event/telescope/muon" TRIGGER_TABLE = "/dl1/event/subarray/trigger" -PARAMETER_AGGS_GROUP = "/dl1/event/aggregate" +PARAMETER_AGGS_GROUP = "/dl1/event/subarray/aggregated_image_parameters" SHOWER_TABLE = "/simulation/event/subarray/shower" TRUE_IMAGES_GROUP = "/simulation/event/telescope/images" TRUE_PARAMETERS_GROUP = "/simulation/event/telescope/parameters" @@ -448,14 +448,13 @@ def read_subarray_events( table = _merge_subarray_tables(table, dl2) if dl1_aggregates and PARAMETER_AGGS_GROUP in self.h5file: - for group_name in self.h5file.root[PARAMETER_AGGS_GROUP]._v_children: - aggs = read_table( - self.h5file, - f"{PARAMETER_AGGS_GROUP}/{group_name}", - start=start, - stop=stop, - ) - table = _merge_subarray_tables(table, aggs) + aggs = read_table( + self.h5file, + PARAMETER_AGGS_GROUP, + start=start, + stop=stop, + ) + table = _merge_subarray_tables(table, aggs) if observation_info: table = self._join_observation_info(table) From 679fc60787b50d1b589593ecc13b2f67d9994625 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Wed, 24 Jan 2024 14:28:58 +0100 Subject: [PATCH 08/22] Update DataWriter --- src/ctapipe/io/datawriter.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ctapipe/io/datawriter.py b/src/ctapipe/io/datawriter.py index 44ecad651c2..607d4823bf4 100644 --- a/src/ctapipe/io/datawriter.py +++ b/src/ctapipe/io/datawriter.py @@ -735,11 +735,10 @@ def _write_dl1_aggregates(self, event: ArrayEventContainer): """ Write array-event-wise aggregated DL1 image parameters. """ - for feature_name, container in event.dl1.aggregate.items(): - self._writer.write( - table_name=f"dl1/event/aggregate/{feature_name}", - containers=[event.index, container], - ) + self._writer.write( + table_name="dl1/event/subarray/aggregated_image_parameters", + containers=[event.index] + list(event.dl1.aggregate.values()), + ) def _write_dl2_telescope_events(self, event: ArrayEventContainer): """ From 4a442dcdfda9368c62af118b08be9d0f906ad0dc Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Wed, 24 Jan 2024 14:36:42 +0100 Subject: [PATCH 09/22] Move collect_features into vectorization module --- src/ctapipe/reco/preprocessing.py | 63 +------------------ src/ctapipe/reco/sklearn.py | 3 +- src/ctapipe/reco/tests/test_preprocessing.py | 40 ------------ src/ctapipe/vectorization/collect.py | 63 +++++++++++++++++++ .../vectorization/tests/test_collect.py | 38 +++++++++++ 5 files changed, 104 insertions(+), 103 deletions(-) create mode 100644 src/ctapipe/vectorization/collect.py create mode 100644 src/ctapipe/vectorization/tests/test_collect.py diff --git a/src/ctapipe/reco/preprocessing.py b/src/ctapipe/reco/preprocessing.py index eb887ea3906..e7ddf9ec668 100644 --- a/src/ctapipe/reco/preprocessing.py +++ b/src/ctapipe/reco/preprocessing.py @@ -11,19 +11,16 @@ import astropy.units as u import numpy as np from astropy.coordinates import AltAz -from astropy.table import QTable, Table +from astropy.table import Table from numpy.lib.recfunctions import structured_to_unstructured from ctapipe.coordinates import MissingFrameAttributeWarning, TelescopeFrame -from ..containers import ArrayEventContainer - LOG = logging.getLogger(__name__) __all__ = [ "check_valid_rows", - "collect_features", "table_to_float", "table_to_X", "horizontal_to_telescope", @@ -67,64 +64,6 @@ def table_to_X(table: Table, features: list[str], log=LOG): return X, valid -def collect_features( - event: ArrayEventContainer, tel_id: int, subarray_table=None -) -> Table: - """Loop over all containers with features. - - Parameters - ---------- - event : ArrayEventContainer - The event container from which to collect the features - tel_id : int - The telscope id for which to collect the features - subarray_table : Table - The subarray as "to_table("joined")", to be added to the features. - - Returns - ------- - Table - """ - features = {} - - features.update(event.trigger.as_dict(recursive=False, flatten=True)) - - features.update( - event.dl1.tel[tel_id].parameters.as_dict( - add_prefix=True, - recursive=True, - flatten=True, - ) - ) - - features.update( - event.dl2.tel[tel_id].as_dict( - add_prefix=True, - recursive=True, - flatten=True, - add_key=False, # prefix is already the map key for dl2 stuff - ) - ) - - features.update( - event.dl2.stereo.as_dict( - add_prefix=True, - recursive=True, - flatten=True, - add_key=False, # prefix is already the map key for dl2 stuff - ) - ) - - if subarray_table is not None: - # to include units in features - if not isinstance(subarray_table, QTable): - subarray_table = QTable(subarray_table, copy=False) - - features.update(subarray_table.loc[tel_id]) - - return Table({k: [v] for k, v in features.items()}) - - @u.quantity_input(alt=u.deg, az=u.deg, pointing_alt=u.deg, pointing_az=u.deg) def horizontal_to_telescope(alt, az, pointing_alt, pointing_az): """Transform coordinates from horizontal coordinates into TelescopeFrame""" diff --git a/src/ctapipe/reco/sklearn.py b/src/ctapipe/reco/sklearn.py index b9005a462fe..5dfef407f1c 100644 --- a/src/ctapipe/reco/sklearn.py +++ b/src/ctapipe/reco/sklearn.py @@ -37,7 +37,8 @@ traits, ) from ..io import write_table -from .preprocessing import collect_features, table_to_X, telescope_to_horizontal +from ..vectorization import collect_features +from .preprocessing import table_to_X, telescope_to_horizontal from .reconstructor import ReconstructionProperty, Reconstructor from .stereo_combination import StereoCombiner from .utils import add_defaults_and_meta diff --git a/src/ctapipe/reco/tests/test_preprocessing.py b/src/ctapipe/reco/tests/test_preprocessing.py index 861fb33404e..833fd5aaec4 100644 --- a/src/ctapipe/reco/tests/test_preprocessing.py +++ b/src/ctapipe/reco/tests/test_preprocessing.py @@ -62,43 +62,3 @@ def test_check_valid_rows(): valid = check_valid_rows(t, warn=False) assert_array_equal(valid, [True, True, True, False, False]) - - -def test_collect_features(example_event, example_subarray): - from ctapipe.calib import CameraCalibrator - from ctapipe.image import ImageProcessor - from ctapipe.reco import ShowerProcessor - from ctapipe.reco.preprocessing import collect_features - - event = example_event - subarray = example_subarray - - calib = CameraCalibrator(subarray) - image_processor = ImageProcessor(subarray) - shower_processor = ShowerProcessor(subarray) - - calib(event) - image_processor(event) - shower_processor(event) - - tel_id = next(iter(event.dl2.tel)) - tab = collect_features(event, tel_id=tel_id) - - k = "HillasReconstructor" - impact = event.dl2.tel[tel_id].impact[k] - assert tab[f"{k}_tel_impact_distance"].quantity[0] == impact.distance - - geometry = event.dl2.stereo.geometry[k] - assert tab[f"{k}_az"].quantity[0] == geometry.az - - hillas = event.dl1.tel[tel_id].parameters.hillas - assert tab["hillas_intensity"].quantity[0] == hillas.intensity - - leakage = event.dl1.tel[tel_id].parameters.leakage - assert tab["leakage_intensity_width_1"].quantity[0] == leakage.intensity_width_1 - - tab = collect_features( - event, tel_id=tel_id, subarray_table=subarray.to_table("joined") - ) - focal_length = subarray.tel[tel_id].optics.equivalent_focal_length - assert tab["equivalent_focal_length"].quantity[0] == focal_length diff --git a/src/ctapipe/vectorization/collect.py b/src/ctapipe/vectorization/collect.py new file mode 100644 index 00000000000..7751978d30f --- /dev/null +++ b/src/ctapipe/vectorization/collect.py @@ -0,0 +1,63 @@ +from astropy.table import QTable, Table + +from ..containers import ArrayEventContainer + +__all__ = ["collect_features"] + + +def collect_features( + event: ArrayEventContainer, tel_id: int, subarray_table=None +) -> Table: + """Loop over all containers with features. + + Parameters + ---------- + event : ArrayEventContainer + The event container from which to collect the features + tel_id : int + The telscope id for which to collect the features + subarray_table : Table + The subarray as "to_table("joined")", to be added to the features. + + Returns + ------- + Table + """ + features = {} + + features.update(event.trigger.as_dict(recursive=False, flatten=True)) + + features.update( + event.dl1.tel[tel_id].parameters.as_dict( + add_prefix=True, + recursive=True, + flatten=True, + ) + ) + + features.update( + event.dl2.tel[tel_id].as_dict( + add_prefix=True, + recursive=True, + flatten=True, + add_key=False, # prefix is already the map key for dl2 stuff + ) + ) + + features.update( + event.dl2.stereo.as_dict( + add_prefix=True, + recursive=True, + flatten=True, + add_key=False, # prefix is already the map key for dl2 stuff + ) + ) + + if subarray_table is not None: + # to include units in features + if not isinstance(subarray_table, QTable): + subarray_table = QTable(subarray_table, copy=False) + + features.update(subarray_table.loc[tel_id]) + + return Table({k: [v] for k, v in features.items()}) diff --git a/src/ctapipe/vectorization/tests/test_collect.py b/src/ctapipe/vectorization/tests/test_collect.py new file mode 100644 index 00000000000..7404778cd8d --- /dev/null +++ b/src/ctapipe/vectorization/tests/test_collect.py @@ -0,0 +1,38 @@ +def test_collect_features(example_event, example_subarray): + from ctapipe.calib import CameraCalibrator + from ctapipe.image import ImageProcessor + from ctapipe.reco import ShowerProcessor + from ctapipe.vectorization import collect_features + + event = example_event + subarray = example_subarray + + calib = CameraCalibrator(subarray) + image_processor = ImageProcessor(subarray) + shower_processor = ShowerProcessor(subarray) + + calib(event) + image_processor(event) + shower_processor(event) + + tel_id = next(iter(event.dl2.tel)) + tab = collect_features(event, tel_id=tel_id) + + k = "HillasReconstructor" + impact = event.dl2.tel[tel_id].impact[k] + assert tab[f"{k}_tel_impact_distance"].quantity[0] == impact.distance + + geometry = event.dl2.stereo.geometry[k] + assert tab[f"{k}_az"].quantity[0] == geometry.az + + hillas = event.dl1.tel[tel_id].parameters.hillas + assert tab["hillas_intensity"].quantity[0] == hillas.intensity + + leakage = event.dl1.tel[tel_id].parameters.leakage + assert tab["leakage_intensity_width_1"].quantity[0] == leakage.intensity_width_1 + + tab = collect_features( + event, tel_id=tel_id, subarray_table=subarray.to_table("joined") + ) + focal_length = subarray.tel[tel_id].optics.equivalent_focal_length + assert tab["equivalent_focal_length"].quantity[0] == focal_length From b7d4c9f099a7c5478c11c50aff603110c3685da4 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Wed, 24 Jan 2024 20:23:36 +0100 Subject: [PATCH 10/22] Add numba function to replace np.unique and other optimization of vectorization functions --- src/ctapipe/image/statistics.py | 162 ++++++++++-------------- src/ctapipe/reco/stereo_combination.py | 87 +++++++------ src/ctapipe/tools/aggregate_features.py | 33 ++--- src/ctapipe/tools/apply_models.py | 33 ++--- src/ctapipe/vectorization/__init__.py | 11 +- src/ctapipe/vectorization/aggregate.py | 157 +++++++++++++++++------ 6 files changed, 270 insertions(+), 213 deletions(-) diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py index d31b11c8a1c..0c4690468ec 100644 --- a/src/ctapipe/image/statistics.py +++ b/src/ctapipe/image/statistics.py @@ -1,8 +1,7 @@ """Calculating statistics of image parameters.""" import astropy.units as u import numpy as np -from astropy.stats import circmean, circstd -from astropy.table import Table +from astropy.table import Table, vstack from numba import njit from ..containers import ( @@ -10,9 +9,15 @@ BaseStatisticsContainer, StatisticsContainer, ) -from ..core import Component +from ..core import Component, FeatureGenerator, QualityQuery from ..core.traits import List, Tuple, Unicode -from ..vectorization import max_ufunc, min_ufunc, weighted_mean_ufunc +from ..vectorization import ( + collect_features, + get_subarray_index, + max_ufunc, + min_ufunc, + weighted_mean_std_ufunc, +) __all__ = ["descriptive_statistics", "skewness", "kurtosis", "FeatureAggregator"] @@ -117,124 +122,95 @@ class FeatureAggregator(Component): ), ).tag(config=True) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.feature_generator = FeatureGenerator(parent=self) + self.quality_query = QualityQuery(parent=self) + def __call__(self, event: ArrayEventContainer) -> None: """Fill event container with aggregated image parameters.""" - for prefix, feature in self.image_parameters: - values = [] - unit = None - for tel_id in event.dl1.tel.keys(): - value = event.dl1.tel[tel_id].parameters[prefix][feature] - if isinstance(value, u.Quantity): - if not unit: - unit = value.unit - value = value.to_value(unit) - - valid = value >= 0 if prefix == "morphology" else ~np.isnan(value) - if valid: - values.append(value) - - if len(values) > 0: - if feature.endswith(("psi", "phi")): - mean = circmean( - u.Quantity(values, unit, copy=False).to_value(u.rad) - ) - std = circstd(u.Quantity(values, unit, copy=False).to_value(u.rad)) - else: - mean = np.mean(values) - std = np.std(values) - - # Use the same dtype for all columns, independent of the dtype - # of the aggregated image parameter, since `_mean` and `_std` - # requiere floats anyway. - max_value = np.float64(np.max(values)) - min_value = np.float64(np.min(values)) + table = None + for tel_id in event.trigger.tels_with_trigger: + t = collect_features(event, tel_id) + t["obs_id"] = event.index.obs_id + t["event_id"] = event.index.event_id + if not table: + table = t else: - max_value = np.nan - min_value = np.nan - mean = np.nan - std = np.nan - - if unit: - max_value = u.Quantity(max_value, unit, copy=False) - min_value = u.Quantity(min_value, unit, copy=False) - if feature.endswith(("psi", "phi")): - mean = u.Quantity(mean, u.rad, copy=False).to(unit) - std = u.Quantity(std, u.rad, copy=False).to(unit) - else: - mean = u.Quantity(mean, unit, copy=False) - std = u.Quantity(std, unit, copy=False) - - event.dl1.aggregate[prefix + "_" + feature] = BaseStatisticsContainer( - max=max_value, - min=min_value, - mean=mean, - std=std, - prefix=prefix + "_" + feature, + table = vstack([table, t]) + + agg_table = self.aggregate_table(table) + for col in [ + prefix + "_" + feature for prefix, feature in self.image_parameters + ]: + event.dl1.aggregate[col] = BaseStatisticsContainer( + max=agg_table[col + "_max"], + min=agg_table[col + "_min"], + mean=agg_table[col + "_mean"], + std=agg_table[col + "_std"], + prefix=col, ) - def aggregate_table(self, mono_parameters: Table) -> dict[str, Table]: + def aggregate_table(self, mono_parameters: Table) -> Table: """ Construct table containing aggregated image parameters from table of telescope events. """ - agg_tables = {} + mono_parameters = self.feature_generator(mono_parameters) + passes_cuts = self.quality_query.get_table_mask(mono_parameters) + + obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index( + mono_parameters + ) + n_array_events = len(obs_ids) + agg_table = Table({"obs_id": obs_ids, "event_id": event_ids}) + # copy metadata + for colname in ("obs_id", "event_id"): + agg_table[colname].description = mono_parameters[colname].description + for prefix, feature in self.image_parameters: + if feature in ("psi", "phi"): + raise NotImplementedError( + "Aggregating rotation angels or polar coordinates" + " is not supported." + ) + col = prefix + "_" + feature unit = mono_parameters[col].quantity.unit if prefix == "morphology": - valid = mono_parameters[col] >= 0 + valid = mono_parameters[col] >= 0 & passes_cuts else: - valid = ~np.isnan(mono_parameters[col]) + valid = ~np.isnan(mono_parameters[col]) & passes_cuts - valid_parameters = mono_parameters[valid] - array_events, indices, multiplicity = np.unique( - mono_parameters["obs_id", "event_id"], - return_inverse=True, - return_counts=True, - ) - agg_table = Table(array_events) - for colname in ("obs_id", "event_id"): - agg_table[colname].description = mono_parameters[colname].description - - n_array_events = len(array_events) - if len(valid_parameters) > 0: - mono_column = valid_parameters[col] - means = weighted_mean_ufunc( - mono_column, - np.array([1]), + if np.sum(valid) > 0: + means, stds = weighted_mean_std_ufunc( + mono_parameters[col], + valid, n_array_events, - indices[valid], - ) - # FIXME: This has the same problem of strange NaNs as - # e.g. "energy_uncert" generated by the StereoMeanCombiner! - # Output is also incorrect, but I don't understand why... - variance = weighted_mean_ufunc( - (mono_column - np.repeat(means, multiplicity)[valid]) ** 2, - np.array([1]), - n_array_events, - indices[valid], + tel_to_array_indices, + multiplicity, ) max_values = max_ufunc( - mono_column, + mono_parameters[col], + valid, n_array_events, - indices[valid], + tel_to_array_indices, ) min_values = min_ufunc( - mono_column, + mono_parameters[col], + valid, n_array_events, - indices[valid], + tel_to_array_indices, ) else: means = np.full(n_array_events, np.nan) - variance = np.full(n_array_events, np.nan) + stds = np.full(n_array_events, np.nan) max_values = np.full(n_array_events, np.nan) min_values = np.full(n_array_events, np.nan) agg_table[col + "_max"] = u.Quantity(max_values, unit, copy=False) agg_table[col + "_min"] = u.Quantity(min_values, unit, copy=False) agg_table[col + "_mean"] = u.Quantity(means, unit, copy=False) - agg_table[col + "_std"] = u.Quantity(np.sqrt(variance), unit, copy=False) - - agg_tables[col] = agg_table + agg_table[col + "_std"] = u.Quantity(stds, unit, copy=False) - return agg_tables + return agg_table diff --git a/src/ctapipe/reco/stereo_combination.py b/src/ctapipe/reco/stereo_combination.py index 881cd50302b..ad4c609f41c 100644 --- a/src/ctapipe/reco/stereo_combination.py +++ b/src/ctapipe/reco/stereo_combination.py @@ -16,7 +16,7 @@ ReconstructedEnergyContainer, ReconstructedGeometryContainer, ) -from ..vectorization import weighted_mean_ufunc +from ..vectorization import get_subarray_index, weighted_mean_std_ufunc from .utils import add_defaults_and_meta _containers = { @@ -256,30 +256,30 @@ def predict_table(self, mono_predictions: Table) -> Table: This means you might end up with less events if all telescope predictions of a shower are invalid. """ - prefix = f"{self.prefix}_tel" # TODO: Integrate table quality query once its done valid = mono_predictions[f"{prefix}_is_valid"] - valid_predictions = mono_predictions[valid] - array_events, indices, multiplicity = np.unique( - mono_predictions[["obs_id", "event_id"]], - return_inverse=True, - return_counts=True, + obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index( + mono_predictions ) - stereo_table = Table(array_events) + n_array_events = len(obs_ids) + stereo_table = Table({"obs_id": obs_ids, "event_id": event_ids}) # copy metadata for colname in ("obs_id", "event_id"): stereo_table[colname].description = mono_predictions[colname].description - n_array_events = len(array_events) - weights = self._calculate_weights(valid_predictions) + weights = self._calculate_weights(mono_predictions[valid]) if self.property is ReconstructionProperty.PARTICLE_TYPE: - if len(valid_predictions) > 0: - mono_predictions = valid_predictions[f"{prefix}_prediction"] - stereo_predictions = weighted_mean_ufunc( - mono_predictions, weights, n_array_events, indices[valid] + if np.sum(valid) > 0: + stereo_predictions, _ = weighted_mean_std_ufunc( + mono_predictions[f"{prefix}_prediction"], + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) else: stereo_predictions = np.full(n_array_events, np.nan) @@ -289,29 +289,21 @@ def predict_table(self, mono_predictions: Table) -> Table: stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan elif self.property is ReconstructionProperty.ENERGY: - if len(valid_predictions) > 0: - mono_energies = valid_predictions[f"{prefix}_energy"].quantity.to_value( + if np.sum(valid) > 0: + mono_energies = mono_predictions[f"{prefix}_energy"].quantity.to_value( u.TeV ) - if self.log_target: mono_energies = np.log(mono_energies) - stereo_energy = weighted_mean_ufunc( + stereo_energy, std = weighted_mean_std_ufunc( mono_energies, - weights, - n_array_events, - indices[valid], - ) - variance = weighted_mean_ufunc( - (mono_energies - np.repeat(stereo_energy, multiplicity)[valid]) - ** 2, - weights, + valid, n_array_events, - indices[valid], + tel_to_array_indices, + multiplicity, + weights=weights, ) - std = np.sqrt(variance) - if self.log_target: stereo_energy = np.exp(stereo_energy) std = np.exp(std) @@ -330,22 +322,37 @@ def predict_table(self, mono_predictions: Table) -> Table: stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan elif self.property is ReconstructionProperty.GEOMETRY: - if len(valid_predictions) > 0: + if np.sum(valid) > 0: coord = AltAz( - alt=valid_predictions[f"{prefix}_alt"], - az=valid_predictions[f"{prefix}_az"], + alt=mono_predictions[f"{prefix}_alt"], + az=mono_predictions[f"{prefix}_az"], ) # https://en.wikipedia.org/wiki/Von_Mises%E2%80%93Fisher_distribution#Mean_direction mono_x, mono_y, mono_z = coord.cartesian.get_xyz() - stereo_x = weighted_mean_ufunc( - mono_x, weights, n_array_events, indices[valid] + stereo_x, _ = weighted_mean_std_ufunc( + mono_x, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) - stereo_y = weighted_mean_ufunc( - mono_y, weights, n_array_events, indices[valid] + stereo_y, _ = weighted_mean_std_ufunc( + mono_y, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) - stereo_z = weighted_mean_ufunc( - mono_z, weights, n_array_events, indices[valid] + stereo_z, _ = weighted_mean_std_ufunc( + mono_z, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) mean_cartesian = CartesianRepresentation( @@ -382,7 +389,9 @@ def predict_table(self, mono_predictions: Table) -> Table: tel_ids = [[] for _ in range(n_array_events)] - for index, tel_id in zip(indices[valid], valid_predictions["tel_id"]): + for index, tel_id in zip( + tel_to_array_indices[valid], mono_predictions["tel_id"][valid] + ): tel_ids[index].append(tel_id) stereo_table[f"{self.prefix}_telescopes"] = tel_ids diff --git a/src/ctapipe/tools/aggregate_features.py b/src/ctapipe/tools/aggregate_features.py index ab4e806e53f..a8c1b91522d 100644 --- a/src/ctapipe/tools/aggregate_features.py +++ b/src/ctapipe/tools/aggregate_features.py @@ -1,15 +1,13 @@ """ Tool to aggregate DL1 image parameters array-event-wise. """ -import numpy as np import tables from tqdm.auto import tqdm from ..core import Tool, ToolConfigurationError from ..core.traits import Bool, Integer, Path, flag from ..image import FeatureAggregator -from ..io import HDF5Merger, TableLoader, read_table, write_table -from ..io.tableloader import _join_subarray_events +from ..io import HDF5Merger, TableLoader, write_table __all__ = ["AggregateFeatures"] @@ -129,7 +127,6 @@ def start(self): """Aggregate DL1 image parameters for input tables.""" chunk_iterator = self.loader.read_telescope_events_chunked( self.chunk_size, - dl2=False, simulated=False, true_parameters=False, ) @@ -143,27 +140,13 @@ def start(self): with bar: for chunk, (start, stop, table) in enumerate(chunk_iterator): self.log.debug("Aggregating for chunk %d", chunk) - agg_tables = self.aggregator.aggregate_table(table) - - # to ensure events are stored in the correct order, - # we resort to trigger table order - trigger = read_table( - self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop - )[["obs_id", "event_id"]] - trigger["__sort_index__"] = np.arange(len(trigger)) - - for colname, agg_table in agg_tables.items(): - agg_table = _join_subarray_events(trigger, agg_table) - agg_table.sort("__sort_index__") - del agg_table["__sort_index__"] - - write_table( - agg_table, - self.output_path, - f"/dl1/event/aggregate/{colname}", - append=True, - ) - + agg_table = self.aggregator.aggregate_table(table) + write_table( + agg_table, + self.output_path, + "/dl1/event/subarray/aggregated_image_parameters", + append=True, + ) bar.update(stop - start) diff --git a/src/ctapipe/tools/apply_models.py b/src/ctapipe/tools/apply_models.py index 6ec237006e0..91b19510d52 100644 --- a/src/ctapipe/tools/apply_models.py +++ b/src/ctapipe/tools/apply_models.py @@ -9,9 +9,8 @@ from ctapipe.core.tool import Tool from ctapipe.core.traits import Bool, Integer, List, Path, classes_with_traits, flag from ctapipe.io import HDF5Merger, TableLoader, write_table -from ctapipe.io.astropy_helpers import read_table +from ctapipe.io.astropy_helpers import join_allow_empty, read_table from ctapipe.io.tableio import TelListToMaskTransform -from ctapipe.io.tableloader import _join_subarray_events from ctapipe.reco import Reconstructor __all__ = [ @@ -218,6 +217,23 @@ def _apply(self, reconstructor, tel_tables, start, stop): def _combine(self, reconstructor, tel_tables, start, stop): stereo_table = vstack(list(tel_tables.values())) + # stacking the single telescope tables and joining + # potentially changes the order of the subarray events. + # to ensure events are stored in the correct order, + # we resort to trigger table order + trigger = read_table( + self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop + )[["obs_id", "event_id"]] + trigger["__sort_index__"] = np.arange(len(trigger)) + + stereo_table = join_allow_empty( + stereo_table, + trigger, + keys=["obs_id", "event_id"], + join_type="left", + ) + stereo_table.sort("__sort_index__") + combiner = reconstructor.stereo_combiner stereo_predictions = combiner.predict_table(stereo_table) del stereo_table @@ -230,19 +246,6 @@ def _combine(self, reconstructor, tel_tables, start, stop): stereo_predictions[c.name] = np.array([trafo(r) for r in c]) stereo_predictions[c.name].description = c.description - # stacking the single telescope tables and joining - # potentially changes the order of the subarray events. - # to ensure events are stored in the correct order, - # we resort to trigger table order - trigger = read_table( - self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop - )[["obs_id", "event_id"]] - trigger["__sort_index__"] = np.arange(len(trigger)) - - stereo_predictions = _join_subarray_events(trigger, stereo_predictions) - stereo_predictions.sort("__sort_index__") - del stereo_predictions["__sort_index__"] - write_table( stereo_predictions, self.output_path, diff --git a/src/ctapipe/vectorization/__init__.py b/src/ctapipe/vectorization/__init__.py index 149b6597f50..42eec6aeebb 100644 --- a/src/ctapipe/vectorization/__init__.py +++ b/src/ctapipe/vectorization/__init__.py @@ -1,3 +1,10 @@ -from .aggregate import max_ufunc, min_ufunc, weighted_mean_ufunc +from .aggregate import get_subarray_index, max_ufunc, min_ufunc, weighted_mean_std_ufunc +from .collect import collect_features -__all__ = ["weighted_mean_ufunc", "max_ufunc", "min_ufunc"] +__all__ = [ + "get_subarray_index", + "weighted_mean_std_ufunc", + "max_ufunc", + "min_ufunc", + "collect_features", +] diff --git a/src/ctapipe/vectorization/aggregate.py b/src/ctapipe/vectorization/aggregate.py index 2027b61790c..99ecca9b081 100644 --- a/src/ctapipe/vectorization/aggregate.py +++ b/src/ctapipe/vectorization/aggregate.py @@ -1,7 +1,73 @@ """Helper functions for vectorizing numpy operations.""" import numpy as np +from numba import njit, uint64 + +__all__ = ["get_subarray_index", "weighted_mean_std_ufunc", "max_ufunc", "min_ufunc"] + + +@njit +def _get_subarray_index(obs_ids, event_ids): + n_tel_events = len(obs_ids) + idx = np.zeros(n_tel_events, dtype=uint64) + current_idx = 0 + subarray_obs_index = [] + subarray_event_index = [] + multiplicities = [] + multiplicity = 0 + + if n_tel_events > 0: + subarray_obs_index.append(obs_ids[0]) + subarray_event_index.append(event_ids[0]) + multiplicity += 1 + + for i in range(1, n_tel_events): + if obs_ids[i] != obs_ids[i - 1] or event_ids[i] != event_ids[i - 1]: + # append to subarray events + multiplicities.append(multiplicity) + subarray_obs_index.append(obs_ids[i]) + subarray_event_index.append(event_ids[i]) + # reset state + current_idx += 1 + multiplicity = 0 + + multiplicity += 1 + idx[i] = current_idx + + # Append multiplicity of the last subarray event + if n_tel_events > 0: + multiplicities.append(multiplicity) + + return ( + np.asarray(subarray_obs_index), + np.asarray(subarray_event_index), + np.asarray(multiplicities), + idx, + ) + + +def get_subarray_index(tel_table): + """ + Get the obs_ids and event_ids of all subarray events contained + in a table of telescope events, their multiplicity and an array + giving the index of the subarray event for each telescope event. + This requires the telescope events to be SORTED by their corresponding + subarray events (meaning by ``["obs_id", "event_id"]``). + + Parameters + ---------- + tel_table: astropy.table.Table + table with telescope events as rows -__all__ = ["weighted_mean_ufunc", "max_ufunc", "min_ufunc"] + Returns + ------- + Tuple(np.ndarray, np.ndarray, np.ndarray, np.ndarray) + obs_ids of subarray events, event_ids of subarray events, + multiplicity of subarray events, index of the subarray event + for each telescope event + """ + obs_idx = tel_table["obs_id"] + event_idx = tel_table["event_id"] + return _get_subarray_index(obs_idx, event_idx) def _grouped_add(tel_data, n_array_events, indices): @@ -17,30 +83,45 @@ def _grouped_add(tel_data, n_array_events, indices): return combined_values -def weighted_mean_ufunc(tel_values, weights, n_array_events, indices): +def weighted_mean_std_ufunc( + tel_values, + valid_tel, + n_array_events, + indices, + multiplicity, + weights=np.array([1]), +): """ - Calculate the weighted mean for each array event over the - corresponding telescope events. + Calculate the weighted mean and standart deviation for each array event + over the corresponding telescope events. Parameters ---------- tel_values: np.ndarray values for each telescope event - weights: np.ndarray - weights used for averaging + valid_tel: array-like + boolean mask giving the valid values of ``tel_values`` n_array_events: int number of array events with corresponding telescope events in ``tel_values`` indices: np.ndarray - index of the subarray event for each telescope event, returned by - ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + index of the subarray event for each telescope event, returned as + the fourth return value of ``get_subarray_index`` + multiplicity: np.ndarray + multiplicity of the subarray events in the same order as the order of + subarray events in ``indices`` + weights: np.ndarray + weights used for averaging (equal/no weights are used by default) Returns ------- - array: np.ndarray - weighted mean for each array event + Tuple(np.ndarray, np.ndarray) + weighted mean and standart deviation for each array event """ # avoid numerical problems by very large or small weights weights = weights / weights.max() + tel_values = tel_values[valid_tel] + indices = indices[valid_tel] + sum_prediction = _grouped_add( tel_values * weights, n_array_events, @@ -54,10 +135,18 @@ def weighted_mean_ufunc(tel_values, weights, n_array_events, indices): mean = np.full(n_array_events, np.nan) valid = sum_of_weights > 0 mean[valid] = sum_prediction[valid] / sum_of_weights[valid] - return mean + + sum_sq_residulas = _grouped_add( + (tel_values - np.repeat(mean, multiplicity)[valid_tel]) ** 2 * weights, + n_array_events, + indices, + ) + variance = np.full(n_array_events, np.nan) + variance[valid] = sum_sq_residulas[valid] / sum_of_weights[valid] + return mean, np.sqrt(variance) -def max_ufunc(tel_values, n_array_events, indices): +def max_ufunc(tel_values, valid_tel, n_array_events, indices): """ Find the maximum value for each array event from the corresponding telescope events. @@ -66,34 +155,29 @@ def max_ufunc(tel_values, n_array_events, indices): ---------- tel_values: np.ndarray values for each telescope event + valid_tel: array-like + boolean mask giving the valid values of ``tel_values`` n_array_events: int number of array events with corresponding telescope events in ``tel_values`` indices: np.ndarray - index of the subarray event for each telescope event, returned by - ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + index of the subarray event for each telescope event, returned as + the fourth return value of ``get_subarray_index`` Returns ------- - array: np.ndarray + np.ndarray maximum value for each array event """ - if np.issubdtype(tel_values[0], np.integer): - fillvalue = np.iinfo(tel_values.dtype).min - elif np.issubdtype(tel_values[0], np.floating): - fillvalue = np.finfo(tel_values.dtype).min - else: - raise ValueError("Non-numerical dtypes are not supported") - - max_values = np.full(n_array_events, fillvalue) - np.maximum.at(max_values, indices, tel_values) + max_values = np.full(n_array_events, -np.inf) + np.maximum.at(max_values, indices[valid_tel], tel_values[valid_tel]) result = np.full(n_array_events, np.nan) - valid = max_values > fillvalue + valid = max_values > -np.inf result[valid] = max_values[valid] return result -def min_ufunc(tel_values, n_array_events, indices): +def min_ufunc(tel_values, valid_tel, n_array_events, indices): """ Find the minimum value for each array event from the corresponding telescope events. @@ -102,28 +186,23 @@ def min_ufunc(tel_values, n_array_events, indices): ---------- tel_values: np.ndarray values for each telescope event + valid_tel: array-like + boolean mask giving the valid values of ``tel_values`` n_array_events: int number of array events with corresponding telescope events in ``tel_values`` indices: np.ndarray - index of the subarray event for each telescope event, returned by - ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` + index of the subarray event for each telescope event, returned as + the fourth return value of ``get_subarray_index`` Returns ------- - array: np.ndarray + np.ndarray minimum value for each array event """ - if np.issubdtype(tel_values[0], np.integer): - fillvalue = np.iinfo(tel_values.dtype).max - elif np.issubdtype(tel_values[0], np.floating): - fillvalue = np.finfo(tel_values.dtype).max - else: - raise ValueError("Non-numerical dtypes are not supported") - - min_values = np.full(n_array_events, fillvalue) - np.minimum.at(min_values, indices, tel_values) + min_values = np.full(n_array_events, np.inf) + np.minimum.at(min_values, indices[valid_tel], tel_values[valid_tel]) result = np.full(n_array_events, np.nan) - valid = min_values < fillvalue + valid = min_values < np.inf result[valid] = min_values[valid] return result From 43cb24182f933f79df2e4163af95b4c86f3534b9 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Mon, 29 Jan 2024 14:20:16 +0100 Subject: [PATCH 11/22] Iterate over dl1 tels not tels_with_trigger --- src/ctapipe/image/statistics.py | 2 +- src/ctapipe/tools/aggregate_features.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py index 0c4690468ec..83d1d878d15 100644 --- a/src/ctapipe/image/statistics.py +++ b/src/ctapipe/image/statistics.py @@ -130,7 +130,7 @@ def __init__(self, *args, **kwargs): def __call__(self, event: ArrayEventContainer) -> None: """Fill event container with aggregated image parameters.""" table = None - for tel_id in event.trigger.tels_with_trigger: + for tel_id in event.dl1.tel.keys(): t = collect_features(event, tel_id) t["obs_id"] = event.index.obs_id t["event_id"] = event.index.event_id diff --git a/src/ctapipe/tools/aggregate_features.py b/src/ctapipe/tools/aggregate_features.py index a8c1b91522d..5bc81e8a941 100644 --- a/src/ctapipe/tools/aggregate_features.py +++ b/src/ctapipe/tools/aggregate_features.py @@ -104,7 +104,7 @@ class AggregateFeatures(Tool): classes = [TableLoader, FeatureAggregator] def setup(self): - """Initilize components form config.""" + """Initilize components from config.""" self.check_output(self.output_path) self.log.info("Copying to output destination.") with HDF5Merger(self.output_path, parent=self) as merger: From 7f258fc7ef4f5e8ec3a25954ff1ac850ecc8835f Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Mon, 29 Jan 2024 14:21:13 +0100 Subject: [PATCH 12/22] Add tests --- src/ctapipe/image/tests/test_statistics.py | 34 ++++++++ .../tools/tests/test_aggregate_features.py | 81 +++++++++++++++++++ .../vectorization/tests/test_aggregate.py | 50 ++++++++++++ 3 files changed, 165 insertions(+) create mode 100644 src/ctapipe/tools/tests/test_aggregate_features.py create mode 100644 src/ctapipe/vectorization/tests/test_aggregate.py diff --git a/src/ctapipe/image/tests/test_statistics.py b/src/ctapipe/image/tests/test_statistics.py index b9773edc9a7..06581027c74 100644 --- a/src/ctapipe/image/tests/test_statistics.py +++ b/src/ctapipe/image/tests/test_statistics.py @@ -1,6 +1,14 @@ +import astropy.units as u import numpy as np import scipy.stats +from ctapipe.containers import ( + ArrayEventContainer, + HillasParametersContainer, + ImageParametersContainer, + MorphologyContainer, +) + def test_statistics(): from ctapipe.image import descriptive_statistics @@ -60,3 +68,29 @@ def test_return_type(): stats = descriptive_statistics(data, container_class=PeakTimeStatisticsContainer) assert isinstance(stats, PeakTimeStatisticsContainer) + + +def test_feature_aggregator(): + from ctapipe.image import FeatureAggregator + + event = ArrayEventContainer() + for tel_id, length, n_islands in zip((2, 7, 11), (0.3, 0.5, 0.4), (2, 1, 3)): + event.dl1.tel[tel_id].parameters = ImageParametersContainer( + hillas=HillasParametersContainer(length=length * u.deg), + morphology=MorphologyContainer(n_islands=n_islands), + ) + + features = [ + ("hillas", "length"), + ("morphology", "n_islands"), + ] + aggregate_featuers = FeatureAggregator(image_parameters=features) + aggregate_featuers(event) + assert event.dl1.aggregate["hillas_length"].max == 0.5 * u.deg + assert event.dl1.aggregate["hillas_length"].min == 0.3 * u.deg + assert u.isclose(event.dl1.aggregate["hillas_length"].mean, 0.4 * u.deg) + assert u.isclose(event.dl1.aggregate["hillas_length"].std, 0.081649658 * u.deg) + assert event.dl1.aggregate["morphology_n_islands"].max == 3 + assert event.dl1.aggregate["morphology_n_islands"].min == 1 + assert np.isclose(event.dl1.aggregate["morphology_n_islands"].mean, 2) + assert np.isclose(event.dl1.aggregate["morphology_n_islands"].std, 0.81649658) diff --git a/src/ctapipe/tools/tests/test_aggregate_features.py b/src/ctapipe/tools/tests/test_aggregate_features.py new file mode 100644 index 00000000000..ebc25b89f57 --- /dev/null +++ b/src/ctapipe/tools/tests/test_aggregate_features.py @@ -0,0 +1,81 @@ +import json + +import pytest + +from ctapipe.core import ToolConfigurationError, run_tool +from ctapipe.io import TableLoader + + +def test_aggregate_features(dl2_shower_geometry_file_lapalma, tmp_path): + from ctapipe.tools.aggregate_features import AggregateFeatures + + input_path = dl2_shower_geometry_file_lapalma + output_path = tmp_path / "aggregated.dl1.h5" + config_path = tmp_path / "config.json" + + with pytest.raises( + ToolConfigurationError, + match="No image parameters to aggregate are specified.", + ): + run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + ], + raises=True, + ) + + config = { + "FeatureAggregator": { + "FeatureGenerator": { + "features": [("hillas_abs_skewness", "abs(hillas_skewness)")] + }, + "image_parameters": [ + ("hillas", "length"), + ("timing", "slope"), + ("HillasReconstructor", "tel_impact_distance"), + ("hillas", "abs_skewness"), + ], + } + } + with config_path.open("w") as f: + json.dump(config, f) + + # test "overwrite" works + with pytest.raises(ToolConfigurationError, match="exists, but overwrite=False"): + run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + f"--config={config_path}", + ], + raises=True, + ) + + ret = run_tool( + AggregateFeatures(), + argv=[ + f"--input={input_path}", + f"--output={output_path}", + f"--config={config_path}", + "--overwrite", + ], + raises=True, + ) + assert ret == 0 + + with TableLoader(output_path) as loader: + events = loader.read_subarray_events( + dl1_aggregates=True, + dl2=False, + simulated=False, + ) + assert "hillas_length_max" in events.colnames + assert "hillas_length_min" in events.colnames + assert "hillas_length_mean" in events.colnames + assert "hillas_length_std" in events.colnames + assert "timing_slope_mean" in events.colnames + assert "HillasReconstructor_tel_impact_distance_mean" in events.colnames + assert "hillas_abs_skewness_mean" in events.colnames diff --git a/src/ctapipe/vectorization/tests/test_aggregate.py b/src/ctapipe/vectorization/tests/test_aggregate.py new file mode 100644 index 00000000000..14c3f578c4f --- /dev/null +++ b/src/ctapipe/vectorization/tests/test_aggregate.py @@ -0,0 +1,50 @@ +import numpy as np +from astropy.table import Table + +from ctapipe.io import TableLoader, read_table +from ctapipe.io.tests.test_table_loader import check_equal_array_event_order + + +def test_get_subarray_index(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + subarray_obs_ids, subarray_event_ids, _, _ = get_subarray_index(tel_events) + trigger = read_table(dl1_parameters_file, "/dl1/event/subarray/trigger") + + assert len(subarray_obs_ids) == len(subarray_event_ids) + assert len(subarray_obs_ids) == len(trigger) + check_equal_array_event_order( + Table({"obs_id": subarray_obs_ids, "event_id": subarray_event_ids}), trigger + ) + + +def test_mean_std_ufunc(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index, weighted_mean_std_ufunc + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + valid = np.isfinite(tel_events["hillas_length"]) + obs_ids, event_ids, m, tel_to_subarray_idx = get_subarray_index(tel_events) + n_array_events = len(obs_ids) + + # test only default uniform weights, + # other weights are tested in test_stereo_combination + mean, std = weighted_mean_std_ufunc( + tel_events["hillas_length"], valid, n_array_events, tel_to_subarray_idx, m + ) + + # check if result is identical with np.mean/np.std + true_mean = np.full(n_array_events, np.nan) + true_std = np.full(n_array_events, np.nan) + grouped = tel_events.group_by(["obs_id", "event_id"]) + true_mean = grouped["hillas_length"].groups.aggregate(np.nanmean) + true_std = grouped["hillas_length"].groups.aggregate(np.nanstd) + + assert np.allclose(mean, true_mean, equal_nan=True) + assert np.allclose(std, true_std, equal_nan=True) From df268deb1ac537af72b196b142b7e68e00e6502a Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Mon, 29 Jan 2024 17:22:34 +0100 Subject: [PATCH 13/22] Move error for empty image_parameters trait into FeatureAggregator --- src/ctapipe/image/statistics.py | 5 ++++- src/ctapipe/tools/aggregate_features.py | 6 +----- src/ctapipe/tools/process.py | 6 +----- .../tools/tests/test_aggregate_features.py | 19 ++++++++++--------- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/ctapipe/image/statistics.py b/src/ctapipe/image/statistics.py index 83d1d878d15..6af55999dd8 100644 --- a/src/ctapipe/image/statistics.py +++ b/src/ctapipe/image/statistics.py @@ -10,7 +10,7 @@ StatisticsContainer, ) from ..core import Component, FeatureGenerator, QualityQuery -from ..core.traits import List, Tuple, Unicode +from ..core.traits import List, TraitError, Tuple, Unicode from ..vectorization import ( collect_features, get_subarray_index, @@ -156,6 +156,9 @@ def aggregate_table(self, mono_parameters: Table) -> Table: Construct table containing aggregated image parameters from table of telescope events. """ + if len(self.image_parameters) == 0: + raise TraitError("No DL1 image parameters to aggregate are specified.") + mono_parameters = self.feature_generator(mono_parameters) passes_cuts = self.quality_query.get_table_mask(mono_parameters) diff --git a/src/ctapipe/tools/aggregate_features.py b/src/ctapipe/tools/aggregate_features.py index 5bc81e8a941..4a2f9d43b7b 100644 --- a/src/ctapipe/tools/aggregate_features.py +++ b/src/ctapipe/tools/aggregate_features.py @@ -4,7 +4,7 @@ import tables from tqdm.auto import tqdm -from ..core import Tool, ToolConfigurationError +from ..core import Tool from ..core.traits import Bool, Integer, Path, flag from ..image import FeatureAggregator from ..io import HDF5Merger, TableLoader, write_table @@ -118,10 +118,6 @@ def setup(self): ) ) self.aggregator = FeatureAggregator(parent=self) - if len(self.aggregator.image_parameters) == 0: - raise ToolConfigurationError( - "No image parameters to aggregate are specified." - ) def start(self): """Aggregate DL1 image parameters for input tables.""" diff --git a/src/ctapipe/tools/process.py b/src/ctapipe/tools/process.py index 42187048546..45f07578aa3 100644 --- a/src/ctapipe/tools/process.py +++ b/src/ctapipe/tools/process.py @@ -7,7 +7,7 @@ from tqdm.auto import tqdm from ..calib import CameraCalibrator, GainSelector -from ..core import QualityQuery, Tool, ToolConfigurationError +from ..core import QualityQuery, Tool from ..core.traits import Bool, classes_with_traits, flag from ..image import FeatureAggregator, ImageCleaner, ImageModifier, ImageProcessor from ..image.extractor import ImageExtractor @@ -328,10 +328,6 @@ def start(self): self.process_shower(event) if self.write.write_dl1_aggregates: - if len(self.aggregate.image_parameters) == 0: - raise ToolConfigurationError( - "No DL1 image parameters to aggregate are specified." - ) self.aggregate(event) self.write(event) diff --git a/src/ctapipe/tools/tests/test_aggregate_features.py b/src/ctapipe/tools/tests/test_aggregate_features.py index ebc25b89f57..b82452dca98 100644 --- a/src/ctapipe/tools/tests/test_aggregate_features.py +++ b/src/ctapipe/tools/tests/test_aggregate_features.py @@ -3,6 +3,7 @@ import pytest from ctapipe.core import ToolConfigurationError, run_tool +from ctapipe.core.traits import TraitError from ctapipe.io import TableLoader @@ -14,8 +15,7 @@ def test_aggregate_features(dl2_shower_geometry_file_lapalma, tmp_path): config_path = tmp_path / "config.json" with pytest.raises( - ToolConfigurationError, - match="No image parameters to aggregate are specified.", + TraitError, match="No DL1 image parameters to aggregate are specified." ): run_tool( AggregateFeatures(), @@ -72,10 +72,11 @@ def test_aggregate_features(dl2_shower_geometry_file_lapalma, tmp_path): dl2=False, simulated=False, ) - assert "hillas_length_max" in events.colnames - assert "hillas_length_min" in events.colnames - assert "hillas_length_mean" in events.colnames - assert "hillas_length_std" in events.colnames - assert "timing_slope_mean" in events.colnames - assert "HillasReconstructor_tel_impact_distance_mean" in events.colnames - assert "hillas_abs_skewness_mean" in events.colnames + for col in [ + "hillas_length", + "timing_slope", + "HillasReconstructor_tel_impact_distance", + "hillas_abs_skewness", + ]: + for suffix in ["max", "min", "mean", "std"]: + assert f"{col}_{suffix}" in events.colnames From 65791a6b7f11cfcb36d348d8933c3175475f612a Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Mon, 29 Jan 2024 17:23:02 +0100 Subject: [PATCH 14/22] Add additional test for process tool --- src/ctapipe/tools/tests/test_process.py | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/ctapipe/tools/tests/test_process.py b/src/ctapipe/tools/tests/test_process.py index 769574c71f4..e6de2ef3d53 100644 --- a/src/ctapipe/tools/tests/test_process.py +++ b/src/ctapipe/tools/tests/test_process.py @@ -4,6 +4,7 @@ Test ctapipe-process on a few different use cases """ +import json from subprocess import CalledProcessError import astropy.units as u @@ -488,3 +489,29 @@ def test_only_trigger_and_simulation(tmp_path): assert len(events) == 7 assert "tels_with_trigger" in events.colnames assert "true_energy" in events.colnames + + +def test_dl1_aggregation(dl1_parameters_file, tmp_path): + output_path = tmp_path / "aggregated.dl1.h5" + config_path = tmp_path / "config.json" + + config = {"FeatureAggregator": {"image_parameters": [("hillas", "length")]}} + with config_path.open("w") as f: + json.dump(config, f) + + run_tool( + ProcessorTool(), + argv=[ + f"--input={dl1_parameters_file}", + f"--output={output_path}", + f"--config={config_path}", + "--aggregate-dl1-image-parameters", + ], + cwd=tmp_path, + raises=True, + ) + + with TableLoader(output_path) as loader: + events = loader.read_subarray_events(simulated=False, dl1_aggregates=True) + for suffix in ["max", "min", "mean", "std"]: + assert f"hillas_length_{suffix}" in events.colnames From 16f184e5e45c26d7f8a3900af1062708f45ede6d Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Tue, 16 Apr 2024 09:58:49 +0200 Subject: [PATCH 15/22] Fix typos --- src/ctapipe/tools/aggregate_features.py | 4 ++-- src/ctapipe/vectorization/aggregate.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ctapipe/tools/aggregate_features.py b/src/ctapipe/tools/aggregate_features.py index 4a2f9d43b7b..4349b7fa3d5 100644 --- a/src/ctapipe/tools/aggregate_features.py +++ b/src/ctapipe/tools/aggregate_features.py @@ -17,7 +17,7 @@ class AggregateFeatures(Tool): Aggregate DL1 image parameters array-event-wise. This tool calculates the maximal, minimal, and mean value, - as well as the standart deviation for any given DL1 image parameter + as well as the standard deviation for any given DL1 image parameter for all array events given as input. """ @@ -104,7 +104,7 @@ class AggregateFeatures(Tool): classes = [TableLoader, FeatureAggregator] def setup(self): - """Initilize components from config.""" + """Initialize components from config.""" self.check_output(self.output_path) self.log.info("Copying to output destination.") with HDF5Merger(self.output_path, parent=self) as merger: diff --git a/src/ctapipe/vectorization/aggregate.py b/src/ctapipe/vectorization/aggregate.py index 99ecca9b081..382528579ee 100644 --- a/src/ctapipe/vectorization/aggregate.py +++ b/src/ctapipe/vectorization/aggregate.py @@ -92,7 +92,7 @@ def weighted_mean_std_ufunc( weights=np.array([1]), ): """ - Calculate the weighted mean and standart deviation for each array event + Calculate the weighted mean and standard deviation for each array event over the corresponding telescope events. Parameters @@ -115,7 +115,7 @@ def weighted_mean_std_ufunc( Returns ------- Tuple(np.ndarray, np.ndarray) - weighted mean and standart deviation for each array event + weighted mean and standard deviation for each array event """ # avoid numerical problems by very large or small weights weights = weights / weights.max() From 22c48a72f5ac60a6c7f3c6af9b7bc49dd58c4fd1 Mon Sep 17 00:00:00 2001 From: JBernete Date: Mon, 22 Jan 2024 15:58:09 +0100 Subject: [PATCH 16/22] Start working in angular regressor reconstruction --- src/ctapipe/reco/__init__.py | 3 + src/ctapipe/reco/angular_error.py | 91 +++++++++++++++ .../resources/train_ang_error_regressor.yaml | 20 ++++ src/ctapipe/tools/tests/test_train.py | 20 ++++ .../tools/train_angular_error_regressor.py | 107 ++++++++++++++++++ 5 files changed, 241 insertions(+) create mode 100644 src/ctapipe/reco/angular_error.py create mode 100644 src/ctapipe/resources/train_ang_error_regressor.yaml create mode 100644 src/ctapipe/tools/train_angular_error_regressor.py diff --git a/src/ctapipe/reco/__init__.py b/src/ctapipe/reco/__init__.py index 267b68b4b5c..726bb5e08ee 100644 --- a/src/ctapipe/reco/__init__.py +++ b/src/ctapipe/reco/__init__.py @@ -19,7 +19,10 @@ ) from .stereo_combination import StereoCombiner, StereoMeanCombiner +from .angular_error import AngularErrorRegressor + __all__ = [ + "AngularErrorRegressor", "Reconstructor", "HillasGeometryReconstructor", "ReconstructionProperty", diff --git a/src/ctapipe/reco/angular_error.py b/src/ctapipe/reco/angular_error.py new file mode 100644 index 00000000000..45c6c581934 --- /dev/null +++ b/src/ctapipe/reco/angular_error.py @@ -0,0 +1,91 @@ +import pathlib + +import joblib +from astropy.coordinates import angular_separation + +from .preprocessing import table_to_X +from .reconstructor import Reconstructor +from ..core import traits, Provenance, FeatureGenerator +from .sklearn import SUPPORTED_REGRESSORS +__all__ = ['AngularErrorRegressor'] + +from ..core.traits import TraitError, Unicode + + +class AngularErrorRegressor(Reconstructor): + """ + Reconstructor for estimating the angular reconstruction error. + """ + features = traits.List( + traits.Unicode(), help="Features to use for this model." + ).tag(config=True) + model_config = traits.Dict({}, help="kwargs for the sklearn model.").tag( + config=True + ) + model_cls = traits.Enum( + SUPPORTED_REGRESSORS.keys(), + default_value=None, + allow_none=True, + help="Which scikit-learn model to use.", + ).tag(config=True) + reconstructor_prefix = Unicode( + default_value="HillasReconstructor", + allow_none=False, + help="Prefix of the reconstructor to use for the training.", + ).tag(config=True) + + def __init__(self, subarray=None, models=None, n_jobs=None, **kwargs): + # Run the Component __init__ first to handle the configuration + # and make `self.load_path` available + super().__init__(subarray, **kwargs) + + if self.model_cls is None: + raise TraitError( + "Must provide `model_cls` if not loading model from file" + ) + + self.feature_generator = FeatureGenerator(parent=self) + + # to verify settings + self._new_model() + + self.unit = None + + def __call__(self, event): + pass + + def predict_subarray_table(self, events): + pass + + def fit(self, table): + """ + Create and fit a new model. + """ + self._model = self._new_model() + table = self.feature_generator(table, subarray=self.subarray) + X, valid = table_to_X(table, self.features, self.log) + ang_error = self._compute_angular_separation(table[valid]) + self.unit = ang_error.unit + self._model.fit(X, ang_error.quantity.to_value(self.unit)) + + def _new_model(self): + cfg = self.model_config + return SUPPORTED_REGRESSORS[self.model_cls](**cfg) + + def _compute_angular_separation(self, table): + return angular_separation( + table[f"{self.reconstructor_prefix}_alt"], + table[f"{self.reconstructor_prefix}_az"], + table["true_alt"], + table["true_az"], + ) + + def write(self, path, overwrite=False): + path = pathlib.Path(path) + + if path.exists() and not overwrite: + raise IOError(f"Path {path} exists and overwrite=False") + + with path.open("wb") as f: + Provenance().add_output_file(path, role="ml-models") + joblib.dump(self, f, compress=True) \ No newline at end of file diff --git a/src/ctapipe/resources/train_ang_error_regressor.yaml b/src/ctapipe/resources/train_ang_error_regressor.yaml new file mode 100644 index 00000000000..adcf6f8507e --- /dev/null +++ b/src/ctapipe/resources/train_ang_error_regressor.yaml @@ -0,0 +1,20 @@ +TrainAngularErrorRegressor: + AngularErrorRegressor: + model_cls: 'MLPRegressor' + model_config: + hidden_layer_sizes: [36, 6] + activation: 'tanh' + solver: 'adam' + max_iter: 20000 + tol: 1.0e-5 + random_state: 0 + features: + - "n_telescopes_triggered" + - "n_LSTs_triggered" + - "n_MSTs_triggered" + + FeatureGenerator: # On-the-fly generation of additional features + features: + - [ "n_telescopes_triggered", "subarray.multiplicity(tels_with_trigger)" ] + - [ "n_LSTs_triggered", "subarray.multiplicity(tels_with_trigger, 'LST_LST_LSTCam')" ] + - [ "n_MSTs_triggered", "subarray.multiplicity(tels_with_trigger, 'MST_MST_NectarCam')" ] \ No newline at end of file diff --git a/src/ctapipe/tools/tests/test_train.py b/src/ctapipe/tools/tests/test_train.py index acdb60925c3..68f42ee692b 100644 --- a/src/ctapipe/tools/tests/test_train.py +++ b/src/ctapipe/tools/tests/test_train.py @@ -224,3 +224,23 @@ def test_no_cross_validation(tmp_path): ) assert ret == 0 return out_file + +def test_angular_error_regressor(tmp_path): + from ctapipe.tools.train_angular_error_regressor import TrainAngularErrorRegressor + + out_file = tmp_path / "angular_error.pkl" + + tool = TrainAngularErrorRegressor() + config = resource_file("train_ang_error_regressor.yaml") + ret = run_tool( + tool, + argv=[ + "--input=dataset://gamma_diffuse_dl2_train_small.dl2.h5", + f"--output={out_file}", + f"--config={config}", + "--log-level=INFO", + "--overwrite", + ], + ) + assert ret == 0 + return out_file \ No newline at end of file diff --git a/src/ctapipe/tools/train_angular_error_regressor.py b/src/ctapipe/tools/train_angular_error_regressor.py new file mode 100644 index 00000000000..c32b08876eb --- /dev/null +++ b/src/ctapipe/tools/train_angular_error_regressor.py @@ -0,0 +1,107 @@ +""" +Tool for training the AngularErrorRegressor +""" +import numpy as np + +from ctapipe.core import Tool +from ctapipe.core.traits import Int, IntTelescopeParameter, Path, Unicode +from ctapipe.io import TableLoader +from ctapipe.reco import CrossValidator, AngularErrorRegressor + +__all__ = [ + "TrainAngularErrorRegressor", +] + + +class TrainAngularErrorRegressor(Tool): + + name = "ctapipe-train-angular-error-regressor" + description = __doc__ + + examples = """ + ctapipe-train-angular-error-regressor \\ + --config train_angular_error_regressor.yaml \\ + --input gamma.dl2.h5 \\ + --output angular_error_regressor.pkl + """ + + output_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + help=( + "Output path for the trained reconstructor." + " At the moment, pickle is the only supported format." + ), + ).tag(config=True) + + n_events = Int( + default_value=None, + allow_none=True, + help=( + "Number of events for training the model." + " If not given, all available events will be used." + ), + ).tag(config=True) + + random_seed = Int( + default_value=0, help="Random seed for sampling training events." + ).tag(config=True) + + n_jobs = Int( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config of each reconstructor.", + ).tag(config=True) + + aliases = { + ("i", "input"): "TableLoader.input_url", + ("o", "output"): "TrainAngularErrorRegressor.output_path", + "n-events": "TrainAngularErrorRegressor.n_events", + } + + classes = [ + TableLoader, + AngularErrorRegressor, + CrossValidator, + ] + + def setup(self): + """ + Initialize components from config. + """ + self.loader = self.enter_context( + TableLoader( + parent=self, + ) + ) + self.regressor = AngularErrorRegressor(self.loader.subarray, parent=self) + self.rng = np.random.default_rng(self.random_seed) + self.check_output(self.output_path) + + def start(self): + """ + Run the tool. + """ + events = self.loader.read_subarray_events( + dl2=True, + simulated=True, + ) + + self.log.info("Loaded %d events", len(events)) + valid = events[self.regressor.reconstructor_prefix + "_is_valid"] + events = events[valid] + self.log.info("Using %d valid events", len(events)) + self.regressor.fit(events) + + def finish(self): + """ + Save the model to disk. + """ + self.regressor.write(self.output_path) +def main(): + TrainAngularErrorRegressor().run() + + +if __name__ == "__main__": + main() From 539dd9af917656bbf158b2106cbf4e66ebc955e6 Mon Sep 17 00:00:00 2001 From: JBernete Date: Tue, 23 Jan 2024 14:06:05 +0100 Subject: [PATCH 17/22] Apply model --- src/ctapipe/reco/angular_error.py | 23 ++- .../tools/apply_angular_error_model.py | 175 ++++++++++++++++++ 2 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 src/ctapipe/tools/apply_angular_error_model.py diff --git a/src/ctapipe/reco/angular_error.py b/src/ctapipe/reco/angular_error.py index 45c6c581934..b96687e7e73 100644 --- a/src/ctapipe/reco/angular_error.py +++ b/src/ctapipe/reco/angular_error.py @@ -2,9 +2,12 @@ import joblib from astropy.coordinates import angular_separation +import numpy as np +import astropy.units as u +from astropy.table import Table from .preprocessing import table_to_X -from .reconstructor import Reconstructor +from .reconstructor import Reconstructor, ReconstructionProperty from ..core import traits, Provenance, FeatureGenerator from .sklearn import SUPPORTED_REGRESSORS __all__ = ['AngularErrorRegressor'] @@ -55,7 +58,21 @@ def __call__(self, event): pass def predict_subarray_table(self, events): - pass + table = self.feature_generator(events, subarray=self.subarray) + X, is_valid = table_to_X(table, self.features, self.log) + n_rows = len(table) + ang_error = np.full(n_rows, np.nan) + ang_error[is_valid] = self._model.predict(X[is_valid]) + ang_error = u.Quantity(ang_error, self.unit, copy=False) + + result = Table( + { + f"{self.reconstructor_prefix}_ang_distance_uncert": ang_error, + f"{self.reconstructor_prefix}_is_valid": is_valid, + } + ) + return result + def fit(self, table): """ @@ -88,4 +105,4 @@ def write(self, path, overwrite=False): with path.open("wb") as f: Provenance().add_output_file(path, role="ml-models") - joblib.dump(self, f, compress=True) \ No newline at end of file + joblib.dump(self, f, compress=True) diff --git a/src/ctapipe/tools/apply_angular_error_model.py b/src/ctapipe/tools/apply_angular_error_model.py new file mode 100644 index 00000000000..399ba4f2d52 --- /dev/null +++ b/src/ctapipe/tools/apply_angular_error_model.py @@ -0,0 +1,175 @@ + +import tables +from tqdm.auto import tqdm + +from ctapipe.core import Tool + +from ctapipe.core.traits import Path, Integer, Bool, classes_with_traits, flag + +from ctapipe.io import TableLoader, HDF5Merger, write_table +from ctapipe.reco import Reconstructor + + +__all__ = [ + "ApplyAngularErrorModel", +] + +class ApplyAngularErrorModel(Tool): + """ + Apply the angular error model to a DL2 file. + + Model needs to be trained with + `~ctapipe.tools.train_angular_error_regressor.TrainAngularErrorRegressor`. + """ + + name = "ctapipe-apply-angular-error-model" + description = __doc__ + + examples = """ + ctapipe-apply-angular-error-model \\ + --input gamma.dl2.h5 \\ + --reconstructor angular_error_regressor.pkl \\ + --output gamma_applied.dl2.h5 + """ + + input_url = Path( + default_value=None, + allow_none=False, + directory_ok=False, + exists=True, + help="Input dl2 file", + ).tag(config=True) + + output_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + help="Output file", + ).tag(config=True) + + reconstructor_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + exists=True, + help="Path to trained reconstructor to be applied to the input data", + ).tag(config=True) + + chunk_size = Integer( + default_value=100000, + allow_none=True, + help="How many subarray events to load at once", + ).tag(config=True) + + n_jobs = Integer( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config", + ).tag(config=True) + + progress_bar = Bool( + help="show progress bar during processing", + default_value=True, + ).tag(config=True) + + aliases = { + ("i", "input"): "ApplyAngularErrorModel.input_url", + ("o", "output"): "ApplyAngularErrorModel.output_path", + ("r", "reconstructor"): "ApplyAngularErrorModel.reconstructor_path", + "n-jobs": "ApplyAngularErrorModel.n_jobs", + "chunk-size": "ApplyAngularErrorModel.chunk_size", + } + + flags = { + **flag( + "progress", + "ProcessorTool.progress_bar", + "show a progress bar during event processing", + "don't show a progress bar during event processing", + ), + **flag( + "dl1-parameters", + "HDF5Merger.dl1_parameters", + "Include dl1 parameters", + "Exclude dl1 parameters", + ), + **flag( + "true-parameters", + "HDF5Merger.true_parameters", + "Include true parameters", + "Exclude true parameters", + ), + "overwrite": ( + { + "HDF5Merger": {"overwrite": True}, + "ApplyAngularErrorModel": {"overwrite": True}, + }, + "Overwrite output file if it exists", + ), + } + + classes = [TableLoader] + classes_with_traits(Reconstructor) + + def setup(self): + """ + Initialize components from config. + """ + self.check_output(self.output_path) + self.log.info("Copying to output destination.") + with HDF5Merger(self.output_path, parent=self) as merger: + merger(self.input_url) + self.h5file = self.enter_context(tables.open_file(self.output_path, mode="r+")) + self.loader = self.enter_context( + TableLoader( + self.input_url, + parent=self, + ) + ) + self.regressor = Reconstructor.read(self.reconstructor_path, parent=self, subarray=self.loader.subarray) + if self.n_jobs: + self.regressor.n_jobs = self.n_jobs + + def start(self): + """ + Apply the model to the input file. + """ + chunk_iterator = self.loader.read_subarray_events_chunked( + self.chunk_size, + simulated=False, + observation_info=True, + ) + bar = tqdm( + chunk_iterator, + desc="Applying angular error model", + unit="Subarray events", + total=chunk_iterator.n_total, + disable=not self.progress_bar, + ) + with bar: + for chunk, (start, stop, table) in enumerate(chunk_iterator): + self.log.debug("Events read from chunk %d: %d", chunk, len(table)) + self._apply(self.regressor, table, start=start, stop=stop) + self.log.debug("Events after applying angular error model: %d", len(table)) + bar.update(stop - start) + + def _apply(self, reconstructor, table, start=None, stop=None): + prediction = reconstructor.predict_subarray_table(table) + prefix = reconstructor.reconstructor_prefix + new_columns = prediction.colnames + self.log.debug("Writing to output file") + for col in new_columns: + table[col] = prediction[col] + output_columns = ["obs_id", "event_id"] + new_columns + write_table( + table[output_columns], + self.output_path, + f"/dl2/event/subarray/geometry/{prefix}_uncertainty", + append=True, + ) + +def main(): + ApplyAngularErrorModel().run() + + +if __name__ == "__main__": + main() \ No newline at end of file From 2bc1d32a90284c8a090675ce76cd41e5176d0676 Mon Sep 17 00:00:00 2001 From: JBernete Date: Tue, 23 Jan 2024 17:08:57 +0100 Subject: [PATCH 18/22] Add aggregated features and quality query --- src/ctapipe/reco/angular_error.py | 16 ++++++++-------- .../resources/train_ang_error_regressor.yaml | 1 + src/ctapipe/tools/apply_angular_error_model.py | 1 + .../tools/train_angular_error_regressor.py | 9 ++++++--- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/ctapipe/reco/angular_error.py b/src/ctapipe/reco/angular_error.py index b96687e7e73..2aa2d4a5bd3 100644 --- a/src/ctapipe/reco/angular_error.py +++ b/src/ctapipe/reco/angular_error.py @@ -8,7 +8,7 @@ from .preprocessing import table_to_X from .reconstructor import Reconstructor, ReconstructionProperty -from ..core import traits, Provenance, FeatureGenerator +from ..core import traits, Provenance, FeatureGenerator, QualityQuery from .sklearn import SUPPORTED_REGRESSORS __all__ = ['AngularErrorRegressor'] @@ -48,7 +48,7 @@ def __init__(self, subarray=None, models=None, n_jobs=None, **kwargs): ) self.feature_generator = FeatureGenerator(parent=self) - + self.quality_query = QualityQuery(parent=self) # to verify settings self._new_model() @@ -57,23 +57,23 @@ def __init__(self, subarray=None, models=None, n_jobs=None, **kwargs): def __call__(self, event): pass - def predict_subarray_table(self, events): - table = self.feature_generator(events, subarray=self.subarray) - X, is_valid = table_to_X(table, self.features, self.log) + def predict_subarray_table(self, table): + quality_valid = self.quality_query.get_table_mask(table) + table = self.feature_generator(table[quality_valid], subarray=self.subarray) + X, valid = table_to_X(table, self.features, self.log) n_rows = len(table) ang_error = np.full(n_rows, np.nan) - ang_error[is_valid] = self._model.predict(X[is_valid]) + ang_error[valid] = self._model.predict(X) ang_error = u.Quantity(ang_error, self.unit, copy=False) result = Table( { f"{self.reconstructor_prefix}_ang_distance_uncert": ang_error, - f"{self.reconstructor_prefix}_is_valid": is_valid, + f"{self.reconstructor_prefix}_is_valid": valid, } ) return result - def fit(self, table): """ Create and fit a new model. diff --git a/src/ctapipe/resources/train_ang_error_regressor.yaml b/src/ctapipe/resources/train_ang_error_regressor.yaml index adcf6f8507e..733de12f00b 100644 --- a/src/ctapipe/resources/train_ang_error_regressor.yaml +++ b/src/ctapipe/resources/train_ang_error_regressor.yaml @@ -12,6 +12,7 @@ TrainAngularErrorRegressor: - "n_telescopes_triggered" - "n_LSTs_triggered" - "n_MSTs_triggered" + - "hillas_length_mean" FeatureGenerator: # On-the-fly generation of additional features features: diff --git a/src/ctapipe/tools/apply_angular_error_model.py b/src/ctapipe/tools/apply_angular_error_model.py index 399ba4f2d52..3388ee92208 100644 --- a/src/ctapipe/tools/apply_angular_error_model.py +++ b/src/ctapipe/tools/apply_angular_error_model.py @@ -137,6 +137,7 @@ def start(self): self.chunk_size, simulated=False, observation_info=True, + dl1_aggregates=True, ) bar = tqdm( chunk_iterator, diff --git a/src/ctapipe/tools/train_angular_error_regressor.py b/src/ctapipe/tools/train_angular_error_regressor.py index c32b08876eb..8ef0f317f14 100644 --- a/src/ctapipe/tools/train_angular_error_regressor.py +++ b/src/ctapipe/tools/train_angular_error_regressor.py @@ -3,7 +3,7 @@ """ import numpy as np -from ctapipe.core import Tool +from ctapipe.core import Tool, QualityQuery from ctapipe.core.traits import Int, IntTelescopeParameter, Path, Unicode from ctapipe.io import TableLoader from ctapipe.reco import CrossValidator, AngularErrorRegressor @@ -76,6 +76,7 @@ def setup(self): ) ) self.regressor = AngularErrorRegressor(self.loader.subarray, parent=self) + self.quality_query = QualityQuery(parent=self) self.rng = np.random.default_rng(self.random_seed) self.check_output(self.output_path) @@ -86,10 +87,12 @@ def start(self): events = self.loader.read_subarray_events( dl2=True, simulated=True, + dl1_aggregates=True, ) self.log.info("Loaded %d events", len(events)) - valid = events[self.regressor.reconstructor_prefix + "_is_valid"] + valid = self.quality_query.get_table_mask(events) + valid = valid & events[f"{self.regressor.reconstructor_prefix}_is_valid"] events = events[valid] self.log.info("Using %d valid events", len(events)) self.regressor.fit(events) @@ -98,7 +101,7 @@ def finish(self): """ Save the model to disk. """ - self.regressor.write(self.output_path) + self.regressor.write(self.output_path, overwrite=self.overwrite) def main(): TrainAngularErrorRegressor().run() From 767b875c72a64443e9da5c6e44b0ef2959548539 Mon Sep 17 00:00:00 2001 From: JBernete Date: Fri, 26 Jan 2024 11:11:43 +0100 Subject: [PATCH 19/22] Rename angular error to direction uncertainty --- src/ctapipe/reco/__init__.py | 4 +-- ...ular_error.py => direction_uncertainty.py} | 20 ++++++------ ...rain_direction_uncertainty_regressor.yaml} | 12 +++++-- ...y => apply_direction_uncertainty_model.py} | 32 +++++++++---------- src/ctapipe/tools/tests/test_train.py | 8 ++--- ... train_direction_uncertainty_regressor.py} | 26 +++++++-------- 6 files changed, 55 insertions(+), 47 deletions(-) rename src/ctapipe/reco/{angular_error.py => direction_uncertainty.py} (84%) rename src/ctapipe/resources/{train_ang_error_regressor.yaml => train_direction_uncertainty_regressor.yaml} (69%) rename src/ctapipe/tools/{apply_angular_error_model.py => apply_direction_uncertainty_model.py} (81%) rename src/ctapipe/tools/{train_angular_error_regressor.py => train_direction_uncertainty_regressor.py} (76%) diff --git a/src/ctapipe/reco/__init__.py b/src/ctapipe/reco/__init__.py index 726bb5e08ee..b6739aa9383 100644 --- a/src/ctapipe/reco/__init__.py +++ b/src/ctapipe/reco/__init__.py @@ -19,10 +19,10 @@ ) from .stereo_combination import StereoCombiner, StereoMeanCombiner -from .angular_error import AngularErrorRegressor +from .direction_uncertainty import DirectionUncertaintyRegressor __all__ = [ - "AngularErrorRegressor", + "DirectionUncertaintyRegressor", "Reconstructor", "HillasGeometryReconstructor", "ReconstructionProperty", diff --git a/src/ctapipe/reco/angular_error.py b/src/ctapipe/reco/direction_uncertainty.py similarity index 84% rename from src/ctapipe/reco/angular_error.py rename to src/ctapipe/reco/direction_uncertainty.py index 2aa2d4a5bd3..b5be9889349 100644 --- a/src/ctapipe/reco/angular_error.py +++ b/src/ctapipe/reco/direction_uncertainty.py @@ -10,14 +10,14 @@ from .reconstructor import Reconstructor, ReconstructionProperty from ..core import traits, Provenance, FeatureGenerator, QualityQuery from .sklearn import SUPPORTED_REGRESSORS -__all__ = ['AngularErrorRegressor'] +__all__ = ['DirectionUncertaintyRegressor'] from ..core.traits import TraitError, Unicode -class AngularErrorRegressor(Reconstructor): +class DirectionUncertaintyRegressor(Reconstructor): """ - Reconstructor for estimating the angular reconstruction error. + Reconstructor for estimating the direction reconstruction uncertainty. """ features = traits.List( traits.Unicode(), help="Features to use for this model." @@ -62,13 +62,13 @@ def predict_subarray_table(self, table): table = self.feature_generator(table[quality_valid], subarray=self.subarray) X, valid = table_to_X(table, self.features, self.log) n_rows = len(table) - ang_error = np.full(n_rows, np.nan) - ang_error[valid] = self._model.predict(X) - ang_error = u.Quantity(ang_error, self.unit, copy=False) + dir_uncert = np.full(n_rows, np.nan) + dir_uncert[valid] = self._model.predict(X) + dir_uncert = u.Quantity(dir_uncert, self.unit, copy=False) result = Table( { - f"{self.reconstructor_prefix}_ang_distance_uncert": ang_error, + f"{self.reconstructor_prefix}_direction_uncertainty": dir_uncert, f"{self.reconstructor_prefix}_is_valid": valid, } ) @@ -81,9 +81,9 @@ def fit(self, table): self._model = self._new_model() table = self.feature_generator(table, subarray=self.subarray) X, valid = table_to_X(table, self.features, self.log) - ang_error = self._compute_angular_separation(table[valid]) - self.unit = ang_error.unit - self._model.fit(X, ang_error.quantity.to_value(self.unit)) + dir_uncert = self._compute_angular_separation(table[valid]) + self.unit = dir_uncert.unit + self._model.fit(X, dir_uncert.quantity.to_value(self.unit)) def _new_model(self): cfg = self.model_config diff --git a/src/ctapipe/resources/train_ang_error_regressor.yaml b/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml similarity index 69% rename from src/ctapipe/resources/train_ang_error_regressor.yaml rename to src/ctapipe/resources/train_direction_uncertainty_regressor.yaml index 733de12f00b..240398b8905 100644 --- a/src/ctapipe/resources/train_ang_error_regressor.yaml +++ b/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml @@ -1,5 +1,5 @@ -TrainAngularErrorRegressor: - AngularErrorRegressor: +TrainDirectionUncertaintyRegressor: + DirectionUncertaintyRegressor: model_cls: 'MLPRegressor' model_config: hidden_layer_sizes: [36, 6] @@ -13,6 +13,14 @@ TrainAngularErrorRegressor: - "n_LSTs_triggered" - "n_MSTs_triggered" - "hillas_length_mean" + - "hillas_length_std" + - "hillas_width_mean" + - "hillas_width_std" + - "hillas_skewness_mean" + - "hillas_skewness_std" + - "hillas_kurtosis_mean" + - "hillas_kurtosis_std" + # ADD MORE FEATURES HERE FeatureGenerator: # On-the-fly generation of additional features features: diff --git a/src/ctapipe/tools/apply_angular_error_model.py b/src/ctapipe/tools/apply_direction_uncertainty_model.py similarity index 81% rename from src/ctapipe/tools/apply_angular_error_model.py rename to src/ctapipe/tools/apply_direction_uncertainty_model.py index 3388ee92208..f0375a74409 100644 --- a/src/ctapipe/tools/apply_angular_error_model.py +++ b/src/ctapipe/tools/apply_direction_uncertainty_model.py @@ -11,24 +11,24 @@ __all__ = [ - "ApplyAngularErrorModel", + "ApplyDirectionUncertaintyModel", ] -class ApplyAngularErrorModel(Tool): +class ApplyDirectionUncertaintyModel(Tool): """ - Apply the angular error model to a DL2 file. + Apply the direction uncertainty model to a DL2 file. Model needs to be trained with - `~ctapipe.tools.train_angular_error_regressor.TrainAngularErrorRegressor`. + `~ctapipe.tools.train_direction_uncertainty_regressor.TrainDirectionUncertaintyRegressor`. """ - name = "ctapipe-apply-angular-error-model" + name = "ctapipe-apply-direction-uncertainty-model" description = __doc__ examples = """ - ctapipe-apply-angular-error-model \\ + ctapipe-apply-direction-uncertainty-model \\ --input gamma.dl2.h5 \\ - --reconstructor angular_error_regressor.pkl \\ + --reconstructor direction_uncertainty_regressor.pkl \\ --output gamma_applied.dl2.h5 """ @@ -73,11 +73,11 @@ class ApplyAngularErrorModel(Tool): ).tag(config=True) aliases = { - ("i", "input"): "ApplyAngularErrorModel.input_url", - ("o", "output"): "ApplyAngularErrorModel.output_path", - ("r", "reconstructor"): "ApplyAngularErrorModel.reconstructor_path", - "n-jobs": "ApplyAngularErrorModel.n_jobs", - "chunk-size": "ApplyAngularErrorModel.chunk_size", + ("i", "input"): "ApplyDirectionUncertaintyModel.input_url", + ("o", "output"): "ApplyDirectionUncertaintyModel.output_path", + ("r", "reconstructor"): "ApplyDirectionUncertaintyModel.reconstructor_path", + "n-jobs": "ApplyDirectionUncertaintyModel.n_jobs", + "chunk-size": "ApplyDirectionUncertaintyModel.chunk_size", } flags = { @@ -102,7 +102,7 @@ class ApplyAngularErrorModel(Tool): "overwrite": ( { "HDF5Merger": {"overwrite": True}, - "ApplyAngularErrorModel": {"overwrite": True}, + "ApplyDirectionUncertaintyModel": {"overwrite": True}, }, "Overwrite output file if it exists", ), @@ -141,7 +141,7 @@ def start(self): ) bar = tqdm( chunk_iterator, - desc="Applying angular error model", + desc="Applying direction uncertainty model", unit="Subarray events", total=chunk_iterator.n_total, disable=not self.progress_bar, @@ -150,7 +150,7 @@ def start(self): for chunk, (start, stop, table) in enumerate(chunk_iterator): self.log.debug("Events read from chunk %d: %d", chunk, len(table)) self._apply(self.regressor, table, start=start, stop=stop) - self.log.debug("Events after applying angular error model: %d", len(table)) + self.log.debug("Events after applying direction uncertainty model: %d", len(table)) bar.update(stop - start) def _apply(self, reconstructor, table, start=None, stop=None): @@ -169,7 +169,7 @@ def _apply(self, reconstructor, table, start=None, stop=None): ) def main(): - ApplyAngularErrorModel().run() + ApplyDirectionUncertaintyModel().run() if __name__ == "__main__": diff --git a/src/ctapipe/tools/tests/test_train.py b/src/ctapipe/tools/tests/test_train.py index 68f42ee692b..8bdbbbd297e 100644 --- a/src/ctapipe/tools/tests/test_train.py +++ b/src/ctapipe/tools/tests/test_train.py @@ -225,13 +225,13 @@ def test_no_cross_validation(tmp_path): assert ret == 0 return out_file -def test_angular_error_regressor(tmp_path): - from ctapipe.tools.train_angular_error_regressor import TrainAngularErrorRegressor +def test_direction_uncertainty_regressor(tmp_path): + from ctapipe.tools.train_direction_uncertainty_regressor import TrainAngularErrorRegressor - out_file = tmp_path / "angular_error.pkl" + out_file = tmp_path / "direction_uncertainty.pkl" tool = TrainAngularErrorRegressor() - config = resource_file("train_ang_error_regressor.yaml") + config = resource_file("train_direction_uncertainty_regressor.yaml") ret = run_tool( tool, argv=[ diff --git a/src/ctapipe/tools/train_angular_error_regressor.py b/src/ctapipe/tools/train_direction_uncertainty_regressor.py similarity index 76% rename from src/ctapipe/tools/train_angular_error_regressor.py rename to src/ctapipe/tools/train_direction_uncertainty_regressor.py index 8ef0f317f14..e1046431c76 100644 --- a/src/ctapipe/tools/train_angular_error_regressor.py +++ b/src/ctapipe/tools/train_direction_uncertainty_regressor.py @@ -1,28 +1,28 @@ """ -Tool for training the AngularErrorRegressor +Tool for training the DirectionUncertaintyRegressor """ import numpy as np from ctapipe.core import Tool, QualityQuery from ctapipe.core.traits import Int, IntTelescopeParameter, Path, Unicode from ctapipe.io import TableLoader -from ctapipe.reco import CrossValidator, AngularErrorRegressor +from ctapipe.reco import CrossValidator, DirectionUncertaintyRegressor __all__ = [ - "TrainAngularErrorRegressor", + "TrainDirectionUncertaintyRegressor", ] -class TrainAngularErrorRegressor(Tool): +class TrainDirectionUncertaintyRegressor(Tool): - name = "ctapipe-train-angular-error-regressor" + name = "ctapipe-train-direction-uncertainty-regressor" description = __doc__ examples = """ - ctapipe-train-angular-error-regressor \\ - --config train_angular_error_regressor.yaml \\ + ctapipe-train-direction-uncertainty-regressor \\ + --config train_direction_uncertainty_regressor.yaml \\ --input gamma.dl2.h5 \\ - --output angular_error_regressor.pkl + --output direction_uncertainty_regressor.pkl """ output_path = Path( @@ -56,13 +56,13 @@ class TrainAngularErrorRegressor(Tool): aliases = { ("i", "input"): "TableLoader.input_url", - ("o", "output"): "TrainAngularErrorRegressor.output_path", - "n-events": "TrainAngularErrorRegressor.n_events", + ("o", "output"): "TrainDirectionUncertaintyRegressor.output_path", + "n-events": "TrainDirectionUncertaintyRegressor.n_events", } classes = [ TableLoader, - AngularErrorRegressor, + DirectionUncertaintyRegressor, CrossValidator, ] @@ -75,7 +75,7 @@ def setup(self): parent=self, ) ) - self.regressor = AngularErrorRegressor(self.loader.subarray, parent=self) + self.regressor = DirectionUncertaintyRegressor(self.loader.subarray, parent=self) self.quality_query = QualityQuery(parent=self) self.rng = np.random.default_rng(self.random_seed) self.check_output(self.output_path) @@ -103,7 +103,7 @@ def finish(self): """ self.regressor.write(self.output_path, overwrite=self.overwrite) def main(): - TrainAngularErrorRegressor().run() + TrainDirectionUncertaintyRegressor().run() if __name__ == "__main__": From 83f366425003f88d4d990239ba465f344869129a Mon Sep 17 00:00:00 2001 From: JBernete Date: Fri, 26 Jan 2024 11:26:09 +0100 Subject: [PATCH 20/22] Fix column name --- src/ctapipe/reco/direction_uncertainty.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ctapipe/reco/direction_uncertainty.py b/src/ctapipe/reco/direction_uncertainty.py index b5be9889349..885b79d102c 100644 --- a/src/ctapipe/reco/direction_uncertainty.py +++ b/src/ctapipe/reco/direction_uncertainty.py @@ -68,7 +68,7 @@ def predict_subarray_table(self, table): result = Table( { - f"{self.reconstructor_prefix}_direction_uncertainty": dir_uncert, + f"{self.reconstructor_prefix}_ang_distance_uncert": dir_uncert, f"{self.reconstructor_prefix}_is_valid": valid, } ) From 34b6f8d316c4484f9de64381ed9868857e992e17 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Tue, 16 Apr 2024 15:07:17 +0200 Subject: [PATCH 21/22] Fix test --- src/ctapipe/tools/tests/test_train.py | 38 ++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/ctapipe/tools/tests/test_train.py b/src/ctapipe/tools/tests/test_train.py index 8bdbbbd297e..425190edde8 100644 --- a/src/ctapipe/tools/tests/test_train.py +++ b/src/ctapipe/tools/tests/test_train.py @@ -1,3 +1,5 @@ +import json + import numpy as np import pytest @@ -225,17 +227,45 @@ def test_no_cross_validation(tmp_path): assert ret == 0 return out_file + def test_direction_uncertainty_regressor(tmp_path): - from ctapipe.tools.train_direction_uncertainty_regressor import TrainAngularErrorRegressor + from ctapipe.tools.aggregate_features import AggregateFeatures + from ctapipe.tools.train_direction_uncertainty_regressor import ( + TrainDirectionUncertaintyRegressor, + ) + + agg_path = tmp_path / "aggregated.dl1.h5" + config_path = tmp_path / "config.json" + config = { + "FeatureAggregator": { + "image_parameters": [ + ("hillas", "length"), + ("hillas", "width"), + ("hillas", "skewness"), + ("hillas", "kurtosis"), + ], + } + } + with config_path.open("w") as f: + json.dump(config, f) + + run_tool( + AggregateFeatures(), + argv=[ + "--input=dataset://gamma_diffuse_dl2_train_small.dl2.h5", + f"--output={agg_path}", + f"--config={config_path}", + ], + ) out_file = tmp_path / "direction_uncertainty.pkl" - tool = TrainAngularErrorRegressor() + tool = TrainDirectionUncertaintyRegressor() config = resource_file("train_direction_uncertainty_regressor.yaml") ret = run_tool( tool, argv=[ - "--input=dataset://gamma_diffuse_dl2_train_small.dl2.h5", + f"--input={agg_path}", f"--output={out_file}", f"--config={config}", "--log-level=INFO", @@ -243,4 +273,4 @@ def test_direction_uncertainty_regressor(tmp_path): ], ) assert ret == 0 - return out_file \ No newline at end of file + return out_file From 08acd3d23d0cbcc5e21368611de07752f27e87e0 Mon Sep 17 00:00:00 2001 From: Lukas Beiske Date: Tue, 16 Apr 2024 15:11:47 +0200 Subject: [PATCH 22/22] Run pre-commit --- src/ctapipe/reco/__init__.py | 3 +-- src/ctapipe/reco/direction_uncertainty.py | 16 +++++++-------- ...train_direction_uncertainty_regressor.yaml | 2 +- .../apply_direction_uncertainty_model.py | 20 ++++++++++--------- .../train_direction_uncertainty_regressor.py | 11 ++++++---- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/ctapipe/reco/__init__.py b/src/ctapipe/reco/__init__.py index b6739aa9383..029d9562da4 100644 --- a/src/ctapipe/reco/__init__.py +++ b/src/ctapipe/reco/__init__.py @@ -2,6 +2,7 @@ # reconstructors must be imported before ShowerProcessor, so # they are available there +from .direction_uncertainty import DirectionUncertaintyRegressor from .hillas_intersection import HillasIntersection from .hillas_reconstructor import HillasReconstructor from .impact import ImPACTReconstructor @@ -19,8 +20,6 @@ ) from .stereo_combination import StereoCombiner, StereoMeanCombiner -from .direction_uncertainty import DirectionUncertaintyRegressor - __all__ = [ "DirectionUncertaintyRegressor", "Reconstructor", diff --git a/src/ctapipe/reco/direction_uncertainty.py b/src/ctapipe/reco/direction_uncertainty.py index 885b79d102c..200645a1024 100644 --- a/src/ctapipe/reco/direction_uncertainty.py +++ b/src/ctapipe/reco/direction_uncertainty.py @@ -1,16 +1,17 @@ import pathlib +import astropy.units as u import joblib -from astropy.coordinates import angular_separation import numpy as np -import astropy.units as u +from astropy.coordinates import angular_separation from astropy.table import Table +from ..core import FeatureGenerator, Provenance, QualityQuery, traits from .preprocessing import table_to_X -from .reconstructor import Reconstructor, ReconstructionProperty -from ..core import traits, Provenance, FeatureGenerator, QualityQuery +from .reconstructor import Reconstructor from .sklearn import SUPPORTED_REGRESSORS -__all__ = ['DirectionUncertaintyRegressor'] + +__all__ = ["DirectionUncertaintyRegressor"] from ..core.traits import TraitError, Unicode @@ -19,6 +20,7 @@ class DirectionUncertaintyRegressor(Reconstructor): """ Reconstructor for estimating the direction reconstruction uncertainty. """ + features = traits.List( traits.Unicode(), help="Features to use for this model." ).tag(config=True) @@ -43,9 +45,7 @@ def __init__(self, subarray=None, models=None, n_jobs=None, **kwargs): super().__init__(subarray, **kwargs) if self.model_cls is None: - raise TraitError( - "Must provide `model_cls` if not loading model from file" - ) + raise TraitError("Must provide `model_cls` if not loading model from file") self.feature_generator = FeatureGenerator(parent=self) self.quality_query = QualityQuery(parent=self) diff --git a/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml b/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml index 240398b8905..ba89b3c1165 100644 --- a/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml +++ b/src/ctapipe/resources/train_direction_uncertainty_regressor.yaml @@ -26,4 +26,4 @@ TrainDirectionUncertaintyRegressor: features: - [ "n_telescopes_triggered", "subarray.multiplicity(tels_with_trigger)" ] - [ "n_LSTs_triggered", "subarray.multiplicity(tels_with_trigger, 'LST_LST_LSTCam')" ] - - [ "n_MSTs_triggered", "subarray.multiplicity(tels_with_trigger, 'MST_MST_NectarCam')" ] \ No newline at end of file + - [ "n_MSTs_triggered", "subarray.multiplicity(tels_with_trigger, 'MST_MST_NectarCam')" ] diff --git a/src/ctapipe/tools/apply_direction_uncertainty_model.py b/src/ctapipe/tools/apply_direction_uncertainty_model.py index f0375a74409..f305eaed93d 100644 --- a/src/ctapipe/tools/apply_direction_uncertainty_model.py +++ b/src/ctapipe/tools/apply_direction_uncertainty_model.py @@ -1,19 +1,16 @@ - import tables from tqdm.auto import tqdm from ctapipe.core import Tool - -from ctapipe.core.traits import Path, Integer, Bool, classes_with_traits, flag - -from ctapipe.io import TableLoader, HDF5Merger, write_table +from ctapipe.core.traits import Bool, Integer, Path, classes_with_traits, flag +from ctapipe.io import HDF5Merger, TableLoader, write_table from ctapipe.reco import Reconstructor - __all__ = [ "ApplyDirectionUncertaintyModel", ] + class ApplyDirectionUncertaintyModel(Tool): """ Apply the direction uncertainty model to a DL2 file. @@ -125,7 +122,9 @@ def setup(self): parent=self, ) ) - self.regressor = Reconstructor.read(self.reconstructor_path, parent=self, subarray=self.loader.subarray) + self.regressor = Reconstructor.read( + self.reconstructor_path, parent=self, subarray=self.loader.subarray + ) if self.n_jobs: self.regressor.n_jobs = self.n_jobs @@ -150,7 +149,9 @@ def start(self): for chunk, (start, stop, table) in enumerate(chunk_iterator): self.log.debug("Events read from chunk %d: %d", chunk, len(table)) self._apply(self.regressor, table, start=start, stop=stop) - self.log.debug("Events after applying direction uncertainty model: %d", len(table)) + self.log.debug( + "Events after applying direction uncertainty model: %d", len(table) + ) bar.update(stop - start) def _apply(self, reconstructor, table, start=None, stop=None): @@ -168,9 +169,10 @@ def _apply(self, reconstructor, table, start=None, stop=None): append=True, ) + def main(): ApplyDirectionUncertaintyModel().run() if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/ctapipe/tools/train_direction_uncertainty_regressor.py b/src/ctapipe/tools/train_direction_uncertainty_regressor.py index e1046431c76..98b9dc5a55d 100644 --- a/src/ctapipe/tools/train_direction_uncertainty_regressor.py +++ b/src/ctapipe/tools/train_direction_uncertainty_regressor.py @@ -3,8 +3,8 @@ """ import numpy as np -from ctapipe.core import Tool, QualityQuery -from ctapipe.core.traits import Int, IntTelescopeParameter, Path, Unicode +from ctapipe.core import QualityQuery, Tool +from ctapipe.core.traits import Int, Path from ctapipe.io import TableLoader from ctapipe.reco import CrossValidator, DirectionUncertaintyRegressor @@ -14,7 +14,6 @@ class TrainDirectionUncertaintyRegressor(Tool): - name = "ctapipe-train-direction-uncertainty-regressor" description = __doc__ @@ -75,7 +74,9 @@ def setup(self): parent=self, ) ) - self.regressor = DirectionUncertaintyRegressor(self.loader.subarray, parent=self) + self.regressor = DirectionUncertaintyRegressor( + self.loader.subarray, parent=self + ) self.quality_query = QualityQuery(parent=self) self.rng = np.random.default_rng(self.random_seed) self.check_output(self.output_path) @@ -102,6 +103,8 @@ def finish(self): Save the model to disk. """ self.regressor.write(self.output_path, overwrite=self.overwrite) + + def main(): TrainDirectionUncertaintyRegressor().run()