From 905c90935d41843d68416af33beb93943c089254 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Tue, 28 Nov 2023 09:44:33 -0300 Subject: [PATCH] Better module docs --- lib/actors/actor/caller_consumer.ex | 163 ++++++++++++++++++++++++++-- lib/actors/actor/caller_producer.ex | 120 ++++++++++++++++++++ 2 files changed, 276 insertions(+), 7 deletions(-) diff --git a/lib/actors/actor/caller_consumer.ex b/lib/actors/actor/caller_consumer.ex index 14d2bd34..74edbf49 100644 --- a/lib/actors/actor/caller_consumer.ex +++ b/lib/actors/actor/caller_consumer.ex @@ -1,6 +1,7 @@ defmodule Actors.Actor.CallerConsumer do @moduledoc """ - + An Elixir module representing a GenStage consumer responsible for handling + events initiated by `CallerProducer` and interacting with actors in the system. """ use GenStage use Retry @@ -46,11 +47,19 @@ defmodule Actors.Actor.CallerConsumer do @erpc_timeout 5_000 + @doc """ + Starts the consumer process and subscribes to the `CallerProducer` GenStage. + """ def start_link(opts \\ []) do id = Keyword.get(opts, :id, 1) GenStage.start_link(__MODULE__, opts, name: Module.concat(__MODULE__, "#{id}")) end + @doc """ + Initializes the GenStage consumer. + + It subscribes to the `CallerProducer` GenStage with specified backpressure values. + """ @impl true def init(opts) do {min_demand, max_demand} = get_backpressure_values_allowed(opts) @@ -74,6 +83,11 @@ defmodule Actors.Actor.CallerConsumer do backpressure_options end + @doc """ + Handles incoming events from the `CallerProducer` GenStage. + + Dispatches events to the appropriate functions for further processing. + """ @impl true def handle_events(events, _from, state) do if length(events) > 1, @@ -114,6 +128,11 @@ defmodule Actors.Actor.CallerConsumer do GenStage.reply(from, response) end + @doc """ + Registers actors in the system based on the provided registration request. + + Handles registration requests and ensures actors are properly registered. + """ def register( %RegistrationRequest{ service_info: %ServiceInfo{} = _service_info, @@ -163,6 +182,33 @@ defmodule Actors.Actor.CallerConsumer do } end + @doc """ + Gets the state of the specified actor. + + This function attempts to retrieve the state of the actor identified by the given + `ActorId`. It uses an exponential backoff strategy for retrying in case of errors + and logs any failures. + + ## Parameters + + - `id` (%ActorId): The unique identifier of the actor. + + ## Returns + + The state of the actor if successful, otherwise an error is raised. + + ## Retry Strategy + + The function utilizes an exponential backoff strategy with randomized delays and + a maximum expiry time of 30,000 milliseconds. + + ## Errors + + The function handles errors such as `:error`, `:exit`, `:noproc`, `:erpc`, + `:noconnection`, and `:timeout`. It also rescues `ErlangError` exceptions and logs + detailed error messages. + + """ def get_state(%ActorId{name: actor_name, system: system_name} = id) do retry with: exponential_backoff() |> randomize |> expiry(30_000), atoms: [:error, :exit, :noproc, :erpc, :noconnection, :timeout], @@ -189,6 +235,35 @@ defmodule Actors.Actor.CallerConsumer do end end + @doc """ + Spawns an actor or a group of actors based on the provided `SpawnRequest`. + + This function is responsible for spawning actors based on the specified `SpawnRequest`. + It retrieves the hosts associated with the provided actor IDs and registers the actors. + Additionally, it handles cases where the system is in the process of draining or stopping. + + ## Parameters + + - `spawn` (%SpawnRequest): The request containing information about the actors to spawn. + - `opts` (Keyword.t): Additional options for spawning the actors. Defaults to an empty keyword list. + + ## Returns + + If successful, it returns `{:ok, %SpawnResponse{status: %RequestStatus{status: :OK, message: "Accepted"}}}`. + Otherwise, an error is raised. + + ## Actor Spawning Process + + - Retrieves actor hosts based on actor IDs from the `ActorRegistry`. + - Filters the hosts based on the system's graceful shutdown status. + - Registers the selected hosts in the `ActorRegistry`. + - Returns a success response. + + ## Errors + + - Raises an `ArgumentError` if attempting to spawn an unnamed actor that has not been registered before. + + """ def spawn_actor(spawn, opts \\ []) def spawn_actor(%SpawnRequest{actors: actors} = _spawn, opts) do @@ -219,6 +294,39 @@ defmodule Actors.Actor.CallerConsumer do {:ok, %SpawnResponse{status: status}} end + @doc """ + Invokes an actor action with distributed tracing using OpenTelemetry. + + This function performs an actor action invocation, incorporating distributed tracing + with OpenTelemetry. It sets up the tracing context, adds relevant attributes, + and handles asynchronous and synchronous invocations. + + ## Parameters + + - `request` (%InvocationRequest): The request containing information about the invocation. + - `opts` (Keyword.t): Additional options for the invocation. Defaults to an empty keyword list. + + ## Returns + + A tuple containing the status and the result of the invocation. + If the invocation is asynchronous, it returns `{:ok, :async}`. + + ## Tracing Context + + The function sets up the tracing context and adds attributes related to the invocation. + It uses OpenTelemetry to trace the client invoke with the kind set to `:client`. + + ## Retry Mechanism + + The function incorporates a retry mechanism with backoff, randomization, and timeout + to handle potential errors during the invocation. + + ## Error Handling + + In case of errors during the invocation, appropriate logging and tracing events are added, + and the error is re-raised with a stack trace. + + """ def invoke_with_span( %InvocationRequest{ actor: %Actor{id: %ActorId{name: _name, system: _actor_id_system} = actor_id} = actor, @@ -364,12 +472,47 @@ defmodule Actors.Actor.CallerConsumer do defp get_caller(nil), do: "external" defp get_caller(caller), do: caller.name - defp do_lookup_action( - system_name, - {pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn, - system, - action_fun - ) do + @doc """ + Performs the action of looking up or creating an actor based on the given parameters. + + This function is responsible for looking up or creating an actor based on the specified actor + fully-qualified domain name (FQDN). It incorporates distributed tracing with OpenTelemetry + to capture relevant events and attributes during the lookup or creation process. + + ## Parameters + + - `system_name` (String): The name of the actor system. + - `actor_fqdn` (tuple): A tuple representing the fully-qualified domain name (FQDN) of the actor. + - `system` (%ActorSystem{}): The actor system. + - `action_fun` (function): The function to be invoked once the actor is looked up or created. + It receives the actor reference and actor reference ID as parameters. + + ## Tracing Context + + The function sets up a span with the name "actor-lookup" to trace the lookup or creation + process. It adds relevant attributes, such as the actor FQDN, to the tracing context. + + ## Retry Mechanism + + The function incorporates a retry mechanism with backoff, randomization, and timeout + to handle potential errors during the lookup or creation process. + + ## Returns + + The result of the `action_fun` function or an error tuple in case of failure. + + ## Error Handling + + In case of errors during the lookup or creation process, appropriate logging and tracing events + are added, and the error is returned as part of the result tuple. + + """ + def do_lookup_action( + system_name, + {pooled, system_name, parent, %ActorId{name: actor_name} = actor_id} = actor_fqdn, + system, + action_fun + ) do Tracer.with_span "actor-lookup" do Tracer.set_attributes([{:actor_fqdn, actor_fqdn}]) @@ -494,6 +637,12 @@ defmodule Actors.Actor.CallerConsumer do ActorEntity.invoke(actor_ref, request, opts) end + @doc """ + Tries to reactivate an actor. + + Reactivation is attempted by looking up the actor in the registry + or creating a new actor if not found. + """ @spec try_reactivate_actor(ActorSystem.t(), Actor.t(), any()) :: {:ok, any()} | {:error, any()} def try_reactivate_actor(system, actor, opts \\ []) diff --git a/lib/actors/actor/caller_producer.ex b/lib/actors/actor/caller_producer.ex index d2e57e27..4805eb45 100644 --- a/lib/actors/actor/caller_producer.ex +++ b/lib/actors/actor/caller_producer.ex @@ -1,4 +1,41 @@ defmodule Actors.Actor.CallerProducer do + @moduledoc """ + # Actors.Actor.CallerProducer + This module defines a GenStage producer responsible for managing actor-related + operations such as retrieving actor state, actor registration, actor spawning, and actor invocation. + + ## Client API + + The client API provides functions for interacting with actors. + These functions can be used to initiate operations such as getting actor state, + registering actors, spawning new actors, and invoking actors. + + ### Functions + + - [`start_link/1`](#start_link-1): Starts the CallerProducer process. + - [`get_state/2`](#get_state-2): Retrieves the state of a specified actor. + - [`register/2`](#register-2): Registers an actor with the specified registration request. + - [`spawn_actor/2`](#spawn_actor-2): Spawns an actor based on the specified spawn request. + - [`invoke/2`](#invoke-2): Invokes an actor with the specified invocation request. + - [`enqueue/1`](#enqueue-1): Enqueues an event for processing. + + ## Server API + + The server API includes functions that handle the GenStage behavior. + These functions manage the state and processing of events. + + ### Functions + + - [`init/1`](#init-1): Initializes the GenStage process. + - [`handle_demand/2`](#handle_demand-2): Handles demand requests from consumers. + - [`handle_call/3`](#handle_call-3): Handles call requests from consumers. + - [`handle_cast/2`](#handle_cast-2): Handles cast requests from consumers. + + ## Usage + To interact with this module, use the client API functions provided. + These functions handle backpressure if enabled in the configuration. + + """ use GenStage require Logger @@ -16,11 +53,39 @@ defmodule Actors.Actor.CallerProducer do # Client API + @doc """ + Starts the CallerProducer process. + + ## Parameters + + - `state` (any): Initial state for the process. + + ## Returns + + - `:ignore`: If the process is already running. + - `{:error, reason}`: If an error occurs during process initialization. + - `{:ok, pid}`: If the process starts successfully. + + """ @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} def start_link(state \\ []) do GenStage.start_link(__MODULE__, state, name: __MODULE__) end + @doc """ + Retrieves the state of a specified actor. + + ## Parameters + + - `actor_id` (ActorId.t()): The ID of the actor. + - `opts` (any): Additional options. + + ## Returns + + - `{:ok, state}`: If the state is successfully retrieved. + - `{:error, reason}`: If an error occurs during the operation. + + """ @spec get_state(ActorId.t()) :: {:ok, term()} | {:error, term()} def get_state(actor_id, opts \\ []) do if Config.get(:actors_global_backpressure_enabled) do @@ -30,6 +95,20 @@ defmodule Actors.Actor.CallerProducer do end end + @doc """ + Registers an actor with the specified registration request. + + ## Parameters + + - `registration` (RegistrationRequest.t()): The registration request. + - `opts` (any): Additional options. + + ## Returns + + - `{:ok, response}`: If the actor is successfully registered. + - `{:error, response}`: If an error occurs during the registration. + + """ @spec register(RegistrationRequest.t(), any()) :: {:ok, RegistrationResponse.t()} | {:error, RegistrationResponse.t()} def register(registration, opts \\ []) do @@ -40,6 +119,20 @@ defmodule Actors.Actor.CallerProducer do end end + @doc """ + Spawns an actor based on the specified spawn request. + + ## Parameters + + - `spawn_req` (SpawnRequest.t()): The spawn request. + - `opts` (any): Additional options. + + ## Returns + + - `{:ok, response}`: If the actor is successfully spawned. + - `{:error, response}`: If an error occurs during the spawning. + + """ @spec spawn_actor(SpawnRequest.t(), any()) :: {:ok, SpawnResponse.t()} def spawn_actor(spawn_req, opts \\ []) do if Config.get(:actors_global_backpressure_enabled) do @@ -49,6 +142,21 @@ defmodule Actors.Actor.CallerProducer do end end + @doc """ + Invokes an actor with the specified invocation request. + + ## Parameters + + - `request` (InvocationRequest.t()): The invocation request. + - `opts` (any): Additional options. + + ## Returns + + - `{:ok, :async}`: If the invocation is asynchronous. + - `{:ok, result}`: If the invocation is successful. + - `{:error, reason}`: If an error occurs during the invocation. + + """ @spec invoke(InvocationRequest.t()) :: {:ok, :async} | {:ok, term()} | {:error, term()} def invoke(request, opts \\ []) @@ -73,26 +181,38 @@ defmodule Actors.Actor.CallerProducer do end end + @doc """ + Enqueues an event for processing. + + ## Parameters + + - `event` (any): The event to be enqueued. + + """ def enqueue(event) do GenStage.call(__MODULE__, {:enqueue, event}) end # Server API + @doc false def init(_state) do {:producer, {:queue.new(), 0}} end + @doc false def handle_demand(incoming_demand, {queue, pending_demand}) do Logger.debug("Consumer pull demand of: #{incoming_demand} elements.") dispatch_events(queue, incoming_demand + pending_demand, []) end + @doc false def handle_call({:enqueue, event}, from, {queue, pending_demand}) do queue = :queue.in({from, event}, queue) dispatch_events(queue, pending_demand, []) end + @doc false def handle_cast({:enqueue, event}, {queue, pending_demand}) do queue = :queue.in({:fake_from, event}, queue) dispatch_events(queue, pending_demand, [])