From 7c7f32f741d70de5b3a907ee049e4a935a4b0afa Mon Sep 17 00:00:00 2001
From: Rory McKinley <rorymckinley@gmail.com>
Date: Thu, 2 Nov 2023 11:02:51 +0200
Subject: [PATCH 1/2] Partition work_orders

---
 CHANGELOG.md                                  |   2 +
 config/runtime.exs                            |   6 +-
 lib/lightning/application.ex                  |  10 +
 lib/lightning/maintenance/admin_tools.ex      |  17 +
 .../maintenance/partition_table_service.ex    | 182 +++++++
 mix.exs                                       |   5 +-
 ...1114452_create_partitioned_work_orders.exs | 201 +++++++
 .../maintenance/admin_tools_test.exs          |  63 +++
 .../partition_table_service_test.exs          | 495 ++++++++++++++++++
 9 files changed, 979 insertions(+), 2 deletions(-)
 create mode 100644 lib/lightning/maintenance/admin_tools.ex
 create mode 100644 lib/lightning/maintenance/partition_table_service.ex
 create mode 100644 priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs
 create mode 100644 test/lightning/maintenance/admin_tools_test.exs
 create mode 100644 test/lightning/maintenance/partition_table_service_test.exs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c5a7a42e5..e469007d27 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -141,6 +141,8 @@ bearing with us as we move towards our first stable Lightning release.)
   [#1327](https://github.com/OpenFn/Lightning/issues/1327)
 - Have user create workflow name before moving to the canvas
   [#1103](https://github.com/OpenFn/Lightning/issues/1103)
+- Partition `work_orders` by week and year and add functionality to maintain
+  the partitions.  [#1254](https://github.com/OpenFn/Lightning/issues/1254)
 
 ### Changed
 
diff --git a/config/runtime.exs b/config/runtime.exs
index 211a24b573..a6d7be8c9f 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -97,7 +97,11 @@ base_oban_cron = [
   {"0 10 * * 1", Lightning.DigestEmailWorker,
    args: %{"type" => "weekly_project_digest"}},
   {"0 10 1 * *", Lightning.DigestEmailWorker,
-   args: %{"type" => "monthly_project_digest"}}
+   args: %{"type" => "monthly_project_digest"}},
+  {"0 1 * * *", Lightning.PartitionTableService,
+   args: %{"add_headroom" => %{"weeks" => 2}}},
+  {"0 0 * * *", Lightning.PartitionTableService,
+   args: %{"drop_older_than" => %{"weeks" => -6}}}
 ]
 
 conditional_cron =
diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex
index 8d196a89a2..75cac71438 100644
--- a/lib/lightning/application.ex
+++ b/lib/lightning/application.ex
@@ -131,4 +131,14 @@ defmodule Lightning.Application do
   def oban_opts() do
     Application.get_env(:lightning, Oban)
   end
+
+  @impl true
+  @doc """
+  Perform any idempotent database setup that must be done after the repo is started.
+  """
+  def start_phase(:ensure_db_config, :normal, _opts) do
+    Lightning.PartitionTableService.add_headroom(:all, 2)
+    Lightning.PartitionTableService.add_headroom(:all, -5)
+    :ok
+  end
 end
diff --git a/lib/lightning/maintenance/admin_tools.ex b/lib/lightning/maintenance/admin_tools.ex
new file mode 100644
index 0000000000..305ad176ec
--- /dev/null
+++ b/lib/lightning/maintenance/admin_tools.ex
@@ -0,0 +1,17 @@
+defmodule Lightning.AdminTools do
+  def generate_iso_weeks(start_date, end_date) do
+    Date.range(start_date, end_date)
+    |> Enum.map(&Timex.beginning_of_week(&1, :mon))
+    |> Enum.uniq()
+    |> Enum.map(fn date ->
+      {year, week} = Timex.iso_week(date)
+
+      {
+        year |> Integer.to_string(),
+        week |> Integer.to_string() |> String.pad_leading(2, "0"),
+        date |> Date.to_string(),
+        date |> Timex.shift(weeks: 1) |> Date.to_string()
+      }
+    end)
+  end
+end
diff --git a/lib/lightning/maintenance/partition_table_service.ex b/lib/lightning/maintenance/partition_table_service.ex
new file mode 100644
index 0000000000..baf7b4f859
--- /dev/null
+++ b/lib/lightning/maintenance/partition_table_service.ex
@@ -0,0 +1,182 @@
+defmodule Lightning.PartitionTableService do
+  @moduledoc """
+  Service to keep the partition tables up to date.
+  """
+
+  use Oban.Worker,
+    queue: :background,
+    max_attempts: 1
+
+  import Ecto.Query
+
+  alias Lightning.Repo
+
+  require Logger
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{args: %{"add_headroom" => %{"weeks" => weeks}}})
+      when is_integer(weeks) do
+    add_headroom(:all, weeks)
+  end
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}})
+      when is_integer(weeks) do
+    upper_bound = Timex.shift(DateTime.utc_now(), weeks: weeks)
+
+    remove_empty("work_orders", upper_bound)
+  end
+
+  def add_headroom(:all, weeks) when is_integer(weeks) do
+    add_headroom(:work_orders, weeks) |> log_partition_creation()
+  end
+
+  def add_headroom(:work_orders, weeks) when is_integer(weeks) do
+    proposed_tables = tables_to_add("work_orders", weeks)
+
+    :ok =
+      Enum.each(proposed_tables, fn {partition_name, from, to} ->
+        {
+          Repo.query(create_query(partition_name, "work_orders", from, to))
+        }
+      end)
+
+    proposed_tables
+  end
+
+  def tables_to_add(table, weeks) do
+    today = Date.utc_today()
+
+    existing_tables = get_partitions(table)
+
+    Lightning.AdminTools.generate_iso_weeks(today, today |> Date.add(weeks * 7))
+    |> Enum.map(&to_partition_details(table, &1))
+    |> Enum.reject(fn {name, _from, _to} ->
+      Enum.find(existing_tables, &String.equivalent?(name, &1))
+    end)
+  end
+
+  def get_partitions(parent) do
+    %Postgrex.Result{rows: rows} =
+      Repo.query!(
+        ~S[
+          SELECT CAST(inhrelid::regclass AS text) AS child
+          FROM   pg_catalog.pg_inherits
+          WHERE  inhparent = $1::text::regclass;
+        ],
+        [parent]
+      )
+
+    rows |> List.flatten()
+  end
+
+  @doc """
+  Drops empty partition tables that have an upper partition bound less than the
+  date given.
+
+  This bound is the `TO` part of the partition:
+
+  ```
+  FOR VALUES FROM ('2020-12-28 00:00:00') TO ('2021-01-04 00:00:00')
+  ```
+  """
+  def remove_empty(parent, upper_bound) do
+    parent
+    |> find_range_partitions
+    |> partitions_older_than(upper_bound)
+    |> Enum.each(&drop_empty_partition(parent, &1))
+  end
+
+  def find_range_partitions(parent) do
+    Repo.query!(
+      ~S[
+        SELECT
+          pt.relname AS partition_name,
+          pg_get_expr(pt.relpartbound,
+          pt.oid,
+          TRUE) AS partition_expression
+        FROM
+          pg_class base_tb
+        JOIN pg_inherits i ON
+          i.inhparent = base_tb.oid
+        JOIN pg_class pt ON
+          pt.oid = i.inhrelid
+        WHERE
+          base_tb.oid = $1::text::regclass
+          AND pg_get_expr(
+            pt.relpartbound,
+            pt.oid,
+            TRUE
+          ) != 'DEFAULT'
+      ],
+      [parent]
+    ).rows
+  end
+
+  def partitions_older_than(partitions, bound) do
+    partitions
+    |> Enum.map(fn [table, range_expression] ->
+      [_, to_as_string] =
+        ~r/TO \('(.+)'\)/
+        |> Regex.run(range_expression)
+
+      {:ok, to_as_dt, _} = DateTime.from_iso8601(to_as_string <> "Z")
+
+      [table, to_as_dt]
+    end)
+    |> Enum.filter(fn [_table, to] -> DateTime.compare(to, bound) == :lt end)
+    |> Enum.map(fn [table, _to] -> table end)
+  end
+
+  def drop_empty_partition(parent, partition) do
+    unless valid_chars?(parent) && valid_chars?(partition) do
+      raise ArgumentError, message: "Table name contains invalid characters"
+    end
+
+    partition
+    |> partition_empty?
+    |> handle_drop(parent, partition)
+  end
+
+  defp valid_chars?(table_name) do
+    table_name =~ ~r/\A\w+\z/
+  end
+
+  defp partition_empty?(partition) do
+    from(r in partition, select: count()) |> Repo.one!() == 0
+  end
+
+  defp handle_drop(true, parent, partition) do
+    Logger.info("Detaching #{partition} from #{parent}")
+    Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};")
+    Logger.info("Dropping #{partition}")
+    Repo.query!("DROP TABLE #{partition};")
+  end
+
+  defp handle_drop(false, _parent, _partition) do
+  end
+
+  defp create_query(partition, parent, from, to) do
+    """
+    CREATE TABLE #{partition}
+      PARTITION OF #{parent}
+        FOR VALUES FROM ('#{from}') TO ('#{to}');
+    """
+  end
+
+  defp to_partition_details(table, {year, week, from, to}) do
+    {"#{table}_#{year}_#{week}", from, to}
+  end
+
+  defp log_partition_creation(partitions) when length(partitions) > 0 do
+    partitions
+    |> Enum.map_join("\n", fn {partition_name, from, to} ->
+      "Created #{partition_name} for #{from} -> #{to}"
+    end)
+    |> Logger.info()
+  end
+
+  defp log_partition_creation(partitions) when partitions == [] do
+    Logger.info("No extra partitions were needed.")
+  end
+end
diff --git a/mix.exs b/mix.exs
index 9b3c4c8d5c..96e4c5979d 100644
--- a/mix.exs
+++ b/mix.exs
@@ -39,7 +39,10 @@ defmodule Lightning.MixProject do
   def application do
     [
       mod: {Lightning.Application, [:timex]},
-      extra_applications: [:logger, :runtime_tools, :os_mon]
+      extra_applications: [:logger, :runtime_tools, :os_mon],
+      start_phases: [
+        ensure_db_config: []
+      ]
     ]
   end
 
diff --git a/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs b/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs
new file mode 100644
index 0000000000..0f536b40ad
--- /dev/null
+++ b/priv/repo/migrations/20231101114452_create_partitioned_work_orders.exs
@@ -0,0 +1,201 @@
+defmodule Lightning.Repo.Migrations.CreatePartitionedWorkOrders do
+  use Ecto.Migration
+
+  def up do
+    execute("""
+    ALTER TABLE work_orders
+    RENAME TO work_orders_monolith
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME CONSTRAINT work_orders_pkey TO work_orders_monolith_pkey
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_reason_id_index RENAME TO work_orders_monolith_reason_id_index
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_state_index RENAME TO work_orders_monolith_state_index
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_workflow_id_index RENAME TO work_orders_monolith_workflow_id_index
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME CONSTRAINT work_orders_dataclip_id_fkey TO work_orders_monolith_dataclip_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME CONSTRAINT work_orders_reason_id_fkey TO work_orders_monolith_reason_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME CONSTRAINT work_orders_trigger_id_fkey TO work_orders_monolith_trigger_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME CONSTRAINT work_orders_workflow_id_fkey TO work_orders_monolith_workflow_id_fkey
+    """)
+
+    execute("""
+    CREATE TABLE work_orders (
+    id uuid NOT NULL,
+    workflow_id uuid NOT NULL,
+    reason_id uuid,
+    inserted_at timestamp without time zone NOT NULL,
+    updated_at timestamp without time zone NOT NULL,
+    trigger_id uuid,
+    dataclip_id uuid,
+    state character varying(255) DEFAULT 'pending'::character varying NOT NULL,
+    last_activity timestamp without time zone,
+    CONSTRAINT work_orders_pkey PRIMARY KEY (inserted_at, id)
+    ) PARTITION BY RANGE (inserted_at)
+    """)
+
+    Lightning.AdminTools.generate_iso_weeks(~D[2023-01-02], ~D[2024-01-29])
+    |> Enum.each(fn {year, wnum, from, to} ->
+      execute("""
+      CREATE TABLE work_orders_#{year}_#{wnum}
+        PARTITION OF work_orders
+          FOR VALUES FROM ('#{from}') TO ('#{to}')
+      """)
+    end)
+
+    execute("""
+    CREATE TABLE work_orders_default
+    PARTITION OF work_orders
+    DEFAULT
+    """)
+
+    execute("""
+    INSERT INTO work_orders
+    SELECT *
+    FROM work_orders_monolith
+    """)
+
+    execute("""
+    CREATE INDEX work_orders_id_index
+    ON work_orders USING hash (id)
+    """)
+
+    execute("""
+    CREATE INDEX work_orders_reason_id_index
+    ON work_orders USING btree (reason_id)
+    """)
+
+    execute("""
+    CREATE INDEX work_orders_state_index
+    ON work_orders USING btree (state);
+    """)
+
+    execute("""
+    CREATE INDEX work_orders_workflow_id_index
+    ON work_orders USING btree (workflow_id)
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    ADD CONSTRAINT work_orders_dataclip_id_fkey
+    FOREIGN KEY (dataclip_id)
+    REFERENCES dataclips(id)
+    ON DELETE SET NULL
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    ADD CONSTRAINT work_orders_reason_id_fkey
+    FOREIGN KEY (reason_id)
+    REFERENCES invocation_reasons(id)
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    ADD CONSTRAINT work_orders_trigger_id_fkey
+    FOREIGN KEY (trigger_id)
+    REFERENCES triggers(id)
+    ON DELETE SET NULL;
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    ADD CONSTRAINT work_orders_workflow_id_fkey
+    FOREIGN KEY (workflow_id)
+    REFERENCES workflows(id)
+    ON DELETE CASCADE;
+    """)
+  end
+
+  def down do
+    Lightning.AdminTools.generate_iso_weeks(~D[2023-01-02], ~D[2024-01-29])
+    |> Enum.each(fn {year, wnum, _from, _to} ->
+      execute("""
+      ALTER TABLE work_orders DETACH PARTITION work_orders_#{year}_#{wnum}
+      """)
+
+      execute("""
+      DROP TABLE work_orders_#{year}_#{wnum}
+      """)
+    end)
+
+    execute("""
+    ALTER TABLE work_orders DETACH PARTITION work_orders_default
+    """)
+
+    execute("""
+    DROP TABLE work_orders_default
+    """)
+
+    execute("""
+    DROP TABLE IF EXISTS work_orders
+    """)
+
+    execute("""
+    ALTER TABLE work_orders_monolith
+    RENAME TO work_orders
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    RENAME CONSTRAINT work_orders_monolith_pkey TO work_orders_pkey
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_monolith_reason_id_index RENAME TO work_orders_reason_id_index
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_monolith_state_index RENAME TO work_orders_state_index
+    """)
+
+    execute("""
+    ALTER INDEX work_orders_monolith_workflow_id_index RENAME TO work_orders_workflow_id_index
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    RENAME CONSTRAINT work_orders_monolith_dataclip_id_fkey TO work_orders_dataclip_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    RENAME CONSTRAINT work_orders_monolith_reason_id_fkey TO work_orders_reason_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    RENAME CONSTRAINT work_orders_monolith_trigger_id_fkey TO work_orders_trigger_id_fkey
+    """)
+
+    execute("""
+    ALTER TABLE work_orders
+    RENAME CONSTRAINT work_orders_monolith_workflow_id_fkey TO work_orders_workflow_id_fkey
+    """)
+  end
+end
diff --git a/test/lightning/maintenance/admin_tools_test.exs b/test/lightning/maintenance/admin_tools_test.exs
new file mode 100644
index 0000000000..705c60189e
--- /dev/null
+++ b/test/lightning/maintenance/admin_tools_test.exs
@@ -0,0 +1,63 @@
+defmodule Lightning.AdminToolstest do
+  use ExUnit.Case, async: true
+
+  alias Lightning.AdminTools
+
+  describe "generate_iso_weeks" do
+    test "returns list of weeks when both dates are Mondays" do
+      expected_weeks = [
+        {"2023", "08", "2023-02-20", "2023-02-27"},
+        {"2023", "09", "2023-02-27", "2023-03-06"},
+        {"2023", "10", "2023-03-06", "2023-03-13"},
+        {"2023", "11", "2023-03-13", "2023-03-20"},
+        {"2023", "12", "2023-03-20", "2023-03-27"}
+      ]
+
+      weeks = AdminTools.generate_iso_weeks(~D[2023-02-20], ~D[2023-03-20])
+
+      assert weeks == expected_weeks
+    end
+
+    test "returns list of weeks when start date is a Monday" do
+      expected_weeks = [
+        {"2023", "08", "2023-02-20", "2023-02-27"},
+        {"2023", "09", "2023-02-27", "2023-03-06"},
+        {"2023", "10", "2023-03-06", "2023-03-13"},
+        {"2023", "11", "2023-03-13", "2023-03-20"},
+        {"2023", "12", "2023-03-20", "2023-03-27"}
+      ]
+
+      weeks = AdminTools.generate_iso_weeks(~D[2023-02-20], ~D[2023-03-23])
+
+      assert weeks == expected_weeks
+    end
+
+    test "returns list of weeks when end date is a Monday" do
+      expected_weeks = [
+        {"2023", "08", "2023-02-20", "2023-02-27"},
+        {"2023", "09", "2023-02-27", "2023-03-06"},
+        {"2023", "10", "2023-03-06", "2023-03-13"},
+        {"2023", "11", "2023-03-13", "2023-03-20"},
+        {"2023", "12", "2023-03-20", "2023-03-27"}
+      ]
+
+      weeks = AdminTools.generate_iso_weeks(~D[2023-02-22], ~D[2023-03-20])
+
+      assert weeks == expected_weeks
+    end
+
+    test "returns list of weeks when neither day is a Monday" do
+      expected_weeks = [
+        {"2023", "08", "2023-02-20", "2023-02-27"},
+        {"2023", "09", "2023-02-27", "2023-03-06"},
+        {"2023", "10", "2023-03-06", "2023-03-13"},
+        {"2023", "11", "2023-03-13", "2023-03-20"},
+        {"2023", "12", "2023-03-20", "2023-03-27"}
+      ]
+
+      weeks = AdminTools.generate_iso_weeks(~D[2023-02-22], ~D[2023-03-23])
+
+      assert weeks == expected_weeks
+    end
+  end
+end
diff --git a/test/lightning/maintenance/partition_table_service_test.exs b/test/lightning/maintenance/partition_table_service_test.exs
new file mode 100644
index 0000000000..26af2b923c
--- /dev/null
+++ b/test/lightning/maintenance/partition_table_service_test.exs
@@ -0,0 +1,495 @@
+defmodule Lightning.PartitionTableServiceTest do
+  use Lightning.DataCase, async: false
+
+  import Lightning.Factories
+
+  alias Lightning.PartitionTableService, as: Service
+
+  describe "perform" do
+    test "adds additional partitions" do
+      parent = "work_orders"
+
+      drop_range_partitions(parent)
+
+      add_partitions(parent)
+
+      now = DateTime.utc_now()
+
+      new_partitions =
+        [
+          now |> build_partition_name(parent),
+          now |> date_with_offset(1) |> build_partition_name(parent),
+          now |> date_with_offset(2) |> build_partition_name(parent)
+        ]
+
+      expected = modified_relations(new_partitions, all_relations())
+
+      Service.perform(%Oban.Job{
+        args: %{"add_headroom" => %{"weeks" => 2}}
+      })
+
+      assert all_relations() == expected
+    end
+
+    test "removes obsolete partitions" do
+      parent = "work_orders"
+
+      now = DateTime.now!("Etc/UTC")
+
+      drop_range_partitions(parent)
+
+      new_range_partitions =
+        -2..3
+        |> generate_partition_properties(now)
+        |> Enum.map(&partition_name(&1, parent))
+
+      new_partitions = ["#{parent}_default" | new_range_partitions]
+
+      expected = modified_relations(new_partitions, all_relations())
+
+      generate_partitions(-6..3, now, parent)
+
+      Service.perform(%Oban.Job{
+        args: %{"drop_older_than" => %{"weeks" => -2}}
+      })
+
+      assert all_relations() == expected
+    end
+  end
+
+  test "gets a list of partitions for a given parent" do
+    parent = "work_orders"
+
+    drop_range_partitions(parent)
+
+    add_partitions(parent)
+
+    expected = [
+      "#{parent}_2023_01",
+      "#{parent}_2023_02",
+      "#{parent}_2023_03",
+      "#{parent}_default"
+    ]
+
+    assert Service.get_partitions("work_orders") |> Enum.sort() == expected
+  end
+
+  test "tables_to_add returns tables that do not already exist" do
+    now = DateTime.now!("Etc/UTC")
+
+    parent = "work_orders"
+
+    drop_range_partitions(parent)
+
+    existing_partition_properties =
+      0..3
+      |> Enum.map(&date_with_offset(now, &1))
+      |> generate_partition_properties()
+
+    generate_partitions(existing_partition_properties, parent)
+
+    expected_additional_partitions =
+      4..6
+      |> Enum.map(&date_with_offset(now, &1))
+      |> generate_partition_properties()
+      |> Enum.map(fn properties ->
+        {_, _, from, to} = properties
+
+        {
+          partition_name(properties, parent),
+          from |> DateTime.to_date() |> Date.to_string(),
+          to |> DateTime.to_date() |> Date.to_string()
+        }
+      end)
+      |> Enum.sort_by(fn {a, _, _} -> a end, :asc)
+
+    proposed_partitions = Service.tables_to_add(parent, 6) |> Enum.sort()
+
+    assert proposed_partitions == expected_additional_partitions
+  end
+
+  test "add_headroom - all" do
+    now = DateTime.now!("Etc/UTC")
+
+    parent = "work_orders"
+
+    drop_range_partitions(parent)
+
+    expected_partitions =
+      0..3
+      |> Enum.map(&date_with_offset(now, &1))
+      |> generate_partition_properties()
+      |> Enum.map(&partition_name(&1, parent))
+
+    expected = modified_relations(expected_partitions, all_relations())
+
+    Service.add_headroom(:all, 3)
+
+    assert all_relations() == expected
+  end
+
+  test "add_headroom - parent specified" do
+    now = DateTime.now!("Etc/UTC")
+
+    parent = "work_orders"
+
+    drop_range_partitions(parent)
+
+    new_partitions =
+      0..3
+      |> Enum.map(&date_with_offset(now, &1))
+      |> generate_partition_properties()
+      |> Enum.map(fn {_, _, from, _} -> from end)
+      |> Enum.map(&build_partition_name(&1, parent))
+
+    expected = modified_relations(new_partitions, all_relations())
+
+    Service.add_headroom(:work_orders, 3)
+
+    assert all_relations() == expected
+  end
+
+  test "remove_empty" do
+    parent = "work_orders"
+
+    now = DateTime.now!("Etc/UTC")
+
+    drop_range_partitions(parent)
+
+    expected_range_partitions =
+      -2..3
+      |> generate_partition_properties(now)
+      |> Enum.map(&partition_name(&1, parent))
+
+    new_partitions = ["#{parent}_default" | expected_range_partitions]
+
+    expected = modified_relations(new_partitions, all_relations())
+
+    generate_partitions(-6..3, now, parent)
+
+    weeks_ago = Timex.shift(DateTime.utc_now(), weeks: -2)
+
+    Service.remove_empty(parent, weeks_ago)
+
+    assert all_relations() == expected
+  end
+
+  describe "list partitions" do
+    test "returns partitions of the specified table" do
+      drop_range_partitions("work_orders")
+
+      add_partitions("work_orders")
+
+      sort_fn = fn [name, _expression] -> name end
+
+      expected =
+        [
+          [
+            "work_orders_2023_01",
+            "FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2023-01-31 00:00:00')"
+          ],
+          [
+            "work_orders_2023_02",
+            "FOR VALUES FROM ('2023-02-01 00:00:00') TO ('2023-02-28 00:00:00')"
+          ],
+          [
+            "work_orders_2023_03",
+            "FOR VALUES FROM ('2023-03-01 00:00:00') TO ('2023-03-31 00:00:00')"
+          ]
+        ]
+        |> Enum.sort_by(sort_fn, :asc)
+
+      partitions =
+        Service.find_range_partitions("work_orders")
+        |> Enum.sort_by(sort_fn, :asc)
+
+      assert partitions == expected
+    end
+  end
+
+  describe "partitions_older_than" do
+    test "it returns partition tables that end before the given datetime" do
+      parent = "work_orders"
+      bound = ~U[2023-04-29 23:59:59Z]
+      partitions = input_partitions(parent)
+      expected = ["#{parent}_2023_01", "#{parent}_2023_02", "#{parent}_2023_03"]
+
+      assert Service.partitions_older_than(partitions, bound) == expected
+    end
+
+    defp input_partitions(parent) do
+      [
+        [
+          "#{parent}_2023_01",
+          "FOR VALUES FROM ('2023-01-01 00:00:00') TO ('2023-01-31 00:00:00')"
+        ],
+        [
+          "#{parent}_2023_02",
+          "FOR VALUES FROM ('2023-02-01 00:00:00') TO ('2023-02-28 00:00:00')"
+        ],
+        [
+          "#{parent}_2023_03",
+          "FOR VALUES FROM ('2023-03-01 00:00:00') TO ('2023-03-31 00:00:00')"
+        ],
+        [
+          "#{parent}_2023_04",
+          "FOR VALUES FROM ('2023-04-01 00:00:00') TO ('2023-04-30 00:00:00')"
+        ],
+        [
+          "#{parent}_2023_05",
+          "FOR VALUES FROM ('2023-05-01 00:00:00') TO ('2023-05-31 00:00:00')"
+        ]
+      ]
+    end
+  end
+
+  describe "drop_empty_partition" do
+    test "drops the named partition" do
+      parent = "work_orders"
+
+      drop_range_partitions(parent)
+
+      add_partitions(parent)
+
+      expected =
+        [
+          "#{parent}",
+          "#{parent}_2023_01",
+          "#{parent}_2023_03",
+          "#{parent}_default",
+          "#{parent}_monolith"
+        ]
+        |> Enum.sort()
+
+      Service.drop_empty_partition(parent, "#{parent}_2023_02")
+
+      assert associated_relations(all_relations(), parent) == expected
+    end
+
+    test "does nothing if the table is not empty" do
+      parent = "work_orders"
+
+      drop_range_partitions(parent)
+
+      add_partitions(parent)
+
+      expected =
+        [
+          "#{parent}",
+          "#{parent}_2023_01",
+          "#{parent}_2023_02",
+          "#{parent}_2023_03",
+          "#{parent}_default",
+          "#{parent}_monolith"
+        ]
+        |> Enum.sort()
+
+      insert(:workorder, inserted_at: ~U[2023-02-15 10:00:00Z])
+
+      Service.drop_empty_partition(parent, "#{parent}_2023_02")
+
+      assert associated_relations(all_relations(), parent) == expected
+    end
+
+    test "errors out if the parent contains unexpected chars" do
+      parent = "work_orders"
+
+      drop_range_partitions(parent)
+
+      add_partitions(parent)
+
+      assert_raise(
+        ArgumentError,
+        fn ->
+          Service.drop_empty_partition("#{parent} --", "#{parent}_2023_02")
+        end
+      )
+
+      expected =
+        [
+          "#{parent}",
+          "#{parent}_2023_01",
+          "#{parent}_2023_02",
+          "#{parent}_2023_03",
+          "#{parent}_default",
+          "#{parent}_monolith"
+        ]
+        |> Enum.sort()
+
+      assert associated_relations(all_relations(), parent) == expected
+    end
+
+    test "errors out if the partition contains unexpected chars" do
+      parent = "work_orders"
+
+      drop_range_partitions(parent)
+
+      add_partitions(parent)
+
+      assert_raise(
+        ArgumentError,
+        fn ->
+          Service.drop_empty_partition(parent, "#{parent}_2023_2 --")
+        end
+      )
+
+      expected =
+        [
+          "#{parent}",
+          "#{parent}_2023_01",
+          "#{parent}_2023_02",
+          "#{parent}_2023_03",
+          "#{parent}_default",
+          "#{parent}_monolith"
+        ]
+        |> Enum.sort()
+
+      assert associated_relations(all_relations(), parent) == expected
+    end
+  end
+
+  defp all_relations() do
+    Repo.query!(~S[
+      SELECT c.relname
+      FROM pg_catalog.pg_class c
+      LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+      LEFT JOIN pg_catalog.pg_am am ON am.oid = c.relam
+      WHERE c.relkind IN ('r','p','')
+      AND n.nspname <> 'pg_catalog'
+      AND n.nspname !~ '^pg_toast'
+      AND n.nspname <> 'information_schema'
+      AND pg_catalog.pg_table_is_visible(c.oid);
+    ]).rows
+    |> List.flatten()
+    |> Enum.sort()
+  end
+
+  defp week(date) do
+    {_year, week} = Timex.iso_week(date)
+
+    week
+  end
+
+  defp build_partition_name(date, parent) do
+    {year, week} = Timex.iso_week(date)
+
+    padded_week = week |> Integer.to_string() |> String.pad_leading(2, "0")
+
+    "#{parent}_#{year}_#{padded_week}"
+  end
+
+  defp drop_range_partitions(parent) do
+    all_relations()
+    |> Enum.filter(&(&1 =~ ~r/\A#{parent}_[2d]/))
+    |> Enum.each(fn partition ->
+      Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};")
+      Repo.query!("DROP TABLE #{partition};")
+    end)
+  end
+
+  defp add_partitions(parent) do
+    [
+      {"2023", "1", "2023-01-01", "2023-01-31"},
+      {"2023", "2", "2023-02-01", "2023-02-28"},
+      {"2023", "3", "2023-03-01", "2023-03-31"}
+    ]
+    |> Enum.each(&create_range_partition(&1, parent))
+
+    create_default_partition(parent)
+  end
+
+  defp create_range_partition(partition_properties, parent) do
+    {_year, _num, from, to} = partition_properties
+
+    Repo.query!("""
+    CREATE TABLE #{partition_name(partition_properties, parent)}
+    PARTITION OF #{parent}
+    FOR VALUES FROM ('#{from}') TO ('#{to}')
+    """)
+  end
+
+  defp partition_name({year, num, _, _}, parent) when is_binary(num) do
+    padded_num = num |> String.pad_leading(2, "0")
+
+    "#{parent}_#{year}_#{padded_num}"
+  end
+
+  defp partition_name({year, num, _, _}, parent) when is_integer(num) do
+    padded_num = num |> Integer.to_string() |> String.pad_leading(2, "0")
+
+    "#{parent}_#{year}_#{padded_num}"
+  end
+
+  defp create_default_partition(parent) do
+    Repo.query!("""
+    CREATE TABLE #{parent}_default
+    PARTITION OF #{parent}
+    DEFAULT
+    """)
+  end
+
+  defp associated_relations(relations, parent) do
+    relations
+    |> Enum.filter(&(&1 =~ ~r/\A#{parent}/))
+    |> Enum.sort()
+  end
+
+  defp generate_partition_properties(dates) do
+    dates
+    |> Enum.map(fn from ->
+      shifted_from = from |> shift_to_monday()
+      to = range_end(shifted_from)
+
+      {from.year, week(from), shifted_from, to}
+    end)
+  end
+
+  defp generate_partition_properties(range, now) do
+    range
+    |> Enum.map(fn week_offset ->
+      from = date_with_offset(now, week_offset) |> shift_to_monday()
+      to = range_end(from)
+
+      {from.year, normalise_counter(week_offset), from, to}
+    end)
+  end
+
+  defp normalise_counter(counter) when counter < 0 do
+    "minus_#{abs(counter)}"
+  end
+
+  defp normalise_counter(counter) when counter >= 0 do
+    "#{counter}"
+  end
+
+  defp generate_partitions(properties, parent) do
+    properties
+    |> Enum.map(&create_range_partition(&1, parent))
+
+    create_default_partition(parent)
+  end
+
+  defp generate_partitions(range, now, parent) do
+    range
+    |> generate_partition_properties(now)
+    |> Enum.map(&create_range_partition(&1, parent))
+
+    create_default_partition(parent)
+  end
+
+  defp date_with_offset(now, offset) do
+    DateTime.add(now, 7 * offset, :day)
+  end
+
+  defp range_end(range_start) do
+    DateTime.add(range_start, 7, :day)
+  end
+
+  defp shift_to_monday(date) do
+    date |> Timex.beginning_of_week(:mon)
+  end
+
+  defp modified_relations(new_relations, existing_relations) do
+    [new_relations | existing_relations] |> List.flatten() |> Enum.sort()
+  end
+end

From 8a0199c9a35a356fcd50cd83ca3da1b8d9840ff0 Mon Sep 17 00:00:00 2001
From: Rory McKinley <rorymckinley@gmail.com>
Date: Tue, 21 Nov 2023 18:25:21 +0200
Subject: [PATCH 2/2] Review changes

---
 lib/lightning/maintenance/admin_tools.ex             |  4 ++--
 lib/lightning/maintenance/partition_table_service.ex |  2 +-
 .../maintenance/partition_table_service_test.exs     | 12 ++++++------
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/lib/lightning/maintenance/admin_tools.ex b/lib/lightning/maintenance/admin_tools.ex
index 305ad176ec..16c5259798 100644
--- a/lib/lightning/maintenance/admin_tools.ex
+++ b/lib/lightning/maintenance/admin_tools.ex
@@ -1,7 +1,7 @@
 defmodule Lightning.AdminTools do
   def generate_iso_weeks(start_date, end_date) do
     Date.range(start_date, end_date)
-    |> Enum.map(&Timex.beginning_of_week(&1, :mon))
+    |> Enum.map(&Date.beginning_of_week(&1))
     |> Enum.uniq()
     |> Enum.map(fn date ->
       {year, week} = Timex.iso_week(date)
@@ -10,7 +10,7 @@ defmodule Lightning.AdminTools do
         year |> Integer.to_string(),
         week |> Integer.to_string() |> String.pad_leading(2, "0"),
         date |> Date.to_string(),
-        date |> Timex.shift(weeks: 1) |> Date.to_string()
+        date |> Date.add(7) |> Date.to_string()
       }
     end)
   end
diff --git a/lib/lightning/maintenance/partition_table_service.ex b/lib/lightning/maintenance/partition_table_service.ex
index baf7b4f859..6df8d7d49d 100644
--- a/lib/lightning/maintenance/partition_table_service.ex
+++ b/lib/lightning/maintenance/partition_table_service.ex
@@ -22,7 +22,7 @@ defmodule Lightning.PartitionTableService do
   @impl Oban.Worker
   def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}})
       when is_integer(weeks) do
-    upper_bound = Timex.shift(DateTime.utc_now(), weeks: weeks)
+    upper_bound = DateTime.add(DateTime.utc_now(), weeks * 7, :day)
 
     remove_empty("work_orders", upper_bound)
   end
diff --git a/test/lightning/maintenance/partition_table_service_test.exs b/test/lightning/maintenance/partition_table_service_test.exs
index 26af2b923c..0a4d855712 100644
--- a/test/lightning/maintenance/partition_table_service_test.exs
+++ b/test/lightning/maintenance/partition_table_service_test.exs
@@ -34,7 +34,7 @@ defmodule Lightning.PartitionTableServiceTest do
     test "removes obsolete partitions" do
       parent = "work_orders"
 
-      now = DateTime.now!("Etc/UTC")
+      now = DateTime.utc_now()
 
       drop_range_partitions(parent)
 
@@ -75,7 +75,7 @@ defmodule Lightning.PartitionTableServiceTest do
   end
 
   test "tables_to_add returns tables that do not already exist" do
-    now = DateTime.now!("Etc/UTC")
+    now = DateTime.utc_now()
 
     parent = "work_orders"
 
@@ -109,7 +109,7 @@ defmodule Lightning.PartitionTableServiceTest do
   end
 
   test "add_headroom - all" do
-    now = DateTime.now!("Etc/UTC")
+    now = DateTime.utc_now()
 
     parent = "work_orders"
 
@@ -129,7 +129,7 @@ defmodule Lightning.PartitionTableServiceTest do
   end
 
   test "add_headroom - parent specified" do
-    now = DateTime.now!("Etc/UTC")
+    now = DateTime.utc_now()
 
     parent = "work_orders"
 
@@ -152,7 +152,7 @@ defmodule Lightning.PartitionTableServiceTest do
   test "remove_empty" do
     parent = "work_orders"
 
-    now = DateTime.now!("Etc/UTC")
+    now = DateTime.utc_now()
 
     drop_range_partitions(parent)
 
@@ -167,7 +167,7 @@ defmodule Lightning.PartitionTableServiceTest do
 
     generate_partitions(-6..3, now, parent)
 
-    weeks_ago = Timex.shift(DateTime.utc_now(), weeks: -2)
+    weeks_ago = DateTime.add(DateTime.utc_now(), -2 * 7, :day)
 
     Service.remove_empty(parent, weeks_ago)