diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 017cd62..b095b02 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -13,6 +13,11 @@ jobs: matrix: python-version: ["3.10"] + permissions: + contents: read + issues: write + pull-requests: write + steps: - uses: actions/checkout@v3 with: diff --git a/waveform_benchmark/formats/dicom.py b/waveform_benchmark/formats/dicom.py index 98b196f..6b679f3 100644 --- a/waveform_benchmark/formats/dicom.py +++ b/waveform_benchmark/formats/dicom.py @@ -97,7 +97,6 @@ # ElectromyogramWaveform: EMG, unconstrained, 1-64, , unconstrained, DCID 3031 “Lead Location Near or in Muscle” or DCID 3032 “Lead Location Near Peripheral Nerve”, SS/SL # SleepEEGWaveform: EEG, unconstrained, 1-64, , unconstrained, DCID 3030 “EEG Lead” , SS/SL # MultichannelRespiratoryWaveform: RESP, >1, , unconstrained, DCID 3005 “Respiration Waveform” , SS/SL -# CHANNEL_TO_DICOM_IOD = { @@ -147,7 +146,7 @@ class BaseDICOMFormat(BaseFormat): # group channels by iod, then split by start and end times. # a second pass identify different frequences within a group, and split those off into separate chunks. - + # create a pandas data table to facilitate dicom file creation. output should each be a separate series of chunks for an iod. def create_channel_table(self, waveforms) -> pd.DataFrame: # For example: waveforms['V5'] -> {'units': 'mV', 'samples_per_second': 360, 'chunks': [{'start_time': 0.0, 'end_time': 1805.5555555555557, 'start_sample': 0, 'end_sample': 650000, 'gain': 200.0, 'samples': array([-0.065, -0.065, -0.065, ..., -0.365, -0.335, 0. ], dtype=float32)}]} @@ -160,11 +159,10 @@ def create_channel_table(self, waveforms) -> pd.DataFrame: raise ValueError("Samples per second not found in waveform") if 'units' not in wf.keys(): raise ValueError("Units not found in waveform") - + freq = wf['samples_per_second'] iod = CHANNEL_TO_DICOM_IOD[channel.upper()] group = iod.channel_coding[channel.upper()]['group'] - for i, chunk in enumerate(wf['chunks']): if 'start_sample' not in chunk.keys(): @@ -179,7 +177,7 @@ def create_channel_table(self, waveforms) -> pd.DataFrame: raise ValueError("Gain not found in chunk") if 'samples' not in chunk.keys(): raise ValueError("Samples not found in chunk") - + # add data.append({'channel': channel, 'freq': freq, @@ -202,23 +200,22 @@ def create_channel_table(self, waveforms) -> pd.DataFrame: 'iod': iod.__name__, 'group': group, 'freq_id': None}) - + df = pd.DataFrame(data) df.sort_values(by=['iod', 'start_time', 'freq', 'chunk_id'], inplace=True) - # assign freq_id, one per frequency within a group sorted = df.groupby(['iod', 'group']) out = [] # first assign a freq_id, which distinguishes the different frequencies within a iod-group. for (iod, group), gr in sorted: - + # within each group, each frequency gets a separate id as we should not have multiple frequencies in a group - + nfreqs = gr['freq'].nunique() if nfreqs > 1: print("NOTE: multiple frequencies in a group. need to split further.") - + freqed = gr.groupby('freq') id = 1 for freq, gr2 in freqed: @@ -243,13 +240,12 @@ def split_chunks_temporal_adaptive(self, chunk_table: pd.DataFrame) -> dict: # need an ordering of channel starting and ending # create new table with time stamp, and chunk id (- for start, + for end) # sort in order: iod, group, freq, timestamp, chunk id - + # now group by iod, freq_id, and time chunk_table.sort_values(by=['iod', 'freq_id', 'start_time', 'chunk_id'], inplace=True) - + sorted = chunk_table.groupby(['iod', 'freq_id', 'start_time']) - - + # now iterate by time to create teh segments out = dict() file_id = 0 @@ -258,13 +254,11 @@ def split_chunks_temporal_adaptive(self, chunk_table: pd.DataFrame) -> dict: for (iod, freq_id, time), gr in sorted: # print("GROUPED", iod, freq_id, time, gr) - # now iterate through the groups and create subchunks # since grouped by time, we will have addition and removal of channels/chunks at each time point, but not between. # so it's sufficient to use sets to track if len(curr_ch_chunk_list) == 0: # first subchunk. no worries. - for index, ch in gr.iterrows(): if ch['chunk_id'] < 0: curr_ch_chunk_list[(ch['channel'], (-ch['chunk_id']) - 1)] = ch['group'] @@ -285,7 +279,7 @@ def split_chunks_temporal_adaptive(self, chunk_table: pd.DataFrame) -> dict: else: print("ERROR: chunk id is 0", ch) last_time = time - + # print(out) # do group by return out @@ -300,10 +294,10 @@ def split_chunks_temporal_merged(self, chunk_table: pd.DataFrame) -> dict: # create new table with time stamp, and chunk id (- for start, + for end) # sort in order: iod, group, freq, timestamp, chunk id # iterate and push and pop to queue (parenthesis like). create a chunk when queue becomes empty. - + chunk_table.sort_values(by=['iod', 'freq_id', 'start_time', 'chunk_id'], inplace=True) sorted = chunk_table.groupby(['iod', 'freq_id']) - + out = dict() file_id = 0 stime = None @@ -334,8 +328,7 @@ def split_chunks_temporal_merged(self, chunk_table: pd.DataFrame) -> dict: etime = None else: print("ERROR: chunk id is 0", row) - - + return out # input: list of dataframes, each dataframe is a group of channels with same iod, group, and frequency. @@ -353,7 +346,7 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f # # instead - first get the merged subchunks, then subdivide more # sorted = self.split_chunks_temporal_merged(chunk_table) # # next process each subchunk: - + chunk_table.sort_values(by=['iod', 'freq_id', 'start_time', 'chunk_id'], inplace=True) sorted = chunk_table.groupby(['iod', 'freq_id']) @@ -365,15 +358,15 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f stack = [] deleted = set() added = dict() # store the (channel, chunk_id) - + # insert all splitter time_series = chunk_table['start_time'] stime = time_series.min() etime = time_series.max() - + splitters = [] periods = int(math.ceil((etime - stime) / duration_sec)) - + for i in range(periods): splitter_time = stime + (i+1) * duration_sec @@ -385,19 +378,17 @@ def split_chunks_temporal_fixed(self, chunk_table: pd.DataFrame, duration_sec: f 'iod': "any", 'group': -1, 'freq_id': -1}) - + splits = pd.DataFrame(splitters) # merge splits and sorted - for (iod, freq_id), gr in sorted: - + # ====== add splitters # df is sorted by time. splits_iod = pd.concat([gr, splits]).sort_values(by=['start_time', 'chunk_id']) - # print(splits_iod) - + # ======= process splits_iod stack = [] added = dict() @@ -454,14 +445,14 @@ def _pretty_print(self, table: dict): print(key, ": ", value['start_t'], " ", value['end_t']) for k, v in value['channel_chunk'].items(): print(" ", k, v) - + def write_waveforms(self, path, waveforms): fs = FileSet() - + # one series per modality # as many multiplexed groups as allowed by modality # one instance per chunk - + # one dicomdir per study? or series? studyInstanceUID = uid.generate_uid() seriesInstanceUID = uid.generate_uid() @@ -483,9 +474,9 @@ def write_waveforms(self, path, waveforms): subchunks1 = self.split_chunks_temporal_merged(channel_table) # print("merged", len(subchunks1)) # self._pretty_print(subchunks1) - + #========== now write out ============= - + # count channels belonging to respiratory data this is needed for the iod count_per_iod = {} for channel in waveforms.keys(): @@ -494,18 +485,18 @@ def write_waveforms(self, path, waveforms): count_per_iod[iod_name] = 1 else: count_per_iod[iod_name] += 1 - + # the format of output of split-chunks # { (iod, freq_id, file_id): {start_t, end_t, {(channel, chunk_id) : group, ...}}} for (iod_name, freq_id, file_id), chunk_info in subchunks1.items(): # print("writing ", iod_name, ", ", file_id) - + # create and iod instance iod = self.make_iod(iod_name, hifi=self.hifi, num_channels = count_per_iod[iod_name]) - + # each multiplex group can have its own frequency # but if there are different frequencies for channels in a multiplex group, we need to split. - + # create a new file TO FIX. dicom = self.writer.make_empty_wave_filedataset(iod) dicom = self.writer.set_order_info(dicom) @@ -513,7 +504,7 @@ def write_waveforms(self, path, waveforms): dicom = self.writer.set_series_info(dicom, iod, seriesUID=seriesInstanceUID) dicom = self.writer.set_waveform_acquisition_info(dicom, instanceNumber = file_id) dicom = self.writer.add_waveform_chunks_multiplexed(dicom, iod, chunk_info, waveforms) - + # Save DICOM file. write_like_original is required # these the initial path when added - it points to a temp file. # instance = fs.add(dicom) @@ -526,7 +517,7 @@ def write_waveforms(self, path, waveforms): record.ContentDate = dicom.ContentDate record.ContentTime = dicom.ContentTime block = record.private_block(0x0099, "CHORUS_AI", create=True) - + # gather metadata from dicom. to keep as same order, must not use set channel_info = [] group_info =[] @@ -535,12 +526,12 @@ def write_waveforms(self, path, waveforms): nsample = seq.NumberOfWaveformSamples stime = cast(float, seq.MultiplexGroupTimeOffset) / 1000.0 group_info.append({'freq': freq, 'number_samples': nsample, 'start_time': stime}) - + for chan_id, ch in enumerate(seq.ChannelDefinitionSequence): channel_info.append({'channel': ch.ChannelSourceSequence[0].CodeMeaning, 'group_idx': seq_id, 'channel_idx': chan_id}) - + # print("GROUPS: ", group_info) # print("CHANNELS: ", channel_info) channel_names = [x['channel'] for x in channel_info] @@ -549,11 +540,11 @@ def write_waveforms(self, path, waveforms): freqs = [str(x['freq']) for x in group_info] nsamples = [str(x['number_samples']) for x in group_info] stimes = [str(x['start_time']) for x in group_info] - + block.add_new(0x21, "LO", ",".join(channel_names)) # all channels (0099 1021) block.add_new(0x22, "LO", ",".join(group_ids)) # group of each channel (0099 1022) block.add_new(0x23, "LO", ",".join(channel_ids)) # id of channel in its group (0099 1023) - + block.add_new(0x11, "LO", ",".join(freqs)) # group's frequencies (0099 1011) block.add_new(0x12, "LO", ",".join(nsamples)) # groups' sample count (0099 1012) block.add_new(0x01, "LO", ",".join(stimes)) # group's start times. (0099 1013) @@ -562,9 +553,9 @@ def write_waveforms(self, path, waveforms): # stime_str = stime_str if len(stime_str) <= 16 else stime_str[:16] # block.add_new(0x01, "DS", stime_str) # file seqs start time (0099 1001) // use same VR as MultiplexGroupTimeOffset # block.add_new(0x02, "FL", chunk_info['end_t']) # file seqs end time (0099 1002) - + wave_node = RecordNode(record) - + instance = fs.add_custom(dicom, leaf = wave_node) # dcm_path = prefix + "_" + str(file_id) + ext @@ -575,7 +566,7 @@ def write_waveforms(self, path, waveforms): # but we may have to keep this in the file metadata field. # https://dicom.nema.org/medical/dicom/current/output/chtml/part03/chapter_F.html # https://dicom.nema.org/medical/dicom/current/output/chtml/part03/sect_F.5.24.html - + # fs.write('/mnt/c/Users/tcp19/Downloads/Compressed/test_waveform') # fs.copy(path) # print(path) @@ -583,7 +574,7 @@ def write_waveforms(self, path, waveforms): def read_waveforms(self, path, start_time, end_time, signal_names): # have to read the whole data set each time if using dcmread. this is not efficient. - + signal_set = set(signal_names) signal_set = {name.upper() : name for name in signal_set} # ========== read from dicomdir file @@ -592,45 +583,45 @@ def read_waveforms(self, path, start_time, end_time, signal_names): # read as file_set, then use metadata to get the table and see which files need to be accessed. # then random access to read. t1 = time.time() - + ds = dcmread(path + "/DICOMDIR") - + file_info = {} for item in ds.DirectoryRecordSequence: # if there is private tag data, use it. - - if (item.DirectoryRecordType == "WAVEFORM") and (Tag(0x0099, 0x1001) in item) : - + + if (item.DirectoryRecordType == "WAVEFORM") and (Tag(0x0099, 0x1001) in item): + # keep same order, do not use set freqs = [float(x) for x in str.split(item[0x0099, 0x1011].value, sep = ',')] samples = [int(x) for x in str.split(item[0x0099, 0x1012].value, sep = ',')] stimes = [float(x) for x in str.split(item[0x0099, 0x1001].value, sep = ',')] etimes = [x + float(y) / z for x, y, z in zip(stimes, samples, freqs)] - + channels = str.split(item[0x0099, 0x1021].value, sep = ',') canonical_channels = [x.upper() for x in channels] group_ids = [ int(x) for x in str.split(item[0x0099, 0x1022].value, sep = ',')] chan_ids = [int(x) for x in str.split(item[0x0099, 0x1023].value, sep = ',')] - + # get group ids and set of channels for all available channels. for (i, chan) in enumerate(channels): group_id = group_ids[i] stime = stimes[group_id] etime = etimes[group_id] - + # filtering here reduces the number of files to open if (chan.upper() not in signal_set.keys()): continue if (etime <= start_time) or (stime >= end_time): continue - + # only add key if this is a file to be opened. if item.ReferencedFileID not in file_info.keys(): file_info[item.ReferencedFileID] = {} - + if group_id not in file_info[item.ReferencedFileID].keys(): file_info[item.ReferencedFileID][group_id] = [] - + # original channel name channel_info = {'channel': chan, 'channel_idx': chan_ids[i], @@ -641,15 +632,14 @@ def read_waveforms(self, path, start_time, end_time, signal_names): else: # no metadata, so add mapping of None to indicate need to read metadata from file file_info[item.ReferencedFileID] = None - - + t2 = time.time() d1 = t2 - t1 t1 = time.time() - + # ========== open specific subfiles and gather the channel and time information as dataframe # extract channel, start_time, end_time, start_sample, end_sample, iod, group, freq, freq_id - + # each should use a different padding value. output = {} info_tags = ["WaveformSequence", @@ -663,27 +653,27 @@ def read_waveforms(self, path, start_time, end_time, signal_names): # file_info contains either None (have to get from individual dicom file), or metadata for matched channel/time for file_name, finfo in file_info.items(): fn = path + "/" + file_name - + read_meta_from_file = (finfo is None) - + # if metadata is in dicomdir, then we have only required files in file_info. # if metadata is not in dicomdir, then all files are listed and metadata needs to be retrieved. # either way, need to read the file. with open(fn, 'rb') as fobj: - + # open the file t3 = time.time() ds = dcmread(fobj, defer_size = 1000, specific_tags = info_tags) seqs_raw = dcm_reader.get_tag(fobj, ds, 'WaveformSequence', defer_size = 1000) seqs = cast(list[Dataset], seqs_raw) - + t4 = time.time() d5 = t4 - t3 t3 = time.time() arrs = {} for group_idx, seq in enumerate(seqs): - + if read_meta_from_file: # get the file metadata (can be saved in DICOMDIR in the future, but would need to change the channel metadata info.) channel_infos = dcm_reader.get_waveform_seq_info(fobj, seq) # get channel info @@ -695,14 +685,14 @@ def read_waveforms(self, path, start_time, end_time, signal_names): if len(channel_infos) == 0: continue - + # iterate over the channel_infos now. for info in channel_infos: channel = info['channel'].upper() - + if (channel not in signal_set.keys()): continue - + # compute start and end offsets in the file using timestamps freq = float(info['freq']) max_len = int(np.round(end_time * freq)) - int(np.round(start_time * freq)) @@ -711,16 +701,16 @@ def read_waveforms(self, path, start_time, end_time, signal_names): gstart = float(info['start_time']) nsamples = int(info['number_samples']) gend = gstart + float(nsamples) / freq - + # calculate the intersection of the time window win_start = max(gstart, start_time) win_end = min(gend, end_time) - + if (win_start >= win_end): # window is not possible continue # else we have a valid window - + # compute the start and end offset for the source and destination start_offset = max(0, int(np.round(win_start * freq) - np.round(gstart * freq) )) end_offset = min(nsamples, int(np.round(win_end * freq) - np.round(gstart * freq) )) @@ -733,11 +723,11 @@ def read_waveforms(self, path, start_time, end_time, signal_names): nsamps = min(end_offset - start_offset, target_end - target_start) end_offset = start_offset + nsamps target_end = target_start + nsamps - + if nsamps <= 0: continue # else we have a valid window with positive number of samples - + # get info about the each channel present. channel_idx = info['channel_idx'] requested_channel_name = signal_set[channel] @@ -745,7 +735,7 @@ def read_waveforms(self, path, start_time, end_time, signal_names): if group_idx not in arrs.keys(): item = cast(Dataset, seq) arrs[group_idx] = dcm_reader.get_multiplex_array(fobj, item, start_offset, end_offset, as_raw = False) - + # init the output if not previously allocated if requested_channel_name not in output.keys(): output[requested_channel_name] = np.full(shape = max_len, fill_value = np.nan, dtype=np.float64) @@ -754,7 +744,7 @@ def read_waveforms(self, path, start_time, end_time, signal_names): # print("copy ", arrs[group_idx].shape, " to ", output[channel].shape, # " from ", target_start, " to ", target_end) output[requested_channel_name][target_start:target_end] = arrs[group_idx][channel_idx, 0:nsamps] - + t2 = time.time() d3 = t2 - t1 # print("time: ", path, " (read, metadata, array) = (", d1, d3, ")") @@ -775,7 +765,7 @@ class DICOMHighBits(BaseDICOMFormat): chunkSize = None # adaptive hifi = True - + class DICOMLowBits(BaseDICOMFormat): writer = dcm_writer.DICOMWaveformWriter() @@ -809,4 +799,3 @@ class DICOMLowBitsMerged(DICOMLowBits): chunkSize = -1 hifi = False - \ No newline at end of file