Add functionality for array-event-wise aggregation of dl1 image parameters #2497

Open · wants to merge 15 commits into base: main
5 changes: 5 additions & 0 deletions docs/changes/2497.feature.rst
@@ -0,0 +1,5 @@
Add the ``FeatureAggregator`` component and the ``ctapipe-aggregate-image-parameters`` tool
for aggregating the telescope-wise dl1 image parameters for each array event.
This functionality is also added to ``ctapipe-process``.

Refactor helper functions for vectorized numpy calculations into the new ``ctapipe.vectorization`` module.
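
For context, a minimal sketch of the Python-level usage of the new component (the input table below is made up for illustration; in practice it would come from ``TableLoader`` or ``collect_features``):

import astropy.units as u
from astropy.table import Table

from ctapipe.image import FeatureAggregator

# Hypothetical telescope-event table: two array events with two and one
# telescope events, respectively.
mono_parameters = Table(
    {
        "obs_id": [1, 1, 1],
        "event_id": [100, 100, 101],
        "hillas_length": [0.3, 0.5, 0.4] * u.deg,
        "hillas_width": [0.10, 0.20, 0.15] * u.deg,
    }
)

aggregator = FeatureAggregator(
    image_parameters=[("hillas", "length"), ("hillas", "width")]
)
# One row per array event, with <prefix>_<feature>_max/_min/_mean/_std columns.
agg_table = aggregator.aggregate_table(mono_parameters)
print(agg_table.colnames)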
1 change: 1 addition & 0 deletions pyproject.toml
@@ -101,6 +101,7 @@ ctapipe-train-energy-regressor = "ctapipe.tools.train_energy_regressor:main"
ctapipe-train-particle-classifier = "ctapipe.tools.train_particle_classifier:main"
ctapipe-train-disp-reconstructor = "ctapipe.tools.train_disp_reconstructor:main"
ctapipe-apply-models = "ctapipe.tools.apply_models:main"
ctapipe-aggregate-image-parameters = "ctapipe.tools.aggregate_features:main"

[project.entry-points.ctapipe_io]
HDF5EventSource = "ctapipe.io.hdf5eventsource:HDF5EventSource"
10 changes: 9 additions & 1 deletion src/ctapipe/containers.py
@@ -55,6 +55,7 @@
"TimingParametersContainer",
"TriggerContainer",
"WaveformCalibrationContainer",
"BaseStatisticsContainer",
"StatisticsContainer",
"IntensityStatisticsContainer",
"PeakTimeStatisticsContainer",
@@ -410,13 +411,16 @@ class MorphologyContainer(Container):
n_large_islands = Field(-1, "Number of > 50 pixel islands")


class StatisticsContainer(Container):
class BaseStatisticsContainer(Container):
"""Store descriptive statistics"""

max = Field(np.float32(nan), "value of pixel with maximum intensity")
min = Field(np.float32(nan), "value of pixel with minimum intensity")
mean = Field(np.float32(nan), "mean intensity")
std = Field(np.float32(nan), "standard deviation of intensity")


class StatisticsContainer(BaseStatisticsContainer):
skewness = Field(nan, "skewness of intensity")
kurtosis = Field(nan, "kurtosis of intensity")

@@ -522,6 +526,10 @@ class DL1Container(Container):
default_factory=partial(Map, DL1CameraContainer),
description="map of tel_id to DL1CameraContainer",
)
aggregate = Field(
default_factory=partial(Map, BaseStatisticsContainer),
description="map of image parameter to aggregation statistics",
)


class DL1CameraCalibrationContainer(Container):
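To illustrate the new container layout (purely hypothetical values, only showing how the ``aggregate`` map is keyed and filled):

import numpy as np

from ctapipe.containers import ArrayEventContainer, BaseStatisticsContainer

event = ArrayEventContainer()
# One BaseStatisticsContainer per aggregated image parameter, keyed by the
# full column name "<prefix>_<feature>".
event.dl1.aggregate["hillas_length"] = BaseStatisticsContainer(
    max=np.float32(0.5),
    min=np.float32(0.3),
    mean=np.float32(0.4),
    std=np.float32(0.08),
    prefix="hillas_length",
)
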
3 changes: 2 additions & 1 deletion src/ctapipe/image/__init__.py
@@ -60,7 +60,7 @@
neg_log_likelihood_numeric,
)
from .reducer import DataVolumeReducer, NullDataVolumeReducer, TailCutsDataVolumeReducer
from .statistics import descriptive_statistics
from .statistics import FeatureAggregator, descriptive_statistics
from .timing import timing_parameters

__all__ = [
@@ -119,4 +119,5 @@
"TailCutsDataVolumeReducer",
"InvalidPixelHandler",
"NeighborAverage",
"FeatureAggregator",
]
129 changes: 127 additions & 2 deletions src/ctapipe/image/statistics.py
@@ -1,9 +1,25 @@
"""Calculating statistics of image parameters."""
import astropy.units as u
import numpy as np
from astropy.table import Table, vstack
from numba import njit

from ..containers import StatisticsContainer
from ..containers import (
ArrayEventContainer,
BaseStatisticsContainer,
StatisticsContainer,
)
from ..core import Component, FeatureGenerator, QualityQuery
from ..core.traits import List, TraitError, Tuple, Unicode
from ..vectorization import (
collect_features,
get_subarray_index,
max_ufunc,
min_ufunc,
weighted_mean_std_ufunc,
)

__all__ = ["descriptive_statistics", "skewness", "kurtosis"]
__all__ = ["descriptive_statistics", "skewness", "kurtosis", "FeatureAggregator"]


@njit(cache=True)
@@ -92,3 +108,112 @@ def descriptive_statistics(
skewness=skewness(values, mean=mean, std=std),
kurtosis=kurtosis(values, mean=mean, std=std),
)


class FeatureAggregator(Component):
"""Array-event-wise aggregation of image parameters."""

image_parameters = List(
Tuple(Unicode(), Unicode()),
default_value=[],
help=(
"List of 2-Tuples of Strings: ('prefix', 'feature'). "
"The image parameter to be aggregated is 'prefix_feature'."
),
).tag(config=True)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.feature_generator = FeatureGenerator(parent=self)
self.quality_query = QualityQuery(parent=self)

def __call__(self, event: ArrayEventContainer) -> None:
"""Fill event container with aggregated image parameters."""
table = None
for tel_id in event.dl1.tel.keys():
t = collect_features(event, tel_id)
Contributor (review comment on this line):
It's possible I missed it, but you are collecting features from all telescopes in the event; shouldn't it include only those used in the reconstruction? In other words, instead of tels_with_trigger, you should have something like event.dl2.stereo.geometry.telescopes.

@LukasBeiske (Contributor, Author), Jan 25, 2024:
Right now this is done via a quality query in aggregate_table, which should use the same quality criteria as the reconstruction for which the error will be estimated. But if this is integrated into the train/apply tool for the error estimator, doing it that way would be better, I agree.

t["obs_id"] = event.index.obs_id
t["event_id"] = event.index.event_id
if table is None:
table = t
else:
table = vstack([table, t])

agg_table = self.aggregate_table(table)
for col in [
prefix + "_" + feature for prefix, feature in self.image_parameters
]:
event.dl1.aggregate[col] = BaseStatisticsContainer(
max=agg_table[col + "_max"],
min=agg_table[col + "_min"],
mean=agg_table[col + "_mean"],
std=agg_table[col + "_std"],
prefix=col,
)

def aggregate_table(self, mono_parameters: Table) -> Table:
"""
Construct a table of aggregated image parameters from a table of telescope events.
"""
if len(self.image_parameters) == 0:
raise TraitError("No DL1 image parameters to aggregate are specified.")

mono_parameters = self.feature_generator(mono_parameters)
passes_cuts = self.quality_query.get_table_mask(mono_parameters)

obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index(
mono_parameters
)
n_array_events = len(obs_ids)
agg_table = Table({"obs_id": obs_ids, "event_id": event_ids})
# copy metadata
for colname in ("obs_id", "event_id"):
agg_table[colname].description = mono_parameters[colname].description

for prefix, feature in self.image_parameters:
if feature in ("psi", "phi"):
raise NotImplementedError(
"Aggregating rotation angels or polar coordinates"
" is not supported."
)

col = prefix + "_" + feature
unit = mono_parameters[col].quantity.unit
if prefix == "morphology":
valid = (mono_parameters[col] >= 0) & passes_cuts
else:
valid = ~np.isnan(mono_parameters[col]) & passes_cuts

if np.sum(valid) > 0:
means, stds = weighted_mean_std_ufunc(
mono_parameters[col],
valid,
n_array_events,
tel_to_array_indices,
multiplicity,
)
max_values = max_ufunc(
mono_parameters[col],
valid,
n_array_events,
tel_to_array_indices,
)
min_values = min_ufunc(
mono_parameters[col],
valid,
n_array_events,
tel_to_array_indices,
)
else:
means = np.full(n_array_events, np.nan)
stds = np.full(n_array_events, np.nan)
max_values = np.full(n_array_events, np.nan)
min_values = np.full(n_array_events, np.nan)

agg_table[col + "_max"] = u.Quantity(max_values, unit, copy=False)
agg_table[col + "_min"] = u.Quantity(min_values, unit, copy=False)
agg_table[col + "_mean"] = u.Quantity(means, unit, copy=False)
agg_table[col + "_std"] = u.Quantity(stds, unit, copy=False)

return agg_table
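
The ufunc helpers imported from ``ctapipe.vectorization`` are not shown in this diff; as a rough sketch of the underlying idea only (a grouped reduction via ``numpy.ufunc.at``, not the actual implementation), a per-array-event maximum over telescope events could look like this:

import numpy as np

def grouped_max(values, valid, n_array_events, tel_to_array_indices):
    """Sketch of a per-array-event maximum over valid telescope events."""
    result = np.full(n_array_events, -np.inf)
    # reduce all telescope events that share the same array-event index
    np.maximum.at(result, tel_to_array_indices[valid], values[valid])
    # array events without any valid telescope event get NaN
    result[np.isinf(result)] = np.nan
    return result

values = np.array([0.3, 0.5, 0.4, np.nan])
valid = ~np.isnan(values)
tel_to_array_indices = np.array([0, 0, 1, 1])
print(grouped_max(values, valid, 2, tel_to_array_indices))  # [0.5 0.4]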
34 changes: 34 additions & 0 deletions src/ctapipe/image/tests/test_statistics.py
@@ -1,6 +1,14 @@
import astropy.units as u
import numpy as np
import scipy.stats

from ctapipe.containers import (
ArrayEventContainer,
HillasParametersContainer,
ImageParametersContainer,
MorphologyContainer,
)


def test_statistics():
from ctapipe.image import descriptive_statistics
@@ -60,3 +68,29 @@ def test_return_type():

stats = descriptive_statistics(data, container_class=PeakTimeStatisticsContainer)
assert isinstance(stats, PeakTimeStatisticsContainer)


def test_feature_aggregator():
from ctapipe.image import FeatureAggregator

event = ArrayEventContainer()
for tel_id, length, n_islands in zip((2, 7, 11), (0.3, 0.5, 0.4), (2, 1, 3)):
event.dl1.tel[tel_id].parameters = ImageParametersContainer(
hillas=HillasParametersContainer(length=length * u.deg),
morphology=MorphologyContainer(n_islands=n_islands),
)

features = [
("hillas", "length"),
("morphology", "n_islands"),
]
aggregate_features = FeatureAggregator(image_parameters=features)
aggregate_features(event)
assert event.dl1.aggregate["hillas_length"].max == 0.5 * u.deg
assert event.dl1.aggregate["hillas_length"].min == 0.3 * u.deg
assert u.isclose(event.dl1.aggregate["hillas_length"].mean, 0.4 * u.deg)
assert u.isclose(event.dl1.aggregate["hillas_length"].std, 0.081649658 * u.deg)
assert event.dl1.aggregate["morphology_n_islands"].max == 3
assert event.dl1.aggregate["morphology_n_islands"].min == 1
assert np.isclose(event.dl1.aggregate["morphology_n_islands"].mean, 2)
assert np.isclose(event.dl1.aggregate["morphology_n_islands"].std, 0.81649658)
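
The expected numbers in these assertions are simply the population statistics (ddof=0) of the three telescope events, which can be checked with numpy:

import numpy as np

lengths = np.array([0.3, 0.5, 0.4])
print(lengths.mean(), lengths.std())      # 0.4  0.0816...

n_islands = np.array([2, 1, 3])
print(n_islands.mean(), n_islands.std())  # 2.0  0.8165...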
17 changes: 17 additions & 0 deletions src/ctapipe/io/datawriter.py
@@ -206,6 +206,11 @@ class DataWriter(Component):
help="Store muon parameters if available", default_value=False
).tag(config=True)

write_dl1_aggregates = Bool(
help="Store array-event-wise aggregated DL1 image parameters if available",
default_value=False,
).tag(config=True)

compression_level = Int(
help="compression level, 0=None, 9=maximum", default_value=5, min=0, max=9
).tag(config=True)
@@ -338,6 +343,9 @@ def __call__(self, event: ArrayEventContainer):
if self.write_muon_parameters:
self._write_muon_telescope_events(event)

if self.write_dl1_aggregates:
self._write_dl1_aggregates(event)

def _write_constant_pointing(self, event):
"""
Write pointing configuration from event data assuming fixed pointing over the OB.
@@ -723,6 +731,15 @@ def _write_muon_telescope_events(self, event: ArrayEventContainer):
[tel_index, muon.ring, muon.parameters, muon.efficiency],
)

def _write_dl1_aggregates(self, event: ArrayEventContainer):
"""
Write array-event-wise aggregated DL1 image parameters.
"""
self._writer.write(
table_name="dl1/event/subarray/aggregated_image_parameters",
containers=[event.index] + list(event.dl1.aggregate.values()),
)

def _write_dl2_telescope_events(self, event: ArrayEventContainer):
"""
write per-telescope DL2 shower information.
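A rough sketch of how the new flag could be combined with the aggregator outside of the tools (assumptions: the input file already contains DL1 image parameters, and the file names are placeholders):

from ctapipe.image import FeatureAggregator
from ctapipe.io import DataWriter, EventSource

aggregator = FeatureAggregator(
    image_parameters=[("hillas", "length"), ("hillas", "width")]
)

with EventSource("gamma.dl1.h5") as source:
    with DataWriter(
        event_source=source,
        output_path="gamma_aggregated.dl1.h5",
        write_dl1_aggregates=True,
    ) as writer:
        for event in source:
            aggregator(event)  # fills event.dl1.aggregate
            writer(event)      # writes dl1/event/subarray/aggregated_image_parameters
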
46 changes: 32 additions & 14 deletions src/ctapipe/io/tableloader.py
@@ -22,6 +22,7 @@
IMAGES_GROUP = "/dl1/event/telescope/images"
MUON_GROUP = "/dl1/event/telescope/muon"
TRIGGER_TABLE = "/dl1/event/subarray/trigger"
PARAMETER_AGGS_GROUP = "/dl1/event/subarray/aggregated_image_parameters"
SHOWER_TABLE = "/simulation/event/subarray/shower"
TRUE_IMAGES_GROUP = "/simulation/event/telescope/images"
TRUE_PARAMETERS_GROUP = "/simulation/event/telescope/parameters"
@@ -179,6 +180,9 @@ class TableLoader(Component):
config=True
)
dl1_muons = traits.Bool(False, help="load muon ring parameters").tag(config=True)
dl1_aggregates = traits.Bool(
False, help="load array-event-wise aggregated image parameters"
).tag(config=True)

dl2 = traits.Bool(True, help="load available dl2 stereo parameters").tag(
config=True
@@ -280,6 +284,7 @@ def _check_args(self, **kwargs):
"dl1_parameters": PARAMETERS_GROUP,
"dl1_images": IMAGES_GROUP,
"dl1_muons": MUON_GROUP,
"dl1_aggregates": PARAMETER_AGGS_GROUP,
"true_parameters": TRUE_PARAMETERS_GROUP,
"true_images": TRUE_IMAGES_GROUP,
"observation_info": OBSERVATION_TABLE,
@@ -380,6 +385,7 @@ def read_subarray_events(
self,
start=None,
stop=None,
dl1_aggregates=None,
dl2=None,
simulated=None,
observation_info=None,
@@ -390,6 +396,8 @@
Parameters
----------

dl1_aggregates: bool
load available aggregated dl1 image parameters
dl2: bool
load available dl2 stereo parameters
simulated: bool
@@ -407,10 +415,12 @@
Table with primary index columns "obs_id" and "event_id".
"""
updated_args = self._check_args(
dl1_aggregates=dl1_aggregates,
dl2=dl2,
simulated=simulated,
observation_info=observation_info,
)
dl1_aggregates = updated_args["dl1_aggregates"]
dl2 = updated_args["dl2"]
simulated = updated_args["simulated"]
observation_info = updated_args["observation_info"]
@@ -423,20 +433,28 @@
showers = read_table(self.h5file, SHOWER_TABLE, start=start, stop=stop)
table = _merge_subarray_tables(table, showers)

if dl2:
if DL2_SUBARRAY_GROUP in self.h5file:
for group_name in self.h5file.root[DL2_SUBARRAY_GROUP]._v_children:
group_path = f"{DL2_SUBARRAY_GROUP}/{group_name}"
group = self.h5file.root[group_path]

for algorithm in group._v_children:
dl2 = read_table(
self.h5file,
f"{group_path}/{algorithm}",
start=start,
stop=stop,
)
table = _merge_subarray_tables(table, dl2)
if dl2 and DL2_SUBARRAY_GROUP in self.h5file:
for group_name in self.h5file.root[DL2_SUBARRAY_GROUP]._v_children:
group_path = f"{DL2_SUBARRAY_GROUP}/{group_name}"
group = self.h5file.root[group_path]

for algorithm in group._v_children:
dl2 = read_table(
self.h5file,
f"{group_path}/{algorithm}",
start=start,
stop=stop,
)
table = _merge_subarray_tables(table, dl2)

if dl1_aggregates and PARAMETER_AGGS_GROUP in self.h5file:
aggs = read_table(
self.h5file,
PARAMETER_AGGS_GROUP,
start=start,
stop=stop,
)
table = _merge_subarray_tables(table, aggs)

if observation_info:
table = self._join_observation_info(table)
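
Reading the aggregated columns back could then look roughly like this (sketch; the file name is a placeholder):

from ctapipe.io import TableLoader

loader = TableLoader(input_url="gamma_aggregated.dl1.h5", dl1_aggregates=True)
events = loader.read_subarray_events()
# aggregated columns such as hillas_length_mean / hillas_length_std are joined
# onto the subarray-event table on obs_id and event_id
print(events.colnames)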