Update for 1.4.8
Fix apply_groupby import
Add queue to Ray backend
anttttti committed Jun 28, 2021
1 parent 6299a37 commit 3bb7b26
Showing 8 changed files with 71 additions and 29 deletions.
2 changes: 1 addition & 1 deletion README.rst
@@ -1,5 +1,5 @@
===============
Wordbatch 1.4.7
Wordbatch 1.4.8
===============

Overview
21 changes: 11 additions & 10 deletions scripts/backends_benchmark.py
@@ -10,10 +10,11 @@
from sklearn.feature_extraction.text import HashingVectorizer
import warnings
import pandas as pd
import multiprocessing

tripadvisor_dir= "../data/tripadvisor/json"

#Configure below to allow Dask / Spark
# Configure below to allow Dask / Spark
# scheduler_ip= "169.254.93.14"
# from dask.distributed import Client
# #dask-scheduler --host 169.254.93.14
@@ -74,8 +75,8 @@ def normalize_text(text):
['serial', ""],
['multiprocessing', ""],
['loky', ""],
#['dask', dask_client], #Uncomment once configured
#['spark', spark_context], #Uncomment once configured
# ['dask', dask_client], # Uncomment once configured
# ['spark', spark_context], # Uncomment once configured
['ray', ray]
]

@@ -91,7 +92,7 @@ def normalize_text(text):
texts_chunk = texts[:data_size]
print("Task:", task, "Data size:", data_size)
for backend in backends:
batcher = Batcher(procs=16, minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
batcher = Batcher(procs=multiprocessing.cpu_count(), minibatch_size=5000, backend=backend[0], backend_handle=backend[1])
try:
with timer("Completed: ["+task+","+str(len(texts_chunk))+","+backend[0]+"]"), warnings.catch_warnings():
warnings.simplefilter("ignore")
@@ -103,13 +104,13 @@ def normalize_text(text):

if task=="WordBag":
wb = WordBatch(normalize_text=normalize_text,
dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, stemmer= stemmer),
extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
batcher= batcher,
verbose= 0)
dictionary=Dictionary(min_df=10, max_words=1000000, verbose=0),
tokenizer= Tokenizer(spellcor_count=2, spellcor_dist=2, stemmer= stemmer),
extractor=WordBag(hash_ngrams=0, norm= 'l2', tf= 'binary', idf= 50.0),
batcher= batcher,
verbose= 0)
t = wb.fit_transform(texts_chunk)
print(t.shape, t.data[:5])
except:
print("Failed ["+task+","+str(len(texts_chunk))+","+backend[0]+"]")
print("Failed: ["+task+","+str(len(texts_chunk))+","+backend[0]+"]")
print("")
2 changes: 1 addition & 1 deletion setup.py
@@ -13,7 +13,7 @@

setup(
name='Wordbatch',
version='1.4.7',
version='1.4.8',
description='Python library for distributed AI processing pipelines, using swappable scheduler backends',
url='https://github.com/anttttti/Wordbatch',
author='Antti Puurula',
2 changes: 1 addition & 1 deletion wordbatch/__init__.py
@@ -1,4 +1,4 @@
import os
PACKAGE_DIR = os.path.dirname(os.path.abspath(__file__))
__version__ = '1.4.7'
__version__ = '1.4.8'

54 changes: 42 additions & 12 deletions wordbatch/batcher.py
@@ -39,19 +39,28 @@ class Batcher(object):
- 'ray' Ray local or distributed execution
task_num_cpus: int
Number of CPUs to reserve per minibatch task for Ray
task_num_gpus: int
Number of GPUs to reserve per minibatch task for Ray
backend_handle: object
Backend handle for sending tasks
verbose: int
Verbosity level. Setting verbose > 0 will display additional information depending on the specific level set.
"""
def __init__(self, procs= 0, minibatch_size= 20000, backend_handle= None, backend= "multiprocessing", verbose= 0):
def __init__(self, procs= 0, minibatch_size= 20000, backend_handle= None, backend= "multiprocessing",
task_num_cpus= 1, task_num_gpus= 0, verbose= 0):
if procs==0: procs= multiprocessing.cpu_count()
self.procs= procs
self.verbose= verbose
self.minibatch_size= minibatch_size
self.backend_handle= backend_handle
self.backend= backend
self.task_num_cpus = task_num_cpus
self.task_num_gpus = task_num_gpus

def list2indexedrdd(self, lst, minibatch_size=0):
if minibatch_size==0: minibatch_size= self.minibatch_size
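
The two new constructor arguments are only used by the Ray backend, where they are forwarded to the task decorator in process_batches below. A minimal sketch of a Batcher that reserves resources per minibatch task, assuming a local Ray runtime (task_num_gpus=1 assumes a GPU is actually visible to Ray):

import multiprocessing
import ray
from wordbatch.batcher import Batcher

ray.init()

# task_num_cpus / task_num_gpus become num_cpus / num_gpus on the Ray remote task
batcher = Batcher(procs=multiprocessing.cpu_count(), minibatch_size=20000,
                  backend="ray", backend_handle=ray,
                  task_num_cpus=1, task_num_gpus=1)
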
@@ -79,6 +88,9 @@ def split_batches(self, data, minibatch_size= None, backend= None):
minibatch_size: int
Expected sizes of minibatches split from the data.
backend: object
Backend to use, instead of the Batcher backend attribute
Returns
-------
data_split: list
@@ -125,7 +137,8 @@ def merge_batches(self, data):
return [item for sublist in data for item in sublist]

def process_batches(self, task, data, args, backend=None, backend_handle=None, input_split=False,
merge_output= True, minibatch_size= None, procs=None, verbose= None):
merge_output= True, minibatch_size= None, procs=None, task_num_cpus= None,
task_num_gpus= None, verbose= None):
"""
Parameters
@@ -146,7 +159,8 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
If True, results from minibatches will be reduced into one single instance before return.
procs: int
Number of process(es)/thread(s) for executing task in parallel. Used for multiprocessing, threading and Loky
Number of process(es)/thread(s) for executing task in parallel. Used for multiprocessing, threading,
Loky and Ray
minibatch_size: int
Expected size of each minibatch
@@ -171,6 +185,12 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
backend_handle: object
Backend handle for sending tasks
task_num_cpus: int
Number of CPUs to reserve per minibatch task for Ray
task_num_gpus: int
Number of GPUs to reserve per minibatch task for Ray
verbose: int
Verbosity level. Setting verbose > 0 will display additional information depending on the specific level set.
@@ -184,6 +204,8 @@ def process_batches(self, task, data, args, backend=None, backend_handle=None, i
if procs is None: procs= self.procs
if backend is None: backend= self.backend
if backend_handle is None: backend_handle = self.backend_handle
if task_num_cpus is None: task_num_cpus = self.task_num_cpus
if task_num_gpus is None: task_num_gpus = self.task_num_gpus
if verbose is None: verbose= self.verbose
if verbose > 1:
print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " procs:",
@@ -228,19 +250,27 @@ def apply_func_to_indexedrdd(batch):
return [batch[0]] + [task([batch[1]] + args)]
results = paral_params.map(apply_func_to_indexedrdd)
elif backend == "ray":
#import ray
#@ray.remote
@self.backend_handle.remote
@self.backend_handle.remote(num_cpus=task_num_cpus, num_gpus=task_num_gpus)
def f_ray(f, data):
return f(data)
results = [f_ray.remote(task, params) for params in paral_params]
results = [self.backend_handle.get(x) for x in results] #Slower, but handles edge cases
#results= self.backend_handle.get(results) #Faster, but crashes on edge cases?
#results = self.backend_handle.get([f_ray.remote(task, params) for params in paral_params])
results = [f_ray.remote(task, paral_params.pop(0)) for _ in range(min(len(paral_params), self.procs))]
uncompleted = results
while (len(paral_params) > 0):
# More tasks than available processors. Queue the task calls
done, remaining = self.backend_handle.wait(uncompleted, timeout=60, fetch_local=False)
if len(done) == 0: continue
done= done[0]
uncompleted = [x for x in uncompleted if x != done]
if len(remaining) > 0:
new = f_ray.remote(task, paral_params.pop(0))
uncompleted.append(new)
results.append(new)
results = [self.backend_handle.get(x) for x in results]
#ppft currently not supported. Supporting arbitrary tasks requires modifications to passed arguments
#elif backend == "ppft":
# jobs = [self.backend_handle.submit(task, (x,), (), ()) for x in paral_params]
# results = [x() for x in jobs]

if merge_output: return self.merge_batches(self.collect_batches(results, backend=backend))
if verbose > 2:
print("Task:", task, " backend:", backend, " backend_handle:", backend_handle, " completed")
@@ -269,11 +299,11 @@ def shuffle_batch(self, texts, labels= None, seed= None):
List of shuffled labels. This will only be returned when non-None
labels is passed
"""
if seed!=None: random.seed(seed)
if seed != None: random.seed(seed)
index_shuf= list(range(len(texts)))
random.shuffle(index_shuf)
texts= [texts[x] for x in index_shuf]
if labels==None: return texts
if labels == None: return texts
labels= [labels[x] for x in index_shuf]
return texts, labels

1 change: 1 addition & 0 deletions wordbatch/pipelines/__init__.py
@@ -1,5 +1,6 @@
from .apply import Apply, decorator_apply
from .apply_batch import ApplyBatch, decorator_apply_batch
from .apply_groupby import ApplyGroupBy, decorator_apply_groupby
from .batch_transformer import BatchTransformer
from .feature_union import FeatureUnion
from .wordbatch import WordBatch
4 changes: 2 additions & 2 deletions wordbatch/pipelines/apply.py
@@ -8,8 +8,8 @@

def decorator_apply(func, batcher=None, cache=None, vectorize=None):
def wrapper_func(*args, **kwargs):
return Apply(func, args=args[1:], kwargs= kwargs, batcher= batcher, cache= cache,
vectorize= vectorize).transform(args[0])
return Apply(func, args=args[1:], kwargs=kwargs, batcher=batcher, cache=cache, vectorize=vectorize)\
.transform(args[0])
return wrapper_func

def batch_transform(args):
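
A minimal usage sketch for decorator_apply, assuming the wrapped function is applied to each row of the input; the serial backend keeps the example self-contained:

from wordbatch.batcher import Batcher
from wordbatch.pipelines import decorator_apply

def normalize(text):
    return text.lower()

batcher = Batcher(backend="serial")
lower_all = decorator_apply(normalize, batcher=batcher)
print(lower_all(["Foo", "BAR"]))  # expected: ['foo', 'bar']
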
14 changes: 12 additions & 2 deletions wordbatch/pipelines/apply_groupby.py
@@ -1,9 +1,19 @@
#!python
import pandas as pd
from wordbatch.pipelines import Apply
import wordbatch.batcher

def decorator_apply_groupby(func, group, batcher=None, rows_per_bin=200, cache=None, vectorize=None):
def wrapper_func(*args, **kwargs):
return ApplyGroupBy(func, args=args[1:], kwargs=kwargs, group=group, rows_per_bin=rows_per_bin,
batcher=batcher, cache=cache, vectorize=vectorize).transform(args[0])
return wrapper_func

class ApplyGroupBy(object):
def __init__(self, batcher, function, group, rows_per_bin= 200, cache=None, vectorize=None, args=[], kwargs={}):
self.batcher= batcher
def __init__(self, function, group, batcher=None, rows_per_bin= 200, cache=None, vectorize=None, args=[],
kwargs={}):
if batcher is None: self.batcher= wordbatch.batcher.Batcher()
else: self.batcher= batcher
self.function= function
self.group= group
self.rows_per_bin = rows_per_bin
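
With this change, ApplyGroupBy constructs a default Batcher when none is given, and decorator_apply_groupby is now exported from wordbatch.pipelines (see the __init__.py change above). A hedged usage sketch, assuming group names a DataFrame column and the wrapped function receives one group of rows at a time (the transform internals are not shown in this diff):

import pandas as pd
from wordbatch.pipelines import decorator_apply_groupby

def total(group_rows):
    # Hypothetical per-group aggregation
    return group_rows["value"].sum()

df = pd.DataFrame({"user": ["a", "a", "b"], "value": [1, 2, 3]})
sum_per_user = decorator_apply_groupby(total, group="user")  # batcher defaults to Batcher()
result = sum_per_user(df)
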
