Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed Jan 15, 2024
1 parent a0526c4 commit 70bfab9
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 132 deletions.
128 changes: 79 additions & 49 deletions altamisa/apps/isatab2dot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@
"""Conversion of ISA-Tab to dot.
"""

import argparse
from contextlib import ExitStack
import json
import os
import sys

import attrs
import typer
from typing_extensions import Annotated

from altamisa.isatab import AssayReader, InvestigationReader, StudyReader

#: Typer application instance.
app = typer.Typer()


@attrs.define
class Arguments:
investigation_file: str
output_file: str


def print_dot(
obj,
Expand Down Expand Up @@ -49,61 +62,78 @@ def print_dot(
print("{}{} -> {};".format(indent, json.dumps(arc.tail), json.dumps(arc.head)), file=outf)


def run(args):
def run(args: Arguments):
with open(args.investigation_file, "rt") as inputf:
investigation = InvestigationReader.from_stream(inputf).read()

path = os.path.dirname(args.investigation_file)

print("digraph investigation {", file=args.output_file)
print(' rankdir = "LR";', file=args.output_file)

for s, study_info in enumerate(investigation.studies):
if not study_info.info.path:
print(" /* no file for study {} */".format(s + 1), file=args.output_file)
continue
with open(os.path.join(path, study_info.info.path), "rt") as inputf:
study = StudyReader.from_stream("S{}".format(s + 1), inputf).read()
print(" /* study {} */".format(study_info.info.path), file=args.output_file)
print(" subgraph clusterStudy{} {{".format(s), file=args.output_file)
print(' label = "Study: {}"'.format(study_info.info.path), file=args.output_file)
print_dot(study, args.output_file)
print(" }", file=args.output_file)

for a, assay_info in enumerate(study_info.assays):
if not assay_info.path:
print(" /* no file for assay {} */".format(a + 1), file=args.output_file)
continue
with open(os.path.join(path, assay_info.path), "rt") as inputf:
assay = AssayReader.from_stream(
"S{}".format(s + 1), "A{}".format(a + 1), inputf
).read()
print(" /* assay {} */".format(assay_info.path), file=args.output_file)
print(" subgraph clusterAssayS{}A{} {{".format(s, a), file=args.output_file)
print(' label = "Assay: {}"'.format(assay_info.path), file=args.output_file)
print_dot(assay, args.output_file)
print(" }", file=args.output_file)

print("}", file=args.output_file)
with ExitStack() as stack:
if args.output_file == "-":
output_file = sys.stdout
else:
output_file = stack.enter_context(open(args.output_file, "wt"))

print("digraph investigation {", file=output_file)
print(' rankdir = "LR";', file=output_file)

def main(argv=None):
parser = argparse.ArgumentParser()

parser.add_argument(
"-i", "--investigation-file", required=True, help="Path to investigation file"
)
parser.add_argument(
"-o",
"--output-file",
default="-",
type=argparse.FileType("wt"),
help='Path to output file, stdout ("-") by default',
for s, study_info in enumerate(investigation.studies):
if not study_info.info.path:
print(" /* no file for study {} */".format(s + 1), file=output_file)
continue
with open(os.path.join(path, study_info.info.path), "rt") as inputf:
study = StudyReader.from_stream("S{}".format(s + 1), inputf).read()
print(" /* study {} */".format(study_info.info.path), file=output_file)
print(" subgraph clusterStudy{} {{".format(s), file=output_file)
print(' label = "Study: {}"'.format(study_info.info.path), file=output_file)
print_dot(study, output_file)
print(" }", file=output_file)

for a, assay_info in enumerate(study_info.assays):
if not assay_info.path:
print(" /* no file for assay {} */".format(a + 1), file=output_file)
continue
with open(os.path.join(path, assay_info.path), "rt") as inputf:
assay = AssayReader.from_stream(
"S{}".format(s + 1), "A{}".format(a + 1), inputf
).read()
print(" /* assay {} */".format(assay_info.path), file=output_file)
print(" subgraph clusterAssayS{}A{} {{".format(s, a), file=output_file)
print(' label = "Assay: {}"'.format(assay_info.path), file=output_file)
print_dot(assay, output_file)
print(" }", file=output_file)

print("}", file=output_file)


@app.command()
def main(
investigation_file: Annotated[
str,
typer.Option(
"--investigation-file",
"-i",
help="Path to input investigation file",
),
],
output_file: Annotated[
str,
typer.Option(
"--output-file",
"-o",
help="Path to output file, stdout ('-') by default",
),
] = "-",
):
"""Main entry point."""
# Convert to `Arguments` object.
args = Arguments(
investigation_file=investigation_file,
output_file=output_file,
)

args = parser.parse_args(argv)
return run(args)
# Actually run.
run(args)


if __name__ == "__main__":
sys.exit(main()) # pragma: no cover
if __name__ == "__main__": # pragma: no cover
typer.run(main)
169 changes: 105 additions & 64 deletions altamisa/apps/isatab2isatab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
"""Read from ISA-Tab and directly write to ISA-Tab.
"""

import argparse
from contextlib import ExitStack
import os
import sys
import typing
from typing import Dict, Optional, Tuple
import warnings

import attrs
import typer
from typing_extensions import Annotated

from altamisa.exceptions import IsaException
from altamisa.isatab import (
AssayReader,
Expand All @@ -19,15 +25,27 @@
StudyValidator,
StudyWriter,
)
from altamisa.isatab.models import Assay, InvestigationInfo, Study

#: Typer application instance.
app = typer.Typer()


@attrs.define
class Arguments:
input_investigation_file: str
output_investigation_file: str
quotes: Optional[str]
warnings: bool

def run(args):

def run(args: Arguments):
# Collect warnings
with warnings.catch_warnings(record=True) as records:
run_warnings_caught(args)

# Print warnings
if not args.no_warnings:
if args.warnings:
for record in records:
warnings.showwarning(
record.message,
Expand All @@ -38,7 +56,7 @@ def run(args):
)


def run_warnings_caught(args):
def run_warnings_caught(args: Arguments):
# Check if input and output directory are different
path_in = os.path.realpath(os.path.dirname(args.input_investigation_file))
path_out = os.path.realpath(os.path.dirname(args.output_investigation_file))
Expand All @@ -47,29 +65,36 @@ def run_warnings_caught(args):
msg = tpl.format(path_in, path_out)
raise IsaException(msg)

if args.input_investigation_file == "-": # pragma: no cover
args.input_investigation_file = sys.stdin
else:
args.input_investigation_file = open(args.input_investigation_file, "rt")
if args.output_investigation_file == "-": # pragma: no cover
args.output_investigation_file = sys.stdout
else:
args.output_investigation_file = open(args.output_investigation_file, "wt")

investigation, studies, assays = run_reading(args, path_in)
run_writing(args, path_out, investigation, studies, assays)


def run_reading(args, path_in):
with ExitStack() as stack:
if args.output_investigation_file == "-": # pragma: no cover
output_investigation_file = sys.stdout
else:
output_investigation_file = stack.push(open(args.output_investigation_file, "wt"))

investigation, studies, assays = run_reading(args, path_in)
run_writing(
args,
path_out,
output_investigation_file,
investigation,
studies,
assays,
)


def run_reading(
args, path_in
) -> Tuple[InvestigationInfo, Dict[int, Study], Dict[int, Dict[int, Assay]]]:
# Read investigation
investigation = InvestigationReader.from_stream(args.input_investigation_file).read()
with open(args.input_investigation_file, "rt") as inputf:
investigation = InvestigationReader.from_stream(inputf).read()

# Validate investigation
InvestigationValidator(investigation).validate()

# Read studies and assays
studies = {}
assays = {}
studies: Dict[int, Study] = {}
assays: Dict[int, Dict[int, Assay]] = {}
for s, study_info in enumerate(investigation.studies):
if study_info.info.path:
with open(os.path.join(path_in, study_info.info.path), "rt") as inputf:
Expand All @@ -94,27 +119,34 @@ def run_reading(args, path_in):
return investigation, studies, assays


def run_writing(args, path_out, investigation, studies, assays):
def run_writing(
args,
path_out,
output_investigation_file: typing.TextIO,
investigation: InvestigationInfo,
studies: Dict[int, Study],
assays: Dict[int, Dict[int, Assay]],
):
# Write investigation
if args.output_investigation_file.name == "<stdout>":
if output_investigation_file.name == "<stdout>":
InvestigationWriter.from_stream(
investigation, args.output_investigation_file, quote=args.quotes
investigation, output_investigation_file, quote=args.quotes
).write()
else:
with open(args.output_investigation_file.name, "wt", newline="") as outputf:
with open(output_investigation_file.name, "wt", newline="") as outputf:
InvestigationWriter.from_stream(investigation, outputf, quote=args.quotes).write()

# Write studies and assays
for s, study_info in enumerate(investigation.studies):
if args.output_investigation_file.name == "<stdout>":
if output_investigation_file.name == "<stdout>":
if study_info.info.path:
StudyWriter.from_stream(
studies[s], args.output_investigation_file, quote=args.quotes
studies[s], output_investigation_file, quote=args.quotes
).write()
for a, assay_info in enumerate(study_info.assays):
if assay_info.path:
AssayWriter.from_stream(
assays[s][a], args.output_investigation_file, quote=args.quotes
assays[s][a], output_investigation_file, quote=args.quotes
).write()
else:
if study_info.info.path:
Expand All @@ -128,44 +160,53 @@ def run_writing(args, path_out, investigation, studies, assays):
AssayWriter.from_stream(assays[s][a], outputf, quote=args.quotes).write()


def main(argv=None):
parser = argparse.ArgumentParser()

parser.add_argument(
"-i",
"--input-investigation-file",
required=True,
type=str,
help="Path to input investigation file",
)
parser.add_argument(
"-o",
"--output-investigation-file",
default="-",
type=str,
help=(
'Path to output investigation file, stdout ("-") by default. '
"Needs to be in a different directory!"
@app.command()
def main(
input_investigation_file: Annotated[
str,
typer.Option(
"--input-investigation-file",
"-i",
help="Path to input investigation file",
),
],
output_investigation_file: Annotated[
str,
typer.Option(
"--output-investigation-file",
"-o",
help=(
'Path to output investigation file, stdout ("-") by default. '
"Needs to be in a different directory!"
),
),
],
quotes: Annotated[
Optional[str],
typer.Option(
"--quotes",
"-q",
help='Character for quoting, e.g. "\\"" (None by default)',
),
] = None,
warnings: Annotated[
bool,
typer.Option(
"--warnings/--no-warnings",
help="Show ISA-tab related warnings (default is to show)",
),
] = True,
):
# Convert to `Arguments` object.
args = Arguments(
input_investigation_file=input_investigation_file,
output_investigation_file=output_investigation_file,
quotes=quotes,
warnings=warnings,
)
parser.add_argument(
"-q",
"--quotes",
default=None,
type=str,
help='Character for quoting, e.g. "\\"" (None by default)',
)
parser.add_argument(
"--no-warnings",
dest="no_warnings",
action="store_true",
help="Suppress ISA-tab related warnings (False by default)",
)
parser.set_defaults(no_warnings=False)

args = parser.parse_args(argv)
# Start application
return run(args)


if __name__ == "__main__":
sys.exit(main()) # pragma: no cover
if __name__ == "__main__": # pragma: no cover
typer.run(main)
Loading

0 comments on commit 70bfab9

Please sign in to comment.