Skip to content

Commit

Permalink
temporary commit
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasBeiske committed Jan 24, 2024
1 parent 723ffbd commit aabf989
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 294 deletions.
214 changes: 122 additions & 92 deletions ctapipe/image/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
import astropy.units as u
import numpy as np
from astropy.stats import circmean, circstd
from astropy.table import Table
from astropy.table import Table, vstack
from numba import njit

from ..containers import (
ArrayEventContainer,
BaseStatisticsContainer,
StatisticsContainer,
)
from ..core import Component
from ..core import Component, FeatureGenerator, QualityQuery
from ..core.traits import List, Tuple, Unicode
from ..vectorization import max_ufunc, min_ufunc, weighted_mean_ufunc
from ..vectorization import (
collect_features,
get_subarray_index,
max_ufunc,
min_ufunc,
weighted_mean_std_ufunc,
)

__all__ = ["descriptive_statistics", "skewness", "kurtosis", "FeatureAggregator"]

Expand Down Expand Up @@ -117,124 +123,148 @@ class FeatureAggregator(Component):
),
).tag(config=True)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.feature_generator = FeatureGenerator(parent=self)
self.quality_query = QualityQuery(parent=self)

def __call__(self, event: ArrayEventContainer) -> None:
"""Fill event container with aggregated image parameters."""
for prefix, feature in self.image_parameters:
values = []
unit = None
for tel_id in event.dl1.tel.keys():
value = event.dl1.tel[tel_id].parameters[prefix][feature]
if isinstance(value, u.Quantity):
if not unit:
unit = value.unit
value = value.to_value(unit)

valid = value >= 0 if prefix == "morphology" else ~np.isnan(value)
if valid:
values.append(value)

if len(values) > 0:
if feature.endswith(("psi", "phi")):
mean = circmean(
u.Quantity(values, unit, copy=False).to_value(u.rad)
)
std = circstd(u.Quantity(values, unit, copy=False).to_value(u.rad))
else:
mean = np.mean(values)
std = np.std(values)

# Use the same dtype for all columns, independent of the dtype
# of the aggregated image parameter, since `_mean` and `_std`
# requiere floats anyway.
max_value = np.float64(np.max(values))
min_value = np.float64(np.min(values))
table = None
for tel_id in event.trigger.tels_with_trigger:
t = collect_features(event, tel_id)
t["obs_id"] = event.index.obs_id
t["event_id"] = event.index.event_id
if not table:
table = t
else:
max_value = np.nan
min_value = np.nan
mean = np.nan
std = np.nan

if unit:
max_value = u.Quantity(max_value, unit, copy=False)
min_value = u.Quantity(min_value, unit, copy=False)
if feature.endswith(("psi", "phi")):
mean = u.Quantity(mean, u.rad, copy=False).to(unit)
std = u.Quantity(std, u.rad, copy=False).to(unit)
else:
mean = u.Quantity(mean, unit, copy=False)
std = u.Quantity(std, unit, copy=False)

event.dl1.aggregate[prefix + "_" + feature] = BaseStatisticsContainer(
max=max_value,
min=min_value,
mean=mean,
std=std,
prefix=prefix + "_" + feature,
table = vstack([table, t])

agg_table = self.aggregate_table(table)
for col in [
prefix + "_" + feature for prefix, feature in self.image_parameters
]:
event.dl1.aggregate[col] = BaseStatisticsContainer(
max=agg_table[col + "_max"],
min=agg_table[col + "_min"],
mean=agg_table[col + "_mean"],
std=agg_table[col + "_std"],
prefix=col,
)

def aggregate_table(self, mono_parameters: Table) -> dict[str, Table]:
# for prefix, feature in self.image_parameters:
# values = []
# unit = None
# for tel_id in event.dl1.tel.keys():
# value = event.dl1.tel[tel_id].parameters[prefix][feature]
# if isinstance(value, u.Quantity):
# if not unit:
# unit = value.unit
# value = value.to_value(unit)
#
# valid = value >= 0 if prefix == "morphology" else ~np.isnan(value)
# if valid:
# values.append(value)
#
# if len(values) > 0:
# if feature.endswith(("psi", "phi")):
# mean = circmean(
# u.Quantity(values, unit, copy=False).to_value(u.rad)
# )
# std = circstd(u.Quantity(values, unit, copy=False).to_value(u.rad))
# else:
# mean = np.mean(values)
# std = np.std(values)
#
# # Use the same dtype for all columns, independent of the dtype
# # of the aggregated image parameter, since `_mean` and `_std`
# # requiere floats anyway.
# max_value = np.float64(np.max(values))
# min_value = np.float64(np.min(values))
# else:
# max_value = np.nan
# min_value = np.nan
# mean = np.nan
# std = np.nan
#
# if unit:
# max_value = u.Quantity(max_value, unit, copy=False)
# min_value = u.Quantity(min_value, unit, copy=False)
# if feature.endswith(("psi", "phi")):
# mean = u.Quantity(mean, u.rad, copy=False).to(unit)
# std = u.Quantity(std, u.rad, copy=False).to(unit)
# else:
# mean = u.Quantity(mean, unit, copy=False)
# std = u.Quantity(std, unit, copy=False)
#
# event.dl1.aggregate[prefix + "_" + feature] = BaseStatisticsContainer(
# max=max_value,
# min=min_value,
# mean=mean,
# std=std,
# prefix=prefix + "_" + feature,
# )

def aggregate_table(self, mono_parameters: Table) -> Table:
"""
Construct table containing aggregated image parameters
from table of telescope events.
"""
agg_tables = {}
mono_parameters = self.feature_generator(mono_parameters)
passes_cuts = self.quality_query.get_table_mask(mono_parameters)

obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index(
mono_parameters
)
n_array_events = len(obs_ids)
agg_table = Table({"obs_id": obs_ids, "event_id": event_ids})
# copy metadata
for colname in ("obs_id", "event_id"):
agg_table[colname].description = mono_parameters[colname].description

for prefix, feature in self.image_parameters:
if feature in ("psi", "phi"):
# TODO: deal with circular boundary conditions
# or just ignore and throw an error, since it's
# not really useful to aggregate those.
raise NotImplementedError()

col = prefix + "_" + feature
unit = mono_parameters[col].quantity.unit
if prefix == "morphology":
valid = mono_parameters[col] >= 0
valid = mono_parameters[col] >= 0 & passes_cuts
else:
valid = ~np.isnan(mono_parameters[col])
valid = ~np.isnan(mono_parameters[col]) & passes_cuts

valid_parameters = mono_parameters[valid]
array_events, indices, multiplicity = np.unique(
mono_parameters["obs_id", "event_id"],
return_inverse=True,
return_counts=True,
)
agg_table = Table(array_events)
for colname in ("obs_id", "event_id"):
agg_table[colname].description = mono_parameters[colname].description

n_array_events = len(array_events)
if len(valid_parameters) > 0:
mono_column = valid_parameters[col]
means = weighted_mean_ufunc(
mono_column,
np.array([1]),
n_array_events,
indices[valid],
)
# FIXME: This has the same problem of strange NaNs as
# e.g. "energy_uncert" generated by the StereoMeanCombiner!
# Output is also incorrect, but I don't understand why...
variance = weighted_mean_ufunc(
(mono_column - np.repeat(means, multiplicity)[valid]) ** 2,
np.array([1]),
if np.sum(valid) > 0:
means, stds = weighted_mean_std_ufunc(
mono_parameters[col],
valid,
n_array_events,
indices[valid],
tel_to_array_indices,
multiplicity,
)
max_values = max_ufunc(
mono_column,
mono_parameters[col],
valid,
n_array_events,
indices[valid],
tel_to_array_indices,
)
min_values = min_ufunc(
mono_column,
mono_parameters[col],
valid,
n_array_events,
indices[valid],
tel_to_array_indices,
)
else:
means = np.full(n_array_events, np.nan)
variance = np.full(n_array_events, np.nan)
stds = np.full(n_array_events, np.nan)
max_values = np.full(n_array_events, np.nan)
min_values = np.full(n_array_events, np.nan)

agg_table[col + "_max"] = u.Quantity(max_values, unit, copy=False)
agg_table[col + "_min"] = u.Quantity(min_values, unit, copy=False)
agg_table[col + "_mean"] = u.Quantity(means, unit, copy=False)
agg_table[col + "_std"] = u.Quantity(np.sqrt(variance), unit, copy=False)

agg_tables[col] = agg_table
agg_table[col + "_std"] = u.Quantity(stds, unit, copy=False)

return agg_tables
return agg_table
12 changes: 4 additions & 8 deletions ctapipe/io/datawriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,6 @@ def _write_trigger(self, event: ArrayEventContainer):

def _write_r1_telescope_events(self, event: ArrayEventContainer):
for tel_id, r1_tel in event.r1.tel.items():

tel_index = _get_tel_index(event, tel_id)
table_name = self.table_name(tel_id)

Expand All @@ -659,7 +658,6 @@ def _write_r1_telescope_events(self, event: ArrayEventContainer):

def _write_r0_telescope_events(self, event: ArrayEventContainer):
for tel_id, r0_tel in event.r0.tel.items():

tel_index = _get_tel_index(event, tel_id)
table_name = self.table_name(tel_id)

Expand Down Expand Up @@ -726,7 +724,6 @@ def _write_dl1_telescope_events(self, event: ArrayEventContainer):
)

def _write_muon_telescope_events(self, event: ArrayEventContainer):

for tel_id, muon in event.muon.tel.items():
table_name = self.table_name(tel_id)
tel_index = _get_tel_index(event, tel_id)
Expand All @@ -739,11 +736,10 @@ def _write_dl1_aggregates(self, event: ArrayEventContainer):
"""
Write array-event-wise aggregated DL1 image parameters.
"""
for feature_name, container in event.dl1.aggregate.items():
self._writer.write(
table_name=f"dl1/event/aggregate/{feature_name}",
containers=[event.index, container],
)
self._writer.write(
table_name="dl1/event/subarray/aggregated_image_parameters",
containers=[event.index] + list(event.dl1.aggregate.values()),
)

def _write_dl2_telescope_events(self, event: ArrayEventContainer):
"""
Expand Down
63 changes: 1 addition & 62 deletions ctapipe/reco/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@
import astropy.units as u
import numpy as np
from astropy.coordinates import AltAz
from astropy.table import QTable, Table
from astropy.table import Table
from numpy.lib.recfunctions import structured_to_unstructured

from ctapipe.coordinates import MissingFrameAttributeWarning, TelescopeFrame

from ..containers import ArrayEventContainer

LOG = logging.getLogger(__name__)


__all__ = [
"check_valid_rows",
"collect_features",
"table_to_float",
"table_to_X",
"horizontal_to_telescope",
Expand Down Expand Up @@ -68,64 +65,6 @@ def table_to_X(table: Table, features: List[str], log=LOG):
return X, valid


def collect_features(
event: ArrayEventContainer, tel_id: int, subarray_table=None
) -> Table:
"""Loop over all containers with features.
Parameters
----------
event : ArrayEventContainer
The event container from which to collect the features
tel_id : int
The telscope id for which to collect the features
subarray_table : Table
The subarray as "to_table("joined")", to be added to the features.
Returns
-------
Table
"""
features = {}

features.update(event.trigger.as_dict(recursive=False, flatten=True))

features.update(
event.dl1.tel[tel_id].parameters.as_dict(
add_prefix=True,
recursive=True,
flatten=True,
)
)

features.update(
event.dl2.tel[tel_id].as_dict(
add_prefix=True,
recursive=True,
flatten=True,
add_key=False, # prefix is already the map key for dl2 stuff
)
)

features.update(
event.dl2.stereo.as_dict(
add_prefix=True,
recursive=True,
flatten=True,
add_key=False, # prefix is already the map key for dl2 stuff
)
)

if subarray_table is not None:
# to include units in features
if not isinstance(subarray_table, QTable):
subarray_table = QTable(subarray_table, copy=False)

features.update(subarray_table.loc[tel_id])

return Table({k: [v] for k, v in features.items()})


@u.quantity_input(alt=u.deg, az=u.deg, pointing_alt=u.deg, pointing_az=u.deg)
def horizontal_to_telescope(alt, az, pointing_alt, pointing_az):
"""Transform coordinates from horizontal coordinates into TelescopeFrame"""
Expand Down
Loading

0 comments on commit aabf989

Please sign in to comment.