# tasks.py
import resource
import timeit as ti
from argparse import Namespace
from glob import glob
from multiprocessing import Pool, Process, Queue
from resource import getrusage, RUSAGE_SELF
from typing import Tuple

import numpy as np
import pandas as pd
import modin.pandas as mpd
import dask.dataframe as dd
from distributed import Client
from dask.distributed import performance_report
from dask_memusage import install


def set_usage() -> int:
"""Sets the parameter for getrusage function.
:return: Parameter for getrusage.
"""
return RUSAGE_SELF
_dtype = {'ActualElapsedTime': 'float64',
'ArrDelay': 'float64',
'ArrTime': 'float64',
'DepDelay': 'float64',
'DepTime': 'float64',
'Distance': 'float64',
'CRSElapsedTime': 'float64',
'CancellationCode': 'object',
'TailNum': 'object',
'AirTime': 'float64',
'TaxiIn': 'float64',
'TaxiOut': 'float64',
'CRSDepTime': 'string'
}
cols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'DepDelay', 'CRSArrTime', 'ArrDelay', 'Origin',
        'Dest']


def format_usage(usage: resource.struct_rusage) -> int:
    """Extracts the peak memory statistic from resource usage data.
    :param usage: Resource usage data.
    :return: Peak resident set size (ru_maxrss).
    """
    return usage.ru_maxrss


def gather_usage(start_time: float, queue: Queue):
"""Gather resource usage data and put them into queue.
:param start_time: Task start time.
:param queue: Queue for subprocess data storing.
"""
queue.put(format_usage(getrusage(set_usage())))
    queue.put(ti.default_timer() - start_time)


def manage_subprocess(args: Namespace, task: str) -> Tuple[str, int, float]:
"""Spawns a subprocess and executes the specified task in it.
:param args: Parsed command line arguments.
:param task: Task name.
:return: Resource usage stats for the task.
"""
    functions = {
        'pandas': pandas_main,
        'modin': modin_main,
        'multiproc': multiproc_main,
        'dask': dask_main,
    }
# execute the subprocess
queue = Queue()
p = Process(target=functions[task], args=(args, queue), name=task)
p.start()
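    # join() before draining the queue is safe here because only two small
    # items are queued; larger payloads should be read before join() to
    # avoid blocking on the queue's feeder thread.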
p.join()
p.close()
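    # Process.close() requires Python 3.7 or newer.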
# get usage data from queue
mem_usage = queue.get()
duration = queue.get()
queue.close()
    return task, mem_usage, duration


def pandas_main(args: Namespace, queue: Queue):
"""Executes the pandas task on file(s).
:param args: Parsed command line arguments.
:param queue: Queue for subprocess data storing.
"""
    print('PANDAS started...')
start_time = ti.default_timer()
files = glob(args.path)
if len(files) == 1:
pandas_single(files[0])
elif len(files) > 1:
pandas_more(files)
else:
        raise FileNotFoundError('No input files matched: {}'.format(args.path))
# gather usage data
    gather_usage(start_time, queue)


def pandas_single(file: str):
"""Execute the pandas task on a single data file.
:param file: Path to the file containing task data.
"""
df = pd.read_csv(file, dtype=_dtype, usecols=cols)
result = df['DepDelay'].mean()
    print('Dep avg is {}'.format(result))
    df.head()


def pandas_more(files: list):
"""Executes the pandas task on multiple data files.
:param files: Path to the data files.
"""
sums, counts = [], []
for file in files:
df = pd.read_csv(file, dtype=_dtype, usecols=cols)
sums.append(df['DepDelay'].sum())
counts.append(df['DepDelay'].count())
    print('Dep avg is {}'.format(sum(sums) / sum(counts)))


def dask_main(args: Namespace, queue: Queue):
"""Executes the dask task on the cluster.
:param args: Parsed command line arguments.
:param queue: Queue for subprocess data storing.
"""
# cluster start/bind
if args.cluster is None:
client = Client()
else:
client = Client(args.cluster)
print('DASK started...')
start_time = ti.default_timer()
# print(client.dashboard_link)
# with performance_report('dask_report.html'):
# dask_task(args.path)
dask_task(args.path)
# gather usage data
gather_usage(start_time, queue)
# cluster close if it was locally started
if args.cluster is None:
        client.close()


def dask_task(files: str):
"""Runs dask tasks on the specified data file.
:param files: Path to the data file(s) for task.
"""
df = dd.read_csv(files, dtype=_dtype, usecols=cols)
result = df['DepDelay'].mean().compute()
df.head()
    print('Dep avg is {}'.format(result))


def multiproc_main(args: Namespace, queue: Queue):
"""Executes the multiprocessing task on file(s).
:param args: Parsed command line arguments.
:param queue: Queue for subprocess data storing.
"""
# todo try to improve multiprocessing logic and performance
print('MULTIPROC started...')
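    # Fixed pool size; also the default chunk count used by multiproc_single.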
num_cores = 4
files = glob(args.path)
with Pool(num_cores) as pool:
start_time = ti.default_timer()
if len(files) == 1:
multiproc_single(files[0], pool)
elif len(files) > 1:
multiproc_more(files, pool)
else:
            raise FileNotFoundError('No input files matched: {}'.format(args.path))
# gather usage data
    gather_usage(start_time, queue)


def multiproc_single(file: str, pool: Pool, control_print: bool = True,
                     num_cores: int = 4) -> Tuple[float, int]:
    """Executes the task with a Pool of worker processes.
    :param file: Path to the data file for task.
    :param pool: Pool of worker processes for task execution.
    :param control_print: Determine if control output should be printed.
    :param num_cores: Number of chunks to split the DataFrame into.
    :return: Tuple of partial results (delay sum, delay count).
    """
df = pd.read_csv(file, dtype=_dtype, usecols=cols)
df_split = np.array_split(df, num_cores)
del_sum, del_cnt = 0, 0
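    # Each chunk is pickled to a worker process; imap yields (sum, count)
    # pairs in order, which are reduced into a single mean below.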
output = pool.imap(multiproc_task, df_split)
for x, y in output:
del_sum += x
del_cnt += y
if control_print:
print('Dep avg is {}'.format(del_sum / del_cnt))
    return del_sum, del_cnt


def multiproc_more(files: list, pool: Pool):
"""Executes the task over multiple files.
:param files: Path to the files.
:param pool: Pool of worker processes for task execution.
"""
sums, counts = [], []
for file in files:
del_sum, del_cnt = multiproc_single(file, pool, False)
sums.append(del_sum)
counts.append(del_cnt)
    print('Dep avg is {}'.format(sum(sums) / sum(counts)))


def multiproc_task(df: pd.DataFrame) -> Tuple[float, int]:
"""Runs multiprocessing task on the specified part of the DataFrame.
:param df: Part of the DataFrame.
:return: Tuple of intermediate task results.
"""
del_sum = df['DepDelay'].sum()
del_cnt = df['DepDelay'].count()
    return del_sum, del_cnt


def modin_main(args: Namespace, queue: Queue):
"""Executes the modin task on the cluster.
:param args: Parsed command line arguments.
:param queue: Queue for subprocess data storing.
"""
# cluster start/bind
files = glob(args.path)
if args.cluster is None:
client = Client()
else:
client = Client(args.cluster)
start_time = ti.default_timer()
print('MODIN started...')
    if len(files) == 1:
        modin_single(files[0])
    elif len(files) > 1:
        modin_more(files)
    else:
        raise FileNotFoundError('No input files matched: {}'.format(args.path))
# gather usage data
gather_usage(start_time, queue)
# cluster close if it was locally started
if args.cluster is None:
        client.close()


def modin_single(file: str):
"""Runs modin task on the specified data file.
    :param file: Path to the data file for the task.
"""
df = mpd.read_csv(file, dtype=_dtype, usecols=cols)
df.head()
result = df['DepDelay'].mean()
    print('Dep avg is {}'.format(result))


def modin_more(files: list):
"""Executes the modin task on multiple data files.
:param files: Path to the data files.
"""
sums, counts = [], []
for file in files:
df = mpd.read_csv(file, dtype=_dtype, usecols=cols)
sums.append(df['DepDelay'].sum())
counts.append(df['DepDelay'].count())
    print('Dep avg is {}'.format(sum(sums) / sum(counts)))
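

# A minimal, hypothetical usage sketch, not part of the original module: it
# assumes the CLI wrapper builds a Namespace with 'path' (a glob pattern) and
# 'cluster' (a scheduler address or None) before calling manage_subprocess.
# The data path below is a placeholder.
if __name__ == '__main__':
    demo_args = Namespace(path='data/*.csv', cluster=None)  # hypothetical path
    for backend in ('pandas', 'multiproc'):
        name, mem, seconds = manage_subprocess(demo_args, backend)
        # mem is ru_maxrss: kilobytes on Linux, bytes on macOS
        print('{}: peak RSS {}, took {:.2f} s'.format(name, mem, seconds))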