From fd151a81e86640e01810a2a21aaf65a54d082d43 Mon Sep 17 00:00:00 2001 From: Abel Soares Siqueira Date: Mon, 28 Oct 2024 17:21:49 +0100 Subject: [PATCH] Move incoming and outgoing flow computation to DuckDB Related to #895 --- src/create-model.jl | 3 ++ src/model-preparation.jl | 57 ++++++++++++++++---- src/tmp.jl | 114 ++++++++++++++++++++++++++++++++++++++- test/test-pipeline.jl | 1 + 4 files changed, 162 insertions(+), 13 deletions(-) diff --git a/src/create-model.jl b/src/create-model.jl index bc69a338..fe30cfa0 100644 --- a/src/create-model.jl +++ b/src/create-model.jl @@ -17,6 +17,7 @@ function create_model!(energy_problem; kwargs...) dataframes = energy_problem.dataframes sets = create_sets(graph, years) energy_problem.model = @timeit to "create_model" create_model( + energy_problem.db_connection, graph, sets, variables, @@ -44,6 +45,7 @@ end Create the energy model given the `graph`, `representative_periods`, dictionary of `dataframes` (created by [`construct_dataframes`](@ref)), timeframe, and groups. """ function create_model( + connection, graph, sets, variables, @@ -115,6 +117,7 @@ function create_model( incoming_flow_storage_inter_rp_balance, outgoing_flow_storage_inter_rp_balance, ) = add_expressions_to_dataframe!( + connection, dataframes, variables, model, diff --git a/src/model-preparation.jl b/src/model-preparation.jl index 960cfd7e..7777e4c7 100644 --- a/src/model-preparation.jl +++ b/src/model-preparation.jl @@ -62,8 +62,14 @@ function construct_dataframes( tmp_create_constraints_indices(connection) + # WIP: Can these queries be left undordered by the end of the refactor? # WIP: highest_in_out is not included in constraints_partition anymore - dataframes[:highest_in_out] = TulipaIO.get_table(connection, "cons_indices_highest_in_out") + dataframes[:highest_in_out] = + DuckDB.query( + connection, + "SELECT * FROM cons_indices_highest_in_out + ORDER BY asset, year, rep_period, time_block_start", + ) |> DataFrame dataframes[:highest_in_out].timesteps_block = map( r -> r[1]:r[2], zip( @@ -75,7 +81,12 @@ function construct_dataframes( # WIP: highest_in is not included in constraints_partition anymore tmp_create_constraints_indices(connection) - dataframes[:highest_in] = TulipaIO.get_table(connection, "cons_indices_highest_in") + dataframes[:highest_in] = + DuckDB.query( + connection, + "SELECT * FROM cons_indices_highest_in + ORDER BY asset, year, rep_period, time_block_start", + ) |> DataFrame dataframes[:highest_in].timesteps_block = map( r -> r[1]:r[2], zip(dataframes[:highest_in].time_block_start, dataframes[:highest_in].time_block_end), @@ -480,6 +491,7 @@ function add_expression_terms_inter_rp_constraints!( end function add_expressions_to_dataframe!( + connection, dataframes, variables, model, @@ -512,15 +524,38 @@ function add_expressions_to_dataframe!( use_highest_resolution = false, multiply_by_duration = true, ) - add_expression_terms_intra_rp_constraints!( - dataframes[:highest_in_out], - dataframes[:flows], - expression_workspace, - representative_periods, - graph; - use_highest_resolution = true, - multiply_by_duration = false, - ) + # WIP: Changing the function below + # add_expression_terms_intra_rp_constraints!( + # dataframes[:highest_in_out], + # dataframes[:flows], + # expression_workspace, + # representative_periods, + # graph; + # use_highest_resolution = true, + # multiply_by_duration = false, + # ) + dataframes[:highest_in_out].incoming_flow = JuMP.AffExpr[ + if length(row.indices) > 0 + sum(unique(variables[:flow].container[row.indices])) + else + JuMP.AffExpr(0.0) + end for row in DuckDB.query( + connection, + "SELECT * FROM highest_in_out_incoming + ORDER BY asset, year, rep_period, time_block_start", + ) + ] + dataframes[:highest_in_out].outgoing_flow = JuMP.AffExpr[ + if length(row.indices) > 0 + sum(unique(variables[:flow].container[row.indices])) + else + JuMP.AffExpr(0.0) + end for row in DuckDB.query( + connection, + "SELECT * FROM highest_in_out_outgoing + ORDER BY asset, year, rep_period, time_block_start", + ) + ] add_expression_terms_intra_rp_constraints!( dataframes[:highest_in], dataframes[:flows], diff --git a/src/tmp.jl b/src/tmp.jl index fa9e2749..0763817a 100644 --- a/src/tmp.jl +++ b/src/tmp.jl @@ -124,7 +124,7 @@ function tmp_create_partition_tables(connection) DBInterface.execute( connection, - "CREATE OR REPLACE TABLE flow_time_resolution( + "CREATE OR REPLACE TEMP TABLE t_flow_time_resolution( from_asset STRING, to_asset STRING, year INT, @@ -135,7 +135,7 @@ function tmp_create_partition_tables(connection) )", ) - appender = DuckDB.Appender(connection, "flow_time_resolution") + appender = DuckDB.Appender(connection, "t_flow_time_resolution") for row in TulipaIO.get_table(Val(:raw), connection, "explicit_flows_rep_periods_partitions") durations = if row.specification == "uniform" step = parse(Int, row.partition) @@ -157,6 +157,17 @@ function tmp_create_partition_tables(connection) _append_given_durations(appender, row, durations) end DuckDB.close(appender) + + DBInterface.execute( + connection, + "CREATE OR REPLACE TABLE flow_time_resolution + AS + SELECT + *, + row_number() OVER (ORDER BY from_asset, to_asset, year, rep_period, time_block_start) AS variable_index + FROM t_flow_time_resolution + ", + ) end function tmp_create_constraints_indices(connection) @@ -265,6 +276,9 @@ function tmp_create_constraints_indices(connection) WHERE main.type in ('storage') ", ) + + # WIP: For now, the incoming and outgoing expressions are being computed here + tmp_create_expressions(connection) end """ @@ -303,3 +317,99 @@ function tmp_example_of_flow_expression_problem() return connection, ep end + +function tmp_create_expressions(connection) + DuckDB.execute( + connection, + "CREATE OR REPLACE TABLE t_incoming_nonzero AS + SELECT + t_flows.from_asset, + t_flows.to_asset, + t_flows.year, + t_flows.rep_period, + t_flows.variable_index, + t_cons.time_block_start, + greatest(0, 1 + + least(t_flows.time_block_end,t_cons.time_block_end) + - greatest(t_flows.time_block_start,t_cons.time_block_start)) AS duration + FROM flow_time_resolution AS t_flows + INNER JOIN cons_indices_highest_in_out AS t_cons + ON t_flows.to_asset=t_cons.asset + AND t_flows.year=t_cons.year + AND t_flows.rep_period=t_cons.rep_period + WHERE duration > 0", + ) + + DuckDB.execute( + connection, + "CREATE OR REPLACE TABLE t_outgoing_nonzero AS + SELECT + t_flows.from_asset, + t_flows.to_asset, + t_flows.year, + t_flows.rep_period, + t_flows.variable_index, + t_cons.time_block_start, + greatest(0, 1 + + least(t_flows.time_block_end,t_cons.time_block_end) + - greatest(t_flows.time_block_start,t_cons.time_block_start)) AS duration + FROM flow_time_resolution AS t_flows + INNER JOIN cons_indices_highest_in_out AS t_cons + ON t_flows.from_asset=t_cons.asset + AND t_flows.year=t_cons.year + AND t_flows.rep_period=t_cons.rep_period + WHERE duration > 0", + ) + + DuckDB.execute( + connection, + "CREATE OR REPLACE TABLE highest_in_out_incoming AS + SELECT + t1.asset, t1.year, t1.rep_period, t1.time_block_start, + COALESCE(indices, []) AS indices, + COALESCE(durations, []) AS durations, + FROM cons_indices_highest_in_out AS t1 + LEFT JOIN ( + SELECT + to_asset as asset, + year, + rep_period, + time_block_start, + array_agg(variable_index) AS indices, + array_agg(duration) AS durations, + FROM t_incoming_nonzero + GROUP BY to_asset, year, rep_period, time_block_start + ) AS t2 + ON t1.asset=t2.asset + AND t1.year=t2.year + AND t1.rep_period=t2.rep_period + AND t1.time_block_start=t2.time_block_start + ", + ) + + DuckDB.execute( + connection, + "CREATE OR REPLACE TABLE highest_in_out_outgoing AS + SELECT + t1.asset, t1.year, t1.rep_period, t1.time_block_start, + COALESCE(indices, []) AS indices, + COALESCE(durations, []) AS durations, + FROM cons_indices_highest_in_out AS t1 + LEFT JOIN ( + SELECT + from_asset as asset, + year, + rep_period, + time_block_start, + array_agg(variable_index) AS indices, + array_agg(duration) AS durations, + FROM t_outgoing_nonzero + GROUP BY from_asset, year, rep_period, time_block_start + ) AS t2 + ON t1.asset=t2.asset + AND t1.year=t2.year + AND t1.rep_period=t2.rep_period + AND t1.time_block_start=t2.time_block_start + ", + ) +end diff --git a/test/test-pipeline.jl b/test/test-pipeline.jl index 4e09f12c..f331739e 100644 --- a/test/test-pipeline.jl +++ b/test/test-pipeline.jl @@ -36,6 +36,7 @@ end # Create model model = create_model( + connection, graph, sets, variables,