[MED-22][external] import nifti to raster layer mask annotations #725

Merged: 14 commits, Dec 19, 2023
148 changes: 135 additions & 13 deletions darwin/importer/formats/nifti.py
@@ -1,4 +1,5 @@
import sys
import uuid
import warnings
from collections import OrderedDict, defaultdict
from pathlib import Path
@@ -102,7 +103,7 @@ def _parse_nifti(
class_img = np.isin(img, class_idxs).astype(np.uint8)
cc_img, num_labels = cc3d.connected_components(class_img, return_N=True)
for instance_id in range(1, num_labels):
_video_annotations = get_video_annotation(
_video_annotations = get_polygon_video_annotations(
cc_img,
class_idxs=[instance_id],
class_name=class_name,
@@ -116,7 +117,7 @@ def _parse_nifti(
for class_name, class_idxs in processed_class_map.items():
if class_name == "background":
continue
_video_annotations = get_video_annotation(
_video_annotations = get_polygon_video_annotations(
img,
class_idxs=class_idxs,
class_name=class_name,
@@ -127,10 +128,23 @@ def _parse_nifti(
if _video_annotations is None:
continue
video_annotations += _video_annotations
annotation_classes = {
dt.AnnotationClass(class_name, "polygon", "polygon")
for class_name in class_map.values()
}
elif mode == "mask":
video_annotations = get_mask_video_annotations(
img,
processed_class_map,
slot_names,
)
if mode in ["video", "instances"]:
annotation_classes = {
dt.AnnotationClass(class_name, "polygon", "polygon")
for class_name in class_map.values()
}
elif mode == "mask":
annotation_classes = {
dt.AnnotationClass(class_name, "mask", "mask")
for class_name in class_map.values()
}

return dt.AnnotationFile(
path=json_path,
filename=str(filename),
@@ -148,7 +162,7 @@ def _parse_nifti(
)


def get_video_annotation(
def get_polygon_video_annotations(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
@@ -157,13 +171,18 @@ def get_video_annotation(
pixdims: Tuple[float],
) -> Optional[List[dt.VideoAnnotation]]:
if not is_mpr:
return nifti_to_video_annotation(
volume, class_name, class_idxs, slot_names, view_idx=2, pixdims=pixdims
return nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
slot_names,
view_idx=2,
pixdims=pixdims,
)
elif is_mpr and len(slot_names) == 3:
video_annotations = []
for view_idx, slot_name in enumerate(slot_names):
_video_annotations = nifti_to_video_annotation(
_video_annotations = nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
@@ -177,9 +196,99 @@ def get_video_annotation(
raise Exception("If is_mpr is True, slot_names must be of length 3")


def nifti_to_video_annotation(
volume, class_name, class_idxs, slot_names, view_idx=2, pixdims=(1, 1, 1)
):
def get_mask_video_annotations(
volume: np.ndarray, processed_class_map: Dict, slot_names: List[str]
) -> Optional[List[dt.VideoAnnotation]]:
"""
The function takes a volume and a class map and returns a list of video annotations

We write a single raster layer for the volume but K mask annotations, where K is the number of classes.
"""
frame_annotations = OrderedDict()
all_mask_annotations = defaultdict(lambda: OrderedDict())
# This is a dictionary of class_names to generated mask_annotation_ids
mask_annotation_ids = {
class_name: str(uuid.uuid4()) for class_name in processed_class_map.keys()
}
# We need to create a new mapping dictionary where the keys are the mask_annotation_ids
# and the values are the new integers which we use in the raster layer
mask_annotation_ids_mapping = {}
map_from_nifti_idx_to_raster_idx = {0: 0} # 0 is the background class
for i in range(volume.shape[2]):
slice_mask = volume[:, :, i].astype(np.uint8)
for raster_idx, (class_name, class_idxs) in enumerate(
processed_class_map.items()
):
if class_name == "background":
continue
class_mask = np.isin(slice_mask, class_idxs).astype(np.uint8).copy()
if class_mask.sum() == 0:
continue
all_mask_annotations[class_name][i] = dt.make_mask(
class_name, subs=None, slot_names=slot_names
)
all_mask_annotations[class_name][i].id = mask_annotation_ids[class_name]
mask_annotation_ids_mapping[mask_annotation_ids[class_name]] = (
raster_idx + 1
)
for class_idx in class_idxs:
map_from_nifti_idx_to_raster_idx[class_idx] = raster_idx + 1
# Now that we've created all the mask annotations, we need to create the raster layer.
# Only mask_annotation_ids for classes that actually appear in the volume are mapped.
for i in range(volume.shape[2]):
slice_mask = volume[:, :, i].astype(
np.uint8
) # Product requirement: We only support 255 classes!
# We need to convert from nifti_idx to raster_idx
slice_mask = np.vectorize(
lambda key: map_from_nifti_idx_to_raster_idx.get(key, 0)
)(slice_mask)
dense_rle = convert_to_dense_rle(slice_mask)
raster_annotation = dt.make_raster_layer(
class_name="__raster_layer__",
mask_annotation_ids_mapping=mask_annotation_ids_mapping,
total_pixels=slice_mask.size,
dense_rle=dense_rle,
slot_names=slot_names,
)
frame_annotations[i] = raster_annotation
all_frame_ids = list(frame_annotations.keys())
if not all_frame_ids:
return None
if len(all_frame_ids) == 1:
segments = [[all_frame_ids[0], all_frame_ids[0] + 1]]
elif len(all_frame_ids) > 1:
segments = [[min(all_frame_ids), max(all_frame_ids)]]
raster_video_annotation = dt.make_video_annotation(
frame_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotations = []
for class_name, mask_annotations in all_mask_annotations.items():
mask_video_annotation = dt.make_video_annotation(
mask_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotation.id = mask_annotation_ids[class_name]
mask_video_annotations.append(mask_video_annotation)

return [raster_video_annotation] + mask_video_annotations


def nifti_to_video_polygon_annotation(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
slot_names: List[str],
view_idx: int = 2,
pixdims: Tuple[int, int, int] = (1, 1, 1),
) -> Optional[List[dt.VideoAnnotation]]:
frame_annotations = OrderedDict()
for i in range(volume.shape[view_idx]):
if view_idx == 2:
@@ -376,3 +485,16 @@ def process_nifti(
data_array = nib.orientations.apply_orientation(img.get_fdata(), ornt)
pixdims = img.header.get_zooms()
return data_array, pixdims


def convert_to_dense_rle(raster: np.ndarray) -> List[int]:
dense_rle, prev_val, cnt = [], None, 0
for val in raster.T.flat:
if val == prev_val:
cnt += 1
else:
if prev_val is not None:
dense_rle.extend([int(prev_val), int(cnt)])
prev_val, cnt = val, 1
dense_rle.extend([int(prev_val), int(cnt)])
return dense_rle
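For reference, convert_to_dense_rle emits a flat [value, run_length, value, run_length, ...] list over the slice in column-major order (it walks raster.T.flat). Below is a minimal round-trip sketch on a toy slice; the decode_dense_rle helper is illustrative only and not part of this PR.

```python
import numpy as np

from darwin.importer.formats.nifti import convert_to_dense_rle


def decode_dense_rle(dense_rle, shape):
    # Illustrative inverse: expand the runs, then undo the column-major flattening.
    values = []
    for value, run_length in zip(dense_rle[::2], dense_rle[1::2]):
        values.extend([value] * run_length)
    return np.array(values, dtype=np.uint8).reshape(shape[::-1]).T


slice_mask = np.array(
    [[0, 0, 1],
     [0, 2, 1]],
    dtype=np.uint8,
)
dense_rle = convert_to_dense_rle(slice_mask)
# Column-major traversal gives 0, 0, 0, 2, 1, 1 -> runs of (0 x 3), (2 x 1), (1 x 2).
assert dense_rle == [0, 3, 2, 1, 1, 2]
assert np.array_equal(decode_dense_rle(dense_rle, slice_mask.shape), slice_mask)
```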
2 changes: 1 addition & 1 deletion darwin/importer/formats/nifti_schemas.py
@@ -10,7 +10,7 @@
"image": {"type": "string"},
"label": {"type": "string"},
"class_map": class_map,
"mode": {"type": "string", "enum": ["video", "instances"]},
"mode": {"type": "string", "enum": ["video", "instances", "mask"]},
"is_mpr": {"type": "boolean"},
"slot_names": {"type": "array", "items": {"type": "string"}},
},
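To show what the widened enum admits, here is a standalone validation sketch; the entry_schema dict below is a simplified stand-in written for this example, not the schema object exported by nifti_schemas.py.

```python
from jsonschema import ValidationError, validate

# Simplified stand-in for the relevant part of the NIfTI import schema.
entry_schema = {
    "type": "object",
    "properties": {
        "image": {"type": "string"},
        "label": {"type": "string"},
        "class_map": {"type": "object"},
        "mode": {"type": "string", "enum": ["video", "instances", "mask"]},
        "is_mpr": {"type": "boolean"},
        "slot_names": {"type": "array", "items": {"type": "string"}},
    },
}

entry = {
    "image": "vol0 (1).nii",
    "label": "vol0_brain.nii.gz",
    "class_map": {"1": "brain"},
    "mode": "mask",
    "is_mpr": False,
    "slot_names": ["0.1"],
}

validate(instance=entry, schema=entry_schema)  # passes with the new enum value

try:
    validate(instance={**entry, "mode": "raster"}, schema=entry_schema)
except ValidationError:
    pass  # any value outside the enum is still rejected
```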
2 changes: 2 additions & 0 deletions darwin/importer/importer.py
@@ -756,6 +756,8 @@ def _import_annotations(
if (
annotation_type not in remote_classes
or annotation_class.name not in remote_classes[annotation_type]
and annotation_type
!= "raster_layer" # We do not skip raster layers as they are always available.
):
if annotation_type not in remote_classes:
logger.warning(
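Since `and` binds tighter than `or` in Python, the updated skip check groups as A or (B and C). A small illustrative sketch follows; the remote_classes contents and the skip variable name are toy stand-ins for the importer's local state, not values taken from the codebase.

```python
# Toy stand-ins for the importer's local state; values are illustrative only.
remote_classes = {"mask": {"brain"}, "raster_layer": set()}
annotation_type = "raster_layer"
annotation_class_name = "__raster_layer__"

# Equivalent grouping of the new condition: `and` binds tighter than `or`.
skip = annotation_type not in remote_classes or (
    annotation_class_name not in remote_classes[annotation_type]
    and annotation_type != "raster_layer"
)
assert not skip  # the raster layer is not skipped on the class-name check
```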
59 changes: 59 additions & 0 deletions tests/darwin/importer/formats/import_nifti_test.py
@@ -142,6 +142,65 @@ def test_image_annotation_nifti_import_incorrect_number_slot(team_slug: str):
parse_path(path=upload_json)


def test_image_annotation_nifti_import_single_slot_to_mask(team_slug: str):
with tempfile.TemporaryDirectory() as tmpdir:
with ZipFile("tests/data.zip") as zfile:
zfile.extractall(tmpdir)
label_path = (
Path(tmpdir)
/ team_slug
/ "nifti"
/ "releases"
/ "latest"
/ "annotations"
/ "vol0_brain.nii.gz"
)
input_dict = {
"data": [
{
"image": "vol0 (1).nii",
"label": str(label_path),
"class_map": {"1": "brain"},
"mode": "mask",
"is_mpr": False,
"slot_names": ["0.1"],
}
]
}
upload_json = Path(tmpdir) / "annotations.json"
upload_json.write_text(
json.dumps(input_dict, indent=4, sort_keys=True, default=str)
)
annotation_files = parse_path(path=upload_json)
annotation_file = annotation_files[0]
output_json_string = json.loads(
serialise_annotation_file(annotation_file, as_dict=False)
)
expected_json_string = json.load(
open(
Path(tmpdir)
/ team_slug
/ "nifti"
/ "vol0_annotation_file_to_mask.json",
"r",
)
)
# We must not compare mask_annotation_ids_mapping, as these ids are
# randomly generated on every import
[
frame.get("raster_layer", {}).pop("mask_annotation_ids_mapping")
for frame in output_json_string["annotations"][0]["frames"].values()
]
[
frame.get("raster_layer", {}).pop("mask_annotation_ids_mapping")
for frame in expected_json_string["annotations"][0]["frames"].values()
]
assert (
output_json_string["annotations"][0]["frames"]
== expected_json_string["annotations"][0]["frames"]
)


def serialise_annotation_file(
annotation_file: AnnotationFile, as_dict
) -> Union[str, dict]:
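The inline stripping of mask_annotation_ids_mapping in the new test could equally be expressed as a small helper. A minimal sketch follows; the helper name and the toy frames dict, including the dense_rle value, are illustrative and not part of the PR.

```python
def strip_mask_id_mappings(frames: dict) -> dict:
    # Drop the randomly generated mask_annotation_ids_mapping from every
    # raster_layer frame so two serialised annotation files can be compared directly.
    for frame in frames.values():
        frame.get("raster_layer", {}).pop("mask_annotation_ids_mapping", None)
    return frames


# Toy frames dict mirroring the shape the test manipulates; values are illustrative.
frames = {
    "0": {
        "raster_layer": {
            "dense_rle": [0, 3, 1, 1],
            "mask_annotation_ids_mapping": {"some-random-uuid": 1},
        }
    }
}
strip_mask_id_mappings(frames)
assert "mask_annotation_ids_mapping" not in frames["0"]["raster_layer"]
```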
Binary file modified tests/data.zip