Skip to content

Commit

Permalink
Merge pull request #2 from Frameio/mcaterisano/GRAPH-954/update-absin…
Browse files Browse the repository at this point in the history
…the-fork-to-include-optional-run-docset-callback

[GRAPH-954] update absinthe fork to include optional run docset callback
  • Loading branch information
michaelcaterisano authored Mar 6, 2024
2 parents 61e0c89 + 2408428 commit 432e4e5
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 74 deletions.
111 changes: 47 additions & 64 deletions lib/absinthe/subscription/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ defmodule Absinthe.Subscription.Local do
This module handles broadcasting documents that are local to this node
"""

# This module handles running and broadcasting documents that are local to this
# node.
require Logger

alias Absinthe.Phase
alias Absinthe.Pipeline
alias Absinthe.Pipeline.BatchResolver

require Logger
# This module handles running and broadcasting documents that are local to this
# node.

@doc """
Publish a mutation to the local node only.
Expand All @@ -29,76 +27,61 @@ defmodule Absinthe.Subscription.Local do
{topic, key_strategy, doc}
end

run_docset(pubsub, docs_and_topics, mutation_result)
run_docset_fn =
if function_exported?(pubsub, :run_docset, 3), do: &pubsub.run_docset/3, else: &run_docset/3

:ok
end
run_docset_fn.(pubsub, docs_and_topics, mutation_result)

defp run_docset(pubsub, docs_and_topics, mutation_result) do
run_docset(pubsub, docs_and_topics, mutation_result, %{})
:ok
end

defp run_docset(_pubsub, [], _, _memo), do: :ok

defp run_docset(pubsub, [{topic, _key_strategy, doc} | rest], mutation_result, memo) do
{data, updated_memo} = resolve_doc(doc, mutation_result, memo)
:ok = pubsub.publish_subscription(topic, data)
run_docset(pubsub, rest, mutation_result, updated_memo)
rescue
e ->
BatchResolver.pipeline_error(e, __STACKTRACE__)
end
alias Absinthe.{Phase, Pipeline}

defp resolve_doc(doc, mutation_result, memo) do
doc_key = get_doc_key(doc)

case Map.get(memo, doc_key) do
%{} = memoized_result ->
{memoized_result, memo}

nil ->
pipeline =
doc.initial_phases
|> Pipeline.replace(
Phase.Telemetry,
{Phase.Telemetry, event: [:subscription, :publish, :start]}
)
|> Pipeline.without(Phase.Subscription.SubscribeSelf)
|> Pipeline.insert_before(
Phase.Document.Execution.Resolution,
{Phase.Document.OverrideRoot, root_value: mutation_result}
)
|> Pipeline.upto(Phase.Document.Execution.Resolution)

pipeline = [
pipeline,
[
result_phase(doc),
{Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]}
]
]
defp run_docset(pubsub, docs_and_topics, mutation_result) do
for {topic, key_strategy, doc} <- docs_and_topics do
try do
pipeline = pipeline(doc, mutation_result)

{:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline)

updated_memo = Map.put(memo, doc_key, data)

{data, updated_memo}
Logger.debug("""
Absinthe Subscription Publication
Field Topic: #{inspect(key_strategy)}
Subscription id: #{inspect(topic)}
Data: #{inspect(data)}
""")

:ok = pubsub.publish_subscription(topic, data)
rescue
e ->
BatchResolver.pipeline_error(e, __STACKTRACE__)
end
end
end

defp get_doc_key(doc) do
variables =
Enum.flat_map(doc.initial_phases, fn phase ->
case phase do
{Absinthe.Phase.Document.Variables, opts} ->
opts[:variables]

_ ->
[]
end
end)

:erlang.term_to_binary({doc.source, variables})
def pipeline(doc, mutation_result) do
pipeline =
doc.initial_phases
|> Pipeline.replace(
Phase.Telemetry,
{Phase.Telemetry, event: [:subscription, :publish, :start]}
)
|> Pipeline.without(Phase.Subscription.SubscribeSelf)
|> Pipeline.insert_before(
Phase.Document.Execution.Resolution,
{Phase.Document.OverrideRoot, root_value: mutation_result}
)
|> Pipeline.upto(Phase.Document.Execution.Resolution)

pipeline = [
pipeline,
[
result_phase(doc),
{Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]}
]
]

pipeline
end

defp get_docs(pubsub, field, mutation_result, topic: topic_fun)
Expand Down
8 changes: 8 additions & 0 deletions lib/absinthe/subscription/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,12 @@ defmodule Absinthe.Subscription.Pubsub do
only.
"""
@callback publish_subscription(topic :: binary, data :: map) :: term

# @doc """
# This function is called by publish_mutation and is responsible for resolving the documents
# and publishing the results to the appropriate topics.
# """
@callback run_docset(pubsub :: t, docs_and_topics :: list, mutation_result :: term) :: term

@optional_callbacks run_docset: 3
end
111 changes: 101 additions & 10 deletions test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,47 @@ defmodule Absinthe.Execution.SubscriptionTest do
end
end

defmodule PubSubWithDocsetRunner do
@behaviour Absinthe.Subscription.Pubsub

def start_link() do
Registry.start_link(keys: :duplicate, name: __MODULE__)
end

def node_name() do
node()
end

def subscribe(topic) do
Registry.register(__MODULE__, topic, [])
:ok
end

def publish_subscription(topic, data) do
message = %{
topic: topic,
event: "subscription:data",
result: data
}

Registry.dispatch(__MODULE__, topic, fn entries ->
for {pid, _} <- entries, do: send(pid, {:broadcast, message})
end)
end

def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do
# this pubsub is local and doesn't support clusters
:ok
end

def run_docset(pubsub, docs_and_topics, _mutation_result) do
for {topic, _key_strategy, _doc} <- docs_and_topics do
# publish mutation results to topic
pubsub.publish_subscription(topic, %{data: %{runner: "calls the custom docset runner"}})
end
end
end

defmodule Schema do
use Absinthe.Schema

Expand Down Expand Up @@ -189,6 +230,9 @@ defmodule Absinthe.Execution.SubscriptionTest do
setup_all do
{:ok, _} = PubSub.start_link()
{:ok, _} = Absinthe.Subscription.start_link(PubSub)

{:ok, _} = PubSubWithDocsetRunner.start_link()
{:ok, _} = Absinthe.Subscription.start_link(PubSubWithDocsetRunner)
:ok
end

Expand Down Expand Up @@ -643,9 +687,7 @@ defmodule Absinthe.Execution.SubscriptionTest do
[:absinthe, :subscription, :publish, :start],
[:absinthe, :subscription, :publish, :stop]
],
fn event, measurements, metadata, config ->
send(self(), {event, measurements, metadata, config})
end,
&Absinthe.TestTelemetryHelper.send_to_pid/4,

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 24

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.14 / OTP 24

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.13 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.14 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 26

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.13 / OTP 24

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.13 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 26

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.14 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 24

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)

Check warning on line 690 in test/absinthe/execution/subscription_test.exs

View workflow job for this annotation

GitHub Actions / Elixir 1.15 / OTP 25

Absinthe.TestTelemetryHelper.send_to_pid/4 is undefined (module Absinthe.TestTelemetryHelper is not available or is yet to be defined)
%{}
)

Expand All @@ -657,10 +699,13 @@ defmodule Absinthe.Execution.SubscriptionTest do
context: %{pubsub: PubSub}
)

assert_receive {[:absinthe, :execute, :operation, :start], measurements, %{id: id}, _config}
assert_receive {:telemetry_event,
{[:absinthe, :execute, :operation, :start], measurements, %{id: id}, _config}}

assert System.convert_time_unit(measurements[:system_time], :native, :millisecond)

assert_receive {[:absinthe, :execute, :operation, :stop], _, %{id: ^id}, _config}
assert_receive {:telemetry_event,
{[:absinthe, :execute, :operation, :stop], _, %{id: ^id}, _config}}

Absinthe.Subscription.publish(PubSub, "foo", thing: client_id)
assert_receive({:broadcast, msg})
Expand All @@ -672,8 +717,11 @@ defmodule Absinthe.Execution.SubscriptionTest do
} == msg

# Subscription events
assert_receive {[:absinthe, :subscription, :publish, :start], _, %{id: id}, _config}
assert_receive {[:absinthe, :subscription, :publish, :stop], _, %{id: ^id}, _config}
assert_receive {:telemetry_event,
{[:absinthe, :subscription, :publish, :start], _, %{id: id}, _config}}

assert_receive {:telemetry_event,
{[:absinthe, :subscription, :publish, :stop], _, %{id: ^id}, _config}}

:telemetry.detach(context.test)
end
Expand Down Expand Up @@ -715,12 +763,55 @@ defmodule Absinthe.Execution.SubscriptionTest do
refute_receive({:broadcast, _})
end

defp run_subscription(query, schema, opts \\ []) do
opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub))
@query """
subscription ($userId: ID!) {
user(id: $userId) { id name }
}
"""
test "calls the optional run_docset callback if supplied" do
id = "1"

assert {:ok, %{"subscribed" => topic}} =
run_subscription(
@query,
Schema,
variables: %{"userId" => id},
context: %{pubsub: PubSubWithDocsetRunner}
)

mutation = """
mutation ($userId: ID!) {
updateUser(id: $userId) { id name }
}
"""

assert {:ok, %{data: _}} =
run_subscription(mutation, Schema,
variables: %{"userId" => id},
context: %{pubsub: PubSubWithDocsetRunner}
)

assert_receive({:broadcast, msg})

assert %{
event: "subscription:data",
result: %{data: %{runner: "calls the custom docset runner"}},
topic: topic
} == msg
end

def run_subscription(query, schema, opts \\ []) do
opts =
Keyword.update(
opts,
:context,
%{pubsub: PubSub},
&Map.put(&1, :pubsub, opts[:context][:pubsub] || PubSub)
)

case run(query, schema, opts) do
{:ok, %{"subscribed" => topic}} = val ->
PubSub.subscribe(topic)
opts[:context][:pubsub].subscribe(topic)
val

val ->
Expand Down

0 comments on commit 432e4e5

Please sign in to comment.