From 2bcd7deaa0a9fce0a6471d54a060d21646726e2b Mon Sep 17 00:00:00 2001 From: CamDavidsonPilon Date: Mon, 3 Feb 2025 14:14:35 -0500 Subject: [PATCH] new od calibration --- CHANGELOG.md | 7 +- pioreactor/calibrations/__init__.py | 33 +- ...ation.py => od_calibration_single_vial.py} | 0 .../od_calibration_using_standards.py | 361 ++++++++++++++++++ pioreactor/cli/calibrations.py | 80 ++-- pioreactor/structs.py | 24 ++ pioreactor/tests/test_od_reading.py | 199 ++++++++++ 7 files changed, 647 insertions(+), 57 deletions(-) rename pioreactor/calibrations/{od_calibration.py => od_calibration_single_vial.py} (100%) create mode 100644 pioreactor/calibrations/od_calibration_using_standards.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c15c9721..3cc73091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ #### Enhancements - - new RP2040 firmware improvements => less noise in OD readings + - new OD calibration using standards (requires multiple vials). Try `pio calibrations run --device od`. This was inspired by the plugin by @odcambc. + - new RP2040 firmware improvements (v0.4) => faster response over i2c. - Improved chart colors in the UI - The OD reading CLI has a new option, `--snapshot`, that will start the job, take a single reading, and exit. This is useful for scripting purposes. - A new CLI for pumps: `pio run pumps`. Add pumps as options: @@ -25,7 +26,8 @@ - GET `/api/experiment_profiles/running/experiments/` introduced #### Breaking changes - - plugins should migrate from `click_some_name` to autodiscover plugins, to importing `run`. Example: + + - (Eventually) plugins should migrate from `click_some_name` to autodiscover plugins, to importing `run`. Example: ``` import click from pioreactor.cli.run import run @@ -38,7 +40,6 @@ ... ``` - #### Bug fixes - fixed ui not showing 3p calibrations - experiment profiles start now use the unit_api directly. This may mitigate the issue where huey workers stampeding on each other when try to start many jobs. diff --git a/pioreactor/calibrations/__init__.py b/pioreactor/calibrations/__init__.py index 246b6a42..2364fee6 100644 --- a/pioreactor/calibrations/__init__.py +++ b/pioreactor/calibrations/__init__.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import annotations +from collections import defaultdict from pathlib import Path from typing import Callable from typing import Literal @@ -22,20 +23,24 @@ CALIBRATION_PATH = Path(".pioreactor/storage/calibrations/") # Lookup table for different calibration protocols -calibration_protocols: dict[tuple[str, str], Type[CalibrationProtocol]] = {} +Device = str +ProtocolName = str + +calibration_protocols: dict[Device, dict[ProtocolName, Type[CalibrationProtocol]]] = defaultdict(dict) class CalibrationProtocol: - protocol_name: str - target_device: str | list[str] + protocol_name: ProtocolName + target_device: Device | list[Device] + description = "" def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) if isinstance(cls.target_device, str): - calibration_protocols[(cls.target_device, cls.protocol_name)] = cls + calibration_protocols[cls.target_device][cls.protocol_name] = cls elif isinstance(cls.target_device, list): for device in cls.target_device: - calibration_protocols[(device, cls.protocol_name)] = cls + calibration_protocols[device][cls.protocol_name] = cls else: raise ValueError("target_device must be a string or a list of strings") @@ -46,19 +51,23 @@ def run(self, *args, **kwargs): class SingleVialODProtocol(CalibrationProtocol): target_device = "od" protocol_name = "single_vial" + description = "Calibrate OD using a single vial" def run(self, *args, **kwargs) -> structs.ODCalibration: - from pioreactor.calibrations.od_calibration import run_od_calibration + from pioreactor.calibrations.od_calibration_single_vial import run_od_calibration return run_od_calibration() -class BatchVialODProtocol(CalibrationProtocol): +class StandardsODProtocol(CalibrationProtocol): target_device = "od" - protocol_name = "batch_vial" + protocol_name = "standards" + description = "Calibrate OD using standards. Requires multiple vials" def run(self, *args, **kwargs) -> structs.ODCalibration: - raise NotImplementedError("Not implemented yet") + from pioreactor.calibrations.od_calibration_using_standards import run_od_calibration + + return run_od_calibration() class DurationBasedPumpProtocol(CalibrationProtocol): @@ -102,7 +111,7 @@ def load_active_calibration(device: Literal["stirring"]) -> structs.SimpleStirri pass -def load_active_calibration(device: str) -> structs.AnyCalibration | None: +def load_active_calibration(device: Device) -> structs.AnyCalibration | None: with local_persistent_storage("active_calibrations") as c: active_cal_name = c.get(device) @@ -112,7 +121,7 @@ def load_active_calibration(device: str) -> structs.AnyCalibration | None: return load_calibration(device, active_cal_name) -def load_calibration(device: str, calibration_name: str) -> structs.AnyCalibration: +def load_calibration(device: Device, calibration_name: str) -> structs.AnyCalibration: target_file = CALIBRATION_PATH / device / f"{calibration_name}.yaml" if not target_file.exists(): @@ -127,7 +136,7 @@ def load_calibration(device: str, calibration_name: str) -> structs.AnyCalibrati raise ValidationError(f"Error reading {target_file.stem}: {e}") -def list_of_calibrations_by_device(device: str) -> list[str]: +def list_of_calibrations_by_device(device: Device) -> list[str]: device_dir = CALIBRATION_PATH / device if not device_dir.exists(): return [] diff --git a/pioreactor/calibrations/od_calibration.py b/pioreactor/calibrations/od_calibration_single_vial.py similarity index 100% rename from pioreactor/calibrations/od_calibration.py rename to pioreactor/calibrations/od_calibration_single_vial.py diff --git a/pioreactor/calibrations/od_calibration_using_standards.py b/pioreactor/calibrations/od_calibration_using_standards.py new file mode 100644 index 00000000..4222c50d --- /dev/null +++ b/pioreactor/calibrations/od_calibration_using_standards.py @@ -0,0 +1,361 @@ +# -*- coding: utf-8 -*- +""" +Copyright 2023 Chris Macdonald, Pioreactor + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +""" +from __future__ import annotations + +from math import log2 +from time import sleep +from typing import cast + +import click +from click import clear +from click import confirm +from click import echo +from click import prompt +from click import style +from msgspec.json import encode +from msgspec.json import format + +from pioreactor import structs +from pioreactor import types as pt +from pioreactor.background_jobs.od_reading import start_od_reading +from pioreactor.background_jobs.stirring import start_stirring as stirring +from pioreactor.background_jobs.stirring import Stirrer +from pioreactor.calibrations import utils +from pioreactor.config import config +from pioreactor.utils import is_pio_job_running +from pioreactor.utils import local_persistent_storage +from pioreactor.utils import managed_lifecycle +from pioreactor.utils.timing import current_utc_datestamp +from pioreactor.utils.timing import current_utc_datetime +from pioreactor.whoami import get_testing_experiment_name +from pioreactor.whoami import get_unit_name +from pioreactor.whoami import is_testing_env + + +def green(string: str) -> str: + return style(string, fg="green") + + +def red(string: str) -> str: + return style(string, fg="red") + + +def introduction() -> None: + import logging + + logging.disable(logging.WARNING) + + clear() + echo( + """This routine will calibrate the current Pioreactor to (offline) OD600 readings. You'll need: + 1. The Pioreactor you wish to calibrate (the one you are using) + 2. At least 10mL of a culture with the highest density you'll ever observe, and its OD600 measurement + 3. A micro-pipette, or accurate tool to dispense 1ml of liquid. + 4. Accurate 10mL measurement tool (ex: graduated cylinder) + 5. Sterile media, amount to be determined shortly. +""" + ) + + +def get_name_from_user() -> str: + with local_persistent_storage("od_calibrations") as cache: + while True: + name = prompt( + green( + f"Optional: Provide a name for this calibration. [enter] to use default name `od-cal-{current_utc_datestamp()}`" + ), + type=str, + default=f"od-cal-{current_utc_datestamp()}", + show_default=False, + ).strip() + + if name == "": + echo("Name cannot be empty") + continue + elif name in cache: + if confirm(green("❗️ Name already exists. Do you wish to overwrite?")): + return name + elif name == "current": + echo("Name cannot be `current`.") + continue + else: + return name + + +def get_metadata_from_user() -> tuple[pt.OD600, pt.OD600, pt.mL, pt.PdAngle, pt.PdChannel]: + if config.get("od_reading.config", "ir_led_intensity") == "auto": + echo( + red( + "Can't use ir_led_intensity=auto with OD calibrations. Change ir_led_intensity in your config.ini to a numeric value (70 is good default). Aborting!" + ) + ) + raise click.Abort() + + initial_od600 = prompt( + green("Provide the OD600 measurement of your initial, high density, culture"), + type=click.FloatRange(min=0.01, clamp=False), + ) + + minimum_od600 = prompt( + green("Provide the minimum OD600 measurement you wish to calibrate to"), + type=click.FloatRange(min=0, max=initial_od600, clamp=False), + ) + + while minimum_od600 >= initial_od600: + minimum_od600 = cast( + pt.OD600, + prompt( + "The minimum OD600 measurement must be less than the initial OD600 culture measurement", + type=click.FloatRange(min=0, max=initial_od600, clamp=False), + ), + ) + + if minimum_od600 == 0: + minimum_od600 = 0.01 + + dilution_amount = prompt( + green("Provide the volume to be added to your vial each iteration (default = 1 mL)"), + default=1, + type=click.FloatRange(min=0.01, max=10, clamp=False), + ) + + number_of_points = int(log2(initial_od600 / minimum_od600) * (10 / dilution_amount)) + + echo(f"This will require {number_of_points} data points.") + echo(f"You will need at least {number_of_points * dilution_amount + 10}mL of media available.") + confirm(green("Continue?"), abort=True, default=True) + + if "REF" not in config["od_config.photodiode_channel_reverse"]: + echo( + red( + "REF required for OD calibration. Set an input to REF in [od_config.photodiode_channel] in your config." + ) + ) + raise click.Abort() + # technically it's not required? we just need a specific PD channel to calibrate from. + + ref_channel = config["od_config.photodiode_channel_reverse"]["REF"] + pd_channel = cast(pt.PdChannel, "1" if ref_channel == "2" else "2") + + confirm( + green( + f"Confirm using channel {pd_channel} with angle {config['od_config.photodiode_channel'][pd_channel]}° position in the Pioreactor" + ), + abort=True, + default=True, + ) + angle = cast(pt.PdAngle, config["od_config.photodiode_channel"][pd_channel]) + return initial_od600, minimum_od600, dilution_amount, angle, pd_channel + + +def setup_HDC_instructions() -> None: + click.clear() + click.echo( + """ Setting up: + 1. Place first standard into Pioreactor, with a stir bar. +""" + ) + + +def start_stirring(): + while not confirm(green("Reading to start stirring?"), default=True, abort=True): + pass + + echo("Starting stirring and blocking until near target RPM.") + + st = stirring( + target_rpm=config.getfloat("stirring.config", "target_rpm"), + unit=get_unit_name(), + experiment=get_testing_experiment_name(), + ) + st.block_until_rpm_is_close_to_target(abs_tolerance=120) + return st + + +def choose_settings() -> float: + config_rpm = config.getfloat("stirring", "target_rpm") + + rpm = click.prompt( + click.style( + f"Optional: Enter RPM for stirring. [enter] for {config_rpm} RPM, default set in config.ini", + fg="green", + ), + type=click.FloatRange(0, 10000), + default=config_rpm, + show_default=False, + ) + return rpm + + +def to_struct( + curve_data_: list[float], + curve_type: str, + voltages: list[pt.Voltage], + od600s: list[pt.OD], + angle, + name: str, + pd_channel: pt.PdChannel, + unit: str, +) -> structs.ODCalibration: + data_blob = structs.ODCalibration( + created_at=current_utc_datetime(), + calibrated_on_pioreactor_unit=unit, + calibration_name=name, + angle=angle, + curve_data_=curve_data_, + curve_type=curve_type, + y="od600s", + x="voltages", + recorded_data={"x": voltages, "y": od600s}, + ir_led_intensity=float(config["od_reading.config"]["ir_led_intensity"]), + pd_channel=pd_channel, + ) + + return data_blob + + +def start_recording_standards(st: Stirrer, signal_channel): + voltages = [] + od600_values = [] + click.echo("Starting OD recordings.") + + with start_od_reading( + cast(pt.PdAngleOrREF, config.get("od_config.photodiode_channel", "1")), + cast(pt.PdAngleOrREF, config.get("od_config.photodiode_channel", "2")), + interval=None, + unit=get_unit_name(), + fake_data=is_testing_env(), + experiment=get_testing_experiment_name(), + calibration=None, + ) as od_reader: + + def get_voltage_from_adc() -> float: + od_readings1 = od_reader.record_from_adc() + od_readings2 = od_reader.record_from_adc() + return 0.5 * (od_readings1.ods[signal_channel].od + od_readings2.ods[signal_channel].od) + + for _ in range(4): + # warm up + od_reader.record_from_adc() + + while True: + click.echo("Recording next standard.") + standard_od = click.prompt("Enter OD600 measurement", type=float) + for i in range(4): + click.echo(".", nl=False) + sleep(0.5) + + click.echo(".", nl=False) + voltage = get_voltage_from_adc() + click.echo(".", nl=False) + + od600_values.append(standard_od) + voltages.append(voltage) + + st.set_state("sleeping") + + for i in range(len(od600_values)): + click.clear() + utils.plot_data( + od600_values, + voltages, + title="OD Calibration (ongoing)", + x_min=0, + x_max=max(od600_values), + x_label="OD600", + y_label="Voltage", + ) + click.echo() + + if not click.confirm("Record another OD600 standard?", default=True): + break + + click.echo() + click.echo(click.style("Stop❗", fg="red")) + click.echo("Carefully remove vial and replace with next standard.") + click.echo("Confirm vial outside is dry and clean.") + while not click.confirm("Continue?", default=True): + pass + st.set_state("ready") + st.block_until_rpm_is_close_to_target(abs_tolerance=120) + sleep(1.0) + + click.clear() + utils.plot_data( + od600_values, + voltages, + title="OD Calibration (ongoing)", + x_min=0, + x_max=max(od600_values), + x_label="OD600", + y_label="Voltage", + ) + click.echo("Add media blank standard.") + od600_blank = click.prompt("What is the OD600 of your blank?", type=float) + click.echo("Confirm vial outside is dry and clean. Place into Pioreactor.") + while not click.confirm("Continue?", default=True): + pass + + voltages.append(get_voltage_from_adc()) + od600_values.append(od600_blank) + + return od600_values, voltages + + +def run_od_calibration() -> structs.ODCalibration: + unit = get_unit_name() + experiment = get_testing_experiment_name() + curve_data_: list[float] = [] + curve_type = "poly" + + with managed_lifecycle(unit, experiment, "od_calibration"): + introduction() + name = get_name_from_user() + + if any(is_pio_job_running(["stirring", "od_reading"])): + echo(red("Both Stirring and OD reading should be turned off.")) + raise click.Abort() + + ( + initial_od600, + minimum_od600, + dilution_amount, + angle, + pd_channel, + ) = get_metadata_from_user() + setup_HDC_instructions() + + with start_stirring() as st: + inferred_od600s, voltages = start_recording_standards(st, pd_channel) + + cal = to_struct( + curve_data_, + curve_type, + voltages, + inferred_od600s, + angle, + name, + pd_channel, + unit, + ) + + cal = utils.crunch_data_and_confirm_with_user(cal) + + echo(style(f"Calibration curve for `{name}`", underline=True, bold=True)) + echo(utils.curve_to_functional_form(cal.curve_type, cal.curve_data_)) + echo() + echo(style(f"Data for `{name}`", underline=True, bold=True)) + print(format(encode(cal)).decode()) + echo() + echo(f"Finished calibration of `{name}` ✅") + + return cal diff --git a/pioreactor/cli/calibrations.py b/pioreactor/cli/calibrations.py index 32e64420..aadb7b9f 100644 --- a/pioreactor/cli/calibrations.py +++ b/pioreactor/cli/calibrations.py @@ -10,11 +10,11 @@ from pioreactor.calibrations import calibration_protocols from pioreactor.calibrations import list_devices from pioreactor.calibrations import list_of_calibrations_by_device +from pioreactor.calibrations import load_active_calibration from pioreactor.calibrations import load_calibration from pioreactor.calibrations.utils import crunch_data_and_confirm_with_user from pioreactor.calibrations.utils import curve_to_callable from pioreactor.calibrations.utils import plot_data -from pioreactor.utils import local_persistent_storage @click.group(short_help="calibration utils") @@ -50,19 +50,15 @@ def _display_calibrations_by_device(device: str) -> None: click.echo(header) click.echo("-" * len(header)) - with local_persistent_storage("active_calibrations") as c: - for name in list_of_calibrations_by_device(device): - try: - location = (calibration_dir / name).with_suffix(".yaml") - data = yaml_decode( - location.read_bytes(), type=structs.subclass_union(structs.CalibrationBase) - ) - active = c.get(device) == data.calibration_name - row = f"{device:<25}{data.calibration_name:<50}{data.created_at.strftime('%Y-%m-%d %H:%M:%S'):<25}{'✅' if active else '':<10}{location}" - click.echo(row) - except Exception as e: - error_message = f"Error reading {name}: {e}" - click.echo(f"{error_message:<60}") + for name in list_of_calibrations_by_device(device): + try: + location = (calibration_dir / name).with_suffix(".yaml") + data = yaml_decode(location.read_bytes(), type=structs.subclass_union(structs.CalibrationBase)) + row = f"{device:<25}{data.calibration_name:<50}{data.created_at.strftime('%Y-%m-%d %H:%M:%S'):<25}{'✅' if data.is_active(device) else '':<10}{location}" + click.echo(row) + except Exception as e: + error_message = f"Error reading {name}: {e}" + click.echo(f"{error_message:<60}") @calibration.command(name="run", context_settings=dict(ignore_unknown_options=True, allow_extra_args=True)) @@ -81,29 +77,31 @@ def run_calibration(ctx, device: str, protocol_name: str | None, y: bool) -> Non if "--protocol" in ctx.args: raise click.UsageError("Please use --protocol-name instead of --protocol") - DEFAULT_PROTOCOLS = { - "od": "single_vial", - "media_pump": "duration_based", - "alt_media_pump": "duration_based", - "waste_pump": "duration_based", - "stirring": "dc_based", - } - # Dispatch to the assistant function for that device - if protocol_name is None and device in DEFAULT_PROTOCOLS: - protocol_name = DEFAULT_PROTOCOLS[device] - elif protocol_name is None: - raise ValueError("Must provide protocol name: --protocol-name ") + if protocol_name is None: + if len(calibration_protocols.get(device, {}).keys()) == 1: + protocol_name = list(calibration_protocols.get(device, {}).keys())[0] + else: + # user will choose using click.prompt and click.Choice + click.clear() + click.echo() + click.echo(f"Available protocols for {device}:") + for protocol in calibration_protocols.get(device, {}).values(): + click.echo(click.style(f" • {protocol.protocol_name}", bold=True)) + click.echo(f" Description: {protocol.description}") + click.echo() + protocol_name = click.prompt( + "Choose a protocol", type=click.Choice(list(calibration_protocols.get(device, {}).keys())) + ) + + assistant = calibration_protocols.get(device, {}).get(protocol_name) - assistant = calibration_protocols.get((device, protocol_name)) if assistant is None: click.echo( f"No protocols found for device '{device}'. Available {device} protocols: {list(c[1] for c in calibration_protocols.keys() if c[0] == device)}" ) raise click.Abort() - # Run the assistant function to get the final calibration data - calibration_struct = assistant().run( target_device=device, **{ctx.args[i][2:].replace("-", "_"): ctx.args[i + 1] for i in range(0, len(ctx.args), 2)}, @@ -131,8 +129,9 @@ def run_calibration(ctx, device: str, protocol_name: str | None, y: bool) -> Non @calibration.command(name="protocols") def list_protocols() -> None: - for device, protocol in calibration_protocols.keys(): - click.echo(f"{device}: {protocol}") + for device, protocols in calibration_protocols.items(): + for protocol in protocols: + click.echo(f"{device}: {protocol}") @calibration.command(name="display") @@ -171,14 +170,13 @@ def set_active_calibration(device: str, calibration_name: str | None) -> None: """ if calibration_name is None: - with local_persistent_storage("active_calibrations") as c: - is_present = c.pop(device) - if is_present: - click.echo(f"No calibration name provided. Clearing active calibration for {device}.") + present = load_active_calibration(device) # type: ignore + + if present is not None: + click.echo(f"Clearing active calibration for {device}.") + present.remove_as_active_calibration_for_device(device) else: - click.echo( - f"No calibration name provided. Tried clearing active calibration for {device}, but didn't find one." - ) + click.echo(f"Tried clearing active calibration for {device}, but didn't find one.") else: data = load_calibration(device, calibration_name) @@ -203,10 +201,8 @@ def delete_calibration(device: str, calibration_name: str) -> None: target_file.unlink() - with local_persistent_storage("active_calibrations") as c: - is_present = c.get(device) == calibration_name - if is_present: - c.pop(device) + cal = load_calibration(device, calibration_name) + cal.remove_as_active_calibration_for_device(device) click.echo(f"Deleted calibration '{calibration_name}' of device '{device}'.") diff --git a/pioreactor/structs.py b/pioreactor/structs.py index e5a324c9..46c8324d 100644 --- a/pioreactor/structs.py +++ b/pioreactor/structs.py @@ -15,6 +15,7 @@ from pioreactor import exc from pioreactor import types as pt +from pioreactor.logging import create_logger T = t.TypeVar("T") @@ -166,6 +167,8 @@ def save_to_disk_for_device(self, device: str) -> str: from pioreactor.calibrations import CALIBRATION_PATH import shutil + logger = create_logger("calibrations") + calibration_dir = CALIBRATION_PATH / device calibration_dir.mkdir(parents=True, exist_ok=True) @@ -178,17 +181,32 @@ def save_to_disk_for_device(self, device: str) -> str: with out_file.open("wb") as f: f.write(yaml_encode(self)) + logger.info(f"Saved calibration {self.calibration_name} to {out_file}") return str(out_file) def set_as_active_calibration_for_device(self, device: str) -> None: from pioreactor.utils import local_persistent_storage + logger = create_logger("calibrations") + if not self.exists_on_disk_for_device(device): self.save_to_disk_for_device(device) with local_persistent_storage("active_calibrations") as c: c[device] = self.calibration_name + logger.info(f"Set {self.calibration_name} as active calibration for {device}") + + def remove_as_active_calibration_for_device(self, device: str) -> None: + from pioreactor.utils import local_persistent_storage + + logger = create_logger("calibrations") + + with local_persistent_storage("active_calibrations") as c: + if c.get(device) == self.calibration_name: + del c[device] + logger.info(f"Removed {self.calibration_name} as active calibration for {device}") + def exists_on_disk_for_device(self, device: str) -> bool: from pioreactor.calibrations import CALIBRATION_PATH @@ -256,6 +274,12 @@ def ipredict(self, y: Y, enforce_bounds=False) -> X: else: raise exc.SolutionAboveDomainError("Solution below domain") + def is_active(self, device: str) -> bool: + from pioreactor.utils import local_persistent_storage + + with local_persistent_storage("active_calibrations") as c: + return c.get(device) == self.calibration_name + class ODCalibration(CalibrationBase, kw_only=True, tag="od"): ir_led_intensity: float diff --git a/pioreactor/tests/test_od_reading.py b/pioreactor/tests/test_od_reading.py index 18b54217..99db94b4 100644 --- a/pioreactor/tests/test_od_reading.py +++ b/pioreactor/tests/test_od_reading.py @@ -109,6 +109,205 @@ def test_sin_regression_all_zeros_should_return_zeros() -> None: assert np.isinf(AIC) +def test_sin_regression_with_decreasing_signal() -> None: + # the IR led warms up over the duration, which decreases the signal (looks linear). We should still get a value closer to average. + import numpy as np + + x = [ + 0.0008298440370708704, + 0.021651222021318972, + 0.04314119496848434, + 0.06425559404306114, + 0.08625895204022527, + 0.10757809097412974, + 0.1286022289423272, + 0.15018110803794116, + 0.1714978510281071, + 0.19339219899848104, + 0.2149411819409579, + 0.2360574039630592, + 0.25775998004246503, + 0.27918078599032015, + 0.30000732000917196, + 0.32172468898352236, + 0.34277611901052296, + 0.36466900794766843, + 0.386212470009923, + 0.4071048899786547, + 0.4289396540261805, + 0.4501034809509292, + 0.47221486002672464, + 0.49366644700057805, + 0.5148165229475126, + 0.5366985789733008, + 0.558002406032756, + 0.5789992519421503, + 0.6005703710252419, + 0.6218597700353712, + 0.643724118010141, + 0.665215183980763, + 0.6863259370438755, + 0.7080741389654577, + 0.7294300489593297, + 0.7501914779422805, + 0.7718503050273284, + 0.7929848609492183, + 0.81479524995666, + 0.8362971489550546, + 0.8571722250198945, + 0.8789658440509811, + ] + y = [ + 2532.0, + 2531.0, + 2529.0, + 2526.0, + 2525.0, + 2525.0, + 2523.0, + 2524.0, + 2523.0, + 2522.0, + 2521.0, + 2521.0, + 2520, + 2518, + 2517, + 2517, + 2516, + 2515, + 2514, + 2512, + 2511, + 2510, + 2510, + 2510, + 2509, + 2508, + 2508, + 2507, + 2507, + 2506, + 2507, + 2507, + 2505, + 2505, + 2504, + 2503, + 2503, + 2502, + 2502, + 2501, + 2500, + 2500, + ] + + adc_reader = ADCReader(channels=[]) + + (C, A, phi), AIC = adc_reader._sin_regression_with_known_freq(x, y, 60) + assert abs(C - np.mean(y)) < 5 + + +def test_sin_regression_with_stable_signal() -> None: + # like the test sin_regression_with_decreasing_signal, but we kept the IR LED on so it's stabilized + import numpy as np + + x2 = [ + 0.0008372400188818574, + 0.021714452072046697, + 0.04315682000014931, + 0.0643049170030281, + 0.08630915998946875, + 0.10759199701715261, + 0.12866321904584765, + 0.150232566986233, + 0.17160384100861847, + 0.19345522008370608, + 0.21494654600974172, + 0.2361216749995947, + 0.2578482620883733, + 0.279326880001463, + 0.3000860180472955, + 0.3219419290544465, + 0.3429933590814471, + 0.36488588398788124, + 0.3864633560879156, + 0.4075205160770565, + 0.43016210407949984, + 0.45042379410006106, + 0.47266907908488065, + 0.4941541040316224, + 0.5151724610477686, + 0.537163631990552, + 0.5584422500105575, + 0.5794853990664706, + 0.6010359440697357, + 0.6222718540811911, + 0.6442805770784616, + 0.6657215390587226, + 0.686848073033616, + 0.7084880460752174, + 0.7299391120905057, + 0.7506692920578644, + 0.7722554620122537, + 0.7935136629967019, + 0.8153043130878359, + 0.8368472020374611, + 0.8576910280389711, + 0.879548761062324, + ] + + y2 = [ + 2369.0, + 2370, + 2371, + 2371, + 2371, + 2370, + 2372, + 2370, + 2370, + 2371, + 2370, + 2370, + 2372, + 2371, + 2371, + 2370, + 2373, + 2372, + 2371, + 2373, + 2373, + 2371, + 2372, + 2371, + 2370, + 2373, + 2372, + 2372, + 2372, + 2371, + 2371, + 2372, + 2371, + 2371, + 2371, + 2371, + 2371, + 2371, + 2370, + 2370, + 2370, + 2370, + ] + + adc_reader = ADCReader(channels=[]) + + (C, A, phi), AIC = adc_reader._sin_regression_with_known_freq(x2, y2, 60) + assert abs(C - np.mean(y2)) < 5 + + def test_sin_regression_real_data_and_that_60hz_is_the_minimum() -> None: y = [ 8694.0,