Skip to content

Commit

Permalink
New logic for setting the STAGE_INPUT value (#70)
Browse files Browse the repository at this point in the history
* Fix to automatic STAGE_INPUT reconstruction

* Fix to previous commit

* Fixes a further bug in the reconstruction of the previous stage

* Further changes to the check of the STAGE_INPUT value

* Update cobrawap/pipeline/utils/Snakefile

Co-authored-by: Robin Gutzen <[email protected]>

* Update Snakefile

---------

Co-authored-by: Robin Gutzen <[email protected]>
  • Loading branch information
cosimolupo and rgutzen authored Jul 18, 2024
1 parent 419c44c commit 779ed33
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 47 deletions.
89 changes: 45 additions & 44 deletions cobrawap/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
sys.path.append(str(Path(inspect.getfile(lambda: None)).parent))
sys.path.append(str(Path(inspect.getfile(lambda: None)).parent / 'pipeline'))
from cmd_utils import get_setting, set_setting, get_initial_available_stages
from cmd_utils import is_profile_name_valid, create_new_configfile
from cmd_utils import input_profile, get_profile, setup_entry_stage
from cmd_utils import is_profile_name_valid, create_new_configfile
from cmd_utils import input_profile, get_profile, setup_entry_stage
from cmd_utils import working_directory, load_config_file, get_config
from cmd_utils import locate_str_in_list, read_stage_output
log = logging.getLogger()
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_parser():

# Initialization
subparsers = CLI.add_subparsers(help='')
CLI_init = subparsers.add_parser('init',
CLI_init = subparsers.add_parser('init',
help='initialize the cobrawap directories (required only once)')
CLI_init.set_defaults(command='init')
CLI_init.add_argument("--output_path", type=Path, default=None,
Expand All @@ -59,13 +59,13 @@ def get_parser():
"stored [default: '~/cobrawap_config/']")

# Show Settings
CLI_settings = subparsers.add_parser('settings',
CLI_settings = subparsers.add_parser('settings',
help='display the content of ~/.cobrawap/config')
CLI_settings.set_defaults(command='settings')


# Configuration
CLI_create = subparsers.add_parser('create',
CLI_create = subparsers.add_parser('create',
help='create configuration for a new dataset')
CLI_create.set_defaults(command='create')
CLI_create.add_argument("--data_path", type=Path, nargs='?', default=None,
Expand All @@ -77,12 +77,12 @@ def get_parser():
CLI_create.add_argument("--profile", type=str, nargs='?', default=None,
help="profile name of this dataset/application "
"(see profile name conventions in documentation)")
CLI_create.add_argument("--parent_profile", type=str, nargs='?', default=None,
CLI_create.add_argument("--parent_profile", type=str, nargs='?', default=None,
help="optional parent profile name "
"(see profile name conventions in documentation)")

# Additional configurations
CLI_profile = subparsers.add_parser('add_profile',
CLI_profile = subparsers.add_parser('add_profile',
help='create a new configuration for an existing dataset')
CLI_profile.set_defaults(command='add_profile')
CLI_profile.add_argument("--profile", type=str, nargs='?', default=None,
Expand All @@ -97,15 +97,15 @@ def get_parser():
"[default: basic template]")

# Run
CLI_run = subparsers.add_parser('run',
CLI_run = subparsers.add_parser('run',
help='run the analysis pipeline on the selected '
'input and with the specified configurations')
CLI_run.set_defaults(command='run')
CLI_run.add_argument("--profile", type=str, nargs='?', default=None,
help="name of the config profile to be analyzed")

# Stage
CLI_stage = subparsers.add_parser('run_stage',
CLI_stage = subparsers.add_parser('run_stage',
help='execute an individual stage')
CLI_stage.set_defaults(command='run_stage')
CLI_stage.add_argument("--profile", type=str, nargs='?', default=None,
Expand All @@ -115,7 +115,7 @@ def get_parser():
help="select individual stage to execute")

# Block
CLI_block = subparsers.add_parser('run_block',
CLI_block = subparsers.add_parser('run_block',
help='execute an individual block method on some input')
CLI_block.set_defaults(command='run_block')
CLI_block.add_argument("block", type=str, nargs='?', default=None,
Expand All @@ -141,7 +141,7 @@ def main():
elif args.command == 'settings':
log.info("display settings at ~/.cobrawap/config")
print_settings(**vars(args))

elif args.command == 'create':
log.info("creating a set of config files")
create(**vars(args))
Expand Down Expand Up @@ -192,7 +192,7 @@ def initialize(output_path=None, config_path=None, **kwargs):
config_path.mkdir(parents=True, exist_ok=True)
if not config_path.is_dir():
raise ValueError(f"{config_path} is not a valid directory!")

set_setting(dict(config_path=str(config_path)))

# set pipeline path
Expand All @@ -218,12 +218,12 @@ def initialize(output_path=None, config_path=None, **kwargs):
shutil.copy(pipeline_path / stage / 'configs' \
/ 'config_template.yaml',
stage_config_path / 'config.yaml')

pipeline_config_path = config_path / 'configs'
pipeline_config_path.mkdir(parents=True, exist_ok=True)
shutil.copy(pipeline_path / 'configs' / 'config_template.yaml',
pipeline_config_path / 'config.yaml')

stage01_script_path = config_path / stages['1'] / 'scripts'
stage01_script_path.mkdir(parents=True, exist_ok=True)
shutil.copy(pipeline_path / stages['1'] / 'scripts' \
Expand All @@ -238,28 +238,28 @@ def print_settings(*args, **kwargs):
return None


def create(profile=None, parent_profile=None, data_path=None,
           loading_script_name=None, **kwargs):
    """Create a config file for every configured stage and set up the entry stage.

    The profile and parent profile are first passed through ``get_profile``,
    whose returned pair replaces the given values. For each entry of the
    ``stages`` setting a config file is then created: stages whose key
    contains ``'1'`` are created under the profile name itself, all other
    stages under the parent profile name when one is set (otherwise under
    the profile name). Finally the entry stage is set up with the given
    data path and loading script name.

    Parameters
    ----------
    profile : str, optional
        Profile name of the dataset/application.
    parent_profile : str, optional
        Optional parent profile name.
    data_path : Path, optional
        Forwarded unchanged to ``setup_entry_stage``.
    loading_script_name : optional
        Forwarded unchanged to ``setup_entry_stage``.
    **kwargs
        Accepted and ignored, so the whole parsed CLI namespace can be
        passed in.

    Returns
    -------
    None
    """
    profile, parent_profile = get_profile(profile=profile,
                                          parent_profile=parent_profile)
    # Stages other than the entry stage share the parent's config name, if any.
    shared_name = parent_profile or profile

    for number, stage_name in get_setting('stages').items():
        is_entry_stage = '1' in str(number)
        create_new_configfile(stage=stage_name,
                              profile=profile if is_entry_stage else shared_name,
                              parent=parent_profile)

    setup_entry_stage(profile=profile,
                      parent_profile=parent_profile,
                      data_path=data_path,
                      loading_script_name=loading_script_name)
    return None


def add_profile(profile=None, parent_profile=None, stages=None,
def add_profile(profile=None, parent_profile=None, stages=None,
data_path=None, loading_script_name=None, **kwargs):
profile, parent_profile = get_profile(profile=profile,
profile, parent_profile = get_profile(profile=profile,
parent_profile=parent_profile)
# get stage selection
stages = ''
Expand All @@ -275,26 +275,26 @@ def add_profile(profile=None, parent_profile=None, stages=None,
stages = ''

for stage_number in stages:
create_new_configfile(stage_number=stage_number,
create_new_configfile(stage_number=stage_number,
profile=profile,
parent=parent_profile)

if any('1' in stage for stage in stages):
setup_entry_stage(profile=profile, parent_profile=parent_profile,
data_path=data_path,
data_path=data_path,
loading_script_name=loading_script_name)
return None


def run(profile=None, extra_args=None, **kwargs):
# select profile
profile = input_profile(profile=profile)

# set runtime config
pipeline_path = Path(get_setting('pipeline_path'))

# execute snakemake
snakemake_args = ['snakemake','-c1','--config',f'PROFILE={profile}']
snakemake_args = ['snakemake', '-c1', '--config', f'PROFILE={profile}']
log.info(f'Executing `{" ".join(snakemake_args+extra_args)}`')

with working_directory(pipeline_path):
Expand Down Expand Up @@ -324,34 +324,35 @@ def run_stage(profile=None, stage=None, extra_args=None, **kwargs):
pipeline_config_path = config_path / 'configs' / 'config.yaml'
config_dict = load_config_file(pipeline_config_path)
stage_idx = locate_str_in_list(config_dict['STAGES'], stage)
# stage_idx_global = locate_str_in_list([v for k,v in stages.items()], stage)
if stage_idx is None:
raise IndexError("Make sure that the selected stage is also specified "\
"in your top-level config in the list `STAGES`!")

stage_config_path = get_config(config_dir=config_path / stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)

prev_stage = config_dict['STAGES'][stage_idx-1]
prev_stage_config_path = get_config(config_dir=config_path / prev_stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)
prev_config_name = Path(prev_stage_config_path).name
output_name = read_stage_output(stage=prev_stage,
config_dir=config_path,
config_name=prev_config_name)
stage_input = output_path / profile / prev_stage / output_name

if stage_idx > 0:
prev_stage = config_dict['STAGES'][stage_idx-1]
prev_stage_config_path = get_config(config_dir=config_path / prev_stage,
config_name=f'config_{profile}.yaml',
get_path_instead=True)
prev_config_name = Path(prev_stage_config_path).name
prev_output_name = read_stage_output(stage=prev_stage,
config_dir=config_path,
config_name=prev_config_name)
stage_input = output_path / profile / prev_stage / prev_output_name
extra_args = [f'STAGE_INPUT={stage_input}'] + extra_args

# descend into stage folder
pipeline_path = pipeline_path / stage

# append stage specific arguments
extra_args = [f'STAGE_INPUT={stage_input}'] \
+ extra_args \
+ ['--configfile', f'{stage_config_path}']
extra_args = extra_args + ['--configfile', f'{stage_config_path}']

# execute snakemake
snakemake_args = ['snakemake','-c1','--config',f'PROFILE={profile}']
snakemake_args = ['snakemake', '-c1', '--config', f'PROFILE={profile}']
log.info(f'Executing `{" ".join(snakemake_args+extra_args)}`')

with working_directory(pipeline_path):
Expand Down Expand Up @@ -387,7 +388,7 @@ def run_block(block=None, block_args=None, block_help=False, **kwargs):
# execute block
myenv = os.environ.copy()
myenv['PYTHONPATH'] = ':'.join(sys.path)
subprocess.run(['python', str(block_dir / f'{block}.py')]
subprocess.run(['python', str(block_dir / f'{block}.py')]
+ block_args,
env=myenv)
return None
Expand Down
8 changes: 5 additions & 3 deletions cobrawap/pipeline/utils/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ from utils.snakefile import get_setting
CONFIG_PATH = Path(get_setting('config_path'))
OUTPUT_PATH = Path(get_setting('output_path'))

is_first_stage = config['STAGE_NAME'] == get_setting('stages')['1']
if is_first_stage:
config['STAGE_INPUT'] = None
if 'STAGE_INPUT' not in config:
logger.warning('No STAGE_INPUT defined for running the stage individually! '
'You can set it via the command line with '
logger.warning('No STAGE_INPUT defined for running stage \'{}\' individually! '.format(config['STAGE_NAME']) +
'You can set it via the command line with ' +
'`--config STAGE_INPUT=/path/to/file`.')
config['STAGE_INPUT'] = None

if 'USE_LINK_AS_STAGE_OUTPUT' not in config:
config['USE_LINK_AS_STAGE_OUTPUT'] = True
Expand Down

0 comments on commit 779ed33

Please sign in to comment.