diff --git a/docs/changes/2634.feature.rst b/docs/changes/2634.feature.rst new file mode 100644 index 00000000000..f09235fb0bb --- /dev/null +++ b/docs/changes/2634.feature.rst @@ -0,0 +1 @@ +Add ChunkInterpolator to ctapipe.monitoring.interpolation as a tool to select data from chunks. The planned use for this is to select calibration data. diff --git a/src/ctapipe/monitoring/__init__.py b/src/ctapipe/monitoring/__init__.py index cb102f768cf..563ab6d975b 100644 --- a/src/ctapipe/monitoring/__init__.py +++ b/src/ctapipe/monitoring/__init__.py @@ -2,7 +2,14 @@ Module for handling monitoring data. """ from .aggregator import PlainAggregator, SigmaClippingAggregator, StatisticsAggregator -from .interpolation import Interpolator, PointingInterpolator +from .interpolation import ( + ChunkInterpolator, + FlatFieldInterpolator, + LinearInterpolator, + MonitoringInterpolator, + PedestalInterpolator, + PointingInterpolator, +) from .outlier import ( MedianOutlierDetector, OutlierDetector, @@ -18,6 +25,10 @@ "RangeOutlierDetector", "MedianOutlierDetector", "StdOutlierDetector", - "Interpolator", + "MonitoringInterpolator", + "LinearInterpolator", "PointingInterpolator", + "ChunkInterpolator", + "FlatFieldInterpolator", + "PedestalInterpolator", ] diff --git a/src/ctapipe/monitoring/interpolation.py b/src/ctapipe/monitoring/interpolation.py index 84064cbc1a3..1425938efb4 100644 --- a/src/ctapipe/monitoring/interpolation.py +++ b/src/ctapipe/monitoring/interpolation.py @@ -4,82 +4,72 @@ import astropy.units as u import numpy as np import tables +from astropy.table import Table from astropy.time import Time from scipy.interpolate import interp1d from ctapipe.core import Component, traits __all__ = [ - "Interpolator", + "MonitoringInterpolator", + "LinearInterpolator", "PointingInterpolator", + "ChunkInterpolator", ] -class Interpolator(Component, metaclass=ABCMeta): +class MonitoringInterpolator(Component, metaclass=ABCMeta): """ - Interpolator parent class. + MonitoringInterpolator parent class. Parameters ---------- h5file : None | tables.File - A open hdf5 file with read access. + An open hdf5 file with read access. """ - bounds_error = traits.Bool( - default_value=True, - help="If true, raises an exception when trying to extrapolate out of the given table", - ).tag(config=True) - - extrapolate = traits.Bool( - help="If bounds_error is False, this flag will specify whether values outside" - "the available values are filled with nan (False) or extrapolated (True).", - default_value=False, - ).tag(config=True) - - telescope_data_group = None - required_columns = set() - expected_units = {} - - def __init__(self, h5file=None, **kwargs): + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: super().__init__(**kwargs) if h5file is not None and not isinstance(h5file, tables.File): raise TypeError("h5file must be a tables.File") self.h5file = h5file - self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) - if self.bounds_error: - self.interp_options["bounds_error"] = True - elif self.extrapolate: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = "extrapolate" - else: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = np.nan + @abstractmethod + def __call__(self, tel_id: int, time: Time): + """ + Interpolates monitoring data for a given timestamp - self._interpolators = {} + Parameters + ---------- + tel_id : int + Telescope id. + time : astropy.time.Time + Time for which to interpolate the monitoring data. + + """ + pass @abstractmethod - def add_table(self, tel_id, input_table): + def add_table(self, tel_id: int, input_table: Table) -> None: """ - Add a table to this interpolator + Add a table to this interpolator. This method reads input tables and creates instances of the needed interpolators to be added to _interpolators. The first index of _interpolators needs to be - tel_id, the second needs to be the name of the parameter that is to be interpolated + tel_id, the second needs to be the name of the parameter that is to be interpolated. Parameters ---------- tel_id : int - Telescope id + Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are always ``time`` as ``Time`` column and - other columns for the data that is to be interpolated + other columns for the data that is to be interpolated. """ - pass - def _check_tables(self, input_table): + def _check_tables(self, input_table: Table) -> None: missing = self.required_columns - set(input_table.colnames) if len(missing) > 0: raise ValueError(f"Table is missing required column(s): {missing}") @@ -98,14 +88,7 @@ def _check_tables(self, input_table): f"{col} must have units compatible with '{self.expected_units[col].name}'" ) - def _check_interpolators(self, tel_id): - if tel_id not in self._interpolators: - if self.h5file is not None: - self._read_parameter_table(tel_id) # might need to be removed - else: - raise KeyError(f"No table available for tel_id {tel_id}") - - def _read_parameter_table(self, tel_id): + def _read_parameter_table(self, tel_id: int) -> None: # prevent circular import between io and monitoring from ..io import read_table @@ -116,32 +99,74 @@ def _read_parameter_table(self, tel_id): self.add_table(tel_id, input_table) -class PointingInterpolator(Interpolator): +class LinearInterpolator(MonitoringInterpolator): + """ + LinearInterpolator parent class. + + Parameters + ---------- + h5file : None | tables.File + An open hdf5 file with read access. + """ + + bounds_error = traits.Bool( + default_value=True, + help="If true, raises an exception when trying to extrapolate out of the given table", + ).tag(config=True) + + extrapolate = traits.Bool( + help="If bounds_error is False, this flag will specify whether values outside" + "the available values are filled with nan (False) or extrapolated (True).", + default_value=False, + ).tag(config=True) + + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: + super().__init__(h5file, **kwargs) + self._interpolators = {} + self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) + if self.bounds_error: + self.interp_options["bounds_error"] = True + elif self.extrapolate: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = "extrapolate" + else: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = np.nan + + def _check_interpolators(self, tel_id: int) -> None: + if tel_id not in self._interpolators: + if self.h5file is not None: + self._read_parameter_table(tel_id) # might need to be removed + else: + raise KeyError(f"No table available for tel_id {tel_id}") + + +class PointingInterpolator(LinearInterpolator): """ - Interpolator for pointing and pointing correction data + Interpolator for pointing and pointing correction data. """ telescope_data_group = "/dl0/monitoring/telescope/pointing" required_columns = frozenset(["time", "azimuth", "altitude"]) expected_units = {"azimuth": u.rad, "altitude": u.rad} - def __call__(self, tel_id, time): + def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]: """ Interpolate alt/az for given time and tel_id. Parameters ---------- tel_id : int - telescope id + Telescope id. time : astropy.time.Time - time for which to interpolate the pointing + Time for which to interpolate the pointing. Returns ------- altitude : astropy.units.Quantity[deg] - interpolated altitude angle + Interpolated altitude angle. azimuth : astropy.units.Quantity[deg] - interpolated azimuth angle + Interpolated azimuth angle. """ self._check_interpolators(tel_id) @@ -151,20 +176,19 @@ def __call__(self, tel_id, time): alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False) return alt, az - def add_table(self, tel_id, input_table): + def add_table(self, tel_id: int, input_table: Table) -> None: """ - Add a table to this interpolator + Add a table to this interpolator. Parameters ---------- tel_id : int - Telescope id + Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` as quantity columns for pointing and pointing correction data. """ - self._check_tables(input_table) if not isinstance(input_table["time"], Time): @@ -184,5 +208,131 @@ def add_table(self, tel_id, input_table): alt = input_table["altitude"].quantity.to_value(u.rad) mjd = input_table["time"].tai.mjd self._interpolators[tel_id] = {} - self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options) - self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options) + self._interpolators[tel_id]["az"] = interp1d( + mjd, az, kind="linear", **self.interp_options + ) + self._interpolators[tel_id]["alt"] = interp1d( + mjd, alt, kind="linear", **self.interp_options + ) + + +class ChunkInterpolator(MonitoringInterpolator): + """ + Simple interpolator for overlapping chunks of data. + """ + + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._interpolators = {} + self.start_time = {} + self.end_time = {} + self.values = {} + self.columns = list(self.required_columns) # these will be the data columns + self.columns.remove("start_time") + self.columns.remove("end_time") + + def __call__(self, tel_id: int, time: Time) -> float | dict[str, float]: + """ + Interpolate overlapping chunks of data for a given time, tel_id, and column(s). + + Parameters + ---------- + tel_id : int + Telescope id. + time : astropy.time.Time + Time for which to interpolate the data. + + Returns + ------- + interpolated : float or dict + Interpolated data for the specified column(s). + """ + + if tel_id not in self.values: + self._read_parameter_table(tel_id) + + result = {} + mjd = time.to_value("mjd") + for column in self.columns: + result[column] = self._interpolate_chunk(tel_id, column, mjd) + + if len(result) == 1: + return result[self.columns[0]] + return result + + def add_table(self, tel_id: int, input_table: Table) -> None: + """ + Add a table to this interpolator for specific columns. + + Parameters + ---------- + tel_id : int + Telescope id. + input_table : astropy.table.Table + Table of values to be interpolated, expected columns + are ``start_time`` as ``validity start Time`` column, + ``end_time`` as ``validity end Time`` and the specified columns + for the data of the chunks. + """ + + self._check_tables(input_table) + + input_table = input_table.copy() + input_table.sort("start_time") + + self.values[tel_id] = {} + self.start_time[tel_id] = input_table["start_time"].to_value("mjd") + self.end_time[tel_id] = input_table["end_time"].to_value("mjd") + + for column in self.columns: + self.values[tel_id][column] = input_table[column] + + def _interpolate_chunk(self, tel_id, column, mjd: float) -> float: + """ + Interpolates overlapping chunks of data preferring earlier chunks if valid + + Parameters + ---------- + tel_id : int + tel_id for which data is to be interpolated + mjd : float + Time for which to interpolate the data. + """ + + start_time = self.start_time[tel_id] + end_time = self.end_time[tel_id] + values = self.values[tel_id][column] + # Find the index of the closest preceding start time + preceding_index = np.searchsorted(start_time, mjd, side="right") - 1 + + if preceding_index < 0: + return np.nan + + value = np.nan + + # Check if the time is within the valid range of the chunk + if start_time[preceding_index] <= mjd <= end_time[preceding_index]: + value = values[preceding_index] + + # If an element in the closest preceding chunk has nan, check the next closest chunk + + for i in range(preceding_index - 1, -1, -1): + if start_time[i] <= mjd <= end_time[i]: + if value is np.nan: + value = values[i] + else: + value = np.where(np.isnan(value), values[i], value) + + return value + + +class FlatFieldInterpolator(ChunkInterpolator): + required_columns = frozenset(["start_time", "end_time", "relative_gain"]) + expected_units = {"relative_gain": None} + telescope_data_group = "/dl1/monitoring/telescope/flatfield" + + +class PedestalInterpolator(ChunkInterpolator): + required_columns = frozenset(["start_time", "end_time", "pedestal"]) + expected_units = {"pedestal": None} + telescope_data_group = "/dl1/monitoring/telescope/pedestal" diff --git a/src/ctapipe/monitoring/tests/test_interpolator.py b/src/ctapipe/monitoring/tests/test_interpolator.py index 782aeae7435..0e2f0009ffd 100644 --- a/src/ctapipe/monitoring/tests/test_interpolator.py +++ b/src/ctapipe/monitoring/tests/test_interpolator.py @@ -5,11 +5,124 @@ from astropy.table import Table from astropy.time import Time -from ctapipe.monitoring.interpolation import PointingInterpolator +from ctapipe.monitoring.interpolation import ( + FlatFieldInterpolator, + PedestalInterpolator, + PointingInterpolator, +) t0 = Time("2022-01-01T00:00:00") +def test_chunk_selection(camera_geometry): + table_ff = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "relative_gain": [ + np.full((2, len(camera_geometry)), x) for x in [1, 2, 3, 4] + ], + }, + ) + interpolator_ff = FlatFieldInterpolator() + interpolator_ff.add_table(1, table_ff) + + val1 = interpolator_ff(tel_id=1, time=t0 + 1.2 * u.s) + val2 = interpolator_ff(tel_id=1, time=t0 + 1.7 * u.s) + val3 = interpolator_ff(tel_id=1, time=t0 + 2.2 * u.s) + + assert np.all(np.isclose(val1, np.full((2, len(camera_geometry)), 2))) + assert np.all(np.isclose(val2, np.full((2, len(camera_geometry)), 2))) + assert np.all(np.isclose(val3, np.full((2, len(camera_geometry)), 3))) + + table_ped = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "pedestal": [np.full((2, len(camera_geometry)), x) for x in [1, 2, 3, 4]], + }, + ) + interpolator_ped = PedestalInterpolator() + interpolator_ped.add_table(1, table_ped) + + val1 = interpolator_ped(tel_id=1, time=t0 + 1.2 * u.s) + val2 = interpolator_ped(tel_id=1, time=t0 + 1.7 * u.s) + val3 = interpolator_ped(tel_id=1, time=t0 + 2.2 * u.s) + + assert np.all(np.isclose(val1, np.full((2, len(camera_geometry)), 2))) + assert np.all(np.isclose(val2, np.full((2, len(camera_geometry)), 2))) + assert np.all(np.isclose(val3, np.full((2, len(camera_geometry)), 3))) + + +def test_nan_switch(camera_geometry): + data = np.array([np.full((2, len(camera_geometry)), x) for x in [1, 2, 3, 4]]) + data[1][0, 0] = 5 + data = np.where( + data > 4, np.nan, data + ) # this is a workaround to introduce a nan in the data + + table_ff = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "relative_gain": data, + }, + ) + interpolator_ff = FlatFieldInterpolator() + interpolator_ff.add_table(1, table_ff) + + val = interpolator_ff(tel_id=1, time=t0 + 1.2 * u.s) + + res = np.full((2, len(camera_geometry)), 2) + res[0][ + 0 + ] = 1 # where the nan was introduced before we should now have the value from the earlier chunk + + assert np.all(np.isclose(val, res)) + + table_ped = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "pedestal": data, + }, + ) + interpolator_ped = PedestalInterpolator() + interpolator_ped.add_table(1, table_ped) + + val = interpolator_ped(tel_id=1, time=t0 + 1.2 * u.s) + + assert np.all(np.isclose(val, res)) + + +def test_no_valid_chunk(): + table_ff = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "relative_gain": [1, 2, 3, 4], + }, + ) + interpolator_ff = FlatFieldInterpolator() + interpolator_ff.add_table(1, table_ff) + + val = interpolator_ff(tel_id=1, time=t0 + 5.2 * u.s) + assert np.isnan(val) + + table_ped = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "pedestal": [1, 2, 3, 4], + }, + ) + interpolator_ped = PedestalInterpolator() + interpolator_ped.add_table(1, table_ped) + + val = interpolator_ped(tel_id=1, time=t0 + 5.2 * u.s) + assert np.isnan(val) + + def test_azimuth_switchover(): """Test pointing interpolation"""