Skip to content

Commit

Permalink
Merge pull request #368 from eigr/projection-actor
Browse files Browse the repository at this point in the history
projection actor intialization
  • Loading branch information
sleipnir authored Dec 6, 2024
2 parents 7a0832f + 816439f commit 1e7f160
Show file tree
Hide file tree
Showing 59 changed files with 3,597 additions and 779 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ erl_crash.dump

.elixir_ls/**

**/**/_build/
**/**/deps/

# Temporary Sqlite3 database
eigr-functions-db.db

Expand Down
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
elixir 1.15
erlang 25.3.2.8
elixir 1.17.2
erlang 26.1.2

405 changes: 293 additions & 112 deletions Makefile

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion compile-pb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ for file in $PROTOS; do
--plugin=ProtobufGenerate.Plugins.GRPCWithOptions \
--one-file-per-module \
$BASE_PATH/$file
done
done

11 changes: 0 additions & 11 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,8 @@ config :logger, :console,

config :protobuf, extensions: :enabled

# config :prometheus, MetricsEndpoint.Exporter,
# path: "/metrics",
# format: :auto,
# registry: :default,
# auth: false

config :opentelemetry, :resource, service: %{name: "spawn"}

# config :opentelemetry,
# span_processor: :batch,
# traces_exporter: {:otel_exporter_stdout, []}
# #traces_exporter: {:otel_exporter_stdout, []}

config :opentelemetry,
:processors,
otel_batch_processor: %{
Expand Down
6 changes: 6 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ if config_env() == :prod do
config :logger,
level: String.to_atom(System.get_env("SPAWN_PROXY_LOGGER_LEVEL", "info"))
end

# For OTLP set the following variables:

# OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:55681
# OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc
# OTEL_EXPORTER_OTLP_TRACES_COMPRESSION=gzip
10 changes: 9 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import Config

config :spawn_statestores, Statestores.Adapters.MySQLSnapshotAdapter,
config :spawn_statestores, Statestores.Adapters.MariaDBSnapshotAdapter,
pool: Ecto.Adapters.SQL.Sandbox,
ownership_timeout: :infinity,
pool_size: 24,
prepare: :unnamed,
queue_target: 5_000,
queue_interval: 500

config :spawn_statestores, Statestores.Adapters.MariaDBProjectionAdapter,
pool: Ecto.Adapters.SQL.Sandbox,
ownership_timeout: :infinity,
pool_size: 24,
Expand Down
40 changes: 21 additions & 19 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
ports:
- 4317:4317
- 4318:4318
- 55681:55681
volumes:
- ./config/otel-collector-config.yaml:/conf/otel-collector-config.yaml
links:
Expand Down Expand Up @@ -57,6 +58,8 @@ services:
- mariadb:/var/lib/mysql
ports:
- "3307:3306"
networks:
- mysql-compose-network

adminer:
image: adminer
Expand All @@ -69,25 +72,24 @@ services:
image: 'nats:latest'
ports:
- "4222:4222"

spawn-proxy:
image: eigr/spawn-proxy:1.4.3
restart: always
environment:
PROXY_APP_NAME: spawn
PROXY_HTTP_PORT: 9001
PROXY_DATABASE_TYPE: postgres
PROXY_DATABASE_NAME: eigr-functions-db
PROXY_DATABASE_USERNAME: postgres
PROXY_DATABASE_SECRET: password
PROXY_DATABASE_HOST: localhost
PROXY_DATABASE_PORT: 5432
SPAWN_STATESTORE_KEY: 3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE=
USER_FUNCTION_HOST: 0.0.0.0 # Your NodeJS runtime host
USER_FUNCTION_PORT: 8090 # Your NodeJS runtime exposed port
# network_mode: host # only uncomment this if you're running your nodejs locally in Linux, check note below for Windows
ports:
- "9001:9001"
# spawn-proxy:
# image: eigr/spawn-proxy:1.4.3
# restart: always
# environment:
# PROXY_APP_NAME: spawn
# PROXY_HTTP_PORT: 9001
# PROXY_DATABASE_TYPE: postgres
# PROXY_DATABASE_NAME: eigr-functions-db
# PROXY_DATABASE_USERNAME: postgres
# PROXY_DATABASE_SECRET: password
# PROXY_DATABASE_HOST: localhost
# PROXY_DATABASE_PORT: 5432
# SPAWN_STATESTORE_KEY: 3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE=
# USER_FUNCTION_HOST: 0.0.0.0 # Your NodeJS runtime host
# USER_FUNCTION_PORT: 8090 # Your NodeJS runtime exposed port
# # network_mode: host # only uncomment this if you're running your nodejs locally in Linux, check note below for Windows
# ports:
# - "9001:9001"

networks:
mysql-compose-network:
Expand Down
21 changes: 21 additions & 0 deletions lib/actors/actor/entity/entity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ defmodule Actors.Actor.Entity do
handle_invocation_request(invocation, opts, nil, state)
|> reply_to_noreply()

{:process_projection_events, events} ->
Invocation.process_projection_events(events, state)
|> reply_to_noreply()

{:replay, opts} ->
Invocation.replay(opts, state)
|> reply_to_noreply()

action ->
do_handle_cast(action, state)
end
Expand Down Expand Up @@ -536,6 +544,19 @@ defmodule Actors.Actor.Entity do
GenServer.call(via(ref), :get_state, timeout)
end

@doc """
When the Actor is a Projection the messages sent to the projection can be reprocessed.
See this for more information about this programming model.
"""
@spec replay(pid() | module(), Keyword.t()) :: {:error, term()} | :ok
def replay(ref, opts) when is_pid(ref) do
GenServer.cast(ref, {:replay, opts})
end

def replay(ref, opts) do
GenServer.cast(via(ref), {:replay, opts})
end

@doc """
Retrieve the health check readiness status.
"""
Expand Down
159 changes: 149 additions & 10 deletions lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@ defmodule Actors.Actor.Entity.Invocation do
require Logger
require OpenTelemetry.Tracer, as: Tracer

alias Actors.Actor.Entity.{EntityState, Lifecycle}
alias Actors.Exceptions.NotAuthorizedException
alias Actors.Actor.Entity.EntityState
alias Actors.Actor.Entity.Lifecycle
alias Actors.Actor.Entity.Lifecycle.StreamInitiator
alias Actors.Actor.InvocationScheduler
alias Actors.Exceptions.NotAuthorizedException
alias Actors.Actor.Pubsub

alias Eigr.Functions.Protocol.Actors.{
Actor,
ActorId,
ActorSettings,
ActorSystem,
ActorState,
Action,
FixedTimerAction
FixedTimerAction,
Metadata
}

alias Eigr.Functions.Protocol.{
ActorInvocation,
ActorInvocationResponse,
Broadcast,
Context,
Fact,
Forward,
InvocationRequest,
Pipe,
Expand All @@ -32,8 +38,9 @@ defmodule Actors.Actor.Entity.Invocation do
Noop
}

alias Actors.Actor.Pubsub
alias Spawn.Utils.Nats

import Spawn.Utils.AnySerializer, only: [any_pack!: 1, unpack_any_bin: 1]
import Spawn.Utils.Common, only: [return_and_maybe_hibernate: 1]

@default_actions [
Expand All @@ -53,6 +60,79 @@ defmodule Actors.Actor.Entity.Invocation do

@http_host_interface Actors.Actor.Interface.Http

def process_projection_events(messages, state) do
%EntityState{actor: %Actor{} = actor} = state

invocations =
messages
|> Enum.map(fn %Broadway.Message{data: %Fact{} = message} ->
system_name = Map.get(message.metadata, "spawn-system")
parent = Map.get(message.metadata, "actor-parent")
name = Map.get(message.metadata, "actor-name")
action = Map.get(message.metadata, "actor-action")

%InvocationRequest{
async: true,
system: %ActorSystem{name: system_name},
actor: %Actor{id: actor.id},
metadata: message.metadata,
action_name: action,
payload: parse_payload(unpack_any_bin(message.state)),
caller: %ActorId{name: name, system: system_name, parent: parent}
}
end)

spawn(fn ->
invocations
|> Flow.from_enumerable(min_demand: 1, max_demand: System.schedulers_online())
|> Flow.map(fn invocation ->
try do
Actors.invoke(invocation, span_ctx: Tracer.current_span_ctx())
catch
error ->
Logger.warning(
"Error during processing events on projection. Invocation: #{inspect(invocation)} Error: #{inspect(error)}"
)

:ok
end
end)
|> Flow.run()
end)

{:noreply, state}
end

defp parse_payload(response) do
case response do
nil -> {:noop, %Noop{}}
%Noop{} = noop -> {:noop, noop}
{:noop, %Noop{} = noop} -> {:noop, noop}
{_, nil} -> {:noop, %Noop{}}
{:value, response} -> {:value, any_pack!(response)}
response -> {:value, any_pack!(response)}
end
end

def replay(
call_opts,
%EntityState{
actor:
%Actor{
settings:
%ActorSettings{
kind: :PROJECTION
} = _settings
} = actor,
projection_stream_pid: stream_pid
} = state
) do
{:ok, newpid} = StreamInitiator.replay(stream_pid, actor, call_opts)
{:noreply, %{state | projection_stream_pid: newpid}}
end

def replay(_replaymsg, _call_opts, state), do: {:noreply, state}

def handle_timers([], _system, _actor), do: :ok

def handle_timers(timers, system, actor) when is_list(timers) do
Expand Down Expand Up @@ -289,12 +369,31 @@ defmodule Actors.Actor.Entity.Invocation do
request,
%ActorInvocationResponse{checkpoint: checkpoint} = response,
%EntityState{
actor:
%Actor{
id: id,
settings:
%ActorSettings{
kind: kind,
projection_settings: projection_settings
} = _settings
} = _actor,
revision: revision
} = state,
opts
) do
response_params = %{
actor_id: id,
kind: kind,
projection_settings: projection_settings,
request: request,
response: response,
state: state,
opts: opts
}

response =
case do_response(request, response, state, opts) do
case do_response(response_params) do
:noreply ->
{:noreply, state}
|> return_and_maybe_hibernate()
Expand Down Expand Up @@ -400,19 +499,59 @@ defmodule Actors.Actor.Entity.Invocation do
end

defp do_response(
_request,
%ActorInvocationResponse{workflow: workflow} = response,
_state,
_opts
%{
actor_id: id,
kind: kind,
projection_settings: settings,
request: request,
response: %ActorInvocationResponse{workflow: workflow} = response,
state: state,
opts: _opts
} = _params
)
when is_nil(workflow) or workflow == %{} do
:ok = do_handle_projection(id, request.action_name, settings, state)

response
end

defp do_response(request, response, state, opts) do
defp do_response(
%{
actor_id: id,
kind: kind,
projection_settings: settings,
request: request,
response: response,
state: state,
opts: opts
} = _params
) do
:ok = do_handle_projection(id, request.action_name, settings, state)

do_run_workflow(request, response, state, opts)
end

defp do_handle_projection(id, action, %{sourceable: true} = _settings, state) do
actor_name_or_parent = if id.parent == "", do: id.name, else: id.parent

subject = "actors.#{actor_name_or_parent}.#{id.name}.#{action}"
payload = Google.Protobuf.Any.encode(state.actor.state.state)

uuid = UUID.uuid4(:hex)

Gnat.pub(Nats.connection_name(), subject, payload,
headers: [
{"Nats-Msg-Id", uuid},
{"Spawn-System", "#{id.system}"},
{"Actor-Parent", "#{id.parent}"},
{"Actor-Name", "#{id.name}"},
{"Actor-Action", "#{action}"}
]
)
end

defp do_handle_projection(_id, _action, _settings, _state), do: :ok

defp do_run_workflow(
_request,
%ActorInvocationResponse{workflow: workflow} = response,
Expand Down
Loading

0 comments on commit 1e7f160

Please sign in to comment.