Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #42 in SAT/pypeflow from feature/tag-2790-clean to…
Browse files Browse the repository at this point in the history
… develop

* commit 'e8df138248a4bc76f79f1edfed169498a162257c':
  Stop supporting python_function as PypeTask
  max_jobs <=0 indicates an error, for now
  • Loading branch information
Christopher Dunn committed Aug 2, 2018
2 parents ecbed43 + e8df138 commit 005acb1
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 55 deletions.
21 changes: 6 additions & 15 deletions pypeflow/do_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{
"inputs": {"input-name": "filename"},
"outputs": {"output-name": "output-filename (relative)"},
"python_function": "falcon_kit.mains.foo",
"bash_template_fn": "template.sh",
"parameters": {}
}
Expand Down Expand Up @@ -180,22 +180,16 @@ def run_bash(bash_template, myinputs, myoutputs, parameters):
def run_cfg_in_tmpdir(cfg, tmpdir):
"""
Except 'inputs', 'outputs', 'parameters' in cfg.
If 'python_function' in cfg, then use it. (Deprecated.)
If 'bash_template_fn' in cfg, then substitute and use it.
"""
for fn in cfg['inputs'].values():
wait_for(fn)
inputs = cfg['inputs']
outputs = cfg['outputs']
parameters = cfg['parameters']
python_function_name = cfg.get('python_function')
bash_template_fn = cfg.get('bash_template_fn')
if bash_template_fn:
wait_for(bash_template_fn)
bash_template = open(bash_template_fn).read()
else:
bash_template = None
assert python_function_name or bash_template_fn
bash_template_fn = cfg['bash_template_fn']
wait_for(bash_template_fn)
bash_template = open(bash_template_fn).read()
myinputs = dict(inputs)
myoutputs = dict(outputs)
finaloutdir = os.getcwd()
Expand All @@ -210,11 +204,8 @@ def run_cfg_in_tmpdir(cfg, tmpdir):
else:
myrundir = finaloutdir
with util.cd(myrundir):
if python_function_name:
run_python(python_function_name, myinputs, myoutputs, parameters)
elif bash_template:
# TODO(CD): Write a script in wdir even when running in tmpdir.
run_bash(bash_template, myinputs, myoutputs, parameters)
# TODO(CD): Write a script in wdir even when running in tmpdir.
run_bash(bash_template, myinputs, myoutputs, parameters)
if tmpdir:
"""
for k,v in outputs.iteritems():
Expand Down
29 changes: 1 addition & 28 deletions pypeflow/sample_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
LOG = logging.getLogger(__name__)


def create_task_new(i1, o1):
def create_task(i1, o1):
script = """
cat {input.i1} > {output.o1}
"""
Expand All @@ -23,30 +23,3 @@ def create_task_new(i1, o1):
},
parameters={},
)

def taskA(self):
i1 = fn(self.i1)
o1 = fn(self.o1)
script = """
set -vex
cat {i1} > {o1}
""".format(**locals())
script_fn = 'script.sh'
with open(script_fn, 'w') as ofs:
ofs.write(script)
self.generated_script_fn = script_fn

def create_task_old(i1, o1):
i1 = makePypeLocalFile(i1)
o1 = makePypeLocalFile(o1)
parameters = {}
make_task = PypeTask(
inputs={
'i1': i1,
},
outputs={
'o1': o1,
},
parameters=parameters,
)
return make_task(taskA)
11 changes: 4 additions & 7 deletions pypeflow/simple_pwatcher_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ def _refreshTargets(self, updateFreq, exitOnFailure):
while ready or submitted:
# Nodes cannot be in ready or submitted unless they are also in unsatg.
to_submit = set()
if self.max_jobs <= 0:
msg = 'self.max_jobs={}'.format(self.max_jobs)
raise Exception(msg)
while ready and (self.max_jobs > len(submitted) + len(to_submit)):
node = ready.pop()
to_submit.add(node)
Expand Down Expand Up @@ -477,13 +480,7 @@ def generate_script(self):
'bash_template_fn' : 'template.sh',
}
else:
# TODO: Stop supporting python_function
task_desc = {
'inputs': inputs,
'outputs': outputs,
'parameters': pt.parameters,
'python_function': pt.__name__,
}
raise Exception('We no longer support python functions as PypeTasks.')
task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': ')) + '\n'
task_json_fn = os.path.join(wdir, 'task.json')
open(task_json_fn, 'w').write(task_content)
Expand Down
6 changes: 1 addition & 5 deletions test/test_integ.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ def try_workflow(text, create_task):
assert os.path.exists(o1)
assert text == open(o1).read()

def test_old(tmpdir):
with tmpdir.as_cwd():
try_workflow('OLD', sample_tasks.create_task_old)

def test_new(tmpdir):
with tmpdir.as_cwd():
try_workflow('NEW', sample_tasks.create_task_new)
try_workflow('bash-based', sample_tasks.create_task)

0 comments on commit 005acb1

Please sign in to comment.