diff --git a/docs/sphinx/user_guide/index.rst b/docs/sphinx/user_guide/index.rst index 5e418222..e517f793 100644 --- a/docs/sphinx/user_guide/index.rst +++ b/docs/sphinx/user_guide/index.rst @@ -28,6 +28,8 @@ Before starting, make sure you have installed TEEHR and its dependencies as desc :doc:`Grouping and Filtering ` :download:`(download notebook) ` +:doc:`Adding calculated fields ` :download:`(download notebook) ` + .. toctree:: :maxdepth: 2 :hidden: @@ -40,4 +42,5 @@ Before starting, make sure you have installed TEEHR and its dependencies as desc Read an Evaluation from S3 Joining Timeseries Grouping and Filtering + Adding calculated fields Metrics diff --git a/docs/sphinx/user_guide/notebooks/02_loading_local_data.ipynb b/docs/sphinx/user_guide/notebooks/02_loading_local_data.ipynb index 0efe886b..7933988b 100644 --- a/docs/sphinx/user_guide/notebooks/02_loading_local_data.ipynb +++ b/docs/sphinx/user_guide/notebooks/02_loading_local_data.ipynb @@ -639,7 +639,7 @@ "source": [ "Now we have data in all the tables required to conduct an evaluation. There is one more step to do before we can start \"slicing and dicing\" the data and looking at evaluation metrics - we need to generate the `joined_timeseries` data table. The `joined_timeseries` table is essentially a materialized view that joins the `primary_timeseries` and `secondary_timeseries` based on `location_id`, `value_time`, `variable_name` and `unit_name`, adds the `location_attributes`, and adds any user defined fields.\n", "\n", - "Here we are going to execute the `create()` method on the `joined_timeseries` table class to generate the `joined_timeseries` \"materialized view\" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the `[evaluation_dir]/scripts/user_defined_fields.py` Python script. Note, it is possible to skip adding any user defined fields by setting `execute_udf=False` when calling the `create()` method. Creating user defined fields can be a powerful tool to allow per-location, per-timestep fields to be added to the materialized `joined_timeseries` table. This will be covered in more detail in future lessons. For now, lets just set `execute_udf=True` and execute the `user_defined_fields.py` script." + "Here we are going to execute the `create()` method on the `joined_timeseries` table class to generate the `joined_timeseries` \"materialized view\" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the `[evaluation_dir]/scripts/user_defined_fields.py` Python script. Note, it is possible to skip adding any user defined fields by setting `execute_scripts=False` when calling the `create()` method. Creating user defined fields can be a powerful tool to allow per-location, per-timestep fields to be added to the materialized `joined_timeseries` table. This will be covered in more detail in future lessons. For now, lets just set `execute_scripts=True` and execute the `user_defined_fields.py` script." 
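> Editor's note: for quick reference, a minimal sketch of how the renamed keyword is called after this change. The evaluation path is a placeholder; the `teehr.Evaluation` setup and the `create()` call mirror the cells in this diff, with `execute_scripts` replacing the old `execute_udf` name.

```python
import teehr

# Open an existing evaluation (placeholder path).
ev = teehr.Evaluation(dir_path="path/to/evaluation_dir")

# Build the joined_timeseries "materialized view". The keyword is now
# execute_scripts (formerly execute_udf); set it to False to skip running
# [evaluation_dir]/scripts/user_defined_fields.py.
ev.joined_timeseries.create(execute_scripts=True)
```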
] }, { @@ -648,7 +648,7 @@ "metadata": {}, "outputs": [], "source": [ - "ev.joined_timeseries.create(execute_udf=True)" + "ev.joined_timeseries.create(execute_scripts=True)" ] }, { diff --git a/docs/sphinx/user_guide/notebooks/04_setup_simple_example.ipynb b/docs/sphinx/user_guide/notebooks/04_setup_simple_example.ipynb index b4804227..c821c93b 100644 --- a/docs/sphinx/user_guide/notebooks/04_setup_simple_example.ipynb +++ b/docs/sphinx/user_guide/notebooks/04_setup_simple_example.ipynb @@ -420,7 +420,7 @@ "metadata": {}, "outputs": [], "source": [ - "ev.joined_timeseries.create(execute_udf=True)" + "ev.joined_timeseries.create(execute_scripts=True)" ] }, { diff --git a/docs/sphinx/user_guide/notebooks/08_adding_calculated_fields.ipynb b/docs/sphinx/user_guide/notebooks/08_adding_calculated_fields.ipynb new file mode 100644 index 00000000..542674e5 --- /dev/null +++ b/docs/sphinx/user_guide/notebooks/08_adding_calculated_fields.ipynb @@ -0,0 +1,1149 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 08 Add Calculated Fields" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Overview\n", + "In this lesson we will clone a small example TEEHR Evaluation from S3 and work through the different ways to add calculated fields to the joined_timeseries table, save them to the joined_timeseries table on disk to persist them, or add them temporarily before calculating metrics. The first few steps for creating and cloning the TEEHR Evaluation should look familiar if you worked through the previous examples." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create a new Evaluation\n", + "First we will import TEEHR along with some other required libraries for this example. Then we create a new instance of the Evaluation that points to a directory where the evaluation data will be stored." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + " \n", + "
\n", + " \n", + " Loading BokehJS ...\n", + "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/javascript": "'use strict';\n(function(root) {\n function now() {\n return new Date();\n }\n\n const force = true;\n\n if (typeof root._bokeh_onload_callbacks === \"undefined\" || force === true) {\n root._bokeh_onload_callbacks = [];\n root._bokeh_is_loading = undefined;\n }\n\nconst JS_MIME_TYPE = 'application/javascript';\n const HTML_MIME_TYPE = 'text/html';\n const EXEC_MIME_TYPE = 'application/vnd.bokehjs_exec.v0+json';\n const CLASS_NAME = 'output_bokeh rendered_html';\n\n /**\n * Render data to the DOM node\n */\n function render(props, node) {\n const script = document.createElement(\"script\");\n node.appendChild(script);\n }\n\n /**\n * Handle when an output is cleared or removed\n */\n function handleClearOutput(event, handle) {\n function drop(id) {\n const view = Bokeh.index.get_by_id(id)\n if (view != null) {\n view.model.document.clear()\n Bokeh.index.delete(view)\n }\n }\n\n const cell = handle.cell;\n\n const id = cell.output_area._bokeh_element_id;\n const server_id = cell.output_area._bokeh_server_id;\n\n // Clean up Bokeh references\n if (id != null) {\n drop(id)\n }\n\n if (server_id !== undefined) {\n // Clean up Bokeh references\n const cmd_clean = \"from bokeh.io.state import curstate; print(curstate().uuid_to_server['\" + server_id + \"'].get_sessions()[0].document.roots[0]._id)\";\n cell.notebook.kernel.execute(cmd_clean, {\n iopub: {\n output: function(msg) {\n const id = msg.content.text.trim()\n drop(id)\n }\n }\n });\n // Destroy server and session\n const cmd_destroy = \"import bokeh.io.notebook as ion; ion.destroy_server('\" + server_id + \"')\";\n cell.notebook.kernel.execute(cmd_destroy);\n }\n }\n\n /**\n * Handle when a new output is added\n */\n function handleAddOutput(event, handle) {\n const output_area = handle.output_area;\n const output = handle.output;\n\n // limit handleAddOutput to display_data with EXEC_MIME_TYPE content only\n if ((output.output_type != \"display_data\") || (!Object.prototype.hasOwnProperty.call(output.data, EXEC_MIME_TYPE))) {\n return\n }\n\n const toinsert = output_area.element.find(\".\" + CLASS_NAME.split(' ')[0]);\n\n if (output.metadata[EXEC_MIME_TYPE][\"id\"] !== undefined) {\n toinsert[toinsert.length - 1].firstChild.textContent = output.data[JS_MIME_TYPE];\n // store reference to embed id on output_area\n output_area._bokeh_element_id = output.metadata[EXEC_MIME_TYPE][\"id\"];\n }\n if (output.metadata[EXEC_MIME_TYPE][\"server_id\"] !== undefined) {\n const bk_div = document.createElement(\"div\");\n bk_div.innerHTML = output.data[HTML_MIME_TYPE];\n const script_attrs = bk_div.children[0].attributes;\n for (let i = 0; i < script_attrs.length; i++) {\n toinsert[toinsert.length - 1].firstChild.setAttribute(script_attrs[i].name, script_attrs[i].value);\n toinsert[toinsert.length - 1].firstChild.textContent = bk_div.children[0].textContent\n }\n // store reference to server id on output_area\n output_area._bokeh_server_id = output.metadata[EXEC_MIME_TYPE][\"server_id\"];\n }\n }\n\n function register_renderer(events, OutputArea) {\n\n function append_mime(data, metadata, element) {\n // create a DOM node to render to\n const toinsert = this.create_output_subarea(\n metadata,\n CLASS_NAME,\n EXEC_MIME_TYPE\n );\n this.keyboard_manager.register_events(toinsert);\n // Render to node\n const props = {data: data, metadata: metadata[EXEC_MIME_TYPE]};\n render(props, toinsert[toinsert.length - 1]);\n element.append(toinsert);\n 
return toinsert\n }\n\n /* Handle when an output is cleared or removed */\n events.on('clear_output.CodeCell', handleClearOutput);\n events.on('delete.Cell', handleClearOutput);\n\n /* Handle when a new output is added */\n events.on('output_added.OutputArea', handleAddOutput);\n\n /**\n * Register the mime type and append_mime function with output_area\n */\n OutputArea.prototype.register_mime_type(EXEC_MIME_TYPE, append_mime, {\n /* Is output safe? */\n safe: true,\n /* Index of renderer in `output_area.display_order` */\n index: 0\n });\n }\n\n // register the mime type if in Jupyter Notebook environment and previously unregistered\n if (root.Jupyter !== undefined) {\n const events = require('base/js/events');\n const OutputArea = require('notebook/js/outputarea').OutputArea;\n\n if (OutputArea.prototype.mime_types().indexOf(EXEC_MIME_TYPE) == -1) {\n register_renderer(events, OutputArea);\n }\n }\n if (typeof (root._bokeh_timeout) === \"undefined\" || force === true) {\n root._bokeh_timeout = Date.now() + 5000;\n root._bokeh_failed_load = false;\n }\n\n const NB_LOAD_WARNING = {'data': {'text/html':\n \"
\\n\"+\n \"

\\n\"+\n \"BokehJS does not appear to have successfully loaded. If loading BokehJS from CDN, this \\n\"+\n \"may be due to a slow or bad network connection. Possible fixes:\\n\"+\n \"

\\n\"+\n \"\\n\"+\n \"\\n\"+\n \"from bokeh.resources import INLINE\\n\"+\n \"output_notebook(resources=INLINE)\\n\"+\n \"\\n\"+\n \"
\"}};\n\n function display_loaded(error = null) {\n const el = document.getElementById(\"bc94028c-c42f-4b78-b7fe-1cda7b24e7ff\");\n if (el != null) {\n const html = (() => {\n if (typeof root.Bokeh === \"undefined\") {\n if (error == null) {\n return \"BokehJS is loading ...\";\n } else {\n return \"BokehJS failed to load.\";\n }\n } else {\n const prefix = `BokehJS ${root.Bokeh.version}`;\n if (error == null) {\n return `${prefix} successfully loaded.`;\n } else {\n return `${prefix} encountered errors while loading and may not function as expected.`;\n }\n }\n })();\n el.innerHTML = html;\n\n if (error != null) {\n const wrapper = document.createElement(\"div\");\n wrapper.style.overflow = \"auto\";\n wrapper.style.height = \"5em\";\n wrapper.style.resize = \"vertical\";\n const content = document.createElement(\"div\");\n content.style.fontFamily = \"monospace\";\n content.style.whiteSpace = \"pre-wrap\";\n content.style.backgroundColor = \"rgb(255, 221, 221)\";\n content.textContent = error.stack ?? error.toString();\n wrapper.append(content);\n el.append(wrapper);\n }\n } else if (Date.now() < root._bokeh_timeout) {\n setTimeout(() => display_loaded(error), 100);\n }\n }\n\n function run_callbacks() {\n try {\n root._bokeh_onload_callbacks.forEach(function(callback) {\n if (callback != null)\n callback();\n });\n } finally {\n delete root._bokeh_onload_callbacks\n }\n console.debug(\"Bokeh: all callbacks have finished\");\n }\n\n function load_libs(css_urls, js_urls, callback) {\n if (css_urls == null) css_urls = [];\n if (js_urls == null) js_urls = [];\n\n root._bokeh_onload_callbacks.push(callback);\n if (root._bokeh_is_loading > 0) {\n console.debug(\"Bokeh: BokehJS is being loaded, scheduling callback at\", now());\n return null;\n }\n if (js_urls == null || js_urls.length === 0) {\n run_callbacks();\n return null;\n }\n console.debug(\"Bokeh: BokehJS not loaded, scheduling load and callback at\", now());\n root._bokeh_is_loading = css_urls.length + js_urls.length;\n\n function on_load() {\n root._bokeh_is_loading--;\n if (root._bokeh_is_loading === 0) {\n console.debug(\"Bokeh: all BokehJS libraries/stylesheets loaded\");\n run_callbacks()\n }\n }\n\n function on_error(url) {\n console.error(\"failed to load \" + url);\n }\n\n for (let i = 0; i < css_urls.length; i++) {\n const url = css_urls[i];\n const element = document.createElement(\"link\");\n element.onload = on_load;\n element.onerror = on_error.bind(null, url);\n element.rel = \"stylesheet\";\n element.type = \"text/css\";\n element.href = url;\n console.debug(\"Bokeh: injecting link tag for BokehJS stylesheet: \", url);\n document.body.appendChild(element);\n }\n\n for (let i = 0; i < js_urls.length; i++) {\n const url = js_urls[i];\n const element = document.createElement('script');\n element.onload = on_load;\n element.onerror = on_error.bind(null, url);\n element.async = false;\n element.src = url;\n console.debug(\"Bokeh: injecting script tag for BokehJS library: \", url);\n document.head.appendChild(element);\n }\n };\n\n function inject_raw_css(css) {\n const element = document.createElement(\"style\");\n element.appendChild(document.createTextNode(css));\n document.body.appendChild(element);\n }\n\n const js_urls = [\"https://cdn.bokeh.org/bokeh/release/bokeh-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-gl-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-widgets-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-tables-3.6.1.min.js\", 
\"https://cdn.bokeh.org/bokeh/release/bokeh-mathjax-3.6.1.min.js\"];\n const css_urls = [];\n\n const inline_js = [ function(Bokeh) {\n Bokeh.set_log_level(\"info\");\n },\nfunction(Bokeh) {\n }\n ];\n\n function run_inline_js() {\n if (root.Bokeh !== undefined || force === true) {\n try {\n for (let i = 0; i < inline_js.length; i++) {\n inline_js[i].call(root, root.Bokeh);\n }\n\n } catch (error) {display_loaded(error);throw error;\n }if (force === true) {\n display_loaded();\n }} else if (Date.now() < root._bokeh_timeout) {\n setTimeout(run_inline_js, 100);\n } else if (!root._bokeh_failed_load) {\n console.log(\"Bokeh: BokehJS failed to load within specified timeout.\");\n root._bokeh_failed_load = true;\n } else if (force !== true) {\n const cell = $(document.getElementById(\"bc94028c-c42f-4b78-b7fe-1cda7b24e7ff\")).parents('.cell').data().cell;\n cell.output_area.append_execute_result(NB_LOAD_WARNING)\n }\n }\n\n if (root._bokeh_is_loading === 0) {\n console.debug(\"Bokeh: BokehJS loaded, going straight to plotting\");\n run_inline_js();\n } else {\n load_libs(css_urls, js_urls, function() {\n console.debug(\"Bokeh: BokehJS plotting callback run at\", now());\n run_inline_js();\n });\n }\n}(window));", + "application/vnd.bokehjs_load.v0+json": "" + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import teehr\n", + "from pathlib import Path\n", + "import shutil\n", + "\n", + "# Tell Bokeh to output plots in the notebook\n", + "from bokeh.io import output_notebook\n", + "output_notebook()" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "tags": [ + "hide-output" + ] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Setting default log level to \"WARN\".\n", + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", + "25/01/06 16:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + ] + } + ], + "source": [ + "# Define the directory where the Evaluation will be created\n", + "test_eval_dir = Path(Path().home(), \"temp\", \"08_calculated_fields\")\n", + "shutil.rmtree(test_eval_dir, ignore_errors=True)\n", + "\n", + "# Create an Evaluation object and create the directory\n", + "ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Clone Evaluation Data form S3\n", + "As mentioned above, for this exercise we will be cloning a complete Evaluation dataset from the TEEHR S3 bucket. First we will list the available Evaluations and then we will clone the `e0_2_location_example` evaluation which is a small example Evaluation that only contains 2 gages." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namedescriptionurl
0e0_2_location_exampleExample evaluation datsets with 2 USGS gagess3a://ciroh-rti-public-data/teehr-data-warehou...
1e1_camels_daily_streamflowDaily average streamflow at ther Camels basinss3a://ciroh-rti-public-data/teehr-data-warehou...
2e2_camels_hourly_streamflowHourly instantaneous streamflow at ther Camels...s3a://ciroh-rti-public-data/teehr-data-warehou...
3e3_usgs_hourly_streamflowHourly instantaneous streamflow at USGS CONUS ...s3a://ciroh-rti-public-data/teehr-data-warehou...
\n", + "
" + ], + "text/plain": [ + " name \\\n", + "0 e0_2_location_example \n", + "1 e1_camels_daily_streamflow \n", + "2 e2_camels_hourly_streamflow \n", + "3 e3_usgs_hourly_streamflow \n", + "\n", + " description \\\n", + "0 Example evaluation datsets with 2 USGS gages \n", + "1 Daily average streamflow at ther Camels basins \n", + "2 Hourly instantaneous streamflow at ther Camels... \n", + "3 Hourly instantaneous streamflow at USGS CONUS ... \n", + "\n", + " url \n", + "0 s3a://ciroh-rti-public-data/teehr-data-warehou... \n", + "1 s3a://ciroh-rti-public-data/teehr-data-warehou... \n", + "2 s3a://ciroh-rti-public-data/teehr-data-warehou... \n", + "3 s3a://ciroh-rti-public-data/teehr-data-warehou... " + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# List the evaluations in the S3 bucket\n", + "ev.list_s3_evaluations()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "tags": [ + "hide-output" + ] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "25/01/06 16:53:21 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties\n", + "25/01/06 16:53:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n", + " \r" + ] + } + ], + "source": [ + "# Clone the e0_2_location_example evaluation from the S3 bucket\n", + "ev.clone_from_s3(\"e0_2_location_example\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Add Calculated Fields to Joined Timeseries" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we will get right to it. Lets start by re-creating the `joined_timeseries` table and setting the `add_attrs=False` and `execute_udf=False` so that we get just the basic `joined_timeseries` table that joins the primary and secondary timeseries but does not add attributes or user defined fields. This step is not strictly necessary but makes it a bit easier to follow what we are doing in the subsequent steps. Lets take a look at the `joined_timeseries` table and see what fields are included." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "|reference_time| value_time|primary_location_id|secondary_location_id|primary_value|secondary_value|unit_name|member| configuration_name| variable_name|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "| NULL|2000-10-01 00:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 00:00:00| usgs-14138800| nwm30-23736071| 3.3413877| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14138800| nwm30-23736071| 3.9926753| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14138800| nwm30-23736071| 4.445745| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14138800| nwm30-23736071| 5.408518| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14138800| nwm30-23736071| 5.6067357| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14138800| nwm30-23736071| 5.153666| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14138800| nwm30-23736071| 4.5590124| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14138800| nwm30-23736071| 5.2952504| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14138800| nwm30-23736071| 7.730499| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14138800| nwm30-23736071| 9.825946| 0.07| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + 
"+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "ev.joined_timeseries.create(add_attrs=False, execute_scripts=False)\n", + "ev.joined_timeseries.to_sdf().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Ok, so, as shown, the `joined_timeseries` table now only contains the basic fields required without any attributes or user defined fields. Now lets add some fields to the `joined_timeseries` table using the TEEHR calculated fields. Adding fields can be useful for grouping and filtering operations. The TEEHR calculated fields (CF) allow for row level and timeseries aware CFs:\n", + "\n", + "* Row level CFs are applied to each row in the table based on data that is in one or more existing fields. These are applied per row and are not aware of the data in any other row (e.g., are not aware of any other timeseries values in a \"timeseries\"). This can be used for adding fields such as a field based on the data/time (e.g., month, year, season, etc.) or based on the value field (e.g., normalized flow, log flow, etc.) and many other uses.\n", + "* Timeseries aware CFs are aware of ordered groups of data (e.g., a timeseries). This is useful for things such as event detection, base flow separation, and other fields that need to be calculated based on a entire collection of timeseries values. The definition of what creates a unique set of timeseries (i.e., a timeseries) can be specified.\n", + "\n", + "There are two ways that these CFs can be used: \n", + "\n", + "* First, they can be used to add the CF to the `joined_timeseries` which can then be persisted by writing to disk. This is useful if the calculation is expected to be needed for multiple different metric calculations.\n", + "* Second, they can be used as a pre-processing step in the calculation of metrics.\n", + "\n", + "These use cases will be demonstrated below. First we will import the CF classes. Normally this would be done at the top of the page, but is done here for demonstration purposes." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# first we need to import the UDF classes\n", + "from teehr import RowLevelCalculatedFields as rcf\n", + "from teehr import TimeseriesAwareCalculatedFields as tcf" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Available Calculated Fields\n", + "There are a number of calculated field classes that are included in the TEEHR package. They are:\n", + "\n", + "Row Level Calculated Fields:\n", + "- Month\n", + "- Year\n", + "- WaterYear\n", + "- NormalizedFlow\n", + "- Seasons\n", + "\n", + "Timeseries Aware Calculated Fields.\n", + "- PercentileEventDetection\n", + "\n", + "There will be more added over time. If there is one you are particularly interested in, please reach out and let us know." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add Calculated Field in Memory\n", + "Now we will use the Row level CFs to add year, month, water year and season to the `joined_timeseries` table in memory, using the `add_udf_columns()` method on the `joined_timeseries` table, but will not save it to disk yet. Adding the UDFs and displaying the table shows that the new fields were added." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[Stage 73:> (0 + 1) / 1]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+-----+----+----------+------+\n", + "|reference_time| value_time|primary_location_id|secondary_location_id|primary_value|secondary_value|unit_name|member| configuration_name| variable_name|month|year|water_year|season|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+-----+----+----------+------+\n", + "| NULL|2000-10-01 00:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 00:00:00| usgs-14138800| nwm30-23736071| 3.3413877| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 01:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 01:00:00| usgs-14138800| nwm30-23736071| 3.9926753| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 02:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 02:00:00| usgs-14138800| nwm30-23736071| 4.445745| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 03:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 03:00:00| usgs-14138800| nwm30-23736071| 5.408518| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 04:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 04:00:00| usgs-14138800| nwm30-23736071| 5.6067357| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 05:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 05:00:00| usgs-14138800| nwm30-23736071| 5.153666| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 06:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 06:00:00| usgs-14138800| nwm30-23736071| 4.5590124| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 07:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 07:00:00| usgs-14138800| nwm30-23736071| 5.2952504| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 08:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| 
NULL|2000-10-01 08:00:00| usgs-14138800| nwm30-23736071| 7.730499| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 09:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "| NULL|2000-10-01 09:00:00| usgs-14138800| nwm30-23736071| 9.825946| 0.07| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...| 10|2000| 2001| fall|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+-----+----+----------+------+\n", + "only showing top 20 rows\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "sdf = ev.joined_timeseries.add_calculated_fields([\n", + " rcf.Month(),\n", + " rcf.Year(),\n", + " rcf.WaterYear(),\n", + " rcf.Seasons()\n", + "]).to_sdf()\n", + "sdf.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "However, if we query the table again, we can see that the additional fields are not there. This is because we did not write the table with the additional fields to disk." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "|reference_time| value_time|primary_location_id|secondary_location_id|primary_value|secondary_value|unit_name|member| configuration_name| variable_name|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "| NULL|2000-10-01 00:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 00:00:00| usgs-14138800| nwm30-23736071| 3.3413877| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14138800| nwm30-23736071| 3.9926753| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14138800| nwm30-23736071| 4.445745| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14138800| nwm30-23736071| 5.408518| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14138800| nwm30-23736071| 5.6067357| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14138800| nwm30-23736071| 5.153666| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| 
NULL|2000-10-01 06:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14138800| nwm30-23736071| 4.5590124| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14138800| nwm30-23736071| 5.2952504| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14138800| nwm30-23736071| 7.730499| 0.06| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14138800| nwm30-23736071| 9.825946| 0.07| m^3/s| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/mdenno/.pyenv/versions/3.10.15/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown\n", + " warnings.warn('resource_tracker: There appear to be %d '\n" + ] + } + ], + "source": [ + "ev.joined_timeseries.to_sdf().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Add Calculated Field and Persist\n", + "Now we will add the UDFs again, but this time we will write the table with the additional fields to disk so they are persisted." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "ev.joined_timeseries.add_calculated_fields([\n", + " rcf.Month(),\n", + " rcf.Year(),\n", + " rcf.WaterYear(),\n", + " rcf.Seasons()\n", + "]).write()\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "And query the table again. This time we can see that the new fields we added are there as they were written to disk and are now part of the `joined_timeseries` table on disk." 
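> Editor's note: before looking at the output, a condensed recap of the two usage patterns seen so far. Both forms appear in the cells above; a single calculated field is used here for brevity.

```python
# Preview only: the new field exists in the returned Spark DataFrame,
# but the joined_timeseries table on disk is unchanged.
sdf = ev.joined_timeseries.add_calculated_fields([rcf.WaterYear()]).to_sdf()

# Persist: the joined_timeseries table is rewritten to disk with the new field.
ev.joined_timeseries.add_calculated_fields([rcf.WaterYear()]).write()
```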
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-------------------+--------------------+\n", + "|reference_time| value_time|primary_location_id|secondary_location_id|primary_value|secondary_value|unit_name|member|month|year|water_year|season| configuration_name| variable_name|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-------------------+--------------------+\n", + "| NULL|2000-10-01 00:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 00:00:00| usgs-14138800| nwm30-23736071| 3.3413877| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14138800| nwm30-23736071| 3.9926753| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14138800| nwm30-23736071| 4.445745| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14138800| nwm30-23736071| 5.408518| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14138800| nwm30-23736071| 5.6067357| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14138800| nwm30-23736071| 5.153666| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14138800| nwm30-23736071| 4.5590124| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14138800| nwm30-23736071| 5.2952504| 0.06| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14138800| nwm30-23736071| 7.730499| 0.06| m^3/s| NULL| 10|2000| 2001| 
fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14138800| nwm30-23736071| 9.825946| 0.07| m^3/s| NULL| 10|2000| 2001| fall|nwm30_retrospective|streamflow_hourly...|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "ev.joined_timeseries.to_sdf().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Timeseries Aware Calculated Fields\n", + "The timeseries aware calculated fields behave the same way from the users perspective, but behind the scenes are performing some extra grouping and sorting to ensure that the field is calculated based on an ordered group of timeseries values (i.e., a \"timeseries\"). This is necessary for doing things like event detection, but comes at a computational cost, so use with care, especially on large datasets. Lets try it. This time we will jump right to writing the resulting data frame back to disk to persist it, but you could add the field and display the results without persisting as we did above." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "ev.joined_timeseries.add_calculated_fields([\n", + " tcf.PercentileEventDetection()\n", + "]).write()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "And query the `joined_timeseries` table to see the new `event` and `event_id` fields." 
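> Editor's note: since `event` is boolean and `event_id` groups contiguous event timesteps, a common follow-up (and what the plotting cells later in this notebook do) is to pull out only the detected events for a single location. A small sketch using the underlying Spark DataFrame and standard PySpark filtering; the gage id comes from this example dataset.

```python
# Keep only rows flagged as part of an event at a single gage.
events_sdf = (
    ev.joined_timeseries.to_sdf()
    .filter("primary_location_id = 'usgs-14138800' AND event = true")
    .select("value_time", "primary_value", "event", "event_id")
)
events_sdf.show(5)
```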
+ ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-----+--------+-------------------+--------------------+\n", + "|reference_time| value_time|primary_location_id|secondary_location_id|primary_value|secondary_value|unit_name|member|month|year|water_year|season|event|event_id| configuration_name| variable_name|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-----+--------+-------------------+--------------------+\n", + "| NULL|2000-10-01 00:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 01:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 02:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 03:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 04:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 05:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 06:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 07:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 08:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 09:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 10:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 11:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 12:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 13:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 14:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 15:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 16:00:00| 
usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 17:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 18:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "| NULL|2000-10-01 19:00:00| usgs-14316700| nwm30-23894572| 1.1326739| 0.38| m^3/s| NULL| 10|2000| 2001| fall|false| NULL|nwm30_retrospective|streamflow_hourly...|\n", + "+--------------+-------------------+-------------------+---------------------+-------------+---------------+---------+------+-----+----+----------+------+-----+--------+-------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "ev.joined_timeseries.to_sdf().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In the table above you can see the `event` and `event_id` fields, but it is a bit difficult to see what was done based on the table alone. Lets create a plot to see what the new fields mean. In the next few cells we will query the `joined_timeseries` table and filter the data to a single location ('usgs-14138800') and only values that were identified as `event=True`, then create a plot where we color the points by the new `event_id` field." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/javascript": "(function(root) {\n function now() {\n return new Date();\n }\n\n const force = true;\n const py_version = '3.6.1'.replace('rc', '-rc.').replace('.dev', '-dev.');\n const reloading = false;\n const Bokeh = root.Bokeh;\n\n // Set a timeout for this load but only if we are not already initializing\n if (typeof (root._bokeh_timeout) === \"undefined\" || (force || !root._bokeh_is_initializing)) {\n root._bokeh_timeout = Date.now() + 5000;\n root._bokeh_failed_load = false;\n }\n\n function run_callbacks() {\n try {\n root._bokeh_onload_callbacks.forEach(function(callback) {\n if (callback != null)\n callback();\n });\n } finally {\n delete root._bokeh_onload_callbacks;\n }\n console.debug(\"Bokeh: all callbacks have finished\");\n }\n\n function load_libs(css_urls, js_urls, js_modules, js_exports, callback) {\n if (css_urls == null) css_urls = [];\n if (js_urls == null) js_urls = [];\n if (js_modules == null) js_modules = [];\n if (js_exports == null) js_exports = {};\n\n root._bokeh_onload_callbacks.push(callback);\n\n if (root._bokeh_is_loading > 0) {\n // Don't load bokeh if it is still initializing\n console.debug(\"Bokeh: BokehJS is being loaded, scheduling callback at\", now());\n return null;\n } else if (js_urls.length === 0 && js_modules.length === 0 && Object.keys(js_exports).length === 0) {\n // There is nothing to load\n run_callbacks();\n return null;\n }\n\n function on_load() {\n root._bokeh_is_loading--;\n if (root._bokeh_is_loading === 0) {\n console.debug(\"Bokeh: all BokehJS libraries/stylesheets loaded\");\n run_callbacks()\n }\n }\n window._bokeh_on_load = on_load\n\n function on_error(e) {\n const src_el = e.srcElement\n console.error(\"failed to load \" + (src_el.href || src_el.src));\n }\n\n const skip = [];\n if (window.requirejs) {\n 
window.requirejs.config({'packages': {}, 'paths': {}, 'shim': {}});\n root._bokeh_is_loading = css_urls.length + 0;\n } else {\n root._bokeh_is_loading = css_urls.length + js_urls.length + js_modules.length + Object.keys(js_exports).length;\n }\n\n const existing_stylesheets = []\n const links = document.getElementsByTagName('link')\n for (let i = 0; i < links.length; i++) {\n const link = links[i]\n if (link.href != null) {\n existing_stylesheets.push(link.href)\n }\n }\n for (let i = 0; i < css_urls.length; i++) {\n const url = css_urls[i];\n const escaped = encodeURI(url)\n if (existing_stylesheets.indexOf(escaped) !== -1) {\n on_load()\n continue;\n }\n const element = document.createElement(\"link\");\n element.onload = on_load;\n element.onerror = on_error;\n element.rel = \"stylesheet\";\n element.type = \"text/css\";\n element.href = url;\n console.debug(\"Bokeh: injecting link tag for BokehJS stylesheet: \", url);\n document.body.appendChild(element);\n } var existing_scripts = []\n const scripts = document.getElementsByTagName('script')\n for (let i = 0; i < scripts.length; i++) {\n var script = scripts[i]\n if (script.src != null) {\n existing_scripts.push(script.src)\n }\n }\n for (let i = 0; i < js_urls.length; i++) {\n const url = js_urls[i];\n const escaped = encodeURI(url)\n if (skip.indexOf(escaped) !== -1 || existing_scripts.indexOf(escaped) !== -1) {\n if (!window.requirejs) {\n on_load();\n }\n continue;\n }\n const element = document.createElement('script');\n element.onload = on_load;\n element.onerror = on_error;\n element.async = false;\n element.src = url;\n console.debug(\"Bokeh: injecting script tag for BokehJS library: \", url);\n document.head.appendChild(element);\n }\n for (let i = 0; i < js_modules.length; i++) {\n const url = js_modules[i];\n const escaped = encodeURI(url)\n if (skip.indexOf(escaped) !== -1 || existing_scripts.indexOf(escaped) !== -1) {\n if (!window.requirejs) {\n on_load();\n }\n continue;\n }\n var element = document.createElement('script');\n element.onload = on_load;\n element.onerror = on_error;\n element.async = false;\n element.src = url;\n element.type = \"module\";\n console.debug(\"Bokeh: injecting script tag for BokehJS library: \", url);\n document.head.appendChild(element);\n }\n for (const name in js_exports) {\n const url = js_exports[name];\n const escaped = encodeURI(url)\n if (skip.indexOf(escaped) >= 0 || root[name] != null) {\n if (!window.requirejs) {\n on_load();\n }\n continue;\n }\n var element = document.createElement('script');\n element.onerror = on_error;\n element.async = false;\n element.type = \"module\";\n console.debug(\"Bokeh: injecting script tag for BokehJS library: \", url);\n element.textContent = `\n import ${name} from \"${url}\"\n window.${name} = ${name}\n window._bokeh_on_load()\n `\n document.head.appendChild(element);\n }\n if (!js_urls.length && !js_modules.length) {\n on_load()\n }\n };\n\n function inject_raw_css(css) {\n const element = document.createElement(\"style\");\n element.appendChild(document.createTextNode(css));\n document.body.appendChild(element);\n }\n\n const js_urls = [\"https://cdn.holoviz.org/panel/1.5.4/dist/bundled/reactiveesm/es-module-shims@^1.10.0/dist/es-module-shims.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-gl-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-widgets-3.6.1.min.js\", \"https://cdn.bokeh.org/bokeh/release/bokeh-tables-3.6.1.min.js\", 
\"https://cdn.holoviz.org/panel/1.5.4/dist/panel.min.js\"];\n const js_modules = [];\n const js_exports = {};\n const css_urls = [];\n const inline_js = [ function(Bokeh) {\n Bokeh.set_log_level(\"info\");\n },\nfunction(Bokeh) {} // ensure no trailing comma for IE\n ];\n\n function run_inline_js() {\n if ((root.Bokeh !== undefined) || (force === true)) {\n for (let i = 0; i < inline_js.length; i++) {\n try {\n inline_js[i].call(root, root.Bokeh);\n } catch(e) {\n if (!reloading) {\n throw e;\n }\n }\n }\n // Cache old bokeh versions\n if (Bokeh != undefined && !reloading) {\n var NewBokeh = root.Bokeh;\n if (Bokeh.versions === undefined) {\n Bokeh.versions = new Map();\n }\n if (NewBokeh.version !== Bokeh.version) {\n Bokeh.versions.set(NewBokeh.version, NewBokeh)\n }\n root.Bokeh = Bokeh;\n }\n } else if (Date.now() < root._bokeh_timeout) {\n setTimeout(run_inline_js, 100);\n } else if (!root._bokeh_failed_load) {\n console.log(\"Bokeh: BokehJS failed to load within specified timeout.\");\n root._bokeh_failed_load = true;\n }\n root._bokeh_is_initializing = false\n }\n\n function load_or_wait() {\n // Implement a backoff loop that tries to ensure we do not load multiple\n // versions of Bokeh and its dependencies at the same time.\n // In recent versions we use the root._bokeh_is_initializing flag\n // to determine whether there is an ongoing attempt to initialize\n // bokeh, however for backward compatibility we also try to ensure\n // that we do not start loading a newer (Panel>=1.0 and Bokeh>3) version\n // before older versions are fully initialized.\n if (root._bokeh_is_initializing && Date.now() > root._bokeh_timeout) {\n // If the timeout and bokeh was not successfully loaded we reset\n // everything and try loading again\n root._bokeh_timeout = Date.now() + 5000;\n root._bokeh_is_initializing = false;\n root._bokeh_onload_callbacks = undefined;\n root._bokeh_is_loading = 0\n console.log(\"Bokeh: BokehJS was loaded multiple times but one version failed to initialize.\");\n load_or_wait();\n } else if (root._bokeh_is_initializing || (typeof root._bokeh_is_initializing === \"undefined\" && root._bokeh_onload_callbacks !== undefined)) {\n setTimeout(load_or_wait, 100);\n } else {\n root._bokeh_is_initializing = true\n root._bokeh_onload_callbacks = []\n const bokeh_loaded = root.Bokeh != null && (root.Bokeh.version === py_version || (root.Bokeh.versions !== undefined && root.Bokeh.versions.has(py_version)));\n if (!reloading && !bokeh_loaded) {\n if (root.Bokeh) {\n root.Bokeh = undefined;\n }\n console.debug(\"Bokeh: BokehJS not loaded, scheduling load and callback at\", now());\n }\n load_libs(css_urls, js_urls, js_modules, js_exports, function() {\n console.debug(\"Bokeh: BokehJS plotting callback run at\", now());\n run_inline_js();\n });\n }\n }\n // Give older versions of the autoload script a head-start to ensure\n // they initialize before we start loading newer version.\n setTimeout(load_or_wait, 100)\n}(window));", + "application/vnd.holoviews_load.v0+json": "" + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/javascript": "\nif ((window.PyViz === undefined) || (window.PyViz instanceof HTMLElement)) {\n window.PyViz = {comms: {}, comm_status:{}, kernels:{}, receivers: {}, plot_index: []}\n}\n\n\n function JupyterCommManager() {\n }\n\n JupyterCommManager.prototype.register_target = function(plot_id, comm_id, msg_handler) {\n if (window.comm_manager || ((window.Jupyter !== undefined) && (Jupyter.notebook.kernel != null))) {\n var 
comm_manager = window.comm_manager || Jupyter.notebook.kernel.comm_manager;\n comm_manager.register_target(comm_id, function(comm) {\n comm.on_msg(msg_handler);\n });\n } else if ((plot_id in window.PyViz.kernels) && (window.PyViz.kernels[plot_id])) {\n window.PyViz.kernels[plot_id].registerCommTarget(comm_id, function(comm) {\n comm.onMsg = msg_handler;\n });\n } else if (typeof google != 'undefined' && google.colab.kernel != null) {\n google.colab.kernel.comms.registerTarget(comm_id, (comm) => {\n var messages = comm.messages[Symbol.asyncIterator]();\n function processIteratorResult(result) {\n var message = result.value;\n console.log(message)\n var content = {data: message.data, comm_id};\n var buffers = []\n for (var buffer of message.buffers || []) {\n buffers.push(new DataView(buffer))\n }\n var metadata = message.metadata || {};\n var msg = {content, buffers, metadata}\n msg_handler(msg);\n return messages.next().then(processIteratorResult);\n }\n return messages.next().then(processIteratorResult);\n })\n }\n }\n\n JupyterCommManager.prototype.get_client_comm = function(plot_id, comm_id, msg_handler) {\n if (comm_id in window.PyViz.comms) {\n return window.PyViz.comms[comm_id];\n } else if (window.comm_manager || ((window.Jupyter !== undefined) && (Jupyter.notebook.kernel != null))) {\n var comm_manager = window.comm_manager || Jupyter.notebook.kernel.comm_manager;\n var comm = comm_manager.new_comm(comm_id, {}, {}, {}, comm_id);\n if (msg_handler) {\n comm.on_msg(msg_handler);\n }\n } else if ((plot_id in window.PyViz.kernels) && (window.PyViz.kernels[plot_id])) {\n var comm = window.PyViz.kernels[plot_id].connectToComm(comm_id);\n comm.open();\n if (msg_handler) {\n comm.onMsg = msg_handler;\n }\n } else if (typeof google != 'undefined' && google.colab.kernel != null) {\n var comm_promise = google.colab.kernel.comms.open(comm_id)\n comm_promise.then((comm) => {\n window.PyViz.comms[comm_id] = comm;\n if (msg_handler) {\n var messages = comm.messages[Symbol.asyncIterator]();\n function processIteratorResult(result) {\n var message = result.value;\n var content = {data: message.data};\n var metadata = message.metadata || {comm_id};\n var msg = {content, metadata}\n msg_handler(msg);\n return messages.next().then(processIteratorResult);\n }\n return messages.next().then(processIteratorResult);\n }\n }) \n var sendClosure = (data, metadata, buffers, disposeOnDone) => {\n return comm_promise.then((comm) => {\n comm.send(data, metadata, buffers, disposeOnDone);\n });\n };\n var comm = {\n send: sendClosure\n };\n }\n window.PyViz.comms[comm_id] = comm;\n return comm;\n }\n window.PyViz.comm_manager = new JupyterCommManager();\n \n\n\nvar JS_MIME_TYPE = 'application/javascript';\nvar HTML_MIME_TYPE = 'text/html';\nvar EXEC_MIME_TYPE = 'application/vnd.holoviews_exec.v0+json';\nvar CLASS_NAME = 'output';\n\n/**\n * Render data to the DOM node\n */\nfunction render(props, node) {\n var div = document.createElement(\"div\");\n var script = document.createElement(\"script\");\n node.appendChild(div);\n node.appendChild(script);\n}\n\n/**\n * Handle when a new output is added\n */\nfunction handle_add_output(event, handle) {\n var output_area = handle.output_area;\n var output = handle.output;\n if ((output.data == undefined) || (!output.data.hasOwnProperty(EXEC_MIME_TYPE))) {\n return\n }\n var id = output.metadata[EXEC_MIME_TYPE][\"id\"];\n var toinsert = output_area.element.find(\".\" + CLASS_NAME.split(' ')[0]);\n if (id !== undefined) {\n var nchildren = toinsert.length;\n var html_node 
= toinsert[nchildren-1].children[0];\n html_node.innerHTML = output.data[HTML_MIME_TYPE];\n var scripts = [];\n var nodelist = html_node.querySelectorAll(\"script\");\n for (var i in nodelist) {\n if (nodelist.hasOwnProperty(i)) {\n scripts.push(nodelist[i])\n }\n }\n\n scripts.forEach( function (oldScript) {\n var newScript = document.createElement(\"script\");\n var attrs = [];\n var nodemap = oldScript.attributes;\n for (var j in nodemap) {\n if (nodemap.hasOwnProperty(j)) {\n attrs.push(nodemap[j])\n }\n }\n attrs.forEach(function(attr) { newScript.setAttribute(attr.name, attr.value) });\n newScript.appendChild(document.createTextNode(oldScript.innerHTML));\n oldScript.parentNode.replaceChild(newScript, oldScript);\n });\n if (JS_MIME_TYPE in output.data) {\n toinsert[nchildren-1].children[1].textContent = output.data[JS_MIME_TYPE];\n }\n output_area._hv_plot_id = id;\n if ((window.Bokeh !== undefined) && (id in Bokeh.index)) {\n window.PyViz.plot_index[id] = Bokeh.index[id];\n } else {\n window.PyViz.plot_index[id] = null;\n }\n } else if (output.metadata[EXEC_MIME_TYPE][\"server_id\"] !== undefined) {\n var bk_div = document.createElement(\"div\");\n bk_div.innerHTML = output.data[HTML_MIME_TYPE];\n var script_attrs = bk_div.children[0].attributes;\n for (var i = 0; i < script_attrs.length; i++) {\n toinsert[toinsert.length - 1].childNodes[1].setAttribute(script_attrs[i].name, script_attrs[i].value);\n }\n // store reference to server id on output_area\n output_area._bokeh_server_id = output.metadata[EXEC_MIME_TYPE][\"server_id\"];\n }\n}\n\n/**\n * Handle when an output is cleared or removed\n */\nfunction handle_clear_output(event, handle) {\n var id = handle.cell.output_area._hv_plot_id;\n var server_id = handle.cell.output_area._bokeh_server_id;\n if (((id === undefined) || !(id in PyViz.plot_index)) && (server_id !== undefined)) { return; }\n var comm = window.PyViz.comm_manager.get_client_comm(\"hv-extension-comm\", \"hv-extension-comm\", function () {});\n if (server_id !== null) {\n comm.send({event_type: 'server_delete', 'id': server_id});\n return;\n } else if (comm !== null) {\n comm.send({event_type: 'delete', 'id': id});\n }\n delete PyViz.plot_index[id];\n if ((window.Bokeh !== undefined) & (id in window.Bokeh.index)) {\n var doc = window.Bokeh.index[id].model.document\n doc.clear();\n const i = window.Bokeh.documents.indexOf(doc);\n if (i > -1) {\n window.Bokeh.documents.splice(i, 1);\n }\n }\n}\n\n/**\n * Handle kernel restart event\n */\nfunction handle_kernel_cleanup(event, handle) {\n delete PyViz.comms[\"hv-extension-comm\"];\n window.PyViz.plot_index = {}\n}\n\n/**\n * Handle update_display_data messages\n */\nfunction handle_update_output(event, handle) {\n handle_clear_output(event, {cell: {output_area: handle.output_area}})\n handle_add_output(event, handle)\n}\n\nfunction register_renderer(events, OutputArea) {\n function append_mime(data, metadata, element) {\n // create a DOM node to render to\n var toinsert = this.create_output_subarea(\n metadata,\n CLASS_NAME,\n EXEC_MIME_TYPE\n );\n this.keyboard_manager.register_events(toinsert);\n // Render to node\n var props = {data: data, metadata: metadata[EXEC_MIME_TYPE]};\n render(props, toinsert[0]);\n element.append(toinsert);\n return toinsert\n }\n\n events.on('output_added.OutputArea', handle_add_output);\n events.on('output_updated.OutputArea', handle_update_output);\n events.on('clear_output.CodeCell', handle_clear_output);\n events.on('delete.Cell', handle_clear_output);\n 
events.on('kernel_ready.Kernel', handle_kernel_cleanup);\n\n OutputArea.prototype.register_mime_type(EXEC_MIME_TYPE, append_mime, {\n safe: true,\n index: 0\n });\n}\n\nif (window.Jupyter !== undefined) {\n try {\n var events = require('base/js/events');\n var OutputArea = require('notebook/js/outputarea').OutputArea;\n if (OutputArea.prototype.mime_types().indexOf(EXEC_MIME_TYPE) == -1) {\n register_renderer(events, OutputArea);\n }\n } catch(err) {\n }\n}\n", + "application/vnd.holoviews_load.v0+json": "" + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.holoviews_exec.v0+json": "", + "text/html": [ + "
\n", + "
\n", + "
\n", + "" + ] + }, + "metadata": { + "application/vnd.holoviews_exec.v0+json": { + "id": "fcbbcab4-04b6-4340-b105-75ba82ce3266" + } + }, + "output_type": "display_data" + } + ], + "source": [ + "import hvplot.pandas # noqa" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "pdf = ev.joined_timeseries.filter([\n", + " \"primary_location_id = 'usgs-14138800'\",\n", + " \"event = true\",\n", + "]).to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "primary_plot = pdf.hvplot.points(x=\"value_time\", y=\"primary_value\", color=\"event_id\") #.opts(width=1200, height=400)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": {}, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.holoviews_exec.v0+json": "", + "text/html": [ + "
\n", + "
\n", + "
\n", + "" + ], + "text/plain": [ + ":Points [value_time,primary_value] (event_id)" + ] + }, + "execution_count": 16, + "metadata": { + "application/vnd.holoviews_exec.v0+json": { + "id": "19daf080-733d-4519-82be-e92fcdc694f8" + } + }, + "output_type": "execute_result" + } + ], + "source": [ + "primary_plot.opts(width=1200, height=400)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you zoom in on the plot above, you can see that each unique `event` has been given a unique `event_id` that was used to color the individual data points. This was done by first identifying the 85 percentile value at each location and identifying each value as either above or below that value for its location. Then the values were grouped by continuous sets of values that were identified as `event=True` and each continuous group was given an `event_id`. This is just one example of what can be done using `TimeseriesAwareCalculatedFields`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use Calculated Fields in Metrics" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Beyond just identifying high flow events (in this example), the `event` and `event_id` fields can be used in subsequent metrics calculations. In the next few cells we will demonstrate one way that the `event` and `event_id` fields can be used by working through the steps to calculate the `event_max_relative_bias`. The `event_max_relative_bias` is the relative bias between the primary and secondary timeseries maximum values within each event. We calculate this in twos steps using chained queries. We will do it in two steps to demonstrate what we are doing. Note, if you have not worked though the grouping and filter notebooks you may want to go back and do that first as it is an important concept to understanding what is being done here. First we run a metrics query where we filter to a single location and only values that were identified as being `event=True`, group by `configuration_name`, `primary_location_id` and `event_id`, and calculate the maximum primary and secondary values which we call `max_primary_value` and `max_secondary_value` but could give them any name we wanted. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[Stage 104:> (0 + 1) / 1]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------------------+-------------------+--------------------+-----------------+-------------------+\n", + "| configuration_name|primary_location_id| event_id|max_primary_value|max_secondary_value|\n", + "+-------------------+-------------------+--------------------+-----------------+-------------------+\n", + "|nwm30_retrospective| usgs-14138800|2000-10-01 00:00:...| 11.015253| 0.07|\n", + "|nwm30_retrospective| usgs-14138800|2000-10-21 03:00:...| 5.153666| 0.96|\n", + "|nwm30_retrospective| usgs-14138800|2000-11-08 15:00:...| 3.68119| 1.43|\n", + "|nwm30_retrospective| usgs-14138800|2000-11-27 09:00:...| 7.5039644| 1.24|\n", + "|nwm30_retrospective| usgs-14138800|2000-12-15 03:00:...| 2.8600016| 0.64|\n", + "|nwm30_retrospective| usgs-14138800|2000-12-15 07:00:...| 3.68119| 0.64|\n", + "|nwm30_retrospective| usgs-14138800|2000-12-17 04:00:...| 4.049309| 1.24|\n", + "|nwm30_retrospective| usgs-14138800|2000-12-22 23:00:...| 5.5501018| 1.1|\n", + "|nwm30_retrospective| usgs-14138800|2001-01-05 15:00:...| 3.3413877| 2.47|\n", + "|nwm30_retrospective| usgs-14138800|2001-02-05 01:00:...| 7.2207956| 8.19|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-18 14:00:...| 12.884165| 11.99|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-25 12:00:...| 3.3413877| 4.33|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-25 22:00:...| 3.1148531| 5.66|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-26 22:00:...| 2.8600016| 4.0|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-28 01:00:...| 7.305746| 7.3399997|\n", + "|nwm30_retrospective| usgs-14138800|2001-03-31 17:00:...| 8.070301| 4.89|\n", + "|nwm30_retrospective| usgs-14138800|2001-04-25 23:00:...| 3.879408| 1.4699999|\n", + "|nwm30_retrospective| usgs-14138800|2001-04-30 12:00:...| 14.016839| 8.44|\n", + "|nwm30_retrospective| usgs-14138800|2001-05-14 22:00:...| 9.42951| 1.4399999|\n", + "|nwm30_retrospective| usgs-14138800|2001-06-03 15:00:...| 2.8600016| 0.71999997|\n", + "+-------------------+-------------------+--------------------+-----------------+-------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "(\n", + " ev.metrics\n", + " .query(\n", + " group_by=[\"configuration_name\", \"primary_location_id\", \"event_id\"],\n", + " filters=[\n", + " \"primary_location_id = 'usgs-14138800'\",\n", + " \"event = true\",\n", + " ],\n", + " include_metrics=[\n", + " teehr.Metrics.Maximum(\n", + " input_field_names=[\"primary_value\"],\n", + " output_field_name=\"max_primary_value\"\n", + " ),\n", + " teehr.Metrics.Maximum(\n", + " input_field_names=[\"secondary_value\"],\n", + " output_field_name=\"max_secondary_value\"\n", + " )\n", + " ]\n", + " )\n", + " .to_sdf().show()\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can see that this gives us the `max_primary_value` and `max_secondary_value` for each unique group of `configuration_name`, `primary_location_id` and `event_id`. But, that is not what we are actually trying to calculate. We are really trying to calculate the `event_max_relative_bias`. To do that we have to add one more step by chaining together queries. 
In the following query we add an additional chained query where we group by only `configuration_name` and `primary_location_id`, which causes the query to aggregate the values from the different events, and then as the aggregation method (`include_metrics`) we choose relative bias." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/mdenno/.pyenv/versions/3.10.15/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown\n", " warnings.warn('resource_tracker: There appear to be %d '\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-------------------+-----------------------+\n", "| configuration_name|primary_location_id|event_max_relative_bias|\n", "+-------------------+-------------------+-----------------------+\n", "|nwm30_retrospective| usgs-14138800| -0.08458245|\n", "+-------------------+-------------------+-----------------------+\n", "\n" ] } ], "source": [ "(\n", " ev.metrics\n", " .query(\n", " group_by=[\"configuration_name\", \"primary_location_id\", \"event_id\"],\n", " filters=[\n", " \"primary_location_id = 'usgs-14138800'\",\n", " \"event = true\",\n", " ],\n", " include_metrics=[\n", " teehr.Metrics.Maximum(\n", " input_field_names=[\"primary_value\"],\n", " output_field_name=\"max_primary_value\"\n", " ),\n", " teehr.Metrics.Maximum(\n", " input_field_names=[\"secondary_value\"],\n", " output_field_name=\"max_secondary_value\"\n", " )\n", " ]\n", " )\n", " .query(\n", " group_by=[\"configuration_name\", \"primary_location_id\"],\n", " include_metrics=[\n", " teehr.Metrics.RelativeBias(\n", " input_field_names=[\"max_primary_value\", \"max_secondary_value\"],\n", " output_field_name=\"event_max_relative_bias\"\n", " )\n", " ]\n", " )\n", " .to_sdf().show()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "One last thing to cover here. So far we have added the calculated fields to the `joined_timeseries` table, written them to disk, and then queried the `joined_timeseries` table to calculate metrics. This works well and allows the calculated fields to be computed once and used in many subsequent metrics, plots, etc. However, you may wish to add temporary fields to the `joined_timeseries` table as part of the metrics calculation. This can be done too. Building on the previous example, where we calculated the \"event_max_relative_bias\", let's now assume we want to calculate the same metric but for the 90th percentile instead of the default 85th percentile that we used when we added the `event` and `event_id` fields to the `joined_timeseries` table. We could add the new \"90th percentile event\" fields to the `joined_timeseries` table, save them to disk, and then proceed as we did before, or we can add new `event90` and `event90_id` fields to the data frame temporarily before calculating the maximum event values and ultimately the \"event_90th_max_relative_bias\"."
+ ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "[Stage 129:> (0 + 1) / 1]\r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------------------+-------------------+----------------------------+\n", + "| configuration_name|primary_location_id|event_90th_max_relative_bias|\n", + "+-------------------+-------------------+----------------------------+\n", + "|nwm30_retrospective| usgs-14138800| -0.08735727|\n", + "+-------------------+-------------------+----------------------------+\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "(\n", + " ev.metrics\n", + " # Add the PercentileEventDetection calculated field to identify events greater than the 90th percentile.\n", + " # Note the output_event_field_name and output_event_id_field_name are set to \"event90\" and \"event90_id\" respectively.\n", + " .add_calculated_fields([\n", + " tcf.PercentileEventDetection(\n", + " quantile=0.90,\n", + " output_event_field_name=\"event90\",\n", + " output_event_id_field_name=\"event90_id\"\n", + " )\n", + " ])\n", + " # First query to calculate the maximum primary and secondary values for each event.\n", + " # Note the filters are set to only include events where event90 is true and the group_by includes event90_id.\n", + " .query(\n", + " group_by=[\"configuration_name\", \"primary_location_id\", \"event90_id\"],\n", + " filters=[\n", + " \"primary_location_id = 'usgs-14138800'\",\n", + " \"event90 = true\",\n", + " ],\n", + " include_metrics=[\n", + " teehr.Metrics.Maximum(\n", + " input_field_names=[\"primary_value\"],\n", + " output_field_name=\"max_primary_value\"\n", + " ),\n", + " teehr.Metrics.Maximum(\n", + " input_field_names=[\"secondary_value\"],\n", + " output_field_name=\"max_secondary_value\"\n", + " )\n", + " ]\n", + " )\n", + " # Second query to calculate the relative bias between the maximum primary and secondary values.\n", + " .query(\n", + " group_by=[\"configuration_name\", \"primary_location_id\"],\n", + " include_metrics=[\n", + " teehr.Metrics.RelativeBias(\n", + " input_field_names=[\"max_primary_value\", \"max_secondary_value\"],\n", + " output_field_name=\"event_90th_max_relative_bias\"\n", + " )\n", + " ]\n", + " )\n", + " # Convert the metrics to a pandas DataFrame\n", + " .to_sdf().show()\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "ev.spark.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.15" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/poetry.lock b/poetry.lock index 95a3bfa5..a720fb81 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. 
[[package]] name = "accessible-pygments" @@ -454,6 +454,23 @@ charset-normalizer = ["charset-normalizer"] html5lib = ["html5lib"] lxml = ["lxml"] +[[package]] +name = "bleach" +version = "6.2.0" +description = "An easy safelist-based HTML-sanitizing tool." +optional = false +python-versions = ">=3.9" +files = [ + {file = "bleach-6.2.0-py3-none-any.whl", hash = "sha256:117d9c6097a7c3d22fd578fcd8d35ff1e125df6736f554da4e432fdd63f31e5e"}, + {file = "bleach-6.2.0.tar.gz", hash = "sha256:123e894118b8a599fd80d3ec1a6d4cc7ce4e5882b1317a7e1ba69b56e95f991f"}, +] + +[package.dependencies] +webencodings = "*" + +[package.extras] +css = ["tinycss2 (>=1.1.0,<1.5)"] + [[package]] name = "bokeh" version = "3.6.1" @@ -842,6 +859,25 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "colorcet" +version = "3.1.0" +description = "Collection of perceptually uniform colormaps" +optional = false +python-versions = ">=3.7" +files = [ + {file = "colorcet-3.1.0-py3-none-any.whl", hash = "sha256:2a7d59cc8d0f7938eeedd08aad3152b5319b4ba3bcb7a612398cc17a384cb296"}, + {file = "colorcet-3.1.0.tar.gz", hash = "sha256:2921b3cd81a2288aaf2d63dbc0ce3c26dcd882e8c389cc505d6886bf7aa9a4eb"}, +] + +[package.extras] +all = ["colorcet[doc]", "colorcet[examples]", "colorcet[tests-extra]", "colorcet[tests]"] +doc = ["colorcet[examples]", "nbsite (>=0.8.4)", "sphinx-copybutton"] +examples = ["bokeh", "holoviews", "matplotlib", "numpy"] +tests = ["packaging", "pre-commit", "pytest (>=2.8.5)", "pytest-cov"] +tests-examples = ["colorcet[examples]", "nbval"] +tests-extra = ["colorcet[tests]", "pytest-mpl"] + [[package]] name = "comm" version = "0.2.2" @@ -1863,6 +1899,31 @@ files = [ [package.dependencies] numpy = ">=1.19.3" +[[package]] +name = "holoviews" +version = "1.20.0" +description = "A high-level plotting API for the PyData ecosystem built on HoloViews." +optional = false +python-versions = ">=3.9" +files = [ + {file = "holoviews-1.20.0-py3-none-any.whl", hash = "sha256:dc810b6790e1dd2c90f16406b292e08db10efa377b2554e46755a130e12044c5"}, + {file = "holoviews-1.20.0.tar.gz", hash = "sha256:29d183045fafa3d846deda999d9687b99b8abdc1a8c06712e54afa576bb02b3e"}, +] + +[package.dependencies] +bokeh = ">=3.1" +colorcet = "*" +numpy = ">=1.21" +packaging = "*" +pandas = ">=1.3" +panel = ">=1.0" +param = ">=2.0,<3.0" +pyviz-comms = ">=2.1" + +[package.extras] +recommended = ["matplotlib (>=3)", "plotly (>=4.0)"] +tests = ["pytest", "pytest-asyncio", "pytest-rerunfailures"] + [[package]] name = "httpcore" version = "1.0.6" @@ -1908,6 +1969,41 @@ cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] http2 = ["h2 (>=3,<5)"] socks = ["socksio (==1.*)"] +[[package]] +name = "hvplot" +version = "0.11.1" +description = "A high-level plotting API for the PyData ecosystem built on HoloViews." 
+optional = false +python-versions = ">=3.9" +files = [ + {file = "hvplot-0.11.1-py3-none-any.whl", hash = "sha256:1cef5bf1ab4157c50b5ee265da760a8034b8dbb7fc81867d6050962a2eab9b35"}, + {file = "hvplot-0.11.1.tar.gz", hash = "sha256:989ed0389189adc47edcd2601d2eab18bf366e74b07f5e2873e021323c4a14bb"}, +] + +[package.dependencies] +bokeh = ">=3.1" +colorcet = ">=2" +holoviews = ">=1.19.0" +numpy = ">=1.21" +packaging = "*" +pandas = ">=1.3" +panel = ">=1.0" +param = ">=1.12.0,<3.0" + +[package.extras] +dev-extras = ["setuptools-scm (>=6)"] +doc = ["hvplot[examples]", "nbsite (>=0.8.6)", "sphinxext-rediraffe"] +examples = ["bokeh-sampledata", "dask[dataframe] (>=2021.3.0)", "datashader (>=0.6.5)", "duckdb", "fugue[sql]", "geodatasets (>=2023.12.0)", "hvplot[fugue-sql]", "ibis-framework[duckdb]", "intake (>=0.6.5,<2.0.0)", "intake-parquet (>=0.2.3)", "intake-xarray (>=0.5.0)", "ipywidgets", "matplotlib", "networkx (>=2.6.3)", "notebook (>=5.4)", "numba (>=0.51.0)", "pillow (>=8.2.0)", "plotly", "polars", "pooch (>=1.6.0)", "s3fs (>=2022.1.0)", "scikit-image (>=0.17.2)", "scipy (>=1.5.3)", "selenium (>=3.141.0)", "streamz (>=0.3.0)", "xarray (>=0.18.2)", "xyzservices (>=2022.9.0)"] +examples-tests = ["hvplot[examples]", "hvplot[tests-nb]"] +fugue-sql = ["fugue-sql-antlr (>=0.2.0)", "jinja2", "qpd (>=0.4.4)", "sqlglot"] +geo = ["cartopy", "fiona", "geopandas", "geoviews (>=1.9.0)", "pyproj", "rasterio", "rioxarray", "spatialpandas (>=0.4.3)"] +graphviz = ["pygraphviz"] +hvdev = ["colorcet (>=0.0.1a1)", "datashader (>=0.0.1a1)", "holoviews (>=0.0.1a1)", "panel (>=0.0.1a1)", "param (>=0.0.1a1)", "pyviz-comms (>=0.0.1a1)"] +hvdev-geo = ["geoviews (>=0.0.1a1)"] +tests = ["dask", "duckdb", "fugue[sql]", "hvplot[fugue-sql]", "hvplot[tests-core]", "ibis-framework[duckdb]", "polars", "spatialpandas"] +tests-core = ["bokeh-sampledata", "dask[dataframe]", "ipywidgets", "matplotlib", "parameterized", "plotly", "pooch", "pre-commit", "psutil", "pytest", "pytest-cov", "ruff", "scipy", "xarray"] +tests-nb = ["nbval", "pytest-xdist"] + [[package]] name = "hydrotools-metrics" version = "1.3.3" @@ -2299,6 +2395,26 @@ grib2 = ["cfgrib"] hdf = ["h5py", "xarray"] netcdf3 = ["scipy"] +[[package]] +name = "linkify-it-py" +version = "2.0.3" +description = "Links recognition library with FULL unicode support." +optional = false +python-versions = ">=3.7" +files = [ + {file = "linkify-it-py-2.0.3.tar.gz", hash = "sha256:68cda27e162e9215c17d786649d1da0021a451bdc436ef9e0fa0ba5234b9b048"}, + {file = "linkify_it_py-2.0.3-py3-none-any.whl", hash = "sha256:6bcbc417b0ac14323382aef5c5192c0075bf8a9d6b41820a2b66371eac6b6d79"}, +] + +[package.dependencies] +uc-micro-py = "*" + +[package.extras] +benchmark = ["pytest", "pytest-benchmark"] +dev = ["black", "flake8", "isort", "pre-commit", "pyproject-flake8"] +doc = ["myst-parser", "sphinx", "sphinx-book-theme"] +test = ["coverage", "pytest", "pytest-cov"] + [[package]] name = "llvmlite" version = "0.43.0" @@ -2340,6 +2456,21 @@ files = [ {file = "locket-1.0.0.tar.gz", hash = "sha256:5c0d4c052a8bbbf750e056a8e65ccd309086f4f0f18a2eac306a8dfa4112a632"}, ] +[[package]] +name = "markdown" +version = "3.7" +description = "Python implementation of John Gruber's Markdown." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "Markdown-3.7-py3-none-any.whl", hash = "sha256:7eb6df5690b81a1d7942992c97fad2938e956e79df20cbc6186e9c3a77b1c803"}, + {file = "markdown-3.7.tar.gz", hash = "sha256:2ae2471477cfd02dbbf038d5d9bc226d40def84b4fe2986e49b59b6b472bbed2"}, +] + +[package.extras] +docs = ["mdx-gh-links (>=0.2)", "mkdocs (>=1.5)", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-nature (>=0.6)", "mkdocs-section-index", "mkdocstrings[python]"] +testing = ["coverage", "pyyaml"] + [[package]] name = "markdown-it-py" version = "3.0.0" @@ -3079,6 +3210,60 @@ polars = ["polars (>=0.20.0)"] pyspark = ["pyspark[connect] (>=3.2.0)"] strategies = ["hypothesis (>=6.92.7)"] +[[package]] +name = "panel" +version = "1.5.4" +description = "The powerful data exploration & web app framework for Python." +optional = false +python-versions = ">=3.10" +files = [ + {file = "panel-1.5.4-py3-none-any.whl", hash = "sha256:98521ff61dfe2ef684181213842521674d2db95f692c7942ab9103a2c0e882b9"}, + {file = "panel-1.5.4.tar.gz", hash = "sha256:7644e87afe9b94c32b4fca939d645c5b958d671691bd841d3391e31941090092"}, +] + +[package.dependencies] +bleach = "*" +bokeh = ">=3.5.0,<3.7.0" +linkify-it-py = "*" +markdown = "*" +markdown-it-py = "*" +mdit-py-plugins = "*" +packaging = "*" +pandas = ">=1.2" +param = ">=2.1.0,<3.0" +pyviz-comms = ">=2.0.0" +requests = "*" +tqdm = "*" +typing-extensions = "*" + +[package.extras] +dev = ["watchfiles"] +fastapi = ["bokeh-fastapi (>=0.1.2)", "fastapi[standard]"] +mypy = ["mypy", "pandas-stubs", "types-bleach", "types-croniter", "types-markdown", "types-psutil", "types-requests", "types-tqdm", "typing-extensions"] +recommended = ["holoviews (>=1.18.0)", "jupyterlab", "matplotlib", "pillow", "plotly"] +tests = ["psutil", "pytest", "pytest-asyncio", "pytest-rerunfailures", "pytest-xdist"] + +[[package]] +name = "param" +version = "2.1.1" +description = "Make your Python code clearer and more reliable by declaring Parameters." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "param-2.1.1-py3-none-any.whl", hash = "sha256:81066d040526fbaa44b6419f3e92348fa8856ea44c8d3915e9245937ddabe2d6"}, + {file = "param-2.1.1.tar.gz", hash = "sha256:3b1da14abafa75bfd908572378a58696826b3719a723bc31b40ffff2e9a5c852"}, +] + +[package.extras] +all = ["aiohttp", "cloudpickle", "coverage[toml]", "flake8", "gmpy", "ipython", "jsonschema", "nbsite (==0.8.4)", "nbval", "nest-asyncio", "numpy", "odfpy", "openpyxl", "pandas", "panel", "pre-commit", "pyarrow", "pytest", "pytest-asyncio", "pytest-xdist", "sphinx-remove-toctrees", "tables", "xlrd"] +doc = ["aiohttp", "nbsite (==0.8.4)", "pandas", "panel", "sphinx-remove-toctrees"] +examples = ["aiohttp", "pandas", "panel"] +lint = ["flake8", "pre-commit"] +tests = ["coverage[toml]", "pytest", "pytest-asyncio"] +tests-deser = ["odfpy", "openpyxl", "pyarrow", "tables", "xlrd"] +tests-examples = ["aiohttp", "nbval", "pandas", "panel", "pytest", "pytest-asyncio", "pytest-xdist"] +tests-full = ["aiohttp", "cloudpickle", "coverage[toml]", "gmpy", "ipython", "jsonschema", "nbval", "nest-asyncio", "numpy", "odfpy", "openpyxl", "pandas", "panel", "pyarrow", "pytest", "pytest-asyncio", "pytest-xdist", "tables", "xlrd"] + [[package]] name = "parso" version = "0.8.4" @@ -3961,6 +4146,25 @@ files = [ {file = "pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a"}, ] +[[package]] +name = "pyviz-comms" +version = "3.0.3" +description = "A JupyterLab extension for rendering HoloViz content." +optional = false +python-versions = ">=3.8" +files = [ + {file = "pyviz_comms-3.0.3-py3-none-any.whl", hash = "sha256:fd26951eebc7950106d481655d91ba06296d4cf352dffb1d03f88f959832448e"}, + {file = "pyviz_comms-3.0.3.tar.gz", hash = "sha256:fde4a017c2213ecee63a9a6741431c845e42a5c7b1588e4a7ba2e4370c583728"}, +] + +[package.dependencies] +param = "*" + +[package.extras] +all = ["flake8", "jupyterlab (>=4.0,<5.0)", "keyring", "pytest", "rfc3986", "setuptools (>=40.8.0)", "twine"] +build = ["jupyterlab (>=4.0,<5.0)", "keyring", "rfc3986", "setuptools (>=40.8.0)", "twine"] +tests = ["flake8", "pytest"] + [[package]] name = "pywin32" version = "308" @@ -5084,6 +5288,27 @@ files = [ {file = "tornado-6.4.1.tar.gz", hash = "sha256:92d3ab53183d8c50f8204a51e6f91d18a15d5ef261e84d452800d4ff6fc504e9"}, ] +[[package]] +name = "tqdm" +version = "4.67.1" +description = "Fast, Extensible Progress Meter" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2"}, + {file = "tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[package.extras] +dev = ["nbval", "pytest (>=6)", "pytest-asyncio (>=0.24)", "pytest-cov", "pytest-timeout"] +discord = ["requests"] +notebook = ["ipywidgets (>=6)"] +slack = ["slack-sdk"] +telegram = ["requests"] + [[package]] name = "traitlets" version = "5.14.3" @@ -5154,6 +5379,20 @@ files = [ {file = "tzdata-2024.2.tar.gz", hash = "sha256:7d85cc416e9382e69095b7bdf4afd9e3880418a2413feec7069d533d6b4e31cc"}, ] +[[package]] +name = "uc-micro-py" +version = "1.0.3" +description = "Micro subset of unicode data files for linkify-it-py projects." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "uc-micro-py-1.0.3.tar.gz", hash = "sha256:d321b92cff673ec58027c04015fcaa8bb1e005478643ff4a500882eaab88c48a"}, + {file = "uc_micro_py-1.0.3-py3-none-any.whl", hash = "sha256:db1dffff340817673d7b466ec86114a9dc0e9d4d9b5ba229d9d60e5c12600cd5"}, +] + +[package.extras] +test = ["coverage", "pytest", "pytest-cov"] + [[package]] name = "ujson" version = "5.10.0" @@ -5417,6 +5656,17 @@ files = [ {file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"}, ] +[[package]] +name = "webencodings" +version = "0.5.1" +description = "Character encoding aliases for legacy web content" +optional = false +python-versions = "*" +files = [ + {file = "webencodings-0.5.1-py2.py3-none-any.whl", hash = "sha256:a0af1213f3c2226497a97e2b3aa01a7e4bee4f403f95be16fc9acd2947514a78"}, + {file = "webencodings-0.5.1.tar.gz", hash = "sha256:b36a1c245f2d304965eb4e0a82848379241dc04b865afcc4aab16748587e1923"}, +] + [[package]] name = "websockets" version = "14.1" @@ -5749,4 +5999,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "6d1c4fc3b8849603f90d5502578baae326706739a4e03baadd32d06107dc6480" +content-hash = "602dae1815d49eca2ad9993c1673d2289d7b127adf00b2cac416e2b3f7f89df2" diff --git a/pyproject.toml b/pyproject.toml index 294ea861..4189267d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ arch = "^7.0.0" pandera = {extras = ["pyspark"], version = "^0.20.4"} netcdf4 = "1.6.5" bokeh = "^3.5.0" +hvplot = "^0.11.1" [tool.poetry.group.test.dependencies] pytest = "^7.4.3" diff --git a/src/teehr/__init__.py b/src/teehr/__init__.py index 42f64f1d..36932a93 100644 --- a/src/teehr/__init__.py +++ b/src/teehr/__init__.py @@ -22,6 +22,9 @@ JoinedTimeseriesFilter ) +from teehr.models.calculated_fields.row_level import RowLevelCalculatedFields # noqa +from teehr.models.calculated_fields.timeseries_aware import TimeseriesAwareCalculatedFields # noqa + # For docs from teehr.evaluation.tables.base_table import ( # noqa BaseTable, diff --git a/src/teehr/evaluation/metrics.py b/src/teehr/evaluation/metrics.py index a4baf7be..6a92225f 100644 --- a/src/teehr/evaluation/metrics.py +++ b/src/teehr/evaluation/metrics.py @@ -8,6 +8,7 @@ JoinedTimeseriesFilter ) from teehr.models.metrics.metric_models import MetricsBasemodel +from teehr.models.calculated_fields.base import CalculatedFieldBaseModel from teehr.models.table_enums import ( JoinedTimeseriesFields ) @@ -173,3 +174,34 @@ def to_geopandas(self) -> gpd.GeoDataFrame: def to_sdf(self) -> ps.DataFrame: """Return the Spark DataFrame.""" return self.df + + def add_calculated_fields(self, cfs: Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]]): + """Add calculated fields to the joined timeseries DataFrame before running metrics. + + Parameters + ---------- + cfs : Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]] + The CFs to apply to the DataFrame. + + Returns + ------- + self + The Metrics object with the CFs applied to the DataFrame. 
+ + Examples + -------- + >>> import teehr + >>> from teehr import RowLevelCalculatedFields as rcf + >>> ev.metrics.add_calculated_fields([ + >>> rcf.Month() + >>> ]).to_sdf() + """ + # self._check_load_table() + + if not isinstance(cfs, List): + cfs = [cfs] + + for cf in cfs: + self.df = cf.apply_to(self.df) + + return self \ No newline at end of file diff --git a/src/teehr/evaluation/tables/joined_timeseries_table.py b/src/teehr/evaluation/tables/joined_timeseries_table.py index 95eb9f7d..a3beb96c 100644 --- a/src/teehr/evaluation/tables/joined_timeseries_table.py +++ b/src/teehr/evaluation/tables/joined_timeseries_table.py @@ -8,6 +8,8 @@ import pyspark.sql as ps import logging from teehr.utils.utils import to_path_or_s3path +from teehr.models.calculated_fields.base import CalculatedFieldBaseModel +from typing import List, Union logger = logging.getLogger(__name__) @@ -110,7 +112,42 @@ def _add_attr(self, joined_df: ps.DataFrame) -> ps.DataFrame: return joined_df - def _add_udfs(self, joined_df: ps.DataFrame) -> ps.DataFrame: + def add_calculated_fields(self, cfs: Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]]): + """Add calculated fields to the joined timeseries table. + + Note that this does not persist the CFs to the table. It only applies them to the DataFrame. + To persist the CFs to the table, use the `write` method. + + Parameters + ---------- + cfs : Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]] + The CFs to apply to the DataFrame. + + Examples + -------- + >>> import teehr + >>> from teehr import RowLevelCalculatedFields as rcf + >>> ev.joined_timeseries.add_calculated_fields([ + >>> rcf.Month() + >>> ]).write() + """ + self._check_load_table() + + if not isinstance(cfs, List): + cfs = [cfs] + + for cf in cfs: + self.df = cf.apply_to(self.df) + + return self + + def write(self): + """Write the joined timeseries table to disk.""" + self._write_spark_df(self.df) + logger.info("Joined timeseries table written to disk.") + self._load_table() + + def _run_script(self, joined_df: ps.DataFrame) -> ps.DataFrame: """Add UDFs to the joined timeseries dataframe.""" try: @@ -126,12 +163,12 @@ def _add_udfs(self, joined_df: ps.DataFrame) -> ps.DataFrame: return joined_df - def create(self, add_attrs: bool = False, execute_udf: bool = False): + def create(self, add_attrs: bool = False, execute_scripts: bool = False): """Create joined timeseries table. 
Parameters ---------- - execute_udf : bool, optional + execute_scripts : bool, optional Execute UDFs, by default False add_attrs : bool, optional Add attributes, by default False @@ -141,11 +178,10 @@ def create(self, add_attrs: bool = False, execute_udf: bool = False): if add_attrs: joined_df = self._add_attr(joined_df) - if execute_udf: - joined_df = self._add_udfs(joined_df) + if execute_scripts: + joined_df = self._run_script(joined_df) validated_df = self._validate(joined_df, False) - self._write_spark_df(validated_df) logger.info("Joined timeseries table created.") self._load_table() \ No newline at end of file diff --git a/src/teehr/examples/01-evaluation_setup.ipynb b/src/teehr/examples/01-evaluation_setup.ipynb index 2e4ddd5c..72e84d38 100644 --- a/src/teehr/examples/01-evaluation_setup.ipynb +++ b/src/teehr/examples/01-evaluation_setup.ipynb @@ -188,7 +188,7 @@ "outputs": [], "source": [ "# Create the joined timeseries\n", - "ev.joined_timeseries.create(execute_udf=True)" + "ev.joined_timeseries.create(execute_scripts=True)" ] } ], diff --git a/src/teehr/examples/05-2_site_setup.ipynb b/src/teehr/examples/05-2_site_setup.ipynb index cd5b6cbb..a938fd78 100644 --- a/src/teehr/examples/05-2_site_setup.ipynb +++ b/src/teehr/examples/05-2_site_setup.ipynb @@ -183,7 +183,7 @@ "outputs": [], "source": [ "# Create the joined timeseries\n", - "ev.joined_timeseries.create(add_attrs=True, execute_udf=True)" + "ev.joined_timeseries.create(add_attrs=True, execute_scripts=True)" ] }, { diff --git a/src/teehr/examples/ngen_example.py b/src/teehr/examples/ngen_example.py index d733c1bb..cd1574bb 100644 --- a/src/teehr/examples/ngen_example.py +++ b/src/teehr/examples/ngen_example.py @@ -61,7 +61,7 @@ # ev.load.import_secondary_timeseries() # Create the joined timeseries -ev.joined_timeseries.create(execute_udf=False) +ev.joined_timeseries.create(execute_scripts=False) df = ev.metrics.query( order_by=["primary_location_id", "configuration_name"], diff --git a/src/teehr/examples/setup_evaluation.py b/src/teehr/examples/setup_evaluation.py index 34b4cd61..ffa28638 100644 --- a/src/teehr/examples/setup_evaluation.py +++ b/src/teehr/examples/setup_evaluation.py @@ -111,4 +111,4 @@ ) # Create the joined timeseries -ev.joined_timeseries.create(execute_udf=True) +ev.joined_timeseries.create(execute_scripts=True) diff --git a/src/teehr/models/calculated_fields/base.py b/src/teehr/models/calculated_fields/base.py new file mode 100644 index 00000000..c8a97e6c --- /dev/null +++ b/src/teehr/models/calculated_fields/base.py @@ -0,0 +1,18 @@ +import abc +from pydantic import BaseModel as PydanticBaseModel, ConfigDict + + +class CalculatedFieldABC(abc.ABC): + @abc.abstractmethod + def apply_to(self): + pass + + +class CalculatedFieldBaseModel(PydanticBaseModel): + """Calculated field base model configuration.""" + + model_config = ConfigDict( + arbitrary_types_allowed=True, + validate_assignment=True, + extra='forbid' # raise an error if extra fields are passed + ) \ No newline at end of file diff --git a/src/teehr/models/calculated_fields/row_level.py b/src/teehr/models/calculated_fields/row_level.py new file mode 100644 index 00000000..b48476da --- /dev/null +++ b/src/teehr/models/calculated_fields/row_level.py @@ -0,0 +1,221 @@ +"""Classes representing UDFs.""" +from typing import List, Union +from pydantic import BaseModel as PydanticBaseModel +from pydantic import Field, ConfigDict +import pandas as pd +import pyspark.sql.types as T +from pyspark.sql.functions import pandas_udf +import pyspark.sql as ps 
+from teehr.models.calculated_fields.base import CalculatedFieldABC, CalculatedFieldBaseModel + + +class Month(CalculatedFieldABC, CalculatedFieldBaseModel): + """Adds the month from a timestamp column. + + Properties + ---------- + - input_field_name: + The name of the column containing the timestamp. + Default: "value_time" + - output_field_name: + The name of the column to store the month. + Default: "month" + + """ + input_field_name: str = Field( + default="value_time" + ) + output_field_name: str = Field( + default="month" + ) + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + @pandas_udf(returnType=T.IntegerType()) + def func(col: pd.Series) -> pd.Series: + return col.dt.month + + sdf = sdf.withColumn( + self.output_field_name, + func(self.input_field_name) + ) + return sdf + + +class Year(CalculatedFieldABC, CalculatedFieldBaseModel): + """Adds the year from a timestamp column. + + Properties + ---------- + - input_field_name: + The name of the column containing the timestamp. + Default: "value_time" + - output_field_name: + The name of the column to store the year. + Default: "year" + + """ + input_field_name: str = Field( + default="value_time" + ) + output_field_name: str = Field( + default="year" + ) + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + @pandas_udf(returnType=T.IntegerType()) + def func(col: pd.Series) -> pd.Series: + return col.dt.year + + sdf = sdf.withColumn( + self.output_field_name, + func(self.input_field_name) + ) + return sdf + + +class WaterYear(CalculatedFieldABC, CalculatedFieldBaseModel): + """Adds the water year from a timestamp column. + + Properties + ---------- + - input_field_name: + The name of the column containing the timestamp. + Default: "value_time" + - output_field_name: + The name of the column to store the water year. + Default: "water_year" + + Water year is defined as the year of the date plus one if the month is October or later. + """ + input_field_name: str = Field( + default="value_time" + ) + output_field_name: str = Field( + default="water_year" + ) + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + @pandas_udf(returnType=T.IntegerType()) + def func(col: pd.Series) -> pd.Series: + return col.dt.year + (col.dt.month >= 10).astype(int) + + sdf = sdf.withColumn( + self.output_field_name, + func(self.input_field_name) + ) + return sdf + + +class NormalizedFlow(CalculatedFieldABC, CalculatedFieldBaseModel): + """Normalize flow values by drainage area. + + Properties + ---------- + - primary_value_field_name: + The name of the column containing the flow values. + Default: "primary_value" + - drainage_area_field_name: + The name of the column containing the drainage area. + Default: "drainage_area" + - output_field_name: + The name of the column to store the normalized flow values. + Default: "normalized_flow" + + """ + primary_value_field_name: str = Field( + default="primary_value" + ) + drainage_area_field_name: str = Field( + default="drainage_area" + ) + output_field_name: str = Field( + default="normalized_flow" + ) + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + + @pandas_udf(returnType=T.FloatType()) + def func(value: pd.Series, area: pd.Series) -> pd.Series: + return value.astype(float) / area.astype(float) + + sdf = sdf.withColumn( + self.output_field_name, + func(self.primary_value_field_name, self.drainage_area_field_name) + ) + return sdf + +class Seasons(CalculatedFieldABC, CalculatedFieldBaseModel): + """Adds the season from a timestamp column. 
+ + Properties + ---------- + - value_time_field_name: + The name of the column containing the timestamp. + Default: "value_time" + - season_months: + A dictionary mapping season names to the months that define them. + `Default: { + "winter": [12, 1, 2], + "spring": [3, 4, 5], + "summer": [6, 7, 8], + "fall": [9, 10, 11] + }` + - output_field_name: + The name of the column to store the season. + Default: "season" + + """ + value_time_field_name: str = Field( + default="value_time" + ) + season_months: dict = Field( + default={ + "winter": [12, 1, 2], + "spring": [3, 4, 5], + "summer": [6, 7, 8], + "fall": [9, 10, 11] + } + ) + output_field_name: str = Field( + default="season" + ) + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + + @pandas_udf(returnType=T.StringType()) + def func(value_time: pd.Series) -> pd.Series: + return value_time.dt.month.apply( + lambda x: next( + (season for season, months in self.season_months.items() if x in months), + None + ) + ) + + sdf = sdf.withColumn( + self.output_field_name, + func(self.value_time_field_name) + ) + return sdf + +class RowLevelCalculatedFields(): + """Row level Calculated Fields. + + Row level CFs are applied to each row in the table based on data that is in one or more + existing fields. These are applied per row and are not aware of the data in any other + row (e.g., are not aware of any other timeseries values in a "timeseries"). This can be + used for adding fields such as a field based on the data/time (e.g., month, year, season, etc.) + or based on the value field (e.g., normalized flow, log flow, etc.) and many other uses. + + Available Calculated Fields: + - Month + - Year + - WaterYear + - NormalizedFlow + - Seasons + + """ + Month = Month + Year = Year + WaterYear = WaterYear + NormalizedFlow = NormalizedFlow + Seasons = Seasons \ No newline at end of file diff --git a/src/teehr/models/calculated_fields/timeseries_aware.py b/src/teehr/models/calculated_fields/timeseries_aware.py new file mode 100644 index 00000000..c58d0946 --- /dev/null +++ b/src/teehr/models/calculated_fields/timeseries_aware.py @@ -0,0 +1,194 @@ +"""Classes representing UDFs.""" +from typing import List, Union +from pydantic import Field +import pandas as pd +import pyspark.sql.types as T +import pyspark.sql as ps +from teehr.models.calculated_fields.base import CalculatedFieldABC, CalculatedFieldBaseModel + +class PercentileEventDetection(CalculatedFieldABC, CalculatedFieldBaseModel): + """Adds an "event" and "event_id" column to the DataFrame based on a percentile threshold. + + The "event" column (bool) indicates whether the value is above the XXth percentile. + The "event_id" column (string) groups continuous segments of events and assigns a + unique ID to each segment in the format "startdate-enddate". + + Properties + ---------- + - quantile: + The percentile threshold to use for event detection. + Default: 0.85 + - value_time_field_name: + The name of the column containing the timestamp. + Default: "value_time" + - value_field_name: + The name of the column containing the value to detect events on. + Default: "primary_value" + - output_event_field_name: + The name of the column to store the event detection. + Default: "event" + - output_event_id_field_name: + The name of the column to store the event ID. + Default: "event_id" + - uniqueness_fields: + The columns to use to uniquely identify each timeseries. 
+ `Default: [ + 'reference_time', + 'primary_location_id', + 'configuration_name', + 'variable_name', + 'unit_name' + ]` + """ + quantile: float = Field( + default=0.85 + ) + value_time_field_name: str = Field( + default="value_time" + ) + value_field_name: str = Field( + default="primary_value" + ) + output_event_field_name: str = Field( + default="event" + ) + output_event_id_field_name: str = Field( + default="event_id" + ) + uniqueness_fields: Union[str, List[str]] = Field( + default=[ + 'reference_time', + 'primary_location_id', + 'configuration_name', + 'variable_name', + 'unit_name' + ] + ) + + @staticmethod + def add_is_event( + sdf: ps.DataFrame, + output_field, + input_field, + quantile, + group_by, + return_type=T.BooleanType() + + ): + # Get the schema of the input DataFrame + input_schema = sdf.schema + + # Create a copy of the schema and add the new column + output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)]) + + def is_event(pdf, input_field, quantile, output_field) -> pd.DataFrame: + pvs = pdf[input_field] + + # Calculate the XXth percentile + percentile = pvs.quantile(quantile) + + # Create a new column indicating whether each value is above the XXth percentile + pdf[output_field] = pvs > percentile + + return pdf + + def wrapper(pdf, input_field, quantile, output_field): + return is_event(pdf, input_field, quantile, output_field) + + # Group the data and apply the UDF + # lambda pdf: wrapper_function(pdf, threshold_value) + sdf = sdf.groupby(group_by).applyInPandas( + lambda pdf: wrapper(pdf, input_field, quantile, output_field), + schema=output_schema + ) + + return sdf + + @staticmethod + def add_event_ids( + sdf, + output_field, + input_field, + time_field, + group_by, + return_type=T.StringType(), + + ): + # Get the schema of the input DataFrame + input_schema = sdf.schema + + # Create a copy of the schema and add the new column + output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)]) + + def event_ids(pdf: pd.DataFrame, input_field, time_field, output_field) -> pd.DataFrame: + # Create a new column for continuous segments + pdf['segment'] = (pdf[input_field] != pdf[input_field].shift()).cumsum() + + # Filter only the segments where values are over the 90th percentile + segments = pdf[pdf[input_field]] + + # Group by segment and create startdate-enddate string + segment_ranges = segments.groupby('segment').agg( + startdate=(time_field, 'min'), + enddate=(time_field, 'max') + ).reset_index() + + # Merge the segment ranges back to the original DataFrame + pdf = pdf.merge(segment_ranges[['segment', 'startdate', 'enddate']], on='segment', how='left') + + # Create the startdate-enddate string column + pdf[output_field] = pdf.apply( + lambda row: f"{row['startdate']}-{row['enddate']}" if pd.notnull(row['startdate']) else None, + axis=1 + ) + + # Drop the 'segment', 'startdate', and 'enddate' columns before returning + pdf.drop(columns=['segment', 'startdate', 'enddate'], inplace=True) + + return pdf + + def wrapper(pdf, input_field, time_field, output_field): + return event_ids(pdf, input_field, time_field, output_field) + + # Group the data and apply the UDF + sdf = sdf.orderBy(*group_by, time_field).groupby(group_by).applyInPandas( + lambda pdf: wrapper(pdf, input_field, time_field, output_field), + schema=output_schema + ) + + return sdf + + def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame: + + sdf = self.add_is_event( + sdf=sdf, + input_field=self.value_field_name, + 
quantile=self.quantile, + output_field=self.output_event_field_name, + group_by=self.uniqueness_fields + ) + sdf = self.add_event_ids( + sdf=sdf, + input_field=self.output_event_field_name, + time_field=self.value_time_field_name, + output_field=self.output_event_id_field_name, + group_by=self.uniqueness_fields + ) + + return sdf + + +class TimeseriesAwareCalculatedFields(): + """Timeseries aware calculated fields. + + Timeseries aware CFs are aware of ordered groups of data (e.g., a timeseries). + This is useful for things such as event detection, base flow separation, and + other fields that need to be calculated based on a entire timeseries. The + definition of what creates a unique set of timeseries (i.e., a timeseries) can + be specified. + + Available Calculated Fields: + - PercentileEventDetection + + """ + PercentileEventDetection = PercentileEventDetection \ No newline at end of file diff --git a/src/teehr/models/pandera_dataframe_schemas.py b/src/teehr/models/pandera_dataframe_schemas.py index 9834dbb2..1083e916 100644 --- a/src/teehr/models/pandera_dataframe_schemas.py +++ b/src/teehr/models/pandera_dataframe_schemas.py @@ -503,7 +503,8 @@ def secondary_timeseries_schema( ), "member": ps.Column( T.StringType(), - nullable=True + nullable=True, + ) }, strict=True, diff --git a/src/teehr/template/scripts/user_defined_fields.py b/src/teehr/template/scripts/user_defined_fields.py index 79b92331..783df9be 100644 --- a/src/teehr/template/scripts/user_defined_fields.py +++ b/src/teehr/template/scripts/user_defined_fields.py @@ -11,9 +11,9 @@ """ from pyspark.sql import DataFrame -from pyspark.sql.functions import udf -from pyspark.sql.types import IntegerType, FloatType import logging +from teehr import RowLevelCalculatedFields as rcf +from teehr import TimeseriesAwareCalculatedFields as tcf logger = logging.getLogger(__name__) @@ -27,8 +27,7 @@ def add_user_defined_fields( Parameters ---------- - joined_df: DataFrame - The joined timeseries data. + joined_timeseries : JoinedTimeseriesTable Returns ------- @@ -40,59 +39,22 @@ def add_user_defined_fields( # Add a month field to the joined timeseries data logger.info("Adding month from date") - @udf(returnType=IntegerType()) - def month_from_date(date): - return date.month - - joined_df = joined_df.withColumn( - "month", - month_from_date("value_time") - ) - - # Add a year field to the joined timeseries data - logger.info("Adding water year from date") - - @udf(returnType=IntegerType()) - def year_from_date(date): - return date.year - - joined_df = joined_df.withColumn( - "year", - year_from_date("value_time") - ) - - # Add a water year field to the joined timeseries data - logger.info("Adding water year from date") - - @udf(returnType=IntegerType()) - def water_year_from_date(date): - if date.month >= 10: - return date.year + 1 - else: - return date.year - - joined_df = joined_df.withColumn( - "water_year", - water_year_from_date("value_time") - ) - - # Add a normalized flow for primary and secondary values - # to the joined timeseries data. 
- # logger.info("Adding normalized flow") - - # @udf(returnType=FloatType()) - # def normalized_flow(flow, area): - # return float(flow) / float(area) - - # joined_df = joined_df.withColumn( - # "primary_normalized_flow", - # normalized_flow("primary_value", "drainage_area") - # ) - - # joined_df = joined_df.withColumn( - # "secondary_normalized_flow", - # normalized_flow("secondary_value", "drainage_area") - # ) + month = rcf.Month() + year = rcf.Year() + water_year = rcf.WaterYear() + # normalized_flow = rcf.NormalizedFlow() + seasons = rcf.Seasons() + + cfs = [ + month, + year, + water_year, + # normalized_flow, + seasons + ] + + for cf in cfs: + joined_df = cf.apply_to(joined_df) # Return the joined timeseries data with user defined fields return joined_df \ No newline at end of file diff --git a/tests/setup_v0_3_study.py b/tests/setup_v0_3_study.py index 25d9ec17..66223f50 100644 --- a/tests/setup_v0_3_study.py +++ b/tests/setup_v0_3_study.py @@ -103,6 +103,6 @@ def setup_v0_3_study(tmpdir): ) # Create the joined timeseries - eval.joined_timeseries.create(execute_udf=True) + eval.joined_timeseries.create(add_attrs=True, execute_scripts=True) return eval diff --git a/tests/test_add_udfs.py b/tests/test_add_udfs.py new file mode 100644 index 00000000..04c22602 --- /dev/null +++ b/tests/test_add_udfs.py @@ -0,0 +1,122 @@ +"""Tests for the TEEHR study creation.""" +import tempfile +import teehr +from teehr import RowLevelCalculatedFields as rcf +from teehr import TimeseriesAwareCalculatedFields as tcf + +from setup_v0_3_study import setup_v0_3_study + +import pyspark.sql.types as T + +def test_add_row_udfs(tmpdir): + """Test adding row level UDFs.""" + ev = setup_v0_3_study(tmpdir) + sdf = ev.joined_timeseries.to_sdf() + + sdf = rcf.Month().apply_to(sdf) + + sdf = rcf.Year().apply_to(sdf) + + sdf = rcf.WaterYear().apply_to(sdf) + + sdf = rcf.NormalizedFlow().apply_to(sdf) + + sdf = rcf.Seasons().apply_to(sdf) + + cols = sdf.columns + assert "month" in cols + assert "year" in cols + assert "water_year" in cols + assert "normalized_flow" in cols + assert "season" in cols + + ev.spark.stop() + + +def test_add_timeseries_udfs(tmpdir): + """Test adding a timeseries aware UDF.""" + ev = setup_v0_3_study(tmpdir) + sdf = ev.joined_timeseries.to_sdf() + + ped = tcf.PercentileEventDetection() + sdf = ped.apply_to(sdf) + + cols = sdf.columns + assert "event" in cols + assert "event_id" in cols + + ev.spark.stop() + + +def test_add_udfs_write(tmpdir): + """Test adding UDFs and write DataFrame back to table.""" + ev = setup_v0_3_study(tmpdir) + + ped = tcf.PercentileEventDetection() + ev.joined_timeseries.add_calculated_fields(ped).write() + new_sdf = ev.joined_timeseries.to_sdf() + + cols = new_sdf.columns + assert "event" in cols + assert "event_id" in cols + + ev.spark.stop() + +def test_location_event_detection(tmpdir): + """Test event detection and metrics per event.""" + ev = setup_v0_3_study(tmpdir) + + ped = tcf.PercentileEventDetection() + sdf = ev.metrics.add_calculated_fields(ped).query( + group_by=["configuration_name", "primary_location_id", "event_id"], + include_metrics=[ + teehr.Metrics.Maximum( + input_field_names=["primary_value"], + output_field_name="max_primary_value" + ), + teehr.Metrics.Maximum( + input_field_names=["secondary_value"], + output_field_name="max_secondary_value" + ) + ] + ).to_sdf() + + assert sdf.count() == 6 + + assert "configuration_name" in sdf.columns + assert "primary_location_id" in sdf.columns + assert "event_id" in sdf.columns + assert "max_primary_value" 
in sdf.columns + assert "max_secondary_value" in sdf.columns + + ev.spark.stop() + + +if __name__ == "__main__": + with tempfile.TemporaryDirectory( + prefix="teehr-" + ) as tempdir: + test_add_row_udfs( + tempfile.mkdtemp( + prefix="1-", + dir=tempdir + ) + ) + test_add_timeseries_udfs( + tempfile.mkdtemp( + prefix="2-", + dir=tempdir + ) + ) + test_add_udfs_write( + tempfile.mkdtemp( + prefix="3-", + dir=tempdir + ) + ) + test_location_event_detection( + tempfile.mkdtemp( + prefix="5-", + dir=tempdir + ) + ) diff --git a/tests/test_create_joined_timeseries.py b/tests/test_create_joined_timeseries.py index 8e495fe8..de165513 100644 --- a/tests/test_create_joined_timeseries.py +++ b/tests/test_create_joined_timeseries.py @@ -104,7 +104,7 @@ def test_create_joined_timeseries(tmpdir): ) # Create the joined timeseries - eval.joined_timeseries.create(add_attrs=True, execute_udf=True) + eval.joined_timeseries.create(add_attrs=True, execute_scripts=True) columns = eval.joined_timeseries.to_sdf().columns expected_columns = [ @@ -124,10 +124,11 @@ def test_create_joined_timeseries(tmpdir): 'water_year', 'configuration_name', 'variable_name', - 'member' + 'member', + 'season' ] - assert len(columns) == 17 + assert len(columns) == len(expected_columns) assert sorted(columns) == sorted(expected_columns) diff --git a/tests/test_import_timeseries.py b/tests/test_import_timeseries.py index aef9d96f..b7cd3b24 100644 --- a/tests/test_import_timeseries.py +++ b/tests/test_import_timeseries.py @@ -317,7 +317,7 @@ def test_validate_and_insert_fews_xml_timeseries(tmpdir): eval.primary_timeseries.load_parquet( in_path=primary_filepath ) - eval.joined_timeseries.create(execute_udf=False) + eval.joined_timeseries.create(execute_scripts=False) # df = eval.joined_timeseries.to_pandas() # Now, metrics.
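
For quick reference, below is a minimal usage sketch (not part of the changeset itself) of how the new calculated field classes and the renamed `execute_scripts` option might be used together. It assumes `ev` is an existing TEEHR Evaluation whose domain, location, and timeseries tables have already been populated, as in the notebook added above.

# Rough sketch only; `ev` is assumed to be an existing, populated Evaluation.
import teehr
from teehr import RowLevelCalculatedFields as rcf
from teehr import TimeseriesAwareCalculatedFields as tcf

# Build the joined_timeseries table, running the user_defined_fields.py script.
ev.joined_timeseries.create(execute_scripts=True)

# Persist additional calculated fields by writing them back to the table.
ev.joined_timeseries.add_calculated_fields([
    rcf.Month(),
    rcf.WaterYear(),
    tcf.PercentileEventDetection(),  # adds "event" and "event_id" columns
]).write()

# Or apply a calculated field temporarily, just for a metrics query.
sdf = (
    ev.metrics
    .add_calculated_fields([tcf.PercentileEventDetection(quantile=0.90)])
    .query(
        group_by=["configuration_name", "primary_location_id", "event_id"],
        filters=["event = true"],
        include_metrics=[
            teehr.Metrics.Maximum(
                input_field_names=["primary_value"],
                output_field_name="max_primary_value",
            ),
        ],
    )
    .to_sdf()
)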