Skip to content

Commit

Permalink
fix missing import making distributed optional
Browse files Browse the repository at this point in the history
  • Loading branch information
kain88-de committed Oct 31, 2018
1 parent dd88867 commit 114a2b0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
23 changes: 20 additions & 3 deletions pmda/leaflet.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,26 @@ def run(self,
if scheduler is None and n_jobs == 1:
scheduler = 'single-threaded'

# fall back to multiprocessing, we tried everything
if scheduler is None:
scheduler = 'multiprocessing'
if n_blocks is None:
if scheduler == 'multiprocessing':
n_blocks = n_jobs
else:
try:
from dask import distributed
if isinstance(scheduler, distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")
except ImportError:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")

scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'multiprocessing':
Expand Down
23 changes: 16 additions & 7 deletions pmda/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,23 @@ def run(self,
if n_blocks is None:
if scheduler == 'multiprocessing':
n_blocks = n_jobs
elif isinstance(scheduler, dask.distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")
try:
from dask import distributed
if isinstance(scheduler, distributed.Client):
n_blocks = len(scheduler.ncores())
else:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")
except ImportError:
n_blocks = 1
warnings.warn(
"Couldn't guess ideal number of blocks from scheduler. "
"Setting n_blocks=1. "
"Please provide `n_blocks` in call to method.")

scheduler_kwargs = {'scheduler': scheduler}
if scheduler == 'multiprocessing':
Expand Down

0 comments on commit 114a2b0

Please sign in to comment.