diff --git a/src/napari_micromanager/_core_link.py b/src/napari_micromanager/_core_link.py index eae4ab8d..525eb590 100644 --- a/src/napari_micromanager/_core_link.py +++ b/src/napari_micromanager/_core_link.py @@ -5,7 +5,7 @@ import napari import napari.layers -from pymmcore_plus import CMMCorePlus +from pymmcore_plus import CMMCorePlus, Metadata from qtpy.QtCore import QObject, Qt, QTimerEvent from superqt.utils import ensure_main_thread @@ -57,7 +57,10 @@ def timerEvent(self, a0: QTimerEvent | None) -> None: def _image_snapped(self) -> None: # If we are in the middle of an MDA, don't update the preview viewer. if not self._mda_handler._mda_running: - self._update_viewer(self._mmc.getImage()) + # update the viewer with the image from all the cameras + for cam in range(self._mmc.getNumberOfCameraChannels()): + # using tagged image to then get the camera name from the metadata + self._update_viewer(*self._mmc.getTaggedImage(cam)) def _start_live(self) -> None: interval = int(self._mmc.getExposure()) @@ -74,21 +77,34 @@ def _restart_live(self, camera: str, exposure: float) -> None: self._mmc.startContinuousSequenceAcquisition() @ensure_main_thread # type: ignore [misc] - def _update_viewer(self, data: np.ndarray | None = None) -> None: + def _update_viewer( + self, data: np.ndarray | None = None, metadata: dict | Metadata | None = None + ) -> None: """Update viewer with the latest image from the circular buffer.""" if data is None: if self._mmc.getRemainingImageCount() == 0: return try: - data = self._mmc.getLastImage() + # get the last image from the circular buffer with metadata + data, metadata = self._mmc.getLastImageAndMD() except (RuntimeError, IndexError): # circular buffer empty return + + if metadata is None: + return + + # get the camera from the metadata + cam = metadata.get("Camera", self._mmc.getCameraDevice()) + layer_name = f"preview ({cam})" + try: - preview_layer = self.viewer.layers["preview"] + preview_layer = self.viewer.layers[layer_name] preview_layer.data = data except KeyError: - preview_layer = self.viewer.add_image(data, name="preview") + preview_layer = self.viewer.add_image( + data, name=layer_name, blending="additive" + ) preview_layer.metadata["mode"] = "preview" diff --git a/src/napari_micromanager/_mda_handler.py b/src/napari_micromanager/_mda_handler.py index c4fdd142..ac4b7f4e 100644 --- a/src/napari_micromanager/_mda_handler.py +++ b/src/napari_micromanager/_mda_handler.py @@ -4,7 +4,7 @@ import tempfile import time from collections import deque -from typing import TYPE_CHECKING, Callable, Generator, cast +from typing import TYPE_CHECKING, Any, Callable, Generator, cast import napari import zarr @@ -60,7 +60,7 @@ def __init__(self, mmcore: CMMCorePlus, viewer: napari.viewer.Viewer) -> None: # mapping of id -> (zarr.Array, temporary directory) for each layer created self._tmp_arrays: dict[str, tuple[zarr.Array, tempfile.TemporaryDirectory]] = {} - self._deck: deque[tuple[np.ndarray, MDAEvent]] = deque() + self._deck: deque[tuple[np.ndarray, MDAEvent, dict[str, Any]]] = deque() # Add all core connections to this list. This makes it easy to disconnect # from core when this widget is closed. @@ -90,7 +90,7 @@ def _on_mda_started(self, sequence: MDASequence) -> None: # determine the new layers that need to be created for this experiment # (based on the sequence mode, and whether we're splitting C/P, etc.) - axis_labels, layers_to_create = _determine_sequence_layers(sequence) + axis_labels, layers_to_create = _determine_sequence_layers(sequence, self._mmc) yx_shape = [self._mmc.getImageHeight(), self._mmc.getImageWidth()] @@ -143,15 +143,17 @@ def _watch_mda( else: time.sleep(0.1) - def _on_mda_frame(self, image: np.ndarray, event: MDAEvent) -> None: + def _on_mda_frame( + self, image: np.ndarray, event: MDAEvent, meta: dict[str, Any] + ) -> None: """Called on the `frameReady` event from the core.""" - self._deck.append((image, event)) + self._deck.append((image, event, meta)) def _process_frame( - self, image: np.ndarray, event: MDAEvent + self, image: np.ndarray, event: MDAEvent, meta: dict[str, Any] ) -> tuple[str | None, tuple[int, ...] | None]: # get info about the layer we need to update - _id, im_idx, layer_name = _id_idx_layer(event) + _id, im_idx, layer_name = _id_idx_layer(event, meta) # update the zarr array backing the layer self._tmp_arrays[_id][0][im_idx] = image @@ -242,7 +244,7 @@ def _has_sub_sequences(sequence: MDASequence) -> bool: def _determine_sequence_layers( - sequence: MDASequence, + sequence: MDASequence, mmcore: CMMCorePlus ) -> tuple[list[str], list[tuple[str, list[int], LayerMeta]]]: # sourcery skip: extract-duplicate-method """Return (axis_labels, (id, shape, and metadata)) for each layer to add for seq. @@ -259,6 +261,8 @@ def _determine_sequence_layers( The YX shape of a single image in the sequence. (this argument might not need to be passed here, perhaps could be handled be the caller of this function) + mmcore : CMMCorePlus + The Micro-Manager core instance. Returns ------- @@ -308,17 +312,29 @@ def _determine_sequence_layers( axis_labels += ["y", "x"] + # add camera name to id if more than one camera + cameras = mmcore.getCameraChannelNames() + if len(cameras) > 1: + _layer_info = [ + (f"{_id}_{camera}", *items) # type: ignore + for camera in cameras + for _id, *items in _layer_info + ] + return axis_labels, _layer_info -def _id_idx_layer(event: MDAEvent) -> tuple[str, tuple[int, ...], str]: +def _id_idx_layer( + event: MDAEvent, meta: dict[str, Any] +) -> tuple[str, tuple[int, ...], str]: """Get the tmp_path id, index, and layer name for a given event. Parameters ---------- event : MDAEvent An event for which to retrieve the id, index, and layer name. - + meta : dict[str, Any] + Metadata for the sequence. Returns ------- @@ -330,14 +346,14 @@ def _id_idx_layer(event: MDAEvent) -> tuple[str, tuple[int, ...], str]: - `layer_name` is the name of the corresponding layer in the viewer. """ seq = cast("MDASequence", event.sequence) - meta = cast(dict, seq.metadata.get(NMM_METADATA_KEY, {})) + nmm_meta = cast(dict, seq.metadata.get(NMM_METADATA_KEY, {})) axis_order = list(get_full_sequence_axes(seq)) ch_id = "" # get filename from MDASequence metadata prefix = _get_file_name_from_metadata(seq) - if meta.get("split_channels", False) and event.channel: + if nmm_meta.get("split_channels", False) and event.channel: ch_id = f"{event.channel.config}_{event.index['c']:03d}_" axis_order.remove("c") @@ -356,4 +372,9 @@ def _id_idx_layer(event: MDAEvent) -> tuple[str, tuple[int, ...], str]: # the name of this layer in the napari viewer layer_name = f"{prefix}_{ch_id}{seq.uid}" + # "Camera" is present in meta only in case there are multiple cameras + if camera := meta.get("Camera"): + _id = f"{_id}_{camera}" + layer_name = f"{layer_name}_{camera}" + return _id, im_idx, layer_name diff --git a/tests/test_layer_scale.py b/tests/test_layer_scale.py index e3b3547f..bec82c78 100644 --- a/tests/test_layer_scale.py +++ b/tests/test_layer_scale.py @@ -10,6 +10,8 @@ from pymmcore_plus import CMMCorePlus from useq import MDASequence +import warnings + @pytest.mark.parametrize("axis_order", ["tpcz", "tpzc"]) def test_layer_scale( @@ -62,11 +64,15 @@ def test_layer_scale( def test_preview_scale(core: CMMCorePlus, main_window: MainWindow): + warnings.filterwarnings("ignore", category=DeprecationWarning) img = core.snap() main_window._core_link._update_viewer(img) pix_size = core.getPixelSizeUm() - assert tuple(main_window.viewer.layers["preview"].scale) == (pix_size, pix_size) + assert tuple(main_window.viewer.layers["preview (Camera)"].scale) == ( + pix_size, + pix_size, + ) # now pretend that the user never provided a pixel size config # we need to not crash in this case