Use pyarrow dtype_backend #1781

Merged · 22 commits · Sep 26, 2024

Changes from 14 commits
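The recurring change in this PR is opting in to pandas' Arrow-backed dtypes wherever tables and result files are read. A minimal sketch of the pattern, with a hypothetical path:

import pandas as pd

# dtype_backend="pyarrow" (pandas >= 2.0) returns Arrow-backed columns
# such as double[pyarrow] and timestamp[ms][pyarrow]; missing values
# surface as pd.NA instead of NaN or a sentinel.
df = pd.read_feather("results/basin.arrow", dtype_backend="pyarrow")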
9 changes: 4 additions & 5 deletions core/src/util.jl
@@ -446,11 +446,10 @@ function get_all_priorities(db::DB, config::Config)::Vector{Int32}
(FlowDemandStaticV1, "FlowDemand / static"),
(FlowDemandTimeV1, "FlowDemand / time"),
]
- if valid_priorities(
-     load_structvector(db, config, type).priority,
-     config.allocation.use_allocation,
- )
-     union!(priorities, load_structvector(db, config, type).priority)
+ priority_col = load_structvector(db, config, type).priority
+ priority_col = Int32.(coalesce.(priority_col, Int32(0)))
+ if valid_priorities(priority_col, config.allocation.use_allocation)
+     union!(priorities, priority_col)
else
is_valid = false
@error "Missing priority parameter(s) for a $name node in the allocation problem."
10 changes: 5 additions & 5 deletions core/src/validation.jl
@@ -673,10 +673,10 @@ function valid_sources(
return !errors
end

- function valid_priorities(priorities::Vector, use_allocation::Bool)::Bool
-     errors = false
-     if 0 in priorities && use_allocation
-         errors = true
+ function valid_priorities(priorities::Vector{Int32}, use_allocation::Bool)::Bool
+     if use_allocation && any(iszero, priorities)
+         return false
+     else
+         return true
end
- return !errors
end
4 changes: 1 addition & 3 deletions core/test/validation_test.jl
@@ -460,11 +460,9 @@ end
normpath(@__DIR__, "../../generated_testmodels/invalid_priorities/ribasim.toml")
@test ispath(toml_path)

- config = Ribasim.Config(toml_path; allocation_use_allocation = true)
-
logger = TestLogger()
with_logger(logger) do
- @test_throws "Priority parameter is missing" Ribasim.run(config)
+ @test_throws "Priority parameter is missing" Ribasim.run(toml_path)
end
@test length(logger.logs) == 3
@test logger.logs[1].level == Error
38 changes: 27 additions & 11 deletions docs/guide/examples.ipynb
@@ -414,7 +414,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"basic/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"basic/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -428,7 +430,7 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\")\n",
"df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\", dtype_backend=\"pyarrow\")\n",
"df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
"df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
"ax = df_flow.pivot_table(index=\"time\", columns=\"edge\", values=\"flow_m3d\").plot()\n",
@@ -709,7 +711,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"level_range/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"level_range/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -991,7 +995,9 @@
"source": [
"from matplotlib.dates import date2num\n",
"\n",
"df_basin = pd.read_feather(datadir / \"pid_control/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"pid_control/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -1272,7 +1278,9 @@
"source": [
"import matplotlib.ticker as plticker\n",
"\n",
"df_allocation = pd.read_feather(datadir / \"allocation_example/results/allocation.arrow\")\n",
"df_allocation = pd.read_feather(\n",
" datadir / \"allocation_example/results/allocation.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_allocation_wide = df_allocation.pivot_table(\n",
" index=\"time\",\n",
" columns=[\"node_type\", \"node_id\", \"priority\"],\n",
@@ -1318,7 +1326,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"allocation_example/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"allocation_example/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
")\n",
@@ -1557,7 +1567,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(datadir / \"level_demand/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" datadir / \"level_demand/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"df_basin = df_basin[df_basin.node_id == 2]\n",
"df_basin_wide = df_basin.pivot_table(\n",
" index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
@@ -1953,7 +1965,7 @@
"outputs": [],
"source": [
"datadir_flow = datadir / \"local_pidcontrolled_cascade/results/flow.arrow\"\n",
"df_flow = pd.read_feather(datadir_flow)\n",
"df_flow = pd.read_feather(datadir_flow, dtype_backend=\"pyarrow\")\n",
"df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
"df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
"\n",
@@ -1995,7 +2007,7 @@
"outputs": [],
"source": [
"datadir_basin = datadir / \"local_pidcontrolled_cascade/results/basin.arrow\"\n",
"df_basin = pd.read_feather(datadir_basin)\n",
"df_basin = pd.read_feather(datadir_basin, dtype_backend=\"pyarrow\")\n",
"df_basin[\"vertical_flux\"] = (\n",
" df_basin[\"precipitation\"]\n",
" - df_basin[\"evaporation\"]\n",
@@ -2060,7 +2072,9 @@
" Node(1, Point(0, 0)),\n",
" [\n",
" level_boundary.Time(\n",
" time=pd.date_range(start=\"2020-01-01\", end=\"2021-01-01\", periods=100),\n",
" time=pd.date_range(\n",
" start=\"2020-01-01\", end=\"2021-01-01\", periods=100, unit=\"ms\"\n",
" ),\n",
" level=6.0 + np.sin(np.linspace(0, 6 * np.pi, 100)),\n",
" )\n",
" ],\n",
@@ -2286,7 +2300,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(datadir / \"outlet_continuous_control/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" datadir / \"outlet_continuous_control/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"fig, ax = plt.subplots()\n",
"\n",
"\n",
8 changes: 6 additions & 2 deletions docs/tutorial/irrigation-demand.ipynb
@@ -309,7 +309,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-2/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-2/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -393,7 +395,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(base_dir / \"Crystal-2/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-2/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
8 changes: 6 additions & 2 deletions docs/tutorial/natural-flow.ipynb
@@ -404,7 +404,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-1/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-1/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for Basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -453,7 +455,9 @@
"source": [
"# Plot flow data\n",
"# Read the flow results\n",
"df_flow = pd.read_feather(base_dir / \"Crystal-1/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-1/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
8 changes: 6 additions & 2 deletions docs/tutorial/reservoir.ipynb
@@ -283,7 +283,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_basin = pd.read_feather(base_dir / \"Crystal-3/results/basin.arrow\")\n",
"df_basin = pd.read_feather(\n",
" base_dir / \"Crystal-3/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"\n",
"# Create pivot tables and plot for Basin data\n",
"df_basin_wide = df_basin.pivot_table(\n",
@@ -329,7 +331,9 @@
"metadata": {},
"outputs": [],
"source": [
"df_flow = pd.read_feather(base_dir / \"Crystal-3/results/flow.arrow\")\n",
"df_flow = pd.read_feather(\n",
" base_dir / \"Crystal-3/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
")\n",
"# Add the edge names and then remove unnamed edges\n",
"df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
"df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
2 changes: 2 additions & 0 deletions pixi.toml
@@ -88,6 +88,7 @@ test-ribasim-regression = { cmd = "julia --project=core --eval 'using Pkg; Pkg.t
generate-testmodels = { cmd = "python utils/generate-testmodels.py", inputs = [
"python/ribasim",
"python/ribasim_testmodels",
"utils/generate-testmodels.py",
], outputs = [
"generated_testmodels",
] }
@@ -99,6 +100,7 @@ codegen = { cmd = "julia --project utils/gen_python.jl && ruff format python/rib
"initialize-julia",
], inputs = [
"core",
"utils",
], outputs = [
"python/ribasim/ribasim/schemas.py",
"python/ribasim/ribasim/validation.py",
21 changes: 13 additions & 8 deletions python/ribasim/ribasim/delwaq/generate.py
@@ -70,17 +70,18 @@
"""
bid = _boundary_name(data.node_id.iloc[0], boundary_type)
piv = (
- data.pivot_table(index="time", columns="substance", values="concentration")
+ data.pivot_table(
+     index="time", columns="substance", values="concentration", fill_value=-999
+ )
.reset_index()
.reset_index(drop=True)
)
- piv.time = piv.time.dt.strftime("%Y/%m/%d-%H:%M:%S")
+ # Convert Arrow time to Numpy to avoid needing tzdata somehow
+ piv.time = piv.time.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S")

boundary = {
"name": bid,
"substances": list(map(_quote, piv.columns[1:])),
"df": piv.to_string(
formatters={"time": _quote}, header=False, index=False, na_rep=-999
),
"df": piv.to_string(formatters={"time": _quote}, header=False, index=False),
}
substances = data.substance.unique()
return boundary, substances
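The strftime change above is the subtle part of this hunk: per the inline comment, formatting a pyarrow-backed timestamp column directly can require the tzdata package, so the column is first converted to numpy-backed datetime64. A sketch of that workaround, with hypothetical data:

import pandas as pd

ts = pd.Series(pd.to_datetime(["2020-01-01"])).astype("timestamp[ms][pyarrow]")
# Route through numpy datetime64 before strftime, as the diff does,
# to sidestep the tzdata dependency mentioned in the comment.
out = ts.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S")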
@@ -181,7 +182,7 @@
boundary_id -= 1
node_mapping[node_id] = boundary_id
else:
raise Exception("Found unexpected node $node_id in delwaq graph.")
raise Exception(f"Found unexpected node {node_id} in delwaq graph.")


nx.relabel_nodes(G, node_mapping, copy=False)

@@ -281,8 +282,12 @@

# Read in model and results
model = ribasim.Model.read(toml_path)
- basins = pd.read_feather(toml_path.parent / "results" / "basin.arrow")
- flows = pd.read_feather(toml_path.parent / "results" / "flow.arrow")
+ basins = pd.read_feather(
+     toml_path.parent / "results" / "basin.arrow", dtype_backend="pyarrow"
+ )
+ flows = pd.read_feather(
+     toml_path.parent / "results" / "flow.arrow", dtype_backend="pyarrow"
+ )

output_folder.mkdir(exist_ok=True)

16 changes: 13 additions & 3 deletions python/ribasim/ribasim/input_base.py
@@ -15,6 +15,7 @@
import geopandas as gpd
import numpy as np
import pandas as pd
+ import pyogrio
from pandera.typing import DataFrame
from pandera.typing.geopandas import GeoDataFrame
from pydantic import BaseModel as PydanticBaseModel
@@ -294,7 +295,10 @@ def _from_db(cls, path: Path, table: str) -> pd.DataFrame | None:
if exists(connection, table):
query = f"select * from {esc_id(table)}"
df = pd.read_sql_query(
- query, connection, parse_dates={"time": {"format": "ISO8601"}}
+ query,
+ connection,
+ parse_dates={"time": {"format": "ISO8601"}},
+ dtype_backend="pyarrow",
)
df.set_index("fid", inplace=True)
else:
@@ -305,7 +309,7 @@ def _from_db(cls, path: Path, table: str):
@classmethod
def _from_arrow(cls, path: Path) -> pd.DataFrame:
directory = context_file_loading.get().get("directory", Path("."))
- return pd.read_feather(directory / path)
+ return pd.read_feather(directory / path, dtype_backend="pyarrow")

def sort(self):
"""Sort the table as required.
@@ -374,7 +378,13 @@ def _from_db(cls, path: Path, table: str):
with closing(connect(path)) as connection:
if exists(connection, table):
# pyogrio hardcodes fid name on reading
- df = gpd.read_file(path, layer=table, fid_as_index=True)
+ df = pyogrio.read_dataframe(
+     path,
+     layer=table,
+     fid_as_index=True,
+     use_arrow=True,
+     arrow_to_pandas_kwargs={"types_mapper": pd.ArrowDtype},
+ )
Review thread (marked resolved):

Member (author): Only pyogrio supported this directly, geopandas not yet.

Member: Best to place an inline comment on the specific kwarg that's not supported yet?

Member (author): Actually I had misunderstood: all extra kwargs are passed from gpd.read_file to pyogrio.read_dataframe when engine="pyogrio", so I went back to gpd.read_file and added a comment on arrow_to_pandas_kwargs, which ends up in pyarrow, not pyogrio.
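As the thread resolves, gpd.read_file forwards its extra keyword arguments to pyogrio.read_dataframe when engine="pyogrio", so the Arrow-to-pandas type mapping can be requested without importing pyogrio directly. A hedged sketch of that final shape, with hypothetical path and layer:

import geopandas as gpd
import pandas as pd

# arrow_to_pandas_kwargs is ultimately consumed by pyarrow's
# Table.to_pandas, not by pyogrio itself; types_mapper=pd.ArrowDtype
# yields pyarrow-backed columns in the resulting GeoDataFrame.
df = gpd.read_file(
    "database.gpkg",  # hypothetical path
    layer="Node",     # hypothetical layer
    engine="pyogrio",
    fid_as_index=True,
    use_arrow=True,
    arrow_to_pandas_kwargs={"types_mapper": pd.ArrowDtype},
)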

df.index.rename(cls.tableschema()._index_name(), inplace=True)
else:
df = None
11 changes: 8 additions & 3 deletions python/ribasim/ribasim/model.py
@@ -234,7 +234,11 @@

def node_table(self) -> NodeTable:
"""Compute the full sorted NodeTable from all node types."""
- df_chunks = [node.node.df for node in self._nodes()]
+ df_chunks = [
+     node.node.df
+     for node in self._nodes()
+     if node.node.df is not None and not node.node.df.empty
+ ]
df = (
pd.concat(df_chunks)
if df_chunks
@@ -654,8 +658,8 @@
"perhaps the model needs to be run first."
)

- basin_df = pd.read_feather(basin_path)
- flow_df = pd.read_feather(flow_path)
+ basin_df = pd.read_feather(basin_path, dtype_backend="pyarrow")
+ flow_df = pd.read_feather(flow_path, dtype_backend="pyarrow")

_time_in_ns(basin_df)
_time_in_ns(flow_df)

@@ -695,6 +699,7 @@
alloc_flow_df = pd.read_feather(
alloc_flow_path,
columns=["time", "edge_id", "flow_rate", "optimization_type", "priority"],
dtype_backend="pyarrow",
)
_time_in_ns(alloc_flow_df)
