Skip to content

Commit

Permalink
Merge pull request #396 from eigr/nats-state-handoff
Browse files Browse the repository at this point in the history
Nats state handoff instead of CRDT
  • Loading branch information
sleipnir authored Dec 20, 2024
2 parents c410dc9 + ab3dbcc commit f56a0b9
Show file tree
Hide file tree
Showing 23 changed files with 559 additions and 279 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Install NATS with JetStream
run: |
wget https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz
tar -xvzf nats-server-v2.10.0-linux-amd64.tar.gz
sudo mv nats-server-v2.10.0-linux-amd64/nats-server /usr/local/bin/
nats-server --jetstream &
- name: Set up Elixir
uses: erlef/setup-beam@v1
with:
Expand All @@ -50,13 +57,13 @@ jobs:
- name: Run tests spawn
run: |
MIX_ENV=test PROXY_DATABASE_TYPE=native PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
- name: Run tests spawn_sdk
run: |
cd spawn_sdk/spawn_sdk
mix deps.get
MIX_ENV=test PROXY_DATABASE_TYPE=native PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
cd ../../
- name: Run tests spawn_statestores
Expand Down
31 changes: 30 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ test-spawn:
PROXY_CLUSTER_STRATEGY=gossip \
PROXY_HTTP_PORT=9005 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
elixir --name [email protected] -S mix test

test-sdk:
Expand All @@ -159,6 +161,7 @@ test-sdk:
PROXY_DATABASE_PORT=3307 \
PROXY_DATABASE_POOL_SIZE=50 \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
elixir --name [email protected] -S mix test

Expand Down Expand Up @@ -290,7 +293,10 @@ run-proxy-local-nodejs-test:
PROXY_HTTP_PORT=9001 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
PROXY_ACTOR_SYSTEM_NAME=SpawnSysTest \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=crdt \
SPAWN_USE_INTERNAL_NATS=true \
PROXY_DATABASE_PORT=3307 \
SPAWN_PUBSUB_ADAPTER=nats \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-proxy-local-dicegame:
Expand Down Expand Up @@ -330,8 +336,31 @@ run-sdk-local3:
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local4:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_PUBSUB_ADAPTER=nats \
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local5:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_PUBSUB_ADAPTER=nats \
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local-with-mariadb:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ services:
- "3307:3306"
networks:
- mysql-compose-network
command: ["--max_connections=1000"]

adminer:
image: adminer
Expand Down
11 changes: 10 additions & 1 deletion lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ defmodule Actors.Actor.CallerConsumer do
|> Map.values()
|> Enum.map(fn actor -> ActorPool.create_actor_host_pool(actor, opts) end)
|> List.flatten()
|> tap(fn hosts ->
:persistent_term.put(:local_requested_actors, hosts |> Enum.map(& &1.actor.id))
end)
|> Enum.filter(&(&1.node == Node.self()))
|> ActorRegistry.register()
|> tap(fn _sts -> warmup_actors(actor_system, actors, opts) end)
Expand Down Expand Up @@ -437,7 +440,13 @@ defmodule Actors.Actor.CallerConsumer do
end
end)
|> List.flatten()
|> Enum.filter(&(&1.node == Node.self()))

hosts =
if Config.get(:state_handoff_controller_adapter) == "crdt" do
Enum.filter(hosts, &(&1.node == Node.self()))
else
hosts
end

ActorRegistry.register(hosts)

Expand Down
109 changes: 3 additions & 106 deletions lib/actors/actor/entity/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Actors.Actor.Pool do

require Logger

alias Actors.Registry.{ActorRegistry, HostActor}
alias Actors.Registry.HostActor

alias Eigr.Functions.Protocol.Actors.{
Actor,
Expand All @@ -15,8 +15,6 @@ defmodule Actors.Actor.Pool do

alias Spawn.Utils.Common

@http_host_interface Actors.Actor.Interface.Http

@doc """
Creates an actor host pool for a given pooled actor.
Expand All @@ -29,112 +27,11 @@ defmodule Actors.Actor.Pool do
"""
@spec create_actor_host_pool(Actor.t(), keyword()) :: list(HostActor.t())
def create_actor_host_pool(
%Actor{id: %ActorId{} = id, settings: %ActorSettings{} = settings} = actor,
%Actor{id: %ActorId{} = _id, settings: %ActorSettings{} = _settings} = actor,
opts
) do
opts = Keyword.merge(opts, hash: Common.actor_host_hash())

case ActorRegistry.get_hosts_by_actor(id) do
{:ok, actor_hosts} ->
if settings.kind == :POOLED do
build_pool(:distributed, actor, actor_hosts, opts)
else
actor_hosts
end

_ ->
if settings.kind == :POOLED do
build_pool(:local, actor, nil, opts)
else
[%HostActor{node: Node.self(), actor: actor, opts: opts}]
end
end
end

defp build_pool(
:local,
%Actor{
id: %ActorId{system: system, parent: _parent, name: name} = _id,
settings:
%ActorSettings{kind: :POOLED, min_pool_size: min, max_pool_size: max} = _settings
} = actor,
_hosts,
opts
) do
{_current_value, new_opts} =
Keyword.get_and_update(opts, :interface, fn current_value ->
case current_value do
nil ->
{@http_host_interface, @http_host_interface}

_ ->
{current_value, current_value}
end
end)

max_pool = if max < min, do: get_defaul_max_pool(min), else: max

Enum.into(
min..max_pool,
[],
fn index ->
name_alias = build_name_alias(name, index)

pooled_actor = %Actor{
actor
| id: %ActorId{system: system, parent: name_alias, name: name}
}

Logger.debug("Registering metadata for the Pooled Actor #{name} with Alias #{name_alias}")
%HostActor{node: Node.self(), actor: pooled_actor, opts: new_opts}
end
)
end

defp build_pool(
:distributed,
%Actor{
id: %ActorId{system: system, parent: _parent, name: name} = _id,
settings:
%ActorSettings{kind: :POOLED, min_pool_size: min, max_pool_size: max} = _settings
} = actor,
hosts,
opts
) do
{_current_value, new_opts} =
Keyword.get_and_update(opts, :interface, fn current_value ->
case current_value do
nil ->
{@http_host_interface, @http_host_interface}

_ ->
{current_value, current_value}
end
end)

max_pool = if max < min, do: get_defaul_max_pool(min), else: max

Enum.into(
min..max_pool,
[],
fn index ->
host = Enum.random(hosts)
name_alias = build_name_alias(name, index)

pooled_actor = %Actor{
actor
| id: %ActorId{system: system, parent: name_alias, name: name}
}

Logger.debug("Registering metadata for the Pooled Actor #{name} with Alias #{name_alias}")
%HostActor{node: host.node, actor: pooled_actor, opts: new_opts}
end
)
end

defp build_name_alias(name, index), do: "#{name}-#{index}"

defp get_defaul_max_pool(min_pool) do
length(Node.list() ++ [Node.self()]) * (System.schedulers_online() + min_pool)
[%HostActor{node: Node.self(), actor: actor, opts: opts}]
end
end
8 changes: 8 additions & 0 deletions lib/actors/config/persistent_term_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,14 @@ if Code.ensure_loaded?(:persistent_term) do
persistent: true
)

"nats" ->
Application.put_env(
:spawn,
:state_handoff_controller_adapter,
Spawn.Cluster.StateHandoff.Controllers.NatsKvController,
persistent: true
)

_ ->
if Code.ensure_loaded?(Statestores.Supervisor) do
Application.put_env(
Expand Down
2 changes: 1 addition & 1 deletion lib/actors/registry/host_actor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Actors.Registry.HostActor do
defstruct actor: nil, node: nil, opts: nil

@type t :: %__MODULE__{
node: pid(),
node: node(),
actor: Actor.t(),
opts: Keyword.t()
}
Expand Down
Loading

0 comments on commit f56a0b9

Please sign in to comment.