Skip to content

Commit

Permalink
CLI improvement and more documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
udincer committed Feb 14, 2020
1 parent ab8c9c2 commit 4699a57
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 79 deletions.
46 changes: 19 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,32 @@
# psub
Submit array jobs without all the suffering
Submit array jobs without all the suffering.

Stands for petko-submit, OG ernstlab member who came up with the idea.
psub provides an intuitive way to submit array jobs on a SGE cluster (particularly UCLA's Hoffman2). Instead of writing scripts that generate scripts that in turn get submitted to the scheduler, or dealing with environment variables, you can do this with psub and forget about the rest:

## New API (alpha):

There's a new (unstable) API that allows submitting jobs directly in Python.

## Installation:

Must be installed under ~/utils like this:
```
mkdir ~/utils; cd ~/utils
git clone [email protected]:udincer/psub.git
psub --mem 4G --time 12:00:00 "./my_script.py -p {} {} ::: p1 p2 p3 ::: arg1 arg2"
```

Add alias:
This will submit a job array with 6 jobs, one for each combination of `p` and `arg`, and request 4 GB of memory and 12 hours from the scheduler:
```
# Optional: add psub as an alias
# Add the following to .bashrc
alias psub='/u/home/d/<your_username>/utils/psub/psub_main.sh '
./my_script.py -p p1 arg1
./my_script.py -p p1 arg2
./my_script.py -p p2 arg1
./my_script.py -p p2 arg2
./my_script.py -p p3 arg1
./my_script.py -p p3 arg2
```

## How to use:
psub keeps all stdouts and stderrs in a nice tidy directory for each job array.

See `psub --help` for all features.

psub is still in _alpha_; there will be bugs.

## Installation:

Make a script containing the commands you want to run in parallel, with exactly one command per line:
```
# the_jobs.sh
python run_analysis.py theta1
python run_analysis.py theta2
python run_analysis.py theta3
python run_analysis.py theta4
python run_analysis.py theta5
pip install psub
```

Run psub like this to batch 3 commands into one SGE job (useful for short jobs):
```
psub the_jobs.sh 3
```
psub stands for petko-submit, after the OG ernstlab member who came up with the core idea.
114 changes: 98 additions & 16 deletions psub/psub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

"""Submit array jobs to a SGE cluster without all the suffering."""

import sys
import os
import subprocess
import argparse
import re
import itertools
import textwrap
from datetime import datetime


Expand All @@ -15,9 +17,7 @@ class Psub:
PSUB_MAIN = """
TASKS_FILE=$1
N_TASKS=$(cat $TASKS_FILE | wc -l)
NUM_IN_BATCH=${{2:-1}}
echo Submitting $N_TASKS tasks to the queue from $TASKS_FILE, with $NUM_IN_BATCH lines in each batch
qsub <<CMD
#!/bin/bash
Expand All @@ -39,7 +39,6 @@ class Psub:
RUN_TASK = """
TASKS_FILE=$1
NUM_IN_BATCH=$2
for ((i=0; i<${NUM_IN_BATCH}; i++)); do
LINE_NUM=$((SGE_TASK_ID+i))
CMD=$(awk "NR==$LINE_NUM" $TASKS_FILE)
Expand Down Expand Up @@ -128,7 +127,7 @@ def add(self, cmd):
else:
raise TypeError("Commands should be str or list of str")

def submit(self, dryrun=False):
def submit(self, dryrun=False, skip_confirm=False):
os.makedirs(self.logdir, exist_ok=True)

with open(self.cmd_fn, "w") as f:
Expand All @@ -155,9 +154,18 @@ def submit(self, dryrun=False):

subprocess_cmd = f". {self.tmpdir}/psub_main.sh {self.cmd_fn}"

print(str(self))

if dryrun:
print(str(self))
return

if not skip_confirm:
print("\nSubmit to cluster? [Y/n]")
response = input()
else:
response = "y"

if response in {"", "y", "Y"}:
comp_process = subprocess.run(
subprocess_cmd,
shell=True,
Expand Down Expand Up @@ -210,20 +218,91 @@ def parse_command(command):

if __name__ == "__main__":

parser = argparse.ArgumentParser()

parser.add_argument("--dry-run", action="store_true")

parser.add_argument("--jobname", "--name")
parser.add_argument("--tmpdir", "--tmp")
ARGPARSE_HELP_STRING = """Submit jobs to the SGE cluster without all the suffering.
Usage example:
psub \\
--jobname my_job \\
--mem 4G \\
--time 12:00:00 \\
"./my_script.py {} --parameter {} ::: arg1 arg2 ::: p1 p2 p3"
parser.add_argument("--l_arch", "--arch")
parser.add_argument("--l_mem", "--mem")
parser.add_argument("--l_time", "--time")
parser.add_argument("--l_highp", action="store_true", default=True)
This will run my_script.py as a job array with 6 jobs for each combination
of arg and p.
parser.add_argument("command", nargs="+")
Use ::: to expand a list of parameters and :::: to read from each line in a file.
Inspired by GNU Parallel's interface (which does it better).
"""

parser = argparse.ArgumentParser(
prog="psub",
description=ARGPARSE_HELP_STRING,
formatter_class=argparse.RawDescriptionHelpFormatter,
)

parser.add_argument(
"--dry-run",
action="store_true",
help="Show what jobs will be submitted without actually submitting them.",
)

parser.add_argument(
"-n", "--jobname", help="Name of job that will appear in the queue"
)

# alternative to :::: syntax
parser.add_argument(
"-a",
"--file",
action="store_true",
help=(
'Submit jobs for file where each line is a command. '
'Equivalent to "psub :::: file".'
),
)

parser.add_argument(
"--l_arch", "--arch", help="Only request a target CPU architecture, e.g. intel*"
)
parser.add_argument("--l_mem", "--mem", help="Memory per core requested, e.g. 4G")
parser.add_argument(
"--l_time", "--time", help="Time requested, e.g. 12:00:00 for 12 hours."
)
parser.add_argument(
"--l_highp",
action="store_true",
default=True,
help=(
"Submit to highp queue. "
"This will only use nodes that belong to your user group and "
"allows for job durations up to 14 days."
),
)

parser.add_argument(
"--tmpdir",
"--tmp",
help=(
"Temporary directory for intermediate scripts generated by psub. "
"This should be a directory that can be accessed by all nodes. "
"This means /tmp and /dev/shm won't work."
),
)

parser.add_argument(
"-y",
"--yes",
action="store_true",
default=False,
help="Do not ask for confirmation before submitting jobs.",
)

parser.add_argument("command", nargs="+", help="Command template string")

if len(sys.argv) < 2:
parser.print_help(sys.stderr)
raise SystemExit()

args = parser.parse_args()

command = " ".join(args.command)
Expand All @@ -242,6 +321,9 @@ def parse_command(command):
if args.l_highp is not None:
p.set_resources(l_highp=args.l_highp)

if args.file:
command = f":::: {command}"

commands = parse_command(command)

p.add(commands)
Expand Down
27 changes: 0 additions & 27 deletions psub_main.sh

This file was deleted.

8 changes: 0 additions & 8 deletions run_task.sh

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="psub",
version="0.0.4_alpha",
version="0.0.5_alpha",
author="Tev Dincer",
author_email="[email protected]",
description="Submit array jobs to a SGE cluster without all the suffering",
Expand Down

0 comments on commit 4699a57

Please sign in to comment.