diff --git a/README.rst b/README.rst
index 1a101ea..6c3b0b7 100644
--- a/README.rst
+++ b/README.rst
@@ -1,5 +1,5 @@
 ===============
-Wordbatch 1.4.7
+Wordbatch 1.4.8
 ===============
 
 Overview
diff --git a/scripts/backends_benchmark.py b/scripts/backends_benchmark.py
index 40692cf..9a51f31 100644
--- a/scripts/backends_benchmark.py
+++ b/scripts/backends_benchmark.py
@@ -10,10 +10,11 @@ from sklearn.feature_extraction.text import HashingVectorizer
 import warnings
 import pandas as pd
+import multiprocessing
 
 tripadvisor_dir= "../data/tripadvisor/json"
 
-#Configure below to allow Dask / Spark
+# Configure below to allow Dask / Spark
 # scheduler_ip= "169.254.93.14"
 # from dask.distributed import Client
 # #dask-scheduler --host 169.254.93.14
@@ -74,8 +75,8 @@ def normalize_text(text):
 	['serial', ""],
 	['multiprocessing', ""],
 	['loky', ""],
-	#['dask', dask_client], #Uncomment once configured
-	#['spark', spark_context], #Uncomment once configured
+	# ['dask', dask_client], # Uncomment once configured
+	# ['spark', spark_context], # Uncomment once configured
 	['ray', ray]
 ]
@@ -91,7 +92,7 @@ def normalize_text(text):
 		texts_chunk = texts[:data_size]
 		print("Task:", task, "Data size:", data_size)
 		for backend in backends:
-			batcher = Batcher(procs=16, minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
+			batcher = Batcher(procs=multiprocessing.cpu_count(), minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
 			try:
 				with timer("Completed: ["+task+","+str(len(texts_chunk))+","+backend[0]+"]"), warnings.catch_warnings():
 					warnings.simplefilter("ignore")
@@ -103,13 +104,13 @@ def normalize_text(text):
 					if task=="WordBag":
 						wb = WordBatch(normalize_text=normalize_text,
-						               dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
-						               tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, stemmer= stemmer),
-						               extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
-						               batcher= batcher,
-						               verbose= 0)
+						            dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
+						            tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, stemmer= stemmer),
+						            extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
+						            batcher= batcher,
+						            verbose= 0)
 						t = wb.fit_transform(texts_chunk)
 						print(t.shape, t.data[:5])
 			except:
-				print("Failed ["+task+","+str(len(texts_chunk))+","+backend[0]+"]")
+				print("Failed: ["+task+","+str(len(texts_chunk))+","+backend[0]+"]")
 		print("")
\ No newline at end of file
diff --git a/setup.py b/setup.py
index f1763f9..a8c9328 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@
 
 setup(
 	name='Wordbatch',
-	version='1.4.7',
+	version='1.4.8',
 	description='Python library for distributed AI processing pipelines, using swappable scheduler backends',
 	url='https://github.com/anttttti/Wordbatch',
 	author='Antti Puurula',
diff --git a/wordbatch/__init__.py b/wordbatch/__init__.py
index 89de139..3535f68 100644
--- a/wordbatch/__init__.py
+++ b/wordbatch/__init__.py
@@ -1,4 +1,4 @@
 import os
 PACKAGE_DIR = os.path.dirname(os.path.abspath(__file__))
-__version__ = '1.4.7'
+__version__ = '1.4.8'
 
diff --git a/wordbatch/batcher.py b/wordbatch/batcher.py
index 2d38afe..0b4e1bd 100644
--- a/wordbatch/batcher.py
+++ b/wordbatch/batcher.py
@@ -39,19 +39,28 @@ class Batcher(object):
 		- 'ray' Ray local or distributed execution
 
+	task_num_cpus: int
+		Number of CPUs to reserve per minibatch task for Ray
+
+	task_num_gpus: int
+		Number of GPUs to reserve per minibatch task for Ray
+
 	backend_handle: object
 		Backend handle for sending tasks
 
 	verbose: int
 		Verbosity level.
 		Setting verbose > 0 will display additional information depending on the specific level set.
 	"""
-	def __init__(self, procs= 0, minibatch_size= 20000, backend_handle= None, backend= "multiprocessing", verbose= 0):
+	def __init__(self, procs= 0, minibatch_size= 20000, backend_handle= None, backend= "multiprocessing",
+	             task_num_cpus= 1, task_num_gpus= 0, verbose= 0):
 		if procs==0: procs= multiprocessing.cpu_count()
 		self.procs= procs
 		self.verbose= verbose
 		self.minibatch_size= minibatch_size
 		self.backend_handle= backend_handle
 		self.backend= backend
+		self.task_num_cpus = task_num_cpus
+		self.task_num_gpus = task_num_gpus
 
 	def list2indexedrdd(self, lst, minibatch_size=0):
 		if minibatch_size==0: minibatch_size= self.minibatch_size
@@ -79,6 +88,9 @@ def split_batches(self, data, minibatch_size= None, backend= None):
 		minibatch_size: int
 			Expected sizes of minibatches split from the data.
 
+		backend: object
+			Backend to use, instead of the Batcher backend attribute
+
 		Returns
 		-------
 		data_split: list
@@ -125,7 +137,8 @@
 		return [item for sublist in data for item in sublist]
 
 	def process_batches(self, task, data, args, backend=None, backend_handle=None, input_split=False,
-	                    merge_output= True, minibatch_size= None, procs=None, verbose= None):
+	                    merge_output= True, minibatch_size= None, procs=None, task_num_cpus= None,
+	                    task_num_gpus= None, verbose= None):
 		"""
 
 		Parameters
@@ -146,7 +159,8 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
 			If True, results from minibatches will be reduced into one single instance before return.
 
 		procs: int
-			Number of process(es)/thread(s) for executing task in parallel. Used for multiprocessing, threading and Loky
+			Number of process(es)/thread(s) for executing task in parallel. Used for multiprocessing, threading,
+			Loky and Ray
 
 		minibatch_size: int
 			Expected size of each minibatch
@@ -171,6 +185,12 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
 		backend_handle: object
 			Backend handle for sending tasks
 
+		task_num_cpus: int
+			Number of CPUs to reserve per minibatch task for Ray
+
+		task_num_gpus: int
+			Number of GPUs to reserve per minibatch task for Ray
+
 		verbose: int
 			Verbosity level. Setting verbose > 0 will display additional information depending on the specific level set.
@@ -184,6 +204,8 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
 		if procs is None: procs= self.procs
 		if backend is None: backend= self.backend
 		if backend_handle is None: backend_handle = self.backend_handle
+		if task_num_cpus is None: task_num_cpus = self.task_num_cpus
+		if task_num_gpus is None: task_num_gpus = self.task_num_gpus
 		if verbose is None: verbose= self.verbose
 		if verbose > 1:
 			print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " procs:",
@@ -228,19 +250,27 @@ def apply_func_to_indexedrdd(batch):
 				return [batch[0]] + [task([batch[1]] + args)]
 			results = paral_params.map(apply_func_to_indexedrdd)
 		elif backend == "ray":
-			#import ray
-			#@ray.remote
-			@self.backend_handle.remote
+			@self.backend_handle.remote(num_cpus=task_num_cpus, num_gpus=task_num_gpus)
 			def f_ray(f, data):
 				return f(data)
-			results = [f_ray.remote(task, params) for params in paral_params]
-			results = [self.backend_handle.get(x) for x in results] #Slower, but handles edge cases
-			#results= self.backend_handle.get(results) #Faster, but crashes on edge cases?
-			#results = self.backend_handle.get([f_ray.remote(task, params) for params in paral_params])
+			results = [f_ray.remote(task, paral_params.pop(0)) for _ in range(min(len(paral_params), self.procs))]
+			uncompleted = results
+			while (len(paral_params) > 0):
+				# More tasks than available processors. Queue the task calls
+				done, remaining = self.backend_handle.wait(uncompleted, timeout=60, fetch_local=False)
+				if len(done) == 0: continue
+				done= done[0]
+				uncompleted = [x for x in uncompleted if x != done]
+				if len(remaining) > 0:
+					new = f_ray.remote(task, paral_params.pop(0))
+					uncompleted.append(new)
+					results.append(new)
+			results = [self.backend_handle.get(x) for x in results]
 		#ppft currently not supported. Supporting arbitrary tasks requires modifications to passed arguments
 		#elif backend == "ppft":
 		#	jobs = [self.backend_handle.submit(task, (x,), (), ()) for x in paral_params]
 		#	results = [x() for x in jobs]
+
 		if merge_output: return self.merge_batches(self.collect_batches(results, backend=backend))
 		if verbose > 2:
 			print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " completed")
@@ -269,11 +299,11 @@ def shuffle_batch(self, texts, labels= None, seed= None):
 			List of shuffled labels. This will only be returned when non-None labels is passed
 		"""
-		if seed!=None: random.seed(seed)
+		if seed != None: random.seed(seed)
 		index_shuf= list(range(len(texts)))
 		random.shuffle(index_shuf)
 		texts= [texts[x] for x in index_shuf]
-		if labels==None: return texts
+		if labels == None: return texts
 		labels= [labels[x] for x in index_shuf]
 		return texts, labels
diff --git a/wordbatch/pipelines/__init__.py b/wordbatch/pipelines/__init__.py
index e156c85..7449640 100644
--- a/wordbatch/pipelines/__init__.py
+++ b/wordbatch/pipelines/__init__.py
@@ -1,5 +1,6 @@
 from .apply import Apply, decorator_apply
 from .apply_batch import ApplyBatch, decorator_apply_batch
+from .apply_groupby import ApplyGroupBy, decorator_apply_groupby
 from .batch_transformer import BatchTransformer
 from .feature_union import FeatureUnion
 from .wordbatch import WordBatch
\ No newline at end of file
diff --git a/wordbatch/pipelines/apply.py b/wordbatch/pipelines/apply.py
index 495961c..6206c17 100644
--- a/wordbatch/pipelines/apply.py
+++ b/wordbatch/pipelines/apply.py
@@ -8,8 +8,8 @@ def decorator_apply(func, batcher=None, cache=None, vectorize=None):
 	def wrapper_func(*args, **kwargs):
-		return Apply(func, args=args[1:], kwargs= kwargs, batcher= batcher, cache= cache,
-		             vectorize= vectorize).transform(args[0])
+		return Apply(func, args=args[1:], kwargs=kwargs, batcher=batcher, cache=cache, vectorize=vectorize)\
+			.transform(args[0])
 	return wrapper_func
 
 def batch_transform(args):
diff --git a/wordbatch/pipelines/apply_groupby.py b/wordbatch/pipelines/apply_groupby.py
index 6d261c8..4efe363 100644
--- a/wordbatch/pipelines/apply_groupby.py
+++ b/wordbatch/pipelines/apply_groupby.py
@@ -1,9 +1,19 @@
+#!python
 import pandas as pd
 from wordbatch.pipelines import Apply
+import wordbatch.batcher
+
+def decorator_apply_groupby(func, group, batcher=None, rows_per_bin=200, cache=None, vectorize=None):
+	def wrapper_func(*args, **kwargs):
+		return ApplyGroupBy(func, args=args[1:], kwargs=kwargs, group=group, rows_per_bin=rows_per_bin,
+		                    batcher=batcher, cache=cache, vectorize=vectorize).transform(args[0])
+	return wrapper_func
 
 class ApplyGroupBy(object):
-	def __init__(self, batcher, function, group, rows_per_bin= 200, cache=None, vectorize=None, args=[], kwargs={}):
-		self.batcher= batcher
+	def __init__(self, function, group, batcher=None, rows_per_bin= 200, cache=None, vectorize=None, args=[],
+	             kwargs={}):
+		if batcher is None: self.batcher= wordbatch.batcher.Batcher()
+		else: self.batcher= batcher
 		self.function= function
 		self.group= group
 		self.rows_per_bin = rows_per_bin
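
Illustrative usage of the new 1.4.8 pieces above. This is a sketch, not part of the patch: it assumes Ray is installed and initialized locally, the function names and toy data are invented for illustration, and the exact per-group input passed by ApplyGroupBy.transform is not shown in this diff.

# Sketch only: normalize(), add_group_mean(), and the toy data below are hypothetical.
import ray
import pandas as pd
from wordbatch.batcher import Batcher
from wordbatch.pipelines import Apply, decorator_apply_groupby

ray.init()

# task_num_cpus / task_num_gpus (new in this patch) are forwarded to the Ray remote task, and
# process_batches() now queues submissions so at most `procs` Ray tasks are in flight at once.
batcher = Batcher(procs=4, minibatch_size=5000, backend="ray", backend_handle=ray,
                  task_num_cpus=1, task_num_gpus=0)

def normalize(text):
	return text.lower()

texts = ["An Example Review"] * 20000  # hypothetical input
normalized = Apply(normalize, batcher=batcher).transform(texts)

# decorator_apply_groupby (new in this patch) wraps a function so it runs over groups of a DataFrame
# in parallel minibatches; what the wrapped function receives per group follows ApplyGroupBy.transform,
# which is outside this diff, so treat this part as an assumption.
def add_group_mean(chunk):
	chunk["mean_score"] = chunk["score"].mean()
	return chunk

df = pd.DataFrame({"user": ["a", "a", "b"], "score": [1.0, 2.0, 3.0]})  # hypothetical input
grouped_apply = decorator_apply_groupby(add_group_mean, group="user", batcher=batcher)
result = grouped_apply(df)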