From 6299a3797e346ce9969e549a87ea2b0272013a71 Mon Sep 17 00:00:00 2001 From: anttttti Date: Thu, 20 May 2021 22:33:36 +1200 Subject: [PATCH] Update for version 1.4.7 Fix numpy version incompatibility, which caused FTRL model serialization errors with newer Numpy versions. Add apply_groupby.py, which distributes Pandas groupby calls --- README.rst | 2 +- setup.py | 2 +- wordbatch/__init__.py | 2 +- wordbatch/models/fm_ftrl.pyx | 12 +++++----- wordbatch/models/ftrl.pyx | 6 ++--- wordbatch/models/ftrl32.pyx | 6 ++--- wordbatch/models/nn_relu_h1.pyx | 10 ++++---- wordbatch/models/nn_relu_h2.pyx | 16 ++++++------- wordbatch/pipelines/apply_groupby.py | 34 ++++++++++++++++++++++++++++ 9 files changed, 62 insertions(+), 28 deletions(-) create mode 100644 wordbatch/pipelines/apply_groupby.py diff --git a/README.rst b/README.rst index 9408526..1a101ea 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,5 @@ =============== -Wordbatch 1.4.6 +Wordbatch 1.4.7 =============== Overview diff --git a/setup.py b/setup.py index 9045c2b..f1763f9 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name='Wordbatch', - version='1.4.6', + version='1.4.7', description='Python library for distributed AI processing pipelines, using swappable scheduler backends', url='https://github.com/anttttti/Wordbatch', author='Antti Puurula', diff --git a/wordbatch/__init__.py b/wordbatch/__init__.py index 1d616ff..89de139 100644 --- a/wordbatch/__init__.py +++ b/wordbatch/__init__.py @@ -1,4 +1,4 @@ import os PACKAGE_DIR = os.path.dirname(os.path.abspath(__file__)) -__version__ = '1.4.6' +__version__ = '1.4.7' diff --git a/wordbatch/models/fm_ftrl.pyx b/wordbatch/models/fm_ftrl.pyx index b749401..67f2c9a 100644 --- a/wordbatch/models/fm_ftrl.pyx +++ b/wordbatch/models/fm_ftrl.pyx @@ -90,12 +90,12 @@ cdef void update_single(int* inds, double* vals, int lenn, double e, double ialp n_fm[i] += e2 cdef class FM_FTRL: - cdef double[:] w - cdef double[:] z - cdef double[:] n - cdef double[:] w_fm - cdef double[:] z_fm - cdef double[:] n_fm + cdef const double[:] w + cdef const double[:] z + cdef const double[:] n + cdef const double[:] w_fm + cdef const double[:] z_fm + cdef const double[:] n_fm cdef unsigned int threads cdef unsigned int iters diff --git a/wordbatch/models/ftrl.pyx b/wordbatch/models/ftrl.pyx index 186a5a7..14f3a53 100644 --- a/wordbatch/models/ftrl.pyx +++ b/wordbatch/models/ftrl.pyx @@ -65,9 +65,9 @@ cdef void update_single(int* inds, double* vals, int lenn, double e, double ialp n[i]+= g2 cdef class FTRL: - cdef double[:] w - cdef double[:] z - cdef double[:] n + cdef const double[:] w + cdef const double[:] z + cdef const double[:] n cdef unsigned int threads cdef unsigned int iters diff --git a/wordbatch/models/ftrl32.pyx b/wordbatch/models/ftrl32.pyx index 15fe9c4..1c647fa 100644 --- a/wordbatch/models/ftrl32.pyx +++ b/wordbatch/models/ftrl32.pyx @@ -65,9 +65,9 @@ cdef void update_single(int* inds, double* vals, int lenn, double e, double ialp n[i]+= g2 cdef class FTRL32: - cdef float[:] w - cdef float[:] z - cdef float[:] n + cdef const float[:] w + cdef const float[:] z + cdef const float[:] n cdef unsigned int threads cdef unsigned int iters diff --git a/wordbatch/models/nn_relu_h1.pyx b/wordbatch/models/nn_relu_h1.pyx index afecea1..b2e2cdb 100644 --- a/wordbatch/models/nn_relu_h1.pyx +++ b/wordbatch/models/nn_relu_h1.pyx @@ -54,11 +54,11 @@ cdef void update_single(int* inds, double* vals, int lenn, int D, int D_nn, doub c1[j]+= fabs(dldw1) cdef class NN_ReLU_H1: - cdef double[:] w0 - cdef double[:] w1 - cdef double[:] z - cdef double[:] c0 - cdef double[:] c1 + cdef const double[:] w0 + cdef const double[:] w1 + cdef const double[:] z + cdef const double[:] c0 + cdef const double[:] c1 cdef unsigned int threads cdef unsigned int iters diff --git a/wordbatch/models/nn_relu_h2.pyx b/wordbatch/models/nn_relu_h2.pyx index d2cce56..2201e2f 100644 --- a/wordbatch/models/nn_relu_h2.pyx +++ b/wordbatch/models/nn_relu_h2.pyx @@ -65,14 +65,14 @@ cdef void update_single(int* inds, double* vals, int lenn, int D, int D_nn, i c2[k] += fabs(dldw2) cdef class NN_ReLU_H2: - cdef double[:] w0 - cdef double[:] w1 - cdef double[:] w2 - cdef double[:] z1 - cdef double[:] z2 - cdef double[:] c0 - cdef double[:] c1 - cdef double[:] c2 + cdef const double[:] w0 + cdef const double[:] w1 + cdef const double[:] w2 + cdef const double[:] z1 + cdef const double[:] z2 + cdef const double[:] c0 + cdef const double[:] c1 + cdef const double[:] c2 cdef unsigned int threads cdef unsigned int iters diff --git a/wordbatch/pipelines/apply_groupby.py b/wordbatch/pipelines/apply_groupby.py new file mode 100644 index 0000000..6d261c8 --- /dev/null +++ b/wordbatch/pipelines/apply_groupby.py @@ -0,0 +1,34 @@ +import pandas as pd +from wordbatch.pipelines import Apply + +class ApplyGroupBy(object): + def __init__(self, batcher, function, group, rows_per_bin= 200, cache=None, vectorize=None, args=[], kwargs={}): + self.batcher= batcher + self.function= function + self.group= group + self.rows_per_bin = rows_per_bin + self.cache= cache + self.vectorize= vectorize + self.args= [args] + self.kwargs= [kwargs] + + def fit(self, data, input_split= False): + return self + + def fit_transform(self, data, input_split= False, merge_output= True): + return self.transform(data, input_split, merge_output) + + def transform(self, data, input_split= False, merge_output= True): + bin_ids = data[self.group].unique() + group_bins= {x:1 for x in bin_ids} if len(bin_ids) <= self.rows_per_bin else \ + {x[0]: x[1] for x in zip(bin_ids, pd.qcut(bin_ids, len(bin_ids) // self.rows_per_bin))} + group_bin_col = data[self.group].map(group_bins) + bin_ids, groups = zip(*data.groupby(group_bin_col, as_index=False)) + t= [x for x in Apply(self.function, self.batcher, *self.args, *self.kwargs, self.cache, + self.vectorize).transform(groups, input_split, merge_output) + if len(x) > 0] + try: + t= pd.concat(t, sort=False) # t is Series or DataFrame + except: + t= [item for sublist in t for item in sublist] # t is some iterable + return t \ No newline at end of file