Skip to content

Commit

Permalink
fix: Move migrations to non blocking callback
Browse files Browse the repository at this point in the history
Moves migrations to a non blocking callback meaning that multiple migrations will be able to be started. We're also handling errors better when the migrations are already running preventing multiple migrations from starting
  • Loading branch information
filipecabaco committed Nov 19, 2024
1 parent ac00218 commit 45a8f03
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 24 deletions.
41 changes: 28 additions & 13 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ defmodule Realtime.Tenants.Migrations do
spec = {__MODULE__, attrs}

case DynamicSupervisor.start_child(supervisor, spec) do
:ignore -> :ok
{:ok, _} -> :ok
{:error, {:already_started, _}} -> {:error, :migration_already_running}
error -> error
end
end
Expand All @@ -143,24 +144,27 @@ defmodule Realtime.Tenants.Migrations do
GenServer.start_link(__MODULE__, attrs, name: name)
end

def init(%__MODULE__{tenant_external_id: tenant_external_id, settings: settings}) do
def init(%__MODULE__{tenant_external_id: tenant_external_id} = state) do
Logger.metadata(external_id: tenant_external_id, project: tenant_external_id)
{:ok, state, {:continue, :run_migration}}
end

def handle_continue(:run_migration, %{settings: settings} = state) do
case migrate(settings) do
{:ok, _} -> :ignore
{:error, error} -> {:stop, error}
{:ok, _} -> {:stop, :normal, state}
{:error, error} -> {:stop, {:shutdown, error}, state}
end
end

defp migrate(
%{
"db_host" => db_host,
"db_port" => db_port,
"db_name" => db_name,
"db_user" => db_user,
"db_password" => db_password
} = settings
) do
defp migrate(settings) do
%{
"db_host" => db_host,
"db_port" => db_port,
"db_name" => db_name,
"db_user" => db_user,
"db_password" => db_password
} = settings

{host, port, name, user, pass} =
Crypto.decrypt_creds(db_host, db_port, db_name, db_user, db_password)

Expand Down Expand Up @@ -195,6 +199,17 @@ defmodule Realtime.Tenants.Migrations do
end)
end

def terminate(:normal, _) do
Logger.info("Migrations ran successfully")

:ok
end

def terminate({:shutdown, error}, _) do
log_error("MigrationsFailedToRun", error)
:ok
end

# @expected_migration_count length(@migrations)

@doc """
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.51",
version: "2.33.52",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
4 changes: 2 additions & 2 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ defmodule Realtime.Integration.RtChannelTest do
[tenant] = Tenant |> Repo.all() |> Repo.preload(:extensions)
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
:ok = Migrations.run_migrations(migrations)

Migrations.run_migrations(migrations)
:timer.sleep(1000)
%{tenant: tenant}
end

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/broadcast_changes/handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Realtime.BroadcastChanges.HandlerTest do
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

:timer.sleep(1000)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Realtime.MessagesTest do
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

:timer.sleep(1000)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")
date_start = Date.utc_today() |> Date.add(-10)
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Realtime.RepoTest do
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

:timer.sleep(1000)
clean_table(db_conn, "realtime", "messages")
%{db_conn: db_conn, tenant: tenant}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ defmodule Realtime.Tenants.Authorization.Policies.BroadcastPoliciesTest do
}

Migrations.run_migrations(migrations)

:timer.sleep(1000)
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)

clean_table(db_conn, "realtime", "messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ defmodule Realtime.Tenants.Authorization.Policies.PresencePoliciesTest do
}

Migrations.run_migrations(migrations)
:timer.sleep(1000)
{:ok, _} = start_supervised({Connect, tenant_id: tenant.external_id}, restart: :transient)
{:ok, db_conn} = Connect.get_status(tenant.external_id)

Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/authorization_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ defmodule Realtime.Tenants.AuthorizationTest do
[%{settings: settings} | _] = tenant.extensions
migrations = %Migrations{tenant_external_id: tenant.external_id, settings: settings}
Migrations.run_migrations(migrations)

:timer.sleep(1000)
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)

clean_table(db_conn, "realtime", "messages")
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Realtime.Tenants.JanitorTest do
fn tenant ->
tenant = Repo.preload(tenant, :extensions)
Connect.lookup_or_start_connection(tenant.external_id)
:timer.sleep(250)
:timer.sleep(1000)
{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")
tenant
Expand Down
5 changes: 3 additions & 2 deletions test/realtime/tenants/migrations_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ defmodule Realtime.Tenants.MigrationsTest do
end)
end
|> Task.await_many()
|> Enum.uniq()
|> MapSet.new()

assert [:ok] = res
expected = MapSet.new([:ok, {:error, :migration_already_running}])
assert ^expected = res
end
end
end

0 comments on commit 45a8f03

Please sign in to comment.