From d6a828f152c2088929f5b202e733cdeac0603ddc Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:07:31 +0200
Subject: [PATCH 01/40] Chore: make test case async

---
 lib/casts/seq_checker_test.exs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/casts/seq_checker_test.exs b/lib/casts/seq_checker_test.exs
index 3ec779b..6c375df 100644
--- a/lib/casts/seq_checker_test.exs
+++ b/lib/casts/seq_checker_test.exs
@@ -1,5 +1,5 @@
 defmodule Essig.Casts.SeqCheckerTest do
-  use ExUnit.Case
+  use ExUnit.Case, async: true
   alias Essig.Casts.SeqChecker
 
   describe "check_reached/2" do

From 90de59b6508fe16fc2b83a18d55d5756b815d6cf Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:08:33 +0200
Subject: [PATCH 02/40] Feat: configure Essig.PubSub

---
 config/config.exs        | 2 ++
 lib/essig/application.ex | 1 +
 2 files changed, 3 insertions(+)

diff --git a/config/config.exs b/config/config.exs
index 192ce79..3fc0c0d 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -4,6 +4,8 @@ config :essig,
   ecto_repos: [Essig.Repo],
   generators: [timestamp_type: :utc_datetime]
 
+config :essig, Essig.PubSub, adapter: Phoenix.PubSub.PG2
+
 if config_env() == :dev do
   # setup for ecto_dev_logger (https://github.com/fuelen/ecto_dev_logger)
   config :essig, Essig.Repo, log: false
diff --git a/lib/essig/application.ex b/lib/essig/application.ex
index 395171d..2bcd259 100644
--- a/lib/essig/application.ex
+++ b/lib/essig/application.ex
@@ -7,6 +7,7 @@ defmodule Essig.Application do
   def start(_type, _args) do
     children = [
       Essig.Repo,
+      {Phoenix.PubSub, name: Essig.PubSub},
       {Registry, keys: :unique, name: Essig.Scopes.Registry},
       Essig.Scopes.DynamicSupervisor
     ]

From a9517816060bb9b6069ec104c2f48eef3f4040fc Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:08:51 +0200
Subject: [PATCH 03/40] Chore: remove unused alias

---
 lib/event_store/read_all_stream_backward.ex | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lib/event_store/read_all_stream_backward.ex b/lib/event_store/read_all_stream_backward.ex
index 4b112a4..cded3d8 100644
--- a/lib/event_store/read_all_stream_backward.ex
+++ b/lib/event_store/read_all_stream_backward.ex
@@ -1,5 +1,4 @@
 defmodule Essig.EventStore.ReadAllStreamBackward do
-  alias Essig.Schemas.Event
   use Essig.Repo
 
   def run(from_id, amount) do

From 1e9c8779ce07c77f9d1da5304d08837362711c85 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:09:25 +0200
Subject: [PATCH 04/40] Chore: sample event structs for local testing

---
 lib/sample/events.ex | 64 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 64 insertions(+)
 create mode 100644 lib/sample/events.ex

diff --git a/lib/sample/events.ex b/lib/sample/events.ex
new file mode 100644
index 0000000..762bbe7
--- /dev/null
+++ b/lib/sample/events.ex
@@ -0,0 +1,64 @@
+defmodule Sample.TestReports.Events.TicketMatchAdded do
+  @moduledoc """
+  - a matching ticket was configured
+  - we must always have 2 (!!!) subtickets per Test Report process
+    - "bootloader" / "appsw"
+  - FIELDS
+    - [:match_kind, :id, :kind]
+  - EXAMPLE
+    - %{match_kind: "manual", id: 50, kind: "bootloader"}
+    - %{match_kind: "auto", id: 51, kind: "appsw"}
+  """
+  use JsonSerde, alias: "trp.ticket_match_added"
+  defstruct [:match_kind, :id, :kind]
+end
+
+defmodule Sample.TestReports.Events.ReqTestReportsUpdated do
+  @moduledoc """
+  - there was a change in the required test reports config
+  - a value was set OR deleted
+  - removal is only allowed for admins!
+    - maybe also allow for everyone, we keep the full history anyway
+  - normal users can only add requirements
+  - LIST of operations
+    - `[[name, op, value], [name, op, value]]`
+  - EXAMPLE:
+    [
+      {:fota, :set, true},
+      {:fota, :freeze, true},
+      {:fota, :set, false},
+      {:fota, :freeze, false},
+    ]
+
+  """
+  use JsonSerde, alias: "trp.req_test_reports_updated"
+  defstruct [:ops]
+end
+
+defmodule Sample.TestReports.Events.MasterReportAdded do
+  @moduledoc """
+  - an xml file with master report data was found in ZIP and could be parsed
+  - we store the XML file name, the content is kept in the BinStorage (BinStorageMasterReport) system (can be potentially multiple MBs)
+  - we also store the parsed information for the XML file
+  - and the event version (?)
+  """
+  use JsonSerde, alias: "trp.master_report_added"
+  defstruct [:path, :report]
+end
+
+defmodule Sample.TestReports.MasterReport do
+  defstruct meta: %{},
+            tool_versions: [],
+            test_components: [],
+            test_cases: []
+end
+
+defimpl Inspect, for: Sample.TestReports.MasterReport do
+  def inspect(mreport, _opts) do
+    ~s|Sample.TestReports.MasterReport|
+  end
+
+  def show_meta(meta) do
+    ~s|{name: #{meta.name}, path: #{meta.path}, date: #{meta.date}, test_type: #{meta.test_type}, test_tool: #{meta.test_tool}}|
+  end
+end

From da72f0ad2031259678285104dff1d4a5f3b2b52b Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:10:59 +0200
Subject: [PATCH 05/40] Chore: broadcast events after insertion + simple
 subscriber
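
Rough usage sketch (illustrative only; assumes the current scope is set for
both the writer and the subscriber process, and that `event` / `stream_uuid`
are placeholders):

    iex> Essig.Server.start_scope(scope_uuid)
    iex> MyGenServer.start_link([])
    iex> Essig.EventStore.append_to_stream(stream_uuid, "trp", 0, [event])
    # MyGenServer then prints something like:
    # Current scope: "..."
    # Received event: %Essig.Schemas.Event{...}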
---
 lib/event_store.ex         | 12 ++++++++++++
 lib/sample/my_genserver.ex | 24 ++++++++++++++++++++++++
 2 files changed, 36 insertions(+)
 create mode 100644 lib/sample/my_genserver.ex

diff --git a/lib/event_store.ex b/lib/event_store.ex
index ecb5d5a..5f2f17b 100644
--- a/lib/event_store.ex
+++ b/lib/event_store.ex
@@ -6,6 +6,18 @@ defmodule Essig.EventStore do
          Essig.EventStore.AppendToStream.run(stream_uuid, stream_type, expected_seq, events) do
       events = res.insert_events
       stream = res.update_seq
+
+      scope_uuid = Essig.Context.current_scope()
+
+      # Broadcast the events
+      Enum.each(events, fn event ->
+        Phoenix.PubSub.broadcast(
+          Essig.PubSub,
+          "events:#{scope_uuid}",
+          {:new_event, event}
+        )
+      end)
+
       {:ok, %{stream: stream, events: events}}
     end
   end
diff --git a/lib/sample/my_genserver.ex b/lib/sample/my_genserver.ex
new file mode 100644
index 0000000..dd73612
--- /dev/null
+++ b/lib/sample/my_genserver.ex
@@ -0,0 +1,24 @@
+defmodule MyGenServer do
+  use GenServer
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, %{})
+  end
+
+  def init(state) do
+    Phoenix.PubSub.subscribe(Essig.PubSub, pubsub_topic())
+    {:ok, state}
+  end
+
+  def handle_info({:new_event, event}, state) do
+    # Handle the event
+    IO.inspect(Essig.Context.current_scope(), label: "Current scope")
+    IO.inspect(event, label: "Received event")
+    {:noreply, state}
+  end
+
+  defp pubsub_topic() do
+    scope_uuid = Essig.Context.current_scope()
+    "events:#{scope_uuid}"
+  end
+end

From b79439db5a58509f82bcebad122cf0bbf2335ede Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:49:55 +0200
Subject: [PATCH 06/40] Feat: also sets metadata when starting casts + adjust
 tests

---
 lib/casts/cast_runner.ex       |  6 ++++--
 lib/casts/cast_runner_test.exs | 30 ++++++++++++++++++++++----
 lib/server_test.exs            | 36 ++++++++++++++++++++--------------
 3 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/lib/casts/cast_runner.ex b/lib/casts/cast_runner.ex
index b54d1fc..619c9e9 100644
--- a/lib/casts/cast_runner.ex
+++ b/lib/casts/cast_runner.ex
@@ -18,7 +18,9 @@ defmodule Essig.Casts.CastRunner do
   def init(args) do
     module = Keyword.fetch!(args, :module)
     apply(module, :bootstrap, [])
-    {:ok, %{module: module, seq: 0, max_id: 0}}
+    init_data = %{key: module, seq: 0, max_id: 0, module: module}
+    Essig.Casts.MetaTable.set(module, init_data)
+    {:ok, init_data}
   end
 
   defp via_tuple(module) do
@@ -26,7 +28,7 @@ defmodule Essig.Casts.CastRunner do
   end
 
   def handle_call({:send_events, events}, _from, state) do
-    module = Map.fetch!(state, :module)
+    module = Map.fetch!(state, :key)
     {:ok, res, state} = apply(module, :handle_events, [state, events])
     state = update_seq_and_max_id(state, events)
     {:reply, {res, state}, state}
diff --git a/lib/casts/cast_runner_test.exs b/lib/casts/cast_runner_test.exs
index b7f7986..72aa790 100644
--- a/lib/casts/cast_runner_test.exs
+++ b/lib/casts/cast_runner_test.exs
@@ -27,8 +27,19 @@ defmodule Essig.Casts.CastRunnerTest do
     CastRunner.send_events(SampleCast2, events)
 
     # Assert that the events were processed by the respective CastRunners
-    assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
-    assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 2}
+    assert MetaTable.get(SampleCast1) == %{
+             key: SampleCast1,
+             module: SampleCast1,
+             max_id: 100,
+             seq: 2
+           }
+
+    assert MetaTable.get(SampleCast2) == %{
+             key: SampleCast2,
+             module: SampleCast2,
+             max_id: 100,
+             seq: 2
+           }
   end
 end
 
@@ -46,8 +57,19 @@ defmodule Essig.Casts.CastRunnerTest do
     CastRunner.send_events(SampleCast2, events)
 
     # # Assert the metadata for each CastRunner
-    assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
-    assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 4}
+    assert MetaTable.get(SampleCast1) == %{
+             key: SampleCast1,
+             module: SampleCast1,
+             max_id: 100,
+             seq: 2
+           }
+
+    assert MetaTable.get(SampleCast2) == %{
+             key: SampleCast2,
+             module: SampleCast2,
+             max_id: 100,
+             seq: 4
+           }
   end
 end
diff --git a/lib/server_test.exs b/lib/server_test.exs
index 9109298..8dc9b35 100644
--- a/lib/server_test.exs
+++ b/lib/server_test.exs
@@ -4,47 +4,53 @@ defmodule Essig.ServerTest do
 
   describe "full_run" do
     test "works with casts" do
-      Essig.Server.start_scope("app1")
+      scope_uuid = Essig.UUID7.generate()
+      Essig.Server.start_scope(scope_uuid)
       Essig.Server.start_casts([SampleCast1, SampleCast2])
 
-      pid = Essig.Scopes.Registry.get("app1")
+      pid = Essig.Scopes.Registry.get(scope_uuid)
       assert is_pid(pid)
-      assert "app1" in Essig.Scopes.Registry.keys()
+      assert scope_uuid in Essig.Scopes.Registry.keys()
 
       assert is_pid(Essig.Server.get_cast(SampleCast1))
      assert is_pid(Essig.Server.get_cast(SampleCast2))
 
       Process.flag(:trap_exit, true)
       GenServer.stop(pid)
-      assert eventually(fn -> Essig.Scopes.Registry.get("app1") == nil end)
+      assert eventually(fn -> Essig.Scopes.Registry.get(scope_uuid) == nil end)
 
-      assert_raise ArgumentError, "unknown registry: Essig.Casts.Registry_app1", fn ->
-        is_pid(Essig.Server.get_cast(SampleCast1))
-      end
+      assert_raise ArgumentError,
+                   "unknown registry: :\"Elixir.Essig.Casts.Registry_#{scope_uuid}\"",
+                   fn ->
+                     is_pid(Essig.Server.get_cast(SampleCast1))
+                   end
 
-      refute "app1" in Essig.Scopes.Registry.keys()
+      refute scope_uuid in Essig.Scopes.Registry.keys()
     end
 
     test "works with entities" do
-      Essig.Server.start_scope("app1")
+      scope_uuid = Essig.UUID7.generate()
+      Essig.Server.start_scope(scope_uuid)
       {:ok, _} = Essig.Server.start_entity(Entities.Entity1, "1")
 
       # duplicate entities are prevented
       {:error, {:already_started, _}} = Essig.Server.start_entity(Entities.Entity1, "1")
-      pid = Essig.Scopes.Registry.get("app1")
+      pid = Essig.Scopes.Registry.get(scope_uuid)
       assert is_pid(pid)
-      assert "app1" in Essig.Scopes.Registry.keys()
+      assert scope_uuid in Essig.Scopes.Registry.keys()
 
       assert is_pid(Essig.Server.get_entity(Entities.Entity1, "1"))
 
       Process.flag(:trap_exit, true)
       GenServer.stop(pid)
-      assert eventually(fn -> Essig.Scopes.Registry.get("app1") == nil end)
+      assert eventually(fn -> Essig.Scopes.Registry.get(scope_uuid) == nil end)
 
-      assert_raise ArgumentError, "unknown registry: Essig.Entities.Registry_app1", fn ->
-        is_pid(Essig.Server.get_entity(Entities.Entity1, "1"))
-      end
+      assert_raise ArgumentError,
+                   "unknown registry: :\"Elixir.Essig.Entities.Registry_#{scope_uuid}\"",
+                   fn ->
+                     is_pid(Essig.Server.get_entity(Entities.Entity1, "1"))
+                   end
     end
   end
 end

From eb0bb36e246c7f33d302df6bbd238814d368e268 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 15:50:20 +0200
Subject: [PATCH 07/40] Chore: Drop Ecto from UUID generator module name

---
 lib/crud/casts_crud_test.exs              |  6 +++---
 lib/crud/events_crud_test.exs             |  4 ++--
 lib/crud/streams_crud_test.exs            | 10 +++++-----
 lib/ecto/uuid7.ex                         |  2 +-
 lib/event_store/append_to_stream_test.exs | 16 ++++++++--------
 lib/event_store_test.exs                  | 20 ++++++++++----------
 lib/schemas/cast.ex                       |  2 +-
 lib/schemas/event.ex                      |  2 +-
 lib/schemas/scope.ex                      |  2 +-
 lib/schemas/stream.ex                     |  2 +-
 10 files changed, 33 insertions(+), 33 deletions(-)

diff --git a/lib/crud/casts_crud_test.exs b/lib/crud/casts_crud_test.exs
index 82715e9..41bc9ce 100644
--- a/lib/crud/casts_crud_test.exs
+++ b/lib/crud/casts_crud_test.exs
@@ -10,13 +10,13 @@ defmodule Essig.CastsCrudTest do
   end
 
   test "provides defaults for numeric values" do
-    uuid = Essig.Ecto.UUID7.generate()
+    uuid = Essig.UUID7.generate()
     {:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
     assert cast.status == :new
   end
 
   test "prevents duplicate casts with same module" do
-    uuid = Essig.Ecto.UUID7.generate()
+    uuid = Essig.UUID7.generate()
     {:ok, _cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
     {:error, changeset} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 2})
     errors = errors_on(changeset)
@@ -24,7 +24,7 @@ defmodule Essig.CastsCrudTest do
   end
 
   test "allows updating the `max_id` value via upsert on (module)" do
-    uuid = Essig.Ecto.UUID7.generate()
+    uuid = Essig.UUID7.generate()
     {:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
 
     {:ok, cast2} =
diff --git a/lib/crud/events_crud_test.exs b/lib/crud/events_crud_test.exs
index 2ffbbca..8ca9c18 100644
--- a/lib/crud/events_crud_test.exs
+++ b/lib/crud/events_crud_test.exs
@@ -24,8 +24,8 @@ defmodule Essig.Crud.EventsCrudTest do
   end
 
   test "creates proper events" do
-    stream_uuid = Essig.Ecto.UUID7.generate()
-    scope_uuid = Essig.Ecto.UUID7.generate()
+    stream_uuid = Essig.UUID7.generate()
+    scope_uuid = Essig.UUID7.generate()
 
     {:ok, _stream} =
       StreamsCrud.create_stream(%{
diff --git a/lib/crud/streams_crud_test.exs b/lib/crud/streams_crud_test.exs
index 9d63030..ef0214d 100644
--- a/lib/crud/streams_crud_test.exs
+++ b/lib/crud/streams_crud_test.exs
@@ -10,7 +10,7 @@ defmodule Essig.Crud.StreamsCrudTest do
   end
 
   test "creates a minimal stream record" do
-    scope_uuid = Essig.Ecto.UUID7.generate()
+    scope_uuid = Essig.UUID7.generate()
 
     {:ok, stream} =
       StreamsCrud.create_stream(%{scope_uuid: scope_uuid, stream_type: "user", seq: 1})
@@ -19,8 +19,8 @@ defmodule Essig.Crud.StreamsCrudTest do
   end
 
   test "prevents duplicates" do
-    uuid = Essig.Ecto.UUID7.generate()
-    scope_uuid = Essig.Ecto.UUID7.generate()
+    uuid = Essig.UUID7.generate()
+    scope_uuid = Essig.UUID7.generate()
 
     {:ok, stream} =
       StreamsCrud.create_stream(%{
@@ -44,8 +44,8 @@ defmodule Essig.Crud.StreamsCrudTest do
   end
 
   test "updates the seq on equal streams (upsert_stream)" do
-    uuid = Essig.Ecto.UUID7.generate()
-    scope_uuid = Essig.Ecto.UUID7.generate()
+    uuid = Essig.UUID7.generate()
+    scope_uuid = Essig.UUID7.generate()
 
     {:ok, stream} =
       StreamsCrud.upsert_stream(%{
diff --git a/lib/ecto/uuid7.ex b/lib/ecto/uuid7.ex
index 59516ae..2028dc5 100644
--- a/lib/ecto/uuid7.ex
+++ b/lib/ecto/uuid7.ex
@@ -1,4 +1,4 @@
-defmodule Essig.Ecto.UUID7 do
+defmodule Essig.UUID7 do
   @moduledoc """
   wrapper around Uniq.UUID
   """
diff --git a/lib/event_store/append_to_stream_test.exs b/lib/event_store/append_to_stream_test.exs
index cc88dd2..dcc706a 100644
--- a/lib/event_store/append_to_stream_test.exs
+++ b/lib/event_store/append_to_stream_test.exs
@@ -3,8 +3,8 @@ defmodule Essig.EventStore.AppendToStreamTest do
 
   describe "stream does not exist" do
     setup do
-      Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
-      stream_uuid = Essig.Ecto.UUID7.generate()
+      Essig.Context.set_current_scope(Essig.UUID7.generate())
+      stream_uuid = Essig.UUID7.generate()
 
       e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
         match_kind: "auto",
@@ -48,8 +48,8 @@ defmodule Essig.EventStore.AppendToStreamTest do
 
   describe "stream exists + expected value matches" do
     setup do
-      Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
-      stream_uuid = Essig.Ecto.UUID7.generate()
+      Essig.Context.set_current_scope(Essig.UUID7.generate())
+      stream_uuid = Essig.UUID7.generate()
 
       e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
         match_kind: "auto",
@@ -97,8 +97,8 @@ defmodule Essig.EventStore.AppendToStreamTest do
 
   describe "stream exists, yet expected seq does not match" do
     test "returns errors" do
-      Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
-      uuid = Essig.Ecto.UUID7.generate()
+      Essig.Context.set_current_scope(Essig.UUID7.generate())
+      uuid = Essig.UUID7.generate()
 
       e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
         match_kind: "auto",
@@ -123,8 +123,8 @@ defmodule Essig.EventStore.AppendToStreamTest do
 
   describe "stream exists, seq matches, yet stream type does not match" do
     test "returns errors" do
-      Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
-      uuid = Essig.Ecto.UUID7.generate()
+      Essig.Context.set_current_scope(Essig.UUID7.generate())
+      uuid = Essig.UUID7.generate()
 
       e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
         match_kind: "auto",
diff --git a/lib/event_store_test.exs b/lib/event_store_test.exs
index f23fd5c..5039772 100644
--- a/lib/event_store_test.exs
+++ b/lib/event_store_test.exs
@@ -4,7 +4,7 @@ defmodule Essig.EventStoreTest do
   use MnemeDefaults
 
   setup do
-    Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
+    Essig.Context.set_current_scope(Essig.UUID7.generate())
     :ok
   end
 
@@ -28,8 +28,8 @@ defmodule Essig.EventStoreTest do
 
   describe "read_all_stream_forward" do
     test "iterates over ALL global events from oldest to newest" do
-      uuid1 = Essig.Ecto.UUID7.generate()
-      uuid2 = Essig.Ecto.UUID7.generate()
+      uuid1 = Essig.UUID7.generate()
+      uuid2 = Essig.UUID7.generate()
       init_stream(uuid1, 0)
       init_stream(uuid2, 0)
       init_stream(uuid1, 10)
@@ -48,8 +48,8 @@ defmodule Essig.EventStoreTest do
 
   describe "read_all_stream_backward" do
     test "iterates over ALL global events from newest to oldest" do
-      uuid1 = Essig.Ecto.UUID7.generate()
-      uuid2 = Essig.Ecto.UUID7.generate()
+      uuid1 = Essig.UUID7.generate()
+      uuid2 = Essig.UUID7.generate()
       # batch 1
       init_stream(uuid1, 0)
       # batch 2
@@ -85,7 +85,7 @@ defmodule Essig.EventStoreTest do
 
   describe "read_stream_backward" do
     test "fetches events from newest to oldest with filters applied" do
-      uuid = Essig.Ecto.UUID7.generate()
+      uuid = Essig.UUID7.generate()
 
       init_stream(uuid, 0)
 
@@ -119,7 +119,7 @@ defmodule Essig.EventStoreTest do
 
   describe "read_stream_forward" do
     test "fetches events from oldest to newest with filters applied" do
-      uuid = Essig.Ecto.UUID7.generate()
+      uuid = Essig.UUID7.generate()
       init_stream(uuid, 0)
 
       {:ok, _a} =
@@ -191,12 +191,12 @@ defmodule Essig.EventStoreTest do
     scope1 = Essig.Context.current_scope()
     Essig.Server.start_scope(scope1)
-    uuid1 = Essig.Ecto.UUID7.generate()
+    uuid1 = Essig.UUID7.generate()
     init_stream(uuid1, 0)
 
-    scope2 = Essig.Ecto.UUID7.generate()
+    scope2 = Essig.UUID7.generate()
     Essig.Server.start_scope(scope2)
-    uuid2 = Essig.Ecto.UUID7.generate()
+    uuid2 = Essig.UUID7.generate()
     init_stream(uuid2, 0)
 
     # switch to scope1
diff --git a/lib/schemas/cast.ex b/lib/schemas/cast.ex
index 0bd23fd..df0b0e3 100644
--- a/lib/schemas/cast.ex
+++ b/lib/schemas/cast.ex
@@ -1,7 +1,7 @@
 defmodule Essig.Schemas.Cast do
   use Ecto.Schema
 
-  @primary_key {:cast_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}}
+  @primary_key {:cast_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}}
   schema "essig_casts" do
     # this is another "primary" key, used for global ordering (+ and when fetching all stream)
     field(:id, :integer, read_after_writes: true)
diff --git a/lib/schemas/event.ex b/lib/schemas/event.ex
index 743c5f0..4e7032a 100644
--- a/lib/schemas/event.ex
+++ b/lib/schemas/event.ex
@@ -1,7 +1,7 @@
 defmodule Essig.Schemas.Event do
   use Ecto.Schema
 
-  @primary_key {:event_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}}
+  @primary_key {:event_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}}
   schema "essig_events" do
     # this is another "primary" key, used for global ordering (+ and when fetching all stream)
     field(:id, :integer, read_after_writes: true)
diff --git a/lib/schemas/scope.ex b/lib/schemas/scope.ex
index 268d2a9..fdffb02 100644
--- a/lib/schemas/scope.ex
+++ b/lib/schemas/scope.ex
@@ -1,7 +1,7 @@
 defmodule Essig.Schemas.Scope do
   use Ecto.Schema
 
-  @primary_key {:scope_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}}
+  @primary_key {:scope_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}}
   schema "essig_scopes" do
     # this is another "primary" key, used for global ordering (+ and when fetching all stream)
     field(:id, :integer, read_after_writes: true)
diff --git a/lib/schemas/stream.ex b/lib/schemas/stream.ex
index e2eb240..771f9f7 100644
--- a/lib/schemas/stream.ex
+++ b/lib/schemas/stream.ex
@@ -1,7 +1,7 @@
 defmodule Essig.Schemas.Stream do
   use Ecto.Schema
 
-  @primary_key {:stream_uuid, Ecto.UUID, autogenerate: {Essig.Ecto.UUID7, :generate, []}}
+  @primary_key {:stream_uuid, Ecto.UUID, autogenerate: {Essig.UUID7, :generate, []}}
   schema "essig_streams" do
     # this is another "primary" key, used for global ordering (+ and when fetching all stream)
     field(:id, :integer, read_after_writes: true)

From 25efe3a9333e0f6bb8500a7836c27c05ab880386 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 18:42:44 +0200
Subject: [PATCH 08/40] Chore: add gen_state_machine (wrapper for :gen_statem)

---
 mix.exs  | 3 +++
 mix.lock | 1 +
 2 files changed, 4 insertions(+)

diff --git a/mix.exs b/mix.exs
index f3ab7df..bbe1e2f 100644
--- a/mix.exs
+++ b/mix.exs
@@ -43,6 +43,9 @@ defmodule Essig.MixProject do
       ## PUB-SUB
       {:phoenix_pubsub, "~> 2.1"},
 
+      ## State machine handling
+      {:gen_state_machine, "~> 3.0"},
+
       ## UTIL
       {:json_serde, github: "maxohq/json_serde"},
       {:liveness, "~> 1.0.0"},
diff --git a/mix.lock b/mix.lock
index dbac717..b712c45 100644
--- a/mix.lock
+++ b/mix.lock
@@ -10,6 +10,7 @@
   "ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"},
   "ets_select": {:hex, :ets_select, "0.1.3", "6ddb60480d8fadb1949d8ac9a06feb95750993adadceb19276bc6fd588326795", [:mix], [], "hexpm", "d2006673d24023a4c97baa56b0116d4dd1c73d43024c03b889aa1370634bb1ef"},
   "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
+  "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
   "glob_ex": {:hex, :glob_ex, "0.1.8", "f7ef872877ca2ae7a792ab1f9ff73d9c16bf46ecb028603a8a3c5283016adc07", [:mix], [], "hexpm", "9e39d01729419a60a937c9260a43981440c43aa4cadd1fa6672fecd58241c464"},
   "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
   "json_serde": {:git, "https://github.com/maxohq/json_serde.git", "4cee6a51a8ff04be3c10c7924d9d0588187d56d9", []},

From fd9e9b9f01a28b912ad2ecf36fc49447d0c3a9fd Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 19:08:27 +0200
Subject: [PATCH 09/40] Feat: fetch DB row for casts on init

---
 lib/casts/cast_runner.ex       | 24 +++++++++++++++++++++++-
 lib/casts/cast_runner_test.exs |  5 +++--
 lib/server_test.exs            |  2 +-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/lib/casts/cast_runner.ex b/lib/casts/cast_runner.ex
index 619c9e9..39fd5da 100644
--- a/lib/casts/cast_runner.ex
+++ b/lib/casts/cast_runner.ex
@@ -18,8 +18,10 @@ defmodule Essig.Casts.CastRunner do
   def init(args) do
     module = Keyword.fetch!(args, :module)
     apply(module, :bootstrap, [])
-    init_data = %{key: module, seq: 0, max_id: 0, module: module}
+    {:ok, row} = fetch_from_db_or_init(module)
+    init_data = %{key: module, seq: row.seq, max_id: row.max_id, module: module}
     Essig.Casts.MetaTable.set(module, init_data)
+    init_data = Map.put(init_data, :row, row)
     {:ok, init_data}
   end
 
@@ -46,4 +48,24 @@ defmodule Essig.Casts.CastRunner do
     Essig.Casts.MetaTable.update(state.module, %{seq: new_seq, max_id: new_max_id})
     %{state | seq: new_seq, max_id: new_max_id}
   end
+
+  defp fetch_from_db_or_init(module) do
+    case Essig.Crud.CastsCrud.get_cast_by_module(module) do
+      nil ->
+        scope_uuid = Essig.Context.current_scope()
+
+        payload = %{
+          scope_uuid: scope_uuid,
+          module: Atom.to_string(module),
+          seq: 0,
+          max_id: 0,
+          setup_done: false
+        }
+
+        {:ok, _row} = Essig.Crud.CastsCrud.create_cast(payload)
+
+      row ->
+        {:ok, row}
+    end
+  end
 end
diff --git a/lib/casts/cast_runner_test.exs b/lib/casts/cast_runner_test.exs
index 72aa790..17ebf8f 100644
--- a/lib/casts/cast_runner_test.exs
+++ b/lib/casts/cast_runner_test.exs
@@ -1,10 +1,11 @@
 defmodule Essig.Casts.CastRunnerTest do
-  use ExUnit.Case, async: true
+  use Essig.DataCase
   alias Essig.Casts.CastRunner
   alias Essig.Casts.MetaTable
 
   setup %{test: test_name} do
-    Essig.Server.start_scope(test_name)
+    scope_uuid = Essig.UUID7.generate()
+    Essig.Server.start_scope(scope_uuid)
     :ok
   end
diff --git a/lib/server_test.exs b/lib/server_test.exs
index 8dc9b35..1920eed 100644
--- a/lib/server_test.exs
+++ b/lib/server_test.exs
@@ -1,5 +1,5 @@
 defmodule Essig.ServerTest do
-  use ExUnit.Case, async: true
+  use Essig.DataCase
   import Liveness
 
   describe "full_run" do

From 71cbc579302eed42ab6604248f0e1f9fb142057e Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Mon, 2 Sep 2024 19:14:55 +0200
Subject: [PATCH 10/40] Feat: update db row for casts for each update

---
 lib/casts/cast_runner.ex       | 18 ++++++++++++++----
 lib/casts/cast_runner_test.exs | 10 +++++-----
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/lib/casts/cast_runner.ex b/lib/casts/cast_runner.ex
index 39fd5da..98b911b 100644
--- a/lib/casts/cast_runner.ex
+++ b/lib/casts/cast_runner.ex
@@ -1,6 +1,8 @@
 defmodule Essig.Casts.CastRunner do
   use GenServer
 
+  defstruct key: nil, seq: nil, max_id: nil, module: nil, row: nil
+
   ##### PUBLIC API
 
   def send_events(module, events) do
@@ -19,10 +21,10 @@ defmodule Essig.Casts.CastRunner do
     module = Keyword.fetch!(args, :module)
     apply(module, :bootstrap, [])
     {:ok, row} = fetch_from_db_or_init(module)
-    init_data = %{key: module, seq: row.seq, max_id: row.max_id, module: module}
-    Essig.Casts.MetaTable.set(module, init_data)
-    init_data = Map.put(init_data, :row, row)
-    {:ok, init_data}
+    meta_data = %__MODULE__{key: module, seq: row.seq, max_id: row.max_id, module: module}
+    Essig.Casts.MetaTable.set(module, meta_data)
+    state = Map.put(meta_data, :row, row)
+    {:ok, state}
   end
 
   defp via_tuple(module) do
@@ -33,6 +35,7 @@ defmodule Essig.Casts.CastRunner do
     module = Map.fetch!(state, :key)
     {:ok, res, state} = apply(module, :handle_events, [state, events])
     state = update_seq_and_max_id(state, events)
+    state = update_db(state)
     {:reply, {res, state}, state}
   end
 
@@ -49,6 +52,13 @@ defmodule Essig.Casts.CastRunner do
     %{state | seq: new_seq, max_id: new_max_id}
   end
 
+  defp update_db(state) do
+    {:ok, row} =
+      Essig.Crud.CastsCrud.update_cast(state.row, %{seq: state.seq, max_id: state.max_id})
+
+    Map.put(state, :row, row)
+  end
+
   defp fetch_from_db_or_init(module) do
     case Essig.Crud.CastsCrud.get_cast_by_module(module) do
       nil ->
diff --git a/lib/casts/cast_runner_test.exs b/lib/casts/cast_runner_test.exs
index 17ebf8f..d7a1834 100644
--- a/lib/casts/cast_runner_test.exs
+++ b/lib/casts/cast_runner_test.exs
@@ -28,14 +28,14 @@ defmodule Essig.Casts.CastRunnerTest do
     CastRunner.send_events(SampleCast2, events)
 
     # Assert that the events were processed by the respective CastRunners
-    assert MetaTable.get(SampleCast1) == %{
+    assert MetaTable.get(SampleCast1) == %CastRunner{
             key: SampleCast1,
             module: SampleCast1,
             max_id: 100,
             seq: 2
           }
 
-    assert MetaTable.get(SampleCast2) == %{
+    assert MetaTable.get(SampleCast2) == %CastRunner{
             key: SampleCast2,
             module: SampleCast2,
             max_id: 100,
@@ -57,15 +57,15 @@ defmodule Essig.Casts.CastRunnerTest do
     CastRunner.send_events(SampleCast2, events)
     CastRunner.send_events(SampleCast2, events)
 
-    # # Assert the metadata for each CastRunner
-    assert MetaTable.get(SampleCast1) == %{
+    # Assert the metadata for each CastRunner
+    assert MetaTable.get(SampleCast1) == %CastRunner{
             key: SampleCast1,
             module: SampleCast1,
             max_id: 100,
             seq: 2
           }
 
-    assert MetaTable.get(SampleCast2) == %{
+    assert MetaTable.get(SampleCast2) == %CastRunner{
             key: SampleCast2,
             module: SampleCast2,
             max_id: 100,

From 4741e8bb33ea566d3b5329c4c0576a0d275038c2 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Tue, 3 Sep 2024 23:19:09 +0200
Subject: [PATCH 11/40] Chore: remove unused test_name from setup callback

---
 lib/casts/cast_runner_test.exs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/casts/cast_runner_test.exs b/lib/casts/cast_runner_test.exs
index d7a1834..d5e7279 100644
--- a/lib/casts/cast_runner_test.exs
+++ b/lib/casts/cast_runner_test.exs
@@ -3,7 +3,7 @@ defmodule Essig.Casts.CastRunnerTest do
   alias Essig.Casts.CastRunner
   alias Essig.Casts.MetaTable
 
-  setup %{test: test_name} do
+  setup do
     scope_uuid = Essig.UUID7.generate()
     Essig.Server.start_scope(scope_uuid)
     :ok

From e4227ecf6ba89a6d77f03421ddbecb425d2c8b81 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Tue, 3 Sep 2024 23:46:42 +0200
Subject: [PATCH 12/40] Feat: ensure sequential ES inserts only by applying
 locks.

Problem:
- It seems to be quite non-trivial to process events that are causally dependent.
- We can get sequential IDs, where records with lower IDs can become visible later than records with higher IDs
- We can get transaction IDs where lower values become visible later
- To work around those issues we would need to write some tricky code, maintain it and write tests for it

Solution:
- after some contemplation I decided to go for a stupid workaround
- we only allow sequential inserts to the EventStore (events) across all Elixir processes
- that way the IDs must be strictly increasing and there should be no interleaving
- that way the logic to deal with new events becomes trivial, since we should never see newer events with lower IDs

Technical realization:
- we use Postgres `pg_try_advisory_lock` and `pg_advisory_unlock`
- this allows coordination across all Elixir OS processes of our app that use the same DB at the same time
- also the locks respect the current Context scope, so that different scopes do not conflict with each other
- to unlock consistently, we need to run the unlocking on the same Postgres DB connection as the locking
- a normal Ecto Repo does not enable this functionality
- as a workaround we configure SandRepo, which has the sole purpose of acquiring and releasing PG locks
- event appending to our Event store is only possible through a locked function
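
Illustration of the advisory lock semantics this relies on (psql sketch,
the key value is made up):

    -- session A
    SELECT pg_try_advisory_lock(42);  -- true, A now holds the lock
    -- session B
    SELECT pg_try_advisory_lock(42);  -- false, B backs off ({:error, :locked})
    -- session A
    SELECT pg_advisory_unlock(42);    -- true, released on the same connection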
---
 lib/essig/application.ex            |  1 +
 lib/event_store/append_to_stream.ex |  9 +++++++
 lib/pg_lock.ex                      | 35 ++++++++++++++++++++++++++
 lib/sand_repo.ex                    | 39 +++++++++++++++++++++++++++++
 4 files changed, 84 insertions(+)
 create mode 100644 lib/pg_lock.ex
 create mode 100644 lib/sand_repo.ex

diff --git a/lib/essig/application.ex b/lib/essig/application.ex
index 2bcd259..d5dbe31 100644
--- a/lib/essig/application.ex
+++ b/lib/essig/application.ex
@@ -7,6 +7,7 @@ defmodule Essig.Application do
   def start(_type, _args) do
     children = [
       Essig.Repo,
+      Essig.SandRepo,
       {Phoenix.PubSub, name: Essig.PubSub},
       {Registry, keys: :unique, name: Essig.Scopes.Registry},
       Essig.Scopes.DynamicSupervisor
diff --git a/lib/event_store/append_to_stream.ex b/lib/event_store/append_to_stream.ex
index 78c7021..5e5759c 100644
--- a/lib/event_store/append_to_stream.ex
+++ b/lib/event_store/append_to_stream.ex
@@ -2,6 +2,15 @@ defmodule Essig.EventStore.AppendToStream do
   use Essig.Repo
 
   def run(stream_uuid, stream_type, expected_seq, events) do
+    # To ensure sequential inserts only, we use locking.
+    # The likelihood of this triggering in production is low, but still possible.
+    # Locks are across all OS processes, since we use Postgres for this.
+    Essig.PGLock.with_lock("es-insert", fn ->
+      run_unprotected(stream_uuid, stream_type, expected_seq, events)
+    end)
+  end
+
+  defp run_unprotected(stream_uuid, stream_type, expected_seq, events) do
     multi(stream_uuid, stream_type, expected_seq, events)
     |> Repo.transaction()
   end
diff --git a/lib/pg_lock.ex b/lib/pg_lock.ex
new file mode 100644
index 0000000..859f5fc
--- /dev/null
+++ b/lib/pg_lock.ex
@@ -0,0 +1,35 @@
+defmodule Essig.PGLock do
+  @moduledoc """
+  A simple wrapper around pg_try_advisory_lock and pg_advisory_unlock.
+  To get consistent PG connection, it uses SandRepo (which is configured with a Sandbox as pool)
+
+  This makes it possible to use the same connection for locking and releasing the lock!
+  Because releasing the lock on a different connection than locking it will fail.
+  This is the best workaround I could come up with.
+  """
+  use Essig.SandRepo
+
+  def with_lock(kind, fun) do
+    lock_key = :erlang.phash2("#{kind}-#{Essig.Context.current_scope()}")
+
+    case get_lock(lock_key) do
+      {:ok, %{rows: [[true]]}} ->
+        try do
+          fun.()
+        after
+          release_lock(lock_key)
+        end
+
+      _ ->
+        {:error, :locked}
+    end
+  end
+
+  def get_lock(key) do
+    Ecto.Adapters.SQL.query(SandRepo, "SELECT pg_try_advisory_lock($1)", [key], [])
+  end
+
+  def release_lock(key) do
+    Ecto.Adapters.SQL.query(SandRepo, "SELECT pg_advisory_unlock($1)", [key], [])
+  end
+end
diff --git a/lib/sand_repo.ex b/lib/sand_repo.ex
new file mode 100644
index 0000000..f6f6e39
--- /dev/null
+++ b/lib/sand_repo.ex
@@ -0,0 +1,39 @@
+defmodule Essig.SandRepo do
+  @moduledoc """
+  This is special SandRepo, that allows checking out a single connection.
+  We use this when getting a DB advisory lock and releasing it afterwards.
+  This is not possible with the standard Ecto.Repo outside of a transaction.
+
+  To keep the configuration overhead low, we use dynamic config (init -callback) and
+  copy the main config for the Essig.Repo with a few tweaks.
+  This means the DB config stays unchanged.
+  """
+
+  use Ecto.Repo,
+    otp_app: :essig,
+    adapter: Ecto.Adapters.Postgres
+
+  use EctoCursorBasedStream
+
+  @impl true
+  def init(_type, _config) do
+    special_config = [
+      telemetry_prefix: [:essig, :sand_repo],
+      pool: Ecto.Adapters.SQL.Sandbox
+    ]
+
+    main_config = Application.get_env(:essig, Essig.Repo)
+    config = Keyword.merge(main_config, special_config)
+
+    {:ok, config}
+  end
+
+  defmacro __using__(_) do
+    quote do
+      alias Essig.SandRepo
+      require Ecto.Query
+      import Ecto.Query
+      import Ecto.Changeset
+    end
+  end
+end

From 1182bc7253f7c8d998c98daf48d5d8686c693a51 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Tue, 3 Sep 2024 23:47:23 +0200
Subject: [PATCH 13/40] Chore: add some useful snippets

---
 snippets/experiment.txt                   | 37 ++++++++++++++++
 snippets/gen_statem.txt                   | 51 +++++++++++++++++++++++
 snippets/init_casts.txt                   | 22 ++++++++++
 snippets/postgres_logical_replication.txt | 30 +++++++++++++
 snippets/shortishly-articles.txt          | 25 +++++++++++
 snippets/traffic-light.txt                | 36 ++++++++++++++++
 6 files changed, 201 insertions(+)
 create mode 100644 snippets/experiment.txt
 create mode 100644 snippets/gen_statem.txt
 create mode 100644 snippets/init_casts.txt
 create mode 100644 snippets/postgres_logical_replication.txt
 create mode 100644 snippets/shortishly-articles.txt
 create mode 100644 snippets/traffic-light.txt

diff --git a/snippets/experiment.txt b/snippets/experiment.txt
new file mode 100644
index 0000000..0e80e0b
--- /dev/null
+++ b/snippets/experiment.txt
@@ -0,0 +1,37 @@
+DROP TABLE IF EXISTS rows;
+
+CREATE TABLE rows (
+    id SERIAL PRIMARY KEY,
+    name varchar,
+    xid bigserial NOT NULL,
+    snapmin bigserial NOT NULL,
+    timestamp timestamp default current_timestamp
+);
+
+CREATE OR REPLACE FUNCTION pgx_add_xid()
+  RETURNS TRIGGER AS $$
+  BEGIN
+    NEW.xid := pg_current_xact_id();
+    NEW.snapmin := pg_snapshot_xmin(pg_current_snapshot());
+    RETURN NEW;
+  END;
+  $$ LANGUAGE plpgsql;
+
+
+
+CREATE OR REPLACE TRIGGER pgx_add_xid_before_insert_update
+  BEFORE INSERT OR UPDATE ON rows
+  FOR EACH ROW
+  EXECUTE FUNCTION pgx_add_xid();
+
+
+-- run this in 2 psql shells, interleaving the inserts. this helps to see the xid vs snapmin relationship --
+
+begin;
+insert into rows (name) values ('1');
+insert into rows (name) values ('2');
+insert into rows (name) values ('3');
+
+commit;
+
+select * from rows order by id desc;
diff --git a/snippets/gen_statem.txt b/snippets/gen_statem.txt
new file mode 100644
index 0000000..d9bc789
--- /dev/null
+++ b/snippets/gen_statem.txt
@@ -0,0 +1,51 @@
+### Videos
+
+- [State (Machine) Of Enlightenment - 01.09.2024](https://www.youtube.com/watch?v=5ym6va__LW8)
+- [gen_statem Unveiled: A Theoretical Exploration of State Machines - FOSDEM 2024](https://fosdem.org/2024/schedule/event/fosdem-2024-2130-genstatem-unveiled-a-theoretical-exploration-of-state-machines/)
+  - [Death by Accidental Complexity by Ulf Wiger](https://www.infoq.com/presentations/Death-by-Accidental-Complexity/)
+    - https://dm3.github.io/2010/08/01/death-by-accidental-complexity.html
+    - https://github.com/uwiger/plain_fsm
+
+- [Pretty state machine - 2019](https://codesync.global/media/pretty-state-machine/)
+
+- [Lonestar ElixirConf 2018 - Managing state in distributed Elixir - Jerel Unruh](https://www.youtube.com/watch?v=V3iBgStaPmA&t=1131s)
+  - from 18:45
+- [Andrea Leopardi [GigCityElixir24] The World is a Network](https://youtu.be/9UFeQ11soQg?t=1641)
+- [Szymon Świerk - Building event-driven state machines with gen_statem | Elixir Community Krakow](https://www.youtube.com/watch?v=ehZoWwMjWBw&t=137s)
+- [Get more out of OTP with GenStateMachine | Erlang Solutions Webinar - 2020](https://www.youtube.com/watch?v=NW2b6lBuBas&t=3113s)
+- [Raimo Niskanen - gen_statem - The Tool You Never Knew You Always Wanted - Code BEAM SF 2018](https://www.youtube.com/watch?v=f_jl6MR3kXQ&t=1514s)
+
+Articles:
+
+- https://www.erlang.org/doc/system/statem.html (design docs)
+- [gen_statem in context - 2024](https://moosie.us/gen_statem_in_context)
+- https://andrealeopardi.com/posts/connection-managers-with-gen-statem/
+- https://github.com/antoinereyt/gen_statem_meetup?tab=readme-ov-file
+- https://2024-06-06.adoptingerlang.org/docs/cheat_sheets/
+  - gen_statem cheat sheet
+- https://meraj-gearhead.ca/state-machine-in-elixir-using-erlangs-genstatem-behaviour
+- https://dockyard.com/blog/2020/01/31/state-timeouts-with-gen_statem
+- https://slides.com/jprem/state-machines-in-elixir-with-gen_statem#/1/4
+- https://shortishly.com/blog/free-tracing-and-event-logging-with-sys/
+  - free debugging and logging with sys
+- https://erlangforums.com/t/pgmp-postgresql-client-with-logical-replication-to-ets/1707/17
+  - great description of how to use gen_statem to handle logical replication events
+  - https://github.com/pgmp/pgmp_client/blob/master/lib/pgmp_client/replication_connection.ex
+
+- Shortishly ARTICLES:
+  [[shortishly-articles.txt]]
+
+
+The gen_statem package, which is part of OTP, offers several special features:
+
+ 1. State-based event handling: Events are handled differently based on the current state.
+ 2. Flexible state representation: States can be atoms, tuples, or any Erlang term.
+ 3. State data: Each state can have associated data.
+ 4. Timeout events: You can set timeouts to automatically trigger state transitions.
+ 5. State enter calls: Special callbacks can be defined for when a state is entered.
+ 6. Postponing events: Events can be postponed and handled later in a different state.
+ 7. State timeouts: Timeouts can be set specifically for each state.
+ 8. Generic time events: You can set events to occur at specific times.
+
+
diff --git a/snippets/init_casts.txt b/snippets/init_casts.txt
new file mode 100644
index 0000000..8091954
--- /dev/null
+++ b/snippets/init_casts.txt
@@ -0,0 +1,22 @@
+scope_uuid = Essig.UUID7.generate()
+
+Essig.Server.start_scope(scope_uuid)
+Essig.Server.start_casts([SampleCast1])
+
+stream_uuid = Essig.UUID7.generate()
+
+{:ok, %{events: events}} = Essig.EventStore.append_to_stream(stream_uuid, "trp", 0, [
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"},
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"},
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"},
+])
+
+Essig.Casts.CastRunner.send_events(SampleCast1, events)
+
+
+{:ok, %{events: events}} = Essig.EventStore.append_to_stream(stream_uuid, "trp", 3, [
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"},
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"},
+  %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"},
+])
+Essig.Casts.CastRunner.send_events(SampleCast1, events)
diff --git a/snippets/postgres_logical_replication.txt b/snippets/postgres_logical_replication.txt
new file mode 100644
index 0000000..6c072b9
--- /dev/null
+++ b/snippets/postgres_logical_replication.txt
@@ -0,0 +1,30 @@
+
+
+Practical Notes in Change Data Capture with Debezium and Postgres
+- https://medium.com/cermati-tech/practical-notes-in-change-data-capture-with-debezium-and-postgres-fe31bb11ab78
+- - https://debezium.io/blog/2021/10/07/incremental-snapshots/ (!!!)
+  ### Watermark-based Snapshots
+  In late 2019, the Netflix engineering team announced that they had developed an in-house change data capture framework. They also came up with an innovative solution of executing concurrent snapshots using watermarking, described in the paper DBLog: A Watermark Based Change-Data-Capture Framework by Andreas Andreakis and Ioannis Papapanagiotou.
+  The main idea behind this approach is that change data streaming is executed continuously together with snapshotting. The framework inserts low and high watermarks into the transaction log (by writing to the source database) and between those two points, a part of the snapshotted table is read. The framework keeps a record of database changes in between the watermarks and reconciles them with the snapshotted values, if the same records are snapshotted and modified during the window.
+  This means that the data is snapshotted in chunks - no lengthy process at the connector start, and also in case of crashes or a controlled termination of the connector, the snapshotting can be resumed since the last completed chunk.
+  As per Netflix, the implementation is provided for MySQL and PostgreSQL databases.
+
+
+  ### Signalling Table
+  Before moving to Debezium's implementation of the watermark-based snapshotting approach, a small detour is needed.
+  Sometimes it can be useful to control Debezium from the outside, so to force it to execute some requested action.
+  Let's suppose it is necessary to re-snapshot an already snapshotted table - a so-called ad-hoc snapshot.
+  The user would need to send a command to Debezium to pause the current operation and do the snapshot.
+  For that purpose, Debezium defines the concept signals, issued via a signalling table.
+  This is a special table, designated for communication between the user and Debezium.
+  Debezium captures the table and when the user requires a certain operation to be executed,
+  they simply write a record to the signalling table (sending a signal).
+  Debezium will receive the captured change and then execute the required action.
+
+
+
+- DBLog: A Watermark Based Change-Data-Capture Framework
+  - https://arxiv.org/pdf/2010.12597v1
+  - https://github.com/abhishek-ch/around-dataengineering/blob/master/docs/dblog_netflix_cdc.md
+  - https://netflixtechblog.com/dblog-a-generic-change-data-capture-framework-69351fb9099b - DBLog: A Generic Change-Data-Capture Framework
+  - https://medium.com/brexeng/change-data-capture-at-brex-c71263616dd7 - Change Data Capture at Brex
diff --git a/snippets/shortishly-articles.txt b/snippets/shortishly-articles.txt
new file mode 100644
index 0000000..cfafa5f
--- /dev/null
+++ b/snippets/shortishly-articles.txt
@@ -0,0 +1,25 @@
+
+
+- https://shortishly.com/blog/cache-consistency-with-streaming-replication/
+  - Cache consistency with logical streaming replication
+
+- https://shortishly.com/blog/pgmp-log-rep-postgresql-fifteen/
+  - PGMP Logical Replication in PostgreSQL 15
+
+- https://shortishly.com/blog/pgec-read-write-notify/
+  - pgec reads: memory replicated cache, writes: PostgreSQL with a Redis API
+  - https://github.com/shortishly/pgec
+
+- https://shortishly.com/blog/postgresql-edge-cache/
+  - pgec is a real-time in memory database replication cache, with a memcached and REST API
+
+- https://shortishly.com/blog/mysql-replication-redis-api/
+  msec is a disk cache with a Redis compatible API using MySQL/MariaDB replication to remain consistent. Inserts, updates or deletes in the database are replicated in real time and persisted to a local store.
+
+- https://shortishly.com/blog/property-testing-a-database-driver/
+
+
+
+
+- https://erlangforums.com/t/pgmp-postgresql-client-with-logical-replication-to-ets/1707/17
+  - great description of how to use gen_statem to handle logical replication events
\ No newline at end of file
diff --git a/snippets/traffic-light.txt b/snippets/traffic-light.txt
new file mode 100644
index 0000000..2476375
--- /dev/null
+++ b/snippets/traffic-light.txt
@@ -0,0 +1,36 @@
+defmodule TrafficLight do
+  use GenStateMachine
+
+  def start_link do
+    GenStateMachine.start_link(__MODULE__, :red, name: __MODULE__)
+  end
+
+  # Add this init function
+  def init(:red) do
+    {:ok, :red, nil}
+  end
+
+  def change do
+    GenStateMachine.cast(__MODULE__, :change)
+  end
+
+  def state do
+    GenStateMachine.call(__MODULE__, :get_state)
+  end
+
+  def handle_event(:cast, :change, :red, _data) do
+    {:next_state, :green, nil}
+  end
+
+  def handle_event(:cast, :change, :green, _data) do
+    {:next_state, :yellow, nil}
+  end
+
+  def handle_event(:cast, :change, :yellow, _data) do
+    {:next_state, :red, nil}
+  end
+
+  def handle_event({:call, from}, :get_state, state, _data) do
+    {:keep_state_and_data, [{:reply, from, state}]}
+  end
+end

From 214ffdabb0683e6aa0ebe1f47fec551117179d7e Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Wed, 4 Sep 2024 00:36:43 +0200
Subject: [PATCH 14/40] Feat: just use a Repo with pool=1 for PG locks

Problem:
- the sandbox config has some test-specific params, like timing out, cleaning up the conn, etc.
- this prevents proper usage

Solution:
- we just stick to a normal repo with a pool of size=1, that way we always get the same connection
- this is only used for locking, so there should be no performance issues
---
 lib/essig/application.ex                  |  2 +-
 lib/pg_lock.ex                            | 15 +++++++++++----
 lib/{sand_repo.ex => repo_single_conn.ex} | 10 +++++-----
 3 files changed, 17 insertions(+), 10 deletions(-)
 rename lib/{sand_repo.ex => repo_single_conn.ex} (81%)

diff --git a/lib/essig/application.ex b/lib/essig/application.ex
index d5dbe31..aae19e5 100644
--- a/lib/essig/application.ex
+++ b/lib/essig/application.ex
@@ -7,7 +7,7 @@ defmodule Essig.Application do
   def start(_type, _args) do
     children = [
       Essig.Repo,
-      Essig.SandRepo,
+      Essig.RepoSingleConn,
       {Phoenix.PubSub, name: Essig.PubSub},
       {Registry, keys: :unique, name: Essig.Scopes.Registry},
       Essig.Scopes.DynamicSupervisor
diff --git a/lib/pg_lock.ex b/lib/pg_lock.ex
index 859f5fc..c81df23 100644
--- a/lib/pg_lock.ex
+++ b/lib/pg_lock.ex
@@ -1,13 +1,20 @@
 defmodule Essig.PGLock do
   @moduledoc """
   A simple wrapper around pg_try_advisory_lock and pg_advisory_unlock.
-  To get consistent PG connection, it uses SandRepo (which is configured with a Sandbox as pool)
+  To get consistent PG connection, it uses second Repo with pool_size=1
 
   This makes it possible to use the same connection for locking and releasing the lock!
   Because releasing the lock on a different connection than locking it will fail.
   This is the best workaround I could come up with.
+
+
+  - Check locks with:
+
+  ```sql
+  SELECT locktype, transactionid, virtualtransaction, mode FROM pg_locks;
+  ```
   """
-  use Essig.SandRepo
+  use Essig.RepoSingleConn
 
   def with_lock(kind, fun) do
     lock_key = :erlang.phash2("#{kind}-#{Essig.Context.current_scope()}")
@@ -26,10 +33,10 @@ defmodule Essig.PGLock do
   end
 
   def get_lock(key) do
-    Ecto.Adapters.SQL.query(SandRepo, "SELECT pg_try_advisory_lock($1)", [key], [])
+    Ecto.Adapters.SQL.query(RepoSingleConn, "SELECT pg_try_advisory_lock($1)", [key], [])
   end
 
   def release_lock(key) do
-    Ecto.Adapters.SQL.query(SandRepo, "SELECT pg_advisory_unlock($1)", [key], [])
+    Ecto.Adapters.SQL.query(RepoSingleConn, "SELECT pg_advisory_unlock($1)", [key], [])
   end
 end
diff --git a/lib/sand_repo.ex b/lib/repo_single_conn.ex
similarity index 81%
rename from lib/sand_repo.ex
rename to lib/repo_single_conn.ex
index f6f6e39..5e8fd30 100644
--- a/lib/sand_repo.ex
+++ b/lib/repo_single_conn.ex
@@ -1,10 +1,10 @@
-defmodule Essig.SandRepo do
+defmodule Essig.RepoSingleConn do
   @moduledoc """
-  This is special SandRepo, that allows checking out a single connection.
+  This is special RepoSingleConn, with a single connection.
   We use this when getting a DB advisory lock and releasing it afterwards.
   This is not possible with the standard Ecto.Repo outside of a transaction.
 
-  To keep the configuration overhead low, we use dynamic config (init -callback) and
+  To keep the configuration overhead low, we use dynamic config (init-callback) and
   copy the main config for the Essig.Repo with a few tweaks.
""" @@ -19,7 +19,7 @@ defmodule Essig.SandRepo do def init(_type, _config) do special_config = [ telemetry_prefix: [:essig, :sand_repo], - pool: Ecto.Adapters.SQL.Sandbox + pool_size: 1 ] main_config = Application.get_env(:essig, Essig.Repo) @@ -30,7 +30,7 @@ defmodule Essig.SandRepo do defmacro __using__(_) do quote do - alias Essig.SandRepo + alias Essig.RepoSingleConn require Ecto.Query import Ecto.Query import Ecto.Changeset From 88f08f4ab825f856fd383782e53efd657911fe81 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 11:12:35 +0200 Subject: [PATCH 15/40] Chore: wording --- lib/pg_lock.ex | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/pg_lock.ex b/lib/pg_lock.ex index c81df23..8c87d41 100644 --- a/lib/pg_lock.ex +++ b/lib/pg_lock.ex @@ -1,10 +1,9 @@ defmodule Essig.PGLock do @moduledoc """ A simple wrapper around pg_try_advisory_lock and pg_advisory_unlock. - To get consistent PG connection, it uses second Repo with pool_size=1 - + To get a consistent PG connection, it uses a second Repo with pool_size=1. This makes it possible to use the same connection for locking and releasing the lock! - Because releasing the lock on a different connection than locking it will fail. + Because releasing the lock on a different connection than the one it was created wont work. This is the best workaround I could come up with. @@ -17,7 +16,7 @@ defmodule Essig.PGLock do use Essig.RepoSingleConn def with_lock(kind, fun) do - lock_key = :erlang.phash2("#{kind}-#{Essig.Context.current_scope()}") + lock_key = :erlang.phash2([Essig.Context.current_scope(), kind]) case get_lock(lock_key) do {:ok, %{rows: [[true]]}} -> From 9e97a87f993473230c0adbfddfd49236981972c8 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 13:16:35 +0200 Subject: [PATCH 16/40] Feat: wrapper for pubsub --- lib/pubsub.ex | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 lib/pubsub.ex diff --git a/lib/pubsub.ex b/lib/pubsub.ex new file mode 100644 index 0000000..673fb58 --- /dev/null +++ b/lib/pubsub.ex @@ -0,0 +1,13 @@ +defmodule Essig.Pubsub do + def broadcast(topic, message) do + Phoenix.PubSub.broadcast(Essig.PubSub, topic, message) + end + + def subscribe(topic) do + Phoenix.PubSub.subscribe(Essig.PubSub, topic) + end + + def unsubscribe(topic) do + Phoenix.PubSub.unsubscribe(Essig.PubSub, topic) + end +end From 9a5da73fad6272b1af6521d9aa1859f199d5feae Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 13:20:20 +0200 Subject: [PATCH 17/40] Feat: add xid (transaction id) and snapmin (min logical replication snapshot) to events Problem: - we might need to extend features for our event store - to reduce chatiness (notification per event), we need to store the current transaction id Solution: - we add current PG transaction id + min current LSN to every document via triggers - based on the example here - https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs --- lib/migrations/migration002.ex | 30 ++++++++++++++++++++++++++++++ lib/schemas/event.ex | 4 ++++ 2 files changed, 34 insertions(+) create mode 100644 lib/migrations/migration002.ex diff --git a/lib/migrations/migration002.ex b/lib/migrations/migration002.ex new file mode 100644 index 0000000..6d0d19e --- /dev/null +++ b/lib/migrations/migration002.ex @@ -0,0 +1,30 @@ +defmodule Migrations.Migration002 do + use Ecto.Migration + + def change do + alter table(:essig_events) do + add(:_xid, :bigserial) + add(:_snapmin, :bigserial) + 
---
 lib/migrations/migration002.ex | 30 ++++++++++++++++++++++++++++++
 lib/schemas/event.ex           |  4 ++++
 2 files changed, 34 insertions(+)
 create mode 100644 lib/migrations/migration002.ex

diff --git a/lib/migrations/migration002.ex b/lib/migrations/migration002.ex
new file mode 100644
index 0000000..6d0d19e
--- /dev/null
+++ b/lib/migrations/migration002.ex
@@ -0,0 +1,30 @@
+defmodule Migrations.Migration002 do
+  use Ecto.Migration
+
+  def change do
+    alter table(:essig_events) do
+      add(:_xid, :bigserial)
+      add(:_snapmin, :bigserial)
+    end
+
+    execute "
+    CREATE OR REPLACE FUNCTION essig_add_xid_snapmin()
+      RETURNS TRIGGER AS $$
+      BEGIN
+        NEW._xid := pg_current_xact_id();
+        NEW._snapmin := pg_snapshot_xmin(pg_current_snapshot());
+        RETURN NEW;
+      END;
+      $$ LANGUAGE plpgsql;
+    ",
+            "DROP FUNCTION essig_add_xid_snapmin()"
+
+    execute "
+    CREATE OR REPLACE TRIGGER essig_add_xid_to_events
+      BEFORE INSERT OR UPDATE ON essig_events
+      FOR EACH ROW
+      EXECUTE FUNCTION essig_add_xid_snapmin();
+    ",
+            "DROP TRIGGER essig_add_xid_to_events;"
+  end
+end
diff --git a/lib/schemas/event.ex b/lib/schemas/event.ex
index 4e7032a..3433af4 100644
--- a/lib/schemas/event.ex
+++ b/lib/schemas/event.ex
@@ -15,6 +15,10 @@ defmodule Essig.Schemas.Event do
 
     field(:seq, :integer)
 
+    ## transaction metadata
+    field(:_xid, :integer, read_after_writes: true)
+    field(:_snapmin, :integer, read_after_writes: true)
+
     # no updated_at!
     timestamps(type: :utc_datetime_usec, updated_at: false)
   end

From 745eeb9172308122f0d159e953526ea07c007c5e Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Wed, 4 Sep 2024 13:32:29 +0200
Subject: [PATCH 18/40] Feat: implementation for DB based notification system
 for new events

Problem:
- we somehow need to make sure that our app notices newly inserted events
- a local Pubsub would miss out on inserts from other app instances (iex shells / etc)
- also we do not want to overload our system by sending large messages (many events in a single message)

Solution:
- we add a signals table (somewhat inspired by Debezium - https://debezium.io/blog/2023/06/27/Debezium-signaling-and-notifications/)
- after each transaction on the events table we also insert a small row into the signals table with the current scope_uuid
- the signals table has triggers configured that execute a `pg_notify` function with a small message
- in our app we start PGNotifyListener, whose purpose is to re-broadcast those pg_notify messages to the local PubSub system
- that way our app still deals with Phx PubSub, only those messages originate from the DB and work across OS process boundaries without a distributed Erlang cluster
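
Conceptually, the listening side boils down to something like this (sketch,
not the literal module below):

    {:ok, pid} = Postgrex.Notifications.start_link(Essig.Repo.config())
    {:ok, _ref} = Postgrex.Notifications.listen(pid, "new_events")
    # every pg_notify on that channel then arrives as a message:
    # {:notification, ^pid, ref, "new_events", json_payload}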
From 745eeb9172308122f0d159e953526ea07c007c5e Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Wed, 4 Sep 2024 13:32:29 +0200
Subject: [PATCH 18/40] Feat: implementation for DB based notification system
 for new events

Problem:
- we somehow need to make sure that our app notices newly inserted events
- a local PubSub would miss out on inserts from other app instances (iex shells / etc.)
- also we do not want to overload our system by sending large messages (many events in a single message)

Solution:
- we add a signals table (somewhat inspired by Debezium -
  https://debezium.io/blog/2023/06/27/Debezium-signaling-and-notifications/)
- after each transaction on the events table we also insert a small row into the signals table with the current scope_uuid
- it has triggers configured that execute a `pg_notify` function with a small message
- in our app we start PGNotifyListener, whose purpose is to re-broadcast those pg_notify messages to the local PubSub system
- that way our app still deals with Phx PubSub, only the messages originate from the DB and work across OS process boundaries without a distributed Erlang cluster
---
 lib/essig/application.ex            |  1 +
 lib/event_store.ex                  | 12 ----
 lib/event_store/append_to_stream.ex | 13 ++++
 lib/helpers/map.ex                  | 97 +++++++++++++++++++++++++++++
 lib/migrations/all.ex               |  4 +-
 lib/migrations/migration003.ex      | 50 +++++++++++++++
 lib/pg_notify_listener.ex           | 33 ++++++++++
 7 files changed, 197 insertions(+), 13 deletions(-)
 create mode 100644 lib/helpers/map.ex
 create mode 100644 lib/migrations/migration003.ex
 create mode 100644 lib/pg_notify_listener.ex

diff --git a/lib/essig/application.ex b/lib/essig/application.ex
index aae19e5..2b7bb5b 100644
--- a/lib/essig/application.ex
+++ b/lib/essig/application.ex
@@ -9,6 +9,7 @@ defmodule Essig.Application do
       Essig.Repo,
       Essig.RepoSingleConn,
       {Phoenix.PubSub, name: Essig.PubSub},
+      Essig.PGNotifyListener,
       {Registry, keys: :unique, name: Essig.Scopes.Registry},
       Essig.Scopes.DynamicSupervisor
     ]
diff --git a/lib/event_store.ex b/lib/event_store.ex
index 5f2f17b..ecb5d5a 100644
--- a/lib/event_store.ex
+++ b/lib/event_store.ex
@@ -6,18 +6,6 @@ defmodule Essig.EventStore do
            Essig.EventStore.AppendToStream.run(stream_uuid, stream_type, expected_seq, events) do
       events = res.insert_events
       stream = res.update_seq
-
-      scope_uuid = Essig.Context.current_scope()
-
-      # Broadcast the events
-      Enum.each(events, fn event ->
-        Phoenix.PubSub.broadcast(
-          Essig.PubSub,
-          "events:#{scope_uuid}",
-          {:new_event, event}
-        )
-      end)
-
       {:ok, %{stream: stream, events: events}}
     end
   end
diff --git a/lib/event_store/append_to_stream.ex b/lib/event_store/append_to_stream.ex
index 5e5759c..28136ee 100644
--- a/lib/event_store/append_to_stream.ex
+++ b/lib/event_store/append_to_stream.ex
@@ -33,6 +33,9 @@ defmodule Essig.EventStore.AppendToStream do
       last_event = Enum.at(insert_events, -1)
       Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq})
     end)
+    |> Ecto.Multi.run(:signal_new_events, fn _repo, _ ->
+      signal_new_events()
+    end)
   end

   defp ensure_stream_exists(stream_uuid, stream_type) do
@@ -94,4 +97,14 @@ defmodule Essig.EventStore.AppendToStream do
       events -> {:ok, Enum.reverse(events)}
     end
   end
+
+  defp signal_new_events() do
+    scope_uuid = Essig.Context.current_scope()
+    bin_uuid = Ecto.UUID.dump!(scope_uuid)
+
+    {:ok, _} =
+      Repo.query("insert into essig_signals(scope_uuid) values ($1)", [bin_uuid])
+
+    {:ok, true}
+  end
 end
diff --git a/lib/helpers/map.ex b/lib/helpers/map.ex
new file mode 100644
index 0000000..06130b8
--- /dev/null
+++ b/lib/helpers/map.ex
@@ -0,0 +1,97 @@
+defmodule Essig.Helpers.Map do
+  @moduledoc """
+  Functions to transform maps
+  // https://gist.github.com/kipcole9/0bd4c6fb6109bfec9955f785087f53fb - Helpers for Elixir Maps: underscore, atomise and stringify map keys
+  """
+
+  @doc """
+  Convert map string camelCase keys to underscore_keys
+  """
+  def underscore_keys(nil), do: nil
+
+  def underscore_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {Macro.underscore(k), underscore_keys(v)} end)
+    |> Enum.map(fn {k, v} -> {String.replace(k, "-", "_"), v} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and underscore the keys
+  # of any map members
+  def underscore_keys([head | rest]) do
+    [underscore_keys(head) | underscore_keys(rest)]
+  end
+
+  def underscore_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Convert map string keys to :atom keys
+  """
+  def atomize_keys(nil), do: nil
+
+  # Structs don't do enumerable and anyway the keys are already
+  # atoms
+  def atomize_keys(struct = %{__struct__: _}) do
+    struct
+  end
+
+  def atomize_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {String.to_atom(k), atomize_keys(v)} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and atomize the keys
+  # of any map members
+  def atomize_keys([head | rest]) do
+    [atomize_keys(head) | atomize_keys(rest)]
+  end
+
+  def atomize_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Convert map atom keys to strings
+  """
+  def stringify_keys(nil), do: nil
+
+  def stringify_keys(map = %{}) do
+    map
+    |> Enum.map(fn {k, v} -> {Atom.to_string(k), stringify_keys(v)} end)
+    |> Enum.into(%{})
+  end
+
+  # Walk the list and stringify the keys
+  # of any map members
+  def stringify_keys([head | rest]) do
+    [stringify_keys(head) | stringify_keys(rest)]
+  end
+
+  def stringify_keys(not_a_map) do
+    not_a_map
+  end
+
+  @doc """
+  Deep merge two maps
+  """
+  def deep_merge(left, right) do
+    Map.merge(left, right, &deep_resolve/3)
+  end
+
+  # Key exists in both maps, and both values are maps as well.
+  # These can be merged recursively.
+  defp deep_resolve(_key, left = %{}, right = %{}) do
+    deep_merge(left, right)
+  end
+
+  # Key exists in both maps, but at least one of the values is
+  # NOT a map. We fall back to standard merge behavior, preferring
+  # the value on the right.
+  defp deep_resolve(_key, _left, right) do
+    right
+  end
+end
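A few literal examples of the helpers above (values chosen purely for illustration):

    Essig.Helpers.Map.underscore_keys(%{"fooBar" => 1, "baz-qux" => 2})
    # => %{"foo_bar" => 1, "baz_qux" => 2}

    Essig.Helpers.Map.atomize_keys(%{"a" => %{"b" => 1}})
    # => %{a: %{b: 1}}

    Essig.Helpers.Map.deep_merge(%{a: %{b: 1, c: 2}}, %{a: %{b: 9}})
    # => %{a: %{b: 9, c: 2}}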
diff --git a/lib/migrations/all.ex b/lib/migrations/all.ex
index 6f0fc6f..abebcaa 100644
--- a/lib/migrations/all.ex
+++ b/lib/migrations/all.ex
@@ -4,7 +4,9 @@ defmodule Migrations.All do
   """
   def modules do
     [
-      {20_240_824_120_000, Migrations.Migration001}
+      {2024_0824_120000, Migrations.Migration001},
+      {2024_0904_112600, Migrations.Migration002},
+      {2024_0904_114100, Migrations.Migration003}
     ]
   end
 end
diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex
new file mode 100644
index 0000000..41a4e0b
--- /dev/null
+++ b/lib/migrations/migration003.ex
@@ -0,0 +1,50 @@
+defmodule Migrations.Migration003 do
+  use Ecto.Migration
+
+  def change do
+    create table(:essig_signals, primary_key: false) do
+      add(:id, :bigserial, primary_key: true)
+      add(:scope_uuid, :uuid, null: false, default: fragment("gen_random_uuid()"))
+      add(:_xid, :bigserial)
+      add(:_snapmin, :bigserial)
+    end
+
+    execute "
+    -- Trigger on signals table, to notify on new transactions (events) via pg_notify
+    CREATE OR REPLACE TRIGGER essig_add_xid_to_signals
+    BEFORE INSERT OR UPDATE ON essig_signals
+    FOR EACH ROW
+    EXECUTE FUNCTION essig_add_xid_snapmin();
+    ",
+            "DROP TRIGGER essig_add_xid_to_signals;"
+
+    execute "
+    -- Function to notify on new transactions (events) via pg_notify
+    CREATE OR REPLACE FUNCTION notify_new_events()
+    RETURNS TRIGGER AS $$
+    DECLARE
+      payload JSON;
+    BEGIN
+      payload := json_build_object(
+        'scope_uuid', NEW.scope_uuid,
+        '_xid', NEW._xid,
+        '_snapmin', NEW._snapmin
+      );
+
+      PERFORM pg_notify('new_events', payload::TEXT);
+      RETURN NEW;
+    END;
+    $$ LANGUAGE plpgsql;
+    ",
+            "DROP FUNCTION notify_new_events();"
+
+    execute "
+    -- Trigger to notify on new transactions (events) via pg_notify
+    CREATE TRIGGER signals_notify_new_events
+    BEFORE INSERT ON essig_signals
+    FOR EACH ROW
+    EXECUTE PROCEDURE notify_new_events();
+    ",
+            "DROP TRIGGER signals_notify_new_events;"
+  end
+end
diff --git a/lib/pg_notify_listener.ex b/lib/pg_notify_listener.ex
new file mode 100644
index 0000000..826e712
--- /dev/null
+++ b/lib/pg_notify_listener.ex
@@ -0,0 +1,33 @@
+defmodule Essig.PGNotifyListener do
+  @moduledoc """
+  Receives notifications from the database (signals table / new_events channel)
+  and rebroadcasts them to the `Phoenix.PubSub` system.
+
+  This is a small payload once per transaction.
+  """
+  use GenServer
+
+  def start_link(_), do: GenServer.start_link(__MODULE__, [])
+
+  def init(_arg) do
+    config = Essig.Repo.config()
+    config = Keyword.put(config, :auto_reconnect, true)
+    {:ok, pid} = Postgrex.Notifications.start_link(config)
+    Postgrex.Notifications.listen(pid, "new_events")
+    {:ok, []}
+  end
+
+  def handle_info({:notification, _connection_pid, _ref, _channel, payload}, state) do
+    with {:ok, map} <- Jason.decode(payload) do
+      rebroadcast(map)
+    end
+
+    {:noreply, state}
+  end
+
+  def rebroadcast(map) do
+    # "{\"scope_uuid\" : \"0191bca1-36d5-7235-8084-6d955e50f6dc\", \"_xid\" : 182310, \"_snapmin\" : 182310}
+    map = Essig.Helpers.Map.atomize_keys(map)
+    Essig.Pubsub.broadcast("new_events", {:new_events, map})
+  end
+end
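Putting the pieces together, any process can now follow inserts coming from any OS process
that shares the database. A minimal consumer sketch (the module name is hypothetical; the
payload keys match what Migration003 builds at this point in the series):

    defmodule MyEventsWatcher do
      use GenServer

      def start_link(_), do: GenServer.start_link(__MODULE__, nil)

      def init(nil) do
        # topic used by Essig.PGNotifyListener.rebroadcast/1
        Essig.Pubsub.subscribe("new_events")
        {:ok, nil}
      end

      def handle_info({:new_events, %{scope_uuid: scope_uuid}}, state) do
        # the payload is intentionally small; fetch the actual events on demand
        IO.inspect(scope_uuid, label: "new events in scope")
        {:noreply, state}
      end
    end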
+ """ + use GenServer + + def start_link(_), do: GenServer.start_link(__MODULE__, []) + + def init(_arg) do + config = Essig.Repo.config() + config = Keyword.put(config, :auto_reconnect, true) + {:ok, pid} = Postgrex.Notifications.start_link(config) + Postgrex.Notifications.listen(pid, "new_events") + {:ok, []} + end + + def handle_info({:notification, _connection_pid, _ref, _channel, payload}, state) do + with {:ok, map} = Jason.decode(payload) do + rebroadcast(map) + end + + {:noreply, state} + end + + def rebroadcast(map) do + # "{\"scope_uuid\" : \"0191bca1-36d5-7235-8084-6d955e50f6dc\", \"_xid\" : 182310, \"_snapmin\" : 182310} + map = Essig.Helpers.Map.atomize_keys(map) + Essig.Pubsub.broadcast("new_events", {:new_events, map}) + end +end From f6bc05422213b33399f643701c07948fe5b5d852 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 13:33:52 +0200 Subject: [PATCH 19/40] Chore: A small checker module --- lib/checker.ex | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 lib/checker.ex diff --git a/lib/checker.ex b/lib/checker.ex new file mode 100644 index 0000000..1d90730 --- /dev/null +++ b/lib/checker.ex @@ -0,0 +1,23 @@ +defmodule Essig.Checker do + @moduledoc """ + A small dev-only module to test the event store. + """ + def run do + scope_uuid = Essig.UUID7.generate() + + Essig.Server.start_scope(scope_uuid) + Essig.Server.start_casts([SampleCast1]) + + stream_uuid = Essig.UUID7.generate() + + {:ok, %{events: events}} = + Essig.EventStore.append_to_stream(stream_uuid, "trp", 0, [ + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, + %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"} + ]) + + # this will be unnecessary soon + Essig.Casts.CastRunner.send_events(SampleCast1, events) + end +end From 8c05558672cddd2412a2007edf5aa5f7e2d7c897 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 13:44:36 +0200 Subject: [PATCH 20/40] Chore: move sql comments to be within function bodies, so they can be inspected later --- lib/migrations/migration002.ex | 3 +++ lib/migrations/migration003.ex | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/migrations/migration002.ex b/lib/migrations/migration002.ex index 6d0d19e..dd01001 100644 --- a/lib/migrations/migration002.ex +++ b/lib/migrations/migration002.ex @@ -11,6 +11,9 @@ defmodule Migrations.Migration002 do CREATE OR REPLACE FUNCTION essig_add_xid_snapmin() RETURNS TRIGGER AS $$ BEGIN + -- we add current transaction id and minimal LSN + -- based on suggestions here: https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs + NEW._xid := pg_current_xact_id(); NEW._snapmin := pg_snapshot_xmin(pg_current_snapshot()); RETURN NEW; diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex index 41a4e0b..db09b18 100644 --- a/lib/migrations/migration003.ex +++ b/lib/migrations/migration003.ex @@ -19,12 +19,14 @@ defmodule Migrations.Migration003 do "DROP TRIGGER essig_add_xid_to_signals;" execute " - -- Function to notify on new transactions (events) via pg_notify + CREATE OR REPLACE FUNCTION notify_new_events() RETURNS TRIGGER AS $$ DECLARE payload JSON; BEGIN + -- Function to notify on new transactions (events) via pg_notify + payload := json_build_object( 'scope_uuid', NEW.scope_uuid, '_xid', NEW._xid, From 
40c486ab9d334c840c295065f8ff5cbb6de9eeaf Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 15:41:16 +0200 Subject: [PATCH 21/40] Chore: move uuid7 file to lib/ folder --- lib/{ecto => }/uuid7.ex | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename lib/{ecto => }/uuid7.ex (100%) diff --git a/lib/ecto/uuid7.ex b/lib/uuid7.ex similarity index 100% rename from lib/ecto/uuid7.ex rename to lib/uuid7.ex From bf5d375460a27e17b8ae265c76b5d670f15b62f8 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 19:29:44 +0200 Subject: [PATCH 22/40] Chore: adjust naming in tests --- lib/meta_table/handler_meta_test.exs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/meta_table/handler_meta_test.exs b/lib/meta_table/handler_meta_test.exs index 2dcce9b..41759a9 100644 --- a/lib/meta_table/handler_meta_test.exs +++ b/lib/meta_table/handler_meta_test.exs @@ -8,25 +8,25 @@ defmodule Essig.HandlerMetaTest do setup do # Set up a test app context - test_app = "test_app_#{:rand.uniform(1000)}" - Essig.Context.set_current_scope(test_app) + scope_uuid = Essig.UUID7.generate() + Essig.Context.set_current_scope(scope_uuid) on_exit(fn -> Essig.Context.set_current_scope(nil) end) - %{test_app: test_app} + %{scope_uuid: scope_uuid} end - test "init/0 creates an ETS table", %{test_app: test_app} do + test "init/0 creates an ETS table", %{scope_uuid: scope_uuid} do HandlerMeta.init() - assert :ets.info(String.to_atom("#{test_app}_handler_meta")) != :undefined + assert :ets.info(String.to_atom("#{scope_uuid}_handler_meta")) != :undefined end - test "repeated init/0 does not raise", %{test_app: test_app} do + test "repeated init/0 does not raise", %{scope_uuid: scope_uuid} do name = HandlerMeta.init() assert name == HandlerMeta.init() - assert :ets.info(String.to_atom("#{test_app}_handler_meta")) != :undefined + assert :ets.info(String.to_atom("#{scope_uuid}_handler_meta")) != :undefined end test "set/2 inserts data into the ETS table" do @@ -139,7 +139,7 @@ defmodule Essig.HandlerMetaTest do assert Enum.at(result, 0) == {TestModule1, %{status: :new, key: TestModule1}} end - test "operations with different app contexts", %{test_app: test_app} do + test "operations with different app contexts", %{scope_uuid: scope_uuid} do HandlerMeta.init() HandlerMeta.set(TestModule, %{field: "value1"}) @@ -151,7 +151,7 @@ defmodule Essig.HandlerMetaTest do assert HandlerMeta.get(TestModule) == %{field: "value2", key: TestModule} # Switch back to the original app context - Essig.Context.set_current_scope(test_app) + Essig.Context.set_current_scope(scope_uuid) assert HandlerMeta.get(TestModule) == %{field: "value1", key: TestModule} end end From d228e6df734de36e4ed098128a90d63b9f3ed3f1 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 19:30:36 +0200 Subject: [PATCH 23/40] Chore: rename `_xid` to `txid` --- lib/checker.ex | 2 +- lib/migrations/migration002.ex | 18 +++++++++--------- lib/migrations/migration003.ex | 20 ++++++++++---------- lib/pg_notify_listener.ex | 1 - lib/schemas/event.ex | 4 ++-- snippets/experiment.txt | 12 ++++++------ 6 files changed, 28 insertions(+), 29 deletions(-) diff --git a/lib/checker.ex b/lib/checker.ex index 1d90730..25efa99 100644 --- a/lib/checker.ex +++ b/lib/checker.ex @@ -18,6 +18,6 @@ defmodule Essig.Checker do ]) # this will be unnecessary soon - Essig.Casts.CastRunner.send_events(SampleCast1, events) + # Essig.Casts.CastRunner.send_events(SampleCast1, events) end end diff --git 
a/lib/migrations/migration002.ex b/lib/migrations/migration002.ex index dd01001..02957b7 100644 --- a/lib/migrations/migration002.ex +++ b/lib/migrations/migration002.ex @@ -3,31 +3,31 @@ defmodule Migrations.Migration002 do def change do alter table(:essig_events) do - add(:_xid, :bigserial) - add(:_snapmin, :bigserial) + add(:txid, :bigint) + add(:snapmin, :bigint) end execute " - CREATE OR REPLACE FUNCTION essig_add_xid_snapmin() + CREATE OR REPLACE FUNCTION essig_add_txid_snapmin() RETURNS TRIGGER AS $$ BEGIN -- we add current transaction id and minimal LSN -- based on suggestions here: https://github.com/josevalim/sync/blob/main/priv/repo/migrations/20240806131210_create_publication.exs - NEW._xid := pg_current_xact_id(); - NEW._snapmin := pg_snapshot_xmin(pg_current_snapshot()); + NEW.txid := pg_current_xact_id(); + NEW.snapmin := pg_snapshot_xmin(pg_current_snapshot()); RETURN NEW; END; $$ LANGUAGE plpgsql; ", - "DROP FUNCTION essig_add_xid_snapmin()" + "DROP FUNCTION essig_add_txid_snapmin()" execute " - CREATE OR REPLACE TRIGGER essig_add_xid_to_events + CREATE OR REPLACE TRIGGER essig_add_txid_to_events BEFORE INSERT OR UPDATE ON essig_events FOR EACH ROW - EXECUTE FUNCTION essig_add_xid_snapmin(); + EXECUTE FUNCTION essig_add_txid_snapmin(); ", - "DROP TRIGGER essig_add_xid_to_events;" + "DROP TRIGGER essig_add_txid_to_events;" end end diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex index db09b18..f2feaf6 100644 --- a/lib/migrations/migration003.ex +++ b/lib/migrations/migration003.ex @@ -5,39 +5,39 @@ defmodule Migrations.Migration003 do create table(:essig_signals, primary_key: false) do add(:id, :bigserial, primary_key: true) add(:scope_uuid, :uuid, null: false, default: fragment("gen_random_uuid()")) - add(:_xid, :bigserial) - add(:_snapmin, :bigserial) + add(:txid, :bigint) + add(:snapmin, :bigint) end execute " -- Trigger on singals table, to notify on new transactions (events) via pg_notify - CREATE OR REPLACE TRIGGER essig_add_xid_to_signals + CREATE OR REPLACE TRIGGER essig_add_txid_to_signals BEFORE INSERT OR UPDATE ON essig_signals FOR EACH ROW - EXECUTE FUNCTION essig_add_xid_snapmin(); + EXECUTE FUNCTION essig_add_txid_snapmin(); ", - "DROP TRIGGER essig_add_xid_to_signals;" + "DROP TRIGGER essig_add_txid_to_signals;" - execute " + execute "drop function if exists notify_new_events CASCADE", "" + execute """ CREATE OR REPLACE FUNCTION notify_new_events() RETURNS TRIGGER AS $$ DECLARE payload JSON; BEGIN -- Function to notify on new transactions (events) via pg_notify - payload := json_build_object( 'scope_uuid', NEW.scope_uuid, - '_xid', NEW._xid, - '_snapmin', NEW._snapmin + 'txid', NEW.txid, + 'snapmin', NEW.snapmin ); PERFORM pg_notify('new_events', payload::TEXT); RETURN NEW; END; $$ LANGUAGE plpgsql; - ", + """, "DROP FUNCTION notify_new_events();" execute " diff --git a/lib/pg_notify_listener.ex b/lib/pg_notify_listener.ex index 826e712..ab90d0e 100644 --- a/lib/pg_notify_listener.ex +++ b/lib/pg_notify_listener.ex @@ -26,7 +26,6 @@ defmodule Essig.PGNotifyListener do end def rebroadcast(map) do - # "{\"scope_uuid\" : \"0191bca1-36d5-7235-8084-6d955e50f6dc\", \"_xid\" : 182310, \"_snapmin\" : 182310} map = Essig.Helpers.Map.atomize_keys(map) Essig.Pubsub.broadcast("new_events", {:new_events, map}) end diff --git a/lib/schemas/event.ex b/lib/schemas/event.ex index 3433af4..49144a9 100644 --- a/lib/schemas/event.ex +++ b/lib/schemas/event.ex @@ -16,8 +16,8 @@ defmodule Essig.Schemas.Event do field(:seq, :integer) ## transaction 
metadata - field(:_xid, :integer, read_after_writes: true) - field(:_snapmin, :integer, read_after_writes: true) + field(:txid, :integer, read_after_writes: true) + field(:snapmin, :integer, read_after_writes: true) # no updated_at! timestamps(type: :utc_datetime_usec, updated_at: false) diff --git a/snippets/experiment.txt b/snippets/experiment.txt index 0e80e0b..c8efbd3 100644 --- a/snippets/experiment.txt +++ b/snippets/experiment.txt @@ -3,15 +3,15 @@ DROP TABLE IF EXISTS rows; CREATE TABLE rows ( id SERIAL PRIMARY KEY, name varchar, - xid bigserial NOT NULL, + txid bigserial NOT NULL, snapmin bigserial NOT NULL, timestamp timestamp default current_timestamp ); -CREATE OR REPLACE FUNCTION pgx_add_xid() +CREATE OR REPLACE FUNCTION pgx_add_txid() RETURNS TRIGGER AS $$ BEGIN - NEW.xid := pg_current_xact_id(); + NEW.txid := pg_current_xact_id(); NEW.snapmin := pg_snapshot_xmin(pg_current_snapshot()); RETURN NEW; END; @@ -19,13 +19,13 @@ CREATE OR REPLACE FUNCTION pgx_add_xid() -CREATE OR REPLACE TRIGGER pgx_add_xid_before_insert_update +CREATE OR REPLACE TRIGGER pgx_add_txid_before_insert_update BEFORE INSERT OR UPDATE ON rows FOR EACH ROW - EXECUTE FUNCTION pgx_add_xid(); + EXECUTE FUNCTION pgx_add_txid(); --- run this in 2 psql shells, interleaving the inserts. this helps to see the xid vs snapmin relationship -- +-- run this in 2 psql shells, interleaving the inserts. this helps to see thetxid vs snapmin relationship -- begin; insert into rows (name) values ('1'); From 7370ce340a2de9f4c0e02131c8ddb9d63c9ceba0 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Wed, 4 Sep 2024 19:40:38 +0200 Subject: [PATCH 24/40] Chore: remove debug statement from migration --- lib/migrations/migration003.ex | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex index f2feaf6..33cef7b 100644 --- a/lib/migrations/migration003.ex +++ b/lib/migrations/migration003.ex @@ -18,8 +18,6 @@ defmodule Migrations.Migration003 do ", "DROP TRIGGER essig_add_txid_to_signals;" - execute "drop function if exists notify_new_events CASCADE", "" - execute """ CREATE OR REPLACE FUNCTION notify_new_events() RETURNS TRIGGER AS $$ From 0ace7e672f10db652e9f0b97d00fa359fdae7490 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 02:52:00 +0200 Subject: [PATCH 25/40] Chore: Checker allows appending to same stream --- lib/checker.ex | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/checker.ex b/lib/checker.ex index 25efa99..dbaaf8b 100644 --- a/lib/checker.ex +++ b/lib/checker.ex @@ -4,14 +4,18 @@ defmodule Essig.Checker do """ def run do scope_uuid = Essig.UUID7.generate() + stream_uuid = Essig.UUID7.generate() + run(scope_uuid, stream_uuid) + end + def run(scope_uuid, stream_uuid) do Essig.Server.start_scope(scope_uuid) Essig.Server.start_casts([SampleCast1]) - stream_uuid = Essig.UUID7.generate() + seq = Essig.EventStore.last_seq(stream_uuid) {:ok, %{events: events}} = - Essig.EventStore.append_to_stream(stream_uuid, "trp", 0, [ + Essig.EventStore.append_to_stream(stream_uuid, "trp", seq, [ %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"} From b45fd282cef07a9b76df2029639efe353c15579a Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 03:01:39 +0200 Subject: [PATCH 26/40] Chore: fix compilation 
warning --- lib/checker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/checker.ex b/lib/checker.ex index dbaaf8b..07bb409 100644 --- a/lib/checker.ex +++ b/lib/checker.ex @@ -14,7 +14,7 @@ defmodule Essig.Checker do seq = Essig.EventStore.last_seq(stream_uuid) - {:ok, %{events: events}} = + {:ok, %{events: _events}} = Essig.EventStore.append_to_stream(stream_uuid, "trp", seq, [ %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"}, %Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"}, From c5fb31a3b61710555b1f1a6d448b50953c962413 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 03:02:16 +0200 Subject: [PATCH 27/40] Feat: also store stream_uuid on essig_signals table --- lib/event_store/append_to_stream.ex | 10 +++++++--- lib/migrations/migration003.ex | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/event_store/append_to_stream.ex b/lib/event_store/append_to_stream.ex index 28136ee..4513f82 100644 --- a/lib/event_store/append_to_stream.ex +++ b/lib/event_store/append_to_stream.ex @@ -34,7 +34,7 @@ defmodule Essig.EventStore.AppendToStream do Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq}) end) |> Ecto.Multi.run(:signal_new_events, fn _repo, _ -> - signal_new_events() + signal_new_events(stream_uuid) end) end @@ -98,12 +98,16 @@ defmodule Essig.EventStore.AppendToStream do end end - defp signal_new_events() do + defp signal_new_events(stream_uuid) do scope_uuid = Essig.Context.current_scope() bin_uuid = Ecto.UUID.dump!(scope_uuid) + stream_uuid = Ecto.UUID.dump!(stream_uuid) {:ok, _} = - Repo.query("insert into essig_signals(scope_uuid) values ($1)", [bin_uuid]) + Repo.query("insert into essig_signals(scope_uuid, stream_uuid) values ($1, $2)", [ + bin_uuid, + stream_uuid + ]) {:ok, true} end diff --git a/lib/migrations/migration003.ex b/lib/migrations/migration003.ex index 33cef7b..2ab3f4e 100644 --- a/lib/migrations/migration003.ex +++ b/lib/migrations/migration003.ex @@ -5,6 +5,7 @@ defmodule Migrations.Migration003 do create table(:essig_signals, primary_key: false) do add(:id, :bigserial, primary_key: true) add(:scope_uuid, :uuid, null: false, default: fragment("gen_random_uuid()")) + add(:stream_uuid, :uuid, null: false) add(:txid, :bigint) add(:snapmin, :bigint) end @@ -27,6 +28,7 @@ defmodule Migrations.Migration003 do -- Function to notify on new transactions (events) via pg_notify payload := json_build_object( 'scope_uuid', NEW.scope_uuid, + 'stream_uuid', NEW.stream_uuid, 'txid', NEW.txid, 'snapmin', NEW.snapmin ); From d85687aac82023c15748085667949336131276bc Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 03:02:41 +0200 Subject: [PATCH 28/40] Chore: fix module name for EventStore.BaseQuery --- lib/event_store/base_query.ex | 2 +- lib/event_store/read_all_stream_backward.ex | 2 +- lib/event_store/read_all_stream_forward.ex | 2 +- lib/event_store/read_stream_backward.ex | 2 +- lib/event_store/read_stream_forward.ex | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/event_store/base_query.ex b/lib/event_store/base_query.ex index b9bb5a6..7b85dd2 100644 --- a/lib/event_store/base_query.ex +++ b/lib/event_store/base_query.ex @@ -1,4 +1,4 @@ -defmodule EventStore.BaseQuery do +defmodule Essig.EventStore.BaseQuery do alias Essig.Schemas.Event use Essig.Repo diff --git a/lib/event_store/read_all_stream_backward.ex b/lib/event_store/read_all_stream_backward.ex index cded3d8..5ec988a 100644 
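With the stream_uuid included, the message delivered to local subscribers carries enough
context to target a single stream. Roughly (the stream UUID and numbers are made up; the
scope UUID reuses the sample value from the removed comment above):

    {:new_events,
     %{
       scope_uuid: "0191bca1-36d5-7235-8084-6d955e50f6dc",
       stream_uuid: "0191bca1-4a21-7f10-9c3e-2f55aa01d001",
       txid: 182310,
       snapmin: 182310
     }}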
From d85687aac82023c15748085667949336131276bc Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Thu, 5 Sep 2024 03:02:41 +0200
Subject: [PATCH 28/40] Chore: fix module name for EventStore.BaseQuery

---
 lib/event_store/base_query.ex               | 2 +-
 lib/event_store/read_all_stream_backward.ex | 2 +-
 lib/event_store/read_all_stream_forward.ex  | 2 +-
 lib/event_store/read_stream_backward.ex     | 2 +-
 lib/event_store/read_stream_forward.ex      | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lib/event_store/base_query.ex b/lib/event_store/base_query.ex
index b9bb5a6..7b85dd2 100644
--- a/lib/event_store/base_query.ex
+++ b/lib/event_store/base_query.ex
@@ -1,4 +1,4 @@
-defmodule EventStore.BaseQuery do
+defmodule Essig.EventStore.BaseQuery do
   alias Essig.Schemas.Event
   use Essig.Repo

diff --git a/lib/event_store/read_all_stream_backward.ex b/lib/event_store/read_all_stream_backward.ex
index cded3d8..5ec988a 100644
--- a/lib/event_store/read_all_stream_backward.ex
+++ b/lib/event_store/read_all_stream_backward.ex
@@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamBackward do
   end

   def query(from_id, amount) do
-    EventStore.BaseQuery.query()
+    Essig.EventStore.BaseQuery.query()
     |> where([event], event.id < ^from_id)
     |> order_by(desc: :id)
     |> limit(^amount)
diff --git a/lib/event_store/read_all_stream_forward.ex b/lib/event_store/read_all_stream_forward.ex
index 490c386..3b77673 100644
--- a/lib/event_store/read_all_stream_forward.ex
+++ b/lib/event_store/read_all_stream_forward.ex
@@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamForward do
   end

   def query(from_id, amount) do
-    EventStore.BaseQuery.query()
+    Essig.EventStore.BaseQuery.query()
     |> where([event], event.id > ^from_id)
     |> order_by(asc: :id)
     |> limit(^amount)
diff --git a/lib/event_store/read_stream_backward.ex b/lib/event_store/read_stream_backward.ex
index fe59b02..d86be85 100644
--- a/lib/event_store/read_stream_backward.ex
+++ b/lib/event_store/read_stream_backward.ex
@@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadStreamBackward do
   end

   def query(stream_uuid, from_seq, amount) do
-    EventStore.BaseQuery.query()
+    Essig.EventStore.BaseQuery.query()
     |> where([event], event.stream_uuid == ^stream_uuid)
     |> where([event], event.seq < ^from_seq)
     |> order_by(desc: :seq)
diff --git a/lib/event_store/read_stream_forward.ex b/lib/event_store/read_stream_forward.ex
index e0dd73f..fd98659 100644
--- a/lib/event_store/read_stream_forward.ex
+++ b/lib/event_store/read_stream_forward.ex
@@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadStreamForward do
   end

   def query(stream_uuid, from_seq, amount) do
-    EventStore.BaseQuery.query()
+    Essig.EventStore.BaseQuery.query()
     |> where([event], event.stream_uuid == ^stream_uuid)
     |> where([event], event.seq > ^from_seq)
     |> order_by(asc: :id)

From 5a96ac348d23a1591b9bfb4eda7638b16031dff6 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Thu, 5 Sep 2024 22:07:27 +0200
Subject: [PATCH 29/40] Feat: EventStore.Cache module that supports concurrent
 requests without work duplication

Problem:
- we need to fetch the same data from the DB at the same time from all the cast runner processes
- this could easily overload the DB due to the "thundering herd" effect

Solution:
- implement a caching layer for the event store
- this caching layer holds all in-flight requests except the first one (for the value generation)
  and gives them the cached value after it was generated
- the current module is a draft for the concept, with fake work
---
 lib/event_store/cache.ex       | 93 ++++++++++++++++++++++++++++++
 lib/event_store/cache_test.exs | 62 +++++++++++++++++++++++
 2 files changed, 155 insertions(+)
 create mode 100644 lib/event_store/cache.ex
 create mode 100644 lib/event_store/cache_test.exs

diff --git a/lib/event_store/cache.ex b/lib/event_store/cache.ex
new file mode 100644
index 0000000..8eebac4
--- /dev/null
+++ b/lib/event_store/cache.ex
@@ -0,0 +1,93 @@
+defmodule Essig.EventStore.Cache do
+  @behaviour :gen_statem
+
+  defstruct busy: %{}, cache: %{}
+
+  def start_link(_), do: :gen_statem.start_link(__MODULE__, [], [])
+
+  @impl true
+  def callback_mode(), do: [:handle_event_function, :state_enter]
+
+  @impl true
+  def init(_) do
+    # state / data / actions
+    {:ok, %{}, %__MODULE__{}, []}
+  end
+
+  def request(pid, request), do: :gen_statem.call(pid, {:request, request})
+  def get_state(pid), do: :gen_statem.call(pid, :get_state)
+
+  ### INTERNAL ###
+
+  @impl :gen_statem
+  def handle_event(:enter, _before_state, _after_state, _data), do: {:keep_state_and_data, []}
+
+  # just return state / data
+  def handle_event({:call, from}, :get_state, state, data) do
+    {:keep_state, data, [{:reply, from, {state, data}}]}
+  end
+
+  #
+  def handle_event({:call, from}, {:request, request}, _, data) do
+    res = get_from_cache(data, request)
+    in_busy = is_busy_for_request(data, request)
+
+    cond do
+      # we have a result in cache, so we reply immediately
+      res != nil ->
+        actions = [{:reply, from, res}]
+        {:keep_state_and_data, actions}
+
+      # we are already busy with this request, so we postpone
+      in_busy ->
+        actions = [:postpone]
+        {:keep_state_and_data, actions}
+
+      # not in cache and no in-progress fetching, so we schedule a fetch
+      true ->
+        data = mark_busy_for_request(data, request, from)
+        actions = [{:next_event, :internal, {:fetch_data, request, from}}]
+        {:next_state, data.busy, data, actions}
+    end
+  end
+
+  # fetch data and populate the cache
+  def handle_event(:cast, {:set_response, request, response, from}, _state, data) do
+    data = mark_done_for_request(data, request)
+    data = store_in_cache(data, request, response)
+    actions = [{:reply, from, response}]
+    {:next_state, data.busy, data, actions}
+  end
+
+  def handle_event(:internal, {:fetch_data, request, from}, _s, _data) do
+    pid = self()
+
+    Task.start(fn ->
+      Process.sleep(200)
+      response = "RESULT: #{inspect(request)}"
+      GenServer.cast(pid, {:set_response, request, response, from})
+    end)
+
+    {:keep_state_and_data, []}
+  end
+
+  def is_busy_for_request(data, request) do
+    Map.get(data.busy, request, false)
+  end
+
+  def mark_busy_for_request(data, request, from) do
+    %__MODULE__{data | busy: Map.put(data.busy, request, from)}
+  end
+
+  def mark_done_for_request(data, request) do
+    %__MODULE__{data | busy: Map.delete(data.busy, request)}
+  end
+
+  def store_in_cache(data, request, res) do
+    %__MODULE__{data | cache: Map.put(data.cache, request, res)}
+  end
+
+  def get_from_cache(data, request) do
+    Map.get(data.cache, request, nil)
+  end
+end
diff --git a/lib/event_store/cache_test.exs b/lib/event_store/cache_test.exs
new file mode 100644
index 0000000..c2b570c
--- /dev/null
+++ b/lib/event_store/cache_test.exs
@@ -0,0 +1,62 @@
+defmodule Essig.EventStore.CacheTest do
+  use Essig.DataCase
+  alias Essig.EventStore.Cache
+
+  describe "full run" do
+    test "multiple tasks with same request get the same result" do
+      {:ok, pid} = Cache.start_link([])
+      assert is_pid(pid)
+
+      tasks =
+        for _ <- 1..5 do
+          Task.async(fn ->
+            Cache.request(pid, {:a, 1})
+          end)
+        end
+
+      results = Task.await_many(tasks)
+
+      assert length(results) == 5
+      [first_result | rest] = results
+      # IO.inspect(first_result)
+      assert Enum.all?(rest, &(&1 == first_result))
+    end
+
+    test "multiple tasks with different requests work fine" do
+      {:ok, pid} = Cache.start_link([])
+
+      # we populate the cache
+      pop_tasks =
+        for _ <- 1..10 do
+          Task.async(fn ->
+            Cache.request(pid, {:a, :rand.uniform(3)})
+          end)
+        end
+
+      fetch_tasks =
+        for _ <- 1..5 do
+          Task.async(fn ->
+            Cache.request(pid, {:a, 1})
+            Cache.request(pid, {:a, 2})
+            Cache.request(pid, {:a, 3})
+          end)
+        end
+
+      Task.await_many(pop_tasks ++ fetch_tasks)
+
+      assert Cache.request(pid, {:a, 1}) == "RESULT: {:a, 1}"
+      state = Cache.get_state(pid)
+
+      assert state ==
+               {%{},
+                %Essig.EventStore.Cache{
+                  busy: %{},
+                  cache: %{
+                    {:a, 1} => "RESULT: {:a, 1}",
+                    {:a, 2} => "RESULT: {:a, 2}",
+                    {:a, 3} => "RESULT: {:a, 3}"
+                  }
+                }}
+    end
+  end
+end

From 71d303b41795db144cb0afde5c2002c6e7d5bc72 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Thu, 5 Sep 2024 23:05:29 +0200
Subject: [PATCH 30/40] Feat: EventStore.Cache now supports using MFA (module
 / function / arguments) tuple

Problem:
- we need a flexible way to tell the cache how to fill cache values
- hardcoding logic feels inflexible

Solution:
- by using MFA (module / function / arguments) this cache becomes generic
- this can be used for any kind of caching and is extremely versatile
---
 lib/event_store/cache.ex       | 31 ++++++++++++++++++++++---------
 lib/event_store/cache_test.exs | 29 ++++++++++++++++++++---------
 2 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/lib/event_store/cache.ex b/lib/event_store/cache.ex
index 8eebac4..ab599d9 100644
--- a/lib/event_store/cache.ex
+++ b/lib/event_store/cache.ex
@@ -1,9 +1,21 @@
 defmodule Essig.EventStore.Cache do
+  @moduledoc """
+  Cache layer for the EventStore.
+
+  - allows concurrent requests without work duplication and blocking
+
+  Special notes:
+  - we use a map to signify the state of the cache (usually this is an atom)
+  - this map contains currently running cache misses
+  - every change on this map triggers a state transition in gen_statem
+  - the postponed requests get a chance to run again on each state transition
+  """
+
   @behaviour :gen_statem

   defstruct busy: %{}, cache: %{}

-  def start_link(_), do: :gen_statem.start_link(__MODULE__, [], [])
+  def start_link(opts \\ []), do: :gen_statem.start_link(__MODULE__, [], opts)

   @impl true
   def callback_mode(), do: [:handle_event_function, :state_enter]
@@ -14,6 +26,8 @@ defmodule Essig.EventStore.Cache do
     {:ok, %{}, %__MODULE__{}, []}
   end

+  ### PUBLIC API ###
+
   def request(pid, request), do: :gen_statem.call(pid, {:request, request})
   def get_state(pid), do: :gen_statem.call(pid, :get_state)

@@ -59,35 +73,34 @@ defmodule Essig.EventStore.Cache do
     {:next_state, data.busy, data, actions}
   end

-  def handle_event(:internal, {:fetch_data, request, from}, _s, _data) do
+  def handle_event(:internal, {:fetch_data, {mod, fun, args} = request, from}, _s, _data) do
     pid = self()

     Task.start(fn ->
-      Process.sleep(200)
-      response = "RESULT: #{inspect(request)}"
+      response = apply(mod, fun, args)
       GenServer.cast(pid, {:set_response, request, response, from})
     end)

     {:keep_state_and_data, []}
   end

-  def is_busy_for_request(data, request) do
+  defp is_busy_for_request(data, request) do
     Map.get(data.busy, request, false)
   end

-  def mark_busy_for_request(data, request, from) do
+  defp mark_busy_for_request(data, request, from) do
     %__MODULE__{data | busy: Map.put(data.busy, request, from)}
   end

-  def mark_done_for_request(data, request) do
+  defp mark_done_for_request(data, request) do
     %__MODULE__{data | busy: Map.delete(data.busy, request)}
   end

-  def store_in_cache(data, request, res) do
+  defp store_in_cache(data, request, res) do
     %__MODULE__{data | cache: Map.put(data.cache, request, res)}
   end

-  def get_from_cache(data, request) do
+  defp get_from_cache(data, request) do
     Map.get(data.cache, request, nil)
   end
 end
diff --git a/lib/event_store/cache_test.exs b/lib/event_store/cache_test.exs
index c2b570c..62bffa1 100644
--- a/lib/event_store/cache_test.exs
+++ b/lib/event_store/cache_test.exs
@@ -2,6 +2,17 @@ defmodule Essig.EventStore.CacheTest do
   use Essig.DataCase
   alias Essig.EventStore.Cache

+  defmodule ReqBackend do
+    def fetch(request) do
+      Process.sleep(20)
+      "RESULT: #{inspect(request)}"
+    end
+  end
+
+  def req_tuple(value) do
+    {ReqBackend, :fetch, [value]}
+  end
+
   describe "full run" do
     test "multiple tasks with same request get the same result" do
       {:ok, pid} = Cache.start_link([])
@@ -10,7 +21,7 @@ defmodule Essig.EventStore.CacheTest do
       tasks =
         for _ <- 1..5 do
           Task.async(fn ->
-            Cache.request(pid, {:a, 1})
+            Cache.request(pid, req_tuple(1))
           end)
         end

@@ -29,22 +40,22 @@ defmodule Essig.EventStore.CacheTest do
       pop_tasks =
         for _ <- 1..10 do
           Task.async(fn ->
-            Cache.request(pid, {:a, :rand.uniform(3)})
+            Cache.request(pid, req_tuple(:rand.uniform(3)))
           end)
         end

       fetch_tasks =
         for _ <- 1..5 do
           Task.async(fn ->
-            Cache.request(pid, {:a, 1})
-            Cache.request(pid, {:a, 2})
-            Cache.request(pid, {:a, 3})
+            Cache.request(pid, req_tuple(1))
+            Cache.request(pid, req_tuple(2))
+            Cache.request(pid, req_tuple(3))
           end)
         end

       Task.await_many(pop_tasks ++ fetch_tasks)

-      assert Cache.request(pid, {:a, 1}) == "RESULT: {:a, 1}"
+      assert Cache.request(pid, req_tuple(1)) == "RESULT: 1"
       state = Cache.get_state(pid)

       assert state ==
@@ -52,9 +63,9 @@ defmodule Essig.EventStore.CacheTest do
                %Essig.EventStore.Cache{
                  busy: %{},
                  cache: %{
-                   {:a, 1} => "RESULT: {:a, 1}",
-                   {:a, 2} => "RESULT: {:a, 2}",
-                   {:a, 3} => "RESULT: {:a, 3}"
+                   {Essig.EventStore.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1",
+                   {Essig.EventStore.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2",
+                   {Essig.EventStore.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3"
                  }
                }}
     end

From cd79d4180b15e067212b6e46f166cf6ada153f47 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Thu, 5 Sep 2024 23:16:27 +0200
Subject: [PATCH 31/40] Feat: Essig.EventStore.Cache -> Essig.Cache

- now it became a generic caching layer, without other dependencies,
  useful for ANYTHING. ;)
---
 lib/{event_store => }/cache.ex       | 13 +++++++++----
 lib/{event_store => }/cache_test.exs | 14 ++++++--------
 2 files changed, 15 insertions(+), 12 deletions(-)
 rename lib/{event_store => }/cache.ex (92%)
 rename lib/{event_store => }/cache_test.exs (77%)

diff --git a/lib/event_store/cache.ex b/lib/cache.ex
similarity index 92%
rename from lib/event_store/cache.ex
rename to lib/cache.ex
index ab599d9..f56e8ee 100644
--- a/lib/event_store/cache.ex
+++ b/lib/cache.ex
@@ -1,8 +1,13 @@
-defmodule Essig.EventStore.Cache do
+defmodule Essig.Cache do
   @moduledoc """
-  Cache layer for the EventStore.
-
-  - allows concurrent requests without work duplication and blocking
+  Generic cache layer for anything.
+  Allows concurrent requests without work duplication and blocking.
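+
+  Concurrent callers asking for the same key are parked via `postpone` until
+  the first fetch completes; then every caller receives the identical value.
+  A sketch (`SlowMod` is an illustrative module name, not part of the library):
+
+  ```
+  t1 = Task.async(fn -> Essig.Cache.request(pid, {SlowMod, :load, [1]}) end)
+  t2 = Task.async(fn -> Essig.Cache.request(pid, {SlowMod, :load, [1]}) end)
+  Task.await(t1) == Task.await(t2) # the underlying fetch ran only once
+  ```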
+ + Usage: + ``` + {:ok, pid} = Essig.Cache.start_link() + response = Essig.Cache.request(pid, {Mod, :fun, [arg1, arg2]}) + ``` Special notes: - we use a map to signify the state of the cache (usually this is an atom) diff --git a/lib/event_store/cache_test.exs b/lib/cache_test.exs similarity index 77% rename from lib/event_store/cache_test.exs rename to lib/cache_test.exs index 62bffa1..589f708 100644 --- a/lib/event_store/cache_test.exs +++ b/lib/cache_test.exs @@ -1,6 +1,6 @@ -defmodule Essig.EventStore.CacheTest do +defmodule Essig.CacheTest do use Essig.DataCase - alias Essig.EventStore.Cache + alias Essig.Cache defmodule ReqBackend do def fetch(request) do @@ -16,7 +16,6 @@ defmodule Essig.EventStore.CacheTest do describe "full run" do test "multiple tasks with same request get the same result" do {:ok, pid} = Cache.start_link([]) - assert is_pid(pid) tasks = for _ <- 1..5 do @@ -29,7 +28,6 @@ defmodule Essig.EventStore.CacheTest do assert length(results) == 5 [first_result | rest] = results - # IO.inspect(first_result) assert Enum.all?(rest, &(&1 == first_result)) end @@ -60,12 +58,12 @@ defmodule Essig.EventStore.CacheTest do assert state == {%{}, - %Essig.EventStore.Cache{ + %Essig.Cache{ busy: %{}, cache: %{ - {Essig.EventStore.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1", - {Essig.EventStore.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", - {Essig.EventStore.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" + {Essig.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1", + {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", + {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" } }} end From bffd565c287c5127e399791466c501f0e7adfbb8 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 23:25:07 +0200 Subject: [PATCH 32/40] Feat: add option to remove entry from cache in Essig.Cache --- lib/cache.ex | 14 ++++++++++++-- lib/cache_test.exs | 31 ++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/lib/cache.ex b/lib/cache.ex index f56e8ee..961a65f 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -34,6 +34,7 @@ defmodule Essig.Cache do ### PUBLIC API ### def request(pid, request), do: :gen_statem.call(pid, {:request, request}) + def remove(pid, request), do: :gen_statem.call(pid, {:remove, request}) def get_state(pid), do: :gen_statem.call(pid, :get_state) ### INTERNAL ### @@ -42,8 +43,13 @@ defmodule Essig.Cache do def handle_event(:enter, _before_state, _after_state, _data), do: {:keep_state_and_data, []} # just return state / data - def handle_event({:call, from}, :get_state, state, data) do - {:keep_state, data, [{:reply, from, {state, data}}]} + def handle_event({:call, from}, :get_state, _state, data) do + {:keep_state, data, [{:reply, from, data}]} + end + + def handle_event({:call, from}, {:remove, request}, _state, data) do + data = remove_from_cache(data, request) + {:keep_state, data, [{:reply, from, :ok}]} end # @@ -105,6 +111,10 @@ defmodule Essig.Cache do %__MODULE__{data | cache: Map.put(data.cache, request, res)} end + defp remove_from_cache(data, request) do + %__MODULE__{data | cache: Map.delete(data.cache, request)} + end + defp get_from_cache(data, request) do Map.get(data.cache, request, nil) end diff --git a/lib/cache_test.exs b/lib/cache_test.exs index 589f708..8c5710a 100644 --- a/lib/cache_test.exs +++ b/lib/cache_test.exs @@ -57,15 +57,28 @@ defmodule Essig.CacheTest do state = Cache.get_state(pid) assert state == - {%{}, - %Essig.Cache{ - busy: %{}, - cache: %{ - {Essig.CacheTest.ReqBackend, 
:fetch, [1]} => "RESULT: 1", - {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", - {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" - } - }} + %Essig.Cache{ + busy: %{}, + cache: %{ + {Essig.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1", + {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", + {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" + } + } + end + end + + describe "remove_cache" do + test "works" do + {:ok, pid} = Cache.start_link([]) + Cache.request(pid, req_tuple(1)) + Cache.request(pid, req_tuple(2)) + Cache.remove(pid, req_tuple(1)) + + assert Cache.get_state(pid) == %Essig.Cache{ + busy: %{}, + cache: %{{Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2"} + } end end end From 91226cb61ba89b59553a70fd122f3d9540ab2a4c Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Thu, 5 Sep 2024 23:45:57 +0200 Subject: [PATCH 33/40] Feat: also update last_used timestamps for each accessed key in cache Problem: - we would like to prevent our cache from growing indefinitely - for this we need to remember, when a key was used Solution: - store the timestamp for each key --- lib/cache.ex | 25 ++++++++++++++++++++----- lib/cache_test.exs | 8 ++++++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/lib/cache.ex b/lib/cache.ex index 961a65f..4755307 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -18,7 +18,7 @@ defmodule Essig.Cache do @behaviour :gen_statem - defstruct busy: %{}, cache: %{} + defstruct busy: %{}, cache: %{}, last_used: %{} def start_link(opts \\ []), do: :gen_statem.start_link(__MODULE__, [], opts) @@ -48,20 +48,20 @@ defmodule Essig.Cache do end def handle_event({:call, from}, {:remove, request}, _state, data) do - data = remove_from_cache(data, request) + data = remove_from_cache(data, request) |> remove_last_used(request) {:keep_state, data, [{:reply, from, :ok}]} end # def handle_event({:call, from}, {:request, request}, _, data) do - res = get_from_cache(data, request) + {res, data} = get_from_cache(data, request) in_busy = is_busy_for_request(data, request) cond do # we have a result in cache, so we reply immediately res != nil -> actions = [{:reply, from, res}] - {:keep_state_and_data, actions} + {:keep_state, data, actions} # we are already busy with this request, so we postpone in_busy -> @@ -116,6 +116,21 @@ defmodule Essig.Cache do end defp get_from_cache(data, request) do - Map.get(data.cache, request, nil) + res = Map.get(data.cache, request, nil) + + if res do + {res, update_last_used(data, request)} + else + {nil, data} + end + end + + def update_last_used(data, request) do + time = :erlang.monotonic_time() + %__MODULE__{data | last_used: Map.put(data.last_used, request, time)} + end + + def remove_last_used(data, request) do + %__MODULE__{data | last_used: Map.delete(data.last_used, request)} end end diff --git a/lib/cache_test.exs b/lib/cache_test.exs index 8c5710a..904bfc1 100644 --- a/lib/cache_test.exs +++ b/lib/cache_test.exs @@ -54,7 +54,7 @@ defmodule Essig.CacheTest do Task.await_many(pop_tasks ++ fetch_tasks) assert Cache.request(pid, req_tuple(1)) == "RESULT: 1" - state = Cache.get_state(pid) + state = clean_state(pid) assert state == %Essig.Cache{ @@ -75,10 +75,14 @@ defmodule Essig.CacheTest do Cache.request(pid, req_tuple(2)) Cache.remove(pid, req_tuple(1)) - assert Cache.get_state(pid) == %Essig.Cache{ + assert clean_state(pid) == %Essig.Cache{ busy: %{}, cache: %{{Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2"} } end end + + def clean_state(pid) do + Cache.get_state(pid) |> 
Map.put(:last_used, %{}) + end end From d9d608acbe2db2bc015c4f9fd76acc075f73da82 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 6 Sep 2024 18:24:35 +0200 Subject: [PATCH 34/40] Feat: prepare support for expiration of Essig.Cache values --- lib/cache.ex | 49 +++++++++++++++++++++++++++++++++++----------- lib/cache_test.exs | 10 +++++++++- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/lib/cache.ex b/lib/cache.ex index 4755307..8bece94 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -6,7 +6,12 @@ defmodule Essig.Cache do Usage: ``` {:ok, pid} = Essig.Cache.start_link() + + # populate cache with default `expire_in` value response = Essig.Cache.request(pid, {Mod, :fun, [arg1, arg2]}) + + # populate cache with custom `expire_in` value + response = Essig.Cache.request(pid, {Mod, :fun, [arg1, arg2]}, expire_in: :timer.seconds(15)) ``` Special notes: @@ -18,7 +23,9 @@ defmodule Essig.Cache do @behaviour :gen_statem - defstruct busy: %{}, cache: %{}, last_used: %{} + @default_expire_in :timer.seconds(30) + + defstruct busy: %{}, cache: %{}, valid_until: %{}, expire_in: %{} def start_link(opts \\ []), do: :gen_statem.start_link(__MODULE__, [], opts) @@ -33,7 +40,7 @@ defmodule Essig.Cache do ### PUBLIC API ### - def request(pid, request), do: :gen_statem.call(pid, {:request, request}) + def request(pid, request, opts \\ []), do: :gen_statem.call(pid, {:request, request, opts}) def remove(pid, request), do: :gen_statem.call(pid, {:remove, request}) def get_state(pid), do: :gen_statem.call(pid, :get_state) @@ -48,14 +55,20 @@ defmodule Essig.Cache do end def handle_event({:call, from}, {:remove, request}, _state, data) do - data = remove_from_cache(data, request) |> remove_last_used(request) + data = + remove_from_cache(data, request) + |> remove_valid_until(request) + |> remove_expire_in(request) + {:keep_state, data, [{:reply, from, :ok}]} end # - def handle_event({:call, from}, {:request, request}, _, data) do + def handle_event({:call, from}, {:request, request, req_opts}, _, data) do + expire_in = Keyword.get(req_opts, :expire_in, @default_expire_in) + data = update_expire_in(data, request, expire_in) {res, data} = get_from_cache(data, request) - in_busy = is_busy_for_request(data, request) + is_busy = is_busy_for_request(data, request) cond do # we have a result in cache, so we reply immediately @@ -64,7 +77,7 @@ defmodule Essig.Cache do {:keep_state, data, actions} # we are already busy with this request, so we postpone - in_busy -> + is_busy -> actions = [:postpone] {:keep_state_and_data, actions} @@ -87,6 +100,7 @@ defmodule Essig.Cache do def handle_event(:internal, {:fetch_data, {mod, fun, args} = request, from}, _s, _data) do pid = self() + # run the fetch in a separate process, to unblock our main loop Task.start(fn -> response = apply(mod, fun, args) GenServer.cast(pid, {:set_response, request, response, from}) @@ -119,18 +133,31 @@ defmodule Essig.Cache do res = Map.get(data.cache, request, nil) if res do - {res, update_last_used(data, request)} + {res, update_valid_until(data, request)} else {nil, data} end end - def update_last_used(data, request) do + def update_valid_until(data, request) do time = :erlang.monotonic_time() - %__MODULE__{data | last_used: Map.put(data.last_used, request, time)} + expire_in = Map.get(data.expire_in, request, @default_expire_in) + + %__MODULE__{ + data + | valid_until: Map.put(data.valid_until, request, time + expire_in * 1_000_000) + } + end + + def remove_valid_until(data, request) do + %__MODULE__{data | valid_until: 
Map.delete(data.valid_until, request)} + end + + def update_expire_in(data, request, expire_in) do + %__MODULE__{data | expire_in: Map.put(data.expire_in, request, expire_in)} end - def remove_last_used(data, request) do - %__MODULE__{data | last_used: Map.delete(data.last_used, request)} + def remove_expire_in(data, request) do + %__MODULE__{data | expire_in: Map.delete(data.expire_in, request)} end end diff --git a/lib/cache_test.exs b/lib/cache_test.exs index 904bfc1..c32ee6f 100644 --- a/lib/cache_test.exs +++ b/lib/cache_test.exs @@ -63,6 +63,11 @@ defmodule Essig.CacheTest do {Essig.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1", {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" + }, + expire_in: %{ + {Essig.CacheTest.ReqBackend, :fetch, [1]} => 30000, + {Essig.CacheTest.ReqBackend, :fetch, [2]} => 30000, + {Essig.CacheTest.ReqBackend, :fetch, [3]} => 30000 } } end @@ -77,12 +82,15 @@ defmodule Essig.CacheTest do assert clean_state(pid) == %Essig.Cache{ busy: %{}, + expire_in: %{ + {Essig.CacheTest.ReqBackend, :fetch, [2]} => 30000 + }, cache: %{{Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2"} } end end def clean_state(pid) do - Cache.get_state(pid) |> Map.put(:last_used, %{}) + Cache.get_state(pid) |> Map.put(:valid_until, %{}) end end From 6db80f8fef8a7ebeebc1e3f7a55bcb93b3269610 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Fri, 6 Sep 2024 19:16:48 +0200 Subject: [PATCH 35/40] Feat: function to handle expired entries in Essig.Cache --- lib/cache.ex | 12 ++++++++++++ lib/cache_test.exs | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/lib/cache.ex b/lib/cache.ex index 8bece94..678a4bb 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -93,6 +93,7 @@ defmodule Essig.Cache do def handle_event(:cast, {:set_response, request, response, from}, _state, data) do data = mark_done_for_request(data, request) data = store_in_cache(data, request, response) + data = update_valid_until(data, request) actions = [{:reply, from, response}] {:next_state, data.busy, data, actions} end @@ -160,4 +161,15 @@ defmodule Essig.Cache do def remove_expire_in(data, request) do %__MODULE__{data | expire_in: Map.delete(data.expire_in, request)} end + + def remove_expired_entries(data, now) do + expired_keys = Enum.filter(data.valid_until, fn {_, v} -> v < now end) + + Enum.reduce(expired_keys, data, fn {k, _time}, acc -> + acc + |> remove_from_cache(k) + |> remove_valid_until(k) + |> remove_expire_in(k) + end) + end end diff --git a/lib/cache_test.exs b/lib/cache_test.exs index c32ee6f..f63b6dd 100644 --- a/lib/cache_test.exs +++ b/lib/cache_test.exs @@ -90,6 +90,48 @@ defmodule Essig.CacheTest do end end + describe "remove_expired" do + test "works" do + {:ok, pid} = Cache.start_link([]) + + Cache.request(pid, req_tuple(1), expire_in: :timer.seconds(5)) + Cache.request(pid, req_tuple(2), expire_in: :timer.seconds(10)) + Cache.request(pid, req_tuple(3), expire_in: :timer.seconds(15)) + # default expire_in - 30 seconds + Cache.request(pid, req_tuple(4)) + + now = :erlang.monotonic_time() + + data = Cache.get_state(pid) + time_factor = 1_000_000 + + data1 = Cache.remove_expired_entries(data, now + :timer.seconds(5) * time_factor) + + assert data1.cache == %{ + {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", + {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3", + {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" + } + + data2 = Cache.remove_expired_entries(data, now + 
:timer.seconds(10) * time_factor) + + assert data2.cache == %{ + {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3", + {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" + } + + data3 = Cache.remove_expired_entries(data, now + :timer.seconds(15) * time_factor) + + assert data3.cache == %{ + {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" + } + + data4 = Cache.remove_expired_entries(data, now + :timer.seconds(30) * time_factor) + + assert data4.cache == %{} + end + end + def clean_state(pid) do Cache.get_state(pid) |> Map.put(:valid_until, %{}) end From ba1537936a8aa4b4e9728d810d6a0a2e9c877452 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 7 Sep 2024 00:04:30 +0200 Subject: [PATCH 36/40] Feat: use GenCache for Essig.Cache --- lib/cache.ex | 180 ++------------------------------------- lib/cache_test.exs | 138 ------------------------------ lib/essig/application.ex | 1 + mix.exs | 1 + mix.lock | 1 + 5 files changed, 12 insertions(+), 309 deletions(-) delete mode 100644 lib/cache_test.exs diff --git a/lib/cache.ex b/lib/cache.ex index 678a4bb..b3a5f10 100644 --- a/lib/cache.ex +++ b/lib/cache.ex @@ -1,175 +1,13 @@ defmodule Essig.Cache do - @moduledoc """ - Generic cache layer for anything. - Allows concurrent requests without work duplication and blocking. - - Usage: - ``` - {:ok, pid} = Essig.Cache.start_link() - - # populate cache with default `expire_in` value - response = Essig.Cache.request(pid, {Mod, :fun, [arg1, arg2]}) - - # populate cache with custom `expire_in` value - response = Essig.Cache.request(pid, {Mod, :fun, [arg1, arg2]}, expire_in: :timer.seconds(15)) - ``` - - Special notes: - - we use a map to signify the state of the cache (usually this is an atom) - - this map contains currently running cache misses - - every change on this map triggers a state transition in gen_statem - - the postponed requests get a chance to run again on each state transition - """ - - @behaviour :gen_statem - - @default_expire_in :timer.seconds(30) - - defstruct busy: %{}, cache: %{}, valid_until: %{}, expire_in: %{} - - def start_link(opts \\ []), do: :gen_statem.start_link(__MODULE__, [], opts) - - @impl true - def callback_mode(), do: [:handle_event_function, :state_enter] - - @impl true - def init(_) do - # state / data / actions - {:ok, %{}, %__MODULE__{}, []} - end - - ### PUBLIC API ### - - def request(pid, request, opts \\ []), do: :gen_statem.call(pid, {:request, request, opts}) - def remove(pid, request), do: :gen_statem.call(pid, {:remove, request}) - def get_state(pid), do: :gen_statem.call(pid, :get_state) - - ### INTERNAL ### - - @impl :gen_statem - def handle_event(:enter, _before_state, _after_state, _data), do: {:keep_state_and_data, []} - - # just return state / data - def handle_event({:call, from}, :get_state, _state, data) do - {:keep_state, data, [{:reply, from, data}]} - end - - def handle_event({:call, from}, {:remove, request}, _state, data) do - data = - remove_from_cache(data, request) - |> remove_valid_until(request) - |> remove_expire_in(request) - - {:keep_state, data, [{:reply, from, :ok}]} - end - - # - def handle_event({:call, from}, {:request, request, req_opts}, _, data) do - expire_in = Keyword.get(req_opts, :expire_in, @default_expire_in) - data = update_expire_in(data, request, expire_in) - {res, data} = get_from_cache(data, request) - is_busy = is_busy_for_request(data, request) - - cond do - # we have a result in cache, so we reply immediately - res != nil -> - actions = [{:reply, from, res}] - {:keep_state, data, actions} - - # 
we are already busy with this request, so we postpone - is_busy -> - actions = [:postpone] - {:keep_state_and_data, actions} - - # not in cache and no in-progress fetching, so we schedule a fetch - true -> - data = mark_busy_for_request(data, request, from) - actions = [{:next_event, :internal, {:fetch_data, request, from}}] - {:next_state, data.busy, data, actions} - end - end - - # fetch data and populate the cache - def handle_event(:cast, {:set_response, request, response, from}, _state, data) do - data = mark_done_for_request(data, request) - data = store_in_cache(data, request, response) - data = update_valid_until(data, request) - actions = [{:reply, from, response}] - {:next_state, data.busy, data, actions} - end - - def handle_event(:internal, {:fetch_data, {mod, fun, args} = request, from}, _s, _data) do - pid = self() - - # run the fetch in a separate process, to unblock our main loop - Task.start(fn -> - response = apply(mod, fun, args) - GenServer.cast(pid, {:set_response, request, response, from}) - end) - - {:keep_state_and_data, []} - end - - defp is_busy_for_request(data, request) do - Map.get(data.busy, request, false) - end - - defp mark_busy_for_request(data, request, from) do - %__MODULE__{data | busy: Map.put(data.busy, request, from)} - end - - defp mark_done_for_request(data, request) do - %__MODULE__{data | busy: Map.delete(data.busy, request)} - end - - defp store_in_cache(data, request, res) do - %__MODULE__{data | cache: Map.put(data.cache, request, res)} - end - - defp remove_from_cache(data, request) do - %__MODULE__{data | cache: Map.delete(data.cache, request)} - end - - defp get_from_cache(data, request) do - res = Map.get(data.cache, request, nil) - - if res do - {res, update_valid_until(data, request)} - else - {nil, data} - end - end - - def update_valid_until(data, request) do - time = :erlang.monotonic_time() - expire_in = Map.get(data.expire_in, request, @default_expire_in) - - %__MODULE__{ - data - | valid_until: Map.put(data.valid_until, request, time + expire_in * 1_000_000) + use GenCache + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :worker, + restart: :permanent, + shutdown: 500 } end - - def remove_valid_until(data, request) do - %__MODULE__{data | valid_until: Map.delete(data.valid_until, request)} - end - - def update_expire_in(data, request, expire_in) do - %__MODULE__{data | expire_in: Map.put(data.expire_in, request, expire_in)} - end - - def remove_expire_in(data, request) do - %__MODULE__{data | expire_in: Map.delete(data.expire_in, request)} - end - - def remove_expired_entries(data, now) do - expired_keys = Enum.filter(data.valid_until, fn {_, v} -> v < now end) - - Enum.reduce(expired_keys, data, fn {k, _time}, acc -> - acc - |> remove_from_cache(k) - |> remove_valid_until(k) - |> remove_expire_in(k) - end) - end end diff --git a/lib/cache_test.exs b/lib/cache_test.exs deleted file mode 100644 index f63b6dd..0000000 --- a/lib/cache_test.exs +++ /dev/null @@ -1,138 +0,0 @@ -defmodule Essig.CacheTest do - use Essig.DataCase - alias Essig.Cache - - defmodule ReqBackend do - def fetch(request) do - Process.sleep(20) - "RESULT: #{inspect(request)}" - end - end - - def req_tuple(value) do - {ReqBackend, :fetch, [value]} - end - - describe "full run" do - test "multiple tasks with same request get the same result" do - {:ok, pid} = Cache.start_link([]) - - tasks = - for _ <- 1..5 do - Task.async(fn -> - Cache.request(pid, req_tuple(1)) - end) - end - - results = Task.await_many(tasks) 
- - assert length(results) == 5 - [first_result | rest] = results - assert Enum.all?(rest, &(&1 == first_result)) - end - - test "multiple tasks with different requests work fine" do - {:ok, pid} = Cache.start_link([]) - - # we populate the cache - pop_tasks = - for _ <- 1..10 do - Task.async(fn -> - Cache.request(pid, req_tuple(:rand.uniform(3))) - end) - end - - fetch_tasks = - for _ <- 1..5 do - Task.async(fn -> - Cache.request(pid, req_tuple(1)) - Cache.request(pid, req_tuple(2)) - Cache.request(pid, req_tuple(3)) - end) - end - - Task.await_many(pop_tasks ++ fetch_tasks) - - assert Cache.request(pid, req_tuple(1)) == "RESULT: 1" - state = clean_state(pid) - - assert state == - %Essig.Cache{ - busy: %{}, - cache: %{ - {Essig.CacheTest.ReqBackend, :fetch, [1]} => "RESULT: 1", - {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", - {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3" - }, - expire_in: %{ - {Essig.CacheTest.ReqBackend, :fetch, [1]} => 30000, - {Essig.CacheTest.ReqBackend, :fetch, [2]} => 30000, - {Essig.CacheTest.ReqBackend, :fetch, [3]} => 30000 - } - } - end - end - - describe "remove_cache" do - test "works" do - {:ok, pid} = Cache.start_link([]) - Cache.request(pid, req_tuple(1)) - Cache.request(pid, req_tuple(2)) - Cache.remove(pid, req_tuple(1)) - - assert clean_state(pid) == %Essig.Cache{ - busy: %{}, - expire_in: %{ - {Essig.CacheTest.ReqBackend, :fetch, [2]} => 30000 - }, - cache: %{{Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2"} - } - end - end - - describe "remove_expired" do - test "works" do - {:ok, pid} = Cache.start_link([]) - - Cache.request(pid, req_tuple(1), expire_in: :timer.seconds(5)) - Cache.request(pid, req_tuple(2), expire_in: :timer.seconds(10)) - Cache.request(pid, req_tuple(3), expire_in: :timer.seconds(15)) - # default expire_in - 30 seconds - Cache.request(pid, req_tuple(4)) - - now = :erlang.monotonic_time() - - data = Cache.get_state(pid) - time_factor = 1_000_000 - - data1 = Cache.remove_expired_entries(data, now + :timer.seconds(5) * time_factor) - - assert data1.cache == %{ - {Essig.CacheTest.ReqBackend, :fetch, [2]} => "RESULT: 2", - {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3", - {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" - } - - data2 = Cache.remove_expired_entries(data, now + :timer.seconds(10) * time_factor) - - assert data2.cache == %{ - {Essig.CacheTest.ReqBackend, :fetch, [3]} => "RESULT: 3", - {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" - } - - data3 = Cache.remove_expired_entries(data, now + :timer.seconds(15) * time_factor) - - assert data3.cache == %{ - {Essig.CacheTest.ReqBackend, :fetch, [4]} => "RESULT: 4" - } - - data4 = Cache.remove_expired_entries(data, now + :timer.seconds(30) * time_factor) - - assert data4.cache == %{} - end - end - - def clean_state(pid) do - Cache.get_state(pid) |> Map.put(:valid_until, %{}) - end -end diff --git a/lib/essig/application.ex b/lib/essig/application.ex index 2b7bb5b..9bdf4c1 100644 --- a/lib/essig/application.ex +++ b/lib/essig/application.ex @@ -10,6 +10,7 @@ defmodule Essig.Application do Essig.RepoSingleConn, {Phoenix.PubSub, name: Essig.PubSub}, Essig.PGNotifyListener, + Essig.Cache, {Registry, keys: :unique, name: Essig.Scopes.Registry}, Essig.Scopes.DynamicSupervisor ] diff --git a/mix.exs b/mix.exs index bbe1e2f..21697c8 100644 --- a/mix.exs +++ b/mix.exs @@ -49,6 +49,7 @@ defmodule Essig.MixProject do ## UTIL {:json_serde, github: "maxohq/json_serde"}, {:liveness, "~> 1.0.0"}, + {:gen_cache, "~> 0.1"}, {:uniq, "~> 
0.6"}, ## DEBUG diff --git a/mix.lock b/mix.lock index b712c45..df40ba0 100644 --- a/mix.lock +++ b/mix.lock @@ -10,6 +10,7 @@ "ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"}, "ets_select": {:hex, :ets_select, "0.1.3", "6ddb60480d8fadb1949d8ac9a06feb95750993adadceb19276bc6fd588326795", [:mix], [], "hexpm", "d2006673d24023a4c97baa56b0116d4dd1c73d43024c03b889aa1370634bb1ef"}, "file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"}, + "gen_cache": {:hex, :gen_cache, "0.1.0", "2ee13fc603a1f0503b6ea6c411568ef0f9cd12f71215266d420e5c8649eb90cc", [:mix], [], "hexpm", "7e1d71307fe32a4dc028c843ea9ba13973a9767302b38f17a705462763f6cad2"}, "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "glob_ex": {:hex, :glob_ex, "0.1.8", "f7ef872877ca2ae7a792ab1f9ff73d9c16bf46ecb028603a8a3c5283016adc07", [:mix], [], "hexpm", "9e39d01729419a60a937c9260a43981440c43aa4cadd1fa6672fecd58241c464"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, From 6197ec4e31374ca2efc7d4e548ead53523bf15fc Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 7 Sep 2024 00:21:40 +0200 Subject: [PATCH 37/40] Chore: keep ecto logs in dev --- config/config.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.exs b/config/config.exs index 3fc0c0d..7a037f9 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,7 +8,7 @@ config :essig, Essig.PubSub, adapter: Phoenix.PubSub.PG2 if config_env() == :dev do # setup for ecto_dev_logger (https://github.com/fuelen/ecto_dev_logger) - config :essig, Essig.Repo, log: false + # config :essig, Essig.Repo, log: false config :essig, Essig.Repo, username: System.get_env("POSTGRES_USER") || "postgres", From 0efcd0e0b55d46f5b6500d8e9d2fe7aa8744d9bc Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 7 Sep 2024 00:24:21 +0200 Subject: [PATCH 38/40] Feat: add EventStoreRead with explicit current_scope handling, to make reads cachable --- lib/event_store_reads.ex | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 lib/event_store_reads.ex diff --git a/lib/event_store_reads.ex b/lib/event_store_reads.ex new file mode 100644 index 0000000..0294a91 --- /dev/null +++ b/lib/event_store_reads.ex @@ -0,0 +1,32 @@ +defmodule Essig.EventStoreReads do + @moduledoc """ + EventStoreReads sets current scope explicitly, for reads. 
+  This makes reads cacheable with [GenCache](https://github.com/maxohq/gen_cache/).
+  """
+  use Essig.Repo
+
+  def read_stream_forward(scope_uuid, stream_uuid, from_seq, amount) do
+    Essig.Context.set_current_scope(scope_uuid)
+    Essig.EventStore.ReadStreamForward.run(stream_uuid, from_seq, amount)
+  end
+
+  def read_all_stream_forward(scope_uuid, from_id, amount) do
+    Essig.Context.set_current_scope(scope_uuid)
+    Essig.EventStore.ReadAllStreamForward.run(from_id, amount)
+  end
+
+  def read_stream_backward(scope_uuid, stream_uuid, from_seq, amount) do
+    Essig.Context.set_current_scope(scope_uuid)
+    Essig.EventStore.ReadStreamBackward.run(stream_uuid, from_seq, amount)
+  end
+
+  def read_all_stream_backward(scope_uuid, from_id, amount) do
+    Essig.Context.set_current_scope(scope_uuid)
+    Essig.EventStore.ReadAllStreamBackward.run(from_id, amount)
+  end
+
+  def last_seq(scope_uuid, stream_uuid) do
+    Essig.Context.set_current_scope(scope_uuid)
+    Essig.EventStore.LastSeq.run(stream_uuid)
+  end
+end

From 489d09f9ba73b4e0131868776f9a0bd59956e5e4 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Sat, 7 Sep 2024 00:29:49 +0200
Subject: [PATCH 39/40] Chore: usage examples

---
 lib/event_store_reads.ex | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lib/event_store_reads.ex b/lib/event_store_reads.ex
index 0294a91..872f1f6 100644
--- a/lib/event_store_reads.ex
+++ b/lib/event_store_reads.ex
@@ -2,6 +2,12 @@ defmodule Essig.EventStoreReads do
   @moduledoc """
   EventStoreReads sets the current scope explicitly for each read.
   This makes reads cacheable with [GenCache](https://github.com/maxohq/gen_cache/).
+
+  Usage:
+  ```
+  iex> Essig.Cache.request({Essig.EventStoreReads, :read_all_stream_forward, ["0191be93-c178-71a0-8bd8-bcde9f55b7d6", 5, 10]})
+  iex> Essig.Cache.request({Essig.EventStoreReads, :read_stream_forward, ["0191be93-c178-71a0-8bd8-bcde9f55b7d6", "0191be93-c178-7203-b137-c4e294ca925b", 5, 10]})
+  ```
   """
   use Essig.Repo

From 9568358a01df5dbb6d8101f10384743539a6fbd0 Mon Sep 17 00:00:00 2001
From: Roman Heinrich
Date: Sun, 8 Sep 2024 22:10:25 +0200
Subject: [PATCH 40/40] Feat: add max_id and count columns to the essig_signals
 table

Problem:
- after getting the "poke" from the signals table, we currently know the
  transaction id and the scope / stream uuids, but we still have to issue
  an extra query to figure out how many new events we need to fetch

Solution:
- the count of new events and the max_id are already known at append time,
  so we add new columns to the signals table and send those values in the
  append_to_stream function.
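
Example:
- with the new columns, the pg_notify payload carries everything a consumer
  needs; the field values below are illustrative:

    {"scope_uuid": "0191be93-c178-71a0-8bd8-bcde9f55b7d6",
     "stream_uuid": "0191be93-c178-7203-b137-c4e294ca925b",
     "txid": 4711,
     "count": 3,
     "max_id": 42}

- assuming contiguous event ids per scope, a listener can derive the exact
  fetch window (ids greater than max_id - count, up to max_id) without an
  extra query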
--- lib/event_store/append_to_stream.ex | 22 +++++++++++++------- lib/migrations/all.ex | 3 ++- lib/migrations/migration004.ex | 32 +++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 8 deletions(-) create mode 100644 lib/migrations/migration004.ex diff --git a/lib/event_store/append_to_stream.ex b/lib/event_store/append_to_stream.ex index 4513f82..05f700a 100644 --- a/lib/event_store/append_to_stream.ex +++ b/lib/event_store/append_to_stream.ex @@ -33,8 +33,11 @@ defmodule Essig.EventStore.AppendToStream do last_event = Enum.at(insert_events, -1) Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq}) end) - |> Ecto.Multi.run(:signal_new_events, fn _repo, _ -> - signal_new_events(stream_uuid) + |> Ecto.Multi.run(:signal_new_events, fn _repo, %{insert_events: insert_events} -> + last_event = Enum.at(insert_events, -1) + max_id = last_event.id + count = Enum.count(insert_events) + signal_new_events(stream_uuid, count, max_id) end) end @@ -98,16 +101,21 @@ defmodule Essig.EventStore.AppendToStream do end end - defp signal_new_events(stream_uuid) do + defp signal_new_events(stream_uuid, count, max_id) do scope_uuid = Essig.Context.current_scope() bin_uuid = Ecto.UUID.dump!(scope_uuid) stream_uuid = Ecto.UUID.dump!(stream_uuid) {:ok, _} = - Repo.query("insert into essig_signals(scope_uuid, stream_uuid) values ($1, $2)", [ - bin_uuid, - stream_uuid - ]) + Repo.query( + "insert into essig_signals(scope_uuid, stream_uuid, count, max_id) values ($1, $2, $3, $4)", + [ + bin_uuid, + stream_uuid, + count, + max_id + ] + ) {:ok, true} end diff --git a/lib/migrations/all.ex b/lib/migrations/all.ex index abebcaa..4269584 100644 --- a/lib/migrations/all.ex +++ b/lib/migrations/all.ex @@ -6,7 +6,8 @@ defmodule Migrations.All do [ {2024_0824_120000, Migrations.Migration001}, {2024_0904_112600, Migrations.Migration002}, - {2024_0904_114100, Migrations.Migration003} + {2024_0904_114100, Migrations.Migration003}, + {2024_0907_232900, Migrations.Migration004} ] end end diff --git a/lib/migrations/migration004.ex b/lib/migrations/migration004.ex new file mode 100644 index 0000000..6bd216c --- /dev/null +++ b/lib/migrations/migration004.ex @@ -0,0 +1,32 @@ +defmodule Migrations.Migration004 do + use Ecto.Migration + + def change do + alter table(:essig_signals) do + add(:count, :integer, null: false) + add(:max_id, :bigint, null: false) + end + + execute """ + CREATE OR REPLACE FUNCTION notify_new_events() + RETURNS TRIGGER AS $$ + DECLARE + payload JSON; + BEGIN + -- Function to notify on new transactions (events) via pg_notify + payload := json_build_object( + 'scope_uuid', NEW.scope_uuid, + 'stream_uuid', NEW.stream_uuid, + 'txid', NEW.txid, + 'count', NEW.count, + 'max_id', NEW.max_id + ); + + PERFORM pg_notify('new_events', payload::TEXT); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """, + "DROP FUNCTION notify_new_events();" + end +end
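
Usage sketch for the new payload fields: a minimal consumer built on
Postgrex.Notifications. This is illustrative only; the actual
Essig.PGNotifyListener wiring is not shown in this series, and contiguous
event ids per scope are assumed.

```elixir
# Subscribe to the "new_events" channel that notify_new_events() publishes to.
{:ok, conn} = Postgrex.Notifications.start_link(Essig.Repo.config())
{:ok, _ref} = Postgrex.Notifications.listen(conn, "new_events")

receive do
  {:notification, ^conn, _ref, "new_events", payload} ->
    %{"scope_uuid" => scope, "count" => count, "max_id" => max_id} =
      Jason.decode!(payload)

    # With contiguous ids, the new events are exactly (max_id - count, max_id],
    # so the read can go through the cache without a prior counting query.
    Essig.Cache.request(
      {Essig.EventStoreReads, :read_all_stream_forward, [scope, max_id - count, count]}
    )
end
```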