Skip to content

Commit

Permalink
Create empty Arrow tables
Browse files Browse the repository at this point in the history
  • Loading branch information
visr committed Jun 6, 2024
1 parent 006fbf4 commit 3f661db
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
4 changes: 3 additions & 1 deletion core/src/model.jl
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ function Model(config::Config)::Model
set_initial_allocation_mean_flows!(integrator)
end

return Model(integrator, config, saved)
model = Model(integrator, config, saved)
write_results(model) # check permissions
return model
end

"Get all saved times in seconds since start"
Expand Down
30 changes: 19 additions & 11 deletions core/src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,37 @@ function write_results(model::Model)::Model
(; config) = model
(; results) = model.config
compress = get_compressor(results)
init = model.integrator.t == 0

# basin
table = basin_table(model)
path = results_path(config, RESULTS_FILENAME.basin)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

# flow
table = flow_table(model)
path = results_path(config, RESULTS_FILENAME.flow)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

# discrete control
table = discrete_control_table(model)
path = results_path(config, RESULTS_FILENAME.control)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

# allocation
table = allocation_table(model)
path = results_path(config, RESULTS_FILENAME.allocation)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

# allocation flow
table = allocation_flow_table(model)
path = results_path(config, RESULTS_FILENAME.allocation_flow)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

# exported levels
table = subgrid_level_table(model)
path = results_path(config, RESULTS_FILENAME.subgrid_level)
write_arrow(path, table, compress)
write_arrow(path, table, compress; init)

@debug "Wrote results."
return model
Expand Down Expand Up @@ -328,11 +329,13 @@ end
function write_arrow(
path::AbstractString,
table::NamedTuple,
compress::Union{ZstdCompressor, Nothing},
compress::Union{ZstdCompressor, Nothing};
init::Bool,
)::Nothing
# Don't write empty tables
if isempty(table.time)
# Avoid confusion with old files
# At the start of the simulation, write an empty table to ensure we have permissions
# and fail early.
# At the end of the simulation, write all non-empty tables, and remove existing empty ones.
if !init && isempty(table.time)
try
rm(path; force = true)
catch
Expand All @@ -345,7 +348,12 @@ function write_arrow(
table = merge(table, (; time = convert.(Arrow.DATETIME, table.time)))
metadata = ["ribasim_version" => string(pkgversion(Ribasim))]
mkpath(dirname(path))
Arrow.write(path, table; compress, metadata)
try
Arrow.write(path, table; compress, metadata)
catch
@error "Failed to write results, it may be locked." path
error("Failed to write results.")
end
return nothing
end

Expand Down

0 comments on commit 3f661db

Please sign in to comment.