Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

projection actor intialization #368

Merged
merged 57 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
483b3c6
projection actor intialization
eliasdarruda Sep 8, 2024
9fdb3b2
Avoid concurrency
Sep 8, 2024
5221fe3
Avoid big batchers
Sep 8, 2024
54800f1
Refactor, rename. Only projections, we do not follow eventsourced fai…
Sep 8, 2024
5c8a701
strict ordering opts
eliasdarruda Sep 8, 2024
b5bb779
Avoid term EventSource from codebase
Sep 8, 2024
389faf8
Mix format
eliasdarruda Sep 8, 2024
d019f70
Minor adjust
Sep 9, 2024
88055c2
Merge branch 'projection-actor' of https://github.com/eigr/spawn into…
Sep 9, 2024
ea93316
Save stream consumer pid in actor state. Enable replay action!
Sep 9, 2024
6ced34b
Use actor on init instead of state.actor
Sep 9, 2024
e06bcaa
Fix state struct
Sep 9, 2024
2546f9b
Fix guard cause
Sep 9, 2024
3fc32ba
Fix guard clause
Sep 9, 2024
df7b4ed
Intial replay support
Sep 9, 2024
0bcc844
Add client replay api
Sep 9, 2024
8f4dcd1
Some configuration comments
Sep 9, 2024
191b218
Handle projection events
Sep 10, 2024
fd3ec64
Adjusts
Sep 10, 2024
360903f
Fix. Broadway processing logic
Sep 10, 2024
a41300e
Adjusts
Sep 10, 2024
98193ac
Add opentelemetry on proxy supervisor tree and change configurations …
Sep 11, 2024
e4e23c1
Opentelemetry adjust
Sep 11, 2024
df37d09
Adjusts
Sep 12, 2024
11a122a
Merge branch 'projection-actor' of https://github.com/eigr/spawn into…
Sep 12, 2024
bfff7a9
Update archs
Sep 12, 2024
dca1695
Spawn dev mode with nats container
eliasdarruda Sep 12, 2024
5b5c1d0
cli with nats
eliasdarruda Sep 12, 2024
f603ba7
Nats and proxy running in the CLI
eliasdarruda Sep 12, 2024
a87861e
Using command instead of wait strategy to up nats broker in spawn cli
Sep 13, 2024
9d9cdbb
Respect function spec types
Sep 13, 2024
3338527
Create projection requests
Sep 15, 2024
0ebee23
Spawn CLI working with NATS
eliasdarruda Sep 15, 2024
a9c950a
Add Statestores projection api
Sep 17, 2024
3d11079
Merge branch 'projection-actor' of https://github.com/eigr/spawn into…
Sep 17, 2024
15b9fb9
Fix some bugs in queries and add some tests
Sep 17, 2024
895b037
improve tests
Sep 17, 2024
89d70f4
Refactor. Deduplicate code
Sep 17, 2024
79f0ed8
Implement projection statestore behaviour
Sep 17, 2024
56d049a
Refactor
Sep 17, 2024
35d4b5e
Adjust image tag
Sep 17, 2024
24c4cbd
postgres correct table definition
eliasdarruda Sep 18, 2024
0829227
fix build issues on statestores
eliasdarruda Sep 18, 2024
92ea1fc
Update tests
Sep 18, 2024
6cfa3aa
Merge branch 'projection-actor' of https://github.com/eigr/spawn into…
Sep 18, 2024
559f411
Fix. Postgres projection adapter tests
Sep 19, 2024
5881e56
Merge branch 'main' into projection-actor
sleipnir Sep 19, 2024
8dcb969
Fix. Temporary return nil for native statestore
Sep 19, 2024
ccad5f1
Merge branch 'projection-actor' of https://github.com/eigr/spawn into…
Sep 19, 2024
4c38d99
Adjusts for native adapter
Sep 19, 2024
11f98ae
Fix table name
Sep 19, 2024
ba73cfd
Use exposed ports to gRPC
Sep 24, 2024
fb3e236
Merge branch 'main' into projection-actor
eliasdarruda Nov 27, 2024
7d0abb7
merge conflict and format
eliasdarruda Nov 27, 2024
68df32e
fix wrong parameters
eliasdarruda Nov 27, 2024
4aaf700
projection actor example working with elixir sdk
eliasdarruda Nov 28, 2024
816439f
working with clock actor sending messages to projection actor
eliasdarruda Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ erl_crash.dump

.elixir_ls/**

**/**/_build/
**/**/deps/

# Temporary Sqlite3 database
eigr-functions-db.db

Expand Down
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
elixir 1.15
erlang 25.3.2.8
elixir 1.17.2
erlang 26.1.2

405 changes: 293 additions & 112 deletions Makefile

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion compile-pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ for file in $PROTOS; do
--plugin=ProtobufGenerate.Plugins.GRPCWithOptions \
--one-file-per-module \
$BASE_PATH/$file
done
done

11 changes: 0 additions & 11 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,8 @@ config :logger, :console,

config :protobuf, extensions: :enabled

# config :prometheus, MetricsEndpoint.Exporter,
# path: "/metrics",
# format: :auto,
# registry: :default,
# auth: false

config :opentelemetry, :resource, service: %{name: "spawn"}

# config :opentelemetry,
# span_processor: :batch,
# traces_exporter: {:otel_exporter_stdout, []}
# #traces_exporter: {:otel_exporter_stdout, []}

config :opentelemetry,
:processors,
otel_batch_processor: %{
Expand Down
6 changes: 6 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ if config_env() == :prod do
config :logger,
level: String.to_atom(System.get_env("SPAWN_PROXY_LOGGER_LEVEL", "info"))
end

# For OTLP set the following variables:

# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:55681
# OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc
# OTEL_EXPORTER_OTLP_TRACES_COMPRESSION=gzip
10 changes: 9 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import Config

config :spawn_statestores, Statestores.Adapters.MySQLSnapshotAdapter,
config :spawn_statestores, Statestores.Adapters.MariaDBSnapshotAdapter,
pool: Ecto.Adapters.SQL.Sandbox,
ownership_timeout: :infinity,
pool_size: 24,
prepare: :unnamed,
queue_target: 5_000,
queue_interval: 500

config :spawn_statestores, Statestores.Adapters.MariaDBProjectionAdapter,
pool: Ecto.Adapters.SQL.Sandbox,
ownership_timeout: :infinity,
pool_size: 24,
Expand Down
40 changes: 21 additions & 19 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
ports:
- 4317:4317
- 4318:4318
- 55681:55681
volumes:
- ./config/otel-collector-config.yaml:/conf/otel-collector-config.yaml
links:
Expand Down Expand Up @@ -57,6 +58,8 @@ services:
- mariadb:/var/lib/mysql
ports:
- "3307:3306"
networks:
- mysql-compose-network

adminer:
image: adminer
Expand All @@ -69,25 +72,24 @@ services:
image: 'nats:latest'
ports:
- "4222:4222"

spawn-proxy:
image: eigr/spawn-proxy:1.4.3
restart: always
environment:
PROXY_APP_NAME: spawn
PROXY_HTTP_PORT: 9001
PROXY_DATABASE_TYPE: postgres
PROXY_DATABASE_NAME: eigr-functions-db
PROXY_DATABASE_USERNAME: postgres
PROXY_DATABASE_SECRET: password
PROXY_DATABASE_HOST: localhost
PROXY_DATABASE_PORT: 5432
SPAWN_STATESTORE_KEY: 3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE=
USER_FUNCTION_HOST: 0.0.0.0 # Your NodeJS runtime host
USER_FUNCTION_PORT: 8090 # Your NodeJS runtime exposed port
# network_mode: host # only uncomment this if you're running your nodejs locally in Linux, check note below for Windows
ports:
- "9001:9001"
# spawn-proxy:
# image: eigr/spawn-proxy:1.4.3
# restart: always
# environment:
# PROXY_APP_NAME: spawn
# PROXY_HTTP_PORT: 9001
# PROXY_DATABASE_TYPE: postgres
# PROXY_DATABASE_NAME: eigr-functions-db
# PROXY_DATABASE_USERNAME: postgres
# PROXY_DATABASE_SECRET: password
# PROXY_DATABASE_HOST: localhost
# PROXY_DATABASE_PORT: 5432
# SPAWN_STATESTORE_KEY: 3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE=
# USER_FUNCTION_HOST: 0.0.0.0 # Your NodeJS runtime host
# USER_FUNCTION_PORT: 8090 # Your NodeJS runtime exposed port
# # network_mode: host # only uncomment this if you're running your nodejs locally in Linux, check note below for Windows
# ports:
# - "9001:9001"

networks:
mysql-compose-network:
Expand Down
21 changes: 21 additions & 0 deletions lib/actors/actor/entity/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ defmodule Actors.Actor.Entity do
handle_invocation_request(invocation, opts, nil, state)
|> reply_to_noreply()

{:process_projection_events, events} ->
Invocation.process_projection_events(events, state)
|> reply_to_noreply()

{:replay, opts} ->
Invocation.replay(opts, state)
|> reply_to_noreply()

action ->
do_handle_cast(action, state)
end
Expand Down Expand Up @@ -536,6 +544,19 @@ defmodule Actors.Actor.Entity do
GenServer.call(via(ref), :get_state, timeout)
end

@doc """
When the Actor is a Projection the messages sent to the projection can be reprocessed.
See this for more information about this programming model.
"""
@spec replay(pid() | module(), Keyword.t()) :: {:error, term()} | :ok
def replay(ref, opts) when is_pid(ref) do
GenServer.cast(ref, {:replay, opts})
end

def replay(ref, opts) do
GenServer.cast(via(ref), {:replay, opts})
end

@doc """
Retrieve the health check readiness status.
"""
Expand Down
159 changes: 149 additions & 10 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@ defmodule Actors.Actor.Entity.Invocation do
require Logger
require OpenTelemetry.Tracer, as: Tracer

alias Actors.Actor.Entity.{EntityState, Lifecycle}
alias Actors.Exceptions.NotAuthorizedException
alias Actors.Actor.Entity.EntityState
alias Actors.Actor.Entity.Lifecycle
alias Actors.Actor.Entity.Lifecycle.StreamInitiator
alias Actors.Actor.InvocationScheduler
alias Actors.Exceptions.NotAuthorizedException
alias Actors.Actor.Pubsub

alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
ActorSettings,
ActorSystem,
ActorState,
Action,
FixedTimerAction
FixedTimerAction,
Metadata
}

alias Eigr.Functions.Protocol.{
ActorInvocation,
ActorInvocationResponse,
Broadcast,
Context,
Fact,
Forward,
InvocationRequest,
Pipe,
Expand All @@ -32,8 +38,9 @@ defmodule Actors.Actor.Entity.Invocation do
Noop
}

alias Actors.Actor.Pubsub
alias Spawn.Utils.Nats

import Spawn.Utils.AnySerializer, only: [any_pack!: 1, unpack_any_bin: 1]
import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]

@default_actions [
Expand All @@ -53,6 +60,79 @@ defmodule Actors.Actor.Entity.Invocation do

@http_host_interface Actors.Actor.Interface.Http

def process_projection_events(messages, state) do
%EntityState{actor: %Actor{} = actor} = state

invocations =
messages
|> Enum.map(fn %Broadway.Message{data: %Fact{} = message} ->
system_name = Map.get(message.metadata, "spawn-system")
parent = Map.get(message.metadata, "actor-parent")
name = Map.get(message.metadata, "actor-name")
action = Map.get(message.metadata, "actor-action")

%InvocationRequest{
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
action_name: action,
payload: parse_payload(unpack_any_bin(message.state)),
caller: %ActorId{name: name, system: system_name, parent: parent}
}
end)

spawn(fn ->
invocations
|> Flow.from_enumerable(min_demand: 1, max_demand: System.schedulers_online())
|> Flow.map(fn invocation ->
try do
Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx())
catch
error ->
Logger.warning(
"Error during processing events on projection. Invocation: #{inspect(invocation)} Error: #{inspect(error)}"
)

:ok
end
end)
|> Flow.run()
end)

{:noreply, state}
end

defp parse_payload(response) do
case response do
nil -> {:noop, %Noop{}}
%Noop{} = noop -> {:noop, noop}
{:noop, %Noop{} = noop} -> {:noop, noop}
{_, nil} -> {:noop, %Noop{}}
{:value, response} -> {:value, any_pack!(response)}
response -> {:value, any_pack!(response)}
end
end

def replay(
call_opts,
%EntityState{
actor:
%Actor{
settings:
%ActorSettings{
kind: :PROJECTION
} = _settings
} = actor,
projection_stream_pid: stream_pid
} = state
) do
{:ok, newpid} = StreamInitiator.replay(stream_pid, actor, call_opts)
{:noreply, %{state | projection_stream_pid: newpid}}
end

def replay(_replaymsg, _call_opts, state), do: {:noreply, state}

def handle_timers([], _system, _actor), do: :ok

def handle_timers(timers, system, actor) when is_list(timers) do
Expand Down Expand Up @@ -289,12 +369,31 @@ defmodule Actors.Actor.Entity.Invocation do
request,
%ActorInvocationResponse{checkpoint: checkpoint} = response,
%EntityState{
actor:
%Actor{
id: id,
settings:
%ActorSettings{
kind: kind,
projection_settings: projection_settings
} = _settings
} = _actor,
revision: revision
} = state,
opts
) do
response_params = %{
actor_id: id,
kind: kind,
projection_settings: projection_settings,
request: request,
response: response,
state: state,
opts: opts
}

response =
case do_response(request, response, state, opts) do
case do_response(response_params) do
:noreply ->
{:noreply, state}
|> return_and_maybe_hibernate()
Expand Down Expand Up @@ -400,19 +499,59 @@ defmodule Actors.Actor.Entity.Invocation do
end

defp do_response(
_request,
%ActorInvocationResponse{workflow: workflow} = response,
_state,
_opts
%{
actor_id: id,
kind: kind,
projection_settings: settings,
request: request,
response: %ActorInvocationResponse{workflow: workflow} = response,
state: state,
opts: _opts
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state)

response
end

defp do_response(request, response, state, opts) do
defp do_response(
%{
actor_id: id,
kind: kind,
projection_settings: settings,
request: request,
response: response,
state: state,
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, state) do
actor_name_or_parent = if id.parent == "", do: id.name, else: id.parent

subject = "actors.#{actor_name_or_parent}.#{id.name}.#{action}"
payload = Google.Protobuf.Any.encode(state.actor.state.state)

uuid = UUID.uuid4(:hex)

Gnat.pub(Nats.connection_name(), subject, payload,
headers: [
{"Nats-Msg-Id", uuid},
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
]
)
end

defp do_handle_projection(_id, _action, _settings, _state), do: :ok

defp do_run_workflow(
_request,
%ActorInvocationResponse{workflow: workflow} = response,
Expand Down
Loading
Loading