Skip to content

Commit

Permalink
Merge pull request #1 from maxohq/feat/event_broadcaster
Browse files Browse the repository at this point in the history
Feat: Pubsub system to update Casts
  • Loading branch information
mindreframer authored Sep 8, 2024
2 parents a1e90ad + 9568358 commit be97aeb
Show file tree
Hide file tree
Showing 45 changed files with 898 additions and 77 deletions.
4 changes: 3 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ config :essig,
ecto_repos: [Essig.Repo],
generators: [timestamp_type: :utc_datetime]

config :essig, Essig.PubSub, adapter: Phoenix.PubSub.PG2

if config_env() == :dev do
# setup for ecto_dev_logger (https://github.com/fuelen/ecto_dev_logger)
config :essig, Essig.Repo, log: false
# config :essig, Essig.Repo, log: false

config :essig, Essig.Repo,
username: System.get_env("POSTGRES_USER") || "postgres",
Expand Down
13 changes: 13 additions & 0 deletions lib/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Essig.Cache do
use GenCache

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
end
38 changes: 36 additions & 2 deletions lib/casts/cast_runner.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Essig.Casts.CastRunner do
use GenServer

defstruct key: nil, seq: nil, max_id: nil, module: nil, row: nil

##### PUBLIC API

def send_events(module, events) do
Expand All @@ -18,17 +20,22 @@ defmodule Essig.Casts.CastRunner do
def init(args) do
module = Keyword.fetch!(args, :module)
apply(module, :bootstrap, [])
{:ok, %{module: module, seq: 0, max_id: 0}}
{:ok, row} = fetch_from_db_or_init(module)
meta_data = %__MODULE__{key: module, seq: row.seq, max_id: row.max_id, module: module}
Essig.Casts.MetaTable.set(module, meta_data)
state = Map.put(meta_data, :row, row)
{:ok, state}
end

defp via_tuple(module) do
Essig.Casts.Registry.via(module)
end

def handle_call({:send_events, events}, _from, state) do
module = Map.fetch!(state, :module)
module = Map.fetch!(state, :key)
{:ok, res, state} = apply(module, :handle_events, [state, events])
state = update_seq_and_max_id(state, events)
state = update_db(state)
{:reply, {res, state}, state}
end

Expand All @@ -44,4 +51,31 @@ defmodule Essig.Casts.CastRunner do
Essig.Casts.MetaTable.update(state.module, %{seq: new_seq, max_id: new_max_id})
%{state | seq: new_seq, max_id: new_max_id}
end

defp update_db(state) do
{:ok, row} =
Essig.Crud.CastsCrud.update_cast(state.row, %{seq: state.seq, max_id: state.max_id})

Map.put(state, :row, row)
end

defp fetch_from_db_or_init(module) do
case Essig.Crud.CastsCrud.get_cast_by_module(module) do
nil ->
scope_uuid = Essig.Context.current_scope()

payload = %{
scope_uuid: scope_uuid,
module: Atom.to_string(module),
seq: 0,
max_id: 0,
setup_done: false
}

{:ok, _row} = Essig.Crud.CastsCrud.create_cast(payload)

row ->
{:ok, row}
end
end
end
39 changes: 31 additions & 8 deletions lib/casts/cast_runner_test.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Essig.Casts.CastRunnerTest do
use ExUnit.Case, async: true
use Essig.DataCase
alias Essig.Casts.CastRunner
alias Essig.Casts.MetaTable

setup %{test: test_name} do
Essig.Server.start_scope(test_name)
setup do
scope_uuid = Essig.UUID7.generate()
Essig.Server.start_scope(scope_uuid)
:ok
end

Expand All @@ -27,8 +28,19 @@ defmodule Essig.Casts.CastRunnerTest do
CastRunner.send_events(SampleCast2, events)

# Assert that the events were processed by the respective CastRunners
assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast1) == %CastRunner{
key: SampleCast1,
module: SampleCast1,
max_id: 100,
seq: 2
}

assert MetaTable.get(SampleCast2) == %CastRunner{
key: SampleCast2,
module: SampleCast2,
max_id: 100,
seq: 2
}
end
end

Expand All @@ -45,9 +57,20 @@ defmodule Essig.Casts.CastRunnerTest do
CastRunner.send_events(SampleCast2, events)
CastRunner.send_events(SampleCast2, events)

# # Assert the metadata for each CastRunner
assert MetaTable.get(SampleCast1) == %{key: SampleCast1, max_id: 100, seq: 2}
assert MetaTable.get(SampleCast2) == %{key: SampleCast2, max_id: 100, seq: 4}
# Assert the metadata for each CastRunner
assert MetaTable.get(SampleCast1) == %CastRunner{
key: SampleCast1,
module: SampleCast1,
max_id: 100,
seq: 2
}

assert MetaTable.get(SampleCast2) == %CastRunner{
key: SampleCast2,
module: SampleCast2,
max_id: 100,
seq: 4
}
end
end
end
2 changes: 1 addition & 1 deletion lib/casts/seq_checker_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Essig.Casts.SeqCheckerTest do
use ExUnit.Case
use ExUnit.Case, async: true
alias Essig.Casts.SeqChecker

describe "check_reached/2" do
Expand Down
27 changes: 27 additions & 0 deletions lib/checker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Essig.Checker do
@moduledoc """
A small dev-only module to test the event store.
"""
def run do
scope_uuid = Essig.UUID7.generate()
stream_uuid = Essig.UUID7.generate()
run(scope_uuid, stream_uuid)
end

def run(scope_uuid, stream_uuid) do
Essig.Server.start_scope(scope_uuid)
Essig.Server.start_casts([SampleCast1])

seq = Essig.EventStore.last_seq(stream_uuid)

{:ok, %{events: _events}} =
Essig.EventStore.append_to_stream(stream_uuid, "trp", seq, [
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 1"},
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 2"},
%Sample.TestReports.Events.MasterReportAdded{path: "local/path/to", report: "report 3"}
])

# this will be unnecessary soon
# Essig.Casts.CastRunner.send_events(SampleCast1, events)
end
end
6 changes: 3 additions & 3 deletions lib/crud/casts_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ defmodule Essig.CastsCrudTest do
end

test "provides defaults for numeric values" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
assert cast.status == :new
end

test "prevents duplicate casts with same module" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, _cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})
{:error, changeset} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 2})
errors = errors_on(changeset)
assert errors == %{scope_uuid: ["has already been taken"]}
end

test "allows updating the `max_id` value via upsert on (module)" do
uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
{:ok, cast} = CastsCrud.create_cast(%{module: "module-1", scope_uuid: uuid, seq: 1})

{:ok, cast2} =
Expand Down
4 changes: 2 additions & 2 deletions lib/crud/events_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ defmodule Essig.Crud.EventsCrudTest do
end

test "creates proper events" do
stream_uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
stream_uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, _stream} =
StreamsCrud.create_stream(%{
Expand Down
10 changes: 5 additions & 5 deletions lib/crud/streams_crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "creates a minimal stream record" do
scope_uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.create_stream(%{scope_uuid: scope_uuid, stream_type: "user", seq: 1})
Expand All @@ -19,8 +19,8 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "prevents duplicates" do
uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.create_stream(%{
Expand All @@ -44,8 +44,8 @@ defmodule Essig.Crud.StreamsCrudTest do
end

test "updates the seq on equal streams (upsert_stream)" do
uuid = Essig.Ecto.UUID7.generate()
scope_uuid = Essig.Ecto.UUID7.generate()
uuid = Essig.UUID7.generate()
scope_uuid = Essig.UUID7.generate()

{:ok, stream} =
StreamsCrud.upsert_stream(%{
Expand Down
4 changes: 4 additions & 0 deletions lib/essig/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ defmodule Essig.Application do
def start(_type, _args) do
children = [
Essig.Repo,
Essig.RepoSingleConn,
{Phoenix.PubSub, name: Essig.PubSub},
Essig.PGNotifyListener,
Essig.Cache,
{Registry, keys: :unique, name: Essig.Scopes.Registry},
Essig.Scopes.DynamicSupervisor
]
Expand Down
34 changes: 34 additions & 0 deletions lib/event_store/append_to_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ defmodule Essig.EventStore.AppendToStream do
use Essig.Repo

def run(stream_uuid, stream_type, expected_seq, events) do
# To ensure sequential inserts only, we use locking.
# The likelihood of this triggering in production is low, but still possible.
# Locks are across all OS processes, since we use Postgres for this.
Essig.PGLock.with_lock("es-insert", fn ->
run_unprotected(stream_uuid, stream_type, expected_seq, events)
end)
end

defp run_unprotected(stream_uuid, stream_type, expected_seq, events) do
multi(stream_uuid, stream_type, expected_seq, events)
|> Repo.transaction()
end
Expand All @@ -24,6 +33,12 @@ defmodule Essig.EventStore.AppendToStream do
last_event = Enum.at(insert_events, -1)
Essig.Crud.StreamsCrud.update_stream(stream, %{seq: last_event.seq})
end)
|> Ecto.Multi.run(:signal_new_events, fn _repo, %{insert_events: insert_events} ->
last_event = Enum.at(insert_events, -1)
max_id = last_event.id
count = Enum.count(insert_events)
signal_new_events(stream_uuid, count, max_id)
end)
end

defp ensure_stream_exists(stream_uuid, stream_type) do
Expand Down Expand Up @@ -85,4 +100,23 @@ defmodule Essig.EventStore.AppendToStream do
events -> {:ok, Enum.reverse(events)}
end
end

defp signal_new_events(stream_uuid, count, max_id) do
scope_uuid = Essig.Context.current_scope()
bin_uuid = Ecto.UUID.dump!(scope_uuid)
stream_uuid = Ecto.UUID.dump!(stream_uuid)

{:ok, _} =
Repo.query(
"insert into essig_signals(scope_uuid, stream_uuid, count, max_id) values ($1, $2, $3, $4)",
[
bin_uuid,
stream_uuid,
count,
max_id
]
)

{:ok, true}
end
end
16 changes: 8 additions & 8 deletions lib/event_store/append_to_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream does not exist" do
setup do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
stream_uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
stream_uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down Expand Up @@ -48,8 +48,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists + expected value matches" do
setup do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
stream_uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
stream_uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down Expand Up @@ -97,8 +97,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists, yet expected seq does not match" do
test "returns errors" do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand All @@ -123,8 +123,8 @@ defmodule Essig.EventStore.AppendToStreamTest do

describe "stream exists, seq matches, yet stream type does not match" do
test "returns errors" do
Essig.Context.set_current_scope(Essig.Ecto.UUID7.generate())
uuid = Essig.Ecto.UUID7.generate()
Essig.Context.set_current_scope(Essig.UUID7.generate())
uuid = Essig.UUID7.generate()

e1 = %CustomApp.TestReports.Events.TicketMatchAdded{
match_kind: "auto",
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/base_query.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule EventStore.BaseQuery do
defmodule Essig.EventStore.BaseQuery do
alias Essig.Schemas.Event
use Essig.Repo

Expand Down
3 changes: 1 addition & 2 deletions lib/event_store/read_all_stream_backward.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
defmodule Essig.EventStore.ReadAllStreamBackward do
alias Essig.Schemas.Event
use Essig.Repo

def run(from_id, amount) do
query(from_id, amount) |> Repo.all()
end

def query(from_id, amount) do
EventStore.BaseQuery.query()
Essig.EventStore.BaseQuery.query()
|> where([event], event.id < ^from_id)
|> order_by(desc: :id)
|> limit(^amount)
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/read_all_stream_forward.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Essig.EventStore.ReadAllStreamForward do
end

def query(from_id, amount) do
EventStore.BaseQuery.query()
Essig.EventStore.BaseQuery.query()
|> where([event], event.id > ^from_id)
|> order_by(asc: :id)
|> limit(^amount)
Expand Down
Loading

0 comments on commit be97aeb

Please sign in to comment.