-
Notifications
You must be signed in to change notification settings - Fork 0
/
processRecordings.py
executable file
·235 lines (207 loc) · 10.8 KB
/
processRecordings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/home/david/miniconda/envs/klusta/bin/python
import psutil
import time
import subprocess
import os
import glob
import fnmatch
import socket
import shutil
import argparse
import xml.etree.ElementTree as ET
# import resample
# TODO
# - add behavior tracking extraction
# - add LFP extraction
# Destination prefix on the SSD; copyToSSD() appends
# <parent folder>/<session folder>/<shank> to it, so keep the trailing slash.
ssdDirectory = '/home/david/to_cut/autoclustered/'
# Passed to copyToSSD() as ssdCompName -- presumably the hostname of the
# machine that owns the SSD (TODO confirm).
ssdCompName = 'hyperion'
def main(args):
    """Poll the data folder forever and push every recording through the pipeline.

    Each pass walks ``args.dataFolder``. A folder counts as a recording session
    when it contains a ``.dat`` file whose name starts with the folder name.
    For every session the shank folders are created if missing, and every shank
    folder holding a ``.prm`` file is checked for clustering status,
    autoclustered when ready, and copied to the SSD once autoclustering is done.
    The function never returns; it sleeps ``args.waitTime`` seconds between
    passes.

    Parameters
    ----------
    args : argparse.Namespace
        Needs ``dataFolder``, ``numShanks``, ``waitTime``, ``numJobs``,
        ``cpuLimit`` and ``repoPath``.

    NOTE(review): block nesting was reconstructed from the statement order;
    confirm that the status check, autoclustering and SSD copy are meant to
    run for every ``.prm`` file, whether or not a ``.kwik`` file exists.
    """
    # Resolve both paths once, before any os.chdir(): the loops below change
    # the working directory, which would silently redirect relative paths
    # (including the ones os.walk() is still iterating over).
    dataFolder = os.path.abspath(args.dataFolder)  # directory with recording subdirectories
    repoPath = os.path.abspath(args.repoPath)
    numShanks = args.numShanks  # number of shanks (spike groups) to cluster
    waitTime = args.waitTime  # seconds between starting new jobs / between passes
    numJobs = args.numJobs  # max number of jobs to run at once
    cpuLimit = args.cpuLimit  # max cpu usage allowed before starting new jobs
    print('repo path is : ' + repoPath)
    while True:  # this is the song that never ends....
        os.chdir(dataFolder)
        print('searching for unprocessed recordings...')
        for dirName, subdirList, fileList in os.walk(dataFolder):
            sessionName = dirName.split('/')[-1]
            for datFile in fileList:
                # a session folder holds a .dat named after the folder itself
                if datFile.startswith(sessionName) and datFile.endswith(".dat"):
                    _processSession(dirName, datFile, subdirList, numShanks,
                                    repoPath, cpuLimit, numJobs, waitTime)
        time.sleep(waitTime)  # it goes on and on my friends...


def _processSession(dirName, datFile, subdirList, numShanks, repoPath,
                    cpuLimit, numJobs, waitTime):
    """Prepare one recording session and visit each of its shank folders."""
    sessionDir = os.path.abspath(dirName)
    os.chdir(sessionDir)  # we are now in the recording session folder...
    xmlfile = glob.glob("*xml")
    extractLFP(dirName, datFile, xmlfile, repoPath)  # if no LFP yet, we make one
    # check if shank dirs exist and make them if they don't
    checkShankDirsExist(subdirList, dirName, numShanks, xmlfile, repoPath)
    for root, shankdirs, defaultFiles in os.walk(dirName):
        for shank in shankdirs:
            # shank folders have short (< 3 character) names; skip the rest
            if fnmatch.fnmatch(shank, '_klust*') or len(shank) >= 3:
                continue
            # Join with root: a bare os.chdir(shank) only works while the
            # walk is still at the session folder itself.
            os.chdir(os.path.join(root, shank))  # now in shank directory
            _processShank(root, shank, dirName, repoPath,
                          cpuLimit, numJobs, waitTime)
            os.chdir(sessionDir)  # return to recording directory


def _processShank(root, shank, dirName, repoPath, cpuLimit, numJobs, waitTime):
    """Run the status check, autoclustering and SSD copy for the shank folder in the cwd."""
    for prmFile in os.listdir('.'):
        # double check there's a prm file
        if not fnmatch.fnmatch(prmFile, '*.prm'):
            continue
        # you shall not pass... until other jobs have finished.
        checkJobLimits(cpuLimit, numJobs, waitTime)
        # check that spike extraction hasn't been done
        if not any(fnmatch.fnmatch(i, '*.kwik') for i in os.listdir('.')):
            # startClusterJob(root, prmFile)
            print('do nothing')
        # check if there is a log file
        status = getFolderStatus()
        startAutoClustering(shank, dirName, repoPath, status)
        copyToSSD(ssdCompName, ssdDirectory, root, shank, status)
def getCurrentJobs():
    """Count running processes whose name marks a spike-sorting job.

    Returns
    -------
    int
        Number of processes named exactly ``phy``, ``klustakwik``, ``klusta``,
        ``Klustakwik`` or ``MATLAB``.
    """
    jobNames = {
        "phy",
        "klustakwik",
        "klusta",
        "Klustakwik",  # upper case version on the servers
        "MATLAB",
    }
    count = 0
    for proc in psutil.process_iter():
        try:
            if proc.name() in jobNames:
                count += 1
        except psutil.Error:
            # The process exited, or cannot be inspected, between being
            # listed and being queried; it is not a job we can count.
            continue
    return count
def getFolderStatus():
    """Return the status word that ends the clustering log in the cwd.

    Looks for a ``*.klg.*`` file in the current working directory (the first
    match returned by ``glob`` is used). If that file is larger than 200
    bytes, the status is the last space-separated token of its last line,
    cut at the first ``.``.

    Returns
    -------
    str
        The status word, or ``''`` when there is no log file or it is 200
        bytes or smaller.
    """
    klg = glob.glob('*.klg.*')
    print(klg)
    if not klg:
        return ''
    logPath = klg[0]
    # logs this short are treated as not having a usable final line yet
    if os.path.getsize(logPath) <= 200:
        return ''
    last = _readLastLine(logPath).rstrip("\r\n")
    return last.split(" ")[-1].split(".")[0]


def _readLastLine(path, chunkSize=4096):
    """Return the final line of *path* as text, reading backwards in chunks."""
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        tail = b""
        while pos > 0:
            step = min(chunkSize, pos)
            pos -= step
            f.seek(pos)
            tail = f.read(step) + tail
            # A newline in the very last byte only terminates the final line,
            # so it is excluded from the search for the line start.
            cut = tail.rfind(b"\n", 0, len(tail) - 1)
            if cut != -1:
                tail = tail[cut + 1:]
                break
    # The file is read as bytes so seeking from the end is allowed; decode
    # here so callers always work with text.
    return tail.decode("utf-8", "replace")
def checkJobLimits(cpuLimit, numJobs, waitTime):
    """Block until CPU load, memory use and job count all allow a new job.

    The three limits are checked one after another; each check sleeps
    ``waitTime`` seconds between samples until it passes.

    Parameters
    ----------
    cpuLimit : number
        Wait while ``psutil.cpu_percent(2)`` reports more than this.
    numJobs : number
        Wait while ``getCurrentJobs()`` reports this many jobs or more.
    waitTime : number
        Seconds to sleep between samples.
    """
    memLimit = 97  # percent of virtual memory in use

    cpu = psutil.cpu_percent(2)
    while cpu > cpuLimit:
        print('current cpu usage: %f' % cpu)
        # wait until resources are available
        time.sleep(waitTime)
        cpu = psutil.cpu_percent(2)

    mem = psutil.virtual_memory()  # samples virtual memory usage
    while mem.percent > memLimit:
        print('current memory usage: %f' % mem.percent)
        # wait until resources are available
        time.sleep(waitTime)
        mem = psutil.virtual_memory()

    # Sample once per iteration so the printed count is the one that was
    # actually compared against the limit.
    running = getCurrentJobs()
    while running >= numJobs:
        print('waiting for %d jobs to finish...' % running)
        time.sleep(waitTime)
        running = getCurrentJobs()
def checkShankDirsExist(subdirList, dirName, numShanks, xmlfile, repoPath,
                        buzcodePath='/ifs/home/dwt244/buzcode'):
    """Create the shank folders of a session through MATLAB if any are missing.

    Subfolders whose name contains ``201``, ``extras``, ``temp``, ``Session``
    or ``State`` are not counted as shank folders. When fewer than
    ``numShanks`` remain, MATLAB is launched to run ``makeProbeMap`` on the
    session, which generates the folder structure and .prm/.prb files.

    Parameters
    ----------
    subdirList : list of str
        Names of the session's subfolders.
    dirName : str
        Session folder.
    numShanks : number
        Expected number of shank folders.
    xmlfile : list of str
        Session xml file names; the first one is handed to ``makeProbeMap``.
    repoPath : str
        Folder added (recursively) to the MATLAB path.
    buzcodePath : str, optional
        Second folder added (recursively) to the MATLAB path.

    Returns
    -------
    bool
        ``True`` when the folders exist or MATLAB was launched, ``False``
        when the check could not be completed.
    """
    nonShankMarkers = ('201', 'extras', 'temp', 'Session', 'State')
    try:
        # removes folders that are not shank folders
        shankDirs = [d for d in subdirList
                     if not any(marker in d for marker in nonShankMarkers)]
        if len(shankDirs) >= numShanks:
            return True
        if not xmlfile:
            print('no xml file found in ' + str(dirName) +
                  '; cannot create shank folders')
            return False
        sessionPath = os.path.abspath(dirName)
        print(sessionPath)
        matlabCode = ("addpath(genpath('" + repoPath + "')); "
                      "addpath(genpath('" + buzcodePath + "')); "
                      "makeProbeMap('" + sessionPath + "','" + xmlfile[0] + "');exit")
        # An argument list avoids a shell re-interpreting characters in the paths.
        matlab_command = ['matlab', '-nodesktop', '-r', matlabCode]
        # generate folder structure and .prm/.prb files
        print(matlab_command)
        subprocess.call(matlab_command)
        time.sleep(10)  # let the process get going...
        return True
    except Exception as err:
        # Best effort: one broken session must not stop the polling loop,
        # but say what went wrong.
        print('checkShankDirsExist failed for %s: %r' % (dirName, err))
        return False
def extractBehaviorTracking(xmlfile, repoPath=None):
    """Convert behavioral tracking data in the cwd through MATLAB, if needed.

    MATLAB is launched to run ``Process_ConvertOptitrack2Behav`` only when
    ``<xmlfile[0]>.tracking.behavior.mat`` does not exist yet and the current
    working directory contains tracking input (``Session*`` or ``*.tak``).

    Parameters
    ----------
    xmlfile : list of str
        Session xml file names; only the first entry is used.
    repoPath : str, optional
        Folder added (recursively) to the MATLAB path before the call.
        When omitted, MATLAB's existing path is used.

    NOTE(review): the marker file name is built from ``xmlfile[0]`` as is
    (extension included), and the same value is passed to MATLAB as a quoted
    string -- confirm both against Process_ConvertOptitrack2Behav.m.
    """
    if not xmlfile:
        return
    behaviorFile = xmlfile[0] + '.tracking.behavior.mat'
    if os.path.isfile(behaviorFile):
        return  # already converted
    # glob.glob returns lists, so emptiness can be tested directly
    hasTrackingData = bool(glob.glob('Session*')) or bool(glob.glob('*.tak'))
    if not hasTrackingData:
        return
    setup = ''
    if repoPath is not None:
        setup = "addpath(genpath('" + repoPath + "')); "
    matlabCode = (setup +
                  "Process_ConvertOptitrack2Behav('" + xmlfile[0] + "');exit")
    matlab_command = ['matlab', '-nodesktop', '-r', matlabCode]
    print(matlab_command)
    subprocess.call(matlab_command)
def extractLFP(dirName, file, xmlfile, repoPath):
    """Placeholder for LFP extraction.

    Only derives the ``.lfp`` file name from the part of ``file`` before its
    first ``.``; the resampling step itself is disabled (kept below as
    comments), so nothing is read or written.
    """
    baseName = file.split('.')[0]
    lfpFile = baseName + '.lfp'
    # Disabled extraction logic:
    # if not os.path.isfile(lfpFile):  # check if LFP file exists...
    #     print('making LFP file for ' + os.getcwd())
    #     tree = ET.parse(dirName + '/' + xmlfile[0])
    #     root = tree.getroot()
    #     try:
    #         # some very bad assumptions that your xml is formatted a la FMAToolbox....
    #         nChannels = int(root.find('acquisitionSystem').find('nChannels').text)
    #     except (AttributeError):
    #         print('is your xml file formatted correctly? couldnt find nChannels....')
    #     resample.main(dirName,file,lfpFile,nChannels,20000,1250)
def startClusterJob(root, file):
    """Start a background klusta job (spike extraction/clustering) in the cwd.

    Runs ``nohup klusta <file> &`` through the shell, writes this machine's
    hostname to ``complog.log`` in the current working directory, then sleeps
    10 seconds so one process gets going before another is generated.

    Parameters
    ----------
    root : str
        Only used in the progress message.
    file : str
        Parameter file handed to klusta.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote
    # if not socket.gethostname() == 'hyperion':
    # The shell is needed for the trailing '&'; quoting keeps a file name
    # with spaces or shell metacharacters a single argument.
    toRun = 'nohup klusta ' + quote(file) + ' &'  # the klusta command to run
    # run klusta job
    subprocess.call(toRun, shell=True)
    # record which computer started the job
    with open('complog.log', 'w') as logFile:
        logFile.write(socket.gethostname())
    print(['starting... ' + root + toRun])
    time.sleep(10)  # let one process start before generating another
def startAutoClustering(shank, dirName, repoPath, status):
    """Run MATLAB ``AutoClustering`` on the shank in the cwd once clustering has ended.

    Does nothing unless ``status`` is ``'1000'``, ``'abandoning'`` or
    ``'finishing'`` and no ``autoclustering.out`` marker exists in the current
    working directory. Otherwise the marker is written first and MATLAB is
    run to completion.

    Parameters
    ----------
    shank : str
        Shank folder name; inserted unquoted as the second MATLAB argument.
    dirName : str
        Session folder; its last ``/`` component is the first MATLAB argument.
    repoPath : str
        Folder added (recursively) to the MATLAB path.
    status : str
        Status word as returned by ``getFolderStatus``.

    Raises
    ------
    subprocess.CalledProcessError
        If MATLAB exits with a non-zero status.
    OSError
        If the ``matlab`` executable cannot be started.
    """
    finishedStates = ('1000', 'abandoning', 'finishing')
    # check Klustakwik has finished and autoclustering was not started before
    if status not in finishedStates or os.path.exists("autoclustering.out"):
        return
    print(os.getcwd())
    print('starting autoclustering on ' + shank + ' ..')
    # The file is opened in binary mode, so the payload must be bytes.
    with open("autoclustering.out", "wb") as myfile:
        myfile.write(b"autoclustering in progress\n")
    matlabCode = ("addpath(genpath('" + repoPath + "'));"
                  " AutoClustering('" + dirName.split('/')[-1] + "', " + shank + ");exit")
    # check_call waits for matlab to complete before going to the next job
    # (only one autoclustering job runs at a time)
    subprocess.check_call(['matlab', '-nodesktop', '-r', matlabCode])
def copyToSSD(ssdCompName, ssdDirectory, root, shank, status):
    """Copy a finished shank folder to the SSD for manual spike sorting.

    Acts only when ``status`` is ``'autoclustered'``, this machine's hostname
    equals ``ssdCompName`` and an ``autoclustering.out`` marker exists in the
    current working directory. The marker is removed, ``root/shank`` is copied
    to ``ssdDirectory/<parent of root>/<last part of root>/shank``, and
    ``copied to SSD`` is appended to ``nohup.out`` in the cwd.

    Parameters
    ----------
    ssdCompName : str
        Hostname of the machine allowed to copy.
    ssdDirectory : str
        Destination prefix on the SSD.
    root : str
        ``/``-separated path of the folder containing ``shank``; needs at
        least two components.
    shank : str
        Shank folder name.
    status : str
        Status word as returned by ``getFolderStatus``.
    """
    # checks that Autoclustering is done and that we are on the SSD machine
    if status != 'autoclustered':
        return
    if socket.gethostname() != ssdCompName:
        return
    if not os.path.exists("autoclustering.out"):
        return

    source = root + '/' + shank
    rootParts = root.split('/')
    destination = os.path.join(ssdDirectory, rootParts[-2], rootParts[-1], shank)
    print('copying ' + root + '/' + shank +
          ' to SSD and removing progress logfile..')
    os.remove("autoclustering.out")
    if os.path.exists(destination):
        print('file exists already... not copying anything.')
    else:
        try:
            # copy files to SSD
            shutil.copytree(source, destination)
        except (OSError, shutil.Error) as err:
            # Report the real cause and do not claim success in the log.
            print('copying ' + source + ' to SSD failed: ' + str(err))
            return
    with open("nohup.out", "a") as myfile:
        myfile.write("copied to SSD\n")
def buildArgParser():
    """Build the command-line parser for the processing daemon.

    Both positional arguments are optional (``nargs='?'``) so that their
    declared defaults actually apply when they are left out.

    Returns
    -------
    argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(description='This function is designed to run '
                                                 'in the background on a data processing '
                                                 'machine. It constantly searches through '
                                                 'a given directory [arg1] and starts extract,'
                                                 ' clustering, and other processing jobs')
    parser.add_argument('dataFolder', nargs='?', type=str, default=os.getcwd(),
                        help='the folder with all of your recordings in subdirectories '
                             '[default = current directory]')
    parser.add_argument('numShanks', nargs='?', type=float, default=10,
                        help='number of shanks to process [default = 10]')
    parser.add_argument('-waitTime', type=int, default=300,
                        help='time (seconds) to wait before searching for more jobs [default = 300]')
    parser.add_argument('-numJobs', type=float, default=4,
                        help='number of jobs to run simultaneously [default = 4]')
    parser.add_argument('-cpuLimit', type=float, default=100,
                        help='cpu usage limit [default = 100]')
    parser.add_argument('-repoPath', type=str, default=os.getcwd(),
                        help='location of ephys-processing repository')
    return parser


if __name__ == "__main__":
    main(buildArgParser().parse_args())