From f5bfb50e96e252936e0d9450b55783ed5749f9b2 Mon Sep 17 00:00:00 2001 From: Martijn Visser Date: Thu, 26 Sep 2024 14:12:01 +0200 Subject: [PATCH] Use pyarrow dtype_backend (#1781) Fixes #1721 Fixes #1394 The `dtype_backend` part of this is not breaking. The only part that is technically breaking is that we specify a unit of milliseconds for the Arrow time type. Previously we used the default nanosecond precision, which was then truncated to milliseconds in the core. I think it is better to disallow precision higher than milliseconds if we cannot distinguish them in the core. --- core/src/schema.jl | 12 +- core/src/util.jl | 9 +- core/src/validation.jl | 10 +- core/test/validation_test.jl | 4 +- docs/guide/examples.ipynb | 38 +- docs/tutorial/irrigation-demand.ipynb | 8 +- docs/tutorial/natural-flow.ipynb | 8 +- docs/tutorial/reservoir.ipynb | 8 +- pixi.toml | 2 + python/ribasim/ribasim/config.py | 13 +- python/ribasim/ribasim/delwaq/generate.py | 25 +- python/ribasim/ribasim/delwaq/parse.py | 6 +- python/ribasim/ribasim/geometry/edge.py | 4 +- python/ribasim/ribasim/input_base.py | 22 +- python/ribasim/ribasim/model.py | 20 +- python/ribasim/ribasim/schemas.py | 610 +++++++++++++----- python/ribasim/ribasim/utils.py | 15 + python/ribasim/tests/test_io.py | 59 +- python/ribasim/tests/test_model.py | 9 +- .../ribasim_testmodels/basic.py | 14 +- .../ribasim_testmodels/continuous_control.py | 4 +- .../ribasim_testmodels/discrete_control.py | 4 +- .../ribasim_testmodels/time.py | 4 +- utils/gen_python.jl | 22 +- utils/templates/schemas.py.jinja | 8 +- 25 files changed, 674 insertions(+), 264 deletions(-) diff --git a/core/src/schema.jl b/core/src/schema.jl index 6d5be89d3..aa13abfa9 100644 --- a/core/src/schema.jl +++ b/core/src/schema.jl @@ -286,7 +286,7 @@ end demand::Union{Missing, Float64} return_factor::Float64 min_level::Float64 - priority::Int32 + priority::Union{Missing, Int32} end @version UserDemandTimeV1 begin @@ -295,14 +295,14 @@ end demand::Float64 return_factor::Float64 min_level::Float64 - priority::Int32 + priority::Union{Missing, Int32} end @version LevelDemandStaticV1 begin node_id::Int32 min_level::Union{Missing, Float64} max_level::Union{Missing, Float64} - priority::Int32 + priority::Union{Missing, Int32} end @version LevelDemandTimeV1 begin @@ -310,18 +310,18 @@ end time::DateTime min_level::Union{Missing, Float64} max_level::Union{Missing, Float64} - priority::Int32 + priority::Union{Missing, Int32} end @version FlowDemandStaticV1 begin node_id::Int demand::Float64 - priority::Int32 + priority::Union{Missing, Int32} end @version FlowDemandTimeV1 begin node_id::Int time::DateTime demand::Float64 - priority::Int32 + priority::Union{Missing, Int32} end diff --git a/core/src/util.jl b/core/src/util.jl index 4bde59c6d..38bed6f29 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -446,11 +446,10 @@ function get_all_priorities(db::DB, config::Config)::Vector{Int32} (FlowDemandStaticV1, "FlowDemand / static"), (FlowDemandTimeV1, "FlowDemand / time"), ] - if valid_priorities( - load_structvector(db, config, type).priority, - config.allocation.use_allocation, - ) - union!(priorities, load_structvector(db, config, type).priority) + priority_col = load_structvector(db, config, type).priority + priority_col = Int32.(coalesce.(priority_col, Int32(0))) + if valid_priorities(priority_col, config.allocation.use_allocation) + union!(priorities, priority_col) else is_valid = false @error "Missing priority parameter(s) for a $name node in the allocation problem." diff --git a/core/src/validation.jl b/core/src/validation.jl index 41045081f..cfed36c6c 100644 --- a/core/src/validation.jl +++ b/core/src/validation.jl @@ -673,10 +673,10 @@ function valid_sources( return !errors end -function valid_priorities(priorities::Vector, use_allocation::Bool)::Bool - errors = false - if 0 in priorities && use_allocation - errors = true +function valid_priorities(priorities::Vector{Int32}, use_allocation::Bool)::Bool + if use_allocation && any(iszero, priorities) + return false + else + return true end - return !errors end diff --git a/core/test/validation_test.jl b/core/test/validation_test.jl index ffc99ba26..96c409a7b 100644 --- a/core/test/validation_test.jl +++ b/core/test/validation_test.jl @@ -460,11 +460,9 @@ end normpath(@__DIR__, "../../generated_testmodels/invalid_priorities/ribasim.toml") @test ispath(toml_path) - config = Ribasim.Config(toml_path; allocation_use_allocation = true) - logger = TestLogger() with_logger(logger) do - @test_throws "Priority parameter is missing" Ribasim.run(config) + @test_throws "Priority parameter is missing" Ribasim.run(toml_path) end @test length(logger.logs) == 3 @test logger.logs[1].level == Error diff --git a/docs/guide/examples.ipynb b/docs/guide/examples.ipynb index 4bdcfcbf1..70480dada 100644 --- a/docs/guide/examples.ipynb +++ b/docs/guide/examples.ipynb @@ -414,7 +414,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(datadir / \"basic/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " datadir / \"basic/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_basin_wide = df_basin.pivot_table(\n", " index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n", ")\n", @@ -428,7 +430,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\")\n", + "df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\", dtype_backend=\"pyarrow\")\n", "df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n", "df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n", "ax = df_flow.pivot_table(index=\"time\", columns=\"edge\", values=\"flow_m3d\").plot()\n", @@ -709,7 +711,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(datadir / \"level_range/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " datadir / \"level_range/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_basin_wide = df_basin.pivot_table(\n", " index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n", ")\n", @@ -991,7 +995,9 @@ "source": [ "from matplotlib.dates import date2num\n", "\n", - "df_basin = pd.read_feather(datadir / \"pid_control/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " datadir / \"pid_control/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_basin_wide = df_basin.pivot_table(\n", " index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n", ")\n", @@ -1272,7 +1278,9 @@ "source": [ "import matplotlib.ticker as plticker\n", "\n", - "df_allocation = pd.read_feather(datadir / \"allocation_example/results/allocation.arrow\")\n", + "df_allocation = pd.read_feather(\n", + " datadir / \"allocation_example/results/allocation.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_allocation_wide = df_allocation.pivot_table(\n", " index=\"time\",\n", " columns=[\"node_type\", \"node_id\", \"priority\"],\n", @@ -1318,7 +1326,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(datadir / \"allocation_example/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " datadir / \"allocation_example/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_basin_wide = df_basin.pivot_table(\n", " index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n", ")\n", @@ -1557,7 +1567,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(datadir / \"level_demand/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " datadir / \"level_demand/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "df_basin = df_basin[df_basin.node_id == 2]\n", "df_basin_wide = df_basin.pivot_table(\n", " index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n", @@ -1953,7 +1965,7 @@ "outputs": [], "source": [ "datadir_flow = datadir / \"local_pidcontrolled_cascade/results/flow.arrow\"\n", - "df_flow = pd.read_feather(datadir_flow)\n", + "df_flow = pd.read_feather(datadir_flow, dtype_backend=\"pyarrow\")\n", "df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n", "df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n", "\n", @@ -1995,7 +2007,7 @@ "outputs": [], "source": [ "datadir_basin = datadir / \"local_pidcontrolled_cascade/results/basin.arrow\"\n", - "df_basin = pd.read_feather(datadir_basin)\n", + "df_basin = pd.read_feather(datadir_basin, dtype_backend=\"pyarrow\")\n", "df_basin[\"vertical_flux\"] = (\n", " df_basin[\"precipitation\"]\n", " - df_basin[\"evaporation\"]\n", @@ -2060,7 +2072,9 @@ " Node(1, Point(0, 0)),\n", " [\n", " level_boundary.Time(\n", - " time=pd.date_range(start=\"2020-01-01\", end=\"2021-01-01\", periods=100),\n", + " time=pd.date_range(\n", + " start=\"2020-01-01\", end=\"2021-01-01\", periods=100, unit=\"ms\"\n", + " ),\n", " level=6.0 + np.sin(np.linspace(0, 6 * np.pi, 100)),\n", " )\n", " ],\n", @@ -2286,7 +2300,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_flow = pd.read_feather(datadir / \"outlet_continuous_control/results/flow.arrow\")\n", + "df_flow = pd.read_feather(\n", + " datadir / \"outlet_continuous_control/results/flow.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "fig, ax = plt.subplots()\n", "\n", "\n", diff --git a/docs/tutorial/irrigation-demand.ipynb b/docs/tutorial/irrigation-demand.ipynb index 768608cf2..4497c8381 100644 --- a/docs/tutorial/irrigation-demand.ipynb +++ b/docs/tutorial/irrigation-demand.ipynb @@ -309,7 +309,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(base_dir / \"Crystal-2/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " base_dir / \"Crystal-2/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "\n", "# Create pivot tables and plot for basin data\n", "df_basin_wide = df_basin.pivot_table(\n", @@ -393,7 +395,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_flow = pd.read_feather(base_dir / \"Crystal-2/results/flow.arrow\")\n", + "df_flow = pd.read_feather(\n", + " base_dir / \"Crystal-2/results/flow.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "# Add the edge names and then remove unnamed edges\n", "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n", "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n", diff --git a/docs/tutorial/natural-flow.ipynb b/docs/tutorial/natural-flow.ipynb index a4e817a0a..3807769bf 100644 --- a/docs/tutorial/natural-flow.ipynb +++ b/docs/tutorial/natural-flow.ipynb @@ -404,7 +404,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(base_dir / \"Crystal-1/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " base_dir / \"Crystal-1/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "\n", "# Create pivot tables and plot for Basin data\n", "df_basin_wide = df_basin.pivot_table(\n", @@ -453,7 +455,9 @@ "source": [ "# Plot flow data\n", "# Read the flow results\n", - "df_flow = pd.read_feather(base_dir / \"Crystal-1/results/flow.arrow\")\n", + "df_flow = pd.read_feather(\n", + " base_dir / \"Crystal-1/results/flow.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "# Add the edge names and then remove unnamed edges\n", "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n", "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n", diff --git a/docs/tutorial/reservoir.ipynb b/docs/tutorial/reservoir.ipynb index bcd262745..03c1673ed 100644 --- a/docs/tutorial/reservoir.ipynb +++ b/docs/tutorial/reservoir.ipynb @@ -283,7 +283,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_basin = pd.read_feather(base_dir / \"Crystal-3/results/basin.arrow\")\n", + "df_basin = pd.read_feather(\n", + " base_dir / \"Crystal-3/results/basin.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "\n", "# Create pivot tables and plot for Basin data\n", "df_basin_wide = df_basin.pivot_table(\n", @@ -329,7 +331,9 @@ "metadata": {}, "outputs": [], "source": [ - "df_flow = pd.read_feather(base_dir / \"Crystal-3/results/flow.arrow\")\n", + "df_flow = pd.read_feather(\n", + " base_dir / \"Crystal-3/results/flow.arrow\", dtype_backend=\"pyarrow\"\n", + ")\n", "# Add the edge names and then remove unnamed edges\n", "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n", "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n", diff --git a/pixi.toml b/pixi.toml index 347ec664a..b8759c8f2 100644 --- a/pixi.toml +++ b/pixi.toml @@ -88,6 +88,7 @@ test-ribasim-regression = { cmd = "julia --project=core --eval 'using Pkg; Pkg.t generate-testmodels = { cmd = "python utils/generate-testmodels.py", inputs = [ "python/ribasim", "python/ribasim_testmodels", + "utils/generate-testmodels.py", ], outputs = [ "generated_testmodels", ] } @@ -99,6 +100,7 @@ codegen = { cmd = "julia --project utils/gen_python.jl && ruff format python/rib "initialize-julia", ], inputs = [ "core", + "utils", ], outputs = [ "python/ribasim/ribasim/schemas.py", "python/ribasim/ribasim/validation.py", diff --git a/python/ribasim/ribasim/config.py b/python/ribasim/ribasim/config.py index bc3d7ad5b..2511c1eac 100644 --- a/python/ribasim/ribasim/config.py +++ b/python/ribasim/ribasim/config.py @@ -51,7 +51,7 @@ UserDemandStaticSchema, UserDemandTimeSchema, ) -from ribasim.utils import _pascal_to_snake +from ribasim.utils import _concat, _pascal_to_snake class Allocation(ChildModel): @@ -242,11 +242,10 @@ def add( ) assert table.df is not None table_to_append = table.df.assign(node_id=node_id) - setattr( - self, - member_name, - pd.concat([existing_table, table_to_append], ignore_index=True), - ) + if isinstance(table_to_append, GeoDataFrame): + table_to_append.set_crs(self._parent.crs, inplace=True) + new_table = _concat([existing_table, table_to_append], ignore_index=True) + setattr(self, member_name, new_table) node_table = node.into_geodataframe( node_type=self.__class__.__name__, node_id=node_id @@ -255,7 +254,7 @@ def add( if self.node.df is None: self.node.df = node_table else: - df = pd.concat([self.node.df, node_table]) + df = _concat([self.node.df, node_table]) self.node.df = df self._parent._used_node_ids.add(node_id) diff --git a/python/ribasim/ribasim/delwaq/generate.py b/python/ribasim/ribasim/delwaq/generate.py index 740d5cd42..84317c86f 100644 --- a/python/ribasim/ribasim/delwaq/generate.py +++ b/python/ribasim/ribasim/delwaq/generate.py @@ -5,7 +5,7 @@ from datetime import timedelta from pathlib import Path -from ribasim.utils import MissingOptionalModule +from ribasim.utils import MissingOptionalModule, _concat try: import networkx as nx @@ -70,17 +70,18 @@ def _make_boundary(data, boundary_type): """ bid = _boundary_name(data.node_id.iloc[0], boundary_type) piv = ( - data.pivot_table(index="time", columns="substance", values="concentration") + data.pivot_table( + index="time", columns="substance", values="concentration", fill_value=-999 + ) .reset_index() .reset_index(drop=True) ) - piv.time = piv.time.dt.strftime("%Y/%m/%d-%H:%M:%S") + # Convert Arrow time to Numpy to avoid needing tzdata somehow + piv.time = piv.time.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S") boundary = { "name": bid, "substances": list(map(_quote, piv.columns[1:])), - "df": piv.to_string( - formatters={"time": _quote}, header=False, index=False, na_rep=-999 - ), + "df": piv.to_string(formatters={"time": _quote}, header=False, index=False), } substances = data.substance.unique() return boundary, substances @@ -181,7 +182,7 @@ def _setup_graph(nodes, edge, use_evaporation=True): boundary_id -= 1 node_mapping[node_id] = boundary_id else: - raise Exception("Found unexpected node $node_id in delwaq graph.") + raise Exception(f"Found unexpected node {node_id} in delwaq graph.") nx.relabel_nodes(G, node_mapping, copy=False) @@ -281,8 +282,12 @@ def generate( # Read in model and results model = ribasim.Model.read(toml_path) - basins = pd.read_feather(toml_path.parent / "results" / "basin.arrow") - flows = pd.read_feather(toml_path.parent / "results" / "flow.arrow") + basins = pd.read_feather( + toml_path.parent / "results" / "basin.arrow", dtype_backend="pyarrow" + ) + flows = pd.read_feather( + toml_path.parent / "results" / "flow.arrow", dtype_backend="pyarrow" + ) output_folder.mkdir(exist_ok=True) @@ -359,7 +364,7 @@ def generate( columns={boundary_type: "flow_rate"} ) df["edge_id"] = edge_id - nflows = pd.concat([nflows, df], ignore_index=True) + nflows = _concat([nflows, df], ignore_index=True) # Save flows to Delwaq format nflows.sort_values(by=["time", "edge_id"], inplace=True) diff --git a/python/ribasim/ribasim/delwaq/parse.py b/python/ribasim/ribasim/delwaq/parse.py index 3c527d02a..0e87a24f4 100644 --- a/python/ribasim/ribasim/delwaq/parse.py +++ b/python/ribasim/ribasim/delwaq/parse.py @@ -2,10 +2,8 @@ from pathlib import Path -import pandas as pd - import ribasim -from ribasim.utils import MissingOptionalModule +from ribasim.utils import MissingOptionalModule, _concat try: import xugrid as xu @@ -49,7 +47,7 @@ def parse( dfs.append(df) - df = pd.concat(dfs).reset_index(drop=True) + df = _concat(dfs).reset_index(drop=True) df.sort_values(["time", "node_id"], inplace=True) model.basin.concentration_external = df diff --git a/python/ribasim/ribasim/geometry/edge.py b/python/ribasim/ribasim/geometry/edge.py index 4f0480aa3..9142c6817 100644 --- a/python/ribasim/ribasim/geometry/edge.py +++ b/python/ribasim/ribasim/geometry/edge.py @@ -14,7 +14,7 @@ from shapely.geometry import LineString, MultiLineString, Point from ribasim.input_base import SpatialTableModel -from ribasim.utils import UsedIDs +from ribasim.utils import UsedIDs, _concat from ribasim.validation import ( can_connect, control_edge_neighbor_amount, @@ -131,7 +131,7 @@ def add( index=pd.Index([edge_id], name="edge_id"), ) - self.df = GeoDataFrame[EdgeSchema](pd.concat([self.df, table_to_append])) + self.df = GeoDataFrame[EdgeSchema](_concat([self.df, table_to_append])) if self.df.duplicated(subset=["from_node_id", "to_node_id"]).any(): raise ValueError( f"Edges have to be unique, but edge with from_node_id {from_node.node_id} to_node_id {to_node.node_id} already exists." diff --git a/python/ribasim/ribasim/input_base.py b/python/ribasim/ribasim/input_base.py index ed797553f..c6613c908 100644 --- a/python/ribasim/ribasim/input_base.py +++ b/python/ribasim/ribasim/input_base.py @@ -61,8 +61,6 @@ "user_demand", ] -gpd.options.io_engine = "pyogrio" - context_file_loading: ContextVar[dict[str, Any]] = ContextVar( "file_loading", default={} ) @@ -294,7 +292,12 @@ def _from_db(cls, path: Path, table: str) -> pd.DataFrame | None: if exists(connection, table): query = f"select * from {esc_id(table)}" df = pd.read_sql_query( - query, connection, parse_dates={"time": {"format": "ISO8601"}} + query, + connection, + # we store TIMESTAMP in SQLite like "2025-05-29 14:16:00" + # see https://www.sqlite.org/lang_datefunc.html + parse_dates={"time": {"format": "ISO8601"}}, + dtype_backend="pyarrow", ) df.set_index("fid", inplace=True) else: @@ -305,7 +308,7 @@ def _from_db(cls, path: Path, table: str) -> pd.DataFrame | None: @classmethod def _from_arrow(cls, path: Path) -> pd.DataFrame: directory = context_file_loading.get().get("directory", Path(".")) - return pd.read_feather(directory / path) + return pd.read_feather(directory / path, dtype_backend="pyarrow") def sort(self): """Sort the table as required. @@ -374,7 +377,15 @@ def _from_db(cls, path: Path, table: str): with closing(connect(path)) as connection: if exists(connection, table): # pyogrio hardcodes fid name on reading - df = gpd.read_file(path, layer=table, fid_as_index=True) + df = gpd.read_file( + path, + layer=table, + engine="pyogrio", + fid_as_index=True, + use_arrow=True, + # tell pyarrow to map to pd.ArrowDtype rather than NumPy + arrow_to_pandas_kwargs={"types_mapper": pd.ArrowDtype}, + ) df.index.rename(cls.tableschema()._index_name(), inplace=True) else: df = None @@ -396,6 +407,7 @@ def _write_geopackage(self, path: Path) -> None: driver="GPKG", index=True, fid=self.df.index.name, + engine="pyogrio", ) _add_styles_to_geopackage(path, self.tablename()) diff --git a/python/ribasim/ribasim/model.py b/python/ribasim/ribasim/model.py index 9d4ad4db4..1f2d1ceab 100644 --- a/python/ribasim/ribasim/model.py +++ b/python/ribasim/ribasim/model.py @@ -55,6 +55,7 @@ from ribasim.utils import ( MissingOptionalModule, UsedIDs, + _concat, _edge_lookup, _node_lookup, _node_lookup_numpy, @@ -236,7 +237,7 @@ def node_table(self) -> NodeTable: """Compute the full sorted NodeTable from all node types.""" df_chunks = [node.node.df for node in self._nodes()] df = ( - pd.concat(df_chunks) + _concat(df_chunks) if df_chunks else pd.DataFrame(index=pd.Index([], name="node_id")) ) @@ -308,7 +309,7 @@ def write(self, filepath: str | PathLike[str]) -> Path: def _validate_model(self): df_edge = self.edge.df df_chunks = [node.node.df for node in self._nodes()] - df_node = pd.concat(df_chunks) + df_node = _concat(df_chunks) df_graph = df_edge # Join df_edge with df_node to get to_node_type @@ -416,7 +417,7 @@ def _add_source_sink_node( f"{direction}_node_count": 0, f"{direction}_node_type": node, } - node_info = pd.concat( + node_info = _concat( [node_info, pd.DataFrame([new_row])], ignore_index=True ) @@ -454,8 +455,8 @@ def plot_control_listen(self, ax): df_listen_edge = pd.DataFrame( data={ - "control_node_id": pd.Series([], dtype=np.int32), - "listen_node_id": pd.Series([], dtype=np.int32), + "control_node_id": pd.Series([], dtype="int32[pyarrow]"), + "listen_node_id": pd.Series([], dtype="int32[pyarrow]"), } ) @@ -466,7 +467,7 @@ def plot_control_listen(self, ax): to_add = table[["node_id", "listen_node_id"]].drop_duplicates() to_add.columns = ["control_node_id", "listen_node_id"] - df_listen_edge = pd.concat([df_listen_edge, to_add]) + df_listen_edge = _concat([df_listen_edge, to_add]) # Listen edges from ContinuousControl and DiscreteControl for table, name in ( @@ -481,7 +482,7 @@ def plot_control_listen(self, ax): "control_node_id", "listen_node_id", ] - df_listen_edge = pd.concat([df_listen_edge, to_add]) + df_listen_edge = _concat([df_listen_edge, to_add]) # Collect geometry data node = self.node_table().df @@ -654,8 +655,8 @@ def _add_flow(self, uds, node_lookup): "perhaps the model needs to be run first." ) - basin_df = pd.read_feather(basin_path) - flow_df = pd.read_feather(flow_path) + basin_df = pd.read_feather(basin_path, dtype_backend="pyarrow") + flow_df = pd.read_feather(flow_path, dtype_backend="pyarrow") _time_in_ns(basin_df) _time_in_ns(flow_df) @@ -695,6 +696,7 @@ def _add_allocation(self, uds): alloc_flow_df = pd.read_feather( alloc_flow_path, columns=["time", "edge_id", "flow_rate", "optimization_type", "priority"], + dtype_backend="pyarrow", ) _time_in_ns(alloc_flow_df) diff --git a/python/ribasim/ribasim/schemas.py b/python/ribasim/ribasim/schemas.py index 0a26d361e..ffefdb124 100644 --- a/python/ribasim/ribasim/schemas.py +++ b/python/ribasim/ribasim/schemas.py @@ -1,9 +1,11 @@ # Automatically generated file. Do not modify. -from typing import Any, Callable +from typing import Annotated, Any, Callable +import pandas as pd import pandera as pa -from pandera.dtypes import Int32, Timestamp +import pyarrow +from pandera.dtypes import Int32 from pandera.typing import Index, Series from ribasim import migrations @@ -28,287 +30,579 @@ def migrate(cls, df: Any, schema_version: int) -> Any: class BasinConcentrationExternalSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - substance: Series[str] = pa.Field(nullable=False) - concentration: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + concentration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class BasinConcentrationStateSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - substance: Series[str] = pa.Field(nullable=False) - concentration: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + concentration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class BasinConcentrationSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - substance: Series[str] = pa.Field(nullable=False) - drainage: Series[float] = pa.Field(nullable=True) - precipitation: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + drainage: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + precipitation: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class BasinProfileSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - area: Series[float] = pa.Field(nullable=False) - level: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + area: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field(nullable=False) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class BasinStateSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - level: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class BasinStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - drainage: Series[float] = pa.Field(nullable=True) - potential_evaporation: Series[float] = pa.Field(nullable=True) - infiltration: Series[float] = pa.Field(nullable=True) - precipitation: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + drainage: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + potential_evaporation: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) + infiltration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + precipitation: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class BasinSubgridSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - subgrid_id: Series[Int32] = pa.Field(nullable=False, default=0) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - basin_level: Series[float] = pa.Field(nullable=False) - subgrid_level: Series[float] = pa.Field(nullable=False) + subgrid_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + basin_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + subgrid_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class BasinTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - drainage: Series[float] = pa.Field(nullable=True) - potential_evaporation: Series[float] = pa.Field(nullable=True) - infiltration: Series[float] = pa.Field(nullable=True) - precipitation: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + drainage: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + potential_evaporation: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) + infiltration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + precipitation: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class ContinuousControlFunctionSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - input: Series[float] = pa.Field(nullable=False) - output: Series[float] = pa.Field(nullable=False) - controlled_variable: Series[str] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + input: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + output: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + controlled_variable: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) class ContinuousControlVariableSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - listen_node_id: Series[Int32] = pa.Field(nullable=False, default=0) - variable: Series[str] = pa.Field(nullable=False) - weight: Series[float] = pa.Field(nullable=True) - look_ahead: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + listen_node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + variable: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + weight: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + look_ahead: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class DiscreteControlConditionSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - compound_variable_id: Series[Int32] = pa.Field(nullable=False, default=0) - greater_than: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + compound_variable_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + greater_than: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class DiscreteControlLogicSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - truth_state: Series[str] = pa.Field(nullable=False) - control_state: Series[str] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + truth_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) class DiscreteControlVariableSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - compound_variable_id: Series[Int32] = pa.Field(nullable=False, default=0) - listen_node_id: Series[Int32] = pa.Field(nullable=False, default=0) - variable: Series[str] = pa.Field(nullable=False) - weight: Series[float] = pa.Field(nullable=True) - look_ahead: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + compound_variable_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + listen_node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + variable: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + weight: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + look_ahead: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) class FlowBoundaryConcentrationSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - substance: Series[str] = pa.Field(nullable=False) - concentration: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + concentration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class FlowBoundaryStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - flow_rate: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class FlowBoundaryTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - flow_rate: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class FlowDemandStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - demand: Series[float] = pa.Field(nullable=False) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + demand: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) class FlowDemandTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - demand: Series[float] = pa.Field(nullable=False) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + demand: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) class LevelBoundaryConcentrationSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - substance: Series[str] = pa.Field(nullable=False) - concentration: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + concentration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class LevelBoundaryStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - level: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class LevelBoundaryTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - level: Series[float] = pa.Field(nullable=False) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) class LevelDemandStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - min_level: Series[float] = pa.Field(nullable=True) - max_level: Series[float] = pa.Field(nullable=True) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + min_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) class LevelDemandTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - min_level: Series[float] = pa.Field(nullable=True) - max_level: Series[float] = pa.Field(nullable=True) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + min_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) class LinearResistanceStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - resistance: Series[float] = pa.Field(nullable=False) - max_flow_rate: Series[float] = pa.Field(nullable=True) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + resistance: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + max_flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class ManningResistanceStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - length: Series[float] = pa.Field(nullable=False) - manning_n: Series[float] = pa.Field(nullable=False) - profile_width: Series[float] = pa.Field(nullable=False) - profile_slope: Series[float] = pa.Field(nullable=False) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + length: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + manning_n: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + profile_width: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + profile_slope: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class OutletStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - flow_rate: Series[float] = pa.Field(nullable=False) - min_flow_rate: Series[float] = pa.Field(nullable=True) - max_flow_rate: Series[float] = pa.Field(nullable=True) - min_upstream_level: Series[float] = pa.Field(nullable=True) - max_downstream_level: Series[float] = pa.Field(nullable=True) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + min_flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + min_upstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_downstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class PidControlStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - listen_node_id: Series[Int32] = pa.Field(nullable=False, default=0) - target: Series[float] = pa.Field(nullable=False) - proportional: Series[float] = pa.Field(nullable=False) - integral: Series[float] = pa.Field(nullable=False) - derivative: Series[float] = pa.Field(nullable=False) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + listen_node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + target: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + proportional: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + integral: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + derivative: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class PidControlTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - listen_node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - target: Series[float] = pa.Field(nullable=False) - proportional: Series[float] = pa.Field(nullable=False) - integral: Series[float] = pa.Field(nullable=False) - derivative: Series[float] = pa.Field(nullable=False) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + listen_node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + target: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + proportional: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + integral: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + derivative: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class PumpStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - flow_rate: Series[float] = pa.Field(nullable=False) - min_flow_rate: Series[float] = pa.Field(nullable=True) - max_flow_rate: Series[float] = pa.Field(nullable=True) - min_upstream_level: Series[float] = pa.Field(nullable=True) - max_downstream_level: Series[float] = pa.Field(nullable=True) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + min_flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + min_upstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + max_downstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class TabulatedRatingCurveStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - level: Series[float] = pa.Field(nullable=False) - flow_rate: Series[float] = pa.Field(nullable=False) - max_downstream_level: Series[float] = pa.Field(nullable=True) - control_state: Series[str] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + max_downstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) + control_state: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=True + ) class TabulatedRatingCurveTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - level: Series[float] = pa.Field(nullable=False) - flow_rate: Series[float] = pa.Field(nullable=False) - max_downstream_level: Series[float] = pa.Field(nullable=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + flow_rate: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + max_downstream_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = ( + pa.Field(nullable=True) + ) class UserDemandStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - active: Series[pa.BOOL] = pa.Field(nullable=True) - demand: Series[float] = pa.Field(nullable=True) - return_factor: Series[float] = pa.Field(nullable=False) - min_level: Series[float] = pa.Field(nullable=False) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + active: Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]] = pa.Field(nullable=True) + demand: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=True + ) + return_factor: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + min_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) class UserDemandTimeSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) - node_id: Series[Int32] = pa.Field(nullable=False, default=0) - time: Series[Timestamp] = pa.Field(nullable=False) - demand: Series[float] = pa.Field(nullable=False) - return_factor: Series[float] = pa.Field(nullable=False) - min_level: Series[float] = pa.Field(nullable=False) - priority: Series[Int32] = pa.Field(nullable=False, default=0) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + demand: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + return_factor: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + min_level: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + priority: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=True + ) diff --git a/python/ribasim/ribasim/utils.py b/python/ribasim/ribasim/utils.py index 1b4dc78ca..42fd638b1 100644 --- a/python/ribasim/ribasim/utils.py +++ b/python/ribasim/ribasim/utils.py @@ -1,4 +1,5 @@ import re +from warnings import catch_warnings, filterwarnings import numpy as np import pandas as pd @@ -70,6 +71,20 @@ def _time_in_ns(df) -> None: df["time"] = df["time"].astype("datetime64[ns]") +def _concat(dfs, **kwargs): + """Concatenate DataFrames with a warning filter.""" + with catch_warnings(): + # The behavior of array concatenation with empty entries is deprecated. + # In a future version, this will no longer exclude empty items when determining + # the result dtype. To retain the old behavior, exclude the empty entries before + # the concat operation. + filterwarnings( + "ignore", + category=FutureWarning, + ) + return pd.concat(dfs, **kwargs) + + class UsedIDs(BaseModel): """A helper class to manage globally unique node IDs. diff --git a/python/ribasim/tests/test_io.py b/python/ribasim/tests/test_io.py index 65ab9aca3..0320dca87 100644 --- a/python/ribasim/tests/test_io.py +++ b/python/ribasim/tests/test_io.py @@ -10,7 +10,7 @@ from pandas.testing import assert_frame_equal from pydantic import ValidationError from ribasim import Model, Node, Solver -from ribasim.nodes import basin, pump, user_demand +from ribasim.nodes import basin, flow_boundary, flow_demand, pump, user_demand from ribasim.utils import UsedIDs from shapely.geometry import Point @@ -64,7 +64,7 @@ def test_basic_transient(basic_transient, tmp_path): time = model_loaded.basin.time assert model_orig.basin.time.df.time.iloc[0] == time.df.time.iloc[0] - assert time.df.node_id.dtype == np.int32 + assert time.df.node_id.dtype == "int32[pyarrow]" __assert_equal(model_orig.basin.time.df, time.df) assert time.df.shape == (1468, 6) @@ -332,3 +332,58 @@ def test_closed_model(basic, tmp_path): basic.write(toml_path) model = ribasim.Model.read(toml_path) model.write(toml_path) + + +def test_arrow_dtype(): + # Below millisecond precision is not supported + with pytest.raises(ValidationError): + flow_boundary.Time( + time=["2021-01-01 00:00:00.1234"], + flow_rate=np.ones(1), + ) + + # Extra columns don't get coerced to Arrow types + df = flow_boundary.Time( + time=["2021-01-01 00:00:00.123", "2021-01-01 00:00:00.456"], + flow_rate=[1, 2.2], + meta_obj=["foo", "bar"], + meta_str=pd.Series(["a", pd.NA], dtype="string[pyarrow]"), + ).df + + assert (df["node_id"] == 0).all() + assert df["node_id"].dtype == "int32[pyarrow]" + assert df["time"].dtype == "timestamp[ms][pyarrow]" + assert df["time"].dt.tz is None + assert df["time"].diff().iloc[1] == pd.Timedelta("333ms") + assert df["flow_rate"].dtype == "double[pyarrow]" + assert df["meta_obj"].dtype == object + assert df["meta_str"].dtype == "string[pyarrow]" + assert df["meta_str"].isna().iloc[1] + + # Check a string column that is part of the schema and a boolean column + df = pump.Static( + flow_rate=np.ones(2), + control_state=["foo", pd.NA], + active=[None, False], + ).df + + assert df["control_state"].dtype == "string[pyarrow]" + assert df["active"].dtype == "bool[pyarrow]" + assert df["active"].isna().iloc[0] + + # Optional integer column + df = flow_demand.Static( + demand=[1, 2.2], + priority=[1, pd.NA], + ).df + + assert df["priority"].dtype == "int32[pyarrow]" + assert df["priority"].isna().iloc[1] + + # Missing optional integer column + df = flow_demand.Static( + demand=[1, 2.2], + ).df + + assert df["priority"].dtype == "int32[pyarrow]" + assert df["priority"].isna().all() diff --git a/python/ribasim/tests/test_model.py b/python/ribasim/tests/test_model.py index 2a376bf87..ba703d5fa 100644 --- a/python/ribasim/tests/test_model.py +++ b/python/ribasim/tests/test_model.py @@ -83,9 +83,10 @@ def test_invalid_node_id(): def test_tabulated_rating_curve_model(tabulated_rating_curve, tmp_path): model_orig = tabulated_rating_curve - model_orig.set_crs(model_orig.crs) basin_area = tabulated_rating_curve.basin.area.df assert basin_area is not None + assert basin_area.crs == CRS.from_epsg(28992) + model_orig.set_crs(model_orig.crs) assert basin_area.geometry.geom_type.iloc[0] == "MultiPolygon" assert basin_area.crs == CRS.from_epsg(28992) model_orig.write(tmp_path / "tabulated_rating_curve/ribasim.toml") @@ -114,15 +115,15 @@ def test_write_adds_fid_in_tables(basic, tmp_path): model_orig.write(tmp_path / "basic/ribasim.toml") with connect(tmp_path / "basic/database.gpkg") as connection: query = f"select * from {esc_id('Basin / profile')}" - df = pd.read_sql_query(query, connection) + df = pd.read_sql_query(query, connection, dtype_backend="pyarrow") assert "fid" in df.columns query = "select node_id from Node" - df = pd.read_sql_query(query, connection) + df = pd.read_sql_query(query, connection, dtype_backend="pyarrow") assert "node_id" in df.columns query = "select edge_id from Edge" - df = pd.read_sql_query(query, connection) + df = pd.read_sql_query(query, connection, dtype_backend="pyarrow") assert "edge_id" in df.columns diff --git a/python/ribasim_testmodels/ribasim_testmodels/basic.py b/python/ribasim_testmodels/ribasim_testmodels/basic.py index d0451a1ae..f431150a7 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/basic.py +++ b/python/ribasim_testmodels/ribasim_testmodels/basic.py @@ -290,13 +290,13 @@ def tabulated_rating_curve_model() -> ribasim.Model: [ tabulated_rating_curve.Time( time=[ - # test subsecond precision - pd.Timestamp("2020-01-01 00:00:00.000001"), - pd.Timestamp("2020-01"), - pd.Timestamp("2020-02"), - pd.Timestamp("2020-02"), - pd.Timestamp("2020-03"), - pd.Timestamp("2020-03"), + # test millisecond precision + pd.Timestamp("2020-01-01"), + pd.Timestamp("2020-01-01"), + pd.Timestamp("2020-02-01 00:00:00.001"), + pd.Timestamp("2020-02-01 00:00:00.001"), + pd.Timestamp("2020-03-01"), + pd.Timestamp("2020-03-01"), ], level=[0.0, 1.0, 0.0, 1.1, 0.0, 1.2], flow_rate=[0.0, 10 / 86400, 0.0, 10 / 86400, 0.0, 10 / 86400], diff --git a/python/ribasim_testmodels/ribasim_testmodels/continuous_control.py b/python/ribasim_testmodels/ribasim_testmodels/continuous_control.py index fa1427250..0da7fdca2 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/continuous_control.py +++ b/python/ribasim_testmodels/ribasim_testmodels/continuous_control.py @@ -25,7 +25,9 @@ def outlet_continuous_control_model() -> Model: Node(1, Point(0, 0)), [ level_boundary.Time( - time=pd.date_range(start="2020-01-01", end="2021-01-01", periods=100), + time=pd.date_range( + start="2020-01-01", end="2021-01-01", periods=100, unit="ms" + ), level=6.0 + np.sin(np.linspace(0, 6 * np.pi, 100)), ) ], diff --git a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py index 007056851..f86ecfb00 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py +++ b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py @@ -542,7 +542,9 @@ def concentration_condition_model() -> Model: basin.Profile(area=1000.0, level=[0.0, 1.0]), basin.State(level=[20.0]), basin.ConcentrationExternal( - time=pd.date_range(start="2020-01-01", end="2021-01-01", periods=100), + time=pd.date_range( + start="2020-01-01", end="2021-01-01", periods=100, unit="ms" + ), substance="kryptonite", concentration=np.sin(np.linspace(0, 6 * np.pi, 100)) ** 2, ), diff --git a/python/ribasim_testmodels/ribasim_testmodels/time.py b/python/ribasim_testmodels/ribasim_testmodels/time.py index 64e006d8a..b77f46c2f 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/time.py +++ b/python/ribasim_testmodels/ribasim_testmodels/time.py @@ -20,8 +20,8 @@ def flow_boundary_time_model() -> Model: ) n_times = 100 - time = pd.date_range(start="2020-03-01", end="2020-10-01", periods=n_times).astype( - "datetime64[s]" + time = pd.date_range( + start="2020-03-01", end="2020-10-01", periods=n_times, unit="s" ) flow_rate = 1 + np.sin(np.pi * np.linspace(0, 0.5, n_times)) ** 2 diff --git a/utils/gen_python.jl b/utils/gen_python.jl index 969d56510..3226d5ac7 100644 --- a/utils/gen_python.jl +++ b/utils/gen_python.jl @@ -5,21 +5,17 @@ using Legolas using OteraEngine using Ribasim -pythontype(::Type{<:AbstractString}) = "Series[str]" -pythontype(::Type{<:Integer}) = "Series[Int32]" -pythontype(::Type{<:AbstractFloat}) = "Series[float]" -pythontype(::Type{<:Number}) = "Series[float]" -pythontype(::Type{<:Bool}) = "Series[pa.BOOL]" # pa.BOOL is a nullable boolean type, bool is not nullable -pythontype(::Type{<:Enum}) = "Series[str]" -pythontype(::Type{<:DateTime}) = "Series[Timestamp]" -pythontype(::Type{<:Any}) = "Series[Any]" -function pythontype(T::Union) - nonmissingtypes = filter(x -> x != Missing, Base.uniontypes(T)) - return join(map(pythontype, nonmissingtypes), " | ") -end +pythontype(::Type{Union{Missing, T}}) where {T} = pythontype(T) +pythontype(::Type{<:AbstractString}) = "Series[Annotated[pd.ArrowDtype, pyarrow.string()]]" +pythontype(::Type{<:Integer}) = "Series[Annotated[pd.ArrowDtype, pyarrow.int32()]]" +pythontype(::Type{<:AbstractFloat}) = "Series[Annotated[pd.ArrowDtype, pyarrow.float64()]]" +pythontype(::Type{<:Number}) = "Series[Annotated[pd.ArrowDtype, pyarrow.float64()]]" +pythontype(::Type{<:Bool}) = "Series[Annotated[pd.ArrowDtype, pyarrow.bool_()]]" +pythontype(::Type{<:Enum}) = "Series[Annotated[pd.ArrowDtype, pyarrow.string()]]" +pythontype(::Type{<:DateTime}) = "Series[Annotated[pd.ArrowDtype, pyarrow.timestamp('ms')]]" isnullable(_) = "False" -isnullable(T::Union) = typeintersect(T, Missing) == Missing ? "True" : "False" +isnullable(::Type{T}) where {T >: Union{Missing}} = "True" function strip_prefix(T::DataType) n = string(T) diff --git a/utils/templates/schemas.py.jinja b/utils/templates/schemas.py.jinja index 1f8ee5ef2..19b53c7ce 100644 --- a/utils/templates/schemas.py.jinja +++ b/utils/templates/schemas.py.jinja @@ -1,9 +1,11 @@ # Automatically generated file. Do not modify. -from typing import Any, Callable +from typing import Annotated, Any, Callable +import pandas as pd import pandera as pa -from pandera.dtypes import Int32, Timestamp +import pyarrow +from pandera.dtypes import Int32 from pandera.typing import Index, Series from ribasim import migrations @@ -28,7 +30,7 @@ class _BaseSchema(pa.DataFrameModel): class {{m[:name]}}Schema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) {% for f in m[:fields] %} - {% if (f[2] == "Series[Int32]") %} + {% if (f[1] == :node_id) %} {{ f[1] }}: {{ f[2] }} = pa.Field(nullable={{ f[3] }}, default=0) {% else %} {{ f[1] }}: {{ f[2] }} = pa.Field(nullable={{ f[3] }})