diff --git a/pypeflow/do_task.py b/pypeflow/do_task.py index c516a9e..567575f 100644 --- a/pypeflow/do_task.py +++ b/pypeflow/do_task.py @@ -22,7 +22,7 @@ { "inputs": {"input-name": "filename"}, "outputs": {"output-name": "output-filename (relative)"}, - "python_function": "falcon_kit.mains.foo", + "bash_template_fn": "template.sh", "parameters": {} } @@ -180,7 +180,6 @@ def run_bash(bash_template, myinputs, myoutputs, parameters): def run_cfg_in_tmpdir(cfg, tmpdir): """ Except 'inputs', 'outputs', 'parameters' in cfg. - If 'python_function' in cfg, then use it. (Deprecated.) If 'bash_template_fn' in cfg, then substitute and use it. """ for fn in cfg['inputs'].values(): @@ -188,14 +187,9 @@ def run_cfg_in_tmpdir(cfg, tmpdir): inputs = cfg['inputs'] outputs = cfg['outputs'] parameters = cfg['parameters'] - python_function_name = cfg.get('python_function') - bash_template_fn = cfg.get('bash_template_fn') - if bash_template_fn: - wait_for(bash_template_fn) - bash_template = open(bash_template_fn).read() - else: - bash_template = None - assert python_function_name or bash_template_fn + bash_template_fn = cfg['bash_template_fn'] + wait_for(bash_template_fn) + bash_template = open(bash_template_fn).read() myinputs = dict(inputs) myoutputs = dict(outputs) finaloutdir = os.getcwd() @@ -210,11 +204,8 @@ def run_cfg_in_tmpdir(cfg, tmpdir): else: myrundir = finaloutdir with util.cd(myrundir): - if python_function_name: - run_python(python_function_name, myinputs, myoutputs, parameters) - elif bash_template: - # TODO(CD): Write a script in wdir even when running in tmpdir. - run_bash(bash_template, myinputs, myoutputs, parameters) + # TODO(CD): Write a script in wdir even when running in tmpdir. + run_bash(bash_template, myinputs, myoutputs, parameters) if tmpdir: """ for k,v in outputs.iteritems(): diff --git a/pypeflow/sample_tasks.py b/pypeflow/sample_tasks.py index 923cfa8..dfc697a 100644 --- a/pypeflow/sample_tasks.py +++ b/pypeflow/sample_tasks.py @@ -9,7 +9,7 @@ LOG = logging.getLogger(__name__) -def create_task_new(i1, o1): +def create_task(i1, o1): script = """ cat {input.i1} > {output.o1} """ @@ -23,30 +23,3 @@ def create_task_new(i1, o1): }, parameters={}, ) - -def taskA(self): - i1 = fn(self.i1) - o1 = fn(self.o1) - script = """ -set -vex -cat {i1} > {o1} -""".format(**locals()) - script_fn = 'script.sh' - with open(script_fn, 'w') as ofs: - ofs.write(script) - self.generated_script_fn = script_fn - -def create_task_old(i1, o1): - i1 = makePypeLocalFile(i1) - o1 = makePypeLocalFile(o1) - parameters = {} - make_task = PypeTask( - inputs={ - 'i1': i1, - }, - outputs={ - 'o1': o1, - }, - parameters=parameters, - ) - return make_task(taskA) diff --git a/pypeflow/simple_pwatcher_bridge.py b/pypeflow/simple_pwatcher_bridge.py index d2d322a..70d36d6 100644 --- a/pypeflow/simple_pwatcher_bridge.py +++ b/pypeflow/simple_pwatcher_bridge.py @@ -305,6 +305,9 @@ def _refreshTargets(self, updateFreq, exitOnFailure): while ready or submitted: # Nodes cannot be in ready or submitted unless they are also in unsatg. to_submit = set() + if self.max_jobs <= 0: + msg = 'self.max_jobs={}'.format(self.max_jobs) + raise Exception(msg) while ready and (self.max_jobs > len(submitted) + len(to_submit)): node = ready.pop() to_submit.add(node) @@ -477,13 +480,7 @@ def generate_script(self): 'bash_template_fn' : 'template.sh', } else: - # TODO: Stop supporting python_function - task_desc = { - 'inputs': inputs, - 'outputs': outputs, - 'parameters': pt.parameters, - 'python_function': pt.__name__, - } + raise Exception('We no longer support python functions as PypeTasks.') task_content = json.dumps(task_desc, sort_keys=True, indent=4, separators=(',', ': ')) + '\n' task_json_fn = os.path.join(wdir, 'task.json') open(task_json_fn, 'w').write(task_content) diff --git a/test/test_integ.py b/test/test_integ.py index df0141a..a5ff9be 100644 --- a/test/test_integ.py +++ b/test/test_integ.py @@ -47,10 +47,6 @@ def try_workflow(text, create_task): assert os.path.exists(o1) assert text == open(o1).read() -def test_old(tmpdir): - with tmpdir.as_cwd(): - try_workflow('OLD', sample_tasks.create_task_old) - def test_new(tmpdir): with tmpdir.as_cwd(): - try_workflow('NEW', sample_tasks.create_task_new) + try_workflow('bash-based', sample_tasks.create_task)