Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats state handoff instead of CRDT #396

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v3

- name: Install NATS with JetStream
run: |
wget https://github.com/nats-io/nats-server/releases/download/v2.10.0/nats-server-v2.10.0-linux-amd64.tar.gz
tar -xvzf nats-server-v2.10.0-linux-amd64.tar.gz
sudo mv nats-server-v2.10.0-linux-amd64/nats-server /usr/local/bin/
nats-server --jetstream &

- name: Set up Elixir
uses: erlef/setup-beam@v1
with:
Expand All @@ -50,13 +57,13 @@ jobs:

- name: Run tests spawn
run: |
MIX_ENV=test PROXY_DATABASE_TYPE=native PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test

- name: Run tests spawn_sdk
run: |
cd spawn_sdk/spawn_sdk
mix deps.get
MIX_ENV=test PROXY_DATABASE_TYPE=native PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
MIX_ENV=test PROXY_DATABASE_TYPE=native SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats SPAWN_USE_INTERNAL_NATS=true SPAWN_PUBSUB_ADAPTER=nats PROXY_CLUSTER_STRATEGY=gossip PROXY_DATABASE_POOL_SIZE=15 PROXY_HTTP_PORT=9005 SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= elixir --name [email protected] -S mix test
cd ../../

- name: Run tests spawn_statestores
Expand Down
31 changes: 30 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ test-spawn:
PROXY_CLUSTER_STRATEGY=gossip \
PROXY_HTTP_PORT=9005 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
elixir --name [email protected] -S mix test

test-sdk:
Expand All @@ -159,6 +161,7 @@ test-sdk:
PROXY_DATABASE_PORT=3307 \
PROXY_DATABASE_POOL_SIZE=50 \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
elixir --name [email protected] -S mix test

Expand Down Expand Up @@ -290,7 +293,10 @@ run-proxy-local-nodejs-test:
PROXY_HTTP_PORT=9001 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
PROXY_ACTOR_SYSTEM_NAME=SpawnSysTest \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=crdt \
SPAWN_USE_INTERNAL_NATS=true \
PROXY_DATABASE_PORT=3307 \
SPAWN_PUBSUB_ADAPTER=nats \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-proxy-local-dicegame:
Expand Down Expand Up @@ -330,8 +336,31 @@ run-sdk-local3:
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local4:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_PUBSUB_ADAPTER=nats \
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local5:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
SPAWN_USE_INTERNAL_NATS=true \
SPAWN_PUBSUB_ADAPTER=nats \
PROXY_DATABASE_TYPE=mariadb \
PROXY_DATABASE_PORT=3307 \
SPAWN_STATESTORE_KEY=3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE= \
SPAWN_SUPERVISORS_STATE_HANDOFF_CONTROLLER=nats \
iex --name [email protected] -S mix

run-sdk-local-with-mariadb:
cd spawn_sdk/spawn_sdk_example && mix deps.get && \
PROXY_CLUSTER_STRATEGY=epmd \
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ services:
- "3307:3306"
networks:
- mysql-compose-network
command: ["--max_connections=1000"]

adminer:
image: adminer
Expand Down
11 changes: 10 additions & 1 deletion lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ defmodule Actors.Actor.CallerConsumer do
|> Map.values()
|> Enum.map(fn actor -> ActorPool.create_actor_host_pool(actor, opts) end)
|> List.flatten()
|> tap(fn hosts ->
:persistent_term.put(:local_requested_actors, hosts |> Enum.map(& &1.actor.id))
end)
|> Enum.filter(&(&1.node == Node.self()))
|> ActorRegistry.register()
|> tap(fn _sts -> warmup_actors(actor_system, actors, opts) end)
Expand Down Expand Up @@ -437,7 +440,13 @@ defmodule Actors.Actor.CallerConsumer do
end
end)
|> List.flatten()
|> Enum.filter(&(&1.node == Node.self()))

hosts =
if Config.get(:state_handoff_controller_adapter) == "crdt" do
Enum.filter(hosts, &(&1.node == Node.self()))
else
hosts
end

ActorRegistry.register(hosts)

Expand Down
109 changes: 3 additions & 106 deletions lib/actors/actor/entity/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Actors.Actor.Pool do

require Logger

alias Actors.Registry.{ActorRegistry, HostActor}
alias Actors.Registry.HostActor

alias Eigr.Functions.Protocol.Actors.{
Actor,
Expand All @@ -15,8 +15,6 @@ defmodule Actors.Actor.Pool do

alias Spawn.Utils.Common

@http_host_interface Actors.Actor.Interface.Http

@doc """
Creates an actor host pool for a given pooled actor.

Expand All @@ -29,112 +27,11 @@ defmodule Actors.Actor.Pool do
"""
@spec create_actor_host_pool(Actor.t(), keyword()) :: list(HostActor.t())
def create_actor_host_pool(
%Actor{id: %ActorId{} = id, settings: %ActorSettings{} = settings} = actor,
%Actor{id: %ActorId{} = _id, settings: %ActorSettings{} = _settings} = actor,
opts
) do
opts = Keyword.merge(opts, hash: Common.actor_host_hash())

case ActorRegistry.get_hosts_by_actor(id) do
{:ok, actor_hosts} ->
if settings.kind == :POOLED do
build_pool(:distributed, actor, actor_hosts, opts)
else
actor_hosts
end

_ ->
if settings.kind == :POOLED do
build_pool(:local, actor, nil, opts)
else
[%HostActor{node: Node.self(), actor: actor, opts: opts}]
end
end
end

defp build_pool(
:local,
%Actor{
id: %ActorId{system: system, parent: _parent, name: name} = _id,
settings:
%ActorSettings{kind: :POOLED, min_pool_size: min, max_pool_size: max} = _settings
} = actor,
_hosts,
opts
) do
{_current_value, new_opts} =
Keyword.get_and_update(opts, :interface, fn current_value ->
case current_value do
nil ->
{@http_host_interface, @http_host_interface}

_ ->
{current_value, current_value}
end
end)

max_pool = if max < min, do: get_defaul_max_pool(min), else: max

Enum.into(
min..max_pool,
[],
fn index ->
name_alias = build_name_alias(name, index)

pooled_actor = %Actor{
actor
| id: %ActorId{system: system, parent: name_alias, name: name}
}

Logger.debug("Registering metadata for the Pooled Actor #{name} with Alias #{name_alias}")
%HostActor{node: Node.self(), actor: pooled_actor, opts: new_opts}
end
)
end

defp build_pool(
:distributed,
%Actor{
id: %ActorId{system: system, parent: _parent, name: name} = _id,
settings:
%ActorSettings{kind: :POOLED, min_pool_size: min, max_pool_size: max} = _settings
} = actor,
hosts,
opts
) do
{_current_value, new_opts} =
Keyword.get_and_update(opts, :interface, fn current_value ->
case current_value do
nil ->
{@http_host_interface, @http_host_interface}

_ ->
{current_value, current_value}
end
end)

max_pool = if max < min, do: get_defaul_max_pool(min), else: max

Enum.into(
min..max_pool,
[],
fn index ->
host = Enum.random(hosts)
name_alias = build_name_alias(name, index)

pooled_actor = %Actor{
actor
| id: %ActorId{system: system, parent: name_alias, name: name}
}

Logger.debug("Registering metadata for the Pooled Actor #{name} with Alias #{name_alias}")
%HostActor{node: host.node, actor: pooled_actor, opts: new_opts}
end
)
end

defp build_name_alias(name, index), do: "#{name}-#{index}"

defp get_defaul_max_pool(min_pool) do
length(Node.list() ++ [Node.self()]) * (System.schedulers_online() + min_pool)
[%HostActor{node: Node.self(), actor: actor, opts: opts}]
end
end
8 changes: 8 additions & 0 deletions lib/actors/config/persistent_term_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,14 @@ if Code.ensure_loaded?(:persistent_term) do
persistent: true
)

"nats" ->
Application.put_env(
:spawn,
:state_handoff_controller_adapter,
Spawn.Cluster.StateHandoff.Controllers.NatsKvController,
persistent: true
)

_ ->
if Code.ensure_loaded?(Statestores.Supervisor) do
Application.put_env(
Expand Down
2 changes: 1 addition & 1 deletion lib/actors/registry/host_actor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Actors.Registry.HostActor do
defstruct actor: nil, node: nil, opts: nil

@type t :: %__MODULE__{
node: pid(),
node: node(),
actor: Actor.t(),
opts: Keyword.t()
}
Expand Down
Loading
Loading