Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code for sentiment analysis #417

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e1ec479
WIP
jaidevd Feb 23, 2021
4f5b50d
WIP: MLHandler support for Huggingface transformers
jaidevd Feb 24, 2021
52e72cb
WIP
jaidevd Feb 24, 2021
4b01634
WIP
jaidevd Feb 25, 2021
1f85677
Ensure label IDs are long ints
jaidevd Feb 26, 2021
6af50ea
FIX: Enforce correct order of features before prediction in MLHandler
jaidevd Mar 1, 2021
a3fa51f
WIP
jaidevd Mar 1, 2021
2322e8e
Merge branch 'jd-mlhandler-featnames-order' of github.com:gramener/gr…
jaidevd Mar 1, 2021
e6be0d5
WIP: MLHandler Refactoring
jaidevd Mar 2, 2021
2a30092
Merge branch 'master' of github.com:gramener/gramex into jd-transformers
jaidevd Mar 2, 2021
8317fe1
ENH: Transformers - model persistence
jaidevd Mar 2, 2021
2d5b213
WIP
jaidevd Mar 3, 2021
168c7e1
Merge branch 'master' of github.com:gramener/gramex into jd-transformers
jaidevd Mar 3, 2021
3775939
WIP
jaidevd Mar 5, 2021
d446803
ENH: Modify gramex.install.safe_rmtree to remove files outside $GRAME…
jaidevd Mar 9, 2021
80b6c27
Add code for sentiment analysis and remove print statements
MSanKeys963 Jun 21, 2021
13116a0
Remove print statement
MSanKeys963 Jun 21, 2021
7f40258
Solve merge conflicts
MSanKeys963 Jul 6, 2021
dac5359
Add space
MSanKeys963 Jul 6, 2021
e9c6bb2
Add new install.py
MSanKeys963 Jul 7, 2021
07807c7
Remove space
MSanKeys963 Jul 9, 2021
4ee107d
Remove merge conflicts test_mlhandler.py
MSanKeys963 Jul 10, 2021
20ce370
Merge branch 'master' into jd-transformers
MSanKeys963 Jul 10, 2021
e4f59e0
Remove unnecessary space & lines
MSanKeys963 Jul 10, 2021
2f00a8e
Restore formhandler.py to original version & Minor Change to mlhandle…
MSanKeys963 Jul 13, 2021
e84a064
Remove unused function
MSanKeys963 Jul 13, 2021
dd3ec9c
Rename dl_utils.py to dl.py
MSanKeys963 Jul 20, 2021
d6a7132
Remove dl_utils.py
MSanKeys963 Jul 20, 2021
2d2cfb1
Fix imports
MSanKeys963 Jul 20, 2021
326b93d
Add changes to support new gramex.yaml config.
MSanKeys963 Aug 5, 2021
db0b20c
Remove spaces
MSanKeys963 Aug 5, 2021
7c59db3
Add changes to run PyTorch and Hugging Face only when requested and f…
MSanKeys963 Aug 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions gramex/dl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from torch.utils.data import Dataset
import torch


class SentimentDataset(Dataset):
    """Torch ``Dataset`` that pairs tokenizer encodings with sentiment labels.

    Parameters
    ----------
    encodings : mapping
        Maps each feature name to an indexable sequence holding one entry per
        sample. Only ``.items()`` and integer indexing of the values are used.
    labels : sequence
        One label per sample; each label is converted to a 64-bit integer
        tensor when an item is fetched.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return a dict of tensors for sample ``idx``, including ``'labels'``."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # Build the label with the target dtype directly instead of creating a
        # tensor and casting it afterwards (one allocation less per item).
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.int64)
        return item

    def __len__(self):
        """Return the number of samples, defined by the number of labels."""
        return len(self.labels)
169 changes: 148 additions & 21 deletions gramex/handlers/mlhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from slugify import slugify
from tornado.gen import coroutine
from tornado.web import HTTPError
Expand All @@ -42,8 +43,16 @@
'cats': [],
'target_col': None
}
# Default training options for the transformers backend. A request may override
# any of them; `_coerce_transformers_opts` also uses the type of each default
# to cast the incoming request argument before it is passed on as a keyword
# argument to `TrainingArguments`.
TRANSFORMERS_DEFAULTS = dict(
num_train_epochs=1,
per_device_train_batch_size=16,
per_device_eval_batch_size=32,
weight_decay=0.01,
warmup_steps=100,
)
# Values accepted for the `_action` argument of a POST request; any other
# value is rejected with BAD_REQUEST.
ACTIONS = ['predict', 'score', 'append', 'train', 'retrain']
# Template used when the handler configuration does not supply `template`.
DEFAULT_TEMPLATE = op.join(op.dirname(__file__), '..', 'apps', 'mlhandler', 'template.html')
# Label encoder fitted on the two sentiment class names; used to convert the
# 'NEGATIVE' / 'POSITIVE' strings into integer class ids for training/scoring.
SENTIMENT_LENC = LabelEncoder().fit(['NEGATIVE', 'POSITIVE'])
# Resolve a model class name through `locate`, searching MLCLASS_MODULES.
search_modelclass = lambda x: locate(x, MLCLASS_MODULES) # NOQA: E731


Expand All @@ -59,11 +68,45 @@ def _fit(model, x, y, path=None, name=None):
return model


def _train_transformer(model, data, model_path, **kwargs):
    """Fine-tune a sentiment pipeline on ``data`` and score it on the same data.

    Parameters
    ----------
    model : object
        Pipeline-like object exposing ``tokenizer``, ``model`` and
        ``save_pretrained``; calling it with a list of texts must return one
        dict with a ``'label'`` key per text.
    data : pandas.DataFrame
        Must contain a ``'_text'`` column (inputs) and a ``'label'`` column
        holding values known to ``SENTIMENT_LENC``.
    model_path : str
        Where the trained pipeline is saved. The ``results`` and ``logs``
        directories are created next to it (in its parent directory).
    **kwargs
        Forwarded unchanged to ``transformers.TrainingArguments``.

    Returns
    -------
    dict
        ``{'roc_auc': <score>}`` computed from the encoded true labels and the
        encoded predicted labels on the training data.
    """
    encodings = model.tokenizer(
        data['_text'].values.tolist(), truncation=True, padding=True)
    y_true = SENTIMENT_LENC.transform(data['label'])

    # Imported here rather than at module level so that torch is only pulled
    # in when a transformer model is actually trained.
    from gramex.dl import SentimentDataset
    dataset = SentimentDataset(encodings, y_true)

    parent_dir = op.dirname(model_path)
    results_dir = op.join(parent_dir, 'results')
    logs_dir = op.join(op.dirname(model_path), 'logs')

    from transformers import Trainer, TrainingArguments
    training_args = TrainingArguments(
        output_dir=results_dir, logging_dir=logs_dir, **kwargs)
    trainer = Trainer(model=model.model, args=training_args, train_dataset=dataset)
    trainer.train()

    # Persist the fine-tuned pipeline, then move it to the CPU before
    # running the predictions used for scoring.
    model.save_pretrained(model_path)
    move_to_cpu(model)

    predictions = model(data['_text'].values.tolist())
    predicted_labels = [entry['label'] for entry in predictions]
    y_pred = SENTIMENT_LENC.transform(predicted_labels)
    return {'roc_auc': roc_auc_score(y_true, y_pred)}


def _score_transformer(model, data):
    """Score a sentiment pipeline against the labels in ``data``.

    ``model`` is called with the list of texts in ``data['_text']`` and must
    return one dict with a ``'label'`` key per text. Both the true labels
    (``data['label']``) and the predicted labels are encoded with
    ``SENTIMENT_LENC`` before the ROC AUC is computed.

    Returns a dict of the form ``{'roc_auc': <score>}``.
    """
    predictions = model(data['_text'].values.tolist())
    true_labels = data['label']
    predicted_labels = [entry['label'] for entry in predictions]
    y_true = SENTIMENT_LENC.transform(true_labels)
    y_pred = SENTIMENT_LENC.transform(predicted_labels)
    return {'roc_auc': roc_auc_score(y_true, y_pred)}


def move_to_cpu(model):
    """Call ``.to('cpu')`` on ``model.model`` if it exists, else on ``model``.

    This lets the same helper accept either a wrapper that keeps the
    underlying network in a ``model`` attribute or the network itself.
    """
    target = getattr(model, 'model', model)
    target.to('cpu')


class MLHandler(FormHandler):

@classmethod
def setup(cls, data=None, model={}, config_dir='', **kwargs):
def setup(cls, data=None, model={}, backend="", config_dir='', **kwargs):
cls.slug = slugify(cls.name)
cls.backend = model.get('backend')
cls.sentiment_df = pd.DataFrame()
# Create the config store directory
if not config_dir:
config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
Expand All @@ -74,7 +117,9 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):
cls.data_store = op.join(cls.config_dir, 'data.h5')

cls.template = kwargs.pop('template', DEFAULT_TEMPLATE)
cls.mclass = model.get('class')
super(MLHandler, cls).setup(**kwargs)

try:
if 'transform' in data:
data['transform'] = build_transform(
Expand All @@ -91,9 +136,6 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):
data = None
cls._built_transform = staticmethod(lambda x: x)

default_model_path = op.join(cls.config_dir, slugify(cls.name) + '.pkl')
cls.model_path = model.pop('path', default_model_path)

# store the model kwargs from gramex.yaml into the store
for key in TRANSFORMS:
cls.set_opt(key, model.get(key, cls.get_opt(key)))
Expand All @@ -104,20 +146,31 @@ def setup(cls, data=None, model={}, config_dir='', **kwargs):

cls.set_opt('class', model.get('class'))
cls.set_opt('params', model.get('params', {}))
if op.exists(cls.model_path): # If the pkl exists, load it
cls.model = joblib.load(cls.model_path)
elif data is not None:
mclass = cls.get_opt('class', model.get('class', False))
params = cls.get_opt('params', {})
data = cls._filtercols(data)
data = cls._filterrows(data)
cls.model = cls._assemble_pipeline(data, mclass=mclass, params=params)

# train the model
target = data[target_col]
train = data[[c for c in data if c != target_col]]
gramex.service.threadpool.submit(
_fit, cls.model, train, target, cls.model_path, cls.name)

if cls.backend == "transformers":
cls.load_transformer(cls.mclass, model)
if data is not None:
data = cls._filtercols(data)
data = cls._filterrows(data)
cls._concatenate(data)
else:
default_model_path = op.join(cls.config_dir, slugify(cls.name) + '.pkl')
cls.model_path = model.pop('path', default_model_path)

if op.exists(cls.model_path): # If the pkl exists, load it
cls.model = joblib.load(cls.model_path)
elif data is not None:
mclass = cls.get_opt('class', model.get('class', False))
params = cls.get_opt('params', {})
data = cls._filtercols(data)
data = cls._filterrows(data)
cls.model = cls._assemble_pipeline(data, mclass=mclass, params=params)

# train the model
target = data[target_col]
train = data[[c for c in data if c != target_col]]
gramex.service.threadpool.submit(
_fit, cls.model, train, target, cls.model_path, cls.name)
cls.config_store.flush()

@classmethod
Expand All @@ -128,6 +181,25 @@ def load_data(cls, default=pd.DataFrame()):
df = default
return df

@classmethod
def load_transformer(cls, task, _model=None):
    """Create the transformers pipeline for ``task`` and store it on the class.

    The method first tries to load a previously saved model and tokenizer
    from ``cls.model_path``. If that fails for any reason, the failure is
    logged and the pipeline is created with the library defaults for
    ``task``.

    Parameters
    ----------
    task : str
        Task name passed as the first argument to ``transformers.pipeline``.
    _model : dict, optional
        Model configuration. Only its ``'path'`` key is read here; when it is
        absent the path defaults to
        ``$GRAMEXDATA/apps/mlhandler/<slugified handler name>``.

    Side effects
    ------------
    Sets ``cls.model_path`` and ``cls.model``.
    """
    # Avoid a shared mutable default argument.
    _model = {} if _model is None else _model
    default_model_path = op.join(
        gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
        slugify(cls.name))
    cls.model_path = _model.get('path', default_model_path)

    # Imported lazily so transformers is only required for this backend.
    from transformers import pipeline
    from transformers import AutoModelForSequenceClassification, AutoTokenizer
    # try loading from model_path
    try:
        # Build both pieces before using either: if the model loads but the
        # tokenizer does not, a half-populated kwargs dict would hand the
        # pipeline a model object without a tokenizer instead of falling
        # back to the task defaults.
        kwargs = {
            'model': AutoModelForSequenceClassification.from_pretrained(cls.model_path),
            'tokenizer': AutoTokenizer.from_pretrained(cls.model_path),
        }
    except Exception as err:
        # Deliberately broad: any load failure means "use the default model".
        app_log.warning(f'Could not load model from {cls.model_path}.')
        app_log.warning(f'{err}')
        kwargs = {}
    cls.model = pipeline(task, **kwargs)

@classmethod
def store_data(cls, df, append=False):
df.to_hdf(cls.data_store, format="table", key="data", append=append)
Expand Down Expand Up @@ -193,6 +265,11 @@ def _parse_data(self, _cache=True, append=False):
self.store_data(data, append)
return data

def _coerce_transformers_opts(self):
    """Collect the training options for this request, cast to their types.

    For every key in ``TRANSFORMERS_DEFAULTS`` the request argument of the
    same name is read (falling back to the default value) and converted with
    the type of that default.

    Returns a dict with exactly the keys of ``TRANSFORMERS_DEFAULTS``.
    """
    raw_opts = {}
    for name in TRANSFORMERS_DEFAULTS:
        raw_opts[name] = self.get_arg(name, TRANSFORMERS_DEFAULTS.get(name))

    coerced = {}
    for name, value in raw_opts.items():
        caster = type(TRANSFORMERS_DEFAULTS.get(name))
        coerced[name] = caster(value)
    return coerced

@classmethod
def _filtercols(cls, data, **kwargs):
include = kwargs.get('include', cls.get_opt('include', []))
Expand Down Expand Up @@ -253,6 +330,29 @@ def _assemble_pipeline(cls, data, force=False, mclass='', params=None):
return Pipeline([('transform', ct), (model.__class__.__name__, model)])
return cls.model

@classmethod
def _concatenate(cls, data):
    """Build the text column used by the transformers backend and store data.

    All columns listed in the ``cats`` option are concatenated, in the
    column order of ``data``, into a new ``'_text'`` column. The result
    (plus ``'label'`` when present) is kept in ``cls.sentiment_df``, and the
    data without the helper ``'_text'`` column is written to the data store.

    Parameters
    ----------
    data : pandas.DataFrame
        Input data. Note that a ``'_text'`` column is inserted into this
        frame in place.

    Raises
    ------
    HTTPError
        ``BAD_REQUEST`` if the string check on a ``cats`` column fails.
    """
    cats = set(cls.get_opt('cats', []))
    for cat in cats:
        # NOTE(review): after ``astype(str)`` every value is a string, so
        # this only rejects columns containing empty strings - confirm
        # whether a real dtype check was intended.
        if not data[cat].astype(str).all():
            raise HTTPError(BAD_REQUEST,
                            reason=f"Columns {cat} should contain string.")

    data.insert(0, column='_text', value='')

    for col in data:
        if col in cats:
            data['_text'] += data[col]

    cls.sentiment_df = data['_text'].copy()
    cls.sentiment_df = cls.sentiment_df.to_frame(name='_text')
    if 'label' in data.columns:
        cls.sentiment_df['label'] = data['label']
    else:
        app_log.error("Column: 'label' missing, training and scoring not available!")
    # DataFrame.drop returns a new frame; the previous code discarded that
    # result, so the helper '_text' column was written to the store as well.
    cls.store_data(data.drop('_text', axis=1))

def _transform(self, data, **kwargs):
orgdata = self.load_data()
for col in data:
Expand Down Expand Up @@ -307,6 +407,10 @@ def get(self, *path_args, **path_kwargs):
self.write(json.dumps(params, indent=2))
elif '_cache' in self.args:
self.write(self.load_data().to_json(orient='records'))
elif self.backend == "transformers" and 'text' in self.args:
text = self.get_arguments('text')
result = yield gramex.service.threadpool.submit(self.model, text)
self.write(json.dumps(result, indent=2))
else:
self._check_model_path()
if '_download' in self.args:
Expand Down Expand Up @@ -369,8 +473,31 @@ def post(self, *path_args, **path_kwargs):
action = self.args.pop('_action', 'predict')
if action not in ACTIONS:
raise HTTPError(BAD_REQUEST, f'Action {action} not supported.')
res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}"))
self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
if self.backend == "transformers":
data = self.sentiment_df
move_to_cpu(self.model)
kwargs = {}
if action == 'train':
if 'label' not in data.columns:
app_log.error("Column: 'label' missing, training and scoring not available!")
raise HTTPError(BAD_REQUEST,
reason=print("Missing column named label(target values) from data."))
kwargs = self._coerce_transformers_opts()
kwargs['model_path'] = self.model_path
args = _train_transformer, self.model, data
elif action == 'score':
if 'label' not in data.columns:
app_log.error("Column: 'label' missing, training and scoring not available!")
raise HTTPError(BAD_REQUEST,
reason=print("Missing column named label(target values) from data."))
args = _score_transformer, self.model, data
elif action == 'predict':
args = self.model, data['_text'].values.tolist()
res = yield gramex.service.threadpool.submit(*args, **kwargs)
self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
else:
res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}"))
self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder))
super(MLHandler, self).post(*path_args, **path_kwargs)

def get_cached_arg(self, argname):
Expand Down