diff --git a/modules/nifti-extraction/ImageExtractorNifti.py b/modules/nifti-extraction/ImageExtractorNifti.py new file mode 100644 index 0000000..5ad7575 --- /dev/null +++ b/modules/nifti-extraction/ImageExtractorNifti.py @@ -0,0 +1,473 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import os +import glob +from shutil import copyfile +import hashlib +import json +import sys +import subprocess +import logging +from multiprocessing import Pool +import pdb +import time +import pickle +import argparse +import numpy as np +import pandas as pd +import pydicom as dicom +# pydicom imports needed to handle data errors +from pydicom import config +from pydicom import datadict +from pydicom import values +import dicom2nifti +import pathlib +configs = {} + + + +def initialize_config_and_execute(config_values): + global configs + configs = config_values + # Applying checks for paths + + p1 = pathlib.PurePath(configs['DICOMHome']) + dicom_home = p1.as_posix() # the folder containing your dicom files + + p2 = pathlib.PurePath(configs['OutputDirectory']) + output_directory = p2.as_posix() + + print_images = configs['PrintImages'] + print_only_common_headers = bool(configs['CommonHeadersOnly']) + depth = int(configs['Depth']) + processes = int(configs['UseProcesses']) # how many processes to use. + flattened_to_level = configs['FlattenedToLevel'] + email = configs['YourEmail'] + send_email = configs['SendEmail'] + no_splits = int(configs['SplitIntoChunks']) + is16Bit = bool(configs['is16Bit']) + + metadata_col_freq_threshold = 0.1 + + nifti_destination = output_directory + '/extracted-images/' + failed = output_directory + '/failed-dicom/' + maps_directory = output_directory + '/maps/' + meta_directory = output_directory + '/meta/' + + LOG_FILENAME = output_directory + '/ImageExtractor.out' + pickle_file = output_directory + '/ImageExtractor.pickle' + dict_pickle_file = output_directory + '/ImageExtractork_dict.pickle' + + # record the start time + t_start = time.time() + + if not os.path.exists(output_directory): + os.makedirs(output_directory) + + logging.basicConfig(filename=LOG_FILENAME, level=logging.DEBUG) + + if not os.path.exists(maps_directory): + os.makedirs(maps_directory) + + if not os.path.exists(meta_directory): + os.makedirs(meta_directory) + + if not os.path.exists(nifti_destination): + os.makedirs(nifti_destination) + + if not os.path.exists(failed): + os.makedirs(failed) + + if not os.path.exists(failed + "/1"): + os.makedirs(failed + "/1") + + if not os.path.exists(failed + "/2"): + os.makedirs(failed + "/2") + + if not os.path.exists(failed + "/3"): + os.makedirs(failed + "/3") + + if not os.path.exists(failed + "/4"): + os.makedirs(failed + "/4") + + logging.info("------- Values Initialization DONE -------") + final_res = execute(pickle_file, dicom_home, output_directory, print_images, print_only_common_headers, depth, + processes, flattened_to_level, email, send_email, no_splits, is16Bit, nifti_destination, + failed, maps_directory, meta_directory, LOG_FILENAME, metadata_col_freq_threshold, t_start,dict_pickle_file) + return final_res + + +# Function for getting tuple for field,val pairs +def get_tuples(plan, outlist = None, key = ""): + if len(key)>0: + key = key + "_" + if not outlist: + outlist = [] + for aa in plan.dir(): + try: + hasattr(plan,aa) + except TypeError as e: + logging.warning('Type Error encountered') + if hasattr(plan, aa) and aa!= 'PixelData': + value = getattr(plan, aa) + start = len(outlist) + # if dicom sequence extract tags from each element + if 
type(value) is dicom.sequence.Sequence: + for nn, ss in enumerate(list(value)): + newkey = "_".join([key,("%d"%nn),aa]) if len(key) else "_".join([("%d"%nn),aa]) + candidate = get_tuples(ss,outlist=None,key=newkey) + # if extracted tuples are too big condense to a string + if len(candidate)>2000: + outlist.append((newkey,str(candidate))) + else: + outlist.extend(candidate) + else: + if type(value) is dicom.valuerep.DSfloat: + value = float(value) + elif type(value) is dicom.valuerep.IS: + value = str(value) + elif type(value) is dicom.valuerep.MultiValue: + value = tuple(value) + elif type(value) is dicom.uid.UID: + value = str(value) + outlist.append((key + aa, value)) + # appends name, value pair for this file. these are later concatenated to the dataframe + return outlist + + +def extract_headers(f_list_elem): + nn,ff = f_list_elem # unpack enumerated list + ff = glob.glob(f'{ff}/*.dcm')[0] + plan = dicom.dcmread(ff, force=True) # reads in dicom file + # checks if this file has an image + c=True + try: + check = plan.pixel_array # throws error if dicom file has no image + except: + c = False + kv = get_tuples(plan) # gets tuple for field,val pairs for this file. function defined above + # dicom images should not have more than 300 + if len(kv)>500: + logging.debug(str(len(kv)) + " dicoms produced by " + ff) + kv.append(('file', f_list_elem[1])) # adds my custom field with the original filepath + kv.append(('has_pix_array',c)) # adds my custom field with if file has image + if c: + # adds my custom category field - useful if classifying images before processing + kv.append(('category','uncategorized')) + else: + kv.append(('category','no image')) # adds my custom category field, makes note as imageless + return dict(kv) + + +# Function to extract pixel array information +# takes an integer used to index into the global filedata dataframe +# returns tuple of +# filemapping: dicom to nifti paths (as str) +# fail_path: dicom to failed folder (as tuple) +# found_err: error code produced when processing +def extract_images(filedata, i, nifti_destination, flattened_to_level, failed, is16Bit): + #ds = dicom.dcmread(filedata.iloc[i].loc['file'], force=True) # read file in + #dicom2nifti.dicom_series_to_nifti(filedata.iloc[i].loc['file'], ) + found_err=None + filemapping = "" + fail_path = "" + try: + imName=os.path.split(filedata.iloc[i].loc['file'])[1][:-4] # get file name ex: IM-0107-0022 + if flattened_to_level == 'patient': + ID = filedata.iloc[i].loc['PatientID'] # Unique identifier for the Patient. + folderName = hashlib.sha224(ID.encode('utf-8')).hexdigest() + # check for existence of patient folder. Create if it does not exist. + os.makedirs(nifti_destination + folderName,exist_ok=True) + elif flattened_to_level == 'study': + ID1 = filedata.iloc[i].loc['PatientID'] # Unique identifier for the Patient. + try: + ID2 = filedata.iloc[i].loc['StudyInstanceUID'] # Unique identifier for the Study. + except: + ID2='ALL-STUDIES' + folderName = hashlib.sha224(ID1.encode('utf-8')).hexdigest() + "/" + \ + hashlib.sha224(ID2.encode('utf-8')).hexdigest() + # check for existence of the folder tree patient/study/series. Create if it does not exist. + os.makedirs(nifti_destination + folderName,exist_ok=True) + else: + ID1=filedata.iloc[i].loc['PatientID'] # Unique identifier for the Patient. + try: + ID2=filedata.iloc[i].loc['StudyInstanceUID'] # Unique identifier for the Study. + ID3=filedata.iloc[i].loc['SeriesInstanceUID'] # Unique identifier of the Series. 
+ except: + ID2='ALL-STUDIES' + ID3='ALL-SERIES' + folderName = hashlib.sha224(ID1.encode('utf-8')).hexdigest() + "/" + \ + hashlib.sha224(ID2.encode('utf-8')).hexdigest() + "/" + \ + hashlib.sha224(ID3.encode('utf-8')).hexdigest() + # check for existence of the folder tree patient/study/series. Create if it does not exist. + os.makedirs(nifti_destination + folderName,exist_ok=True) + + + niftifile = nifti_destination+folderName + '/' + hashlib.sha224(imName.encode('utf-8')).hexdigest() + '.nii.gz' + dicom2nifti.dicom_series_to_nifti(str(filedata.iloc[i].loc['file']),niftifile) + filemapping = filedata.iloc[i].loc['file'] + ',' + niftifile + '\n' + except AttributeError as error: + found_err = error + logging.error(found_err) + fail_path = filedata.iloc[i].loc['file'], failed + '1/' + \ + os.path.split(filedata.iloc[i].loc['file'])[1][:-4]+'.dcm' + except ValueError as error: + found_err = error + logging.error(found_err) + fail_path = filedata.iloc[i].loc['file'], failed + '2/' + \ + os.path.split(filedata.iloc[i].loc['file'])[1][:-4]+'.dcm' + except BaseException as error: + found_err = error + logging.error(found_err) + fail_path = filedata.iloc[i].loc['file'], failed + '3/' + \ + os.path.split(filedata.iloc[i].loc['file'])[1][:-4]+'.dcm' + except Exception as error: + found_err = error + logging.error(found_err) + fail_path = filedata.iloc[i].loc['file'], failed + '4/' + \ + os.path.split(filedata.iloc[i].loc['file'])[1][:-4]+'.dcm' + return (filemapping, fail_path, found_err) + + +# Function when pydicom fails to read a value attempt to read as other types. +def fix_mismatch_callback(raw_elem, **kwargs): + try: + if raw_elem.VR: + values.convert_value(raw_elem.VR, raw_elem) + except BaseException as err: + for vr in kwargs['with_VRs']: + try: + values.convert_value(vr, raw_elem) + except ValueError: + pass + else: + raw_elem = raw_elem._replace(VR=vr) + return raw_elem + + +def get_path(depth, dicom_home): + directory = dicom_home + '/' + i = 0 + while i < depth: + directory += "*/" + i += 1 + return directory + "*.dcm" + + +# Function used by pydicom. +def fix_mismatch(with_VRs=['PN', 'DS', 'IS']): + """A callback function to check that RawDataElements are translatable + with their provided VRs. If not, re-attempt translation using + some other translators. + Parameters + ---------- + with_VRs : list, [['PN', 'DS', 'IS']] + A list of VR strings to attempt if the raw data element value cannot + be translated with the raw data element's VR. + Returns + ------- + No return value. The callback function will return either + the original RawDataElement instance, or one with a fixed VR. + """ + dicom.config.data_element_callback = fix_mismatch_callback + config.data_element_callback_kwargs = { + 'with_VRs': with_VRs, + } + + +def execute(pickle_file, dicom_home, output_directory, print_images, print_only_common_headers, depth, + processes, flattened_to_level, email, send_email, no_splits, is16Bit, nifti_destination, + failed, maps_directory, meta_directory, LOG_FILENAME, metadata_col_freq_threshold, t_start,dict_pickle_file): + err = None + fix_mismatch() + if processes == 0.5: # use half the cores to avoid high ram usage + core_count = int(os.cpu_count()/2) + elif processes == 0: # use all the cores + core_count = int(os.cpu_count()) + elif processes < os.cpu_count(): # use the specified number of cores to avoid high ram usage + core_count = processes + else: + core_count = int(os.cpu_count()) + # get set up to create dataframe + dirs = os.listdir(dicom_home) + # gets all dicom files. 
if editing this code, get filelist into the format of a list of strings, + # with each string as the file path to a different dicom file. + file_path = get_path(depth, dicom_home) + + if False : #os.path.isfile(pickle_file): + with open(pickle_file,'rb') as f: + filelist=pickle.load(f) + with open(dict_pickle_file,'rb') as f: + patient_dict = pickle.load(f) + else: + filelist=glob.glob(file_path, recursive=True) # search the folders at the depth we request and finds all dicoms + pickle.dump(filelist,open(pickle_file,'wb')) + #get all the patient folders + patient_dict = {} + volume_list =[] + for patient_path in glob.glob(f'{dicom_home}/*'): #is unique + patient = patient_path.split('/')[-1] + patient_studies = {} + for study_path in glob.glob(f"{dicom_home}/{patient}/*"): #is unique + study = study_path.split('/')[-1] + volume_list.extend( [ e for e in glob.glob(f"{dicom_home}/{patient}/{study}/*") ] ) + #patient_studies[study] = volume_dict + #patient_dict[patient] = patient_studies + #pickle.dump(patient_dict,open(dict_pickle_file,'wb')) # todo change this to be a list of files names instead. + file_chunks = np.array_split(volume_list,no_splits) + logging.info('Number of dicom files: ' + str(len(filelist))) + try: + ff = filelist[0] # load first file as a template to look at all + except IndexError: + logging.error("There is no file present in the given folder in " + file_path) + sys.exit(1) + plan = dicom.dcmread(ff, force=True) + logging.debug('Loaded the first file successfully') + + keys = [(aa) for aa in plan.dir() if (hasattr(plan, aa) and aa != 'PixelData')] + # checks for images in fields and prints where they are + for field in plan.dir(): + if (hasattr(plan, field) and field!='PixelData'): + entry = getattr(plan, field) + if type(entry) is bytes: + logging.debug(field) + logging.debug(str(entry)) + + for i,chunk in enumerate(file_chunks): + csv_destination = "{}/meta/metadata_{}.csv".format(output_directory,i) + mappings = "{}/maps/mapping_{}.csv".format(output_directory,i) + fm = open(mappings, "w+") + filemapping = 'Original DICOM file location, NIFTI location \n' + fm.write(filemapping) + + # add a check to see if the metadata has already been extracted + # step through whole file list, read in file, append fields to future dataframe of all files + + headerlist = [] + # start up a multi processing pool + # for every item in filelist send data to a subprocess and run extract_headers func + # output is then added to headerlist as they are completed (no ordering is done) + with Pool(core_count) as p: + res= p.imap_unordered(extract_headers, enumerate(chunk)) + for i,e in enumerate(res): + headerlist.append(e) + data = pd.DataFrame(headerlist) + logging.info('Chunk ' + str(i) + ' Number of fields per file : ' + str(len(data.columns))) + # find common fields + # make dataframe containing all fields and all files minus those removed in previous block + # export csv file of final dataframe + export_csv = data.to_csv(csv_destination, index = None, header=True) + fields=data.keys() + count = 0 # potential painpoint + # writting of log handled by main process + if print_images: + logging.info("Start processing Images") + pokemon = open('fail_log.txt','w') + filedata = data + total = len(chunk) + stamp = time.time() + for i in range(len(filedata)): + (fmap,fail_path,err) = extract_images(filedata, i, nifti_destination, flattened_to_level, failed, is16Bit) + if err: + count +=1 + #copyfile(fail_path[0],fail_path[1]) + print(f"err:{fail_path[1]}@{fail_path[0]}",file=pokemon) + err_msg = 
str(count) + ' out of ' + str(len(chunk)) + ' dicom images have failed extraction' + logging.error(err_msg) + else: + fm.write(fmap) + pokemon.close() + fm.close() + logging.info('Chunk run time: %s %s', time.time() - t_start, ' seconds!') + + logging.info('Generating final metadata file') + + col_names = dict() + all_headers = dict() + total_length = 0 + + metas = glob.glob( "{}*.csv".format(meta_directory)) + # for each meta file identify the columns that are not na's for at least 10% (metadata_col_freq_threshold) of data + for meta in metas: + m = pd.read_csv(meta,dtype='str') + d_len = m.shape[0] + total_length += d_len + + for e in m.columns: + col_pop = d_len - np.sum(m[e].isna()) # number of populated rows for this column in this metadata file + + if e in col_names: + col_names[e] += col_pop + else: + col_names[e] = col_pop + + # all_headers keeps track of number of appearances of each header. We later use this count to ensure that + # the headers we use are present in all metadata files. + if e in all_headers: + all_headers[e] += 1 + else: + all_headers[e] = 1 + + loadable_names = list() + for k in col_names.keys(): + if k in all_headers and all_headers[k] >= no_splits: # no_splits == number of batches used + if col_names[k] >= metadata_col_freq_threshold*total_length: + loadable_names.append(k) # use header only if it's present in every metadata file + + # load every metadata file using only valid columns + meta_list = list() + for meta in metas: + m = pd.read_csv(meta,dtype='str',usecols=loadable_names) + meta_list.append(m) + merged_meta = pd.concat(meta_list,ignore_index=True) + merged_meta.to_csv('{}/metadata.csv'.format(output_directory),index=False) + # getting a single mapping file + logging.info('Generatign final mapping file') + mappings = glob.glob("{}/maps/*.csv".format(output_directory)) + map_list = list() + for mapping in mappings: + map_list.append(pd.read_csv(mapping,dtype='str')) + merged_maps = pd.concat(map_list,ignore_index=True) + if print_only_common_headers == True: + mask_common_fields = merged_maps.isnull().mean() < 0.1 + common_fields = set(np.asarray(merged_maps.columns)[mask_common_fields]) + merged_maps = merged_maps[common_fields] + merged_maps.to_csv('{}/mapping.csv'.format(output_directory),index=False) + + if send_email == True: + subprocess.call('echo "Niffler has successfully completed the nifti conversion" | mail -s "The image conversion' + ' has been complete" {0}'.format(email), shell=True) + # Record the total run-time + logging.info('Total run time: %s %s', time.time() - t_start, ' seconds!') + logging.shutdown() # Closing logging file after extraction is done !! 
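+    # Return a short status list to the caller: the last captured error (if any)
+    # plus a completion message, so initialize_config_and_execute can surface it.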
+    logs = []
+    logs.append(err)
+    logs.append("The nifti conversion is SUCCESSFUL")
+    return logs
+
+
+if __name__ == "__main__":
+    with open('config.json', 'r') as f:
+        niffler = json.load(f)
+
+    # CLI Argument Parser
+    ap = argparse.ArgumentParser()
+
+    ap.add_argument("--DICOMHome", default=niffler['DICOMHome'])
+    ap.add_argument("--OutputDirectory", default=niffler['OutputDirectory'])
+    ap.add_argument("--Depth", default=niffler['Depth'])
+    ap.add_argument("--SplitIntoChunks", default=niffler['SplitIntoChunks'])
+    ap.add_argument("--PrintImages", default=niffler['PrintImages'])
+    ap.add_argument("--CommonHeadersOnly", default=niffler['CommonHeadersOnly'])
+    ap.add_argument("--UseProcesses", default=niffler['UseProcesses'])
+    ap.add_argument("--FlattenedToLevel", default=niffler['FlattenedToLevel'])
+    ap.add_argument("--is16Bit", default=niffler['is16Bit'])
+    ap.add_argument("--SendEmail", default=niffler['SendEmail'])
+    ap.add_argument("--YourEmail", default=niffler['YourEmail'])
+
+    args = vars(ap.parse_args())
+
+    if len(args) > 0:
+        initialize_config_and_execute(args)
+    else:
+        initialize_config_and_execute(niffler)
diff --git a/modules/nifti-extraction/README.md b/modules/nifti-extraction/README.md
new file mode 100644
index 0000000..c9b00c4
--- /dev/null
+++ b/modules/nifti-extraction/README.md
@@ -0,0 +1,94 @@
+# The Niffler NIfTI Extractor
+
+The NIfTI Extractor converts a set of DICOM images into NIfTI files, extracting the metadata in a privacy-preserving manner.
+
+
+## Configuring Niffler NIfTI Extractor
+
+Find the config.json file in the folder and modify it accordingly *for each* Niffler NIfTI extraction.
+
+* *DICOMHome*: The folder where you have your DICOM files whose metadata and binary imaging data (NIfTI) must be extracted.
+
+* *OutputDirectory*: The root folder where Niffler produces the output after running the NIfTI Extractor.
+
+* *Depth*: How far in the folder hierarchy from the DICOMHome the DICOM images are. For example, a patient/study/series/instances.dcm hierarchy indicates a depth of 3. If the DICOM files are in the DICOMHome itself with no folder hierarchy, the depth will be 0.
+
+* *SplitIntoChunks*: How many chunks to split the metadata extraction process into. The default is 1; leave it as it is for most extractions. A single chunk works for about 10,000 files, so for extremely large batches split accordingly. For example, set it to 2 if you have 20,000 files.
+
+* *UseProcesses*: How many of the CPU cores to use for the image extraction. The default is 0, indicating all the cores. 0.5 indicates using only half of the available cores. Any other number sets the number of cores to that value. If a value larger than the number of available cores is specified, all the cores will be used.
+
+* *FlattenedToLevel*: Specify how you want your output folder tree to be. The default is "patient" (produces patient/*.nii.gz).
+  You may change this value to "study" (patient/study/*.nii.gz) or "series" (patient/study/series/*.nii.gz). All IDs are de-identified (see the short sketch after this list).
+
+* *is16Bit*: Specifies whether to save the extracted image as a 16-bit image. By default, this is set to true. Set it to false to run an 8-bit extraction.
+
+* *SendEmail*: Do you want to send an email notification when the extraction completes? The default is true. You may disable this if you do not want to receive an email upon completion.
+
+* *YourEmail*: Replace "test@test.test" with a valid email if you would like to receive an email notification. If the SendEmail property is disabled, you can leave this as is.
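+
+For reference, the patient/study/series folder names are de-identified by hashing the corresponding DICOM identifiers, so the original IDs never appear on disk. A minimal sketch of the mapping the extractor applies (the helper name and example ID below are illustrative):
+
+```python
+import hashlib
+
+def deidentify(identifier: str) -> str:
+    # Output folders are named by the SHA-224 digest of the original identifier
+    # (PatientID, StudyInstanceUID, or SeriesInstanceUID).
+    return hashlib.sha224(identifier.encode('utf-8')).hexdigest()
+
+print(deidentify('PAT-000123'))  # stable, one-way folder name
+```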
+ + +### Print the Images or Limit the Extraction to Include only the Common DICOM Attributes + +The below two fields can be left unmodified for most executions. The default values are included below for these boolean properties. + +* *PrintImages*: Do you want to print the images from these dicom files? Default is _true_. + +* *CommonHeadersOnly*: Do you want the resulting dataframe csv to contain only the common headers? Finds if less than 10% of the rows are missing this column field. To extract all the headers, default is set as _false_. + + +## Running the Niffler PNG Extractor +```bash + +$ python3 ImageExtractor.py + +# With Nohup +$ nohup python3 ImageExtractor.py > UNIQUE-OUTPUT-FILE-FOR-YOUR-EXTRACTION.out & + +# With Command Line Arguments +$ nohup python3 ImageExtractor.py --DICOMHome "/opt/data/new-study" --Depth 0 --PrintImages true --SendEmail true > UNIQUE-OUTPUT-FILE-FOR-YOUR-EXTRACTION.out & +``` +Check that the extraction is going smooth with no errors, by, + +``` +$ tail -f UNIQUE-OUTPUT-FILE-FOR-YOUR-EXTRACTION.out +``` + +## The output files and folders + +In the OutputDirectory, there will be several sub folders and directories. + +* *metadata.csv*: The metadata from the DICOM images in a csv format. + +* *mapping.csv*: A csv file that maps the DICOM -> PNG file locations. + +* *ImageExtractor.out*: The log file. + +* *extracted-images*: The folder that consists of extracted PNG images + +* *failed-dicom*: The folder that consists of the DICOM images that failed to produce the PNG images upon the execution of the Niffler PNG Extractor. Failed DICOM images are stored in 4 sub-folders named 1, 2, 3, and 4, categorizing according to their failure reason. + + +## Running the Niffler PNG Extractor with Slurm + +There is also an experimental PNG extractor implementation (ImageExtractorSlurm.py) that provides a distributed execution based on Slurm on a cluster. + + +## Troubleshooting + +If you encounter your images being ending in the failed-dicom/3 folder (the folder signifying base exception), check the +ImageExtractor.out. + +Check whether you still have conda installed and configured correctly (by running "conda"), if you observe the below error log: + +"The following handlers are available to decode the pixel data however they are missing required dependencies: GDCM (req. GDCM)" + +The above error indicates a missing gdcm, which usually happens if either it is not configured (if you did not follow the +installation steps correctly) or if conda (together with gdcm) was later broken (mostly due to a system upgrade or a manual removal of conda). + +Check whether conda is available, by running "conda" in terminal. If it is missing, install [Anaconda](https://www.anaconda.com/distribution/#download-section). + +If you just installed conda, make sure to close and open your terminal. Then, install gdcm. 
+ +``` +$ conda install -c conda-forge -y gdcm +``` diff --git a/modules/nifti-extraction/config.json b/modules/nifti-extraction/config.json new file mode 100644 index 0000000..de9d9dd --- /dev/null +++ b/modules/nifti-extraction/config.json @@ -0,0 +1,13 @@ +{ + "DICOMHome": "/path/to/files", + "OutputDirectory": "/path/to/svae", + "Depth": 3, + "SplitIntoChunks": 3, + "PrintImages": true, + "CommonHeadersOnly": true, + "UseProcesses": 12 , + "FlattenedToLevel": "patient", + "is16Bit":true, + "SendEmail": true, + "YourEmail": "test@test.edu" +} diff --git a/modules/png-extraction/ImageExtractor.py b/modules/png-extraction/ImageExtractor.py index c4aaedb..f739225 100644 --- a/modules/png-extraction/ImageExtractor.py +++ b/modules/png-extraction/ImageExtractor.py @@ -144,9 +144,9 @@ def extract_headers(f_list_elem): except: c = False kv = get_tuples(plan) # gets tuple for field,val pairs for this file. function defined above - # dicom images should not have more than 300 + # dicom images should not have more than 300 dicom tags if len(kv)>500: - logging.debug(str(len(kv)) + " dicoms produced by " + ff) + logging.debug(str(len(kv)) + " dicom tags produced by " + ff) kv.append(('file', f_list_elem[1])) # adds my custom field with the original filepath kv.append(('has_pix_array',c)) # adds my custom field with if file has image if c: @@ -263,6 +263,8 @@ def fix_mismatch_callback(raw_elem, **kwargs): values.convert_value(vr, raw_elem) except ValueError: pass + except TypeError: + continue else: raw_elem = raw_elem._replace(VR=vr) return raw_elem diff --git a/modules/rta-extraction/README.md b/modules/rta-extraction/README.md index 6f98f93..de5f467 100644 --- a/modules/rta-extraction/README.md +++ b/modules/rta-extraction/README.md @@ -2,24 +2,34 @@ The RTA Extractor runs continuously to load the data (labs, meds and orders) in JSON format, clear the data which has been on the database for more than 24 hours and store the data in a tabular format (csv file) upon reciebing query parameters. -# Configuring Niffler RTA Extractor +## Configuring Niffler RTA Extractor -Niffler RTA Extractor must be configured as a service for it to run continuously and resume automatically even when the server restarts. Unless you are the administrator who is configuring Niffler for the first time, skip this section. +Niffler RTA Extractor must be configured as a service for it to run continuously and resume automatically when the server restarts. Unless you are the administrator who is configuring Niffler for the first time, skip this section. Find the system.json file in the service folder and modify accordingly. system.json entries are to be set *only once* for the Niffler deployment by the administrator. Once set, further extractions do not require a change. * *LabsURL*: Set the URL providing continous labs data. + * *MedsURL*: Set the URL providing continous meds data. + * *OrdersURL*: Set the URL providing continous orders data. + * *LabsDataLoadFrequency*: Time Frequency for loading labs data onto MongoDB. The frequency is to be provided in minutes. + * *MedsDataLoadFrequency*: Time Frequency for loading meds data onto MongoDB. The frequency is to be provided in minutes. + * *OrdersDataLoadFrequency*: Time Frequency for loading orders data onto MongoDB. The frequency is to be provided in minutes. + * *UserName*: Set the Username Credentials for RTA Connection. + * *PassCode*: Set the Passcode Credentials for RTA Connection. + * *MongoURI*: Set the MongoDB Connection URL. 
+ * *MongoUserName*: Set the MongoDB Username for Credentials. + * *MongoPassCode*: Set the MongoDB Passcode for Credentials. ## Configure DICOM attributes to extract diff --git a/modules/rta-extraction/RtaExtractor.py b/modules/rta-extraction/RtaExtractor.py index 8c552d4..8a23a76 100644 --- a/modules/rta-extraction/RtaExtractor.py +++ b/modules/rta-extraction/RtaExtractor.py @@ -16,6 +16,7 @@ from warnings import resetwarnings from bson.objectid import ObjectId import pymongo +from pymongo import database from pymongo.message import delete, query import pydicom import requests @@ -33,15 +34,12 @@ import pdb from datetime import datetime, timedelta from pymongo import MongoClient -from collections import Counter - -def run_threaded(job_func): - job_thread = threading.Thread(target=job_func) - job_thread.start() # Data Loading Function -def load_json_data(url, user, passcode, db_json=None, first_index=None, second_index=None): - +def load_data(url, user, passcode, db_json=None, first_index=None, second_index=None): + ''' + Loads the json data from labs, meds and orders into corresponsing MongoDB Collection. + ''' # Parameters: # 1. url - URL to get the data. # 2. user - Username for Authorization of ResearchPACS Server. @@ -50,6 +48,7 @@ def load_json_data(url, user, passcode, db_json=None, first_index=None, second_i # 5. first_index - First index in the MongoDB Collection. (Usually a date time attribute) # 6. second_index - Second index in the MongoDB Collection. (Usually empi information attribute) + global total_data load_time = time.time() data_collection = db[db_json] data = requests.get(url, auth=(user, passcode)) @@ -57,37 +56,50 @@ def load_json_data(url, user, passcode, db_json=None, first_index=None, second_i items_data = data['items'] for record in items_data: - data_collection.insert_one(record) - data_collection.create_index( - [ - (first_index, 1), - (second_index, 1) - ] - ) + if record not in total_data: + data_collection.insert_one(record) + total_data.append(record) + data_collection.create_index( + [ + (first_index, 1), + (second_index, 1) + ] + ) + logging.info(len(total_data)) + view_data(db_json) for i in data['links']: if (i['rel'] == 'next'): url = i['href'] - load_json_data(url, user, passcode, db_json, first_index, second_index) + load_data(url, user, passcode, db_json, first_index, second_index) time_taken = round(time.time()-load_time, 2) logging.info('Spent {} seconds loading data into {}.'.format(time_taken, db_json)) # Data Clearing Function def clear_data(db_json=None): - - # Parameters + ''' + Clears the data which is older than one day from MongoDB Collection. + ''' + # Parameters: # 1. db_json - Name of the MongoDB Collection. 
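+    # Each collection stores its record timestamp under a different field name;
+    # the matching date column is selected below before applying the one-day retention cutoff.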
clear_time = time.time() data_collection = db[db_json] cursor = data_collection.find({}) + if db_json == 'labs_json': + date_column = 'lab_date' + elif db_json == 'meds_json': + date_column = 'update_dt_tm' + elif db_json == 'orders_json': + date_column = 'completed_dt_tm' + for document in cursor: previous_time = datetime.now()-timedelta(days=1) previous_date = previous_time.date() - item_date = datetime.strptime(document['lab_date'], '%Y-%m-%dT%H:%M:%SZ').date() + item_date = datetime.strptime(document[date_column], '%Y-%m-%dT%H:%M:%SZ').date() diff_time = previous_date-item_date if (diff_time.total_seconds()>=0): @@ -95,9 +107,16 @@ def clear_data(db_json=None): time_taken = round(time.time()-clear_time, 2) logging.info('Spent {} seconds clearing the data from {}.'.format(time_taken, db_json)) - -# Data Filtering Function + +# Data Viewing and Filtering Function def view_data(db_json=None, user_query=None): + ''' + Display the shape, outliers and value counts of the dataframe. + ''' + # Parameters: + # 1. db_json - Name of the MongoDB Collection. + # 2. user_query - List of features. + view_time = time.time() data_collection = db[db_json] data_cursor = data_collection.find({}) @@ -105,13 +124,117 @@ def view_data(db_json=None, user_query=None): doc_list = [] for document in data_cursor: doc_list.append(document) - - df = pd.DataFrame(doc_list) - logging.info (df.shape) + if db_json == 'labs_json': + date_column = 'lab_date' + labs_df = pd.DataFrame(doc_list) + labs_df[date_column] = pd.to_datetime(labs_df[date_column].str.split('T').str[0]) + logging.info(labs_df.shape) + logging.info(str(labs_df[date_column].value_counts().to_dict())) + + logging.info('Outliers - {}'.format(labs_df[date_column][datetime.now()-labs_df[date_column] > timedelta(30)].shape[0])) + logging.info('Outliers Percentage - {}'.format(labs_df[date_column][datetime.now()-labs_df[date_column] > timedelta(30)].shape[0]/labs_df[date_column].shape[0])) + + elif db_json == 'meds_json': + date_column = 'update_dt_tm' + meds_df = pd.DataFrame(doc_list) + meds_df[date_column] = pd.to_datetime(meds_df[date_column].str.split('T').str[0]) + logging.info(meds_df.shape) + logging.info(str(meds_df[date_column].value_counts().to_dict())) + + logging.info('Outliers - {}'.format(meds_df[date_column][datetime.now()-meds_df[date_column] > timedelta(30)].shape[0])) + logging.info('Outliers Percentage - {}'.format(meds_df[date_column][datetime.now()-meds_df[date_column] > timedelta(30)].shape[0]/meds_df[date_column].shape[0])) + + elif db_json == 'orders_json': + date_column = 'completed_dt_tm' + orders_df = pd.DataFrame(doc_list) + orders_df[date_column] = pd.to_datetime(orders_df[date_column].str.split('T').str[0]) + logging.info(orders_df.shape) + logging.info(str(orders_df[date_column].value_counts().to_dict())) + + logging.info('Outliers - {}'.format(orders_df[date_column][datetime.now()-orders_df[date_column] > timedelta(30)].shape[0])) + logging.info('Outliers Percentage - {}'.format(orders_df[date_column][datetime.now()-orders_df[date_column] > timedelta(30)].shape[0]/orders_df[date_column].shape[0])) + time_taken = round(time.time()-view_time, 2) logging.info('Spent {} seconds viewing the data of {}.'.format(time_taken, db_json)) +def load_labs_data(): + ''' + A buffer function between main and load_data functions for labs data. 
+ ''' + global total_data + global UserName + global PassCode + + now = datetime.now() + logging.info('Loading Labs Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + + load_data(url=LabsURL, user=UserName, passcode=PassCode, db_json='labs_json', first_index='lab_date', second_index='empi') + +def load_meds_data(): + ''' + A buffer function between main and load_data functions for meds data. + ''' + global total_data + global UserName + global PassCode + + now = datetime.now() + logging.info('Loading Meds Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + + load_data(url=MedsURL, user=UserName, passcode=PassCode, db_json='meds_json', first_index='update_dt_tm', second_index='empi') + +def load_orders_data(): + ''' + A buffer function between main and load_data functions for orders data. + ''' + global total_data + global UserName + global PassCode + + now = datetime.now() + logging.info('Loading Orders Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + + load_data(url=OrdersURL, user=UserName, passcode=PassCode, db_json='orders_json', first_index='completed_dt_tm', second_index='empi') + +def clear_labs_data(): + ''' + A buffer function between main and clear_data functions for labs data. + ''' + now = datetime.now() + logging.info('Clearing Labs Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + clear_data(db_json='labs_json') + +def clear_meds_data(): + ''' + A buffer function between main and clear_data functions for meds data. + ''' + now = datetime.now() + logging.info('Clearing Meds Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + clear_data(db_json='meds_json') + +def clear_orders_data(): + ''' + A buffer function between main and clear_data functions for orders data. + ''' + now = datetime.now() + logging.info('Clearing Orders Data') + logging.info(now.strftime('%Y-%m-%d %H:%M:%S')) + clear_data(db_json='orders_json') + +def print_function(): + now = datetime.now() + print ('Time - {}'.format(now)) + print ('Hit the print function') + +def run_threaded(job_func): + job_thread = threading.Thread(target=job_func) + job_thread.start() if __name__ == "__main__": log_format = '%(levelname)s %(asctime)s - %(message)s' @@ -119,24 +242,21 @@ def view_data(db_json=None, user_query=None): format=log_format, filemode='w') logging = logging.getLogger() - with open('service/system.json', 'r') as f: + with open('system.json', 'r') as f: niffler = json.load(f) # Get constants from system.json - Labs_FolderPath = niffler['LabsFilePath'] - Meds_FolderPath = niffler['MedsFilePath'] - Orders_FolderPath = niffler['OrdersFilePath'] LabsURL = niffler['LabsURL'] MedsURL = niffler['MedsURL'] OrdersURL = niffler['OrdersURL'] Labs_ExtractionFrequency = niffler['LabsDataExtractionFrequency'] Meds_ExtractionFrequency = niffler['MedsDataExtractionFrequency'] Orders_ExtractionFrequency = niffler['OrdersDataExtractionFrequency'] + UserName = niffler['UserName'] + PassCode = niffler['PassCode'] Mongo_URI = niffler['MongoURI'] Mongo_UserName = niffler['MongoUserName'] Mongo_PassCode = niffler['MongoPassCode'] - UserName = niffler['UserName'] - PassCode = niffler['PassCode'] # Connect to MongoDB connection_start_time = time.time() @@ -148,23 +268,15 @@ def view_data(db_json=None, user_query=None): logging.info('Time taken to establish MongoDB Connection - {}'.format(round(time.time() - connection_start_time), 2)) db = client.database - - schedule.every(Labs_ExtractionFrequency).minutes.do(run_threaded, - load_json_data(url=LabsURL, - user=UserName, passcode=PassCode, 
db_json='labs_json', - first_index='lab_date', second_index='empi')) - schedule.every(Labs_ExtractionFrequency).minutes.do(run_threaded, - load_json_data(url=MedsURL, - user=UserName, passcode=PassCode, db_json='meds_json', - first_index='update_dt_tm', second_index='empi')) - schedule.every(Labs_ExtractionFrequency).minutes.do(run_threaded, - load_json_data(url=OrdersURL, - user=UserName, passcode=PassCode, db_json='orders_json', - irst_index='completed_dt_tm', second_index='empi')) - - schedule.every(1).day.at("23:59").do(run_threaded, clear_data(db_json='labs_json')) - schedule.every(1).day.at("23:59").do(run_threaded, clear_data(db_json='meds_json')) - schedule.every(1).day.at("23:59").do(run_threaded, clear_data(db_json='orders_json')) + total_data = [] + + schedule.every(Meds_ExtractionFrequency).minutes.do(run_threaded, load_labs_data) + schedule.every(Meds_ExtractionFrequency).minutes.do(run_threaded, load_meds_data) + schedule.every(Meds_ExtractionFrequency).minutes.do(run_threaded, load_orders_data) + + schedule.every(1).day.at("03:00").do(run_threaded, clear_labs_data) + schedule.every(1).day.at("03:15").do(run_threaded, clear_meds_data) + schedule.every(1).day.at("03:25").do(run_threaded, clear_orders_data) while True: schedule.run_pending() diff --git a/modules/suvpar/Strip.py b/modules/suvpar/Strip.py index 7333283..32debe5 100644 --- a/modules/suvpar/Strip.py +++ b/modules/suvpar/Strip.py @@ -3,9 +3,12 @@ import json logging.basicConfig(level=logging.INFO) +df = {} +output_csv = {} -def initialize_config_and_execute(): +def initialize(): + global output_csv, df with open('config.json', 'r') as f: config = json.load(f) @@ -16,10 +19,23 @@ def initialize_config_and_execute(): text_file = open(feature_file, "r") feature_list = text_file.read().split('\n') - filtered_csv = pandas.read_csv(filename, usecols=lambda x: x in feature_list, sep=',') - df = pandas.DataFrame(filtered_csv) + df = pandas.read_csv(filename, usecols=lambda x: x in feature_list, sep=',') + logging.info(df['ImageType']) + + +def strip(): + global df + # Consider only MR. Remove modalities such as PR and SR that are present in the original data. + df = df[df.Modality == "MR"] + # Consider only the ImageType that are true. 
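+    # i.e., keep only rows whose ImageType contains "ORIGINAL" (acquired rather than derived images).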
+ df = df[df['ImageType'].str.contains("ORIGINAL")] + + +def write(): df.to_csv(output_csv) if __name__ == "__main__": - initialize_config_and_execute() + initialize() + strip() + write() diff --git a/modules/suvpar/featureset1.txt b/modules/suvpar/featureset1.txt new file mode 100644 index 0000000..7015f96 --- /dev/null +++ b/modules/suvpar/featureset1.txt @@ -0,0 +1,23 @@ +AccessionNumber +InstitutionAddress +StudyDate +StudyTime +DeviceSerialNumber +PatientID +StudyDescription +ProtocolName +PerformedProcedureStepDescription +NumberOfStudyRelatedSeries +NumberOfStudyRelatedInstances +AcquisitionDate +AcquisitionTime +SeriesDate +SeriesTime +SeriesNumber +SeriesDescription +ImageType +InstanceNumber +AcquisitionDuration +Modality +Manufacturer +ManufacturerModelName \ No newline at end of file diff --git a/modules/workflows/ModalityGrouping.py b/modules/workflows/ModalityGrouping.py new file mode 100644 index 0000000..4d1cf1e --- /dev/null +++ b/modules/workflows/ModalityGrouping.py @@ -0,0 +1,29 @@ +import os, glob +import sys +import logging +import pydicom as pyd +import shutil + +def modality_split(cold_extraction_path, modality_split_path): + # iterating through all the files in cold extraction + for root, dirs, files in os.walk(cold_extraction_path): + for file in files: + if file.endswith('.dcm'): + dcm_filename = root+'/'+file + dcm_path = '/'.join(dcm_filename.split('/')[5:]) + dcm_only_folder = '/'.join(dcm_path.split('/')[:-1]) + dcm_file = pyd.dcmread(dcm_filename) + dcm_modality = dcm_file.Modality + + # print (dcm_modality, dcm_only_folder) + isExist = os.path.exists(modality_split_path+str(dcm_modality)+'/'+str(dcm_only_folder)) + if not isExist: + os.makedirs(modality_split_path+str(dcm_modality)+'/'+str(dcm_only_folder)) + print (cold_extraction_path+str(dcm_path), modality_split_path+str(dcm_modality)+'/'+str(dcm_only_folder)+'/'+str(dcm_path.split('/')[-1])) + shutil.copy2(src=cold_extraction_path+str(dcm_path), dst=modality_split_path+str(dcm_modality)+'/'+str(dcm_only_folder)+'/'+str(dcm_path.split('/')[-1])) + +if __name__ == "__main__": + cold_extraction_path = sys.argv[1] + modality_split_path = sys.argv[2] + print ('Starting Modality Grouping') + modality_split(cold_extraction_path, modality_split_path)
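+
+    # Example invocation (hypothetical paths):
+    #   python3 ModalityGrouping.py /opt/data/cold-extraction/ /opt/data/modality-split/
+    # Both arguments are expected to end with '/', since the script concatenates them
+    # with relative DICOM paths directly; the relative path is derived via
+    # dcm_filename.split('/')[5:], which hard-codes how many leading path components
+    # belong to the cold-extraction root.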