
Commit

Refactor cold-extraction module for the front-end
Changes to cold-extraction for the front-end module
pradeeban authored Jul 1, 2021
2 parents 29f3711 + 7b2238b commit f704774
Showing 4 changed files with 438 additions and 180 deletions.
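
The refactor moves the former module-level setup into initialize_Values(valuesDict) and wraps the scheduling loop in run_cold_extraction(), so a front-end can start an extraction programmatically instead of only through the command line. A minimal sketch of such a call follows, assuming the module is importable as ColdDataRetriever; the dictionary keys mirror the valuesDict lookups in the diff, while every value shown is hypothetical.

# Hypothetical front-end driver (sketch only; not part of this commit).
import ColdDataRetriever

values = {
    'storage_folder': '/opt/data/new-study',      # hypothetical StoreSCP storage folder
    'file_path': '{00100020}/{0020000D}.dcm',     # hypothetical DICOM file-path pattern
    'CsvFile': 'queries/study-list.csv',          # hypothetical CSV of identifiers to pull
    'extraction_type': 'empi_accession',          # empi, accession, empi_accession, or empi_date
    'accession_index': 1,
    'patient_index': 0,
    'date_index': 2,                              # still required; only used for empi_date
    'date_type': 'StudyDate',
    'date_format': '%Y%m%d',
    'email': 'user@example.org',
    'send_email': False,
    'NifflerSystem': 'system.json',
}

# initialize_Values() reads system.json, configures logging, loads any resume
# pickle, and finally calls run_cold_extraction() to schedule the retrieval loop.
ColdDataRetriever.initialize_Values(values)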
367 changes: 225 additions & 142 deletions modules/cold-extraction/ColdDataRetriever.py
@@ -15,109 +15,77 @@

from collections import defaultdict

config = defaultdict(lambda: None)
# Read Default config.json file
with open('config.json', 'r') as f:
    tmp_config = json.load(f)
    config.update(tmp_config)

# CLI Argument Parser
ap = argparse.ArgumentParser()

ap.add_argument("--NifflerSystem", default=config['NifflerSystem'],
help="Path to json file with Niffler System Information.")
ap.add_argument("--StorageFolder",
default=config['StorageFolder'], help="StoreSCP config: Storage Folder. Refer Readme.md")
ap.add_argument("--FilePath", default=config['FilePath'],
help="StoreSCP config: FilePath, Refer configuring config.json in Readme.md.")
ap.add_argument("--CsvFile", default=config['CsvFile'],
help="Path to CSV file for extraction. Refer Readme.md.")
ap.add_argument("--ExtractionType", default=config['ExtractionType'],
help="One of the supported extraction type for Cold Data extraction. Refer Readme.md.")
ap.add_argument("--AccessionIndex", default=config['AccessionIndex'], type=int,
help="Set the CSV column index of AccessionNumber for extractions with Accessions.")
ap.add_argument("--PatientIndex", default=config['PatientIndex'], type=int,
help="Set the CSV column index of EMPI for extractions with EMPI and an accession or EMPI and a date.")
ap.add_argument("--DateIndex", default=config['DateIndex'], type=int,
help="Set the CSV column index of Date(StudyDate, AcquisitionDate) for extractions with EMPI and a date.")
ap.add_argument("--DateType", default=config['DateType'],
help="DateType can range from AcquisitionDate, StudyDate, etc. Refer Readme.md.")
ap.add_argument("--DateFormat", default=config['DateFormat'],
help="DateFormat can range from %Y%m%d, %m/%d/%y, %m-%d-%y, %%m%d%y, etc. Refer Readme.md.")
ap.add_argument("--SendEmail", default=config['SendEmail'], type=bool,
help="Send email when extraction is complete. Default false")
ap.add_argument("--YourEmail", default=config['YourEmail'],
help="A valid email, if send email is enabled.")

args = vars(ap.parse_args())

#Get variables for StoreScp from config.json.
storage_folder = args['StorageFolder']
file_path = args['FilePath']

# Get variables for the each on-demand extraction from config.json
csv_file = args['CsvFile']
extraction_type = args['ExtractionType']
accession_index = args['AccessionIndex']
patient_index = args['PatientIndex']
date_index = args['DateIndex']
date_type = args['DateType']
date_format = args['DateFormat']
email = args['YourEmail']
send_email = args['SendEmail']

# Reads the system_json file.
system_json = args['NifflerSystem']

with open(system_json, 'r') as f:
    niffler = json.load(f)

# Get constants from system.json
DCM4CHE_BIN = niffler['DCM4CHEBin']
SRC_AET = niffler['SrcAet']
QUERY_AET = niffler['QueryAet']
DEST_AET = niffler['DestAet']
NIGHTLY_ONLY = niffler['NightlyOnly']
START_HOUR = niffler['StartHour']
END_HOUR = niffler['EndHour']
IS_EXTRACTION_NOT_RUNNING = True
NIFFLER_ID = niffler['NifflerID']
MAX_PROCESSES = niffler['MaxNifflerProcesses']

SEPARATOR = ','

accessions = []
patients = []
dates = []

storescp_processes = 0
niffler_processes = 0

nifflerscp_str = "storescp.*{0}".format(QUERY_AET)
qbniffler_str = 'ColdDataRetriever'

niffler_log = 'niffler' + str(NIFFLER_ID) + '.log'

logging.basicConfig(filename=niffler_log,level=logging.INFO)
logging.getLogger('schedule').setLevel(logging.WARNING)

# Variables to track progress between iterations.
extracted_ones = list()

# By default, assume that this is a fresh extraction.
resume = False

# All extracted files from the csv file are saved in a respective .pickle file.
try:
    with open(csv_file +'.pickle', 'rb') as f:
        extracted_ones = pickle.load(f)
    # Since we have successfully located a pickle file, it indicates that this is a resume.
    resume = True
except:
    logging.info("No existing pickle file found. Therefore, initialized with empty value to track the progress to {0}.pickle.".format(csv_file))

# record the start time
t_start = time.time()
def initialize_Values(valuesDict):
    global storescp_processes, niffler_processes, nifflerscp_str, qbniffler_str
    global storage_folder, file_path, csv_file, extraction_type, accession_index, patient_index, date_index, date_type, date_format, email, send_email, system_json
    global DCM4CHE_BIN, SRC_AET, QUERY_AET, DEST_AET, NIGHTLY_ONLY, START_HOUR, END_HOUR, IS_EXTRACTION_NOT_RUNNING, NIFFLER_ID, MAX_PROCESSES, SEPARATOR
    global accessions, patients, dates, niffler_log, resume, length

    storage_folder = valuesDict['storage_folder']
    file_path = valuesDict['file_path']
    csv_file = valuesDict['CsvFile']
    extraction_type = valuesDict['extraction_type']
    accession_index = int(valuesDict['accession_index'])
    patient_index = int(valuesDict['patient_index'])
    date_index = int(valuesDict['date_index'])
    date_type = valuesDict['date_type']
    date_format = valuesDict['date_format']
    email = valuesDict['email']
    send_email = bool(valuesDict['send_email'])
    system_json = valuesDict['NifflerSystem']

    # Reads the system_json file.
    with open(system_json, 'r') as f:
        niffler = json.load(f)

    # Get constants from system.json
    DCM4CHE_BIN = niffler['DCM4CHEBin']
    SRC_AET = niffler['SrcAet']
    QUERY_AET = niffler['QueryAet']
    DEST_AET = niffler['DestAet']
    NIGHTLY_ONLY = niffler['NightlyOnly']
    START_HOUR = niffler['StartHour']
    END_HOUR = niffler['EndHour']
    IS_EXTRACTION_NOT_RUNNING = True
    NIFFLER_ID = niffler['NifflerID']
    MAX_PROCESSES = niffler['MaxNifflerProcesses']

    SEPARATOR = ','

    accessions = []
    patients = []
    dates = []

    storescp_processes = 0
    niffler_processes = 0

    nifflerscp_str = "storescp.*{0}".format(QUERY_AET)
    qbniffler_str = 'ColdDataRetriever'

    niffler_log = 'niffler' + str(NIFFLER_ID) + '.log'

    logging.basicConfig(filename=niffler_log,level=logging.INFO)
    logging.getLogger('schedule').setLevel(logging.WARNING)

    # Variables to track progress between iterations.
    global extracted_ones
    extracted_ones = list()

    # By default, assume that this is a fresh extraction.
    resume = False

    # All extracted files from the csv file are saved in a respective .pickle file.
    try:
        with open(csv_file +'.pickle', 'rb') as f:
            extracted_ones = pickle.load(f)
        # Since we have successfully located a pickle file, it indicates that this is a resume.
        resume = True
    except:
        logging.info("No existing pickle file found. Therefore, initialized with empty value to track the progress to {0}.pickle.".format(csv_file))

    # record the start time
    t_start = time.time()
    run_cold_extraction()

# Check and kill the StoreScp processes.
def check_kill_process():
@@ -161,33 +129,34 @@ def initialize():
    subprocess.call("{0}/storescp --accept-unknown --directory {1} --filepath {2} -b {3} > storescp.out &".format(DCM4CHE_BIN, storage_folder, file_path, QUERY_AET), shell=True)



with open(csv_file, newline='') as f:
    reader = csv.reader(f)
    next(f)
    for row in reader:
        row = [x.strip() for x in row]
        if (extraction_type == 'empi_date'):
            if not ((row[patient_index] == "") or (row[date_index] == "")):
                patients.append(row[patient_index])
                temp_date = row[date_index]
                dt_stamp = datetime.datetime.strptime(temp_date, date_format)
                date_str = dt_stamp.strftime('%Y%m%d')
                dates.append(date_str)
                length = len(patients)
        elif (extraction_type == 'empi'):
            if not ((row[patient_index] == "")):
                patients.append(row[patient_index])
                length = len(patients)
        elif (extraction_type == 'accession'):
            if not ((row[accession_index] == "")):
                accessions.append(row[accession_index])
                length = len(accessions)
        elif (extraction_type == 'empi_accession'):
            if not ((row[patient_index] == "") or (row[accession_index] == "")):
                patients.append(row[patient_index])
                accessions.append(row[accession_index])
                length = len(accessions)
def read_csv():
    global length
    with open(csv_file, newline='') as f:
        reader = csv.reader(f)
        next(f)
        for row in reader:
            row = [x.strip() for x in row]
            if (extraction_type == 'empi_date'):
                if not ((row[patient_index] == "") or (row[date_index] == "")):
                    patients.append(row[patient_index])
                    temp_date = row[date_index]
                    dt_stamp = datetime.datetime.strptime(temp_date, date_format)
                    date_str = dt_stamp.strftime('%Y%m%d')
                    dates.append(date_str)
                    length = len(patients)
            elif (extraction_type == 'empi'):
                if not ((row[patient_index] == "")):
                    patients.append(row[patient_index])
                    length = len(patients)
            elif (extraction_type == 'accession'):
                if not ((row[accession_index] == "")):
                    accessions.append(row[accession_index])
                    length = len(accessions)
            elif (extraction_type == 'empi_accession'):
                if not ((row[patient_index] == "") or (row[accession_index] == "")):
                    patients.append(row[patient_index])
                    accessions.append(row[accession_index])
                    length = len(accessions)
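
# Illustrative note on the expected CSV (values below are made up): the first row is a
# header that next(f) skips, and the configured column indexes select the identifiers,
# e.g. for extraction_type 'empi_accession' with patient_index=0 and accession_index=1:
#
#     EMPI,AccessionNumber
#     12345,ACC001
#     67890,ACC002
#
# Rows with an empty required column are skipped, and length ends up as the number of
# identifiers collected for the retrieval step.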


# Run the retrieval only once, when the extraction script starts, and keep it running in a separate thread.
@@ -202,6 +171,7 @@ def run_retrieval():

# The core DICOM on-demand retrieve process.
def retrieve():
    global length
    # For the cases that have the typical EMPI and Accession values together.
    if (extraction_type == 'empi_accession'):
        # Create our Identifier (query) dataset
@@ -305,16 +275,129 @@ def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


# The thread scheduling
schedule.every(1).minutes.do(run_threaded, run_retrieval)
schedule.every(10).minutes.do(run_threaded, update_pickle)

# Keep running in a loop.
while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except KeyboardInterrupt:
        check_kill_process()
        sys.exit(0)

def run_cold_extraction():
    read_csv()
    # The thread scheduling
    schedule.every(1).minutes.do(run_threaded, run_retrieval)
    schedule.every(10).minutes.do(run_threaded, update_pickle)

    # Keep running in a loop.
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except KeyboardInterrupt:
            check_kill_process()
            logging.shutdown()
            sys.exit(0)

if __name__ == "__main__":
    global storescp_processes, niffler_processes, nifflerscp_str, qbniffler_str
    global storage_folder, file_path, csv_file, extraction_type, accession_index, patient_index, date_index, date_type, date_format, email, send_email
    global DCM4CHE_BIN, SRC_AET, QUERY_AET, DEST_AET, NIGHTLY_ONLY, START_HOUR, END_HOUR, IS_EXTRACTION_NOT_RUNNING, NIFFLER_ID, MAX_PROCESSES, SEPARATOR
    global accessions, patients, dates, niffler_log, resume, length

    config = defaultdict(lambda: None)
    # Read Default config.json file
    with open('config.json', 'r') as f:
        tmp_config = json.load(f)
        config.update(tmp_config)

    # CLI Argument Parser
    ap = argparse.ArgumentParser()

    ap.add_argument("--NifflerSystem", default=config['NifflerSystem'],
                    help="Path to json file with Niffler System Information.")
    ap.add_argument("--StorageFolder",
                    default=config['StorageFolder'], help="StoreSCP config: Storage Folder. Refer Readme.md")
    ap.add_argument("--FilePath", default=config['FilePath'],
                    help="StoreSCP config: FilePath, Refer configuring config.json in Readme.md.")
    ap.add_argument("--CsvFile", default=config['CsvFile'],
                    help="Path to CSV file for extraction. Refer Readme.md.")
    ap.add_argument("--ExtractionType", default=config['ExtractionType'],
                    help="One of the supported extraction type for Cold Data extraction. Refer Readme.md.")
    ap.add_argument("--AccessionIndex", default=config['AccessionIndex'], type=int,
                    help="Set the CSV column index of AccessionNumber for extractions with Accessions.")
    ap.add_argument("--PatientIndex", default=config['PatientIndex'], type=int,
                    help="Set the CSV column index of EMPI for extractions with EMPI and an accession or EMPI and a date.")
    ap.add_argument("--DateIndex", default=config['DateIndex'], type=int,
                    help="Set the CSV column index of Date(StudyDate, AcquisitionDate) for extractions with EMPI and a date.")
    ap.add_argument("--DateType", default=config['DateType'],
                    help="DateType can range from AcquisitionDate, StudyDate, etc. Refer Readme.md.")
    ap.add_argument("--DateFormat", default=config['DateFormat'],
                    help="DateFormat can range from %Y%m%d, %m/%d/%y, %m-%d-%y, %%m%d%y, etc. Refer Readme.md.")
    ap.add_argument("--SendEmail", default=config['SendEmail'], type=bool,
                    help="Send email when extraction is complete. Default false")
    ap.add_argument("--YourEmail", default=config['YourEmail'],
                    help="A valid email, if send email is enabled.")

    args = vars(ap.parse_args())

    #Get variables for StoreScp from config.json.
    storage_folder = args['StorageFolder']
    file_path = args['FilePath']

    # Get variables for the each on-demand extraction from config.json
    csv_file = args['CsvFile']
    extraction_type = args['ExtractionType']
    accession_index = args['AccessionIndex']
    patient_index = args['PatientIndex']
    date_index = args['DateIndex']
    date_type = args['DateType']
    date_format = args['DateFormat']
    email = args['YourEmail']
    send_email = args['SendEmail']

    # Reads the system_json file.
    system_json = args['NifflerSystem']

    with open(system_json, 'r') as f:
        niffler = json.load(f)

    # Get constants from system.json
    DCM4CHE_BIN = niffler['DCM4CHEBin']
    SRC_AET = niffler['SrcAet']
    QUERY_AET = niffler['QueryAet']
    DEST_AET = niffler['DestAet']
    NIGHTLY_ONLY = niffler['NightlyOnly']
    START_HOUR = niffler['StartHour']
    END_HOUR = niffler['EndHour']
    IS_EXTRACTION_NOT_RUNNING = True
    NIFFLER_ID = niffler['NifflerID']
    MAX_PROCESSES = niffler['MaxNifflerProcesses']

    SEPARATOR = ','

    accessions = []
    patients = []
    dates = []

    storescp_processes = 0
    niffler_processes = 0

    nifflerscp_str = "storescp.*{0}".format(QUERY_AET)
    qbniffler_str = 'ColdDataRetriever'

    niffler_log = 'niffler' + str(NIFFLER_ID) + '.log'

    logging.basicConfig(filename=niffler_log,level=logging.INFO)
    logging.getLogger('schedule').setLevel(logging.WARNING)

    # Variables to track progress between iterations.
    extracted_ones = list()

    # By default, assume that this is a fresh extraction.
    resume = False

    # All extracted files from the csv file are saved in a respective .pickle file.
    try:
        with open(csv_file +'.pickle', 'rb') as f:
            extracted_ones = pickle.load(f)
        # Since we have successfully located a pickle file, it indicates that this is a resume.
        resume = True
    except:
        logging.info("No existing pickle file found. Therefore, initialized with empty value to track the progress to {0}.pickle.".format(csv_file))

    # record the start time
    t_start = time.time()
    run_cold_extraction()
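
For the command-line path, the same settings come from config.json (read into the argparse defaults above) and from the system file named by --NifflerSystem. The sketch below writes illustrative versions of both files; the keys are exactly the ones this script reads, while every value is a made-up placeholder to adapt per deployment.

# Sketch: generate illustrative config.json and system.json with the keys ColdDataRetriever reads.
# All values are placeholders, not defaults shipped with Niffler.
import json

config = {
    "NifflerSystem": "system.json",
    "StorageFolder": "/opt/data/new-study",
    "FilePath": "{00100020}/{0020000D}.dcm",
    "CsvFile": "queries/study-list.csv",
    "ExtractionType": "empi_accession",
    "AccessionIndex": 1,
    "PatientIndex": 0,
    "DateIndex": 2,
    "DateType": "StudyDate",
    "DateFormat": "%Y%m%d",
    "SendEmail": False,
    "YourEmail": "user@example.org",
}

system = {
    "DCM4CHEBin": "/opt/dcm4che/bin",
    "SrcAet": "SOURCE_AE@pacs.example.org:104",
    "QueryAet": "QUERY_AE:4242",
    "DestAet": "DEST_AE",
    "NightlyOnly": True,
    "StartHour": 19,
    "EndHour": 7,
    "NifflerID": 1,
    "MaxNifflerProcesses": 1,
}

with open("config.json", "w") as f:
    json.dump(config, f, indent=4)
with open("system.json", "w") as f:
    json.dump(system, f, indent=4)

Running python ColdDataRetriever.py then picks these defaults up, and any CLI flag (for example --CsvFile) overrides the corresponding config.json entry.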
