Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk interpolation to select calibration data #2634

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ed3c86b
Adding the chunk function
Oct 29, 2024
15efaeb
Reverting to the previous implementation
Oct 29, 2024
050f404
Changing some variables
Oct 29, 2024
11ef1e2
Chaning to using scipy functions
Oct 30, 2024
7c08829
fixing docustring
Oct 30, 2024
c674bcc
Updating docustrings further
Oct 30, 2024
7bd017d
simplifying chunk interpolation
Oct 31, 2024
7504c20
Refactor ChunkInterpolator and its tests
mexanick Oct 31, 2024
9766719
adding changelog
Oct 31, 2024
f22a0c0
renaming changelog
Oct 31, 2024
366a3f3
Changing inheritance scheme
Nov 22, 2024
386faa8
reverting some tests
Nov 22, 2024
e5e2fde
documentation change
Nov 22, 2024
33377e5
fixing issue with class variable
Nov 25, 2024
e735ec1
implementing reviewer comment
Nov 27, 2024
8a0c213
moving some instance variales to class variables
Dec 2, 2024
3bad626
removing unnecessary class variables in parent classes
Dec 3, 2024
eaf34e2
moving ChunkInterpolator variables and making them mutable at first
Dec 4, 2024
53ec341
removing provate data definition from parent class
Dec 4, 2024
e7aa23d
moving a variable
Dec 4, 2024
c3da3d8
putting required units on ChunkInterpolator
Dec 4, 2024
3c1f685
implementing reviewer comment
Dec 4, 2024
c2530e4
making required_columns an instance variable
Dec 9, 2024
9a899df
making subclasses to ChunkInterpolator
Dec 9, 2024
0da6c5f
simplifying start_time and end_time
Dec 9, 2024
cd35c8e
adding data groups
Dec 10, 2024
4725326
adding child classes, making chunk function take arrays
Dec 12, 2024
7ac13d9
making the nan switch test check if the switch is done element-wise
Dec 12, 2024
3b8b4a0
adding imports to __init__
Dec 13, 2024
56afd67
Simplify logic in ChunkInterpolator
mexanick Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changes/2634.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ChunkInterpolator to ctapipe.monitoring.interpolation as a tool to select data from chunks. The planned use for this is to select calibration data.
15 changes: 13 additions & 2 deletions src/ctapipe/monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
Module for handling monitoring data.
"""
from .aggregator import PlainAggregator, SigmaClippingAggregator, StatisticsAggregator
from .interpolation import Interpolator, PointingInterpolator
from .interpolation import (
ChunkInterpolator,
ctoennis marked this conversation as resolved.
Show resolved Hide resolved
FlatFieldInterpolator,
LinearInterpolator,
MonitoringInterpolator,
PedestalInterpolator,
PointingInterpolator,
)
from .outlier import (
MedianOutlierDetector,
OutlierDetector,
Expand All @@ -18,6 +25,10 @@
"RangeOutlierDetector",
"MedianOutlierDetector",
"StdOutlierDetector",
"Interpolator",
"MonitoringInterpolator",
"LinearInterpolator",
"PointingInterpolator",
"ChunkInterpolator",
"FlatFieldInterpolator",
"PedestalInterpolator",
]
266 changes: 208 additions & 58 deletions src/ctapipe/monitoring/interpolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,72 @@
import astropy.units as u
import numpy as np
import tables
from astropy.table import Table
from astropy.time import Time
from scipy.interpolate import interp1d

from ctapipe.core import Component, traits

__all__ = [
"Interpolator",
"MonitoringInterpolator",
"LinearInterpolator",
"PointingInterpolator",
"ChunkInterpolator",
]


class Interpolator(Component, metaclass=ABCMeta):
class MonitoringInterpolator(Component, metaclass=ABCMeta):
"""
Interpolator parent class.
MonitoringInterpolator parent class.

Parameters
----------
h5file : None | tables.File
A open hdf5 file with read access.
An open hdf5 file with read access.
"""

bounds_error = traits.Bool(
default_value=True,
help="If true, raises an exception when trying to extrapolate out of the given table",
).tag(config=True)

extrapolate = traits.Bool(
help="If bounds_error is False, this flag will specify whether values outside"
"the available values are filled with nan (False) or extrapolated (True).",
default_value=False,
).tag(config=True)

telescope_data_group = None
required_columns = set()
expected_units = {}

def __init__(self, h5file=None, **kwargs):
def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None:
super().__init__(**kwargs)

if h5file is not None and not isinstance(h5file, tables.File):
raise TypeError("h5file must be a tables.File")
self.h5file = h5file

self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False)
if self.bounds_error:
self.interp_options["bounds_error"] = True
elif self.extrapolate:
self.interp_options["bounds_error"] = False
self.interp_options["fill_value"] = "extrapolate"
else:
self.interp_options["bounds_error"] = False
self.interp_options["fill_value"] = np.nan
@abstractmethod
def __call__(self, tel_id: int, time: Time):
"""
Interpolates monitoring data for a given timestamp

self._interpolators = {}
Parameters
----------
tel_id : int
Telescope id.
time : astropy.time.Time
Time for which to interpolate the monitoring data.

"""
pass

@abstractmethod
def add_table(self, tel_id, input_table):
def add_table(self, tel_id: int, input_table: Table) -> None:
"""
Add a table to this interpolator
Add a table to this interpolator.
This method reads input tables and creates instances of the needed interpolators
to be added to _interpolators. The first index of _interpolators needs to be
tel_id, the second needs to be the name of the parameter that is to be interpolated
tel_id, the second needs to be the name of the parameter that is to be interpolated.

Parameters
----------
tel_id : int
Telescope id
Telescope id.
input_table : astropy.table.Table
Table of pointing values, expected columns
are always ``time`` as ``Time`` column and
other columns for the data that is to be interpolated
other columns for the data that is to be interpolated.
"""

pass

def _check_tables(self, input_table):
def _check_tables(self, input_table: Table) -> None:
missing = self.required_columns - set(input_table.colnames)
if len(missing) > 0:
raise ValueError(f"Table is missing required column(s): {missing}")
Expand All @@ -98,14 +88,7 @@ def _check_tables(self, input_table):
f"{col} must have units compatible with '{self.expected_units[col].name}'"
)

def _check_interpolators(self, tel_id):
if tel_id not in self._interpolators:
if self.h5file is not None:
self._read_parameter_table(tel_id) # might need to be removed
else:
raise KeyError(f"No table available for tel_id {tel_id}")

def _read_parameter_table(self, tel_id):
def _read_parameter_table(self, tel_id: int) -> None:
# prevent circular import between io and monitoring
from ..io import read_table

Expand All @@ -116,32 +99,74 @@ def _read_parameter_table(self, tel_id):
self.add_table(tel_id, input_table)


class PointingInterpolator(Interpolator):
class LinearInterpolator(MonitoringInterpolator):
"""
LinearInterpolator parent class.

Parameters
----------
h5file : None | tables.File
An open hdf5 file with read access.
"""

bounds_error = traits.Bool(
default_value=True,
help="If true, raises an exception when trying to extrapolate out of the given table",
).tag(config=True)

extrapolate = traits.Bool(
help="If bounds_error is False, this flag will specify whether values outside"
"the available values are filled with nan (False) or extrapolated (True).",
default_value=False,
).tag(config=True)

def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None:
super().__init__(h5file, **kwargs)
self._interpolators = {}
self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False)
if self.bounds_error:
self.interp_options["bounds_error"] = True
elif self.extrapolate:
self.interp_options["bounds_error"] = False
self.interp_options["fill_value"] = "extrapolate"
else:
self.interp_options["bounds_error"] = False
self.interp_options["fill_value"] = np.nan

def _check_interpolators(self, tel_id: int) -> None:
if tel_id not in self._interpolators:
if self.h5file is not None:
self._read_parameter_table(tel_id) # might need to be removed
else:
raise KeyError(f"No table available for tel_id {tel_id}")


class PointingInterpolator(LinearInterpolator):
mexanick marked this conversation as resolved.
Show resolved Hide resolved
"""
Interpolator for pointing and pointing correction data
Interpolator for pointing and pointing correction data.
"""

telescope_data_group = "/dl0/monitoring/telescope/pointing"
required_columns = frozenset(["time", "azimuth", "altitude"])
expected_units = {"azimuth": u.rad, "altitude": u.rad}

def __call__(self, tel_id, time):
def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]:
"""
Interpolate alt/az for given time and tel_id.

Parameters
----------
tel_id : int
telescope id
Telescope id.
time : astropy.time.Time
time for which to interpolate the pointing
Time for which to interpolate the pointing.

Returns
-------
altitude : astropy.units.Quantity[deg]
interpolated altitude angle
Interpolated altitude angle.
azimuth : astropy.units.Quantity[deg]
interpolated azimuth angle
Interpolated azimuth angle.
"""

self._check_interpolators(tel_id)
Expand All @@ -151,20 +176,19 @@ def __call__(self, tel_id, time):
alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False)
return alt, az

def add_table(self, tel_id, input_table):
def add_table(self, tel_id: int, input_table: Table) -> None:
"""
Add a table to this interpolator
Add a table to this interpolator.

Parameters
----------
tel_id : int
Telescope id
Telescope id.
input_table : astropy.table.Table
Table of pointing values, expected columns
are ``time`` as ``Time`` column, ``azimuth`` and ``altitude``
as quantity columns for pointing and pointing correction data.
"""

self._check_tables(input_table)

if not isinstance(input_table["time"], Time):
Expand All @@ -184,5 +208,131 @@ def add_table(self, tel_id, input_table):
alt = input_table["altitude"].quantity.to_value(u.rad)
mjd = input_table["time"].tai.mjd
self._interpolators[tel_id] = {}
self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options)
self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options)
self._interpolators[tel_id]["az"] = interp1d(
mjd, az, kind="linear", **self.interp_options
)
self._interpolators[tel_id]["alt"] = interp1d(
mjd, alt, kind="linear", **self.interp_options
)


class ChunkInterpolator(MonitoringInterpolator):
"""
Simple interpolator for overlapping chunks of data.
"""

def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._interpolators = {}
self.start_time = {}
self.end_time = {}
self.values = {}
self.columns = list(self.required_columns) # these will be the data columns
self.columns.remove("start_time")
self.columns.remove("end_time")

def __call__(self, tel_id: int, time: Time) -> float | dict[str, float]:
"""
Interpolate overlapping chunks of data for a given time, tel_id, and column(s).

Parameters
----------
tel_id : int
Telescope id.
time : astropy.time.Time
Time for which to interpolate the data.

Returns
-------
interpolated : float or dict
Interpolated data for the specified column(s).
"""

if tel_id not in self.values:
self._read_parameter_table(tel_id)

result = {}
mjd = time.to_value("mjd")
for column in self.columns:
result[column] = self._interpolate_chunk(tel_id, column, mjd)

if len(result) == 1:
return result[self.columns[0]]
return result

def add_table(self, tel_id: int, input_table: Table) -> None:
"""
Add a table to this interpolator for specific columns.

Parameters
----------
tel_id : int
Telescope id.
input_table : astropy.table.Table
Table of values to be interpolated, expected columns
are ``start_time`` as ``validity start Time`` column,
``end_time`` as ``validity end Time`` and the specified columns
for the data of the chunks.
"""

self._check_tables(input_table)

input_table = input_table.copy()
input_table.sort("start_time")

self.values[tel_id] = {}
self.start_time[tel_id] = input_table["start_time"].to_value("mjd")
self.end_time[tel_id] = input_table["end_time"].to_value("mjd")

for column in self.columns:
self.values[tel_id][column] = input_table[column]

def _interpolate_chunk(self, tel_id, column, mjd: float) -> float:
"""
Interpolates overlapping chunks of data preferring earlier chunks if valid

Parameters
----------
tel_id : int
tel_id for which data is to be interpolated
mjd : float
Time for which to interpolate the data.
"""

start_time = self.start_time[tel_id]
end_time = self.end_time[tel_id]
values = self.values[tel_id][column]
# Find the index of the closest preceding start time
preceding_index = np.searchsorted(start_time, mjd, side="right") - 1

if preceding_index < 0:
return np.nan

value = np.nan

# Check if the time is within the valid range of the chunk
if start_time[preceding_index] <= mjd <= end_time[preceding_index]:
value = values[preceding_index]

# If an element in the closest preceding chunk has nan, check the next closest chunk

for i in range(preceding_index - 1, -1, -1):
if start_time[i] <= mjd <= end_time[i]:
if value is np.nan:
value = values[i]
else:
value = np.where(np.isnan(value), values[i], value)

return value


class FlatFieldInterpolator(ChunkInterpolator):
required_columns = frozenset(["start_time", "end_time", "relative_gain"])
ctoennis marked this conversation as resolved.
Show resolved Hide resolved
expected_units = {"relative_gain": None}
telescope_data_group = "/dl1/monitoring/telescope/flatfield"


class PedestalInterpolator(ChunkInterpolator):
required_columns = frozenset(["start_time", "end_time", "pedestal"])
expected_units = {"pedestal": None}
telescope_data_group = "/dl1/monitoring/telescope/pedestal"
Loading