diff --git a/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction-with-pretrained-embeddings.ipynb b/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction-with-pretrained-embeddings.ipynb new file mode 100644 index 000000000..f5decba04 --- /dev/null +++ b/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction-with-pretrained-embeddings.ipynb @@ -0,0 +1,1433 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "5b545747", + "metadata": {}, + "outputs": [], + "source": [ + "# Copyright 2022 NVIDIA Corporation. All Rights Reserved.\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License.\n", + "# ==============================================================================\n", + "\n", + "# Each user is responsible for checking the content of datasets and the\n", + "# applicable licenses and determining if suitable for the intended use." + ] + }, + { + "cell_type": "markdown", + "id": "5ec6d3b8", + "metadata": {}, + "source": [ + "\n", + "\n", + "# Transformer-based architecture for next-item prediction task with pretrained embeddings\n", + "\n", + "This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.\n", + "\n", + "## Overview\n", + "\n", + "In this use case we will train a Transformer-based architecture for the next-item prediction task with pretrained embeddings.\n", + "\n", + "**You can choose to download the full dataset manually or use synthetic data.**\n", + "\n", + "We will use the [SIGIR eCOM 2021 Data Challenge Dataset](https://github.com/coveooss/SIGIR-ecom-data-challenge) to train a session-based model. The dataset contains 36M events of users browsing an online store.\n", + "\n", + "We will reshape the data to organize it into 'sessions'. Each session will be a full customer online journey in chronological order. 
The goal will be to predict the `url` of the next action taken.\n", + "\n", + "\n", + "### Learning objectives\n", + "\n", + "- Training a Transformer-based architecture for next-item prediction task" + ] + }, + { + "cell_type": "markdown", + "id": "fd2b847f", + "metadata": {}, + "source": [ + "## Downloading and preparing the dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "2dd7827c", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-06-20 22:58:36.667322: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", + "To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", + "/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/torch.py:43: UserWarning: PyTorch dtype mappings did not load successfully due to an error: No module named 'torch'\n", + " warn(f\"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}\")\n", + "2023-06-20 22:58:38.026020: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:38.026445: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:38.026622: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n" + ] + } + ], + "source": [ + "import os\n", + "import cudf\n", + "import numpy as np\n", + "import pandas as pd\n", + "import nvtabular as nvt\n", + "from merlin.schema import ColumnSchema, Schema, Tags\n", + "\n", + "OUTPUT_DATA_DIR = os.environ.get('OUTPUT_DATA_DIR', '/workspace/data')\n", + "NUM_EPOCHS = int(os.environ.get('NUM_EPOCHS', 5))\n", + "NUM_EXAMPLES = int(os.environ.get('NUM_EXAMPLES', 100_000))\n", + "MINIMUM_SESSION_LENGTH = int(os.environ.get('MINIMUM_SESSION_LENGTH', 5))" + ] + }, + { + "cell_type": "markdown", + "id": "7fcf7c86", + "metadata": {}, + "source": [ + "You can download the full dataset by registering [here](https://www.coveo.com/en/ailabs/sigir-ecom-data-challenge). If you chose to download the data, please place it alongside this notebook in the `sigir_dataset` directory and extract it.\n", + "\n", + "By default, in this notebook we will be using synthetically generated data based on the SIGIR dataset, but you can run on the full dataset by changing the value of the boolean flag below." 
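If you would rather not flip the flag by hand, an optional check along the following lines (not part of the original notebook; the paths are the ones this example reads from later) can fall back to synthetic data automatically whenever the extracted CSV files are missing.

```python
# Optional convenience (an assumption, not part of the original flow): fall back to
# synthetic data automatically when the extracted SIGIR CSVs are not found on disk.
import os

SIGIR_TRAIN_CSV = '/workspace/sigir_dataset/train/browsing_train.csv'   # path used later in this notebook
SIGIR_SKUS_CSV = '/workspace/sigir_dataset/train/sku_to_content.csv'

RUN_ON_SYNTHETIC_DATA = not (os.path.exists(SIGIR_TRAIN_CSV) and os.path.exists(SIGIR_SKUS_CSV))
print('Running on synthetic data:', RUN_ON_SYNTHETIC_DATA)
```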
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "bc3d1882", + "metadata": {}, + "outputs": [], + "source": [ + "RUN_ON_SYNTHETIC_DATA = True" + ] + }, + { + "cell_type": "markdown", + "id": "68bc6d6d", + "metadata": {}, + "source": [ + "### Clean downloaded data" + ] + }, + { + "cell_type": "markdown", + "id": "9016a3e2", + "metadata": {}, + "source": [ + "If you are training on the full SIGIR dataset, the following code will pre-process it.\n", + "\n", + "Here we deal with `nan` values, drop rows with missing information and parse strings containing lists to lists.\n", + "\n", + "The synthetically generated data is already clean -- it doesn't require this pre-processing." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "428ab049", + "metadata": {}, + "outputs": [], + "source": [ + "if not RUN_ON_SYNTHETIC_DATA:\n", + " train = nvt.Dataset('/workspace/sigir_dataset/train/browsing_train.csv', part_size='500MB')\n", + " skus = nvt.Dataset('/workspace/sigir_dataset/train/sku_to_content.csv')\n", + "\n", + " skus = pd.read_csv('/workspace/sigir_dataset/train/sku_to_content.csv')\n", + "\n", + " skus['description_vector'] = skus['description_vector'].replace(np.nan, '')\n", + " skus['image_vector'] = skus['image_vector'].replace(np.nan, '')\n", + "\n", + " skus['description_vector'] = skus['description_vector'].apply(lambda x: [] if len(x) == 0 else eval(x))\n", + " skus['image_vector'] = skus['image_vector'].apply(lambda x: [] if len(x) == 0 else eval(x))\n", + " skus = skus[skus.description_vector.apply(len) > 0]\n", + " skus = nvt.Dataset(skus)" + ] + }, + { + "cell_type": "markdown", + "id": "9b33fa32", + "metadata": {}, + "source": [ + "### Generate synthetic data" + ] + }, + { + "cell_type": "markdown", + "id": "4c4ba9b9", + "metadata": {}, + "source": [ + "If you are not running on the full dataset, the following lines of code will generate its synthetic counterpart." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "84789211", + "metadata": {}, + "outputs": [], + "source": [ + "if RUN_ON_SYNTHETIC_DATA:\n", + " from merlin.datasets.synthetic import generate_data\n", + "\n", + " train = generate_data('sigir-browsing', NUM_EXAMPLES)\n", + " skus = generate_data('sigir-sku', NUM_EXAMPLES)" + ] + }, + { + "cell_type": "markdown", + "id": "5533f446", + "metadata": {}, + "source": [ + "## Constructing a workflow" + ] + }, + { + "cell_type": "markdown", + "id": "ac47bc4e", + "metadata": {}, + "source": [ + "We need to process our data further before we can use it to train our model.\n", + "\n", + "In particular, the `skus` dataset contains the mapping between the `product_sku_hash` (essentially an item id) to the `description_vector` -- an embedding obtained from the description.\n", + "\n", + "We would like to enable our model to make use of this piece of information. In order to feed this data to our model, we need to map the `product_sku_hash` to an id.\n", + "\n", + "But we need to make sure that the way we process `skus` and the `train` dataset (event information) is consistent, that the same `product_sku_hash` is mapped to the same id both when processing `skus` and `train`.\n", + "\n", + "We do so by defining and fitting a `Categorify` op once and using it to process both the `skus` and the `train` datasets.\n", + "\n", + "Additionally, we apply some further processing to the `train` dataset. 
We group rows by `session_id_hash` so that each training example will contain events from a single customer visit to the online store arranged in chronological order.\n", + "\n", + "If you would like to learn more about leveraging `NVTabular` to process tabular data on the GPU using a set of industry standard operators, please consult the examples available [here](https://github.com/NVIDIA-Merlin/NVTabular/tree/main/examples).\n", + "\n", + "Let's first process the `train` dataset and retain the `Categorify` operator (`cat_op`) for processing of `skus`." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "3b5feee3", + "metadata": {}, + "outputs": [], + "source": [ + "cat_op = nvt.ops.Categorify()\n", + "out = ['product_sku_hash'] >> cat_op >> nvt.ops.TagAsItemID()\n", + "out += ['event_type', 'product_action', 'session_id_hash', 'hashed_url'] >> nvt.ops.Categorify()\n", + "out += ['server_timestamp_epoch_ms'] >> nvt.ops.NormalizeMinMax()\n", + "\n", + "groupby_features = out >> nvt.ops.Groupby(\n", + " groupby_cols=['session_id_hash'],\n", + " aggs={\n", + " 'product_sku_hash': ['list'],\n", + " 'event_type': ['list'],\n", + " 'product_action': ['list'],\n", + " 'hashed_url': ['list', 'count'],\n", + " 'server_timestamp_epoch_ms': ['list']\n", + " },\n", + " sort_cols=\"server_timestamp_epoch_ms\"\n", + ")\n", + "\n", + "filtered_sessions = groupby_features >> nvt.ops.Filter(f=lambda df: df[\"hashed_url_count\"] >= MINIMUM_SESSION_LENGTH)\n", + "\n", + "# We won't be needing the `session_id_hash` nor the `hashed_url_count` any longer\n", + "wf = nvt.Workflow(\n", + " filtered_sessions[\n", + " 'product_sku_hash_list',\n", + " 'event_type_list',\n", + " 'product_action_list',\n", + " 'hashed_url_list',\n", + " ]\n", + ")\n", + "\n", + "# Let's save the output of our workflow -- transformed `train` for later use (training of our model).\n", + "wf.fit_transform(train).to_parquet('train_transformed')" + ] + }, + { + "cell_type": "markdown", + "id": "45a4828e", + "metadata": {}, + "source": [ + "Here are a couple of example rows from `train_transformed`." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "650fb0d0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
product_sku_hash_listevent_type_listproduct_action_listhashed_url_list
0[578, 972, 378, 420, 328, 126, 233, 925, 410, ...[3, 4, 4, 3, 3, 3, 3, 3, 3, 4, 4, 3, 4, 4, 4, ...[3, 3, 5, 6, 4, 3, 3, 4, 4, 4, 6, 5, 3, 4, 3, ...[766, 955, 745, 210, 940, 688, 986, 524, 425, ...
1[298, 304, 393, 697, 706, 313, 834, 83, 502, 1...[4, 4, 4, 3, 4, 4, 4, 3, 3, 3, 4, 4, 3, 4, 3, ...[3, 5, 6, 4, 4, 3, 3, 3, 6, 6, 3, 3, 6, 6, 3, ...[13, 221, 915, 658, 456, 378, 802, 180, 580, 4...
2[706, 221, 22, 702, 339, 645, 436, 358, 84, 35...[4, 3, 4, 4, 4, 4, 4, 4, 3, 3, 3, 4, 3, 4, 3, ...[3, 6, 4, 6, 3, 3, 5, 5, 4, 6, 4, 6, 3, 5, 6, ...[271, 940, 562, 498, 172, 239, 270, 215, 489, ...
3[278, 153, 189, 717, 580, 540, 219, 79, 200, 9...[3, 3, 3, 3, 4, 4, 3, 4, 4, 3, 4, 4, 3, 3, 3, ...[6, 6, 6, 6, 3, 4, 4, 4, 4, 4, 3, 6, 5, 4, 3, ...[169, 419, 875, 725, 926, 770, 160, 554, 763, ...
4[156, 922, 914, 592, 842, 916, 137, 928, 615, ...[3, 4, 4, 4, 3, 4, 4, 4, 4, 3, 4, 3, 4, 3, 4, ...[6, 4, 5, 6, 5, 4, 3, 3, 6, 5, 6, 5, 3, 6, 3, ...[318, 506, 281, 191, 506, 480, 965, 399, 761, ...
\n", + "
" + ], + "text/plain": [ + " product_sku_hash_list \\\n", + "0 [578, 972, 378, 420, 328, 126, 233, 925, 410, ... \n", + "1 [298, 304, 393, 697, 706, 313, 834, 83, 502, 1... \n", + "2 [706, 221, 22, 702, 339, 645, 436, 358, 84, 35... \n", + "3 [278, 153, 189, 717, 580, 540, 219, 79, 200, 9... \n", + "4 [156, 922, 914, 592, 842, 916, 137, 928, 615, ... \n", + "\n", + " event_type_list \\\n", + "0 [3, 4, 4, 3, 3, 3, 3, 3, 3, 4, 4, 3, 4, 4, 4, ... \n", + "1 [4, 4, 4, 3, 4, 4, 4, 3, 3, 3, 4, 4, 3, 4, 3, ... \n", + "2 [4, 3, 4, 4, 4, 4, 4, 4, 3, 3, 3, 4, 3, 4, 3, ... \n", + "3 [3, 3, 3, 3, 4, 4, 3, 4, 4, 3, 4, 4, 3, 3, 3, ... \n", + "4 [3, 4, 4, 4, 3, 4, 4, 4, 4, 3, 4, 3, 4, 3, 4, ... \n", + "\n", + " product_action_list \\\n", + "0 [3, 3, 5, 6, 4, 3, 3, 4, 4, 4, 6, 5, 3, 4, 3, ... \n", + "1 [3, 5, 6, 4, 4, 3, 3, 3, 6, 6, 3, 3, 6, 6, 3, ... \n", + "2 [3, 6, 4, 6, 3, 3, 5, 5, 4, 6, 4, 6, 3, 5, 6, ... \n", + "3 [6, 6, 6, 6, 3, 4, 4, 4, 4, 4, 3, 6, 5, 4, 3, ... \n", + "4 [6, 4, 5, 6, 5, 4, 3, 3, 6, 5, 6, 5, 3, 6, 3, ... \n", + "\n", + " hashed_url_list \n", + "0 [766, 955, 745, 210, 940, 688, 986, 524, 425, ... \n", + "1 [13, 221, 915, 658, 456, 378, 802, 180, 580, 4... \n", + "2 [271, 940, 562, 498, 172, 239, 270, 215, 489, ... \n", + "3 [169, 419, 875, 725, 926, 770, 160, 554, 763, ... \n", + "4 [318, 506, 281, 191, 506, 480, 965, 399, 761, ... " + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nvt.Dataset('train_transformed', engine='parquet').head()" + ] + }, + { + "cell_type": "markdown", + "id": "18f12dbd", + "metadata": {}, + "source": [ + "Now that we have processed the train set, we can use the mapping preserved in the `cat_op` to process the `skus` dataset containing the embeddings we are after.\n", + "\n", + "Let's now `Categorify` the `product_sku_hash` in `skus` and grab just the description embedding information." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "313808d0", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
product_sku_hashdescription_vectorcategory_hashprice_bucket
013[0.07939800762120258, 0.3465797761609977, -0.3...160.186690
125[0.4275482879608162, -0.30569476366666, 0.1440...380.951997
218[-0.31035419787213536, 0.18070481533058008, 0....220.973384
31[-0.31319783485940356, -0.11623980504981396, -...1380.146260
411[0.25091279302969943, -0.33473442518442525, 0....1190.808252
\n", + "
" + ], + "text/plain": [ + " product_sku_hash description_vector \\\n", + "0 13 [0.07939800762120258, 0.3465797761609977, -0.3... \n", + "1 25 [0.4275482879608162, -0.30569476366666, 0.1440... \n", + "2 18 [-0.31035419787213536, 0.18070481533058008, 0.... \n", + "3 1 [-0.31319783485940356, -0.11623980504981396, -... \n", + "4 11 [0.25091279302969943, -0.33473442518442525, 0.... \n", + "\n", + " category_hash price_bucket \n", + "0 16 0.186690 \n", + "1 38 0.951997 \n", + "2 22 0.973384 \n", + "3 138 0.146260 \n", + "4 119 0.808252 " + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "skus.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "dfad1bcf", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
product_sku_hashdescription_vector
0836[0.07939800762120258, 0.3465797761609977, -0.3...
1979[0.4275482879608162, -0.30569476366666, 0.1440...
211[-0.31035419787213536, 0.18070481533058008, 0....
3469[-0.31319783485940356, -0.11623980504981396, -...
4118[0.25091279302969943, -0.33473442518442525, 0....
\n", + "
" + ], + "text/plain": [ + " product_sku_hash description_vector\n", + "0 836 [0.07939800762120258, 0.3465797761609977, -0.3...\n", + "1 979 [0.4275482879608162, -0.30569476366666, 0.1440...\n", + "2 11 [-0.31035419787213536, 0.18070481533058008, 0....\n", + "3 469 [-0.31319783485940356, -0.11623980504981396, -...\n", + "4 118 [0.25091279302969943, -0.33473442518442525, 0...." + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "out = ['product_sku_hash'] >> cat_op\n", + "wf_skus = nvt.Workflow(out + 'description_vector')\n", + "skus_ds = wf_skus.transform(skus)\n", + "\n", + "skus_ds.head()" + ] + }, + { + "cell_type": "markdown", + "id": "360fe65d", + "metadata": {}, + "source": [ + "Let us now export the embedding information to a `numpy` array and write it to disk.\n", + "\n", + "We will later pass this information to the `Loader` so that it will load the correct emebedding for the product corresponding to a given step of a customer journey.\n", + "\n", + "The embeddings are linked to the train set using the `product_sku_hash` information." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "d99dfdd0", + "metadata": {}, + "outputs": [], + "source": [ + "skus_ds.to_npy('skus.npy')" + ] + }, + { + "cell_type": "markdown", + "id": "58d80879", + "metadata": {}, + "source": [ + "How will the `Loader` know which embedding to associate with a given row of the train set?\n", + "\n", + "The `product_sku_hash` ids have been exported along with the embeddings and are contained in the first column of the output `numpy` array.\n", + "\n", + "Here is the id of the first embedding stored in `skus.npy`:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "d60c6651", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "836.0" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "np.load('skus.npy')[0, 0]" + ] + }, + { + "cell_type": "markdown", + "id": "974cf669", + "metadata": {}, + "source": [ + "and here is the embedding vector corresponding to `product_sku_hash` of id referenced above:" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "c2c111fd", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([ 0.07939801, 0.34657978, -0.38269496, 0.56307004, -0.10142923,\n", + " 0.03702352, -0.11606304, 0.10070879, -0.21879928, 0.06107687,\n", + " -0.20743195, -0.01330719, 0.60182867, 0.0920322 , 0.2648726 ,\n", + " 0.56061561, 0.48643498, 0.39045152, -0.40012162, 0.09153962,\n", + " -0.38351605, 0.57134731, 0.59986226, -0.40321368, -0.32984972,\n", + " 0.37559494, 0.1554353 , -0.0413067 , 0.33814398, 0.30678041,\n", + " 0.24001132, 0.42737922, 0.41554601, -0.40451691, 0.50428902,\n", + " -0.2004803 , -0.38297056, 0.06580838, 0.48285745, 0.51406472,\n", + " 0.02268894, 0.36343324, 0.32497967, -0.29736346, -0.00538915,\n", + " 0.12329302, -0.04998194, 0.27843002, 0.20212714, 0.39019503])" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "np.load('skus.npy')[0, 1:]" + ] + }, + { + "cell_type": "markdown", + "id": "7b8c4a13", + "metadata": {}, + "source": [ + "We are now ready to construct the `Loader` that will feed the data to our model.\n", + "\n", + "We begin by reading in the embeddings information." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "51e1f766", + "metadata": {}, + "outputs": [], + "source": [ + "embeddings = np.load('skus.npy')" + ] + }, + { + "cell_type": "markdown", + "id": "e0b1f18d", + "metadata": {}, + "source": [ + "We are now ready to define the `Loader`.\n", + "\n", + "We are passing in an `EmbeddingOperator` that will ensure that correct `sku` information (correct `description_vector`) is associated with the correct step in the customer journey (with the lookup key being contained in the `product_sku_hash_list`)\n", + "\n", + "When specifying the dataset, we are creating a `Merlin Dataset` based on the `train_transformed` data we saved above.\n", + "\n", + "Depending on the hardware that you will be running this on and the size of the dataset that you will be using, should you run out of GPU memory, you can specify one of the several parameters that can ease the memory load (`npartitions`, `part_size`, or `part_mem_fraction`).\n", + "\n", + "The `BATCH_SIZE` of 16 should work on a broad set of hardware, but if you are training on a lot of data and your hardware permitting you might want to significantly increase it." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "1d7212fc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Please fix your imports. Module tensorflow.python.training.tracking.data_structures has been moved to tensorflow.python.trackable.data_structures. The old module will be deleted in version 2.11.\n", + "[INFO]: sparse_operation_kit is imported\n", + "WARNING:tensorflow:Please fix your imports. Module tensorflow.python.training.tracking.base has been moved to tensorflow.python.trackable.base. The old module will be deleted in version 2.11.\n", + "[SOK INFO] Import /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/lib/libsok_experiment.so\n", + "[SOK INFO] Import /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/lib/libsok_experiment.so\n", + "[SOK INFO] Initialize finished, communication tool: horovod\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-06-20 22:58:50.835162: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", + "To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", + "2023-06-20 22:58:50.836068: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.836268: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.836425: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.836673: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one 
NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.836849: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.837009: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-06-20 22:58:50.837114: W tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc:42] Overriding orig_value setting because the TF_FORCE_GPU_ALLOW_GROWTH environment variable is set. Original config value was 0.\n", + "2023-06-20 22:58:50.837130: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1621] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 24576 MB memory: -> device: 0, name: Quadro RTX 8000, pci bus id: 0000:08:00.0, compute capability: 7.5\n" + ] + } + ], + "source": [ + "BATCH_SIZE = 16\n", + "\n", + "from merlin.dataloader.tensorflow import Loader\n", + "from merlin.dataloader.ops.embeddings import EmbeddingOperator\n", + "import merlin.models.tf as mm\n", + "\n", + "embedding_operator = EmbeddingOperator(\n", + " embeddings[:, 1:].astype(np.float32),\n", + " id_lookup_table=embeddings[:, 0].astype(int),\n", + " lookup_key=\"product_sku_hash_list\",\n", + " embedding_name='product_embeddings'\n", + ")\n", + "\n", + "loader = Loader(\n", + " dataset=nvt.Dataset('train_transformed', engine='parquet'),\n", + " batch_size=BATCH_SIZE,\n", + " transforms=[\n", + " embedding_operator\n", + " ],\n", + " shuffle=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "4f037d5d", + "metadata": {}, + "source": [ + "Using the `EmbeddingOperator` object we referenced our `product_embeddings` and insructed the model what to use as a key to look up the information.\n", + "\n", + "Below is an example batch of data that our model will consume." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "7371e23d", + "metadata": {}, + "outputs": [], + "source": [ + "batch = mm.sample_batch(loader, batch_size=BATCH_SIZE, include_targets=False, prepare_features=True)" + ] + }, + { + "cell_type": "markdown", + "id": "f7c9a50d", + "metadata": {}, + "source": [ + "`product_embeddings` are included in the batch." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "3cbf8ea4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['product_sku_hash_list', 'event_type_list', 'product_action_list', 'hashed_url_list', 'product_embeddings'])" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "batch.keys()" + ] + }, + { + "cell_type": "markdown", + "id": "53e61e71", + "metadata": {}, + "source": [ + "## Creating and training the model" + ] + }, + { + "cell_type": "markdown", + "id": "2461926e", + "metadata": {}, + "source": [ + "We are now ready to construct our model." 
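Before building the input block in the next cell, it can help to peek at which columns its two branches will consume; the optional check below only uses `select_by_tag`, which the notebook already relies on. Columns tagged `Tags.CATEGORICAL` receive trainable embedding tables, while the `Tags.EMBEDDING` column carries the pretrained description vectors injected by the `EmbeddingOperator`.

```python
# Optional: list the columns each branch of the input block will consume.
# Categorical id lists get trainable embedding tables; the EMBEDDING-tagged column
# is the pretrained description vector added to each batch by the EmbeddingOperator.
print(loader.output_schema.select_by_tag(Tags.CATEGORICAL).column_names)
print(loader.output_schema.select_by_tag(Tags.EMBEDDING).column_names)
```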
+ ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "6867c8ba", + "metadata": {}, + "outputs": [], + "source": [ + "import merlin.models.tf as mm\n", + "\n", + "input_block = mm.InputBlockV2(\n", + " loader.output_schema,\n", + " embeddings=mm.Embeddings(\n", + " loader.output_schema.select_by_tag(Tags.CATEGORICAL),\n", + " sequence_combiner=None,\n", + " ),\n", + " pretrained_embeddings=mm.PretrainedEmbeddings(\n", + " loader.output_schema.select_by_tag(Tags.EMBEDDING),\n", + " sequence_combiner=None,\n", + " normalizer=\"l2-norm\",\n", + " output_dims={\"product_embeddings\": 64},\n", + " )\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "cafb788f", + "metadata": {}, + "source": [ + "We have now constructed an `input_block` that will take our batch and transform it in a fashion that will make it amenable for further processing by subsequent layers of our model.\n", + "\n", + "To test that everything has worked, we can pass our example `batch` through the `input_block`." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "3f8afa56", + "metadata": {}, + "outputs": [], + "source": [ + "input_batch = input_block(batch)" + ] + }, + { + "cell_type": "markdown", + "id": "d24a70fe", + "metadata": {}, + "source": [ + "Let us now construct the remaining layers of our model." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "78b21c0f", + "metadata": {}, + "outputs": [], + "source": [ + "target = 'hashed_url_list'\n", + "\n", + "# We do not need the `train_transformed` dataset here, but we do need\n", + "# to access the schema.\n", + "# It contains important information that will help our model construct itself.\n", + "schema = nvt.Dataset('train_transformed', engine='parquet').schema\n", + "\n", + "dmodel=64\n", + "mlp_block = mm.MLPBlock(\n", + " [128,dmodel],\n", + " activation='relu',\n", + " no_activation_last_layer=True,\n", + " )\n", + "transformer_block = mm.XLNetBlock(d_model=dmodel, n_head=4, n_layer=2)\n", + "model = mm.Model(\n", + " input_block,\n", + " mlp_block,\n", + " transformer_block,\n", + " mm.CategoricalOutput(\n", + " schema.select_by_name(target),\n", + " default_loss=\"categorical_crossentropy\",\n", + " ),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "13b54d19", + "metadata": {}, + "source": [ + "And let us train it." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "fbb03f0c", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initalizer instance more than once.\n", + " warnings.warn(\n", + "2023-06-20 22:58:58.950175: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:428] Loaded cuDNN version 8700\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Epoch 1/5\n", + "WARNING:tensorflow:Gradients do not exist for variables ['model/mask_emb:0', 'transformer/layer_._0/rel_attn/r_s_bias:0', 'transformer/layer_._0/rel_attn/seg_embed:0', 'transformer/layer_._1/rel_attn/r_s_bias:0', 'transformer/layer_._1/rel_attn/seg_embed:0'] when minimizing the loss. 
If you're using `model.compile()`, did you forget to provide a `loss` argument?\n", + "WARNING:tensorflow:Gradients do not exist for variables ['model/mask_emb:0', 'transformer/layer_._0/rel_attn/r_s_bias:0', 'transformer/layer_._0/rel_attn/seg_embed:0', 'transformer/layer_._1/rel_attn/r_s_bias:0', 'transformer/layer_._1/rel_attn/seg_embed:0'] when minimizing the loss. If you're using `model.compile()`, did you forget to provide a `loss` argument?\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-06-20 22:59:11.285571: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: model/xl_net_block/sequential_block_7/replace_masked_embeddings/RaggedWhere/Assert/AssertGuard/branch_executed/_95\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "18/18 [==============================] - 42s 2s/step - loss: 6.9800 - recall_at_10: 0.0106 - mrr_at_10: 0.0033 - ndcg_at_10: 0.0050 - map_at_10: 0.0033 - precision_at_10: 0.0011 - regularization_loss: 0.0000e+00 - loss_batch: 6.9689\n", + "Epoch 2/5\n", + "18/18 [==============================] - 34s 2s/step - loss: 6.9591 - recall_at_10: 0.0106 - mrr_at_10: 0.0031 - ndcg_at_10: 0.0048 - map_at_10: 0.0031 - precision_at_10: 0.0011 - regularization_loss: 0.0000e+00 - loss_batch: 6.9363\n", + "Epoch 3/5\n", + "18/18 [==============================] - 39s 2s/step - loss: 6.9471 - recall_at_10: 0.0107 - mrr_at_10: 0.0028 - ndcg_at_10: 0.0046 - map_at_10: 0.0028 - precision_at_10: 0.0011 - regularization_loss: 0.0000e+00 - loss_batch: 6.9206\n", + "Epoch 4/5\n", + "18/18 [==============================] - 38s 2s/step - loss: 6.9398 - recall_at_10: 0.0103 - mrr_at_10: 0.0030 - ndcg_at_10: 0.0047 - map_at_10: 0.0030 - precision_at_10: 0.0010 - regularization_loss: 0.0000e+00 - loss_batch: 6.9015\n", + "Epoch 5/5\n", + "18/18 [==============================] - 38s 2s/step - loss: 6.9375 - recall_at_10: 0.0104 - mrr_at_10: 0.0030 - ndcg_at_10: 0.0047 - map_at_10: 0.0030 - precision_at_10: 0.0010 - regularization_loss: 0.0000e+00 - loss_batch: 6.9095\n" + ] + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.compile(run_eagerly=False, optimizer='adam', loss=\"categorical_crossentropy\")\n", + "model.fit(loader, batch_size=BATCH_SIZE, epochs=NUM_EPOCHS, pre=mm.SequenceMaskRandom(schema=loader.output_schema, target=target, masking_prob=0.3, transformer=transformer_block))" + ] + }, + { + "cell_type": "markdown", + "id": "fa8ab17b", + "metadata": {}, + "source": [ + "## Serving predictions" + ] + }, + { + "cell_type": "markdown", + "id": "c778420d", + "metadata": {}, + "source": [ + "Now that we have prepared a workflow for processing our data (`wf`), defined the embedding operator (`embedding_operator`) and trained our model (`model`), we have all the components we need to serve our model using the Triton Inference Server (TIS).\n", + "\n", + "Let us define a set of inference operators (a pipeline for processing our data all the way to obtaining predictions) and export them as an ensemble that we will be able to serve using TIS." 
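Once the ensemble has been exported a couple of cells below, you can sanity-check the directory layout that `tritonserver --model-repository` expects with a plain listing such as this (ordinary `os.walk`, no Merlin API involved).

```python
# Generic directory listing to verify the exported Triton model repository after
# `ensemble.export(...)` below has run; adjust OUTPUT_DATA_DIR if you changed it.
import os

repo = os.path.join(OUTPUT_DATA_DIR, 'ensemble')
for root, _dirs, files in sorted(os.walk(repo)):
    print(os.path.relpath(root, repo) + '/', files)
```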
+ ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "18f19033", + "metadata": {}, + "outputs": [], + "source": [ + "from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n", + "from merlin.systems.dag.ensemble import Ensemble\n", + "from merlin.systems.dag.ops.workflow import TransformWorkflow" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "385aba04", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n", + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n", + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:absl:Found untraced functions such as model_context_layer_call_fn, model_context_layer_call_and_return_conditional_losses, sequence_mask_random_layer_call_fn, sequence_mask_random_layer_call_and_return_conditional_losses, prepare_list_features_1_layer_call_fn while saving (showing 5 of 110). 
These functions will not be directly callable after loading.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /tmp/tmpi3g8g7q7/assets\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /tmp/tmpi3g8g7q7/assets\n" + ] + } + ], + "source": [ + "inference_operators = wf.input_schema.column_names >> TransformWorkflow(wf) >> embedding_operator >> PredictTensorflow(model)" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "1c14a25d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " 
(product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (product_sku_hash_list): TensorShape([16, None, 1])\n", + " (event_type_list): TensorShape([16, None, 1])\n", + " (product_action_list): TensorShape([16, None, 1])\n", + " (hashed_url_list): TensorShape([16, None, 1])\n", + " (product_embeddings): TensorShape([16, None, 50])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (product_sku_hash_list): tf.int64\n", + " (event_type_list): tf.int64\n", + " (product_action_list): tf.int64\n", + " (hashed_url_list): tf.int64\n", + " (product_embeddings): tf.float32\n", + " )\n", + "), because it is not built.\n", + "WARNING:absl:Found untraced functions such as model_context_layer_call_fn, model_context_layer_call_and_return_conditional_losses, sequence_mask_random_layer_call_fn, sequence_mask_random_layer_call_and_return_conditional_losses, prepare_list_features_1_layer_call_fn while saving (showing 5 of 110). These functions will not be directly callable after loading.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /workspace/data/ensemble/1_predicttensorflowtriton/1/model.savedmodel/assets\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /workspace/data/ensemble/1_predicttensorflowtriton/1/model.savedmodel/assets\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py:101: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[key] = tf.keras.utils.serialize_keras_object(maybe_value)\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py:288: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[i] = tf.keras.utils.serialize_keras_object(layer)\n", + "/usr/local/lib/python3.8/dist-packages/keras/saving/legacy/saved_model/layer_serialization.py:134: CustomMaskWarning: Custom mask layers require a config and must override get_config. 
When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " return serialization.serialize_keras_object(obj)\n", + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initalizer instance more than once.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + } + ], + "source": [ + "ensemble = Ensemble(inference_operators, wf.input_schema)\n", + "ensemble.export(os.path.join(OUTPUT_DATA_DIR, 'ensemble'));" + ] + }, + { + "cell_type": "markdown", + "id": "264fd1ea", + "metadata": {}, + "source": [ + "After we export the ensemble, we are ready to start the Triton Inference Server.\n", + "\n", + "The server is installed in Merlin Tensorflow and Merlin PyTorch containers. If you are not using one of our containers, then ensure it is installed in your environment. For more information, see the Triton Inference Server [documentation](https://github.com/triton-inference-server/server/blob/r22.03/README.md#documentation).\n", + "\n", + "You can start the server by running the following command:\n", + "\n", + "```tritonserver --model-repository={OUTPUT_DATA_DIR}/ensemble/```\n", + "\n", + "For the --model-repository argument, specify the same value as the `export_path` that you specified previously in the `ensemble.export` method.\n", + "\n", + "After you run the `tritonserver` command, wait until your terminal shows messages like the following example:\n", + "\n", + "I0414 18:29:50.741833 4067 grpc_server.cc:4421] Started GRPCInferenceService at 0.0.0.0:8001
\n", + "I0414 18:29:50.742197 4067 http_server.cc:3113] Started HTTPService at 0.0.0.0:8000
\n", + "I0414 18:29:50.783470 4067 http_server.cc:178] Started Metrics Service at 0.0.0.0:8002\n", + "\n", + "Let us now package our data for inference. We will send 5 rows of data, which corresponds to a single customer journey (session) through the online store. The data will be first processed by the `NVTabular` workflow and subsequentally passed to our transformer model for predicting. " + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "90483210", + "metadata": {}, + "outputs": [], + "source": [ + "# obtaining five rows of data\n", + "df = train.head(5)\n", + "# making sure all the rows correspond to the same online session (have the same `session_id_hash`)\n", + "df['session_id_hash'] = df['session_id_hash'].iloc[0]" + ] + }, + { + "cell_type": "markdown", + "id": "efdf671e", + "metadata": {}, + "source": [ + "Let us now send the data to the Triton Inference Server for inference." + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "d8453048", + "metadata": {}, + "outputs": [], + "source": [ + "from merlin.systems.triton import convert_df_to_triton_input\n", + "import tritonclient.grpc as grpcclient\n", + "\n", + "inputs = convert_df_to_triton_input(wf.input_schema, df)\n", + "\n", + "with grpcclient.InferenceServerClient(\"localhost:8001\") as client:\n", + " response = client.infer('executor_model', inputs)" + ] + }, + { + "cell_type": "markdown", + "id": "913b80e8", + "metadata": {}, + "source": [ + "Let's parse the response." + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "4cc4b046", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[-2.2332087 , -2.1218574 , -2.390479 , ..., -0.7735352 ,\n", + " 0.1954267 , -0.34523243]], dtype=float32)" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "predictions = response.as_numpy(\"hashed_url_list/categorical_output\")\n", + "predictions" + ] + }, + { + "cell_type": "markdown", + "id": "e49c2ed9", + "metadata": {}, + "source": [ + "The response contains logits predicting the id of the url the customer is most likely to arrive at as next step of their journey through the online store.\n", + "\n", + "Here is the predicted hashed url id:" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "0b9af2ae", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "34" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "predicted_hashed_url_id = predictions.argmax()\n", + "predicted_hashed_url_id" + ] + }, + { + "cell_type": "markdown", + "id": "8ef47efd", + "metadata": {}, + "source": [ + "## Summary\n", + "\n", + "We have trained a transformer model for the next item prediction task using language model masking.\n", + "\n", + "For another session-based example that goes deeper into data preprocessing and that covers several advanced techniques (Weight Tying, Temperature Scaling) please see [Session-Based Next Item Prediction for Fashion E-Commerce](https://github.com/NVIDIA-Merlin/models/blob/t4rec_use_case/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb). 
" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction.ipynb b/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction.ipynb new file mode 100644 index 000000000..b409170f0 --- /dev/null +++ b/examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction.ipynb @@ -0,0 +1,1516 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "a556f660", + "metadata": {}, + "outputs": [], + "source": [ + "# Copyright 2022 NVIDIA Corporation. All Rights Reserved.\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions anda\n", + "# limitations under the License.\n", + "# ==============================================================================\n", + "\n", + "# Each user is responsible for checking the content of datasets and the\n", + "# applicable licenses and determining if suitable for the intended use." + ] + }, + { + "cell_type": "markdown", + "id": "697d1452", + "metadata": {}, + "source": [ + "\n", + "\n", + "# Transformer-based architecture for next-item prediction task\n", + "\n", + "This notebook is created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.\n", + "\n", + "## Overview\n", + "\n", + "In this use case we will train a Transformer-based architecture for next-item prediction task.\n", + "\n", + "**Note, the data for this notebook will be automatically downloaded to the folder specified in the cells below.**\n", + "\n", + "We will use the [booking.com dataset](https://github.com/bookingcom/ml-dataset-mdt) to train a session-based model. The dataset contains 1,166,835 of anonymized hotel reservations in the train set and 378,667 in the test set. Each reservation is a part of a customer's trip (identified by `utrip_id`) which includes consecutive reservations.\n", + "\n", + "We will reshape the data to organize it into 'sessions'. Each session will be a full customer itinerary in chronological order. The goal will be to predict the city_id of the final reservation of each trip.\n", + "\n", + "\n", + "### Learning objectives\n", + "\n", + "- Training a Transformer-based architecture for next-item prediction task" + ] + }, + { + "cell_type": "markdown", + "id": "1cccd005", + "metadata": {}, + "source": [ + "## Downloading and preparing the dataset" + ] + }, + { + "cell_type": "markdown", + "id": "1d0b619b", + "metadata": {}, + "source": [ + "We will download the dataset using a functionality provided by merlin models. 
The dataset can be found on GitHub [here](https://github.com/bookingcom/ml-dataset-mdt).\n", + "\n", + "**Read more about libraries used in the import statements below**\n", + "\n", + "- [get_lib](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/core/dispatch.py)\n", + "- [get_booking](https://github.com/NVIDIA-Merlin/models/tree/stable/merlin/datasets/ecommerce)\n", + "- [nvtabular](https://github.com/NVIDIA-Merlin/NVTabular/tree/stable/nvtabular)\n", + "- [nvtabular ops](https://github.com/NVIDIA-Merlin/NVTabular/tree/stable/nvtabular/ops)\n", + "- [schema tags](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/schema/tags.py)\n", + "- [merlin models tensorflow](https://github.com/NVIDIA-Merlin/models/tree/stable/merlin/models/tf)\n", + "- [get_booking](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/datasets/ecommerce/booking/dataset.py)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "40e9ef05", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-05-31 06:06:25.697025: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", + "To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Please fix your imports. Module tensorflow.python.training.tracking.data_structures has been moved to tensorflow.python.trackable.data_structures. The old module will be deleted in version 2.11.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/torch.py:43: UserWarning: PyTorch dtype mappings did not load successfully due to an error: No module named 'torch'\n", + " warn(f\"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}\")\n", + "2023-05-31 06:06:26.988036: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:26.988386: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:26.988518: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO]: sparse_operation_kit is imported\n", + "WARNING:tensorflow:Please fix your imports. Module tensorflow.python.training.tracking.base has been moved to tensorflow.python.trackable.base. 
The old module will be deleted in version 2.11.\n", + "[SOK INFO] Import /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/lib/libsok_experiment.so\n", + "[SOK INFO] Import /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/lib/libsok_experiment.so\n", + "[SOK INFO] Initialize finished, communication tool: horovod\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-05-31 06:06:28.519868: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", + "To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", + "2023-05-31 06:06:28.520815: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.520999: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.521129: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.591345: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.591534: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.591665: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n", + "2023-05-31 06:06:28.591770: W tensorflow/core/common_runtime/gpu/gpu_bfc_allocator.cc:42] Overriding orig_value setting because the TF_FORCE_GPU_ALLOW_GROWTH environment variable is set. Original config value was 0.\n", + "2023-05-31 06:06:28.591778: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0\n", + "2023-05-31 06:06:28.591860: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1621] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 24576 MB memory: -> device: 0, name: Quadro RTX 8000, pci bus id: 0000:08:00.0, compute capability: 7.5\n", + "/usr/local/lib/python3.8/dist-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "# Resetting the TF memory allocation to not be 50% by default. 
\n", + "import os\n", + "os.environ[\"TF_GPU_ALLOCATOR\"]=\"cuda_malloc_async\"\n", + "\n", + "from merlin.core.dispatch import get_lib\n", + "from merlin.datasets.ecommerce import get_booking\n", + "\n", + "import numpy as np\n", + "import timeit\n", + "\n", + "from nvtabular import *\n", + "from nvtabular import ops\n", + "\n", + "from merlin.schema.tags import Tags\n", + "import merlin.models.tf as mm\n", + "\n", + "INPUT_DATA_DIR = os.environ.get('INPUT_DATA_DIR', '/workspace/data')\n", + "OUTPUT_DATA_DIR = os.environ.get('OUTPUT_DATA_DIR', '/workspace/data')\n", + "NUM_EPOCHS = int(os.environ.get('NUM_EPOCHS', '5'))" + ] + }, + { + "cell_type": "markdown", + "id": "c1b42076", + "metadata": {}, + "source": [ + "Let's download the data." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "d0a33352", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:149: UserWarning: Compound tags like Tags.USER_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [, ].\n", + " warnings.warn(\n", + "/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:149: UserWarning: Compound tags like Tags.SESSION_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [, ].\n", + " warnings.warn(\n", + "/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:149: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [, ].\n", + " warnings.warn(\n" + ] + }, + { + "data": { + "text/plain": [ + "(,\n", + " )" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "get_booking(INPUT_DATA_DIR)" + ] + }, + { + "cell_type": "markdown", + "id": "ee9dd8c8", + "metadata": {}, + "source": [ + "Each reservation has a unique utrip_id. During each trip a customer vists several destinations." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "01d1b755", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " user_id checkin checkout city_id device_class affiliate_id \\\n", + "0 1000027 2016-08-13 2016-08-14 8183 desktop 7168 \n", + "1 1000027 2016-08-14 2016-08-16 15626 desktop 7168 \n", + "2 1000027 2016-08-16 2016-08-18 60902 desktop 7168 \n", + "3 1000027 2016-08-18 2016-08-21 30628 desktop 253 \n", + "4 1000033 2016-04-09 2016-04-11 38677 mobile 359 \n", + "\n", + " booker_country hotel_country utrip_id \n", + "0 Elbonia Gondal 1000027_1 \n", + "1 Elbonia Gondal 1000027_1 \n", + "2 Elbonia Gondal 1000027_1 \n", + "3 Elbonia Gondal 1000027_1 \n", + "4 Gondal Cobra Island 1000033_1 \n" + ] + } + ], + "source": [ + "# When displaying cudf dataframes use print() or display(), otherwise Jupyter creates hidden copies.\n", + "train = get_lib().read_csv(f'{INPUT_DATA_DIR}/train_set.csv', parse_dates=['checkin', 'checkout'])\n", + "print(train.head())" + ] + }, + { + "cell_type": "markdown", + "id": "fecc2d94", + "metadata": {}, + "source": [ + "We will train on sequences of `city_id` and `booker_country` and based on this information, our model will attempt to predict the next `city_id` (the next hop in the journey).\n", + "\n", + "We will train a transformer model that can work with sequences of variable length within a batch. 
This functionality is provided to us out of the box and doesn't require any changes to the architecture. Thanks to it we do not have to pad or trim our sequences to any particular length -- our model can make effective use of all of the data!\n", + "\n", + "*With one exception.* For a masked language model that we will be training, we need to discard sequences that are shorter than two hops. This makes sense as there is nothing our model could learn if it was only presented with an itinerary with a single destination on it!\n", + "\n", + "Let us begin by splitting the data into a train and validation set based on trip ID.\n", + "\n", + "Let's see how many unique trips there are in the dataset. Also, let us shuffle the trips along the way so that our validation set consists of a random sample of our train set." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "23bef6ae", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Number of unique trips is : 217686\n" + ] + } + ], + "source": [ + "# Unique trip ids.\n", + "utrip_ids = train.sample(frac=1).utrip_id.unique()\n", + "print('Number of unique trips is :', len(utrip_ids))" + ] + }, + { + "cell_type": "markdown", + "id": "f7eca1f6", + "metadata": {}, + "source": [ + "Now let's assign data to our train and validation sets. Furthermore, we sort the data by `utrip_id` and `checkin`. This way we ensure our sequences of visited `city_ids` will be in proper order!\n", + "\n", + "Also, let's remove trips where only a single city was visited as they cannot be modeled as a sequence." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "7754847c", + "metadata": {}, + "outputs": [], + "source": [ + "train = get_lib().from_pandas(\n", + " train.to_pandas().join(train.to_pandas().groupby('utrip_id').size().rename('num_examples'), on='utrip_id')\n", + ")\n", + "train = train[train.num_examples > 1]\n", + "\n", + "train.checkin = train.checkin.astype('int')\n", + "train.checkout = train.checkout.astype('int')\n", + "\n", + "train_set_utrip_ids = utrip_ids[:int(0.8 * utrip_ids.shape[0])]\n", + "validation_set_utrip_ids = utrip_ids[int(0.8 * utrip_ids.shape[0]):]\n", + "\n", + "train_set = train[train.utrip_id.isin(train_set_utrip_ids)].sort_values(['utrip_id', 'checkin'])\n", + "validation_set = train[train.utrip_id.isin(validation_set_utrip_ids)].sort_values(['utrip_id', 'checkin'])" + ] + }, + { + "cell_type": "markdown", + "id": "79cc3992", + "metadata": {}, + "source": [ + "## Preprocessing with NVTabular\n", + "\n", + "We can now begin with data preprocessing.\n", + "\n", + "We will combine trips into \"sessions\", discard trips that are too short and calculate total trip length.\n", + "\n", + "We will use NVTabular for this work. It offers optimized tabular data preprocessing operators that run on the GPU. 
If you would like to learn more about the NVTabular library, please take a look [here](https://github.com/NVIDIA-Merlin/NVTabular).\n", + "\n", + "Read more about the [Merlin's Dataset API](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/io/dataset.py) \n", + "Read more about how [parquet files are read in and processed by Merlin](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/io/parquet.py) \n", + "Read more about [Tags](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/schema/tags.py) \n", + "- [schema_select_by_tag](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/schema/schema.py) \n", + "\n", + "Read more about [NVTabular Workflows](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/workflow/workflow.py) \n", + "- [fit_transform](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/workflow/workflow.py)\n", + "- [transform](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/workflow/workflow.py) \n", + "\n", + "Read more about the [NVTabular Operators]() \n", + "- [Categorify](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/ops/categorify.py)\n", + "- [AddTags](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/ops/add_metadata.py)\n", + "- [LambdaOp](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/ops/lambdaop.py)\n", + "- [Rename](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/ops/rename.py)\n", + "- [Filter](https://github.com/NVIDIA-Merlin/NVTabular/blob/stable/nvtabular/ops/filter.py)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "3435af68", + "metadata": {}, + "outputs": [], + "source": [ + "train_set_dataset = Dataset(train_set)\n", + "validation_set_dataset = Dataset(validation_set)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "60bd5e59", + "metadata": {}, + "outputs": [], + "source": [ + "weekday_checkin = (\n", + " [\"checkin\"]\n", + " >> ops.LambdaOp(lambda col: get_lib().to_datetime(col).dt.weekday)\n", + " >> ops.Rename(name=\"weekday_checkin\")\n", + ")\n", + "\n", + "weekday_checkout = (\n", + " [\"checkout\"]\n", + " >> ops.LambdaOp(lambda col: get_lib().to_datetime(col).dt.weekday)\n", + " >> ops.Rename(name=\"weekday_checkout\")\n", + ")\n", + "\n", + "categorical_features = (['city_id', 'booker_country', 'hotel_country'] +\n", + " weekday_checkin + weekday_checkout\n", + " ) >> ops.Categorify()\n", + "\n", + "groupby_features = categorical_features + ['utrip_id', 'checkin'] >> ops.Groupby(\n", + " groupby_cols=['utrip_id'],\n", + " aggs={\n", + " 'city_id': ['list', 'count'],\n", + " 'booker_country': ['list'],\n", + " 'hotel_country': ['list'],\n", + " 'weekday_checkin': ['list'],\n", + " 'weekday_checkout': ['list']\n", + " },\n", + " sort_cols=\"checkin\"\n", + ")\n", + "\n", + "list_features = (\n", + " groupby_features['city_id_list', 'booker_country_list', 'hotel_country_list', \n", + " 'weekday_checkin_list', 'weekday_checkout_list'\n", + " ] >> ops.AddTags([Tags.SEQUENCE])\n", + ")\n", + "\n", + "# Filter out sessions with less than 2 interactions \n", + "MINIMUM_SESSION_LENGTH = 2\n", + "features = list_features + (groupby_features['city_id_count'] >> ops.AddTags([Tags.CONTINUOUS]))\n", + "filtered_sessions = features >> ops.Filter(f=lambda df: df[\"city_id_count\"] >= MINIMUM_SESSION_LENGTH) " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "6105767a", + "metadata": {}, + "outputs": [], + "source": [ + "wf = Workflow(filtered_sessions)\n", + 
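"# Fit the preprocessing statistics (e.g. the Categorify category mappings) on the training set only,\n", + "# then reuse the fitted workflow to transform the validation set, so the validation data\n", + "# never influences the learned encodings.\n", +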
"\n", + "wf.fit_transform(train_set_dataset).to_parquet(os.path.join(OUTPUT_DATA_DIR, 'train_processed.parquet'))\n", + "wf.transform(validation_set_dataset).to_parquet(os.path.join(OUTPUT_DATA_DIR, 'validation_processed.parquet'))\n", + "\n", + "wf.save(os.path.join(OUTPUT_DATA_DIR, 'workflow'))" + ] + }, + { + "cell_type": "markdown", + "id": "539a6675", + "metadata": {}, + "source": [ + "Our data consists of a sequence of visited `city_ids`, a sequence of `booker_countries` (represented as integer categories) and a `city_id_count` column (which contains the count of visited cities in a trip)." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "2dee6b53", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
city_id_listbooker_country_listhotel_country_listweekday_checkin_listweekday_checkout_listcity_id_count
0[8238, 156, 2278, 2097][3, 3, 3, 3][3, 3, 3, 3][5, 7, 4, 3][7, 4, 2, 7]4
1[63, 1160, 87, 618, 63][1, 1, 1, 1, 1][1, 1, 1, 1, 1][5, 1, 4, 3, 5][6, 4, 2, 5, 4]5
2[7, 6, 24, 1050, 65, 52, 3][2, 2, 2, 2, 2, 2, 2][2, 2, 2, 16, 16, 3, 3][5, 1, 2, 6, 5, 7, 4][6, 3, 1, 5, 7, 4, 3]7
3[1032, 757, 140, 3][2, 2, 2, 2][19, 19, 19, 3][1, 4, 2, 3][4, 3, 2, 5]4
4[3603, 262, 662, 250, 359][1, 1, 1, 1, 1][30, 30, 30, 30, 30][1, 3, 6, 5, 1][2, 1, 5, 6, 3]5
\n", + "
" + ], + "text/plain": [ + " city_id_list booker_country_list \\\n", + "0 [8238, 156, 2278, 2097] [3, 3, 3, 3] \n", + "1 [63, 1160, 87, 618, 63] [1, 1, 1, 1, 1] \n", + "2 [7, 6, 24, 1050, 65, 52, 3] [2, 2, 2, 2, 2, 2, 2] \n", + "3 [1032, 757, 140, 3] [2, 2, 2, 2] \n", + "4 [3603, 262, 662, 250, 359] [1, 1, 1, 1, 1] \n", + "\n", + " hotel_country_list weekday_checkin_list weekday_checkout_list \\\n", + "0 [3, 3, 3, 3] [5, 7, 4, 3] [7, 4, 2, 7] \n", + "1 [1, 1, 1, 1, 1] [5, 1, 4, 3, 5] [6, 4, 2, 5, 4] \n", + "2 [2, 2, 2, 16, 16, 3, 3] [5, 1, 2, 6, 5, 7, 4] [6, 3, 1, 5, 7, 4, 3] \n", + "3 [19, 19, 19, 3] [1, 4, 2, 3] [4, 3, 2, 5] \n", + "4 [30, 30, 30, 30, 30] [1, 3, 6, 5, 1] [2, 1, 5, 6, 3] \n", + "\n", + " city_id_count \n", + "0 4 \n", + "1 5 \n", + "2 7 \n", + "3 4 \n", + "4 5 " + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "Dataset(os.path.join(OUTPUT_DATA_DIR, 'train_processed.parquet')).head()" + ] + }, + { + "cell_type": "markdown", + "id": "e89cc3a0", + "metadata": {}, + "source": [ + "We are now ready to train our model." + ] + }, + { + "cell_type": "markdown", + "id": "ce95c794", + "metadata": {}, + "source": [ + "Here is the schema of the data that our model will use." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "c4813456", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
nametagsdtypeis_listis_raggedproperties.num_bucketsproperties.freq_thresholdproperties.max_sizeproperties.start_indexproperties.cat_pathproperties.domain.minproperties.domain.maxproperties.domain.nameproperties.embedding_sizes.cardinalityproperties.embedding_sizes.dimensionproperties.value_count.minproperties.value_count.max
0city_id_list(Tags.SEQUENCE, Tags.CATEGORICAL)DType(name='int64', element_type=<ElementType....TrueTrueNone000.//categories/unique.city_id.parquet037202city_id372035120None
1booker_country_list(Tags.SEQUENCE, Tags.CATEGORICAL)DType(name='int64', element_type=<ElementType....TrueTrueNone000.//categories/unique.booker_country.parquet05booker_country6160None
2hotel_country_list(Tags.SEQUENCE, Tags.CATEGORICAL)DType(name='int64', element_type=<ElementType....TrueTrueNone000.//categories/unique.hotel_country.parquet0194hotel_country195310None
3weekday_checkin_list(Tags.SEQUENCE, Tags.CATEGORICAL)DType(name='int64', element_type=<ElementType....TrueTrueNone000.//categories/unique.weekday_checkin.parquet07weekday_checkin8160None
4weekday_checkout_list(Tags.SEQUENCE, Tags.CATEGORICAL)DType(name='int64', element_type=<ElementType....TrueTrueNone000.//categories/unique.weekday_checkout.parquet07weekday_checkout8160None
\n", + "
" + ], + "text/plain": [ + "[{'name': 'city_id_list', 'tags': {, }, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'start_index': 0, 'cat_path': './/categories/unique.city_id.parquet', 'domain': {'min': 0, 'max': 37202, 'name': 'city_id'}, 'embedding_sizes': {'cardinality': 37203, 'dimension': 512}, 'value_count': {'min': 0, 'max': None}}, 'dtype': DType(name='int64', element_type=, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None), Dimension(min=0, max=None)))), 'is_list': True, 'is_ragged': True}, {'name': 'booker_country_list', 'tags': {, }, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'start_index': 0, 'cat_path': './/categories/unique.booker_country.parquet', 'domain': {'min': 0, 'max': 5, 'name': 'booker_country'}, 'embedding_sizes': {'cardinality': 6, 'dimension': 16}, 'value_count': {'min': 0, 'max': None}}, 'dtype': DType(name='int64', element_type=, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None), Dimension(min=0, max=None)))), 'is_list': True, 'is_ragged': True}, {'name': 'hotel_country_list', 'tags': {, }, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'start_index': 0, 'cat_path': './/categories/unique.hotel_country.parquet', 'domain': {'min': 0, 'max': 194, 'name': 'hotel_country'}, 'embedding_sizes': {'cardinality': 195, 'dimension': 31}, 'value_count': {'min': 0, 'max': None}}, 'dtype': DType(name='int64', element_type=, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None), Dimension(min=0, max=None)))), 'is_list': True, 'is_ragged': True}, {'name': 'weekday_checkin_list', 'tags': {, }, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'start_index': 0, 'cat_path': './/categories/unique.weekday_checkin.parquet', 'domain': {'min': 0, 'max': 7, 'name': 'weekday_checkin'}, 'embedding_sizes': {'cardinality': 8, 'dimension': 16}, 'value_count': {'min': 0, 'max': None}}, 'dtype': DType(name='int64', element_type=, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None), Dimension(min=0, max=None)))), 'is_list': True, 'is_ragged': True}, {'name': 'weekday_checkout_list', 'tags': {, }, 'properties': {'num_buckets': None, 'freq_threshold': 0, 'max_size': 0, 'start_index': 0, 'cat_path': './/categories/unique.weekday_checkout.parquet', 'domain': {'min': 0, 'max': 7, 'name': 'weekday_checkout'}, 'embedding_sizes': {'cardinality': 8, 'dimension': 16}, 'value_count': {'min': 0, 'max': None}}, 'dtype': DType(name='int64', element_type=, element_size=64, element_unit=None, signed=True, shape=Shape(dims=(Dimension(min=0, max=None), Dimension(min=0, max=None)))), 'is_list': True, 'is_ragged': True}]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "seq_schema = Workflow.load(os.path.join(OUTPUT_DATA_DIR, 'workflow')).output_schema.select_by_tag(Tags.SEQUENCE)\n", + "seq_schema" + ] + }, + { + "cell_type": "markdown", + "id": "8d422833", + "metadata": {}, + "source": [ + "Let's also identify the target column." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "2b90424a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'city_id_list'" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "target = Workflow.load(os.path.join(OUTPUT_DATA_DIR, 'workflow')).output_schema.select_by_tag(Tags.SEQUENCE).column_names[0]\n", + "target" + ] + }, + { + "cell_type": "markdown", + "id": "e9d8adad", + "metadata": {}, + "source": [ + "## Constructing the model" + ] + }, + { + "cell_type": "markdown", + "id": "c4cb17fe", + "metadata": {}, + "source": [ + "Let's construct our model.\n", + "\n", + "We can specify various hyperparameters, such as the number of attention heads and the number of layers to use." + ] + }, + { + "cell_type": "markdown", + "id": "0a460e4c", + "metadata": {}, + "source": [ + "For the transformer portion of our model, we will use the `XLNet` architecture." + ] + }, + { + "cell_type": "markdown", + "id": "23bf02dc", + "metadata": {}, + "source": [ + "Later, when we run the `fit` method on our model, we will specify a masking probability of `0.3` (the `masking_prob` argument) and link it to the transformer block defined in our model. Through the combination of these parameters, our model will train on sequences where any given timestep is masked with a probability of 0.3, and it will be our model's training task to infer the target value for that step!\n", + "\n", + "To summarize, Masked Language Modeling is implemented by:\n", + "\n", + "* `SequenceMaskRandom()` - Passed as the `pre` argument to `model.fit()`, it randomly selects items from the sequence to be masked as prediction targets, using Keras masking. This block also adds the necessary configuration to the specified `transformer` block so that it\n", + "is pre-configured with the necessary layers needed to prepare the inputs to the HuggingFace transformer layer and to post-process its outputs. 
For example, one pre-processing operation is to replace the input embeddings at masked positions for prediction by a dummy trainable embedding, to avoid leakage of the targets.\n", + "\n", + "\n", + "**Read more about the apis used to construct models** \n", + "- [blocks](https://github.com/NVIDIA-Merlin/models/tree/stable/merlin/models/tf/blocks)\n", + "- [MLPBlock](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/blocks/mlp.py)\n", + "- [InputBlockV2](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/inputs/base.py)\n", + "- [Embeddings](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/inputs/embedding.py)\n", + "- [XLNetBlock](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/transformers/block.py)\n", + "- [CategoricalOutput](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/outputs/classification.py)\n", + "- [.schema.select_by_name](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/schema/schema.py)\n", + "- [.schema.select_by_tag](https://github.com/NVIDIA-Merlin/core/blob/stable/merlin/schema/schema.py)\n", + "- [model.compile()](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/models/base.py)\n", + "- [model.fit()](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/models/base.py)\n", + "- [model.evaluate()](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/models/base.py)\n", + "- [mm.SequenceMaskRandom](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/transforms/sequence.py)\n", + "- [mm.SequenceMaskLast](https://github.com/NVIDIA-Merlin/models/blob/stable/merlin/models/tf/transforms/sequence.py)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "cddfd424", + "metadata": {}, + "outputs": [], + "source": [ + "dmodel=48\n", + "mlp_block = mm.MLPBlock(\n", + " [128,dmodel],\n", + " activation='relu',\n", + " no_activation_last_layer=True,\n", + " )\n", + "transformer_block = mm.XLNetBlock(d_model=dmodel, n_head=4, n_layer=2)\n", + "model = mm.Model(\n", + " mm.InputBlockV2(\n", + " seq_schema,\n", + " embeddings=mm.Embeddings(\n", + " Workflow.load(os.path.join(OUTPUT_DATA_DIR, 'workflow')).output_schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=None\n", + " ),\n", + " ),\n", + " mlp_block,\n", + " transformer_block,\n", + " mm.CategoricalOutput(\n", + " Workflow.load(os.path.join(OUTPUT_DATA_DIR, 'workflow')).output_schema.select_by_name(target),\n", + " default_loss=\"categorical_crossentropy\",\n", + " ),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "aac975cd", + "metadata": {}, + "source": [ + "## Model training" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "65d28c27", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). 
Please update your code to provide a seed to the initializer, or avoid using the same initializer instance more than once.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Epoch 1/5\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-05-31 06:06:44.034041: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:428] Loaded cuDNN version 8700\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Gradients do not exist for variables ['model/mask_emb:0', 'transformer/layer_._0/rel_attn/r_s_bias:0', 'transformer/layer_._0/rel_attn/seg_embed:0', 'transformer/layer_._1/rel_attn/r_s_bias:0', 'transformer/layer_._1/rel_attn/seg_embed:0'] when minimizing the loss. If you're using `model.compile()`, did you forget to provide a `loss` argument?\n", + "WARNING:tensorflow:Gradients do not exist for variables ['model/mask_emb:0', 'transformer/layer_._0/rel_attn/r_s_bias:0', 'transformer/layer_._0/rel_attn/seg_embed:0', 'transformer/layer_._1/rel_attn/r_s_bias:0', 'transformer/layer_._1/rel_attn/seg_embed:0'] when minimizing the loss. If you're using `model.compile()`, did you forget to provide a `loss` argument?\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-05-31 06:06:54.541024: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: model/xl_net_block/sequential_block_5/replace_masked_embeddings/RaggedWhere/Assert/AssertGuard/branch_executed/_95\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2720/2720 [==============================] - 81s 25ms/step - loss: 7.3315 - recall_at_10: 0.1973 - mrr_at_10: 0.0863 - ndcg_at_10: 0.1123 - map_at_10: 0.0863 - precision_at_10: 0.0197 - regularization_loss: 0.0000e+00 - loss_batch: 7.3306\n", + "Epoch 2/5\n", + "2720/2720 [==============================] - 70s 25ms/step - loss: 6.0979 - recall_at_10: 0.3633 - mrr_at_10: 0.1707 - ndcg_at_10: 0.2161 - map_at_10: 0.1707 - precision_at_10: 0.0363 - regularization_loss: 0.0000e+00 - loss_batch: 6.0950\n", + "Epoch 3/5\n", + "2720/2720 [==============================] - 71s 26ms/step - loss: 5.5827 - recall_at_10: 0.4306 - mrr_at_10: 0.2056 - ndcg_at_10: 0.2588 - map_at_10: 0.2056 - precision_at_10: 0.0431 - regularization_loss: 0.0000e+00 - loss_batch: 5.5806\n", + "Epoch 4/5\n", + "2720/2720 [==============================] - 72s 26ms/step - loss: 5.3211 - recall_at_10: 0.4627 - mrr_at_10: 0.2213 - ndcg_at_10: 0.2784 - map_at_10: 0.2213 - precision_at_10: 0.0463 - regularization_loss: 0.0000e+00 - loss_batch: 5.3194\n", + "Epoch 5/5\n", + "2720/2720 [==============================] - 71s 26ms/step - loss: 5.1920 - recall_at_10: 0.4787 - mrr_at_10: 0.2306 - ndcg_at_10: 0.2892 - map_at_10: 0.2306 - precision_at_10: 0.0479 - regularization_loss: 0.0000e+00 - loss_batch: 5.1903\n" + ] + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.compile(run_eagerly=False, optimizer='adam', loss=\"categorical_crossentropy\")\n", + "\n", + "model.fit(\n", + " Dataset(os.path.join(OUTPUT_DATA_DIR, 'train_processed.parquet')),\n", + " batch_size=64,\n", + " epochs=NUM_EPOCHS,\n", + " pre=mm.SequenceMaskRandom(schema=seq_schema, target=target, masking_prob=0.3, transformer=transformer_block)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "24699106", + 
"metadata": {}, + "source": [ + "## Model evaluation" + ] + }, + { + "cell_type": "markdown", + "id": "73d87d27", + "metadata": {}, + "source": [ + "We have trained our model.\n", + "\n", + "But in training the metrics come from a masked language modelling task. A portion of steps in the sequence was masked for each example. The metrics were calculated on this task.\n", + "\n", + "In reality, we probably care how well our model does on the next item prediction task (as it mimics the scenario in which the model would be likely to be used).\n", + "\n", + "Let's measure the performance of the model on a task where it attempts to predict the last item in a sequence.\n", + "\n", + "We will mask the last item using `SequenceMaskLast` and run inference." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "bb3c6358", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2023-05-31 06:12:51.968982: W tensorflow/core/grappler/optimizers/loop_optimizer.cc:907] Skipping loop optimization for Merge node with control input: model/xl_net_block/sequential_block_5/replace_masked_embeddings/RaggedWhere/Assert/AssertGuard/branch_executed/_74\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "340/340 [==============================] - 11s 20ms/step - loss: 4.7151 - recall_at_10: 0.5533 - mrr_at_10: 0.3083 - ndcg_at_10: 0.3665 - map_at_10: 0.3083 - precision_at_10: 0.0553 - regularization_loss: 0.0000e+00 - loss_batch: 4.7149\n" + ] + } + ], + "source": [ + "metrics = model.evaluate(\n", + " Dataset(os.path.join(OUTPUT_DATA_DIR, 'validation_processed.parquet')),\n", + " batch_size=128,\n", + " pre=mm.SequenceMaskLast(schema=seq_schema, target=target, transformer=transformer_block),\n", + " return_dict=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "83ca276f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'loss': 4.715089797973633,\n", + " 'recall_at_10': 0.5533444881439209,\n", + " 'mrr_at_10': 0.30831339955329895,\n", + " 'ndcg_at_10': 0.36654922366142273,\n", + " 'map_at_10': 0.30831339955329895,\n", + " 'precision_at_10': 0.055334459990262985,\n", + " 'regularization_loss': 0.0,\n", + " 'loss_batch': 4.635858535766602}" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "metrics" + ] + }, + { + "cell_type": "markdown", + "id": "9fb5cc29", + "metadata": {}, + "source": [ + "## Serving predictions using the Triton Inference Server" + ] + }, + { + "cell_type": "markdown", + "id": "9dc6ee5f", + "metadata": {}, + "source": [ + "Now, we will serve our trained models on [NVIDIA Triton Inference Server (TIS)](https://github.com/triton-inference-server/server). TIS is an open-source inference serving software that helps standardize model deployment and execution and delivers fast and scalable AI in production. To serve recommender models on TIS easily, NVIDIA Merlin team designed and developed [the Merlin Systems library](https://github.com/NVIDIA-Merlin/systems). 
Merlin Systems provides tools and operators to be able to serve end-to-end recommender systems pipelines on TIS easily\n", + "\n", + "In order to perform inference on the Triton Inference Server, we need to output the inference operators to disk.\n", + "\n", + "The inference operators form an `Ensemble`, which is a pipeline that takes in raw data, processes it using NVTabular, and finally outputs predictions from the model that we trained.\n", + "\n", + "Let's write the `Ensemble` to disk (we will later load it on Triton to perform inference)." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "7ae33813", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n", + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n", + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:absl:Found untraced functions such as model_context_layer_call_fn, model_context_layer_call_and_return_conditional_losses, sequence_mask_random_layer_call_fn, sequence_mask_random_layer_call_and_return_conditional_losses, sequence_mask_last_layer_call_fn while saving (showing 5 of 108). 
These functions will not be directly callable after loading.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /tmp/tmp1sakw940/model.savedmodel/assets\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /tmp/tmp1sakw940/model.savedmodel/assets\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py:101: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[key] = tf.keras.utils.serialize_keras_object(maybe_value)\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py:288: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[i] = tf.keras.utils.serialize_keras_object(layer)\n", + "/usr/local/lib/python3.8/dist-packages/keras/saving/legacy/saved_model/layer_serialization.py:134: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " return serialization.serialize_keras_object(obj)\n", + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initializer instance more than once.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n", + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). Please update your code to provide a seed to the initializer, or avoid using the same initializer instance more than once.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. 
Compile it manually.\n", + "/usr/local/lib/python3.8/dist-packages/merlin/systems/dag/node.py:100: UserWarning: Operator 'TransformWorkflow' is producing the output column 'city_id_count', which is not being used by any downstream operator in the ensemble graph.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer TFSharedEmbeddings(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " 
(booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:Skipping full serialization of Keras layer Dropout(\n", + " (_feature_shapes): Dict(\n", + " (city_id_list): TensorShape([64, None, 1])\n", + " (booker_country_list): TensorShape([64, None, 1])\n", + " (hotel_country_list): TensorShape([64, None, 1])\n", + " (weekday_checkin_list): TensorShape([64, None, 1])\n", + " (weekday_checkout_list): TensorShape([64, None, 1])\n", + " )\n", + " (_feature_dtypes): Dict(\n", + " (city_id_list): tf.int64\n", + " (booker_country_list): tf.int64\n", + " (hotel_country_list): tf.int64\n", + " (weekday_checkin_list): tf.int64\n", + " (weekday_checkout_list): tf.int64\n", + " )\n", + "), because it is not built.\n", + "WARNING:absl:Found untraced functions such as model_context_layer_call_fn, model_context_layer_call_and_return_conditional_losses, sequence_mask_random_layer_call_fn, sequence_mask_random_layer_call_and_return_conditional_losses, sequence_mask_last_layer_call_fn while saving (showing 5 of 108). These functions will not be directly callable after loading.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /workspace/data/ensemble/1_predicttensorflowtriton/1/model.savedmodel/assets\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:tensorflow:Assets written to: /workspace/data/ensemble/1_predicttensorflowtriton/1/model.savedmodel/assets\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/utils/tf_utils.py:101: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[key] = tf.keras.utils.serialize_keras_object(maybe_value)\n", + "/usr/local/lib/python3.8/dist-packages/merlin/models/tf/core/combinators.py:288: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " config[i] = tf.keras.utils.serialize_keras_object(layer)\n", + "/usr/local/lib/python3.8/dist-packages/keras/saving/legacy/saved_model/layer_serialization.py:134: CustomMaskWarning: Custom mask layers require a config and must override get_config. When loading, the custom mask layer must be passed to the custom_objects argument.\n", + " return serialization.serialize_keras_object(obj)\n", + "/usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v2.py:120: UserWarning: The initializer TruncatedNormal is unseeded and being called multiple times, which will return identical values each time (even if the initializer is unseeded). 
Please update your code to provide a seed to the initializer, or avoid using the same initializer instance more than once.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:tensorflow:No training configuration found in save file, so the model was *not* compiled. Compile it manually.\n" + ] + } + ], + "source": [ + "from merlin.systems.dag.ops.tensorflow import PredictTensorflow\n", + "from merlin.systems.dag.ensemble import Ensemble\n", + "from merlin.systems.dag.ops.workflow import TransformWorkflow\n", + "\n", + "inf_ops = wf.input_schema.column_names >> TransformWorkflow(wf) >> PredictTensorflow(model)\n", + "\n", + "ensemble = Ensemble(inf_ops, wf.input_schema)\n", + "ensemble.export(os.path.join(OUTPUT_DATA_DIR, 'ensemble'));" + ] + }, + { + "cell_type": "markdown", + "id": "5edc6046", + "metadata": {}, + "source": [ + "After we export the ensemble, we are ready to start the Triton Inference Server.\n", + "\n", + "The server is installed in Merlin Tensorflow and Merlin PyTorch containers. If you are not using one of our containers, then ensure it is installed in your environment. For more information, see the Triton Inference Server [documentation](https://github.com/triton-inference-server/server/blob/r22.03/README.md#documentation).\n", + "\n", + "You can start the server by running the following command:\n", + "\n", + "```tritonserver --model-repository={OUTPUT_DATA_DIR}/ensemble/```\n", + "\n", + "For the --model-repository argument, specify the same value as the `export_path` that you specified previously in the `ensemble.export` method.\n", + "\n", + "After you run the `tritonserver` command, wait until your terminal shows messages like the following example:\n", + "\n", + "I0414 18:29:50.741833 4067 grpc_server.cc:4421] Started GRPCInferenceService at 0.0.0.0:8001
\n", + "I0414 18:29:50.742197 4067 http_server.cc:3113] Started HTTPService at 0.0.0.0:8000
\n", + "I0414 18:29:50.783470 4067 http_server.cc:178] Started Metrics Service at 0.0.0.0:8002\n", + "\n", + "Let us now package our data for inference. We will send the first 4 rows of our validation data, which corresponds to a single trip. The data will be first processed by the `NVTabular` workflow and subsequentally passed to our transformer model for predicting. " + ] + }, + { + "cell_type": "markdown", + "id": "d83a304d", + "metadata": {}, + "source": [ + "Let us send the first 4 rows of our validation data to Triton. This will correspond to a single trip (all rows have the same `utrip_id`) with four stops." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "3cad9026", + "metadata": {}, + "outputs": [], + "source": [ + "from merlin.systems.triton import convert_df_to_triton_input\n", + "\n", + "validation_data = validation_set_dataset.compute()\n", + "inputs = convert_df_to_triton_input(wf.input_schema, validation_data.iloc[:4])" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "c508adce", + "metadata": {}, + "outputs": [], + "source": [ + "import tritonclient.grpc as grpcclient\n", + "\n", + "with grpcclient.InferenceServerClient(\"localhost:8001\") as client:\n", + " response = client.infer('executor_model', inputs)" + ] + }, + { + "cell_type": "markdown", + "id": "6d34eecf", + "metadata": {}, + "source": [ + "The response consists of logits coming from our model." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "b3284691", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[-2.8206294 , -1.3849059 , 1.9042726 , ..., 0.851537 ,\n", + " -2.4237087 , -0.73849726]], dtype=float32)" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "response.as_numpy('city_id_list/categorical_output')" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "824d2b4f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(1, 37203)" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "predictions = response.as_numpy('city_id_list/categorical_output')\n", + "predictions.shape" + ] + }, + { + "cell_type": "markdown", + "id": "fc5d415b", + "metadata": {}, + "source": [ + "The above values are logits output from the last layer of our model. They correspond in size to the cardinality of `city_id`, our target variable:" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "29a8c0bd", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "37203" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "cardinality = wf.output_schema['city_id_list'].properties['embedding_sizes']['cardinality']\n", + "cardinality" + ] + }, + { + "cell_type": "markdown", + "id": "3c54c30f", + "metadata": {}, + "source": [ + "## Summary" + ] + }, + { + "cell_type": "markdown", + "id": "709c07fb", + "metadata": {}, + "source": [ + "We have trained a transformer model for the next item prediction task using language model masking.\n", + "\n", + "For another session-based example that goes deeper into data preprocessing and that covers several advanced techniques (Weight Tying, Temperature Scaling) please see [Session-Based Next Item Prediction for Fashion E-Commerce](https://github.com/NVIDIA-Merlin/models/blob/t4rec_use_case/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb). 
" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pytest.ini b/pytest.ini index 3b9940dfb..09508941f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,3 +2,4 @@ markers = multigpu: Tests only run in multiple-GPU environments singlegpu: Optional marker to run tests in single-GPU environments. Usually used when running in both single- and multi-GPU. + notebook: mark as testing notebooks diff --git a/tests/benchmark/test_asvdb_transformers_next_item_prediction.py b/tests/benchmark/test_asvdb_transformers_next_item_prediction.py new file mode 100644 index 000000000..ccd21015a --- /dev/null +++ b/tests/benchmark/test_asvdb_transformers_next_item_prediction.py @@ -0,0 +1,95 @@ +from asvdb import ASVDb, BenchmarkResult, utils +from testbook import testbook + +from tests.conftest import REPO_ROOT, get_benchmark_info + + +@testbook( + REPO_ROOT / "examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction.ipynb", + timeout=720, + execute=False, +) +def test_func(tb, tmpdir): + tb.inject( + f""" + import os + os.environ["INPUT_DATA_DIR"] = "/raid/data/booking" + os.environ["OUTPUT_DATA_DIR"] = "{tmpdir}" + os.environ["NUM_EPOCHS"] = '1' + """ + ) + tb.cells.pop(6) + tb.cells[ + 15 + ].source = """ + def process_data(): + wf = Workflow(filtered_sessions) + + wf.fit_transform(train_set_dataset).to_parquet( + os.path.join(OUTPUT_DATA_DIR, 'train_processed.parquet') + ) + wf.transform(validation_set_dataset).to_parquet( + os.path.join(OUTPUT_DATA_DIR, 'validation_processed.parquet') + ) + + wf.save(os.path.join(OUTPUT_DATA_DIR, 'workflow')) + + data_processing_runtime = timeit.timeit(process_data, number=1) + """ + tb.cells[ + 29 + ].source = """ + model.compile(run_eagerly=False, optimizer='adam', loss="categorical_crossentropy") + + def train_model(): + model.fit( + Dataset(os.path.join(OUTPUT_DATA_DIR, 'train_processed.parquet')), + batch_size=64, + epochs=NUM_EPOCHS, + pre=mm.SequenceMaskRandom( + schema=seq_schema, + target=target, + masking_prob=0.3, + transformer=transformer_block + ) + ) + + training_runtime = timeit.timeit(train_model, number=1) + """ + tb.execute_cell(list(range(0, 35))) + data_processing_runtime = tb.ref("data_processing_runtime") + training_runtime = tb.ref("training_runtime") + ndcg_at_10 = tb.ref("metrics")["ndcg_at_10"] + + bResult1 = BenchmarkResult( + funcName="", + argNameValuePairs=[ + ("notebook_name", "usecases/transformers-next-item-prediction"), + ("measurement", "data_processing_runtime"), + ], + result=data_processing_runtime, + ) + bResult2 = BenchmarkResult( + funcName="", + argNameValuePairs=[ + ("notebook_name", "usecases/transformers-next-item-prediction"), + ("measurement", "training_runtime"), + ], + result=training_runtime, + ) + bResult3 = BenchmarkResult( + funcName="", + argNameValuePairs=[ + ("notebook_name", "usecases/transformers-next-item-prediction"), + ("measurement", "ndcg_at_10"), + ], + result=ndcg_at_10, + ) + + bInfo = get_benchmark_info() + (repo, branch) = utils.getRepoInfo() + + db = ASVDb(dbDir="s3://nvtab-bench-asvdb/models_metric_tracking", repo=repo, branches=[branch]) + db.addResult(bInfo, bResult1) + 
db.addResult(bInfo, bResult2) + db.addResult(bInfo, bResult3) diff --git a/tests/conftest.py b/tests/conftest.py index 470c56c40..f05d89f92 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,6 +22,16 @@ from pathlib import Path import pytest +import platform +import warnings +from pathlib import Path + +import distributed +import psutil +import pytest +from asvdb import BenchmarkInfo, utils + +from merlin.models.utils import ci_utils REPO_ROOT = Path(__file__).parent.parent @@ -62,3 +72,20 @@ def get_cuda_cluster(): cluster = LocalCUDACluster(n_workers=n_workers) yield cluster cluster.close() + +def get_benchmark_info(): + uname = platform.uname() + (commitHash, commitTime) = utils.getCommitInfo() + + return BenchmarkInfo( + machineName=uname.machine, + cudaVer="na", + osType="%s %s" % (uname.system, uname.release), + pythonVer=platform.python_version(), + commitHash=commitHash, + commitTime=commitTime, + gpuType="na", + cpuType=uname.processor, + arch=uname.machine, + ram="%d" % psutil.virtual_memory().total, + ) diff --git a/tests/unit/test_transformers_next_item_prediction.py b/tests/unit/test_transformers_next_item_prediction.py new file mode 100644 index 000000000..5fe7f31f8 --- /dev/null +++ b/tests/unit/test_transformers_next_item_prediction.py @@ -0,0 +1,57 @@ +import shutil + +import pytest +from testbook import testbook + +from tests.conftest import REPO_ROOT + +pytest.importorskip("transformers") +utils = pytest.importorskip("merlin.systems.triton.utils") + +TRITON_SERVER_PATH = shutil.which("tritonserver") + + +@pytest.mark.skipif(not TRITON_SERVER_PATH, reason="triton server not found") +@testbook( + REPO_ROOT / "examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction.ipynb", + timeout=720, + execute=False, +) +@pytest.mark.notebook +def test_next_item_prediction(tb, tmpdir): + tb.inject( + f""" + import os, random + os.environ["INPUT_DATA_DIR"] = "{tmpdir}" + os.environ["OUTPUT_DATA_DIR"] = "{tmpdir}" + from datetime import datetime, timedelta + from merlin.datasets.synthetic import generate_data + ds = generate_data('booking.com-raw', 10000) + df = ds.compute() + def generate_date(): + date = datetime.today() + if random.randint(0, 1): + date -= timedelta(days=7) + return date + df['checkin'] = [generate_date() for _ in range(df.shape[0])] + df['checkout'] = [generate_date() for _ in range(df.shape[0])] + df.to_csv('{tmpdir}/train_set.csv') + """ + ) + tb.cells.pop(6) + tb.cells[29].source = tb.cells[29].source.replace("epochs=5", "epochs=1") + tb.execute_cell(list(range(0, 38))) + + with utils.run_triton_server(f"{tmpdir}/ensemble", grpc_port=8001): + tb.execute_cell(list(range(38, len(tb.cells)))) + + tb.inject( + """ + logits_count = predictions.shape[1] + """ + ) + tb.execute_cell(len(tb.cells) - 1) + + cardinality = tb.ref("cardinality") + logits_count = tb.ref("logits_count") + assert logits_count == cardinality diff --git a/tests/unit/test_transformers_next_item_prediction_with_pretrained_embeddings.py b/tests/unit/test_transformers_next_item_prediction_with_pretrained_embeddings.py new file mode 100644 index 000000000..f3157d1e1 --- /dev/null +++ b/tests/unit/test_transformers_next_item_prediction_with_pretrained_embeddings.py @@ -0,0 +1,38 @@ +import shutil + +import pytest +from testbook import testbook + +from tests.conftest import REPO_ROOT + +pytest.importorskip("transformers") +utils = pytest.importorskip("merlin.systems.triton.utils") + +TRITON_SERVER_PATH = shutil.which("tritonserver") + + +@pytest.mark.skipif(not 
TRITON_SERVER_PATH, reason="triton server not found") +@testbook( + REPO_ROOT + / "examples/Next-Item-Prediction-with-Transformers/tf/transformers-next-item-prediction-with-pretrained-embeddings.ipynb", + timeout=720, + execute=False, +) +@pytest.mark.notebook +def test_next_item_prediction(tb, tmpdir): + tb.inject( + f""" + import os, random + os.environ["OUTPUT_DATA_DIR"] = "{tmpdir}" + os.environ["NUM_EPOCHS"] = "1" + os.environ["NUM_EXAMPLES"] = "1_500" + os.environ["MINIMUM_SESSION_LENGTH"] = "2" + """ + ) + tb.execute_cell(list(range(0, 48))) + + with utils.run_triton_server(f"{tmpdir}/ensemble", grpc_port=8001): + tb.execute_cell(list(range(48, len(tb.cells)))) + + predicted_hashed_url_id = tb.ref("predicted_hashed_url_id").item() + assert predicted_hashed_url_id >= 0 and predicted_hashed_url_id <= 1002