Skip to content

Commit

Permalink
Merge branch 'release'
Browse files Browse the repository at this point in the history
  • Loading branch information
jonls committed Jan 4, 2016
2 parents 846d04f + 80237bf commit 84dccc1
Show file tree
Hide file tree
Showing 24 changed files with 769 additions and 410 deletions.
9 changes: 9 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@

v0.18 (2016-01-04)
------------------

- Several commands now support parallelization with the `--parallel` option
(`fva`, `fluxcheck`, `fluxcoupling`, `robustness`).
- A more robust reaction parser is now used to parse reaction equations in
YAML files. This also means that quoting compound names with pipes (`|`) is
now optional.

v0.17 (2015-12-07)
------------------

Expand Down
6 changes: 6 additions & 0 deletions docs/api/datasource_reaction.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

``psamm.datasource.reaction`` -- Parser for reactions
===================================================================

.. automodule:: psamm.datasource.reaction
:members:
163 changes: 162 additions & 1 deletion psamm/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
The :func:`.main` function is the entry point of command line interface.
"""

from __future__ import division

import os
import argparse
import logging
import abc
from itertools import islice
import multiprocessing as mp

import pkg_resources
from six import add_metaclass, iteritems
from six import add_metaclass, iteritems, itervalues

from . import __version__ as package_version
from .datasource.native import NativeModel
Expand Down Expand Up @@ -159,6 +163,39 @@ def _get_solver(self, **kwargs):
return generic.Solver(**solver_args)


class ParallelTaskMixin(object):
    """Mixin for commands that run parallel computation tasks.

    Adds a ``--parallel`` command line option and provides
    :meth:`_create_executor` for obtaining an executor that distributes
    tasks over worker processes (or runs them sequentially when only one
    worker is available).
    """

    @classmethod
    def init_parser(cls, parser):
        parser.add_argument(
            '--parallel', help='Set number of parallel processes (0=auto)',
            type=int, default=0)
        super(ParallelTaskMixin, cls).init_parser(parser)

    def _create_executor(self, handler, args, cpus_per_worker=1):
        """Return a new :class:`.Executor` instance.

        The worker count is the ``--parallel`` value when positive;
        otherwise it is derived from the CPU count divided by
        ``cpus_per_worker``. At least one worker is always used.
        """
        if self._args.parallel > 0:
            workers = self._args.parallel
        else:
            try:
                # Reserve cpus_per_worker CPUs per worker, but never let
                # the pool size drop to zero (e.g. when the machine has
                # fewer CPUs than cpus_per_worker) — a zero-process pool
                # would silently produce no results.
                workers = max(1, mp.cpu_count() // cpus_per_worker)
            except NotImplementedError:
                workers = 1

        if workers != 1:
            logger.info('Using {} parallel worker processes...'.format(
                workers))
            executor = ProcessPoolExecutor(
                processes=workers, handler_init=handler, handler_args=args)
        else:
            logger.info('Using single worker...')
            executor = SequentialExecutor(
                handler_init=handler, handler_args=args)

        return executor


class FilePrefixAppendAction(argparse.Action):
"""Action that appends one argument or multiple from a file.
Expand Down Expand Up @@ -194,6 +231,130 @@ def __call__(self, parser, namespace, values, option_string=None):
arguments.append(values)


class _ErrorMarker(object):
    """Sentinel put on the result queue when a child process fails."""


class ExecutorError(Exception):
    """Raised when tasks could not be completed by an executor."""


class _ExecutorProcess(mp.Process):
    """Worker process that evaluates task chunks through a handler.

    Chunks of tasks are read from the task queue until a ``None``
    sentinel arrives. For each chunk, a mapping from task to
    ``handler.handle_task(*task)`` result is put on the result queue.
    On any error an :class:`_ErrorMarker` is put on the result queue
    before the exception propagates.
    """

    def __init__(self, task_queue, result_queue, handler_init,
                 handler_args=()):
        super(_ExecutorProcess, self).__init__()
        self._task_queue = task_queue
        self._result_queue = result_queue
        self._handler_init = handler_init
        self._handler_args = handler_args

    def run(self):
        try:
            # The handler is constructed inside the child so that any
            # per-process state (e.g. solver problems) lives here.
            handler = self._handler_init(*self._handler_args)
            while True:
                chunk = self._task_queue.get()
                if chunk is None:
                    break
                chunk_results = {}
                for task in chunk:
                    chunk_results[task] = handler.handle_task(*task)
                self._result_queue.put(chunk_results)
        except BaseException:
            self._result_queue.put(_ErrorMarker())
            raise


class Executor(object):
    """Base class for task executors; usable as a context manager.

    Entering the context returns the executor itself; leaving it calls
    :meth:`close`. Exceptions raised inside the context are never
    suppressed.
    """

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False

    def close(self):
        """Release resources held by the executor (no-op by default)."""
        pass


class ProcessPoolExecutor(Executor):
    """Executor that distributes tasks over a pool of worker processes.

    Each worker is an :class:`_ExecutorProcess` that constructs its own
    handler from ``handler_init(*handler_args)`` and evaluates tasks with
    ``handler.handle_task(*task)``. If ``processes`` is ``None``, one
    worker per CPU is created (falling back to one worker when the CPU
    count cannot be determined).
    """

    def __init__(self, handler_init, handler_args=(), processes=None):
        if processes is None:
            try:
                processes = mp.cpu_count()
            except NotImplementedError:
                processes = 1

        self._process_count = processes
        self._processes = []
        self._task_queue = mp.Queue()
        self._result_queue = mp.Queue()

        for _ in range(self._process_count):
            p = _ExecutorProcess(
                self._task_queue, self._result_queue, handler_init,
                handler_args)
            p.start()
            self._processes.append(p)

    def apply(self, task):
        """Run a single task on a worker and return its result.

        Raises:
            ExecutorError: If the child process failed.
        """
        self._task_queue.put([task])
        results = self._result_queue.get()
        if isinstance(results, _ErrorMarker):
            raise ExecutorError('Child process failed')

        return next(itervalues(results))

    def imap_unordered(self, iterable, chunksize=1):
        """Iterate over ``(task, result)`` pairs in arbitrary order.

        Tasks are taken from ``iterable`` in chunks of ``chunksize`` and
        handed to the workers; results are yielded as each chunk
        completes.

        Raises:
            ExecutorError: If a child process failed.
        """
        # Take a proper iterator once so that the repeated islice() calls
        # below consume it progressively. With a plain sequence (e.g. a
        # list) each islice() would restart from the beginning and this
        # generator would loop forever on the same chunk.
        iterator = iter(iterable)

        def iter_chunks():
            while True:
                chunk = list(islice(iterator, chunksize))
                if len(chunk) == 0:
                    break
                yield chunk

        it = iter_chunks()

        # Prime each worker with one chunk.
        workers = 0
        for i in range(self._process_count):
            tasks = next(it, None)
            if tasks is None:
                break

            self._task_queue.put(tasks)
            workers += 1

        # Collect results; each completed chunk is replaced with a new
        # one, or with the None sentinel (shutting that worker down) when
        # the input is exhausted.
        while workers > 0:
            results = self._result_queue.get()
            if isinstance(results, _ErrorMarker):
                raise ExecutorError('Child process failed')

            tasks = next(it, None)
            if tasks is None:
                workers -= 1

            self._task_queue.put(tasks)

            for task, result in iteritems(results):
                yield task, result

    def close(self):
        """Signal every worker to terminate after its queued chunks."""
        for i in range(self._process_count):
            self._task_queue.put(None)

    def join(self):
        """Wait for all worker processes to exit."""
        for p in self._processes:
            p.join()


class SequentialExecutor(Executor):
    """Executor that evaluates every task in the current process."""

    def __init__(self, handler_init, handler_args=()):
        # A single handler instance serves all tasks.
        self._handler = handler_init(*handler_args)

    def apply(self, task):
        """Run a single task and return its result."""
        return self._handler.handle_task(*task)

    def imap_unordered(self, iterable, chunksize):
        """Yield ``(task, result)`` pairs, one task at a time.

        The chunksize argument is accepted for interface compatibility
        but has no effect here.
        """
        handle = self._handler.handle_task
        for task in iterable:
            yield task, handle(*task)

    def join(self):
        """Nothing to wait for; tasks run synchronously."""
        pass


def main(command_class=None, args=None):
"""Run the command line interface with the given :class:`Command`.
Expand Down
57 changes: 49 additions & 8 deletions psamm/commands/fluxcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

from __future__ import unicode_literals

import time
import logging
from itertools import product

from ..command import Command, MetabolicMixin, SolverCommandMixin, CommandError
from ..command import (Command, MetabolicMixin, SolverCommandMixin,
ParallelTaskMixin, CommandError)
from .. import fluxanalysis, fastcore

logger = logging.getLogger(__name__)


class FluxConsistencyCommand(MetabolicMixin, SolverCommandMixin, Command):
class FluxConsistencyCommand(MetabolicMixin, SolverCommandMixin,
ParallelTaskMixin, Command):
"""Check that reactions are flux consistent in a model.
A reaction is flux consistent if there exists any steady-state flux
Expand Down Expand Up @@ -82,6 +86,8 @@ def run(self):
'Using Fastcore with thermodynamic constraints'
' is not supported!')

start_time = time.time()

if enable_fastcore:
solver = self._get_solver()
inconsistent = set(fastcore.fastcc(
Expand All @@ -100,12 +106,11 @@ def run(self):
tfba=enable_tfba, solver=solver))
else:
logger.info('Using flux bounds to determine consistency.')
inconsistent = set()
for reaction_id, (lo, hi) in fluxanalysis.flux_variability(
self._mm, sorted(self._mm.reactions), {},
tfba=enable_tfba, solver=solver):
if abs(lo) < epsilon and abs(hi) < epsilon:
inconsistent.add(reaction_id)
inconsistent = set(self._run_fva_fluxcheck(
self._mm, solver, enable_tfba, epsilon))

logger.info('Solving took {:.2f} seconds'.format(
time.time() - start_time))

# Count the number of reactions that are fixed at zero. While these
# reactions are still inconsistent, they are inconsistent because they
Expand Down Expand Up @@ -144,3 +149,39 @@ def run(self):
logger.info('Model has {}/{} inconsistent exchange reactions'
' ({} disabled by user)'.format(
count_exchange, total_exchange, disabled_exchange))

def _run_fva_fluxcheck(self, model, solver, enable_tfba, epsilon):
    """Yield IDs of reactions found flux-inconsistent by FVA.

    Runs a flux bound task for every (reaction, direction) pair through a
    (possibly parallel) executor and yields each reaction whose bounds in
    both directions are within ``epsilon`` of zero.
    """
    handler_args = model, solver, enable_tfba
    # cpus_per_worker=2 reserves an extra CPU per worker — presumably for
    # the LP solver's own threads; confirm against solver behavior.
    executor = self._create_executor(
        FluxCheckFVATaskHandler, handler_args, cpus_per_worker=2)

    # Results arrive unordered; each reaction produces two tasks (one per
    # direction), so the first value is buffered until its partner for
    # the opposite direction arrives.
    results = {}
    with executor:
        for (reaction_id, direction), value in executor.imap_unordered(
                product(model.reactions, (1, -1)), 16):

            if reaction_id not in results:
                # First of the pair — remember it and wait for the other.
                results[reaction_id] = value
                continue

            other_value = results[reaction_id]
            # Order the pair as (lower, upper) based on which direction
            # arrived second.
            if direction == -1:
                bounds = value, other_value
            else:
                bounds = other_value, value

            lower, upper = bounds
            # Both bounds effectively zero: reaction cannot carry flux.
            if abs(lower) < epsilon and abs(upper) < epsilon:
                yield reaction_id

    executor.join()


class FluxCheckFVATaskHandler(object):
    """Per-worker task handler computing single-direction flux bounds.

    Instantiated once in each worker process so the flux balance problem
    is constructed once and reused for every task.
    """

    def __init__(self, model, solver, enable_tfba):
        self._problem = fluxanalysis.FluxBalanceProblem(model, solver)
        if enable_tfba:
            # Add thermodynamic constraints when tfba was requested.
            self._problem.add_thermodynamic()

    def handle_task(self, reaction_id, direction):
        # Tasks supply direction as 1 or -1 (see _run_fva_fluxcheck);
        # presumably the flux bound in that direction — confirm against
        # fluxanalysis.FluxBalanceProblem.flux_bound.
        return self._problem.flux_bound(reaction_id, direction)
55 changes: 38 additions & 17 deletions psamm/commands/fluxcoupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import logging

from ..command import Command, SolverCommandMixin, MetabolicMixin, CommandError
from ..command import (Command, SolverCommandMixin, MetabolicMixin,
ParallelTaskMixin, CommandError)
from .. import fluxanalysis, fluxcoupling

logger = logging.getLogger(__name__)


class FluxCouplingCommand(MetabolicMixin, SolverCommandMixin, Command):
class FluxCouplingCommand(MetabolicMixin, SolverCommandMixin,
ParallelTaskMixin, Command):
"""Find flux coupled reactions in the model.
This identifies any reaction pairs where the flux of one reaction
Expand All @@ -47,33 +49,43 @@ def run(self):
self._mm, max_reaction, tfba=False, solver=solver))
optimum = fba_fluxes[max_reaction]

self._fcp = fluxcoupling.FluxCouplingProblem(
self._mm, {max_reaction: 0.999 * optimum}, solver)
handler_args = self._mm, {max_reaction: 0.999 * optimum}, solver
executor = self._create_executor(
FluxCouplingTaskHandler, handler_args, cpus_per_worker=2)

self._coupled = {}
self._groups = []

reactions = sorted(self._mm.reactions)
for i, reaction1 in enumerate(reactions):
if reaction1 in self._coupled:
continue

for reaction2 in reactions[i+1:]:
if (reaction2 in self._coupled and
(self._coupled[reaction2] ==
self._coupled.get(reaction1))):
def iter_reaction_pairs():
reactions = sorted(self._mm.reactions)
for i, reaction1 in enumerate(reactions):
if reaction1 in self._coupled:
continue

self._check_reactions(reaction1, reaction2)
for reaction2 in reactions[i+1:]:
if (reaction2 in self._coupled and
(self._coupled[reaction2] ==
self._coupled.get(reaction1))):
continue

yield reaction1, reaction2

with executor:
for task, bounds in executor.imap_unordered(
iter_reaction_pairs(), 16):
reaction1, reaction2 = task
self._check_reactions(reaction1, reaction2, bounds)

executor.join()

logger.info('Coupled groups:')
for i, group in enumerate(self._groups):
if group is not None:
logger.info('{}: {}'.format(i, ', '.join(sorted(group))))

def _check_reactions(self, reaction1, reaction2):
logger.debug('Solving for {}, {}'.format(reaction1, reaction2))
lower, upper = self._fcp.solve(reaction1, reaction2)
def _check_reactions(self, reaction1, reaction2, bounds):
logger.debug('Solved for {}, {}'.format(reaction1, reaction2))
lower, upper = bounds

logger.debug('Result: {}, {}'.format(lower, upper))

Expand Down Expand Up @@ -135,3 +147,12 @@ def _couple_reactions(self, reaction1, reaction2):
self._coupled[reaction1] = group_index
self._coupled[reaction2] = group_index
self._groups.append(group)


class FluxCouplingTaskHandler(object):
    """Per-worker task handler solving flux coupling subproblems.

    Instantiated once in each worker process so the flux coupling problem
    is constructed once and reused for every reaction pair task.
    """

    def __init__(self, model, thresholds, solver):
        self._problem = fluxcoupling.FluxCouplingProblem(
            model, thresholds, solver)

    def handle_task(self, reaction_1, reaction_2):
        # Returns a 2-tuple unpacked as (lower, upper) by the caller's
        # _check_reactions(); exact semantics defined by
        # fluxcoupling.FluxCouplingProblem.solve.
        return self._problem.solve(reaction_1, reaction_2)
Loading

0 comments on commit 84dccc1

Please sign in to comment.