From 0483f351f21bc5cddfcffa75dc26166b97f284bf Mon Sep 17 00:00:00 2001 From: "Ryan J. Dillon" Date: Tue, 11 Feb 2020 14:40:35 +0100 Subject: [PATCH] Add Mlflow report example notebook * Change get_kwargs method to return an empty dict if the secrets environment variable missing --- examples/Gordo-Reporters-MlFlow.ipynb | 383 ++++++++++++++++++ gordo/reporters/mlflow.py | 54 +-- tests/gordo/reporters/test_mlflow_reporter.py | 59 +-- tests/test_examples.py | 4 +- 4 files changed, 445 insertions(+), 55 deletions(-) create mode 100644 examples/Gordo-Reporters-MlFlow.ipynb diff --git a/examples/Gordo-Reporters-MlFlow.ipynb b/examples/Gordo-Reporters-MlFlow.ipynb new file mode 100644 index 000000000..53f55c16e --- /dev/null +++ b/examples/Gordo-Reporters-MlFlow.ipynb @@ -0,0 +1,383 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using the Gordo Mlflow reporter with AzureML\n", + "\n", + "## Building on a cluster\n", + "When a gordo workflow is generated from a YAML config using `kubectl apply -f config.yml`, the model is built by the model builder pod. If a remote logging \"reporter\" was configured in the `config.yml`, then at the end of the model building step the metadata will be logged with the specified reporter. \n", + "\n", + "**Note**\n", + "When using the MLflow reporter, the cluster running the workflow must have the AzureML workspace credentials set to the environment variable `AZUREML_WORKSPACE_STR` as well as the `DL_SERVICE_AUTH_STR`.\n", + "\n", + "The cluster should use the workspace credentials associated with the deployment stage associated with that cluster, e.g. \"production\", \"staging\", \"testing\", etc.\n", + "\n", + "While reporters can be defined in the globals runtime when using the workflow generator, they must be defined by machine when building locally." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "from azureml.core.workspace import Workspace\n", + "from azureml.core.authentication import InteractiveLoginAuthentication\n", + "import mlflow\n", + "\n", + "from gordo.reporters.mlflow import get_mlflow_client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Note that dummy tag names are used here. The data provider is\n", + "# patched for testing the notebook code, but these will need to\n", + "# be changed or the use RadomDataProviderfor properly testing this\n", + "# workflow.\n", + "config_str = \"\"\"\n", + "apiVersion: equinor.com/v1\n", + "kind: Gordo\n", + "metadata:\n", + " name: test-project\n", + "spec:\n", + " deploy-version: 0.52.1\n", + " config:\n", + " machines:\n", + " - dataset:\n", + " tags:\n", + " - TRA-Tag-1\n", + " - TRA-Tag-2\n", + " target_tag_list:\n", + " - TRA-Tag-3\n", + " - TRA-Tag-4\n", + " train_end_date: '2019-03-01T00:00:00+00:00'\n", + " train_start_date: '2019-01-01T00:00:00+00:00'\n", + " data_provider: \n", + " interactive: True\n", + " metadata:\n", + " information: 'Use RandomForestRegressor to predict separate set of tags.'\n", + " model:\n", + " gordo.machine.model.anomaly.diff.DiffBasedAnomalyDetector:\n", + " base_estimator:\n", + " sklearn.compose.TransformedTargetRegressor:\n", + " transformer: sklearn.preprocessing.data.MinMaxScaler\n", + " regressor:\n", + " sklearn.pipeline.Pipeline:\n", + " steps:\n", + " - sklearn.decomposition.pca.PCA\n", + " - sklearn.multioutput.MultiOutputRegressor:\n", + " estimator:\n", + " sklearn.ensemble.forest.RandomForestRegressor:\n", + " n_estimators: 35\n", + " max_depth: 10\n", + " name: supervised-random-forest-anomaly\n", + " # During local building, reporters must be defined by machine\n", + " runtime:\n", + " reporters:\n", + " - gordo.reporters.mlflow.MlFlowReporter\n", + "globals:\n", + " runtime:\n", + " builder:\n", + " # Remote logging is by default deactived without setting anything.\n", + " remote_logging:\n", + " enable: False\n", + " \"\"\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "## Building locally\n", + "\n", + "To build machines locally, but log remotely, configure the `AZUREML_WORKSPACE_STR` and `DL_SERVICE_AUTH_STR` as described above, then run the config file with the reporter configuration in `gordo.builder.local_build.local_build` method." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from gordo.builder.local_build import local_build\n", + "import os\n", + "\n", + "# This downloads 1yr of data from the datalake\n", + "# so it will of coarse take some time\n", + "model, machine = next(local_build(config_str))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# During a deployment, the CLI build method calls the reporters.\n", + "# In a local build, we'll do that manually\n", + "machine.report()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Reviewing results\n", + "\n", + "## AzureML Frontend\n", + "\n", + "The AzureML frontend can be helpful for quickly looking that your results appear to be populating correctly, for example during a gordo deployment. [Portal Link](https://ml.azure.com/?wsid=/subscriptions/019958ea-fe2c-4e14-bbd9-0d2db8ed7cfc/resourcegroups/gordo-ml-workspace-poc-rg/workspaces/gordo-ml-workspace-poc-ml)\n", + "\n", + "## Querying with MlflowClient\n", + "\n", + "\n", + "The necessary requirements for using Mlflow with AzureML are installed with gordo, so you can just use the client from your gordo `virtualenv`.\n", + "\n", + "The following are just some general examples, but you can find further documention on the client [here](https://www.mlflow.org/docs/latest/tracking.html#querying-runs-programmatically) as well as API documentation [here](https://www.mlflow.org/docs/latest/python_api/mlflow.tracking.html).\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# If you want to configure the client to query results on AzureML,\n", + "# define the connection arguments in a kwargs dict.\n", + "workspace_kwargs = { \n", + " \"subscription_id\":\"value\", \n", + " \"resource_group\": \"value\", \n", + " \"workspace_name\": \"value\",\n", + " \"auth\": InteractiveLoginAuthentication(force=True)\n", + " }\n", + "\n", + "# To login automatically, provide the service principal \n", + "# arguments in a kwargs dict\n", + "service_principal_kwargs = { \n", + " \"tenant_id\": \"\",\n", + " \"service_principal_id\": \"\",\n", + " \"service_principal_password\": \"\"\n", + " }\n", + "\n", + "# For the case of this example, we'll just run things locally, so we'll\n", + "# just pass empty dicts, which is the default when no arguments are passed.\n", + "workspace_kwargs = {}\n", + "service_principal_kwargs = {}\n", + "client = get_mlflow_client(workspace_kwargs, service_principal_kwargs)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Experiments\n", + "Each build of a machine corresponds to a new run for an experiment with that machine's name. With each subsequent deployment, there will be a new run under each built machines name." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get all experiments (can take a bit)\n", + "experiments = client.list_experiments()\n", + "\n", + "# We've only built one machine, but it'ss\n", + "for exp in experiments:\n", + " print(exp.name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get a single experiment by name\n", + "exp = client.get_experiment_by_name(\"supervised-random-forest-anomaly\")\n", + "print(exp)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# Find experiments matching some pattern\n", + "experiment_ids = [e.experiment_id for e in experiments if e.name.startswith(\"super\")]\n", + "exp_id = experiment_ids[0]\n", + "print(exp_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Runs\n", + "Searching of Runs can be perfomed with some [built-in arguments](https://www.mlflow.org/docs/latest/python_api/mlflow.tracking.html#mlflow.tracking.MlflowClient.search_runs), or with basic SQL select queries passed to the `filter_string` argument. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "## Using order by a metric\n", + "runs = client.search_runs(experiment_ids=experiment_ids, max_results=50, order_by=[\"metrics.r_2\"])\n", + "\n", + "print(\"Number of runs:\", len(runs))\n", + "print(\"Example:\", runs[0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Using an SQL filter string\n", + "\n", + "# First we can get a single run and look at what metrics are logged in gordo\n", + "runs = client.search_runs(experiment_ids=experiment_ids, max_results=1)\n", + "runs[0].data.metrics.keys()\n", + "\n", + "# We can then search for runs matching a certain R2 score range\n", + "# Note that the Identifier must be enclosed in backticks or double quotes\n", + "runs = client.search_runs(experiment_ids=experiment_ids, filter_string='metrics.`r2-score` < 8', max_results=10) \n", + "\n", + "print(\"Number of runs:\", len(runs))\n", + "print(\"Example:\", runs[0])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There are som handy tools using the `azureml-sdk` as well. For example, you can bring up a widget displaying information about a run, and get metrics as iterables." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# We'll put this in an if statement, so the rest of this \n", + "# notebook can be tested\n", + "if False:\n", + " from azureml.widgets import RunDetails\n", + " from azureml.core.experiment import Experiment\n", + " from azureml.core.run import Run\n", + " experiment = Experiment(ws, experiments[-80].name)\n", + " azure_run = next(experiment.get_runs())\n", + " RunDetails(azure_run).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if False:\n", + " import matplotlib.pyplot as plt\n", + " # Or do some things yourself\n", + " metrics = azure_run.get_metrics()\n", + " print(metrics.keys())\n", + " plt.plot(range(len(metrics[\"accuracy\"])), metrics[\"accuracy\"])\n", + " plt.show()\n", + " print(azure_run.properties)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Artifacts\n", + "Artificacts are files, such JSON, images, pickled models, etc. The following are examples on explicitly uploading and downloading them on AzureML with a given `run_id`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import uuid\n", + "import json\n", + "import shutil\n", + "\n", + "run_id = client.list_run_infos(exp.experiment_id)[-1].run_id\n", + "art_id = f\"{uuid.uuid4().hex}\"\n", + "\n", + "# Upload artifacts\n", + "local_path = os.path.abspath(f\"./{exp.name}_{run_id}/\")\n", + "if os.path.isdir(local_path):\n", + " shutil.rmtree(local_path)\n", + "os.makedirs(local_path, exist_ok=True)\n", + "\n", + "json.dump({\"a\": 42.0, \"b\":\"text\"}, open(os.path.join(local_path, f\"{art_id}.json\"), \"w\"))\n", + "\n", + "client.log_artifacts(run_id, local_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Get artifacts for a given Run\n", + "artifacts = client.list_artifacts(run_id)\n", + "\n", + "# Make a new path to save these to\n", + "new_local_path = os.path.join(local_path, \"downloaded\")\n", + "os.makedirs(new_local_path, exist_ok=True)\n", + "\n", + "# Iterate over Run's artifacts and save them\n", + "for f in artifacts:\n", + " client.download_artifacts(run_id=run_id, path=f.path, dst_path=local_path)\n", + " print(\"Downloaded:\", f)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/gordo/reporters/mlflow.py b/gordo/reporters/mlflow.py index ceac7a35d..5ac5b233d 100644 --- a/gordo/reporters/mlflow.py +++ b/gordo/reporters/mlflow.py @@ -4,7 +4,7 @@ import logging import os import tempfile -from typing import Dict, List, Union, Tuple +from typing import Dict, List, Tuple, Union from uuid import uuid4 from azureml.core import Workspace @@ -341,37 +341,28 @@ def _calc_n_batches(n: int, n_max: int): return log_batches -def get_kwargs_from_secret(name: str, keys: List[str]) -> dict: +def get_kwargs_from_secret(secret_str: str, keys: List[str]) -> dict: """ Get keyword arguments dictionary from secrets environment variable Parameters ---------- - name: str - Name of the environment variable whose content is a colon separated - list of secrets. + secret_str: str + String of a colon separated list of secrets. + keys: List[str] + List of keys associated with each element in secrets string. Returns ------- kwargs: dict Dictionary of keyword arguments parsed from environment variable. """ - secret_str = os.getenv(name) - - if secret_str is None: - raise MlflowLoggingError(f"The value for env var '{name}' must not be `None`.") - - if secret_str: - elements = secret_str.split(":") - if len(elements) != len(keys): - raise MlflowLoggingError( - "`keys` len {len(keys)} must be of equal length with env var {name} elements {len(elements)}." - ) - kwargs = {key: elements[i] for i, key in enumerate(keys)} - else: - kwargs = {} - - return kwargs + elements = secret_str.split(":") + if len(elements) != len(keys): + raise MlflowLoggingError( + f"`keys` len {len(keys)} must be of equal number of elements {len(elements)} parsed from secrets str." + ) + return {key: elements[i] for i, key in enumerate(keys)} def get_workspace_kwargs() -> dict: @@ -387,8 +378,13 @@ def get_workspace_kwargs() -> dict: AzureML Workspace configuration to use for remote MLFlow tracking. See :func:`gordo.builder.mlflow_utils.get_mlflow_client`. """ - return get_kwargs_from_secret( - "AZUREML_WORKSPACE_STR", ["subscription_id", "resource_group", "workspace_name"] + secret_str = os.getenv("AZUREML_WORKSPACE_STR") + return ( + get_kwargs_from_secret( + secret_str, ["subscription_id", "resource_group", "workspace_name"] + ) + if secret_str + else dict() ) @@ -405,9 +401,14 @@ def get_spauth_kwargs() -> dict: AzureML ServicePrincipalAuthentication keyword arguments. See :func:`gordo.builder.mlflow_utils.get_mlflow_client` """ - return get_kwargs_from_secret( - "DL_SERVICE_AUTH_STR", - ["tenant_id", "service_principal_id", "service_principal_password"], + secret_str = os.getenv("DL_SERVICE_AUTH_STR") + return ( + get_kwargs_from_secret( + secret_str, + ["tenant_id", "service_principal_id", "service_principal_password"], + ) + if secret_str + else dict() ) @@ -491,6 +492,7 @@ def report(self, machine: Machine): workspace_kwargs = get_workspace_kwargs() service_principal_kwargs = get_spauth_kwargs() + cache_key = ModelBuilder.calculate_cache_key(machine) with mlflow_context( diff --git a/tests/gordo/reporters/test_mlflow_reporter.py b/tests/gordo/reporters/test_mlflow_reporter.py index 1283aec9b..c1fe1012d 100644 --- a/tests/gordo/reporters/test_mlflow_reporter.py +++ b/tests/gordo/reporters/test_mlflow_reporter.py @@ -216,41 +216,44 @@ def test_get_machine_log_items(metadata): assert all(type(p) == Param for p in params) -@pytest.mark.parametrize( - "secret_str,keys,keys_valid", - [ - ("dummy1:dummy2:dummy3", ["key1", "key2", "key3"], True), - ("dummy1:dummy2:dummy3", ["key1", "key2"], False), - ], -) -def test_get_kwargs_from_secret(monkeypatch, secret_str, keys, keys_valid): +def test_get_kwargs_from_secret_invalid(): """ - Test that service principal kwargs are generated correctly if env var present + Test that method fails with number of secret elements mismatch number of keys """ - env_var_name = "TEST_SECRET" - - # TEST_SECRET doesn't exist as env var with pytest.raises(ReporterException): - mlu.get_kwargs_from_secret(env_var_name, keys) + mlu.get_kwargs_from_secret("dummy1:dummy2:dummy3", ["key1", "key2"]) - # TEST_SECRET exists as env var - monkeypatch.setenv(name=env_var_name, value=secret_str) - if keys_valid: - kwargs = mlu.get_kwargs_from_secret(env_var_name, keys) - for key, value in zip(keys, secret_str.split(":")): - assert kwargs[key] == value - else: - with pytest.raises(ReporterException): - mlu.get_kwargs_from_secret(env_var_name, keys) + with pytest.raises(AttributeError): + mlu.get_kwargs_from_secret(None, ["key1", "key2"]) -def test_workspace_spauth_kwargs(): - """Make sure an error is thrown when env vars not set""" - with pytest.raises(ReporterException): - mlu.get_workspace_kwargs() +def test_workspace_kwargs(monkeypatch): + """ + Test that appropriate kwargs dict is returned + """ + assert mlu.get_workspace_kwargs() == {} - with pytest.raises(ReporterException): - mlu.get_spauth_kwargs() + monkeypatch.setenv("AZUREML_WORKSPACE_STR", "test:test:test") + assert mlu.get_workspace_kwargs() == { + "subscription_id": "test", + "resource_group": "test", + "workspace_name": "test", + } + + +def test_spauth_kwargs(monkeypatch): + """ + Test that appropriate kwargs dict is returned + """ + + assert mlu.get_spauth_kwargs() == {} + + monkeypatch.setenv("DL_SERVICE_AUTH_STR", "test:test:test") + assert mlu.get_spauth_kwargs() == { + "tenant_id": "test", + "service_principal_id": "test", + "service_principal_password": "test", + } def test_MachineEncoder(metadata): diff --git a/tests/test_examples.py b/tests/test_examples.py index 59bea03c1..eb7d43409 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -47,8 +47,10 @@ def test_faked_DataLakeBackedDataset(MockDataset): X, y = dataset.get_data() +@mock.patch("azureml.core.authentication.InteractiveLoginAuthentication") +@mock.patch("gordo.reporters.mlflow.Workspace") @mock.patch.object(TimeSeriesDataset, "get_data", return_value=_fake_data()) -def test_notebooks(MockDataset, tmpdir): +def test_notebooks(MockDataset, MockInteractiveLogin, MockWorskpace, tmpdir): """ Ensures all notebooks will run without error """