-
Notifications
You must be signed in to change notification settings - Fork 4
/
bids_utils.py
118 lines (90 loc) · 4.69 KB
/
bids_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def gather_nifti_file_paths(dataset_folder, creds_path=None):
import os
s3_prefix="s3://s3.amazonaws.com"
file_path_list=[]
# paths that include s3:// are assumed to live in AWS Simple Storage Service
if "s3://" in dataset_folder:
try:
from indi_aws import fetch_creds
except Exception as e:
print "Error ({0:s}): Could not import indi_aws package".format(e.message)
raise(e)
try:
s3_path_vals=(dataset_folder.replace(s3_prefix,"")).split('/')
bucket_name = s3_path_vals[1]
data_path="/".join(s3_path_vals[2:])
except Exception as e:
print "Error ({0:s}): There is a problem with s3 path {1:s}".format(e.message,dataset_folder)
raise(e)
print "Extracting NIfTI paths from s3 bucket {0:s}::{1:s})".format(bucket_name,data_path)
bucket = fetch_creds.return_bucket(creds_path, bucket_name)
# Build S3-subjects to download
for bk in bucket.objects.filter(Prefix=data_path):
if str(bk.key).endswith(".nii") or str(bk.key).endswith(".nii.gz"):
file_path_list.append(os.path.join(s3_prefix,bucket_name,str(bk.key)))
else:
print "Extracting NIfTI paths from local filesystem"
for root, folders, files in os.walk(os.path.abspath(dataset_folder)):
for filename in files:
if filename.endswith('.nii') or filename.endswith('.nii.gz'):
file_path_list.append(os.path.join(root,filename))
if not file_path_list:
raise Exception( "Did not find any nifti files in %s"%(dataset_folder) )
return(file_path_list)
def extract_bids_data( file_path_list, inclusion_list=None ):
import os
# iterate through the files and put them into a dictionary, all of the information that we need to do this
# is in the filename
sub_dict = {}
for file_path in file_path_list:
filename=os.path.basename(file_path)
try:
# discard the file extension and split filename into key-value chunks, the last chunk being the series type
f_chunks = (filename.split(".")[0]).split("_")
# make a dictionary from the key-value chunks
f_dict = {chunk.split("-")[0]:"-".join(chunk.split("-")[1:]) for chunk in f_chunks[:-1]}
f_dict["series"] = f_chunks[-1]
except Exception as e:
print "Error (%s): gather_bids_data, %s does not appear to be in BIDS format"%(e.message,filename)
continue
if "sub" not in f_dict.keys():
print u"Error (missing 'sub-' key): {0:s} does not appear to be in BIDS format".format(filename)
continue
# add sub- onto the front of subject name to preserve bids-ness
f_dict["sub"] = "-".join(["sub",f_dict["sub"]])
# straighten out the session
if "ses" not in f_dict.keys():
f_dict["ses"] = "1"
# add ses- onto the front of session name to preserve bids-ness
f_dict["ses"] = "-".join(["ses",f_dict["ses"]])
# determine whether the scan is anatomical or functional, we don't know how to handle anything but T1w
# and BOLD for now
if "t1w" in f_dict["series"].lower():
scan_type = "anatomical_scan"
elif "bold" in f_dict["series"].lower():
scan_type = "functional_scan"
else:
print u"QAP currently does not support {0:s} scans".format(f_dict["series"])
continue
if not inclusion_list or f_dict["sub"] in inclusion_list:
# make sure that our different levels of dictionaries exist
if f_dict["sub"] not in sub_dict.keys():
sub_dict[f_dict["sub"]] = {}
if f_dict["ses"] not in sub_dict[f_dict["sub"]].keys():
sub_dict[f_dict["sub"]][f_dict["ses"]]={}
if scan_type not in sub_dict[f_dict["sub"]][f_dict["ses"]].keys():
sub_dict[f_dict["sub"]][f_dict["ses"]][scan_type]={}
# calculate a key for the scan file
f_key=f_dict["series"]
if "functional_scan" in scan_type:
if not "task" in f_dict:
print "Error (missing 'task-' key), Functional scan {0:s}".format(filename) + \
" does not appear to be in BIDS format"
continue
else:
f_key = "_".join([f_key,"-".join(["task",f_dict["task"]])])
if "acq" in f_dict:
f_key = "_".join([f_key,"-".join(["acq",f_dict["acq"]])])
# insert the full path to the scan into dictionary
sub_dict[f_dict["sub"]][f_dict["ses"]][scan_type][f_key] = file_path
return(sub_dict)