__author__ = 'lucabasa'
__version__ = '1.0.7'
__status__ = 'development'
'''
This script contains methods to preserve the DataFrame structure inside a pipeline.
This makes it possible to create custom transformers that create or delete features inside the pipeline.
Moreover, the creation of dummies checks that the train and test sets have the same dummy columns,
preventing the annoying error raised when a model is called and the shape of the data is not consistent.
Examples using these methods can be found at https://www.kaggle.com/lucabasa/understand-and-use-a-pipeline
'''
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.impute import SimpleImputer
import warnings


class feat_sel(BaseEstimator, TransformerMixin):
    '''
    This transformer selects either numerical or categorical features.
    In this way we can build separate pipelines for separate data types.
    '''
    def __init__(self, dtype='numeric'):
        self.dtype = dtype  # do not use parameters like _dtype as they don't play nice with GridSearch

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        if self.dtype == 'numeric':
            num_cols = X.columns[X.dtypes != object].tolist()
            return X[num_cols]
        elif self.dtype == 'category':
            cat_cols = X.columns[X.dtypes == object].tolist()
            return X[cat_cols]
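

# A minimal usage sketch of feat_sel; the toy DataFrame and its column names
# are illustrative assumptions, not part of the original script.
def _demo_feat_sel():
    df = pd.DataFrame({'age': [20, 30], 'city': ['Rome', 'Pisa']})
    num = feat_sel(dtype='numeric').fit_transform(df)   # keeps only 'age'
    cat = feat_sel(dtype='category').fit_transform(df)  # keeps only 'city'
    return num, cat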


class df_imputer(TransformerMixin, BaseEstimator):
    '''
    Just a wrapper for the SimpleImputer that keeps the DataFrame structure
    '''
    def __init__(self, strategy='mean'):
        self.strategy = strategy
        self.imp = None
        self.statistics_ = None

    def fit(self, X, y=None):
        self.imp = SimpleImputer(strategy=self.strategy)
        self.imp.fit(X)
        self.statistics_ = pd.Series(self.imp.statistics_, index=X.columns)
        return self

    def transform(self, X):
        # assumes X is a DataFrame
        Ximp = self.imp.transform(X)
        Xfilled = pd.DataFrame(Ximp, index=X.index, columns=X.columns)
        return Xfilled
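

# A hedged sketch of df_imputer on data with a missing value; the column
# name is made up for the demo, np.nan comes from the numpy import above.
def _demo_df_imputer():
    df = pd.DataFrame({'width': [1.0, np.nan, 3.0]})
    imputed = df_imputer(strategy='mean').fit_transform(df)
    # the result is still a DataFrame, with the NaN replaced by the mean (2.0)
    return imputed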


class df_scaler(TransformerMixin, BaseEstimator):
    '''
    Wrapper of StandardScaler or RobustScaler
    '''
    def __init__(self, method='standard'):
        self.scl = None
        self.scale_ = None
        self.method = method
        if self.method == 'standard':
            self.mean_ = None
        elif method == 'robust':
            self.center_ = None
        self.columns = None  # this is useful when it is the last step of a pipeline before the model

    def fit(self, X, y=None):
        if self.method == 'standard':
            self.scl = StandardScaler()
            self.scl.fit(X)
            self.mean_ = pd.Series(self.scl.mean_, index=X.columns)
        elif self.method == 'robust':
            self.scl = RobustScaler()
            self.scl.fit(X)
            self.center_ = pd.Series(self.scl.center_, index=X.columns)
        self.scale_ = pd.Series(self.scl.scale_, index=X.columns)
        return self

    def transform(self, X):
        # assumes X is a DataFrame
        Xscl = self.scl.transform(X)
        Xscaled = pd.DataFrame(Xscl, index=X.index, columns=X.columns)
        self.columns = X.columns
        return Xscaled

    def get_feature_names(self):
        return list(self.columns)
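

# A small sketch of df_scaler; the DataFrame below is illustrative only.
def _demo_df_scaler():
    df = pd.DataFrame({'height': [150.0, 160.0, 170.0]})
    scaled = df_scaler(method='standard').fit_transform(df)
    # the output keeps index and columns; scaled['height'] has mean 0 and unit variance
    return scaled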


class dummify(TransformerMixin, BaseEstimator):
    '''
    Wrapper for pd.get_dummies
    Via match_cols, it is possible to ask the transformer to make sure that all the dummies are there
    Missing dummies are introduced with a column of 0's
    Extra dummies are dropped
    '''
    def __init__(self, drop_first=False, match_cols=True):
        self.drop_first = drop_first
        self.columns = []  # useful to behave well with FeatureUnion
        self.match_cols = match_cols

    def fit(self, X, y=None):
        return self

    def match_columns(self, X):
        miss_train = list(set(X.columns) - set(self.columns))
        miss_test = list(set(self.columns) - set(X.columns))
        err = 0
        if len(miss_test) > 0:
            for col in miss_test:
                X[col] = 0  # insert a column for the missing dummy
                err += 1
        if len(miss_train) > 0:
            for col in miss_train:
                del X[col]  # delete the column of the extra dummy
                err += 1
        if err > 0:
            warnings.warn('The dummies in this set do not match the ones in the train set, we corrected the issue.',
                          UserWarning)
        return X[self.columns]  # preserve original order to avoid problems with some algorithms

    def transform(self, X):
        X = pd.get_dummies(X, drop_first=self.drop_first)
        if len(self.columns) > 0:
            if self.match_cols:
                X = self.match_columns(X)
        else:
            self.columns = X.columns
        return X

    def get_feature_names(self):  # named consistently so FeatureUnion_df below can call it
        return self.columns
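

# A sketch of the dummy-matching behaviour, with made-up train/test frames.
def _demo_dummify():
    train = pd.DataFrame({'color': ['red', 'blue']})
    test = pd.DataFrame({'color': ['red', 'green']})
    dum = dummify()
    train_d = dum.fit_transform(train)  # learns color_blue and color_red
    test_d = dum.transform(test)        # warns, adds color_blue as 0's, drops color_green
    return train_d, test_d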


class FeatureUnion_df(TransformerMixin, BaseEstimator):
    '''
    Wrapper of FeatureUnion that returns a DataFrame;
    the column order follows the concatenation done by FeatureUnion
    transformer_list: list of (name, Pipeline) tuples
    '''
    def __init__(self, transformer_list, n_jobs=None, transformer_weights=None, verbose=False):
        self.transformer_list = transformer_list
        self.n_jobs = n_jobs
        self.transformer_weights = transformer_weights
        self.verbose = verbose  # these are necessary to work inside of GridSearch or similar
        self.feat_un = FeatureUnion(self.transformer_list,
                                    n_jobs=self.n_jobs,
                                    transformer_weights=self.transformer_weights,
                                    verbose=self.verbose)

    def fit(self, X, y=None):
        self.feat_un.fit(X, y)
        return self

    def transform(self, X, y=None):
        X_tr = self.feat_un.transform(X)
        columns = []
        for trsnf in self.transformer_list:
            # each entry is (name, pipeline); ask the last step of the pipeline for its columns
            cols = trsnf[1].steps[-1][1].get_feature_names()
            columns += list(cols)
        X_tr = pd.DataFrame(X_tr, index=X.index, columns=columns)
        return X_tr

    def get_params(self, deep=True):  # necessary to behave well in GridSearch
        return self.feat_un.get_params(deep=deep)
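

# A hedged end-to-end sketch assembling the transformers above, in the spirit of
# the Kaggle notebook linked in the module docstring; the toy data and the final
# LinearRegression step are illustrative assumptions, not part of the script.
def _demo_pipeline():
    from sklearn.pipeline import Pipeline
    from sklearn.linear_model import LinearRegression

    X = pd.DataFrame({'age': [20, 30, np.nan, 40],
                      'city': ['Rome', 'Pisa', 'Rome', 'Bari']})
    y = pd.Series([1.0, 2.0, 3.0, 4.0])

    # one branch per data type, each ending in a step that exposes get_feature_names
    numeric = Pipeline([('sel', feat_sel(dtype='numeric')),
                        ('imp', df_imputer(strategy='mean')),
                        ('scl', df_scaler(method='standard'))])
    categorical = Pipeline([('sel', feat_sel(dtype='category')),
                            ('dum', dummify())])
    processing = FeatureUnion_df([('num', numeric), ('cat', categorical)])

    model = Pipeline([('proc', processing), ('lr', LinearRegression())])
    model.fit(X, y)
    return model.predict(X)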