From c28cee3452fb53534bb110fbcc03b9f9f8f02a65 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Fri, 14 Jun 2024 13:36:55 -0400 Subject: [PATCH 1/6] FIX: potentially overflow for some test sets. Now explicitly scale the data to the dynamic range of the value representation. Uses ADC resolution (sensitivity), gain (sensitivityCorrection) and baseline (0 offset). --- .../formats/dcm_utils/dcm_waveform_reader.py | 20 ++-- .../formats/dcm_utils/dcm_waveform_writer.py | 94 +++++++++++++++---- waveform_benchmark/formats/dicom.py | 25 ++++- 3 files changed, 108 insertions(+), 31 deletions(-) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py index 4a4afdd..a2c5887 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py @@ -180,15 +180,21 @@ def get_multiplex_array(fileobj : BinaryIO, baseline = float(ch.ChannelBaseline) sensitivity = float(ch.ChannelSensitivity) correction = float(ch.ChannelSensitivityCorrectionFactor) + # nominal = v * gain = (encoded * sensitivity - baseline) - see reader. + # v = nominal * correction adjustment = sensitivity * correction - if (adjustment != 1.0) and (baseline != 0.0): - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] * adjustment + baseline) - elif (adjustment != 1.0): - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] * adjustment) - elif (baseline != 0.0): - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] + baseline) + base = baseline * correction + # print(" reading ", ch_idx, sensitivity, baseline, correction, adjustment, base) + if (adjustment != 1.0): + if (base != 0.0): + arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] * adjustment - base) + else: + arr[ch_idx, ...] = np.where(arr[ch_idx, ...] 
== padding_value, np.nan, arr[ch_idx, ...] * adjustment) else: - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...]) + if (base != 0.0): + arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] - base) + else: + arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...]) return cast("np.ndarray", arr) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py index 980d591..3cdcf7e 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py @@ -8,12 +8,14 @@ import math from datetime import datetime +import decimal import warnings -# warnings.filterwarnings("error") from waveform_benchmark.formats.base import BaseFormat +# dicom3tools currently does NOT validate the IODs for waveform. IT does validate the referencedSOPClassUIDInFile in DICOMDIR file. 
+ # types of waveforms and constraints: # https://dicom.nema.org/medical/dicom/current/output/chtml/part03/PS3.3.html # https://dicom.nema.org/medical/dicom/current/output/chtml/part17/chapter_C.html (data organization, and use cases) @@ -80,20 +82,20 @@ class DICOMWaveformVR: class DICOMWaveform8(DICOMWaveformVR): WaveformBitsAllocated = 8 WaveformSampleInterpretation = "SB" - PaddingValue = int(-128) PythonDatatype = np.int8 + PaddingValue = np.iinfo(PythonDatatype).min class DICOMWaveform16(DICOMWaveformVR): WaveformBitsAllocated = 16 WaveformSampleInterpretation = "SS" - PaddingValue = int(-32768) PythonDatatype = np.int16 + PaddingValue = np.iinfo(PythonDatatype).min class DICOMWaveform32(DICOMWaveformVR): WaveformBitsAllocated = 32 WaveformSampleInterpretation = "SL" - PaddingValue = int(-2147483648) PythonDatatype = np.int32 + PaddingValue = np.iinfo(PythonDatatype).min # relevant definitions: @@ -259,14 +261,14 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # self.VR = DICOMWaveform8 # 8bit allowed, but gain may be too high for 8 bit self.VR = DICOMWaveform16 - storage_uid = uid.RespiratoryWaveformStorage + self.storage_uid = uid.RespiratoryWaveformStorage elif num_channels > 1: if hifi: self.VR = DICOMWaveform32 else: self.VR = DICOMWaveform16 - storage_uid = uid.MultichannelRespiratoryWaveformStorage - + self.storage_uid = uid.MultichannelRespiratoryWaveformStorage + modality = 'RESP' channel_coding = { 'RESP': {'group': 1, 'scheme': 'SCPECG', 'value': '5.6.3-9-01', 'meaning': 'Respiration'}, @@ -448,11 +450,12 @@ def set_waveform_acquisition_info(self, dataset, # channel_chunks is a list of tuples (channel, chunk). 
- # + # minmax is a dict of channel to (min, max) def create_multiplexed_chunk(self, waveforms: dict, iod: DICOMWaveformIOD, group: int, channel_chunk: list, + minmax: dict, start_time: float, end_time: float): @@ -525,6 +528,11 @@ def create_multiplexed_chunk(self, waveforms: dict, dtype=iod.VR.PythonDatatype) # now collect the chunks into an array, generate the multiplex group, and increment the chunk ids. unprocessed_chunks.sort(key=lambda x: (x[0], x[3], x[4])) # ordered by channel + # using 90% of the dynamic range to avoid overflow + out_min = np.round(float(iod.VR.PaddingValue) * 0.9) + out_max = np.round(float(np.iinfo(iod.VR.PythonDatatype).max) * 0.9) + + gains = {} for channel, chunk_id, start_src, start_target, end_target in unprocessed_chunks: chunk = waveforms[channel]['chunks'][chunk_id] @@ -537,15 +545,39 @@ def create_multiplexed_chunk(self, waveforms: dict, # values is in original type. nan replaced with PaddingValue then will be multiplied by gain, so divide here to avoid overflow. # values = np.nan_to_num(np.frombuffer(chunk['samples'][start_src:end_src], dtype=np.dtype(chunk['samples'].dtype)), # nan = float(iod.VR.PaddingValue) / float(chunk['gain']) ) + # per dicom standard + # channelSensitivity is gain * adc resolution + # datavalue * channelsensitivity = nominal value in unit specified. should match input. + # nominal value * sensitivity correction factor = actual value. 
+ # + + # get the input values v = np.frombuffer(chunk['samples'][start_src:end_src], dtype=np.dtype(chunk['samples'].dtype)) - gain = float(chunk['gain']) - values = np.where(np.isnan(v), float(iod.VR.PaddingValue), v * gain) - - + min_v = float(minmax[channel][0]) + max_v = float(minmax[channel][1]) + + # sensitivity, baseline, and scaling factor: + # encoded = (input - min(input)) / (max(input) - min(input)) * (max(output) - min(output)) + min(output) + # = input * scale1 + min(output) - min(input) * scale1 + # = input * scale1 + baseline + # where scale1 = (max(output) - min(output)) / (max(input) - min(input), + scale1 = (out_max - out_min) / (max_v - min_v) + # baseline = min(output) - min(input) * scale1 + base1 = out_min - min_v * scale1 + chan_id = channels[channel] + samples[chan_id][start_target:end_target] = np.where(np.isnan(v), float(iod.VR.PaddingValue), np.round(v * scale1 + base1, decimals=0)).astype(iod.VR.PythonDatatype) + # nominal data = input * gain = (encoded - baseline) / scale1 * gain + # = encoded * scale2 - baseline2 + # where scale2 = gain / scale1, baseline2 = baseline * scale2 + + if channel not in gains.keys(): + gains[channel] = set() + gains[channel].add(float(chunk['gain'])) + # write out in integer format # samples[chan_id][start_target:end_target] = np.round(values * float(chunk['gain']), decimals=0).astype(iod.VR.PythonDatatype) - samples[chan_id][start_target:end_target] = np.round(values, decimals=0).astype(iod.VR.PythonDatatype) + # samples[chan_id][start_target:end_target] = np.round(values, decimals=0).astype(iod.VR.PythonDatatype) # print("chunk shape:", chunk['samples'].shape) # print("values shape: ", values.shape) # print("samples shape: ", samples.shape) @@ -579,7 +611,21 @@ def create_multiplexed_chunk(self, waveforms: dict, source.CodingSchemeVersion = "unknown" source.CodeMeaning = channel - chdef.ChannelSensitivity = 1.0 + + if len(gains[channel]) > 1: + print("ERROR: Different gains for the same channel is not 
supported. ", gains[channel]) + gain = gains[channel].pop() + + # actual data = nominal data / gain. + # baseline: standards def: offset of encoded sample value 0 from actual (nonimal) 0 in same unit as nominal + # set as baseline2 + # sensitivity = scale2 + # sensitivity correction factor would be 1/gain, so we can recover gain. + sens_corr = str(decimal.Decimal(1.0 / gain)) + + scale1inv = (minmax[channel][1] - minmax[channel][0]) / (out_max - out_min) + sensitivity = str(decimal.Decimal(gain * scale1inv)) + chdef.ChannelSensitivity = sensitivity if len(sensitivity) <= 16 else sensitivity[:16] # gain and ADC resolution goes here chdef.ChannelSensitivityUnitsSequence = [Dataset()] units = chdef.ChannelSensitivityUnitsSequence[0] @@ -590,14 +636,20 @@ def create_multiplexed_chunk(self, waveforms: dict, units.CodeMeaning = UCUM_ENCODING[unit] # this needs to be fixed. # multiplier to apply to the encoded value to get back the orginal input. - ds = str(float(1.0) / float(chunk['gain'])) - chdef.ChannelSensitivityCorrectionFactor = ds if len(ds) <= 16 else ds[:16] - chdef.ChannelBaseline = '0' + chdef.ChannelSensitivityCorrectionFactor = sens_corr if len(sens_corr) <= 16 else sens_corr[:16] + # Offset of encoded sample value 0 from actual 0 using the units defined in the Channel Sensitivity Units Sequence (003A,0211). 
+ # baseline2 = baseline * scale2 = baseline * gain * scale1inv = (min(output) - min(input) * scale1) * gain * scale1inv + # = min(output) * gain * scale1inv - min(input) * gain + # = min(output) * sensitivity - min(input) * gain + # nom data = encoded * scale2 - baseline2 + baseln = str(decimal.Decimal((out_min * scale1inv - minmax[channel][0]) * gain)) + chdef.ChannelBaseline = baseln if len(baseln) <= 16 else baseln[:16] + chdef.WaveformBitsStored = iod.VR.WaveformBitsAllocated # only for amplifier type of AC # chdef.FilterLowFrequency = '0.05' # chdef.FilterHighFrequency = '300' - channeldefs[channel] = chdef + channeldefs[channel] = chdef wfDS.ChannelDefinitionSequence = channeldefs.values() @@ -608,10 +660,12 @@ def create_multiplexed_chunk(self, waveforms: dict, return wfDS + # minmax is dictionary of channel to (min, max) values def add_waveform_chunks_multiplexed(self, dataset, iod: DICOMWaveformIOD, chunk_info: dict, - waveforms: dict): + waveforms: dict, + minmax: dict): # dicom waveform -> sequence of multiplex group -> channels with same sampling frequency # multiplex group can have labels. @@ -629,7 +683,7 @@ def add_waveform_chunks_multiplexed(self, dataset, for group, chanchunks in grouped_channels.items(): # input to this is [(channel, chunk), ... 
] - multiplexGroupDS = self.create_multiplexed_chunk(waveforms, iod, group, chanchunks, + multiplexGroupDS = self.create_multiplexed_chunk(waveforms, iod, group, chanchunks, minmax, start_time=start_time, end_time=end_time) dataset.WaveformSequence.append(multiplexGroupDS) diff --git a/waveform_benchmark/formats/dicom.py b/waveform_benchmark/formats/dicom.py index 98b196f..107e5bb 100644 --- a/waveform_benchmark/formats/dicom.py +++ b/waveform_benchmark/formats/dicom.py @@ -16,6 +16,8 @@ import pandas as pd import pprint +import uuid + import warnings # warnings.filterwarnings("error") from pydicom.fileset import FileSet, RecordNode @@ -454,6 +456,17 @@ def _pretty_print(self, table: dict): print(key, ": ", value['start_t'], " ", value['end_t']) for k, v in value['channel_chunk'].items(): print(" ", k, v) + + # get channel min and max values, across chunks. + def _get_waveform_channel_minmax(self, waveforms): + minmax = {} + for channel, wf in waveforms.items(): + mins = [ np.nanmin(chunk['samples']) for chunk in wf['chunks'] ] + maxs = [ np.nanmax(chunk['samples']) for chunk in wf['chunks'] ] + + minmax[channel] = (np.nanmin(mins), np.nanmax(maxs)) + return minmax + def write_waveforms(self, path, waveforms): fs = FileSet() @@ -483,7 +496,8 @@ def write_waveforms(self, path, waveforms): subchunks1 = self.split_chunks_temporal_merged(channel_table) # print("merged", len(subchunks1)) # self._pretty_print(subchunks1) - + + minmax = self._get_waveform_channel_minmax(waveforms) #========== now write out ============= # count channels belonging to respiratory data this is needed for the iod @@ -512,7 +526,7 @@ def write_waveforms(self, path, waveforms): dicom = self.writer.set_study_info(dicom, studyUID = studyInstanceUID, studyDate = datetime.now()) dicom = self.writer.set_series_info(dicom, iod, seriesUID=seriesInstanceUID) dicom = self.writer.set_waveform_acquisition_info(dicom, instanceNumber = file_id) - dicom = self.writer.add_waveform_chunks_multiplexed(dicom, 
iod, chunk_info, waveforms) + dicom = self.writer.add_waveform_chunks_multiplexed(dicom, iod, chunk_info, waveforms, minmax) # Save DICOM file. write_like_original is required # these the initial path when added - it points to a temp file. @@ -522,6 +536,7 @@ def write_waveforms(self, path, waveforms): record = Dataset() record.DirectoryRecordType = "WAVEFORM" record.ReferencedSOPInstanceUIDInFile = dicom.SOPInstanceUID + record.ReferencedSOPClassUIDInFile = dicom.SOPClassUID record.InstanceNumber = dicom.InstanceNumber record.ContentDate = dicom.ContentDate record.ContentTime = dicom.ContentTime @@ -580,6 +595,8 @@ def write_waveforms(self, path, waveforms): # fs.copy(path) # print(path) fs.write(path) + # copy to a directory with randomly generated name + # fs.copy('/mnt/c/Users/tcp19/BACKUP/dicom_waveform/' + str(uuid.uuid4())) def read_waveforms(self, path, start_time, end_time, signal_names): # have to read the whole data set each time if using dcmread. this is not efficient. @@ -787,13 +804,13 @@ class DICOMHighBitsChunked(DICOMHighBits): # waveform lead names to dicom IOD mapping. Incomplete. # avoiding 12 lead ECG because of the limit in number of samples. - chunkSize = 86400.0 # chunk as 1 day. + chunkSize = 3600.0 # chunk as 1 hr. 
hifi = True class DICOMLowBitsChunked(DICOMLowBits): - chunkSize = 86400.0 + chunkSize = 3600.0 hifi = False From 27bcba190c80c025259fe0d9445fca8bd5663850 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Fri, 12 Jul 2024 12:29:05 -0400 Subject: [PATCH 2/6] ENH: updated to support 16 bit only --- .../formats/dcm_utils/dcm_waveform_writer.py | 34 +++++++-------- waveform_benchmark/formats/dicom.py | 42 ++++++++++++------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py index 3cdcf7e..a60be19 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py @@ -133,7 +133,7 @@ class DICOMWaveformIOD: class TwelveLeadECGWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): + def __init__(self, bits: int = 16, num_channels: int = None): pass VR = DICOMWaveform16 @@ -158,8 +158,8 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # 4 groups, 1 to 24 channels each. 
unknown sample count limit., f in 200-1000 class GeneralECGWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): - if hifi: + def __init__(self, bits: int = 16, num_channels: int = None): + if bits > 16: self.VR = DICOMWaveform32 self.storage_uid = '1.2.840.10008.5.1.4.1.1.9.1.4' else: @@ -186,7 +186,7 @@ def __init__(self, hifi: bool = False, num_channels: int = None): class AmbulatoryECGWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): + def __init__(self, bits : int = 16, num_channels: int = None): pass # 8bit allowed, but gain may be too high for 8 bit @@ -198,7 +198,7 @@ def __init__(self, hifi: bool = False, num_channels: int = None): } class CardiacElectrophysiologyWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): + def __init__(self, bits : int = 16, num_channels: int = None): pass VR = DICOMWaveform16 @@ -210,7 +210,7 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # max 4 sequences, up to 8 channels each, num of samples limited by waveform maxsize. f < 400, class HemodynamicWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): + def __init__(self, bits : int = 16, num_channels: int = None): pass VR = DICOMWaveform16 @@ -232,8 +232,8 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # max 1 sequence, 1 wave each. unknown sample count limit. f < 600. class ArterialPulseWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): - # if hifi: + def __init__(self, bits : int = 16, num_channels: int = None): + # if bits > 8: # self.VR = DICOMWaveform16 # else: # self.VR = DICOMWaveform8 @@ -253,9 +253,9 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # different IOD for multiple channels. 1 multplex group, 1 channel each. unknown sample count limit. f < 100. 
class RespiratoryWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): + def __init__(self, bits : int = 16, num_channels: int = None): if num_channels <= 1: - # if hifi: + # if bits > 8: # self.VR = DICOMWaveform16 # else: # self.VR = DICOMWaveform8 @@ -263,7 +263,7 @@ def __init__(self, hifi: bool = False, num_channels: int = None): self.VR = DICOMWaveform16 self.storage_uid = uid.RespiratoryWaveformStorage elif num_channels > 1: - if hifi: + if bits > 16: self.VR = DICOMWaveform32 else: self.VR = DICOMWaveform16 @@ -280,8 +280,8 @@ def __init__(self, hifi: bool = False, num_channels: int = None): } class RoutineScalpEEGWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): - if hifi: + def __init__(self, bits : int = 16, num_channels: int = None): + if bits > 16: self.VR = DICOMWaveform32 else: self.VR = DICOMWaveform16 @@ -294,8 +294,8 @@ def __init__(self, hifi: bool = False, num_channels: int = None): # unlimited number of multiplex groups, up to 64 channels each. sample size and f unconstrained. class SleepEEGWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): - if hifi: + def __init__(self, bits : int = 16, num_channels: int = None): + if bits > 16: self.VR = DICOMWaveform32 else: self.VR = DICOMWaveform16 @@ -314,8 +314,8 @@ def __init__(self, hifi: bool = False, num_channels: int = None): #unlimited multiplex groups, up to 64 channels each. sample size and f unconstrained. 
class ElectromyogramWaveform(DICOMWaveformIOD): - def __init__(self, hifi: bool = False, num_channels: int = None): - if hifi: + def __init__(self, bits : int = 16, num_channels: int = None): + if bits > 16: self.VR = DICOMWaveform32 else: self.VR = DICOMWaveform16 diff --git a/waveform_benchmark/formats/dicom.py b/waveform_benchmark/formats/dicom.py index 4d8261b..a874dff 100644 --- a/waveform_benchmark/formats/dicom.py +++ b/waveform_benchmark/formats/dicom.py @@ -424,21 +424,21 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f return out - def make_iod(self, iod_name: str, hifi: bool, num_channels: int = 1): + def make_iod(self, iod_name: str, bits: int, num_channels: int = 1): if iod_name == "GeneralECGWaveform": - return dcm_writer.GeneralECGWaveform(hifi=hifi) + return dcm_writer.GeneralECGWaveform(bits=bits) elif iod_name == "AmbulatoryECGWaveform": - return dcm_writer.AmbulatoryECGWaveform(hifi=hifi) + return dcm_writer.AmbulatoryECGWaveform(bits=bits) elif iod_name == "SleepEEGWaveform": - return dcm_writer.SleepEEGWaveform(hifi=hifi) + return dcm_writer.SleepEEGWaveform(bits=bits) elif iod_name == "ElectromyogramWaveform": - return dcm_writer.ElectromyogramWaveform(hifi=hifi) + return dcm_writer.ElectromyogramWaveform(bits=bits) elif iod_name == "ArterialPulseWaveform": - return dcm_writer.ArterialPulseWaveform(hifi=hifi) + return dcm_writer.ArterialPulseWaveform(bits=bits) elif iod_name == "RespiratoryWaveform": - return dcm_writer.RespiratoryWaveform(hifi=hifi, num_channels=num_channels) + return dcm_writer.RespiratoryWaveform(bits=bits, num_channels=num_channels) elif iod_name == "HemodynamicWaveform": - return dcm_writer.HemodynamicWaveform(hifi=hifi) + return dcm_writer.HemodynamicWaveform(bits=bits) else: raise ValueError("Unknown IOD") @@ -506,7 +506,7 @@ def write_waveforms(self, path, waveforms): # print("writing ", iod_name, ", ", file_id) # create and iod instance - iod = self.make_iod(iod_name, hifi=self.hifi, 
num_channels = count_per_iod[iod_name]) + iod = self.make_iod(iod_name, bits=self.bits, num_channels = count_per_iod[iod_name]) # each multiplex group can have its own frequency # but if there are different frequencies for channels in a multiplex group, we need to split. @@ -780,28 +780,36 @@ class DICOMHighBits(BaseDICOMFormat): writer = dcm_writer.DICOMWaveformWriter() chunkSize = None # adaptive - hifi = True + bits = 64 # max possible bits class DICOMLowBits(BaseDICOMFormat): writer = dcm_writer.DICOMWaveformWriter() chunkSize = None # adaptive - hifi = False + bits = 0 # min possible bits +class DICOM16Bits(BaseDICOMFormat): + + writer = dcm_writer.DICOMWaveformWriter() + chunkSize = None # adaptive + bits = 16 class DICOMHighBitsChunked(DICOMHighBits): # waveform lead names to dicom IOD mapping. Incomplete. # avoiding 12 lead ECG because of the limit in number of samples. chunkSize = 3600.0 # chunk as 1 hr. - hifi = True class DICOMLowBitsChunked(DICOMLowBits): chunkSize = 3600.0 - hifi = False + + +class DICOM16BitsChunked(DICOM16Bits): + + chunkSize = 3600.0 class DICOMHighBitsMerged(DICOMHighBits): @@ -809,10 +817,14 @@ class DICOMHighBitsMerged(DICOMHighBits): # avoiding 12 lead ECG because of the limit in number of samples. chunkSize = -1 - hifi = True class DICOMLowBitsMerged(DICOMLowBits): chunkSize = -1 - hifi = False + + +class DICOM16BitsMerged(DICOM16Bits): + + chunkSize = -1 + From 8d3662824f1def08bced980d6f83c4408159d371 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Wed, 17 Jul 2024 10:40:38 -0400 Subject: [PATCH 3/6] FIX: data fidelity - fixed gain/baseline/dynamic range calculations, made float datatypes consistent. FIX: handle multiple gains in a channel - split files by gain, removed 'Merged' chunking scheme as it requires extra work for multiple gains. 
FIX: handle input signals that are all nan --- .../formats/dcm_utils/dcm_waveform_reader.py | 18 +- .../formats/dcm_utils/dcm_waveform_writer.py | 248 +++++++++++------- waveform_benchmark/formats/dicom.py | 193 ++++++++------ 3 files changed, 283 insertions(+), 176 deletions(-) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py index a2c5887..560bd6e 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_reader.py @@ -175,27 +175,29 @@ def get_multiplex_array(fileobj : BinaryIO, chseq = cast(list["Dataset"], channel_seqs) # Apply correction factor (if possible) + padd_loc = (arr == padding_value) arr = arr.astype("float") for ch_idx, ch in enumerate(chseq): baseline = float(ch.ChannelBaseline) sensitivity = float(ch.ChannelSensitivity) correction = float(ch.ChannelSensitivityCorrectionFactor) - # nominal = v * gain = (encoded * sensitivity - baseline) - see reader. - # v = nominal * correction + + # print("baseline " + str(baseline) + " sensitivity " + str(sensitivity) + " correction " + str(correction)) + adjustment = sensitivity * correction - base = baseline * correction + base = baseline # print(" reading ", ch_idx, sensitivity, baseline, correction, adjustment, base) if (adjustment != 1.0): if (base != 0.0): - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] * adjustment - base) + arr[ch_idx, ...] = np.where(padd_loc[ch_idx, ...], np.nan, arr[ch_idx, ...] * adjustment - base) else: - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] * adjustment) + arr[ch_idx, ...] = np.where(padd_loc[ch_idx, ...], np.nan, arr[ch_idx, ...] * adjustment) else: if (base != 0.0): - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...] - base) + arr[ch_idx, ...] 
= np.where(padd_loc[ch_idx, ...], np.nan, arr[ch_idx, ...] - base) else: - arr[ch_idx, ...] = np.where(arr[ch_idx, ...] == padding_value, np.nan, arr[ch_idx, ...]) - + arr[ch_idx, ...] = np.where(padd_loc[ch_idx, ...], np.nan, arr[ch_idx, ...]) + # print(ch.ChannelSourceSequence[0].CodeMeaning + " sensitivity " + str(sensitivity) + " gain = " + str(1.0 / sensitivity) + " baseline = " + str(baseline) + " correction = " + str(correction)) return cast("np.ndarray", arr) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py index a60be19..1bb97e8 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py @@ -8,7 +8,6 @@ import math from datetime import datetime -import decimal import warnings @@ -73,6 +72,8 @@ # TODO: [x] extract with random access # TODO: [x] merge chunks +# NOTE: float32 IEEE standard represents values like -2147483600.0 as -2147483648.0. +# not an issue for 8bit and 16bit output, for 32bit, use double (64bit) that works. # Value Representation Formats. 
base class class DICOMWaveformVR: @@ -82,20 +83,23 @@ class DICOMWaveformVR: class DICOMWaveform8(DICOMWaveformVR): WaveformBitsAllocated = 8 WaveformSampleInterpretation = "SB" - PythonDatatype = np.int8 - PaddingValue = np.iinfo(PythonDatatype).min + FileDatatype = np.int8 + FloatDataType = np.float32 + PaddingValue = np.iinfo(FileDatatype).min class DICOMWaveform16(DICOMWaveformVR): WaveformBitsAllocated = 16 WaveformSampleInterpretation = "SS" - PythonDatatype = np.int16 - PaddingValue = np.iinfo(PythonDatatype).min + FileDatatype = np.int16 + FloatDataType = np.float32 + PaddingValue = np.iinfo(FileDatatype).min class DICOMWaveform32(DICOMWaveformVR): WaveformBitsAllocated = 32 WaveformSampleInterpretation = "SL" - PythonDatatype = np.int32 - PaddingValue = np.iinfo(PythonDatatype).min + FileDatatype = np.int32 + FloatDataType = np.float64 + PaddingValue = np.iinfo(FileDatatype).min # relevant definitions: @@ -450,12 +454,10 @@ def set_waveform_acquisition_info(self, dataset, # channel_chunks is a list of tuples (channel, chunk). - # minmax is a dict of channel to (min, max) def create_multiplexed_chunk(self, waveforms: dict, iod: DICOMWaveformIOD, group: int, channel_chunk: list, - minmax: dict, start_time: float, end_time: float): @@ -479,12 +481,11 @@ def create_multiplexed_chunk(self, waveforms: dict, unprocessed_chunks = [] channels = {} + mins = {} + maxs = {} + gains = {} i = 0 for (channel, chunkid) in channel_chunk: - # assign ids for each channel - if channel not in channels.keys(): - channels[channel] = i - i += 1 # use samples - the sample ids are about same as time * freq c_start = waveforms[channel]['chunks'][chunkid]['start_sample'] @@ -498,7 +499,37 @@ def create_multiplexed_chunk(self, waveforms: dict, duration = et - st + # only process if there is finite data. 
+ temp_data = waveforms[channel]['chunks'][chunkid]['samples'][channel_start:channel_start+duration] + if (temp_data is None) or (len(temp_data) == 0) or (not np.any(np.isfinite(temp_data))): + continue + + # assign ids for each channel + if channel not in channels.keys(): + channels[channel] = i + mins[channel] = [] + maxs[channel] = [] + gains[channel] = [] + i += 1 + + mins[channel].append(np.fmin.reduce(temp_data)) + maxs[channel].append(np.fmax.reduce(temp_data)) + gains[channel].append(waveforms[channel]['chunks'][chunkid]['gain']) unprocessed_chunks.append((channel, chunkid, channel_start, target_start, target_start + duration)) + + if len(channels) == 0: + return None + + for c in channels.keys(): + mins[c] = np.fmin.reduce(mins[c]) + maxs[c] = np.fmax.reduce(maxs[c]) + gains[c] = list(set(gains[c])) + if len(gains[c]) > 1: + print("ERROR Channel " + c + " has multiple gains ") + for g in gains[c]: + print("gain: " + str(g)) + raise ValueError("Channel has multiple gains ") + gains[c] = gains[c][0] # create a new multiplex group @@ -525,14 +556,12 @@ def create_multiplexed_chunk(self, waveforms: dict, channeldefs = {} samples = np.full(shape = (len(channels.keys()), chunk_max_len), fill_value = iod.VR.PaddingValue, - dtype=iod.VR.PythonDatatype) + dtype=iod.VR.FileDatatype) # now collect the chunks into an array, generate the multiplex group, and increment the chunk ids. 
unprocessed_chunks.sort(key=lambda x: (x[0], x[3], x[4])) # ordered by channel - # using 90% of the dynamic range to avoid overflow - out_min = np.round(float(iod.VR.PaddingValue) * 0.9) - out_max = np.round(float(np.iinfo(iod.VR.PythonDatatype).max) * 0.9) - gains = {} + float_type = iod.VR.FloatDataType + for channel, chunk_id, start_src, start_target, end_target in unprocessed_chunks: chunk = waveforms[channel]['chunks'][chunk_id] @@ -542,57 +571,100 @@ def create_multiplexed_chunk(self, waveforms: dict, # print("channel start ", start_src, end_src, start_target, end_target, duration) # QUANTIZE: and pad missing data - # values is in original type. nan replaced with PaddingValue then will be multiplied by gain, so divide here to avoid overflow. - # values = np.nan_to_num(np.frombuffer(chunk['samples'][start_src:end_src], dtype=np.dtype(chunk['samples'].dtype)), - # nan = float(iod.VR.PaddingValue) / float(chunk['gain']) ) - # per dicom standard - # channelSensitivity is gain * adc resolution - # datavalue * channelsensitivity = nominal value in unit specified. should match input. - # nominal value * sensitivity correction factor = actual value. - # + + ## input type + # input is same as "nominal" in dicom parlance. + # baseline = gain * (max + min) / 2 + # digital values are the binary stored data. digital values = input * gain - baseline. this appears to be by convention. + # + ## per dicom standard. + # channelSensitivity is defined to _include_ both gain and adc resolution. but probably not "gain * adc resolution" + # by standards definition, stored value * channelsensitivity = nominal value in unit specified (e.g. mV). + # this means sensitivity = 1/gain. + # further: nominal value * sensitivity correction factor = calibrated value. + # baseline: offset of sample 0 vs actual 0, in nominal value unit. 
+ # + ## harmonizing: + # nominal == input + # baseline = -(max + min)/2 + # sensitivity == gain + # stored value (digital) = nominal * gain + # but may not have enough dynamic range in the stored values + # rounding error of 0.5 can propagate with a low gain. + # scale further so we reach the dynamic range. # get the input values - v = np.frombuffer(chunk['samples'][start_src:end_src], dtype=np.dtype(chunk['samples'].dtype)) - min_v = float(minmax[channel][0]) - max_v = float(minmax[channel][1]) + v = np.frombuffer(chunk['samples'][start_src:end_src], dtype=np.dtype(chunk['samples'].dtype)).astype(float_type) - # sensitivity, baseline, and scaling factor: - # encoded = (input - min(input)) / (max(input) - min(input)) * (max(output) - min(output)) + min(output) - # = input * scale1 + min(output) - min(input) * scale1 - # = input * scale1 + baseline - # where scale1 = (max(output) - min(output)) / (max(input) - min(input), - scale1 = (out_max - out_min) / (max_v - min_v) - # baseline = min(output) - min(input) * scale1 - base1 = out_min - min_v * scale1 + # vmin, vmax, gain = minmax[channel] + vmin = float_type(mins[channel]) + vmax = float_type(maxs[channel]) + gain = float_type(gains[channel]) - chan_id = channels[channel] - samples[chan_id][start_target:end_target] = np.where(np.isnan(v), float(iod.VR.PaddingValue), np.round(v * scale1 + base1, decimals=0)).astype(iod.VR.PythonDatatype) - # nominal data = input * gain = (encoded - baseline) / scale1 * gain - # = encoded * scale2 - baseline2 - # where scale2 = gain / scale1, baseline2 = baseline * scale2 + # print(str(vmin) + "," + str(vmax) + "," + str(gain)) + baseline = (vmin + vmax) * float_type(-0.5) - if channel not in gains.keys(): - gains[channel] = set() - gains[channel].add(float(chunk['gain'])) - - # write out in integer format - # samples[chan_id][start_target:end_target] = np.round(values * float(chunk['gain']), decimals=0).astype(iod.VR.PythonDatatype) - # samples[chan_id][start_target:end_target] = 
np.round(values, decimals=0).astype(iod.VR.PythonDatatype) - # print("chunk shape:", chunk['samples'].shape) - # print("values shape: ", values.shape) - # print("samples shape: ", samples.shape) + vg_min = (vmin + baseline ) * gain + vg_max = (vmax + baseline ) * gain + vg_mag = np.fmax.reduce([np.abs(vg_min), np.abs(vg_max)]) + # need to leave some room for rounding + dt_mag = float_type(np.fmin.reduce([np.abs(iod.VR.PaddingValue + 1), np.abs(np.iinfo(iod.VR.FileDatatype).max)]) - 1) - # interleave the samples - samplesT = np.transpose(samples) - # print(channel_chunk) - # print("output shape", samplesT.shape) - + # if np.isnan(vmin) or np.isnan(vmax): + # print(v) + # print("min ", vmin, " max ", vmax, " size ", len(v), " num of nans ", np.sum(np.isnan(v))) + # print(np.sum(np.isnan(chunk['samples'][start_src:end_src]))) + # print(channel_chunk) + # print("start ", start_src, " end ", end_src) + + scale = float_type(1.0) + if (vg_mag != 0) and (dt_mag != 0): + scale *= (dt_mag / vg_mag) + + chan_id = channels[channel] + y = np.round((v + baseline) * gain * scale, decimals=0) + # print("min and max " + str(np.fmin.reduce(y)) + "," + str(np.fmax.reduce(y)) + " val max = " + str(float_type(np.iinfo(iod.VR.FileDatatype).max)) + " val min = " + str(float_type(iod.VR.PaddingValue))) + # if np.any(y > float_type(np.iinfo(iod.VR.FileDatatype).max)): + # print("WARNING of max " + channel + "," + str(chunk_id) + " max " + str(float_type(np.iinfo(iod.VR.FileDatatype).max)) + " actual " + str(y[y >= float_type(np.iinfo(iod.VR.FileDatatype).max)][0])) + # if np.any(y < float_type(iod.VR.PaddingValue)): + # print("WARNING of < min " + channel + "," + str(chunk_id) + " min " + str(float_type(np.iinfo(iod.VR.FileDatatype).min)) + " actual " + str(y[y < iod.VR.PaddingValue][0])) + # if np.any(y == float_type(iod.VR.PaddingValue)): + # min_vals = list(set(y[np.where(y == iod.VR.PaddingValue)])) + # min_val = min_vals[0] + # print("WARNING chann " + channel + " of == min " + 
channel + "," + str(chunk_id) + " min " + str(float_type(np.iinfo(iod.VR.FileDatatype).min)) + " actual " + str(min_val) + " actual cast to float " + str(np.float32(min_val))) + # print(" ," + str(min_val*1.0) + "," + str(float_type(min_val)) + "," + str(min_val) + ", " + str(np.float32(min_val))) + + x = np.where(np.isnan(y), float_type(iod.VR.PaddingValue), y) + # if np.any(np.isnan(x)): + # print("NOTE nan encountered after np.where " + str(iod.VR.PaddingValue) + " baseline " + str(baseline) + " gain " + str(gain) + " scale " + str(scale)) + # if np.any(np.isinf(x)): + # print("NOTE inf encountered after np.where " + str(iod.VR.PaddingValue) + " baseline " + str(baseline) + " gain " + str(gain) + " scale " + str(scale)) + # if not np.all(np.isfinite(x)): + # print("NOTE non-finite encountered " + channel + " after np.where " + str(float_type(iod.VR.PaddingValue)) + " baseline " + str(baseline) + " gain " + str(gain) + " scale " + str(scale)) + + samples[chan_id][start_target:end_target] = x.astype(iod.VR.FileDatatype) - for (channel, chunk_id) in channel_chunk: + for channel in channels.keys(): if channel in channeldefs.keys(): continue - chunk = waveforms[channel]['chunks'][chunk_id] + # recompute the gain, sensitivity, scale, etc for each channel + vmin = float_type(mins[channel]) + vmax = float_type(maxs[channel]) + gain = float_type(gains[channel]) + + baseline = (vmin + vmax) * float_type(-0.5) + vg_min = (vmin + baseline ) * gain + vg_max = (vmax + baseline ) * gain + vg_mag = np.fmax.reduce([np.abs(vg_min), np.abs(vg_max)]) + # need to leave some room for rounding + dt_mag = float_type(np.fmin.reduce([np.abs(iod.VR.PaddingValue+ 1), np.abs(np.iinfo(iod.VR.FileDatatype).max)]) - 1) + # print(channel + "," + str(chunk_id) + " vg_min = " + str(vg_min) + " vg_max = " + str(vg_max) + " baseline = " + str(baseline) + " gain = " + str(gain) + " min = " + str(vmin) + " max = " + str(vmax)) + + scale = float_type(1.0) + if (vg_mag != 0) and (dt_mag != 0): + scale 
*= (dt_mag / vg_mag) + unit = waveforms[channel]['units'] # create the channel @@ -610,21 +682,11 @@ def create_multiplexed_chunk(self, waveforms: dict, source.CodingSchemeDesignator = iod.channel_coding[channel.upper()]['scheme'] source.CodingSchemeVersion = "unknown" source.CodeMeaning = channel - - if len(gains[channel]) > 1: - print("ERROR: Different gains for the same channel is not supported. ", gains[channel]) - gain = gains[channel].pop() - - # actual data = nominal data / gain. - # baseline: standards def: offset of encoded sample value 0 from actual (nonimal) 0 in same unit as nominal - # set as baseline2 - # sensitivity = scale2 - # sensitivity correction factor would be 1/gain, so we can recover gain. - sens_corr = str(decimal.Decimal(1.0 / gain)) - - scale1inv = (minmax[channel][1] - minmax[channel][0]) / (out_max - out_min) - sensitivity = str(decimal.Decimal(gain * scale1inv)) + sens = float_type(1.0) / gain + sensitivity = str(sens) + if (len(sensitivity) > 16): + sensitivity = str(np.round(sens, decimals=14)) chdef.ChannelSensitivity = sensitivity if len(sensitivity) <= 16 else sensitivity[:16] # gain and ADC resolution goes here chdef.ChannelSensitivityUnitsSequence = [Dataset()] units = chdef.ChannelSensitivityUnitsSequence[0] @@ -636,13 +698,18 @@ def create_multiplexed_chunk(self, waveforms: dict, units.CodeMeaning = UCUM_ENCODING[unit] # this needs to be fixed. # multiplier to apply to the encoded value to get back the orginal input. - chdef.ChannelSensitivityCorrectionFactor = sens_corr if len(sens_corr) <= 16 else sens_corr[:16] - # Offset of encoded sample value 0 from actual 0 using the units defined in the Channel Sensitivity Units Sequence (003A,0211). 
- # baseline2 = baseline * scale2 = baseline * gain * scale1inv = (min(output) - min(input) * scale1) * gain * scale1inv - # = min(output) * gain * scale1inv - min(input) * gain - # = min(output) * sensitivity - min(input) * gain - # nom data = encoded * scale2 - baseline2 - baseln = str(decimal.Decimal((out_min * scale1inv - minmax[channel][0]) * gain)) + corr = float_type(1.0) / scale + correction = str(corr) + if (len(correction) > 16): + correction = str(np.round(corr, decimals=14)) + chdef.ChannelSensitivityCorrectionFactor = correction if len(correction) <= 16 else correction[:16] # gain and ADC resolution goes here + + baseln = str(baseline) + if (len(baseln) > 16): + if (baseline >= 0.0): + baseln = str(np.round(baseline, decimals=14)) + else: + baseln = str(np.round(baseline, decimals=13)) chdef.ChannelBaseline = baseln if len(baseln) <= 16 else baseln[:16] chdef.WaveformBitsStored = iod.VR.WaveformBitsAllocated @@ -655,7 +722,9 @@ def create_multiplexed_chunk(self, waveforms: dict, wfDS.ChannelDefinitionSequence = channeldefs.values() # actual bytes. arr is a numpy array of shape np.stack((ch1, ch2,...), axis=1) # arr = np.stack([ samples[channel] for channel in channel_chunk.keys()], axis=1) - wfDS.WaveformData = samplesT.tobytes() + + # interleave the samples + wfDS.WaveformData = np.transpose(samples).tobytes() return wfDS @@ -664,8 +733,7 @@ def create_multiplexed_chunk(self, waveforms: dict, def add_waveform_chunks_multiplexed(self, dataset, iod: DICOMWaveformIOD, chunk_info: dict, - waveforms: dict, - minmax: dict): + waveforms: dict): # dicom waveform -> sequence of multiplex group -> channels with same sampling frequency # multiplex group can have labels. 
@@ -681,19 +749,23 @@ def add_waveform_chunks_multiplexed(self, dataset, unique_groups = set(channel_chunks.values()) grouped_channels = { group: [ key for key, val in channel_chunks.items() if val == group ] for group in unique_groups } + seq_size = 0 for group, chanchunks in grouped_channels.items(): # input to this is [(channel, chunk), ... ] - multiplexGroupDS = self.create_multiplexed_chunk(waveforms, iod, group, chanchunks, minmax, + multiplexGroupDS = self.create_multiplexed_chunk(waveforms, iod, group, chanchunks, start_time=start_time, end_time=end_time) - dataset.WaveformSequence.append(multiplexGroupDS) + if (multiplexGroupDS is not None): + dataset.WaveformSequence.append(multiplexGroupDS) + seq_size += 1 # each unique frequency will have a different label for the multiplex group.. The multiplex group will # have a separate instance contain the channels with the same frequency # each chunk will have channels with the same frequency (with same label). Channels for each multiplex group # will have the same start and end time/ sample ids. 
- - return dataset - + if (seq_size > 0): + return dataset + else: + return None # dicom value types are constrained by IOD type diff --git a/waveform_benchmark/formats/dicom.py b/waveform_benchmark/formats/dicom.py index a874dff..f235e3d 100644 --- a/waveform_benchmark/formats/dicom.py +++ b/waveform_benchmark/formats/dicom.py @@ -188,6 +188,7 @@ def create_channel_table(self, waveforms) -> pd.DataFrame: 'end_time': chunk['end_time'], # 'start_sample': chunk['start_sample'], # 'end_sample': chunk['end_sample'], + 'gain': chunk['gain'], 'iod': iod.__name__, 'group': group, 'freq_id': None}) @@ -199,12 +200,13 @@ def create_channel_table(self, waveforms) -> pd.DataFrame: 'end_time': chunk['end_time'], # 'start_sample': chunk['start_sample'], # 'end_sample': chunk['end_sample'], + 'gain': chunk['gain'], 'iod': iod.__name__, 'group': group, 'freq_id': None}) df = pd.DataFrame(data) - df.sort_values(by=['iod', 'start_time', 'freq', 'chunk_id'], inplace=True) + df.sort_values(by=['iod', 'start_time', 'freq', 'gain', 'chunk_id'], inplace=True) # assign freq_id, one per frequency within a group sorted = df.groupby(['iod', 'group']) @@ -286,57 +288,58 @@ def split_chunks_temporal_adaptive(self, chunk_table: pd.DataFrame) -> dict: # do group by return out - # input: list of dataframes, each dataframe is a group of channels with same iod, group, and frequency. - # return: dict, each entry is (iod, group, freq, id) = {start_t, end_t, [{(channel, chunk_id): (s_time, e_time), ...]}. each chunk is defined as the max span for a set of channels. - # this is detected via a stack - when stack is empty, a subchunk is created. note that chunkids are not aligned between channels - def split_chunks_temporal_merged(self, chunk_table: pd.DataFrame) -> dict: - # split the chunks so that each is a collection of channels in a group. 
- # fill missing channels with zeros - # need an ordering of channel starting and ending - # create new table with time stamp, and chunk id (- for start, + for end) - # sort in order: iod, group, freq, timestamp, chunk id - # iterate and push and pop to queue (parenthesis like). create a chunk when queue becomes empty. - - chunk_table.sort_values(by=['iod', 'freq_id', 'start_time', 'chunk_id'], inplace=True) - sorted = chunk_table.groupby(['iod', 'freq_id']) - - out = dict() - file_id = 0 - stime = None - etime = None - stack = [] - channel_chunk_list = dict() # store the (channel, chunk_id) - for (iod, freq_id), df in sorted: - for index, row in df.iterrows(): - # update first - etime = row['end_time'] - if row['chunk_id'] < 0: - stack.append((row['channel'], (-row['chunk_id']) - 1)) # start of chunk - channel_chunk_list[(row['channel'], (-row['chunk_id']) - 1)] = row['group'] # add on insert only - # save if needed - if len(stack) == 1: - # inserted first element in the stack - stime = row['start_time'] - elif row['chunk_id'] > 0: - stack.remove((row['channel'], row['chunk_id'] - 1)) # end of chunk - # update end time on remove only - if len(stack) == 0: - # everything removed from a stack. this indicates a subchunk is complete - if len(channel_chunk_list) > 0: - out[(iod, freq_id, file_id)] = {'start_t': stime, 'end_t': etime, 'channel_chunk': channel_chunk_list.copy()} - file_id += 1 - channel_chunk_list = {} # reset - stime = None - etime = None - else: - print("ERROR: chunk id is 0", row) - - return out + # # Deprecate - this does not work if there are multiple gains in the same chunk. + # # input: list of dataframes, each dataframe is a group of channels with same iod, group, and frequency. + # # return: dict, each entry is (iod, group, freq, id) = {start_t, end_t, [{(channel, chunk_id): (s_time, e_time), ...]}. each chunk is defined as the max span for a set of channels. + # # this is detected via a stack - when stack is empty, a subchunk is created. 
note that chunkids are not aligned between channels + # def split_chunks_temporal_merged(self, chunk_table: pd.DataFrame) -> dict: + # # split the chunks so that each is a collection of channels in a group. + # # fill missing channels with zeros + # # need an ordering of channel starting and ending + # # create new table with time stamp, and chunk id (- for start, + for end) + # # sort in order: iod, group, freq, timestamp, chunk id + # # iterate and push and pop to queue (parenthesis like). create a chunk when queue becomes empty. + + # chunk_table.sort_values(by=['iod', 'freq_id', 'gain', 'start_time', 'chunk_id'], inplace=True) + # sorted = chunk_table.groupby(['iod', 'freq_id', 'gain']) + + # out = dict() + # file_id = 0 + # stime = None + # etime = None + # stack = [] + # channel_chunk_list = dict() # store the (channel, chunk_id) + # for (iod, freq_id), df in sorted: + # for index, row in df.iterrows(): + # # update first + # etime = row['end_time'] + # if row['chunk_id'] < 0: + # stack.append((row['channel'], (-row['chunk_id']) - 1)) # start of chunk + # channel_chunk_list[(row['channel'], (-row['chunk_id']) - 1)] = row['group'] # add on insert only + # # save if needed + # if len(stack) == 1: + # # inserted first element in the stack + # stime = row['start_time'] + # elif row['chunk_id'] > 0: + # stack.remove((row['channel'], row['chunk_id'] - 1)) # end of chunk + # # update end time on remove only + # if len(stack) == 0: + # # everything removed from a stack. this indicates a subchunk is complete + # if len(channel_chunk_list) > 0: + # out[(iod, freq_id, file_id)] = {'start_t': stime, 'end_t': etime, 'channel_chunk': channel_chunk_list.copy()} + # file_id += 1 + # channel_chunk_list = {} # reset + # stime = None + # etime = None + # else: + # print("ERROR: chunk id is 0", row) + + # return out # input: list of dataframes, each dataframe is a group of channels with same iod, group, and frequency. 
# return: dict, each entry is (iod, group, freq, id) = {start_t, end_t, [(channel, chunk_id), ...]}. each chunk is a fixed time period. # we should first detect the merged chunks then segment the chunks. - def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: float = 600.0) -> dict: + def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: float = 3600.0) -> dict: # split the chunks so that each is a fixed length with appropriate grouping. some may have partial data # need an ordering of channel starting and ending. # create new table with time stamp, and chunk id (- for start, + for end) @@ -349,7 +352,7 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f # sorted = self.split_chunks_temporal_merged(chunk_table) # # next process each subchunk: - chunk_table.sort_values(by=['iod', 'freq_id', 'start_time', 'chunk_id'], inplace=True) + chunk_table.sort_values(by=['iod', 'freq_id', 'gain', 'start_time', 'chunk_id'], inplace=True) sorted = chunk_table.groupby(['iod', 'freq_id']) # for each subchunk, get the start and end, and insert splitters. @@ -377,6 +380,7 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f 'chunk_id': 0, # note: this will occur before the end time entries. 'start_time': splitter_time, 'end_time': splitter_time, + 'gain': -1, 'iod': "any", 'group': -1, 'freq_id': -1}) @@ -401,7 +405,7 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f if row['chunk_id'] < 0: k = (row['channel'], (-row['chunk_id']) - 1) stack.append(k) # start of chunk - added[k] = row['group'] # add on insert only + added[k] = (row['group'], row['gain']) # add on insert only # save if needed elif row['chunk_id'] > 0: @@ -414,8 +418,30 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f # everything removed from a stack. 
this indicates a subchunk is complete if len(added) > 0: splitter_end = row['start_time'] - out[(iod, freq_id, file_id)] = {'start_t': splitter_start, 'end_t': splitter_end, 'channel_chunk': added.copy()} - file_id += 1 + + # check for multiple gains for same channel. Separate by gain + gains = {} + for (ch, _), (_, gain) in added.items(): + if ch not in gains.keys(): + gains[ch] = set() + gains[ch].add(gain) + # the number of files to create is the max number of gains for any channel + nfiles = max([len(x) for x in gains.values()]) + # convert set to list + gains = {ch: list(gains[ch]) for ch in gains.keys()} + # now split the added dict by file id + to_add_groups = [] + for i in range(nfiles): + to_add_groups.append({}) + + for (channel, chunk_id), (group, gain) in added.items(): + id = gains[channel].index(gain) + to_add_groups[id][(channel, chunk_id)] = group + + for i in range(nfiles): + out[(iod, freq_id, file_id)] = {'start_t': splitter_start, 'end_t': splitter_end, 'channel_chunk': to_add_groups[i].copy()} + file_id += 1 + splitter_start = splitter_end # update he added list by any queued deletes. for x in deleted: @@ -448,15 +474,18 @@ def _pretty_print(self, table: dict): for k, v in value['channel_chunk'].items(): print(" ", k, v) - # get channel min and max values, across chunks. - def _get_waveform_channel_minmax(self, waveforms): - minmax = {} - for channel, wf in waveforms.items(): - mins = [ np.nanmin(chunk['samples']) for chunk in wf['chunks'] ] - maxs = [ np.nanmax(chunk['samples']) for chunk in wf['chunks'] ] + # # get channel min and max values, across chunks. 
+ # def _get_waveform_channel_minmax(self, waveforms): + # minmax = {} + # for channel, wf in waveforms.items(): + # mins = [ np.fmin.reduce(chunk['samples']) for chunk in wf['chunks'] ] + # maxs = [ np.fmax.reduce(chunk['samples']) for chunk in wf['chunks'] ] + # gains = list(set([ chunk['gain'] for chunk in wf['chunks'] ])) + # if len(gains) != 1: + # raise ValueError("ERROR: more than 1 gain for a channel") - minmax[channel] = (np.nanmin(mins), np.nanmax(maxs)) - return minmax + # minmax[channel] = (np.fmin.reduce(mins), np.fmax.reduce(maxs), gains[0]) + # return minmax def write_waveforms(self, path, waveforms): @@ -483,12 +512,12 @@ def write_waveforms(self, path, waveforms): subchunks1 = self.split_chunks_temporal_fixed(channel_table, duration_sec = self.chunkSize) # print("FIXED", len(subchunks1)) # self._pretty_print(subchunks1) - else: - subchunks1 = self.split_chunks_temporal_merged(channel_table) - # print("merged", len(subchunks1)) - # self._pretty_print(subchunks1) + # else: + # subchunks1 = self.split_chunks_temporal_merged(channel_table) + # # print("merged", len(subchunks1)) + # # self._pretty_print(subchunks1) - minmax = self._get_waveform_channel_minmax(waveforms) + # minmax = self._get_waveform_channel_minmax(waveforms) #========== now write out ============= # count channels belonging to respiratory data this is needed for the iod @@ -517,7 +546,11 @@ def write_waveforms(self, path, waveforms): dicom = self.writer.set_study_info(dicom, studyUID = studyInstanceUID, studyDate = datetime.now()) dicom = self.writer.set_series_info(dicom, iod, seriesUID=seriesInstanceUID) dicom = self.writer.set_waveform_acquisition_info(dicom, instanceNumber = file_id) - dicom = self.writer.add_waveform_chunks_multiplexed(dicom, iod, chunk_info, waveforms, minmax) + dicom = self.writer.add_waveform_chunks_multiplexed(dicom, iod, chunk_info, waveforms) + + if dicom is None: + # no wave sequence written. skip + continue # Save DICOM file. 
write_like_original is required # these the initial path when added - it points to a temp file. @@ -614,15 +647,14 @@ def read_waveforms(self, path, start_time, end_time, signal_names): samples = [int(x) for x in str.split(item[0x0099, 0x1012].value, sep = ',')] stimes = [float(x) for x in str.split(item[0x0099, 0x1001].value, sep = ',')] etimes = [x + float(y) / z for x, y, z in zip(stimes, samples, freqs)] - + channels = str.split(item[0x0099, 0x1021].value, sep = ',') canonical_channels = [x.upper() for x in channels] group_ids = [ int(x) for x in str.split(item[0x0099, 0x1022].value, sep = ',')] chan_ids = [int(x) for x in str.split(item[0x0099, 0x1023].value, sep = ',')] # get group ids and set of channels for all available channels. - for (i, chan) in enumerate(channels): - group_id = group_ids[i] + for (chan, group_id, chan_id) in zip(channels, group_ids, chan_ids): stime = stimes[group_id] etime = etimes[group_id] @@ -641,7 +673,7 @@ def read_waveforms(self, path, start_time, end_time, signal_names): # original channel name channel_info = {'channel': chan, - 'channel_idx': chan_ids[i], + 'channel_idx': chan_id, 'freq': freqs[group_id], 'number_samples': samples[group_id], 'start_time': stime} @@ -755,12 +787,13 @@ def read_waveforms(self, path, start_time, end_time, signal_names): # init the output if not previously allocated if requested_channel_name not in output.keys(): - output[requested_channel_name] = np.full(shape = max_len, fill_value = np.nan, dtype=np.float64) + output[requested_channel_name] = np.full(shape = max_len, fill_value = np.nan, dtype=np.float32) # copy the data to the output # print("copy ", arrs[group_idx].shape, " to ", output[channel].shape, # " from ", target_start, " to ", target_end) - output[requested_channel_name][target_start:target_end] = arrs[group_idx][channel_idx, 0:nsamps] + new_vals = arrs[group_idx][channel_idx, 0:nsamps] + output[requested_channel_name][target_start:target_end] = np.where(np.isfinite(new_vals), 
new_vals, output[requested_channel_name][target_start:target_end]) t2 = time.time() d3 = t2 - t1 @@ -812,19 +845,19 @@ class DICOM16BitsChunked(DICOM16Bits): chunkSize = 3600.0 -class DICOMHighBitsMerged(DICOMHighBits): - # waveform lead names to dicom IOD mapping. Incomplete. - # avoiding 12 lead ECG because of the limit in number of samples. +# class DICOMHighBitsMerged(DICOMHighBits): +# # waveform lead names to dicom IOD mapping. Incomplete. +# # avoiding 12 lead ECG because of the limit in number of samples. - chunkSize = -1 +# chunkSize = -1 -class DICOMLowBitsMerged(DICOMLowBits): +# class DICOMLowBitsMerged(DICOMLowBits): - chunkSize = -1 +# chunkSize = -1 -class DICOM16BitsMerged(DICOM16Bits): +# class DICOM16BitsMerged(DICOM16Bits): - chunkSize = -1 +# chunkSize = -1 From 7ff634c112b7d13bced611e5d3b26f356f7d6846 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Wed, 17 Jul 2024 10:45:23 -0400 Subject: [PATCH 4/6] ENH: CLI parameter for script to run only fidelity check and skip the read benchmark. 
--- waveform_benchmark/__main__.py | 6 +- waveform_benchmark/benchmark.py | 124 ++++++++++++++++---------------- 2 files changed, 68 insertions(+), 62 deletions(-) diff --git a/waveform_benchmark/__main__.py b/waveform_benchmark/__main__.py index 5cc420a..03a5d40 100644 --- a/waveform_benchmark/__main__.py +++ b/waveform_benchmark/__main__.py @@ -82,6 +82,9 @@ def main(): ap.add_argument('--waveform_suite_summary_file', '-w', default='waveform_suite_benchmark_summary.csv', help='Save a CSV summary of the waveform suite run to this path/file') + ap.add_argument('--test_only', + default=False, action='store_true', + help='Run only the tests, do not run the benchmarks') opts = ap.parse_args() # If log is requested send the output there @@ -121,7 +124,8 @@ def main(): else: run_benchmarks(input_record=opts.input_record, format_class=opts.format_class, - pn_dir=opts.physionet_directory) + pn_dir=opts.physionet_directory, + test_only = opts.test_only) # Close the log file after the run is complete if opts.save_output_to_log: diff --git a/waveform_benchmark/benchmark.py b/waveform_benchmark/benchmark.py index 6776237..3a29fa8 100755 --- a/waveform_benchmark/benchmark.py +++ b/waveform_benchmark/benchmark.py @@ -57,7 +57,7 @@ def compute_snr(reference_signal, output_signal): def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, waveform_list=None, test_list=None, - result_list=None): + result_list=None, test_only = False): # Load the class we will be testing module_name, class_name = format_class.rsplit('.', 1) @@ -222,67 +222,69 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa print(filedata_nonan[~isgood][:10]) print(f"(Gain: {chunk['gain']})") # print('_' * 64) - print('_' * 64) - print('Read performance (median of N trials):') - print(' #seek #read KiB sec [N]') - - for block_length, block_count in TEST_BLOCK_LENGTHS: - counters = [] - for i in repeat_test(TEST_MIN_DURATION, TEST_MIN_ITERATIONS): + + if not 
test_only: + print('_' * 64) + print('Read performance (median of N trials):') + print(' #seek #read KiB sec [N]') + + for block_length, block_count in TEST_BLOCK_LENGTHS: + counters = [] + for i in repeat_test(TEST_MIN_DURATION, TEST_MIN_ITERATIONS): + r = random.Random(12345) + with PerformanceCounter() as pc: + for j in range(block_count): + t0 = r.random() * (total_length - block_length) + t1 = t0 + block_length + fmt().read_waveforms(path, t0, t1, all_channels) + counters.append(pc) + + print('%6.0f %6.0f %8.0f %8.4f %6s read %d x %.0fs, all channels' + % (median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + '[%d]' % len(counters), + block_count, + block_length)) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all', + median_attr(counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + for block_length, block_count in TEST_BLOCK_LENGTHS: + counters = [] r = random.Random(12345) - with PerformanceCounter() as pc: - for j in range(block_count): - t0 = r.random() * (total_length - block_length) - t1 = t0 + block_length - fmt().read_waveforms(path, t0, t1, all_channels) - counters.append(pc) - - print('%6.0f %6.0f %8.0f %8.4f %6s read %d x %.0fs, all channels' - % (median_attr(counters, 'n_seek_calls'), - median_attr(counters, 'n_read_calls'), - median_attr(counters, 'n_bytes_read') / 1024, - median_attr(counters, 'cpu_seconds'), - '[%d]' % len(counters), - block_count, - block_length)) - - if format_list is not None: - # Append read time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_all', - median_attr(counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - for block_length, block_count 
in TEST_BLOCK_LENGTHS: - counters = [] - r = random.Random(12345) - for i in repeat_test(TEST_MIN_DURATION, TEST_MIN_ITERATIONS): - with PerformanceCounter() as pc: - for j in range(block_count): - t0 = r.random() * (total_length - block_length) - t1 = t0 + block_length - c = r.choice(all_channels) - fmt().read_waveforms(path, t0, t1, [c]) - counters.append(pc) - - print('%6.0f %6.0f %8.0f %8.4f %6s read %d x %.0fs, one channel' - % (median_attr(counters, 'n_seek_calls'), - median_attr(counters, 'n_read_calls'), - median_attr(counters, 'n_bytes_read') / 1024, - median_attr(counters, 'cpu_seconds'), - '[%d]' % len(counters), - block_count, - block_length)) - - if format_list: - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one', - median_attr(counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) + for i in repeat_test(TEST_MIN_DURATION, TEST_MIN_ITERATIONS): + with PerformanceCounter() as pc: + for j in range(block_count): + t0 = r.random() * (total_length - block_length) + t1 = t0 + block_length + c = r.choice(all_channels) + fmt().read_waveforms(path, t0, t1, [c]) + counters.append(pc) + + print('%6.0f %6.0f %8.0f %8.4f %6s read %d x %.0fs, one channel' + % (median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + '[%d]' % len(counters), + block_count, + block_length)) + + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one', + median_attr(counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) print('_' * 64) From f1d3c4ab5aab52fefb343633af0e04a3f8254001 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Wed, 17 Jul 2024 11:28:09 -0400 Subject: [PATCH 5/6] ENH: added documentation for the '--test_only' to BENCHMARK.md --- BENCHMARK.md | 4 +++- 1 file 
changed, 3 insertions(+), 1 deletion(-) diff --git a/BENCHMARK.md b/BENCHMARK.md index 80b9f30..ddb734a 100644 --- a/BENCHMARK.md +++ b/BENCHMARK.md @@ -9,7 +9,7 @@ The benchmarking script includes a simple set of metrics to evaluate the perform The [waveform_benchmark.py](./waveform_benchmark.py) script is the entrypoint for running benchmarks. This script calls functions in the `waveform_benchmark` package. The syntax for running waveform_benchmark.py from the command line is: ``` -./waveform_benchmark.py -r -f -p +./waveform_benchmark.py -r -f -p [--test_only] ``` The `-p` argument can be used to pull a file directly from a PhysioNet database but isn't needed when running on a local file. For example, to run the `WFDBFormat516` benchmark on local record `data/waveforms/mimic_iv/waves/p100/p10079700/85594648/85594648`: @@ -18,6 +18,8 @@ The `-p` argument can be used to pull a file directly from a PhysioNet database ./waveform_benchmark.py -r ./data/waveforms/mimic_iv/waves/p100/p10079700/85594648/85594648 -f waveform_benchmark.formats.wfdb.WFDBFormat516 ``` +The `--test_only` flag can be used to turn off the read performance benchmarking, thus running only the fidelity tests. + An example output is provided below: ``` From 4a89b78fdc3bc035999fb8608d5a852d377fbd69 Mon Sep 17 00:00:00 2001 From: Tony Pan Date: Fri, 19 Jul 2024 14:53:59 -0400 Subject: [PATCH 6/6] FIX: if a channel only has nan data, it should not be excluded from the output as the fidelity check would be looking for that channel. 
--- waveform_benchmark/benchmark.py | 2 +- .../formats/dcm_utils/dcm_waveform_writer.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/waveform_benchmark/benchmark.py b/waveform_benchmark/benchmark.py index 3a29fa8..ad35ebe 100755 --- a/waveform_benchmark/benchmark.py +++ b/waveform_benchmark/benchmark.py @@ -206,7 +206,7 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa # use numpy's isclose to determine floating point equality isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) numgood = np.sum(isgood) - fpeq_rel = numgood/len(data_nonan) + fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan # compute SNR to quantify signal fidelity snr = compute_snr(data_nonan, filedata_nonan) diff --git a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py index 1bb97e8..d3653a7 100644 --- a/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py +++ b/waveform_benchmark/formats/dcm_utils/dcm_waveform_writer.py @@ -501,8 +501,10 @@ def create_multiplexed_chunk(self, waveforms: dict, # only process if there is finite data. 
temp_data = waveforms[channel]['chunks'][chunkid]['samples'][channel_start:channel_start+duration] - if (temp_data is None) or (len(temp_data) == 0) or (not np.any(np.isfinite(temp_data))): - continue + if (temp_data is None) or (len(temp_data) == 0): + continue + # in case it's all nans or infs + all_nan = (not np.any(np.isfinite(temp_data))) # assign ids for each channel if channel not in channels.keys(): @@ -512,8 +514,8 @@ def create_multiplexed_chunk(self, waveforms: dict, gains[channel] = [] i += 1 - mins[channel].append(np.fmin.reduce(temp_data)) - maxs[channel].append(np.fmax.reduce(temp_data)) + mins[channel].append(0 if all_nan else np.fmin.reduce(temp_data)) + maxs[channel].append(0 if all_nan else np.fmax.reduce(temp_data)) gains[channel].append(waveforms[channel]['chunks'][chunkid]['gain']) unprocessed_chunks.append((channel, chunkid, channel_start, target_start, target_start + duration))