Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: add support for Micro-Manager Multi Camera utilities #333

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
28 changes: 22 additions & 6 deletions src/napari_micromanager/_core_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import napari
import napari.layers
from pymmcore_plus import CMMCorePlus
from pymmcore_plus import CMMCorePlus, Metadata
from qtpy.QtCore import QObject, Qt, QTimerEvent
from superqt.utils import ensure_main_thread

Expand Down Expand Up @@ -57,7 +57,10 @@ def timerEvent(self, a0: QTimerEvent | None) -> None:
def _image_snapped(self) -> None:
# If we are in the middle of an MDA, don't update the preview viewer.
if not self._mda_handler._mda_running:
self._update_viewer(self._mmc.getImage())
# update the viewer with the image from all the cameras
for cam in range(self._mmc.getNumberOfCameraChannels()):
# using tagged image to then get the camera name from the metadata
self._update_viewer(*self._mmc.getTaggedImage(cam))

def _start_live(self) -> None:
interval = int(self._mmc.getExposure())
Expand All @@ -74,21 +77,34 @@ def _restart_live(self, camera: str, exposure: float) -> None:
self._mmc.startContinuousSequenceAcquisition()

@ensure_main_thread # type: ignore [misc]
def _update_viewer(self, data: np.ndarray | None = None) -> None:
def _update_viewer(
self, data: np.ndarray | None = None, metadata: dict | Metadata | None = None
) -> None:
"""Update viewer with the latest image from the circular buffer."""
if data is None:
if self._mmc.getRemainingImageCount() == 0:
return
try:
data = self._mmc.getLastImage()
# get the last image from the circular buffer with metadata
data, metadata = self._mmc.getLastImageAndMD()
except (RuntimeError, IndexError):
# circular buffer empty
return

if metadata is None:
return

# get the camera from the metadata
cam = metadata.get("Camera", self._mmc.getCameraDevice())
layer_name = f"preview ({cam})"

try:
preview_layer = self.viewer.layers["preview"]
preview_layer = self.viewer.layers[layer_name]
preview_layer.data = data
except KeyError:
preview_layer = self.viewer.add_image(data, name="preview")
preview_layer = self.viewer.add_image(
data, name=layer_name, blending="additive"
)

preview_layer.metadata["mode"] = "preview"

Expand Down
45 changes: 33 additions & 12 deletions src/napari_micromanager/_mda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import tempfile
import time
from collections import deque
from typing import TYPE_CHECKING, Callable, Generator, cast
from typing import TYPE_CHECKING, Any, Callable, Generator, cast

import napari
import zarr
Expand Down Expand Up @@ -60,7 +60,7 @@

# mapping of id -> (zarr.Array, temporary directory) for each layer created
self._tmp_arrays: dict[str, tuple[zarr.Array, tempfile.TemporaryDirectory]] = {}
self._deck: deque[tuple[np.ndarray, MDAEvent]] = deque()
self._deck: deque[tuple[np.ndarray, MDAEvent, dict[str, Any]]] = deque()

# Add all core connections to this list. This makes it easy to disconnect
# from core when this widget is closed.
Expand Down Expand Up @@ -90,7 +90,7 @@

# determine the new layers that need to be created for this experiment
# (based on the sequence mode, and whether we're splitting C/P, etc.)
axis_labels, layers_to_create = _determine_sequence_layers(sequence)
axis_labels, layers_to_create = _determine_sequence_layers(sequence, self._mmc)

yx_shape = [self._mmc.getImageHeight(), self._mmc.getImageWidth()]

Expand Down Expand Up @@ -143,15 +143,17 @@
else:
time.sleep(0.1)

def _on_mda_frame(self, image: np.ndarray, event: MDAEvent) -> None:
def _on_mda_frame(
self, image: np.ndarray, event: MDAEvent, meta: dict[str, Any]
) -> None:
"""Called on the `frameReady` event from the core."""
self._deck.append((image, event))
self._deck.append((image, event, meta))

def _process_frame(
self, image: np.ndarray, event: MDAEvent
self, image: np.ndarray, event: MDAEvent, meta: dict[str, Any]
) -> tuple[str | None, tuple[int, ...] | None]:
# get info about the layer we need to update
_id, im_idx, layer_name = _id_idx_layer(event)
_id, im_idx, layer_name = _id_idx_layer(event, meta)

# update the zarr array backing the layer
self._tmp_arrays[_id][0][im_idx] = image
Expand Down Expand Up @@ -242,7 +244,7 @@


def _determine_sequence_layers(
sequence: MDASequence,
sequence: MDASequence, mmcore: CMMCorePlus
) -> tuple[list[str], list[tuple[str, list[int], LayerMeta]]]:
# sourcery skip: extract-duplicate-method
"""Return (axis_labels, (id, shape, and metadata)) for each layer to add for seq.
Expand All @@ -259,6 +261,8 @@
The YX shape of a single image in the sequence.
(this argument might not need to be passed here, perhaps could be handled
be the caller of this function)
mmcore : CMMCorePlus
The Micro-Manager core instance.

Returns
-------
Expand Down Expand Up @@ -308,17 +312,29 @@

axis_labels += ["y", "x"]

# add camera name to id if more than one camera
cameras = mmcore.getCameraChannelNames()
if len(cameras) > 1:
_layer_info = [

Check warning on line 318 in src/napari_micromanager/_mda_handler.py

View check run for this annotation

Codecov / codecov/patch

src/napari_micromanager/_mda_handler.py#L318

Added line #L318 was not covered by tests
(f"{_id}_{camera}", *items) # type: ignore
for camera in cameras
for _id, *items in _layer_info
]

return axis_labels, _layer_info


def _id_idx_layer(event: MDAEvent) -> tuple[str, tuple[int, ...], str]:
def _id_idx_layer(
event: MDAEvent, meta: dict[str, Any]
) -> tuple[str, tuple[int, ...], str]:
"""Get the tmp_path id, index, and layer name for a given event.

Parameters
----------
event : MDAEvent
An event for which to retrieve the id, index, and layer name.

meta : dict[str, Any]
Metadata for the sequence.

Returns
-------
Expand All @@ -330,14 +346,14 @@
- `layer_name` is the name of the corresponding layer in the viewer.
"""
seq = cast("MDASequence", event.sequence)
meta = cast(dict, seq.metadata.get(NMM_METADATA_KEY, {}))
nmm_meta = cast(dict, seq.metadata.get(NMM_METADATA_KEY, {}))
axis_order = list(get_full_sequence_axes(seq))

ch_id = ""
# get filename from MDASequence metadata
prefix = _get_file_name_from_metadata(seq)

if meta.get("split_channels", False) and event.channel:
if nmm_meta.get("split_channels", False) and event.channel:
ch_id = f"{event.channel.config}_{event.index['c']:03d}_"
axis_order.remove("c")

Expand All @@ -356,4 +372,9 @@
# the name of this layer in the napari viewer
layer_name = f"{prefix}_{ch_id}{seq.uid}"

# "Camera" is present in meta only in case there are multiple cameras
if camera := meta.get("Camera"):
_id = f"{_id}_{camera}"
layer_name = f"{layer_name}_{camera}"

return _id, im_idx, layer_name
8 changes: 7 additions & 1 deletion tests/test_layer_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from pymmcore_plus import CMMCorePlus
from useq import MDASequence

import warnings


@pytest.mark.parametrize("axis_order", ["tpcz", "tpzc"])
def test_layer_scale(
Expand Down Expand Up @@ -62,11 +64,15 @@ def test_layer_scale(


def test_preview_scale(core: CMMCorePlus, main_window: MainWindow):
warnings.filterwarnings("ignore", category=DeprecationWarning)
img = core.snap()
main_window._core_link._update_viewer(img)

pix_size = core.getPixelSizeUm()
assert tuple(main_window.viewer.layers["preview"].scale) == (pix_size, pix_size)
assert tuple(main_window.viewer.layers["preview (Camera)"].scale) == (
pix_size,
pix_size,
)

# now pretend that the user never provided a pixel size config
# we need to not crash in this case
Expand Down
Loading