Skip to content

Commit 52ce270

Browse files
committed
Partition work_orders
1 parent c4e0e54 commit 52ce270

9 files changed

+977
-2
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ bearing with us as we move towards our first stable Lightning release.)
109109
[#1327](https://github.com/OpenFn/Lightning/issues/1327)
110110
- Have user create workflow name before moving to the canvas
111111
[#1103](https://github.com/OpenFn/Lightning/issues/1103)
112+
- Partition `work_orders` by week and year and add functionality to maintain
113+
the partitions. [#1254](https://github.com/OpenFn/Lightning/issues/1254)
112114

113115
### Changed
114116

config/runtime.exs

+5-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ base_oban_cron = [
9797
{"0 10 * * 1", Lightning.DigestEmailWorker,
9898
args: %{"type" => "weekly_project_digest"}},
9999
{"0 10 1 * *", Lightning.DigestEmailWorker,
100-
args: %{"type" => "monthly_project_digest"}}
100+
args: %{"type" => "monthly_project_digest"}},
101+
{"0 1 * * *", Lightning.PartitionTableService,
102+
args: %{"add_headroom" => %{"weeks" => 2}}},
103+
{"0 0 * * *", Lightning.PartitionTableService,
104+
args: %{"drop_older_than" => %{"weeks" => -6}}}
101105
]
102106

103107
conditional_cron =

lib/lightning/application.ex

+10
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,14 @@ defmodule Lightning.Application do
131131
def oban_opts() do
132132
Application.get_env(:lightning, Oban)
133133
end
134+
135+
@impl true
136+
@doc """
137+
Perform any idempotent database setup that must be done after the repo is started.
138+
"""
139+
def start_phase(:ensure_db_config, :normal, _opts) do
140+
Lightning.PartitionTableService.add_headroom(:all, 2)
141+
Lightning.PartitionTableService.add_headroom(:all, -5)
142+
:ok
143+
end
134144
end
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule Lightning.AdminTools do
2+
def generate_iso_weeks(start_date, end_date) do
3+
Date.range(start_date, end_date)
4+
|> Enum.map(&Timex.beginning_of_week(&1, :mon))
5+
|> Enum.uniq()
6+
|> Enum.map(fn date ->
7+
{year, week} = Timex.iso_week(date)
8+
9+
{
10+
year |> Integer.to_string(),
11+
week |> Integer.to_string() |> String.pad_leading(2, "0"),
12+
date |> Date.to_string(),
13+
date |> Timex.shift(weeks: 1) |> Date.to_string()
14+
}
15+
end)
16+
end
17+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
defmodule Lightning.PartitionTableService do
2+
@moduledoc """
3+
Service to keep the partition tables up to date.
4+
"""
5+
6+
use Oban.Worker,
7+
queue: :background,
8+
max_attempts: 1
9+
10+
import Ecto.Query
11+
12+
alias Lightning.Repo
13+
14+
require Logger
15+
16+
@impl Oban.Worker
17+
def perform(%Oban.Job{args: %{"add_headroom" => %{"weeks" => weeks}}})
18+
when is_integer(weeks) do
19+
add_headroom(:all, weeks)
20+
end
21+
22+
@impl Oban.Worker
23+
def perform(%Oban.Job{args: %{"drop_older_than" => %{"weeks" => weeks}}})
24+
when is_integer(weeks) do
25+
upper_bound = Timex.shift(DateTime.utc_now(), weeks: weeks)
26+
27+
remove_empty("work_orders", upper_bound)
28+
end
29+
30+
def add_headroom(:all, weeks) when is_integer(weeks) do
31+
add_headroom(:work_orders, weeks) |> log_partition_creation()
32+
end
33+
34+
def add_headroom(:work_orders, weeks) when is_integer(weeks) do
35+
proposed_tables = tables_to_add("work_orders", weeks)
36+
37+
:ok =
38+
Enum.each(proposed_tables, fn {partition_name, from, to} ->
39+
{
40+
Repo.query(create_query(partition_name, "work_orders", from, to))
41+
}
42+
end)
43+
44+
proposed_tables
45+
end
46+
47+
def tables_to_add(table, weeks) do
48+
today = Date.utc_today()
49+
50+
existing_tables = get_partitions(table)
51+
52+
Lightning.AdminTools.generate_iso_weeks(today, today |> Date.add(weeks * 7))
53+
|> Enum.map(&to_partition_details(table, &1))
54+
|> Enum.reject(fn {name, _from, _to} ->
55+
Enum.find(existing_tables, &String.equivalent?(name, &1))
56+
end)
57+
end
58+
59+
def get_partitions(parent) do
60+
%Postgrex.Result{rows: rows} =
61+
Repo.query!(
62+
~S[
63+
SELECT CAST(inhrelid::regclass AS text) AS child
64+
FROM pg_catalog.pg_inherits
65+
WHERE inhparent = $1::text::regclass;
66+
],
67+
[parent]
68+
)
69+
70+
rows |> List.flatten()
71+
end
72+
73+
@doc """
74+
Drops empty partition tables that have an upper partition bound less than the
75+
date given.
76+
77+
This bound is the `TO` part of the partition:
78+
79+
```
80+
FOR VALUES FROM ('2020-12-28 00:00:00') TO ('2021-01-04 00:00:00')
81+
```
82+
"""
83+
def remove_empty(parent, upper_bound) do
84+
parent
85+
|> find_range_partitions
86+
|> partitions_older_than(upper_bound)
87+
|> Enum.each(&drop_empty_partition(parent, &1))
88+
end
89+
90+
def find_range_partitions(parent) do
91+
Repo.query!(
92+
~S[
93+
SELECT
94+
pt.relname AS partition_name,
95+
pg_get_expr(pt.relpartbound,
96+
pt.oid,
97+
TRUE) AS partition_expression
98+
FROM
99+
pg_class base_tb
100+
JOIN pg_inherits i ON
101+
i.inhparent = base_tb.oid
102+
JOIN pg_class pt ON
103+
pt.oid = i.inhrelid
104+
WHERE
105+
base_tb.oid = $1::text::regclass
106+
AND pg_get_expr(
107+
pt.relpartbound,
108+
pt.oid,
109+
TRUE
110+
) != 'DEFAULT'
111+
],
112+
[parent]
113+
).rows
114+
end
115+
116+
def partitions_older_than(partitions, bound) do
117+
partitions
118+
|> Enum.map(fn [table, range_expression] ->
119+
[_, to_as_string] =
120+
~r/TO \('(.+)'\)/
121+
|> Regex.run(range_expression)
122+
123+
{:ok, to_as_dt, _} = DateTime.from_iso8601(to_as_string <> "Z")
124+
125+
[table, to_as_dt]
126+
end)
127+
|> Enum.filter(fn [_table, to] -> DateTime.compare(to, bound) == :lt end)
128+
|> Enum.map(fn [table, _to] -> table end)
129+
end
130+
131+
def drop_empty_partition(parent, partition) do
132+
unless valid_chars?(parent) && valid_chars?(partition) do
133+
raise ArgumentError, message: "Table name contains invalid characters"
134+
end
135+
136+
partition
137+
|> partition_empty?
138+
|> handle_drop(parent, partition)
139+
end
140+
141+
defp valid_chars?(table_name) do
142+
table_name =~ ~r/\A\w+\z/
143+
end
144+
145+
defp partition_empty?(partition) do
146+
from(r in partition, select: count()) |> Repo.one!() == 0
147+
end
148+
149+
defp handle_drop(true, parent, partition) do
150+
Logger.info("Detaching #{partition} from #{parent}")
151+
Repo.query!("ALTER TABLE #{parent} DETACH PARTITION #{partition};")
152+
Logger.info("Dropping #{partition}")
153+
Repo.query!("DROP TABLE #{partition};")
154+
end
155+
156+
defp handle_drop(false, _parent, _partition) do
157+
end
158+
159+
defp create_query(partition, parent, from, to) do
160+
"""
161+
CREATE TABLE #{partition}
162+
PARTITION OF #{parent}
163+
FOR VALUES FROM ('#{from}') TO ('#{to}');
164+
"""
165+
end
166+
167+
defp to_partition_details(table, {year, week, from, to}) do
168+
{"#{table}_#{year}_#{week}", from, to}
169+
end
170+
171+
defp log_partition_creation(partitions) when length(partitions) > 0 do
172+
partitions
173+
|> Enum.map_join("\n", fn {partition_name, from, to} ->
174+
"Created #{partition_name} for #{from} -> #{to}"
175+
end)
176+
|> Logger.info()
177+
end
178+
179+
defp log_partition_creation(partitions) when partitions == [] do
180+
Logger.info("No extra partitions were needed.")
181+
end
182+
end

mix.exs

+4-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ defmodule Lightning.MixProject do
3939
def application do
4040
[
4141
mod: {Lightning.Application, [:timex]},
42-
extra_applications: [:logger, :runtime_tools, :os_mon]
42+
extra_applications: [:logger, :runtime_tools, :os_mon],
43+
start_phases: [
44+
ensure_db_config: []
45+
]
4346
]
4447
end
4548

0 commit comments

Comments
 (0)