Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Jun 21, 2024
1 parent bfc4657 commit 583c46f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 90 deletions.
99 changes: 25 additions & 74 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,83 +148,34 @@ defmodule Absinthe.Subscription do
source: doc.source
}

storage_module = storage_implementation(pubsub)

:telemetry.span(
[:absinthe, :subscription, :document, :subscribe],
%{
doc_id: doc_id,
field_keys: field_keys,
storage_module: storage_module
},
fn ->
storage_process_name = storage_name(pubsub)
storage_module.put(storage_process_name, doc_id, doc_value)
result = storage_module.subscribe(storage_process_name, doc_id, field_keys)

{result,
%{
doc_id: doc_id,
field_keys: field_keys,
storage_module: storage_module
}}
end
)
storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)

storage_module.put(storage_process_name, doc_id, doc_value)
storage_module.subscribe(storage_process_name, doc_id, field_keys)
end

@doc false
def unsubscribe(pubsub, doc_id) do
storage_module = storage_implementation(pubsub)

:telemetry.span(
[:absinthe, :subscription, :document, :unsubscribe],
%{
doc_id: doc_id,
storage_module: storage_module
},
fn ->
storage_process_name = storage_name(pubsub)
storage_module.unsubscribe(storage_process_name, doc_id)
result = storage_module.delete(storage_process_name, doc_id)

{result,
%{
doc_id: doc_id,
storage_module: storage_module
}}
end
)
storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)

storage_module.unsubscribe(storage_process_name, doc_id)
storage_module.delete(storage_process_name, doc_id)
end

@doc false
def get(pubsub, key) do
storage_module = storage_implementation(pubsub)

:telemetry.span(
[:absinthe, :subscription, :document, :get],
%{
field_key: key,
storage_module: storage_module
},
fn ->
storage_process_name = storage_name(pubsub)

result =
storage_process_name
|> storage_module.get_docs_by_field_key(key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()

{result,
%{
field_key: key,
storage_module: storage_module
}}
end
)
storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)

storage_process_name
|> storage_module.get_docs_by_field_key(key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()
end

@doc false
Expand All @@ -233,17 +184,17 @@ defmodule Absinthe.Subscription do
end

@doc false
def storage_name(pubsub) do
def document_storage_name(pubsub) do
Module.concat([pubsub, :Storage])
end

def storage_implementation(pubsub) do
{:ok, storage_implementation} =
def document_storage(pubsub) do
{:ok, document_storage} =
pubsub
|> registry_name
|> Registry.meta(:storage_implementation)
|> Registry.meta(:document_storage)

storage_implementation
document_storage
end

@doc false
Expand Down
17 changes: 9 additions & 8 deletions lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
defmodule Absinthe.Subscription.DocumentStorage do
@moduledoc """
Behaviour for storing subscription documents. Used to tell
Absinthe how to store documents and the field keys subcribed to those
Absinthe how to store documents and the field keys associated with those
documents.
"""

@doc """
Child spec to determine how to start the
Document storage process
Document storage process. This will be supervised. Absinthe will give
the process a name and that name will be passed in the other callbacks
in order to reference it there.
"""
@callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()

@doc """
Adds `doc` to storage with `doc_id` as the key if not already in storage.
Adds `doc` to storage with `doc_id` as the key.
"""
@callback put(
storage_process_name :: atom,
Expand All @@ -25,7 +27,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
{:ok, term} | {:error, :reason}

@doc """
Associates each `{field, key}` pair in `field_keys` to the `doc_id`
Associates each `{field, key}` pair in `field_keys` to `doc_id`.
"""
@callback subscribe(
storage_process_name :: atom,
Expand All @@ -35,18 +37,17 @@ defmodule Absinthe.Subscription.DocumentStorage do
{:ok, term} | {:error, :reason}

@doc """
Removes the document
Removes the document.
"""
@callback delete(storage_process_name :: atom, doc_id :: term) :: :ok

@doc """
Removes the field_keys associated with `doc_id` from
storage
Removes the field_keys associated with `doc_id`.
"""
@callback unsubscribe(storage_process_name :: atom, doc_id :: term) :: :ok

@doc """
Get all docs associated with the field_key
Get all docs associated with `field_key`
"""
@callback get_docs_by_field_key(
storage_process_name :: atom,
Expand Down
17 changes: 9 additions & 8 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Absinthe.Subscription.Supervisor do
pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)

storage_implementation =
Keyword.get(opts, :storage_implementation, Absinthe.Subscription.DefaultDocumentStorage)
document_storage =
Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage)

storage_opts =
case storage_implementation do
case document_storage do
Absinthe.Subscription.DefaultDocumentStorage ->
[
keys: :duplicate,
Expand All @@ -42,15 +42,16 @@ defmodule Absinthe.Subscription.Supervisor do

Supervisor.start_link(
__MODULE__,
{pubsub, pool_size, storage_implementation, storage_opts}
{pubsub, pool_size, document_storage, storage_opts}
)
end

def init({pubsub, pool_size, storage_implementation, storage_opts}) do
def init({pubsub, pool_size, document_storage, storage_opts}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size, storage_implementation: storage_implementation]
meta = [pool_size: pool_size, document_storage: document_storage]

storage_opts = Keyword.put(storage_opts, :name, Absinthe.Subscription.storage_name(pubsub))
storage_opts =
Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub))

children = [
{Registry,
Expand All @@ -59,7 +60,7 @@ defmodule Absinthe.Subscription.Supervisor do
name: registry_name,
meta: meta
]},
storage_implementation.child_spec(storage_opts),
document_storage.child_spec(storage_opts),
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
]

Expand Down

0 comments on commit 583c46f

Please sign in to comment.