diff --git a/docs/source/observations.rst b/docs/source/observations.rst
index 9afe6798..48b4e4ef 100644
--- a/docs/source/observations.rst
+++ b/docs/source/observations.rst
@@ -97,8 +97,15 @@ With this memory layout, typical operations look like this::
 
 Parallel applications
 ---------------------
 
-The only work that the :class:`.Observation` class actually does is handling
-parallelism. ``obs.tod`` can be distributed over a
+The :class:`.Observation` class can distribute ``obs.tod`` over multiple MPI
+processes, enabling parallel computations. The distribution of ``obs.tod``
+can be achieved in two different ways:
+
+1. Uniform distribution of detectors along the detector axis
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+With the ``n_blocks_det`` and ``n_blocks_time`` arguments of the :class:`.Observation`
+class, ``obs.tod`` is evenly distributed over a
 ``n_blocks_det`` by ``n_blocks_time`` grid of MPI ranks. The blocks can be changed at
 run-time.
 
@@ -111,7 +118,7 @@ The main advantage is that the example operations in the Serial section are
 achieved with the same lines of code. The price to pay is that you have to
 set detector properties with special methods.
 
-::
+.. code-block:: python
 
     import litebird_sim as lbs
     from mpi4py import MPI
@@ -158,21 +165,222 @@ TOD) gets distributed.
 
 .. image:: ./images/observation_data_distribution.png
 
-When ``n_blocks_det != 1``, keep in mind that ``obs.tod[0]`` or
-``obs.wn_levels[0]`` are quantities of the first *local* detector, not global.
-This should not be a problem as the only thing that matters is that the two
-quantities refer to the same detector. If you need the global detector index,
-you can get it with ``obs.det_idx[0]``, which is created
-at construction time.
-
-To get a better understanding of how observations are being used in a
-MPI simulation, use the method :meth:`.Simulation.describe_mpi_distribution`.
-This method must be called *after* the observations have been allocated using
-:meth:`.Simulation.create_observations`; it will return an instance of the
-class :class:`.MpiDistributionDescr`, which can be inspected to determine
-which detectors and time spans are covered by each observation in all the
-MPI processes that are being used. For more information, refer to the Section
-:ref:`simulations`.
+2. Custom grouping of detectors along the detector axis
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+While the uniform distribution of detectors along the detector axis optimizes load
+balancing, it is less suitable for simulating some effects, such as crosstalk and
+noise correlation between detectors. Simulating these effects with uniformly
+distributed detectors requires transferring large TOD arrays between MPI
+processes, which complicates the implementation and may lead to significant
+performance overhead. To avoid this situation, the :class:`Observation` class
+accepts an argument ``det_blocks_attributes``: a list of strings specifying the
+detector attributes used to group the detectors. Once the detector groups are
+made, the detectors are distributed to the MPI processes in such a way that all
+the detectors of a group reside on the same MPI process.
+
+If a valid ``det_blocks_attributes`` argument is passed to the :class:`Observation`
+class, the arguments ``n_blocks_det`` and ``n_blocks_time`` are ignored. Since
+``det_blocks_attributes`` creates the detector blocks dynamically, ``n_blocks_time``
+is computed at runtime using the size of the MPI communicator and the number of
+detector blocks (``n_blocks_time = comm.size // n_blocks_det``).
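+
+As a minimal sketch of the class-level API, ``det_blocks_attributes`` can also be
+passed directly to the :class:`Observation` constructor. The detector
+dictionaries and the numbers used below are hypothetical and kept to a minimum;
+any detector attribute may be used for the grouping:
+
+.. code-block:: python
+
+    import litebird_sim as lbs
+
+    # A minimal, hypothetical list of detectors; each entry is a dictionary
+    # of detector attributes
+    dets = [
+        {"name": "det_A", "wafer": "wafer_1"},
+        {"name": "det_B", "wafer": "wafer_2"},
+        {"name": "det_C", "wafer": "wafer_1"},
+    ]
+
+    obs = lbs.Observation(
+        detectors=dets,
+        n_samples_global=100,
+        start_time_global=0.0,
+        sampling_rate_hz=1.0,
+        det_blocks_attributes=["wafer"],  # group the detectors by wafer
+        comm=lbs.MPI_COMM_WORLD,
+    )
+
+On an MPI run with more than one process, this should create one detector block
+per distinct wafer, with ``n_blocks_time`` following from the size of the
+communicator.
+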
+The detector blocks made in this way can be accessed with
+``Observation.detector_blocks``. It is a dictionary object that has the tuples of
+``det_blocks_attributes`` values as dictionary keys and the lists of detectors
+corresponding to those keys as dictionary values. This dictionary is sorted so
+that the group with the largest number of detectors comes first and the one with
+the fewest detectors comes last.
+
+The following example illustrates the distribution of the ``obs.tod`` matrix
+across the MPI processes when ``det_blocks_attributes`` is specified.
+
+.. code-block:: python
+
+    import litebird_sim as lbs
+
+    comm = lbs.MPI_COMM_WORLD
+
+    start_time = 456
+    duration_s = 100
+    sampling_freq_Hz = 1
+
+    # Creating a list of detectors.
+    dets = [
+        lbs.DetectorInfo(
+            name="channel1_w9_detA",
+            wafer="wafer_9",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w3_detB",
+            wafer="wafer_3",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w1_detC",
+            wafer="wafer_1",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w1_detD",
+            wafer="wafer_1",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel2_w4_detA",
+            wafer="wafer_4",
+            channel="channel2",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel2_w4_detB",
+            wafer="wafer_4",
+            channel="channel2",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+    ]
+
+    # Initializing a simulation
+    sim = lbs.Simulation(
+        start_time=start_time,
+        duration_s=duration_s,
+        random_seed=12345,
+        mpi_comm=comm,
+    )
+
+    # Creating the observations with detector blocks
+    sim.create_observations(
+        detectors=dets,
+        split_list_over_processes=False,
+        num_of_obs_per_detector=3,
+        det_blocks_attributes=["channel"],  # cases 1 and 2
+        # det_blocks_attributes=["channel", "wafer"],  # case 3
+    )
+
+With the list of detectors defined in the code snippet above, we can see how the
+detector axis and the time axis are divided depending on the size of the MPI
+communicator and on ``det_blocks_attributes``.
+
+**Case 1**
+
+*Size of MPI communicator = 3*, ``det_blocks_attributes=["channel"]``
+
+::
+
+    Detector axis --->
+    Two blocks --->
+          +------------------+ +------------------+
+          | Rank 0           | | Rank 1           |
+          +------------------+ +------------------+
+          | channel1_w9_detA | | channel2_w4_detA |
+    T     +                  + +                  +
+    i O   | channel1_w3_detB | | channel2_w4_detB |
+    m n   +                  + +------------------+
+    e e   | channel1_w1_detC |
+          +                  +
+    a b   | channel1_w1_detD |
+    x l   +------------------+
+    i o
+    s c   ...........................................
+      k
+    | |   +------------------+
+    | |   | Rank 2           |
+    ⋎ ⋎   +------------------+
+          | (Unused)         |
+          +------------------+
+
+**Case 2**
+
+*Size of MPI communicator = 4*, ``det_blocks_attributes=["channel"]``
+
+::
+
+    Detector axis --->
+    Two blocks --->
+          +------------------+ +------------------+
+          | Rank 0           | | Rank 2           |
+          +------------------+ +------------------+
+          | channel1_w9_detA | | channel2_w4_detA |
+          +                  + +                  +
+          | channel1_w3_detB | | channel2_w4_detB |
+    T     +                  + +------------------+
+    i T   | channel1_w1_detC |
+    m w   +                  +
+    e o   | channel1_w1_detD |
+          +------------------+
+    a b
+    x l   ...........................................
+    i o
+    s c   +------------------+ +------------------+
+      k   | Rank 1           | | Rank 3           |
+    | |   +------------------+ +------------------+
+    | |   | channel1_w9_detA | | channel2_w4_detA |
+    ⋎ ⋎   +                  + +                  +
+          | channel1_w3_detB | | channel2_w4_detB |
+          +                  + +------------------+
+          | channel1_w1_detC |
+          +                  +
+          | channel1_w1_detD |
+          +------------------+
+
+**Case 3**
+
+*Size of MPI communicator = 10*, ``det_blocks_attributes=["channel", "wafer"]``
+
+::
+
+    Detector axis --->
+    Four blocks --->
+          +------------------+ +------------------+ +------------------+ +------------------+
+          | Rank 0           | | Rank 2           | | Rank 4           | | Rank 6           |
+          +------------------+ +------------------+ +------------------+ +------------------+
+    T     | channel1_w1_detC | | channel2_w4_detA | | channel1_w9_detA | | channel1_w3_detB |
+    i T   +                  + +                  + +------------------+ +------------------+
+    m w   | channel1_w1_detD | | channel2_w4_detB |
+    e o   +------------------+ +------------------+
+
+          .....................................................................................
+
+    a b   +------------------+ +------------------+ +------------------+ +------------------+
+    x l   | Rank 1           | | Rank 3           | | Rank 5           | | Rank 7           |
+    i o   +------------------+ +------------------+ +------------------+ +------------------+
+    s c   | channel1_w1_detC | | channel2_w4_detA | | channel1_w9_detA | | channel1_w3_detB |
+      k   +                  + +                  + +------------------+ +------------------+
+    | |   | channel1_w1_detD | | channel2_w4_detB |
+    | |   +------------------+ +------------------+
+    ⋎ ⋎
+          .....................................................................................
+
+          +------------------+ +------------------+
+          | Rank 8           | | Rank 9           |
+          +------------------+ +------------------+
+          | (Unused)         | | (Unused)         |
+          +------------------+ +------------------+
+
+.. note::
+    When ``n_blocks_det != 1``, keep in mind that ``obs.tod[0]`` or
+    ``obs.wn_levels[0]`` are quantities of the first *local* detector, not global.
+    This should not be a problem as the only thing that matters is that the two
+    quantities refer to the same detector. If you need the global detector index,
+    you can get it with ``obs.det_idx[0]``, which is created at construction time.
+    ``obs.det_idx`` stores the indices of the local detectors with respect to the
+    global list of detectors stored in the ``obs.detectors_global`` variable.
+
+.. note::
+    To get a better understanding of how observations are being used in an
+    MPI simulation, use the method :meth:`.Simulation.describe_mpi_distribution`.
+    This method must be called *after* the observations have been allocated using
+    :meth:`.Simulation.create_observations`; it will return an instance of the
+    class :class:`.MpiDistributionDescr`, which can be inspected to determine
+    which detectors and time spans are covered by each observation in all the
+    MPI processes that are being used. For more information, refer to the Section
+    :ref:`simulations`.
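+
+As a small illustration of the indexing described in the first note above, the
+following sketch shows how to recover the global information of the first local
+detector. The snippet is hypothetical and assumes that ``obs`` is an
+:class:`Observation` on a rank that owns at least one detector block:
+
+.. code-block:: python
+
+    # Index of the first *local* detector in the global list of detectors
+    global_idx = obs.det_idx[0]
+
+    # The corresponding entry of the global detector list; its name matches
+    # the first entry of the local ``obs.name`` array
+    global_det = obs.detectors_global[global_idx]
+    assert global_det["name"] == obs.name[0]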
 
 Other notable functionalities
 -----------------------------
diff --git a/litebird_sim/detectors.py b/litebird_sim/detectors.py
index a489c13b..f2d0d981 100644
--- a/litebird_sim/detectors.py
+++ b/litebird_sim/detectors.py
@@ -75,6 +75,9 @@ class DetectorInfo:
 
     - channel (Union[str, None]): The channel. The default is None
 
+    - squid (Union[int, None]): The SQUID number of the detector.
+      The default value is None.
+
     - sampling_rate_hz (float): The sampling rate of the ADC
       associated with this detector. The default is 0.0
 
@@ -136,6 +139,7 @@ class DetectorInfo:
     pixel: Union[int, None] = None
     pixtype: Union[str, None] = None
     channel: Union[str, None] = None
+    squid: Union[int, None] = None
     sampling_rate_hz: float = 0.0
     fwhm_arcmin: float = 0.0
     ellipticity: float = 0.0
@@ -148,8 +152,6 @@ class DetectorInfo:
     fknee_mhz: float = 0.0
     fmin_hz: float = 0.0
     alpha: float = 0.0
-    bandcenter_ghz: float = 0.0
-    bandwidth_ghz: float = 0.0
     pol: Union[str, None] = None
     orient: Union[str, None] = None
     quat: Any = None
@@ -175,6 +177,7 @@ def from_dict(dictionary: Dict[str, Any]):
     - ``pixel``
     - ``pixtype``
    - ``channel``
+    - ``squid``
     - ``bandcenter_ghz``
     - ``bandwidth_ghz``
     - ``band_freqs_ghz``
diff --git a/litebird_sim/distribute.py b/litebird_sim/distribute.py
index 3649a877..ef893d82 100644
--- a/litebird_sim/distribute.py
+++ b/litebird_sim/distribute.py
@@ -50,7 +50,7 @@ def distribute_evenly(num_of_elements, num_of_groups):
 
     # If leftovers == 0, then the number of elements is divided evenly
     # by num_of_groups, and the solution is trivial. If it's not, then
-    # each of the "leftoverss" is placed in one of the first groups.
+    # each of the "leftovers" is placed in one of the first groups.
     #
     # Example: let's split 8 elements in 3 groups. In this case,
     # base_length=2 and leftovers=2 (elements #7 and #8):
@@ -68,7 +68,7 @@ def distribute_evenly(num_of_elements, num_of_groups):
             cur_length = base_length + 1
             cur_pos = cur_length * i
         else:
-            # No need to accomodate for leftovers, but consider their
+            # No need to accommodate for leftovers, but consider their
             # presence in fixing the starting position for this group
             cur_length = base_length
             cur_pos = base_length * i + leftovers
@@ -84,6 +84,47 @@ def distribute_evenly(num_of_elements, num_of_groups):
     return result
 
 
+def distribute_detector_blocks(detector_blocks):
+    """Similar to the :func:`distribute_evenly()` function, this function
+    returns a list of named tuples, with fields `start_idx` (the starting
+    index of the first detector of a group within the global list of
+    detectors) and `num_of_elements` (the number of detectors in the group).
+    Unlike :func:`distribute_evenly()`, this function simply uses the detector
+    groups given in the `detector_blocks` attribute.
+
+    Example:
+        Following the example given in
+        :meth:`litebird_sim.Observation._make_detector_blocks()`,
+        `distribute_detector_blocks()` will return
+
+        ```
+        [
+            Span(start_idx=0, num_of_elements=2),
+            Span(start_idx=2, num_of_elements=2),
+            Span(start_idx=4, num_of_elements=1),
+        ]
+        ```
+
+    Args:
+        detector_blocks (dict): The detector block object. See
+            :meth:`litebird_sim.Observation._make_detector_blocks()`.
+
+    Returns:
+        A list of 2-element named tuples containing (1) the starting index of
+        the detectors of the block with respect to the flattened list of all
+        the detector blocks and (2) the number of elements in the detector
+        block.
+ """ + cur_position = 0 + prev_length = 0 + result = [] + for key in detector_blocks: + cur_length = len(detector_blocks[key]) + cur_position += prev_length + prev_length = cur_length + result.append(Span(start_idx=cur_position, num_of_elements=cur_length)) + + return result + + # The following implementation of the painter's partition problem is # heavily inspired by the code at # https://www.geeksforgeeks.org/painters-partition-problem-set-2/?ref=rp diff --git a/litebird_sim/observations.py b/litebird_sim/observations.py index 7997de56..7485a940 100644 --- a/litebird_sim/observations.py +++ b/litebird_sim/observations.py @@ -2,13 +2,17 @@ from dataclasses import dataclass from typing import Union, List, Any, Optional +import numbers import astropy.time import numpy as np import numpy.typing as npt +from collections import defaultdict + from .coordinates import DEFAULT_TIME_SCALE -from .distribute import distribute_evenly +from .distribute import distribute_evenly, distribute_detector_blocks +from .detectors import DetectorInfo @dataclass @@ -80,11 +84,20 @@ class Observation: sampling_rate_hz (float): The sampling frequency, in Hertz. + det_blocks_attributes (list of strings): The list of detector + attributes that will be used to divide the detector axis of the + tod matrix and all its attributes. For example, with + ``det_blocks_attributes = ["wafer", "pixel"]``, the detectors will + be divided into blocks such that all detectors in a block will + have the same ``wafer`` and ``pixel`` attribute. + n_blocks_det (int): divide the detector axis of the tod (and all the - arrays of detector attributes) in `n_blocks_det` blocks + arrays of detector attributes) in `n_blocks_det` blocks. It will + be ignored if ``det_blocks_attributes`` is not `None`. n_blocks_time (int): divide the time axis of the tod in - `n_blocks_time` blocks + `n_blocks_time` blocks. It will be ignored + if ``det_blocks_attributes`` is not `None`. comm: either `None` (do not use MPI) or a MPI communicator object, like `mpi4py.MPI.COMM_WORLD`. 
Its size is required to be at
@@ -103,6 +116,7 @@ def __init__(
         sampling_rate_hz: float,
         allocate_tod=True,
         tods=None,
+        det_blocks_attributes: Union[List[str], None] = None,
         n_blocks_det=1,
         n_blocks_time=1,
         comm=None,
@@ -123,17 +137,24 @@ def __init__(
             delta = 1.0 / sampling_rate_hz
             self.end_time_global = start_time_global + n_samples_global * delta
 
+        self._sampling_rate_hz = sampling_rate_hz
+        self._det_blocks_attributes = det_blocks_attributes
+        self.detector_blocks = None
+
         if isinstance(detectors, int):
             self._n_detectors_global = detectors
         else:
-            if comm and comm.size > 1:
-                self._n_detectors_global = comm.bcast(len(detectors), root)
+            if self.comm and self.comm.size > 1:
+                self._n_detectors_global = self.comm.bcast(len(detectors), root)
+
+                if self._det_blocks_attributes is not None:
+                    n_blocks_det, n_blocks_time = self._make_detector_blocks(
+                        detectors, self.comm
+                    )
             else:
                 self._n_detectors_global = len(detectors)
 
-        self._sampling_rate_hz = sampling_rate_hz
-
-        # Neme of the attributes that store an array with the value of a
+        # Name of the attributes that store an array with the value of a
         # property for each of the (local) detectors
         self._attr_det_names = []
         self._check_blocks(n_blocks_det, n_blocks_time)
@@ -159,8 +180,17 @@ def __init__(
             setattr(self, cur_tod.name, None)
 
         self.setattr_det_global("det_idx", np.arange(self._n_detectors_global), root)
+
+        self.detectors_global = []
+
+        if self.detector_blocks is not None:
+            for key in self.detector_blocks:
+                self.detectors_global += self.detector_blocks[key]
+        else:
+            self.detectors_global = detectors
+
         if not isinstance(detectors, int):
-            self._set_attributes_from_list_of_dict(detectors, root)
+            self._set_attributes_from_list_of_dict(self.detectors_global, root)
 
         (
             self.start_time,
@@ -176,7 +206,10 @@ def sampling_rate_hz(self):
 
     @property
     def n_detectors(self):
-        return len(self.det_idx)
+        if self.det_idx is None:
+            return 0
+        else:
+            return len(self.det_idx)
 
     def _get_local_start_time_start_and_n_samples(self):
         _, _, start, num = self._get_start_and_num(
@@ -203,7 +236,7 @@ def _get_local_start_time_start_and_n_samples(self):
         return self.start_time_global + start * delta, start, num
 
     def _set_attributes_from_list_of_dict(self, list_of_dict, root):
-        assert len(list_of_dict) == self.n_detectors_global
+        np.testing.assert_equal(len(list_of_dict), self.n_detectors_global)
 
         # Turn list of dict into dict of arrays
         if not self.comm or self.comm.rank == root:
@@ -273,10 +306,86 @@ def n_blocks_time(self):
     def n_blocks_det(self):
         return self._n_blocks_det
 
+    def _make_detector_blocks(self, detectors, comm):
+        """This function distributes the detectors in groups such that each
+        group has the same set of attributes specified by the strings in
+        `self._det_blocks_attributes`. Once the groups are made, the number of
+        detector blocks is set to be the total number of detector groups,
+        whereas the number of time blocks is computed using the number of
+        detector blocks and the size of the `comm` communicator.
+
+        The detector blocks are stored in `self.detector_blocks`. This
+        dictionary object has the tuple of `self._det_blocks_attributes` values
+        as dictionary keys and the list of detectors corresponding to the key
+        as dictionary values. This dictionary is sorted so that the
+        group with the largest number of detectors comes first and the one with
+        the fewest detectors comes last.
+
+        Example:
+            For the following list of detectors (shown here only by their
+            names; in practice each entry is a dictionary of detector
+            attributes)
+
+            ```
+            detectors = [
+                "000_002_123_xx_140_x",
+                "000_005_321_xx_140_x",
+                "000_004_456_xx_119_x",
+                "000_002_654_xx_140_x",
+                "000_004_789_xx_119_x",
+            ]
+            ```
+
+            and `self._det_blocks_attributes = ["channel", "wafer"]`,
+            `_make_detector_blocks()` will set
+
+            ```
+            self.detector_blocks = {
+                ("140", "L02"): ["000_002_123_xx_140_x", "000_002_654_xx_140_x"],
+                ("119", "L04"): ["000_004_456_xx_119_x", "000_004_789_xx_119_x"],
+                ("140", "L05"): ["000_005_321_xx_140_x"],
+            }
+            ```
+
+            and return `n_blocks_det = 3`.
+
+        Args:
+            detectors (List[dict]): List of detectors
+
+            comm: The MPI communicator
+
+        Returns:
+            n_blocks_det (int): Number of detector blocks
+
+            n_blocks_time (int): Number of time blocks
+
+        """
+        self.detector_blocks = defaultdict(list)
+        for det in detectors:
+            key = tuple(det[attribute] for attribute in self._det_blocks_attributes)
+            self.detector_blocks[key].append(det)
+
+        self.detector_blocks = dict(
+            sorted(
+                self.detector_blocks.items(),
+                key=lambda item: len(item[1]),
+                reverse=True,
+            )
+        )
+        n_blocks_det = len(self.detector_blocks)
+        n_blocks_time = comm.size // n_blocks_det
+
+        return n_blocks_det, n_blocks_time
+
     def _check_blocks(self, n_blocks_det, n_blocks_time):
         if self.comm is None:
             if n_blocks_det != 1 or n_blocks_time != 1:
                 raise ValueError("Only one block allowed without an MPI comm")
+        elif n_blocks_det == 0 or n_blocks_time == 0:
+            raise ValueError(
+                "The number of detector blocks and the number of time blocks "
+                "must be non-zero\n"
+                f"n_blocks_det = {n_blocks_det}, "
+                f"n_blocks_time = {n_blocks_time}"
+            )
         elif n_blocks_det > self.n_detectors_global:
             raise ValueError(
                 "You can not have more detector blocks than detectors "
@@ -296,21 +405,33 @@ def _check_blocks(self, n_blocks_det, n_blocks_time):
 
     def _get_start_and_num(self, n_blocks_det, n_blocks_time):
         """For both detectors and time, returns the starting (global)
-        index and lenght of each block if the number of blocks is changed to the
+        index and length of each block if the number of blocks is changed to the
         values passed as arguments
         """
-        det_start, det_n = np.array(
-            [
-                [span.start_idx, span.num_of_elements]
-                for span in distribute_evenly(self._n_detectors_global, n_blocks_det)
-            ]
-        ).T
+        if (
+            self._det_blocks_attributes is None
+            or self.comm is None
+            or self.comm.size == 1
+        ):
+            det_start, det_n = np.array(
+                [
+                    [span.start_idx, span.num_of_elements]
+                    for span in distribute_evenly(
+                        self._n_detectors_global, n_blocks_det
+                    )
+                ]
+            ).T
+        else:
+            det_start, det_n = np.array(
+                [
+                    [span.start_idx, span.num_of_elements]
+                    for span in distribute_detector_blocks(self.detector_blocks)
+                ]
+            ).T
+
         time_start, time_n = np.array(
             [
                 [span.start_idx, span.num_of_elements]
                 for span in distribute_evenly(self._n_samples_global, n_blocks_time)
             ]
         ).T
+
         return (
             np.array(det_start),
             np.array(det_n),
@@ -327,6 +448,7 @@ def _get_tod_shape(self, n_blocks_det, n_blocks_time):
             return (self._n_detectors_global, self._n_samples_global)
 
         _, det_n, _, time_n = self._get_start_and_num(n_blocks_det, n_blocks_time)
+
         try:
             return (
                 det_n[self.comm.rank // n_blocks_time],
@@ -553,7 +675,10 @@ def setattr_det_global(self, name, info, root=0):
         is_in_grid = self.comm.rank < self._n_blocks_det * self._n_blocks_time
         comm_grid = self.comm.Split(int(is_in_grid))
         if not is_in_grid:  # The process does not own any detector (and TOD)
-            setattr(self, name, None)
+            null_det = DetectorInfo()
+            attribute = getattr(null_det, name, None)
+            value = 0 if isinstance(attribute, numbers.Number) else None
+            setattr(self, name, value)
             return
 
         my_col = comm_grid.rank % self._n_blocks_time
@@ -662,7 +787,7 @@ def get_pointings(
         pointing_buffer: Optional[npt.NDArray] = None,
         hwp_buffer: Optional[npt.NDArray] = None,
         pointings_dtype=np.float32,
-    ) -> (npt.NDArray, Optional[npt.NDArray]):
+    ) -> tuple[npt.NDArray, Optional[npt.NDArray]]:
         """
         Compute the pointings for one or more detectors in this observation
diff --git a/litebird_sim/simulations.py b/litebird_sim/simulations.py
index 7a213a0a..e11267b4 100644
--- a/litebird_sim/simulations.py
+++ b/litebird_sim/simulations.py
@@ -889,6 +889,7 @@ def create_observations(
         detectors: List[DetectorInfo],
         num_of_obs_per_detector: int = 1,
         split_list_over_processes=True,
+        det_blocks_attributes: Union[List[str], None] = None,
         n_blocks_det=1,
         n_blocks_time=1,
         root=0,
@@ -922,7 +923,12 @@ def create_observations(
            simulating 10 detectors and you specify ``n_blocks_det=5``,
            this means that each observation will handle ``10 / 5 = 2``
            detectors. The default is that *all* the detectors be kept
-           together (``n_blocks_det=1``).
+           together (``n_blocks_det=1``). On the other hand, the parameter
+           `det_blocks_attributes` specifies a list of detector attributes
+           used to create the groups of detectors. For example, with
+           ``det_blocks_attributes = ["wafer", "pixel"]``, the detectors will
+           be divided into groups such that all detectors in a group will
+           have the same ``wafer`` and ``pixel`` attribute.
 
            The parameter `n_blocks_time` specifies the number of time
            splits of the observations. In the case of a 3-month-long
@@ -1013,6 +1019,7 @@ def create_observations(
                 start_time_global=cur_time,
                 sampling_rate_hz=sampfreq_hz,
                 n_samples_global=nsamples,
+                det_blocks_attributes=det_blocks_attributes,
                 n_blocks_det=n_blocks_det,
                 n_blocks_time=n_blocks_time,
                 comm=(None if split_list_over_processes else self.mpi_comm),
diff --git a/test/test_detector_blocks.py b/test/test_detector_blocks.py
new file mode 100644
index 00000000..1c1e9346
--- /dev/null
+++ b/test/test_detector_blocks.py
@@ -0,0 +1,100 @@
+import numpy as np
+import litebird_sim as lbs
+
+
+def test_detector_blocks():
+    comm = lbs.MPI_COMM_WORLD
+
+    start_time = 456
+    duration_s = 100
+    sampling_freq_Hz = 1
+    nobs_per_det = 3
+
+    # Creating a list of detectors.
+    dets = [
+        lbs.DetectorInfo(
+            name="channel1_w9_detA",
+            wafer="wafer_9",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w3_detB",
+            wafer="wafer_3",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w1_detC",
+            wafer="wafer_1",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel1_w1_detD",
+            wafer="wafer_1",
+            channel="channel1",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel2_w4_detA",
+            wafer="wafer_4",
+            channel="channel2",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+        lbs.DetectorInfo(
+            name="channel2_w4_detB",
+            wafer="wafer_4",
+            channel="channel2",
+            sampling_rate_hz=sampling_freq_Hz,
+        ),
+    ]
+
+    sim = lbs.Simulation(
+        start_time=start_time,
+        duration_s=duration_s,
+        random_seed=12345,
+        mpi_comm=comm,
+    )
+
+    sim.create_observations(
+        detectors=dets,
+        split_list_over_processes=False,
+        num_of_obs_per_detector=nobs_per_det,
+        det_blocks_attributes=["channel", "wafer"],
+    )
+
+    tod_len_per_det_per_proc = 0
+    for obs in sim.observations:
+        tod_shape = obs.tod.shape
+
+        n_blocks_det = obs.n_blocks_det
+        n_blocks_time = obs.n_blocks_time
+        tod_len_per_det_per_proc += obs.tod.shape[1]
+
+        # No testing required if the proc doesn't own a detector
+        if obs.det_idx is not None:
+            det_names_per_obs = [
+                obs.detectors_global[idx]["name"] for idx in obs.det_idx
+            ]
+
+            # Testing if the mapping between obs.name and
+            # obs.det_idx is consistent with obs.detectors_global
+            np.testing.assert_equal(obs.name, det_names_per_obs)
+
+            # Testing the distribution of the number of detectors per
+            # detector block
+            np.testing.assert_equal(obs.name.shape[0], tod_shape[0])
+
+    # Testing if the distribution of samples along the time axis is consistent
+    if comm.rank < n_blocks_det * n_blocks_time:
+        arr = [
+            span.num_of_elements
+            for span in lbs.distribute.distribute_evenly(
+                duration_s * sampling_freq_Hz, n_blocks_time * nobs_per_det
+            )
+        ]
+
+        start_idx = (comm.rank % n_blocks_time) * nobs_per_det
+        stop_idx = start_idx + nobs_per_det
+        np.testing.assert_equal(sum(arr[start_idx:stop_idx]), tod_len_per_det_per_proc)
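+
+
+if __name__ == "__main__":
+    # Tentative convenience entry point (an assumption, not part of the
+    # original test suite): it allows running this file directly, e.g.
+    #
+    #     mpirun -n 4 python3 test/test_detector_blocks.py
+    #
+    # to exercise the detector-block distribution outside pytest.
+    test_detector_blocks()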