Skip to content

Commit

Permalink
Better module docs
Browse files Browse the repository at this point in the history
  • Loading branch information
sleipnir committed Nov 28, 2023
1 parent 20868cd commit 905c909
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 7 deletions.
163 changes: 156 additions & 7 deletions lib/actors/actor/caller_consumer.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Actors.Actor.CallerConsumer do
@moduledoc """
An Elixir module representing a GenStage consumer responsible for handling
events initiated by `CallerProducer` and interacting with actors in the system.
"""
use GenStage
use Retry
Expand Down Expand Up @@ -46,11 +47,19 @@ defmodule Actors.Actor.CallerConsumer do

@erpc_timeout 5_000

@doc """
Starts the consumer process and subscribes to the `CallerProducer` GenStage.
"""
def start_link(opts \\ []) do
id = Keyword.get(opts, :id, 1)
GenStage.start_link(__MODULE__, opts, name: Module.concat(__MODULE__, "#{id}"))
end

@doc """
Initializes the GenStage consumer.
It subscribes to the `CallerProducer` GenStage with specified backpressure values.
"""
@impl true
def init(opts) do
{min_demand, max_demand} = get_backpressure_values_allowed(opts)
Expand All @@ -74,6 +83,11 @@ defmodule Actors.Actor.CallerConsumer do
backpressure_options
end

@doc """
Handles incoming events from the `CallerProducer` GenStage.
Dispatches events to the appropriate functions for further processing.
"""
@impl true
def handle_events(events, _from, state) do
if length(events) > 1,
Expand Down Expand Up @@ -114,6 +128,11 @@ defmodule Actors.Actor.CallerConsumer do
GenStage.reply(from, response)
end

@doc """
Registers actors in the system based on the provided registration request.
Handles registration requests and ensures actors are properly registered.
"""
def register(
%RegistrationRequest{
service_info: %ServiceInfo{} = _service_info,
Expand Down Expand Up @@ -163,6 +182,33 @@ defmodule Actors.Actor.CallerConsumer do
}
end

@doc """
Gets the state of the specified actor.
This function attempts to retrieve the state of the actor identified by the given
`ActorId`. It uses an exponential backoff strategy for retrying in case of errors
and logs any failures.
## Parameters
- `id` (%ActorId): The unique identifier of the actor.
## Returns
The state of the actor if successful, otherwise an error is raised.
## Retry Strategy
The function utilizes an exponential backoff strategy with randomized delays and
a maximum expiry time of 30,000 milliseconds.
## Errors
The function handles errors such as `:error`, `:exit`, `:noproc`, `:erpc`,
`:noconnection`, and `:timeout`. It also rescues `ErlangError` exceptions and logs
detailed error messages.
"""
def get_state(%ActorId{name: actor_name, system: system_name} = id) do
retry with: exponential_backoff() |> randomize |> expiry(30_000),
atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout],
Expand All @@ -189,6 +235,35 @@ defmodule Actors.Actor.CallerConsumer do
end
end

@doc """
Spawns an actor or a group of actors based on the provided `SpawnRequest`.
This function is responsible for spawning actors based on the specified `SpawnRequest`.
It retrieves the hosts associated with the provided actor IDs and registers the actors.
Additionally, it handles cases where the system is in the process of draining or stopping.
## Parameters
- `spawn` (%SpawnRequest): The request containing information about the actors to spawn.
- `opts` (Keyword.t): Additional options for spawning the actors. Defaults to an empty keyword list.
## Returns
If successful, it returns `{:ok, %SpawnResponse{status: %RequestStatus{status: :OK, message: "Accepted"}}}`.
Otherwise, an error is raised.
## Actor Spawning Process
- Retrieves actor hosts based on actor IDs from the `ActorRegistry`.
- Filters the hosts based on the system's graceful shutdown status.
- Registers the selected hosts in the `ActorRegistry`.
- Returns a success response.
## Errors
- Raises an `ArgumentError` if attempting to spawn an unnamed actor that has not been registered before.
"""
def spawn_actor(spawn, opts \\ [])

def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do
Expand Down Expand Up @@ -219,6 +294,39 @@ defmodule Actors.Actor.CallerConsumer do
{:ok, %SpawnResponse{status: status}}
end

@doc """
Invokes an actor action with distributed tracing using OpenTelemetry.
This function performs an actor action invocation, incorporating distributed tracing
with OpenTelemetry. It sets up the tracing context, adds relevant attributes,
and handles asynchronous and synchronous invocations.
## Parameters
- `request` (%InvocationRequest): The request containing information about the invocation.
- `opts` (Keyword.t): Additional options for the invocation. Defaults to an empty keyword list.
## Returns
A tuple containing the status and the result of the invocation.
If the invocation is asynchronous, it returns `{:ok, :async}`.
## Tracing Context
The function sets up the tracing context and adds attributes related to the invocation.
It uses OpenTelemetry to trace the client invoke with the kind set to `:client`.
## Retry Mechanism
The function incorporates a retry mechanism with backoff, randomization, and timeout
to handle potential errors during the invocation.
## Error Handling
In case of errors during the invocation, appropriate logging and tracing events are added,
and the error is re-raised with a stack trace.
"""
def invoke_with_span(
%InvocationRequest{
actor: %Actor{id: %ActorId{name: _name, system: _actor_id_system} = actor_id} = actor,
Expand Down Expand Up @@ -364,12 +472,47 @@ defmodule Actors.Actor.CallerConsumer do
defp get_caller(nil), do: "external"
defp get_caller(caller), do: caller.name

defp do_lookup_action(
system_name,
{pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn,
system,
action_fun
) do
@doc """
Performs the action of looking up or creating an actor based on the given parameters.
This function is responsible for looking up or creating an actor based on the specified actor
fully-qualified domain name (FQDN). It incorporates distributed tracing with OpenTelemetry
to capture relevant events and attributes during the lookup or creation process.
## Parameters
- `system_name` (String): The name of the actor system.
- `actor_fqdn` (tuple): A tuple representing the fully-qualified domain name (FQDN) of the actor.
- `system` (%ActorSystem{}): The actor system.
- `action_fun` (function): The function to be invoked once the actor is looked up or created.
It receives the actor reference and actor reference ID as parameters.
## Tracing Context
The function sets up a span with the name "actor-lookup" to trace the lookup or creation
process. It adds relevant attributes, such as the actor FQDN, to the tracing context.
## Retry Mechanism
The function incorporates a retry mechanism with backoff, randomization, and timeout
to handle potential errors during the lookup or creation process.
## Returns
The result of the `action_fun` function or an error tuple in case of failure.
## Error Handling
In case of errors during the lookup or creation process, appropriate logging and tracing events
are added, and the error is returned as part of the result tuple.
"""
def do_lookup_action(
system_name,
{pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn,
system,
action_fun
) do
Tracer.with_span "actor-lookup" do
Tracer.set_attributes([{:actor_fqdn, actor_fqdn}])

Expand Down Expand Up @@ -494,6 +637,12 @@ defmodule Actors.Actor.CallerConsumer do
ActorEntity.invoke(actor_ref, request, opts)
end

@doc """
Tries to reactivate an actor.
Reactivation is attempted by looking up the actor in the registry
or creating a new actor if not found.
"""
@spec try_reactivate_actor(ActorSystem.t(), Actor.t(), any()) :: {:ok, any()} | {:error, any()}
def try_reactivate_actor(system, actor, opts \\ [])

Expand Down
Loading

0 comments on commit 905c909

Please sign in to comment.