From 6cfe35152e3608009c0b7c2b3fe8928983e10663 Mon Sep 17 00:00:00 2001
From: Aaron W Chen
Date: Mon, 29 Jan 2024 22:05:20 -0800
Subject: [PATCH] Update classes for custom sklearn/mlflow

---
 src/custom_sklearn_text_transformer_mlflow.py | 237 ++++++++++++++++++
 src/custom_stanza_mlflow.py                   | 166 ++++--------
 2 files changed, 281 insertions(+), 122 deletions(-)
 create mode 100644 src/custom_sklearn_text_transformer_mlflow.py

diff --git a/src/custom_sklearn_text_transformer_mlflow.py b/src/custom_sklearn_text_transformer_mlflow.py
new file mode 100644
index 0000000..21c431b
--- /dev/null
+++ b/src/custom_sklearn_text_transformer_mlflow.py
@@ -0,0 +1,237 @@
from itertools import tee, islice
import numpy as np
import pandas as pd
import re
from sklearn.feature_extraction.text import (
    CountVectorizer
    , TfidfTransformer
    , TfidfVectorizer
)
import stanza
from tqdm import tqdm


class CustomSKLearnAnalyzer():
    """
    This class handles using Stanza with a custom analyzer inside sklearn
    """

    def __init__(self, stanza_lang_str="en"):
        """
        Constructor method. Initializes the model with a Stanza library
        language code. The default is "en" for English; functionality to
        download the pretrained model/embeddings could be added later.
        """
        self.stanza_lang_str = stanza_lang_str

    def prepare_stanza_pipeline(self,
                                depparse_batch_size=50,
                                depparse_min_length_to_batch_separately=50,
                                verbose=True,
                                use_gpu=False,
                                batch_size=100
                                ):
        """
        Method to simplify construction of a Stanza Pipeline for use in the
        sklearn custom analyzer

        Args:
            Follows the creation of a stanza pipeline (see the Stanza docs)

            self.stanza_lang_str:
                str for pretrained Stanza embeddings to use in the pipeline (from init)

            depparse_batch_size:
                int for dependency parsing batch size, default is 50

            depparse_min_length_to_batch_separately:
                int for minimum string length to batch separately, default is 50

            verbose:
                boolean for informational readouts during processing, default is True

            use_gpu:
                boolean for using a GPU for Stanza, default is False;
                set to True when on cloud/not on streaming computer

            batch_size:
                int for batch sizing, default is 100

        Returns:
            nlp:
                stanza pipeline
        """

        # Perhaps down the road, this should be stored as an MLflow Artifact to be downloaded
        # Or should this be part of the Container building at start up? If so, how would those get logged? Just as artifacts?
        stanza.download(self.stanza_lang_str)

        nlp = stanza.Pipeline(
            self.stanza_lang_str,
            depparse_batch_size=depparse_batch_size,
            depparse_min_length_to_batch_separately=depparse_min_length_to_batch_separately,
            verbose=verbose,
            use_gpu=use_gpu,
            batch_size=batch_size
        )

        return nlp
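
    # A minimal usage sketch (hypothetical values; assumes the pretrained "en"
    # Stanza resources can be downloaded in the current environment):
    #
    #   analyzer = CustomSKLearnAnalyzer(stanza_lang_str="en")
    #   nlp = analyzer.prepare_stanza_pipeline(use_gpu=False, verbose=False)
    #
    # prepare_stanza_pipeline calls stanza.download() each time, so it needs
    # access to the Stanza resources at least the first time it runs.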

    def fit_transform(
        self,
        input_data,
        stanza_pipeline,
        strip_accents="unicode",
        lowercase=True,
        min_ngram_length=1,
        max_ngram_length=4,
        min_df=3,
        sklearn_type='OneHotEncode',
    ):
        """
        Method to simplify construction of a custom sklearn text processor.

        Follows construction of a standard CountVectorizer/TfidfVectorizer

        Args:
            Follows sklearn CountVectorizer construction with some changes:

            input_data:
                pd.DataFrame with an "ingredients" column to be transformed.
                Each element of that column should be a list of strings

            stanza_pipeline:
                stanza.Pipeline from prepare_stanza_pipeline

            strip_accents:
                passed through to the sklearn vectorizer, default "unicode"

            lowercase:
                passed through to the sklearn vectorizer, default True

            min_ngram_length:
                setting for minimum ngram length in vectorization,
                used with the custom analyzer
                default of 1

            max_ngram_length:
                setting for maximum ngram length in vectorization,
                used with the custom analyzer
                default of 4

            min_df:
                minimum document frequency passed to the sklearn vectorizer,
                default of 3

            sklearn_type:
                which vectorization to use: one-hot encoding, plain counts, or TF-IDF.
                Choose between "OneHotEncode" (default), "CountVectorizer", and "TFIDF"

        Returns:
            sklearn_transformer:
                fitted sklearn text transformer for later use/in MLflow models

            transformed_recipe:
                pd.DataFrame of the vectorized text, indexed to match input_data
        """

        sklearn_transformer_params = {
            'strip_accents': strip_accents,
            'lowercase': lowercase,
            'min_df': min_df,
            # the analyzer closes over the Stanza pipeline and the ngram bounds
            'analyzer': self.stanza_analyzer(
                stanza_pipeline=stanza_pipeline,
                minNgramLength=min_ngram_length,
                maxNgramLength=max_ngram_length
            ),
        }

        if sklearn_type == "OneHotEncode":
            sklearn_transformer_params['binary'] = True
            sklearn_transformer = CountVectorizer(**sklearn_transformer_params)

        elif sklearn_type == "CountVectorizer":
            print("\n")
            print("Using CountVectorizer, but output is not OneHotEncoded or TFIDF transformed")
            sklearn_transformer_params['binary'] = False
            sklearn_transformer = CountVectorizer(**sklearn_transformer_params)

        elif sklearn_type == "TFIDF":
            sklearn_transformer_params['binary'] = False
            sklearn_transformer = TfidfVectorizer(**sklearn_transformer_params)

        else:
            print("\n")
            print("Invalid sklearn text processing type, please choose between 'OneHotEncode', 'CountVectorizer', 'TFIDF'")
            return None

        response = sklearn_transformer.fit_transform(
            tqdm(input_data['ingredients'])
        )

        transformed_recipe = pd.DataFrame(
            response.toarray(),
            columns=sklearn_transformer.get_feature_names_out(),
            index=input_data.index
        )

        return sklearn_transformer, transformed_recipe
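
    # A minimal fit_transform sketch (hypothetical data; assumes `nlp` is the
    # pipeline returned by prepare_stanza_pipeline above):
    #
    #   recipes = pd.DataFrame(
    #       {"ingredients": [["2 cups flour", "1 tsp salt"],
    #                        ["1 lb tomatoes", "fresh basil"]]}
    #   )
    #   transformer, matrix_df = CustomSKLearnAnalyzer().fit_transform(
    #       input_data=recipes,
    #       stanza_pipeline=nlp,
    #       sklearn_type="TFIDF",
    #       min_df=1,  # the default of 3 would prune everything from a toy sample
    #   )
    #
    # transformer is the fitted TfidfVectorizer and matrix_df is the vectorized
    # text, indexed like `recipes`.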

    def stanza_analyzer(self, stanza_pipeline, minNgramLength, maxNgramLength):
        """
        Custom ngram analyzer function, matching only ngrams that belong to the same line

        The source for this was StackOverflow because I couldn't figure out how to let sklearn pipelines use arguments for custom analyzers

        Use this as the analyzer for an sklearn pipeline, and it should work

        Args:
            stanza_pipeline: Stanza pipeline
            minNgramLength: integer for the minimum ngram length (usually 1)
            maxNgramLength: integer for the maximum ngram length (usually should not exceed 4)

        Returns:
            A function to be used as the analyzer in an sklearn pipeline.
            Said function is a generator that yields ngrams
        """

        def ngrams_per_line(ingredients_list):

            # join the ingredient strings into one document, marking line
            # boundaries with " brk " so ngrams never cross ingredients
            lowered = " brk ".join(
                map(str, [ingred for ingred in ingredients_list if ingred is not None])
            ).lower()

            # the join/lower above never returns None; guard against an empty string instead
            if not lowered:
                lowered = "Missing ingredients"

            preproc = stanza_pipeline(lowered)

            # keep lemmas, dropping numerals, determiners, adverbs,
            # conjunctions, and adpositions
            lemmad = " ".join(
                map(
                    str,
                    [
                        word.lemma
                        for sent in preproc.sentences
                        for word in sent.words
                        if (
                            word.upos
                            not in ["NUM", "DET", "ADV", "CCONJ", "ADP", "SCONJ"]
                            and word is not None
                        )
                    ],
                )
            )

            # analyze each line of the input string separately
            for ln in lemmad.split(" brk "):

                # tokenize the line into whole words of at least two letters
                # (customize the regex as desired)
                at_least_two_english_characters_whole_words = r"(?u)\b[a-zA-Z]{2,}\b"
                terms = re.findall(at_least_two_english_characters_whole_words, ln)

                # loop ngram creation for every number between min and max ngram length
                for ngramLength in range(minNgramLength, maxNgramLength + 1):

                    # find and return all ngrams
                    # for ngram in zip(*[terms[i:] for i in range(ngramLength)]):
                    # <-- solution without a generator (works the same but has higher memory usage)
                    for ngram in zip(
                        *[
                            islice(seq, i, len(terms))
                            for i, seq in enumerate(tee(terms, ngramLength))
                        ]
                    ):  # <-- solution using a generator

                        ngram = " ".join(map(str, ngram))
                        yield ngram

        return ngrams_per_line
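
# A minimal sketch of plugging the analyzer directly into sklearn (hypothetical
# names; `nlp` is a pipeline from prepare_stanza_pipeline):
#
#   analyzer_fn = CustomSKLearnAnalyzer().stanza_analyzer(
#       stanza_pipeline=nlp, minNgramLength=1, maxNgramLength=4
#   )
#   vectorizer = CountVectorizer(analyzer=analyzer_fn, min_df=1)
#   matrix = vectorizer.fit_transform([["2 cups flour", "1 tsp salt"]])
#
# Because ngrams_per_line closes over the pipeline and the ngram bounds, the
# vectorizer itself needs no extra analyzer arguments.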

diff --git a/src/custom_stanza_mlflow.py b/src/custom_stanza_mlflow.py
index 19306e3..06ad118 100644
--- a/src/custom_stanza_mlflow.py
+++ b/src/custom_stanza_mlflow.py
@@ -5,151 +5,73 @@
import re
from sklearn.metrics.pairwise import cosine_similarity, pairwise_distances
import stanza
+import src.custom_sklearn_text_transformer_mlflow


-class StanzaWrapper(mlflow.pyfunc.PythonModel):
+class CustomSKLearnWrapper(mlflow.pyfunc.PythonModel):
    """
    This class allows Stanza pipelines to be logged in MLflow as a custom
    PythonModel
    """

-    def __init__(self, stanza_lang_str='en'):
-        self.stanza_lang_str = stanza_lang_str
+    # def __init__(self, model):
+    #     """
+    #     Constructor method. Initializes the model with a Stanza library language
+    #     type. The default is "en" for English

+    #     model: sklearn.Transformer
+    #         The sklearn text Transformer or Pipeline that ends in a
+    #         Transformer

+    #     later can add functionality to include pretrained models needed for Stanza

+    #     """
+    #     self.model = model

    def load_context(self, context):
        """
-        This method is called when loading an MLflow model with
-        pyfunc.load_model(), as soon as the Python Model is constructed.
+        Method needed to override the default load_context. Needs to handle
+        the different components of the sklearn model

-        Args:
-            context: MLflow context where the model artifact is stored.
        """
-        import pickle
+        import dill as pickle

-        self.model = pickle.load(open(context.artifacts["sklearn_transformer"], "rb"))
-        self.database = pickle.load(open(context.artifacts["data"], "rb"))
+        self.model = pickle.load(open(context.artifacts["sklearn_model"], "rb"))

-
-    def predict(self, ingredients_list: list):
+    def predict(self, context, model_input, params=None):
        """
        This method is needed to override the default predict.
-        It needs to function essentially as a wrapper
+        It needs to function essentially as a wrapper around the sklearn
+        Transformer or Transformer Pipeline and return its transformed output

        Args:
-            ingredients_list: the ingredients of a single, query recipe in a list
+            context: Any
+                Not used
+
+            self.model (loaded in load_context): sklearn.Transformer
+                The sklearn text Transformer or Pipeline that ends in a
+                Transformer
+
+            model_input: pd.Series
+                The ingredients of a single query recipe in a list
+                (passed as a pd.Series so the index carries through)
+                Need to decide if this is taking in raw text or preprocessed text
+                Leaning towards taking in raw text, doing preprocessing, and
+                logging the preprocessed text as an artifact
+
+            params: dict, optional
+                Parameters used for the model; not currently used for sklearn

        Returns:
-            similar_recipes_df: DataFrame of the top 5 most similar recipes from
-                the database
+            transformed_recipe_df: DataFrame of the recipes after going through
+                the sklearn/Stanza text processing
        """
-        response = self.model.transform(ingredients_list)
+        response = self.model.transform(model_input)
        transformed_recipe = pd.DataFrame(
-            response.toarray(), columns=self.model.get_feature_names()
+            response.toarray(),
+            columns=self.model.get_feature_names_out(),
+            index=model_input.index,
        )
-        similar_recipes_df = self.find_closest_recipes(
-            filtered_ingred_word_matrix=query_matrix,
-            recipe_tfidf=self.model,
-            X_df=prepped,
-        )
-        return similar_recipes_df
-
-    @classmethod
-    def stanza_analyzer(self, stanza_pipeline, minNgramLength, maxNgramLength):
-        """
-        Custom ngram analyzer function, matching only ngrams that belong to the same line
-
-        The source for this was StackOverflow because I couldn't figure out how to let sklearn pipelines use arguments for custom analyzers
-
-        Use this as the analyzer for an sklearn pipeline, and it should work
-
-        Args:
-            stanza_pipeline: Stanza pipeline
-            minNgramLength: integer for the minimum ngram (usually 1)
-            maxNgramLength: integer for maximum length ngram (usually should not exceed 4)
-
-        Returns:
-            A function that will be used in sklearn pipeline. Said function yields a generator
-
-        """
-        def ngrams_per_line(ingredients_list):
-
-            lowered = " brk ".join(map(str, [ingred for ingred in ingredients_list if ingred is not None])).lower()
-
-            if lowered is None:
-                lowered = "Missing ingredients"
-
-            preproc = stanza_pipeline(lowered)
-
-            lemmad = " ".join(map(str,
-                [word.lemma
-                    for sent in preproc.sentences
-                    for word in sent.words if (
-                        word.upos not in ["NUM", "DET", "ADV", "CCONJ", "ADP", "SCONJ"]
-                        and word is not None
-                    )]
-                )
-            )
-
-            # analyze each line of the input string seperately
-            for ln in lemmad.split(' brk '):
-
-                # tokenize the input string (customize the regex as desired)
-                at_least_two_english_characters_whole_words = "(?u)\b[a-zA-Z]{2,}\b"
-                terms = re.split(at_least_two_english_characters_whole_words, ln)
-
-                # loop ngram creation for every number between min and max ngram length
-                for ngramLength in range(minNgramLength, maxNgramLength+1):
-
-                    # find and return all ngrams
-                    # for ngram in zip(*[terms[i:] for i in range(3)]):
-                    # <-- solution without a generator (works the same but has higher memory usage)
-                    for ngram in zip(*[islice(seq, i, len(terms)) for i, seq in enumerate(tee(terms, ngramLength))]): # <-- solution using a generator
-
-                        ngram = ' '.join(map(str, ngram))
-                        yield ngram
-        return ngrams_per_line
-
-
-    # def fit_transform(self, nlp_sklearn_params: list):
-    #     """
-    #     This method duplicates/wraps scikit-learn behavior for Pipelines to handle text
-
-    #     Args:
-    #         nlp_sklearn_params: list of tuples
-
-    #     Returns:
-    #         pipe: Pipeline
-    #     """
-    #     return t
-
-    def filter_out_cuisine(ingred_word_matrix, X_df, cuisine_name, tfidf):
-        # This function takes in the ingredient word matrix (from joblib), a
-        # dataframe made from the database (from joblib), the user inputted cuisine
-        # name, and the ingredient TFIDF Vectorizer object (from joblib) and returns
-        # a word sub matrix that removes all recipes with the same cuisine as the
-        # inputted recipe.
-
-        combo = pd.concat([ingred_word_matrix, X_df["imputed_label"]], axis=1)
-        filtered_ingred_word_matrix = combo[
-            combo["imputed_label"] != cuisine_name
-        ].drop("imputed_label", axis=1)
-        return filtered_ingred_word_matrix
-
-    def find_closest_recipes(filtered_ingred_word_matrix, recipe_tfidf, X_df):
-        # This function takes in the filtered ingredient word matrix from function
-        # filter_out_cuisine, the TFIDF recipe from function transform_tfidf, and
-        # a dataframe made from the database (from joblib) and returns a Pandas
-        # DataFrame with the top five most similar recipes and a Pandas Series
-        # containing the similarity amount
-        search_vec = np.array(recipe_tfidf).reshape(1, -1)
-        res_cos_sim = cosine_similarity(filtered_ingred_word_matrix, search_vec)
-        top_five = np.argsort(res_cos_sim.flatten())[-5:][::-1]
-        proximity = res_cos_sim[top_five]
-        recipe_ids = [filtered_ingred_word_matrix.iloc[idx].name for idx in top_five]
-        suggest_df = X_df.loc[recipe_ids]
-        suggest_df = pd.concat([suggest_df, proximity])
-        return suggest_df
-
+        return transformed_recipe
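
A minimal sketch of how the wrapper above might be logged and reloaded with
MLflow. The fitted transformer, file names, and paths here are hypothetical;
dill is used for serialization because load_context reads the artifact with
dill and the vectorizer carries the closure-based Stanza analyzer:

    import dill
    import mlflow
    import pandas as pd

    from src.custom_stanza_mlflow import CustomSKLearnWrapper

    # fitted_transformer is the sklearn_transformer returned by
    # CustomSKLearnAnalyzer.fit_transform (hypothetical variable name)
    with open("sklearn_model.pkl", "wb") as f:
        dill.dump(fitted_transformer, f)

    with mlflow.start_run():
        model_info = mlflow.pyfunc.log_model(
            artifact_path="recipe_text_model",
            python_model=CustomSKLearnWrapper(),
            artifacts={"sklearn_model": "sklearn_model.pkl"},
        )

    loaded = mlflow.pyfunc.load_model(model_info.model_uri)
    # input type is a sketch; align it with whatever predict() finally expects
    transformed = loaded.predict(pd.Series([["2 cups flour", "1 tsp salt"]]))

The artifacts key must stay "sklearn_model" so that load_context can find it in
context.artifacts.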