Skip to content

Commit

Permalink
Merge pull request #614 from psadil/fix/acqtime-for-xa
Browse files Browse the repository at this point in the history
Allow filling of acq_time when AcquisitionDate AcquisitionTime missing
  • Loading branch information
yarikoptic authored Mar 17, 2023
2 parents 71d87c0 + 74e3fcc commit b2e5d03
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 22 deletions.
17 changes: 6 additions & 11 deletions heudiconv/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from random import sample
from glob import glob
import errno
import typing
import warnings

from .external.pydicom import dcm
Expand All @@ -32,6 +33,7 @@
remove_prefix,
)
from . import __version__
from . import dicoms

lgr = logging.getLogger(__name__)

Expand Down Expand Up @@ -461,7 +463,7 @@ def add_rows_to_scans_keys_file(fn, newrows):
writer.writerows([header] + data_rows_sorted)


def get_formatted_scans_key_row(dcm_fn):
def get_formatted_scans_key_row(dcm_fn) -> typing.List[str]:
"""
Parameters
----------
Expand All @@ -474,15 +476,8 @@ def get_formatted_scans_key_row(dcm_fn):
"""
dcm_data = dcm.read_file(dcm_fn, stop_before_pixels=True, force=True)
# we need to store filenames and acquisition times
# parse date and time of start of run acquisition and get it into isoformat
try:
date = dcm_data.AcquisitionDate
time = dcm_data.AcquisitionTime
acq_time = get_datetime(date, time)
except (AttributeError, ValueError) as exc:
lgr.warning("Failed to get date/time for the content: %s", str(exc))
acq_time = ''
# we need to store filenames and acquisition datetimes
acq_datetime = dicoms.get_datetime_from_dcm(dcm_data=dcm_data)
# add random string
# But let's make it reproducible by using all UIDs
# (might change across versions?)
Expand All @@ -495,7 +490,7 @@ def get_formatted_scans_key_row(dcm_fn):
perfphys = dcm_data.PerformingPhysicianName
except AttributeError:
perfphys = ''
row = [acq_time, perfphys, randstr]
row = [acq_datetime.isoformat() if acq_datetime else "", perfphys, randstr]
# empty entries should be 'n/a'
# https://github.com/dartmouth-pbs/heudiconv/issues/32
row = ['n/a' if not str(e) else e for e in row]
Expand Down
93 changes: 82 additions & 11 deletions heudiconv/dicoms.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# dicom operations
import datetime
import os
import os.path as op
import logging
from collections import OrderedDict
import tarfile

from typing import List, Optional

from .external.pydicom import dcm
from .utils import (
get_typed_attr,
Expand Down Expand Up @@ -314,19 +317,87 @@ def group_dicoms_into_seqinfos(files, grouping, file_filter=None,
return seqinfos


def get_dicom_series_time(dicom_list):
"""Get time in seconds since epoch from dicom series date and time
Primarily to be used for reproducible time stamping
def get_reproducible_int(dicom_list: List[str]) -> int:
"""Get integer that can be used to reproducibly sort input DICOMs, which is based on when they were acquired.
Parameters
----------
dicom_list : List[str]
Paths to existing DICOM files
Returns
-------
int
An integer relating to when the DICOM was acquired
Raises
------
AssertionError
Notes
-----
1. When date and time for can be read (see :func:`get_datetime_from_dcm`), return
that value as time in seconds since epoch (i.e., Jan 1 1970).
2. In cases where a date/time/datetime is not available (e.g., anonymization stripped this info), return
epoch + AcquisitionNumber (in seconds), which is AcquisitionNumber as an integer
3. If 1 and 2 are not possible, then raise AssertionError and provide message about missing information
Cases are based on only the first element of the dicom_list.
"""
import time
import calendar

dicom = dcm.read_file(dicom_list[0], stop_before_pixels=True, force=True)
dcm_date = dicom.SeriesDate # YYYYMMDD
dcm_time = dicom.SeriesTime # HHMMSS.MICROSEC
dicom_time_str = dcm_date + dcm_time.split('.', 1)[0] # YYYYMMDDHHMMSS
# convert to epoch
return calendar.timegm(time.strptime(dicom_time_str, '%Y%m%d%H%M%S'))
dicom_datetime = get_datetime_from_dcm(dicom)
if dicom_datetime:
return calendar.timegm(dicom_datetime.timetuple())

acquisition_number = dicom.get('AcquisitionNumber')
if acquisition_number:
return int(acquisition_number)

raise AssertionError(
"No metadata found that can be used to sort DICOMs reproducibly. Was header information erased?"
)


def get_datetime_from_dcm(dcm_data: dcm.FileDataset) -> Optional[datetime.datetime]:
"""Extract datetime from filedataset, or return None is no datetime information found.
Parameters
----------
dcm_data : dcm.FileDataset
DICOM with header, e.g., as ready by pydicom.dcmread.
Objects with __getitem__ and have those keys with values properly formatted may also work
Returns
-------
Optional[datetime.datetime]
One of several datetimes that are related to when the scan occurred, or None if no datetime can be found
Notes
------
The following fields are checked in order
1. AcquisitionDate & AcquisitionTime (0008,0022); (0008,0032)
2. AcquisitionDateTime (0008,002A);
3. SeriesDate & SeriesTime (0008,0021); (0008,0031)
"""
acq_date = dcm_data.get("AcquisitionDate")
acq_time = dcm_data.get("AcquisitionTime")
if not (acq_date is None or acq_time is None):
return datetime.datetime.strptime(acq_date+acq_time, "%Y%m%d%H%M%S.%f")

acq_dt = dcm_data.get("AcquisitionDateTime")
if not acq_dt is None:
return datetime.datetime.strptime(acq_dt, "%Y%m%d%H%M%S.%f")

series_date = dcm_data.get("SeriesDate")
series_time = dcm_data.get("SeriesTime")
if not (series_date is None or series_time is None):
return datetime.datetime.strptime(series_date+series_time, "%Y%m%d%H%M%S.%f")


def compress_dicoms(dicom_list, out_prefix, tempdirs, overwrite):
Expand Down Expand Up @@ -364,9 +435,9 @@ def compress_dicoms(dicom_list, out_prefix, tempdirs, overwrite):
# Solution from DataLad although ugly enough:

dicom_list = sorted(dicom_list)
dcm_time = get_dicom_series_time(dicom_list)
dcm_time = get_reproducible_int(dicom_list)

def _assign_dicom_time(ti):
def _assign_dicom_time(ti: tarfile.TarInfo) -> tarfile.TarInfo:
# Reset the date to match the one from the dicom, not from the
# filesystem so we could sort reproducibly
ti.mtime = dcm_time
Expand Down
Binary file not shown.
87 changes: 87 additions & 0 deletions heudiconv/tests/test_dicoms.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import os.path as op
import json
from glob import glob
Expand All @@ -12,6 +13,8 @@
embed_dicom_and_nifti_metadata,
group_dicoms_into_seqinfos,
parse_private_csa_header,
get_datetime_from_dcm,
get_reproducible_int
)
from .utils import (
assert_cwd_unchanged,
Expand Down Expand Up @@ -85,3 +88,87 @@ def test_group_dicoms_into_seqinfos(tmpdir):
assert type(seqinfo) is OrderedDict
assert len(seqinfo) == len(dcmfiles)
assert [s.series_description for s in seqinfo] == ['AAHead_Scout_32ch-head-coil', 'PhoenixZIPReport']


def test_get_datetime_from_dcm_from_acq_date_time():
typical_dcm = dcm.dcmread(op.join(TESTS_DATA_PATH, 'phantom.dcm'), stop_before_pixels=True)

# do we try to grab from AcquisitionDate/AcquisitionTime first when available?
dt = get_datetime_from_dcm(typical_dcm)
assert (
dt == datetime.datetime.strptime(
typical_dcm.get("AcquisitionDate") + typical_dcm.get("AcquisitionTime"),
"%Y%m%d%H%M%S.%f"
)
)


def test_get_datetime_from_dcm_from_acq_datetime():
# if AcquisitionDate and AcquisitionTime not there, can we rely on AcquisitionDateTime?
XA30_enhanced_dcm = dcm.dcmread(
op.join(TESTS_DATA_PATH, 'MRI_102TD_PHA_S.MR.Chen_Matthews_1.3.1.2022.11.16.15.50.20.357.31204541.dcm'),
stop_before_pixels=True
)
dt = get_datetime_from_dcm(XA30_enhanced_dcm)
assert (dt == datetime.datetime.strptime(XA30_enhanced_dcm.get("AcquisitionDateTime"), "%Y%m%d%H%M%S.%f"))


def test_get_datetime_from_dcm_from_only_series_date_time():
# if acquisition date/time/datetime not available, can we rely on SeriesDate & SeriesTime?
XA30_enhanced_dcm = dcm.dcmread(
op.join(TESTS_DATA_PATH, 'MRI_102TD_PHA_S.MR.Chen_Matthews_1.3.1.2022.11.16.15.50.20.357.31204541.dcm'),
stop_before_pixels=True
)
del XA30_enhanced_dcm.AcquisitionDateTime
dt = get_datetime_from_dcm(XA30_enhanced_dcm)
assert (
dt == datetime.datetime.strptime(
XA30_enhanced_dcm.get("SeriesDate") + XA30_enhanced_dcm.get("SeriesTime"),
"%Y%m%d%H%M%S.%f"
)
)

def test_get_datetime_from_dcm_wo_dt():
# if there's no known source (e.g., after anonymization), are we still able to proceed?
XA30_enhanced_dcm = dcm.dcmread(
op.join(TESTS_DATA_PATH, 'MRI_102TD_PHA_S.MR.Chen_Matthews_1.3.1.2022.11.16.15.50.20.357.31204541.dcm'),
stop_before_pixels=True
)
del XA30_enhanced_dcm.AcquisitionDateTime
del XA30_enhanced_dcm.SeriesDate
del XA30_enhanced_dcm.SeriesTime
assert get_datetime_from_dcm(XA30_enhanced_dcm) is None


def test_get_reproducible_int():
dcmfile = op.join(TESTS_DATA_PATH, 'phantom.dcm')

assert type(get_reproducible_int([dcmfile])) is int


@pytest.fixture
def test_get_reproducible_int_wo_dt(tmp_path):
# can this function return an int when we don't have any usable dates?
typical_dcm = dcm.dcmread(op.join(TESTS_DATA_PATH, 'phantom.dcm'), stop_before_pixels=True)
del typical_dcm.SeriesDate
del typical_dcm.AcquisitionDate
dcm.dcmwrite(tmp_path, typical_dcm)

assert type(get_reproducible_int([tmp_path])) is int


@pytest.fixture
def test_get_reproducible_int_raises_assertion_wo_dt(tmp_path):
# if there's no known source (e.g., after anonymization), is AssertionError Raised?
XA30_enhanced_dcm = dcm.dcmread(
op.join(TESTS_DATA_PATH, 'MRI_102TD_PHA_S.MR.Chen_Matthews_1.3.1.2022.11.16.15.50.20.357.31204541.dcm'),
stop_before_pixels=True
)
del XA30_enhanced_dcm.AcquisitionDateTime
del XA30_enhanced_dcm.SeriesDate
del XA30_enhanced_dcm.SeriesTime
dcm.dcmwrite(tmp_path, dataset=XA30_enhanced_dcm)
with pytest.raises(AssertionError):
get_reproducible_int([tmp_path])


0 comments on commit b2e5d03

Please sign in to comment.