from parsl.config import Config
from parsl.channels import SSHChannel
from parsl.providers import LocalProvider, SlurmProvider, PBSProProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.launchers import SimpleLauncher
import os
import json
import argparse
import shutil
# parsl_utils is cloned at runtime
import parsl_utils
from parsl_utils.data_provider.rsync import PWRSyncStaging
from parsl_utils.data_provider.gsutil import PWGsutil
from parsl_utils.data_provider.s3 import PWS3


def read_args():
    # Accept arbitrary --key value pairs: any unknown flag seen on the
    # command line is registered as an optional string argument
    parser = argparse.ArgumentParser()
    parsed, unknown = parser.parse_known_args()
    for arg in unknown:
        if arg.startswith(("-", "--")):
            parser.add_argument(arg, default="", nargs="?")
    pwargs = vars(parser.parse_args())
    return pwargs
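
# Example (hypothetical invocation): passing '--parsl_retries 2 --my_flag value'
# to the workflow yields pwargs == {'parsl_retries': '2', 'my_flag': 'value'};
# every value arrives as a string, so it must be cast (e.g. with int()) before use.
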
pwargs = read_args()
# Need to name the job to be able to remove it with clean_resources.sh!
job_number = os.getcwd().split('/')[-1]

with open('executors.json', 'r') as f:
    exec_conf = json.load(f)

# Expand '~' in any string-valued setting
for label, executor in exec_conf.items():
    for k, v in executor.items():
        if isinstance(v, str):
            exec_conf[label][k] = os.path.expanduser(v)
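
# Illustrative sketch of one executors.json entry (the keys mirror those read
# below; the values are placeholder assumptions, not from a real deployment):
#
# {
#     "myexecutor_1": {
#         "HOST_IP": "10.0.0.5",
#         "HOST_USER": "pwuser",
#         "WORKDIR": "~/pworks",
#         "RUN_DIR": "~/pworks/run_001",
#         "CONDA_DIR": "~/miniconda3",
#         "CONDA_ENV": "parsl",
#         "ADDRESS": "10.0.0.5",
#         "WORKER_PORT_1": "55351",
#         "WORKER_PORT_2": "55352",
#         "CORES_PER_WORKER": "4",
#         "SlurmProvider": "{\"partition\": \"compute\", \"nodes_per_block\": 2}"
#     }
# }
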
# Define HighThroughputExecutors
executors = []
for exec_label, exec_conf_i in exec_conf.items():
    # Set default values:
    if 'SSH_CHANNEL_SCRIPT_DIR' not in exec_conf_i:
        script_dir = os.path.join(exec_conf_i['RUN_DIR'], 'ssh_channel_script_dir')
    else:
        script_dir = exec_conf_i['SSH_CHANNEL_SCRIPT_DIR']

    if 'WORKER_LOGDIR_ROOT' not in exec_conf_i:
        worker_logdir_root = os.path.join(exec_conf_i['RUN_DIR'], 'worker_logdir_root')
    else:
        worker_logdir_root = exec_conf_i['WORKER_LOGDIR_ROOT']

    # To support kerberos: enable GSSAPI authentication when the ssh binary
    # on the PATH comes from a kerberized install
    ssh_path = shutil.which('ssh')
    gssapi_auth = 'kerberos' in ssh_path.split('/')
    channel = SSHChannel(
        hostname=exec_conf_i['HOST_IP'],
        username=exec_conf_i['HOST_USER'],
        # Full path to a script dir where generated scripts could be sent to
        script_dir=script_dir,
        key_filename='/home/{PW_USER}/.ssh/pw_id_rsa'.format(
            PW_USER=os.environ['PW_USER']
        ),
        gssapi_auth=gssapi_auth
    )
    # Define worker init:
    # - export PYTHONPATH={run_dir} is needed to use the custom staging providers
    # - clean_cmd lists the PIDs of any leftover worker.pl processes
    worker_init = 'export PYTHONPATH={run_dir}; bash {workdir}/pw/remote.sh; bash {workdir}/pw/.pw/remote.sh; source {conda_sh}; conda activate {conda_env}; cd {run_dir}; {clean_cmd}'.format(
        workdir=exec_conf_i['WORKDIR'],
        conda_sh=os.path.join(exec_conf_i['CONDA_DIR'], 'etc/profile.d/conda.sh'),
        conda_env=exec_conf_i['CONDA_ENV'],
        run_dir=exec_conf_i['RUN_DIR'],
        clean_cmd="ps -x | grep worker.pl | grep -v grep | awk '{print $1}'"
    )
    # Data providers: one instance of each staging provider per executor
    storage_access = [
        PWRSyncStaging(exec_label),
        PWGsutil(exec_label),
        PWS3(exec_label)
    ]
    # Define provider
    if 'PBSProProvider' in exec_conf_i:
        provider = PBSProProvider(
            **json.loads(exec_conf_i['PBSProProvider']),
            worker_init=worker_init,
            channel=channel
        )
    elif 'SlurmProvider' in exec_conf_i:
        slurm_opts = json.loads(exec_conf_i['SlurmProvider'])
        provider = SlurmProvider(
            **slurm_opts,
            worker_init=worker_init,
            channel=channel,
            launcher=SimpleLauncher(),
            parallelism=float(slurm_opts['nodes_per_block'])
        )
    else:
        # Need to overwrite the default worker_init since we don't want to
        # run remote.sh in this case
        worker_init = 'export PYTHONPATH={run_dir}; source {conda_sh}; conda activate {conda_env}; cd {run_dir}'.format(
            conda_sh=os.path.join(exec_conf_i['CONDA_DIR'], 'etc/profile.d/conda.sh'),
            conda_env=exec_conf_i['CONDA_ENV'],
            run_dir=exec_conf_i['RUN_DIR']
        )
        provider = LocalProvider(
            worker_init=worker_init,
            channel=channel
        )
    if 'cores_per_worker' in exec_conf_i:
        cores_per_worker = float(exec_conf_i['cores_per_worker'])
    elif 'CORES_PER_WORKER' in exec_conf_i:
        cores_per_worker = float(exec_conf_i['CORES_PER_WORKER'])
    else:
        cores_per_worker = 1.0
    executors.append(
        HighThroughputExecutor(
            worker_ports=(
                int(exec_conf_i['WORKER_PORT_1']),
                int(exec_conf_i['WORKER_PORT_2'])
            ),
            label=exec_label,
            cores_per_worker=cores_per_worker,  # Needs to match cores_per_node for SlurmProvider!
            worker_debug=True,  # Default False for shorter logs
            working_dir=exec_conf_i['RUN_DIR'],
            worker_logdir_root=worker_logdir_root,
            address=exec_conf_i['ADDRESS'],
            provider=provider,
            storage_access=storage_access
        )
    )

if 'parsl_retries' in pwargs:
    retries = int(pwargs['parsl_retries'])
    from . import retry_handler
    retry_handler = retry_handler.retry_handler
else:
    retries = 0
    retry_handler = None

if len(executors) > 1:
    config = Config(
        retries=retries,
        retry_handler=retry_handler,
        executors=executors
    )
else:
    # Monitoring is only enabled for single-executor runs
    from parsl.monitoring.monitoring import MonitoringHub
    executor_address = list(exec_conf.values())[0]['ADDRESS']
    config = Config(
        retries=retries,
        retry_handler=retry_handler,
        executors=executors,
        monitoring=MonitoringHub(
            hub_address=executor_address,
            monitoring_debug=False,
            workflow_name=str(job_number),
            logging_endpoint='sqlite:////pw/.monitoring.db',
            resource_monitoring_enabled=False
        )
    )
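
# Usage sketch (assumption: this module is imported by the workflow driver
# rather than executed directly). A typical entry point would do:
#
#   import parsl
#   from config import config
#
#   parsl.load(config)
#   # ... define and launch @parsl.python_app / @parsl.bash_app tasks ...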