From 7c894f9ca0768cfb7bd992ba5beae2148c6fd202 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Mon, 21 Oct 2024 15:36:32 -0700 Subject: [PATCH 01/20] Proof of concept index metrics tracker. --- lib/dpul_collections/index_metrics_tracker.ex | 37 +++++++++++++++++++ .../indexing_pipeline/database_producer.ex | 4 ++ 2 files changed, 41 insertions(+) create mode 100644 lib/dpul_collections/index_metrics_tracker.ex diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex new file mode 100644 index 00000000..9eb68b42 --- /dev/null +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -0,0 +1,37 @@ +defmodule DpulCollections.IndexMetricsTracker do + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def register_fresh_index(source) do + GenServer.cast(__MODULE__, { :fresh_index, source }) + end + + def register_polling_started(source) do + GenServer.cast(__MODULE__, { :poll_started, source }) + end + + @impl true + def init(_) do + {:ok, %{}} + end + + @impl true + def handle_cast({:fresh_index, source}, state) do + new_state = put_in(state, [source], %{start_time: :erlang.monotonic_time()}) + {:noreply, new_state} + end + + def handle_cast({:poll_started, source}, state) do + if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do + state = put_in(state, [source, :end_time], :erlang.monotonic_time()) + duration = state[source][:end_time] - state[source][:start_time] + IO.inspect("Duration (ms)(#{source}): #{System.convert_time_unit(duration, :native, :millisecond)}") + {:noreply, state} + else + {:noreply, state} + end + end +end diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index ba9d13f2..d61c903f 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -59,6 +59,9 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do source_module: source_module } ) do + if last_queried_marker == nil do + DpulCollections.IndexMetricsTracker.register_fresh_index(source_module) + end total_demand = stored_demand + demand records = @@ -82,6 +85,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # Set a timer to try fulfilling demand again later if new_state.stored_demand > 0 do + DpulCollections.IndexMetricsTracker.register_polling_started(source_module) Process.send_after(self(), :check_for_updates, 50) end From 60c4bc40c0db7beae3461233344111d4def119f4 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Tue, 22 Oct 2024 10:23:31 -0700 Subject: [PATCH 02/20] Start tracking telemetry. --- lib/dpul_collections/index_metrics_tracker.ex | 13 ++++++- lib/dpul_collections_web/telemetry.ex | 35 ++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 9eb68b42..b44940ba 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -1,5 +1,6 @@ defmodule DpulCollections.IndexMetricsTracker do use GenServer + alias DpulCollections.IndexingPipeline.Figgy def start_link(_) do GenServer.start_link(__MODULE__, [], name: __MODULE__) @@ -28,10 +29,20 @@ defmodule DpulCollections.IndexMetricsTracker do if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do state = put_in(state, [source, :end_time], :erlang.monotonic_time()) duration = state[source][:end_time] - state[source][:start_time] - IO.inspect("Duration (ms)(#{source}): #{System.convert_time_unit(duration, :native, :millisecond)}") + :telemetry.execute([:dpulc, :indexing_pipeline, event(source), :time_to_poll], %{duration: duration}, %{source: source}) {:noreply, state} else {:noreply, state} end end + + def event(Figgy.HydrationProducerSource) do + :hydrator + end + def event(Figgy.TransformationProducerSource) do + :transformer + end + def event(Figgy.IndexingProducerSource) do + :indexer + end end diff --git a/lib/dpul_collections_web/telemetry.ex b/lib/dpul_collections_web/telemetry.ex index 1a8ee906..723dd183 100644 --- a/lib/dpul_collections_web/telemetry.ex +++ b/lib/dpul_collections_web/telemetry.ex @@ -1,6 +1,7 @@ defmodule DpulCollectionsWeb.Telemetry do use Supervisor import Telemetry.Metrics + alias DpulCollections.IndexingPipeline.Figgy def start_link(arg) do Supervisor.start_link(__MODULE__, arg, name: __MODULE__) @@ -78,7 +79,39 @@ defmodule DpulCollectionsWeb.Telemetry do summary("vm.memory.total", unit: {:byte, :kilobyte}), summary("vm.total_run_queue_lengths.total"), summary("vm.total_run_queue_lengths.cpu"), - summary("vm.total_run_queue_lengths.io") + summary("vm.total_run_queue_lengths.io"), + + # Indexing Metrics + last_value( + "Hydrator Full Run", + event_name: "dpulc.indexing_pipeline.hydrator.time_to_poll", + measurement: :duration, + unit: {:native, :second}, + description: "Time from fresh index to poll for the Hydrator", + reporter_options: [ + nav: "Indexing Pipeline" + ] + ), + last_value( + "Transformer Full Run", + event_name: "dpulc.indexing_pipeline.transformer.time_to_poll", + measurement: :duration, + unit: {:native, :second}, + description: "Time from fresh index to poll for the Transformer", + reporter_options: [ + nav: "Indexing Pipeline" + ] + ), + last_value( + "Indexer Full Run", + event_name: "dpulc.indexing_pipeline.indexer.time_to_poll", + measurement: :duration, + unit: {:native, :second}, + description: "Time from fresh index to poll for the Indexer", + reporter_options: [ + nav: "Indexing Pipeline" + ] + ), ] end From 584df3ea6c074c37fa08add2821f541bcd837273 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Tue, 22 Oct 2024 10:41:21 -0700 Subject: [PATCH 03/20] Add a basic test. --- .../integration/full_integration_test.exs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index a6134ce0..bb66c977 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -3,7 +3,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do alias DpulCollections.Repo alias DpulCollections.IndexingPipeline.Figgy - alias DpulCollections.{IndexingPipeline, Solr} + alias DpulCollections.{IndexingPipeline, Solr, IndexMetricsTracker} import SolrTestSupport setup do @@ -47,8 +47,11 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do {Figgy.IndexingConsumer, cache_version: cache_version, batch_size: 50, write_collection: active_collection()}, {Figgy.TransformationConsumer, cache_version: cache_version, batch_size: 50}, - {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50} + {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50}, + {IndexMetricsTracker, []} ] + test_pid = self() + :ok = :telemetry.attach("hydration-full-run", [:dpulc, :indexing_pipeline, :hydrator, :time_to_poll], fn _,measurements,_,_ -> send(test_pid, {:hydrator_time_to_poll_hit, measurements}) end, nil) Supervisor.start_link(children, strategy: :one_for_one, name: DpulCollections.TestSupervisor) @@ -146,6 +149,9 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do assert hydration_entry.cache_order != hydration_entry_again.cache_order Supervisor.stop(DpulCollections.TestSupervisor, :normal) + + # Ensure metrics are being sent. + assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} end test "indexes expected fields" do From 7fef495f457377cb77e549bbbcf755f077429a1e Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Tue, 22 Oct 2024 10:55:41 -0700 Subject: [PATCH 04/20] Mix format. --- lib/dpul_collections/index_metrics_tracker.ex | 14 +++++++++++--- .../indexing_pipeline/database_producer.ex | 1 + lib/dpul_collections_web/telemetry.ex | 2 +- .../integration/full_integration_test.exs | 12 +++++++++++- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index b44940ba..bf3a3b04 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -7,11 +7,11 @@ defmodule DpulCollections.IndexMetricsTracker do end def register_fresh_index(source) do - GenServer.cast(__MODULE__, { :fresh_index, source }) + GenServer.cast(__MODULE__, {:fresh_index, source}) end def register_polling_started(source) do - GenServer.cast(__MODULE__, { :poll_started, source }) + GenServer.cast(__MODULE__, {:poll_started, source}) end @impl true @@ -29,7 +29,13 @@ defmodule DpulCollections.IndexMetricsTracker do if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do state = put_in(state, [source, :end_time], :erlang.monotonic_time()) duration = state[source][:end_time] - state[source][:start_time] - :telemetry.execute([:dpulc, :indexing_pipeline, event(source), :time_to_poll], %{duration: duration}, %{source: source}) + + :telemetry.execute( + [:dpulc, :indexing_pipeline, event(source), :time_to_poll], + %{duration: duration}, + %{source: source} + ) + {:noreply, state} else {:noreply, state} @@ -39,9 +45,11 @@ defmodule DpulCollections.IndexMetricsTracker do def event(Figgy.HydrationProducerSource) do :hydrator end + def event(Figgy.TransformationProducerSource) do :transformer end + def event(Figgy.IndexingProducerSource) do :indexer end diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index d61c903f..cc28f611 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -62,6 +62,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do if last_queried_marker == nil do DpulCollections.IndexMetricsTracker.register_fresh_index(source_module) end + total_demand = stored_demand + demand records = diff --git a/lib/dpul_collections_web/telemetry.ex b/lib/dpul_collections_web/telemetry.ex index 723dd183..9d98945a 100644 --- a/lib/dpul_collections_web/telemetry.ex +++ b/lib/dpul_collections_web/telemetry.ex @@ -111,7 +111,7 @@ defmodule DpulCollectionsWeb.Telemetry do reporter_options: [ nav: "Indexing Pipeline" ] - ), + ) ] end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index bb66c977..3871512c 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -50,8 +50,18 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50}, {IndexMetricsTracker, []} ] + test_pid = self() - :ok = :telemetry.attach("hydration-full-run", [:dpulc, :indexing_pipeline, :hydrator, :time_to_poll], fn _,measurements,_,_ -> send(test_pid, {:hydrator_time_to_poll_hit, measurements}) end, nil) + + :ok = + :telemetry.attach( + "hydration-full-run", + [:dpulc, :indexing_pipeline, :hydrator, :time_to_poll], + fn _, measurements, _, _ -> + send(test_pid, {:hydrator_time_to_poll_hit, measurements}) + end, + nil + ) Supervisor.start_link(children, strategy: :one_for_one, name: DpulCollections.TestSupervisor) From f99074b5ef79657606354cbc2368c153b177cbef Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 09:16:45 -0800 Subject: [PATCH 05/20] Save an index metric to the database. --- lib/dpul_collections/application.ex | 3 +- lib/dpul_collections/index_metrics_tracker.ex | 25 ++++++++++++----- .../indexing_pipeline/index_metric.ex | 19 +++++++++++++ .../indexing_pipeline/metrics.ex | 28 +++++++++++++++++++ .../20241120163247_create_index_metrics.exs | 16 +++++++++++ .../index_metrics_tracker_test.exs | 19 +++++++++++++ .../integration/full_integration_test.exs | 3 +- 7 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 lib/dpul_collections/indexing_pipeline/index_metric.ex create mode 100644 lib/dpul_collections/indexing_pipeline/metrics.ex create mode 100644 priv/repo/migrations/20241120163247_create_index_metrics.exs create mode 100644 test/dpul_collections/index_metrics_tracker_test.exs diff --git a/lib/dpul_collections/application.ex b/lib/dpul_collections/application.ex index a23cea48..015f1eae 100644 --- a/lib/dpul_collections/application.ex +++ b/lib/dpul_collections/application.ex @@ -20,7 +20,8 @@ defmodule DpulCollections.Application do # Start a worker by calling: DpulCollections.Worker.start_link(arg) # {DpulCollections.Worker, arg}, # Start to serve requests, typically the last entry - DpulCollectionsWeb.Endpoint + DpulCollectionsWeb.Endpoint, + DpulCollections.IndexMetricsTracker ] ++ environment_children(Application.fetch_env!(:dpul_collections, :current_env)) # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index bf3a3b04..efd6b266 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -1,5 +1,6 @@ defmodule DpulCollections.IndexMetricsTracker do use GenServer + alias DpulCollections.IndexingPipeline.Metrics alias DpulCollections.IndexingPipeline.Figgy def start_link(_) do @@ -7,11 +8,15 @@ defmodule DpulCollections.IndexMetricsTracker do end def register_fresh_index(source) do - GenServer.cast(__MODULE__, {:fresh_index, source}) + GenServer.call(__MODULE__, {:fresh_index, source}) end def register_polling_started(source) do - GenServer.cast(__MODULE__, {:poll_started, source}) + GenServer.call(__MODULE__, {:poll_started, source}) + end + + def index_times(source) do + Metrics.index_metrics(source.processor_marker_key(), "full_index") end @impl true @@ -20,12 +25,12 @@ defmodule DpulCollections.IndexMetricsTracker do end @impl true - def handle_cast({:fresh_index, source}, state) do + def handle_call({:fresh_index, source}, _, state) do new_state = put_in(state, [source], %{start_time: :erlang.monotonic_time()}) - {:noreply, new_state} + {:reply, nil, new_state} end - def handle_cast({:poll_started, source}, state) do + def handle_call({:poll_started, source}, _, state) do if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do state = put_in(state, [source, :end_time], :erlang.monotonic_time()) duration = state[source][:end_time] - state[source][:start_time] @@ -36,9 +41,15 @@ defmodule DpulCollections.IndexMetricsTracker do %{source: source} ) - {:noreply, state} + Metrics.create_index_metric(%{ + type: source.processor_marker_key(), + measurement_type: "full_index", + duration: System.convert_time_unit(duration, :native, :millisecond) + }) + + {:reply, nil, state} else - {:noreply, state} + {:reply, nil, state} end end diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex new file mode 100644 index 00000000..d0a6873c --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -0,0 +1,19 @@ +defmodule DpulCollections.IndexingPipeline.IndexMetric do + use Ecto.Schema + import Ecto.Changeset + + schema "index_metrics" do + field :type, :string + field :measurement_type, :string + field :duration, :decimal + + timestamps(type: :utc_datetime_usec) + end + + @doc false + def changeset(index_metric, attrs) do + index_metric + |> cast(attrs, [:type, :measurement_type, :duration]) + |> validate_required([:type, :measurement_type, :duration]) + end +end diff --git a/lib/dpul_collections/indexing_pipeline/metrics.ex b/lib/dpul_collections/indexing_pipeline/metrics.ex new file mode 100644 index 00000000..b3ee2c2e --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/metrics.ex @@ -0,0 +1,28 @@ +defmodule DpulCollections.IndexingPipeline.Metrics do + import Ecto.Query, warn: false + alias DpulCollections.Repo + alias DpulCollections.IndexingPipeline.IndexMetric + + @doc """ + Creates an IndexMetric + """ + def create_index_metric(attrs \\ %{}) do + {:ok, index_metric} = + %IndexMetric{} + |> IndexMetric.changeset(attrs) + |> Repo.insert() + + index_metric + end + + @doc """ + Get index metrics by type + """ + def index_metrics(type, measurement_type) do + query = + from r in IndexMetric, + where: r.type == ^type and r.measurement_type == ^measurement_type + + Repo.all(query) + end +end diff --git a/priv/repo/migrations/20241120163247_create_index_metrics.exs b/priv/repo/migrations/20241120163247_create_index_metrics.exs new file mode 100644 index 00000000..bd909bc2 --- /dev/null +++ b/priv/repo/migrations/20241120163247_create_index_metrics.exs @@ -0,0 +1,16 @@ +defmodule DpulCollections.Repo.Migrations.CreateIndexMetrics do + use Ecto.Migration + + def change do + create table(:index_metrics) do + add :type, :string + add :measurement_type, :string + add :duration, :decimal + + timestamps(type: :utc_datetime_usec) + end + + create index(:index_metrics, [:type]) + create index(:index_metrics, [:measurement_type]) + end +end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs new file mode 100644 index 00000000..e7a9855c --- /dev/null +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -0,0 +1,19 @@ +defmodule DpulCollections.IndexMetricsTrackerTest do + alias DpulCollections.IndexingPipeline.IndexMetric + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexMetricsTracker + alias Phoenix.ActionClauseError + use DpulCollections.DataCase + + describe "index_times/1" do + test "registers index times" do + # Act + IndexMetricsTracker.register_fresh_index(HydrationProducerSource) + IndexMetricsTracker.register_polling_started(HydrationProducerSource) + [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) + + # Assert + assert metric.duration != 0 + end + end +end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 3871512c..a2882f39 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -47,8 +47,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do {Figgy.IndexingConsumer, cache_version: cache_version, batch_size: 50, write_collection: active_collection()}, {Figgy.TransformationConsumer, cache_version: cache_version, batch_size: 50}, - {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50}, - {IndexMetricsTracker, []} + {Figgy.HydrationConsumer, cache_version: cache_version, batch_size: 50} ] test_pid = self() From 1aa6603d19bcb4004ed4592a3628ab5aa7cc69f2 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 09:54:04 -0800 Subject: [PATCH 06/20] Store in seconds. I'm not sure this is useful still - the full hydration test definitely takes more than a second to run, but since we're just measuring what it takes to produce those last messages it's less than a second. Maybe the differential isn't a big deal in production. --- lib/dpul_collections/index_metrics_tracker.ex | 2 +- .../indexing_pipeline/database_producer.ex | 8 ++++---- lib/dpul_collections/indexing_pipeline/index_metric.ex | 3 ++- .../migrations/20241120163247_create_index_metrics.exs | 2 +- test/dpul_collections/index_metrics_tracker_test.exs | 3 ++- .../integration/full_integration_test.exs | 4 ++++ 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index efd6b266..5dd380de 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -44,7 +44,7 @@ defmodule DpulCollections.IndexMetricsTracker do Metrics.create_index_metric(%{ type: source.processor_marker_key(), measurement_type: "full_index", - duration: System.convert_time_unit(duration, :native, :millisecond) + duration: System.convert_time_unit(duration, :native, :second) }) {:reply, nil, state} diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index cc28f611..553979ca 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -59,15 +59,15 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do source_module: source_module } ) do - if last_queried_marker == nil do - DpulCollections.IndexMetricsTracker.register_fresh_index(source_module) - end - total_demand = stored_demand + demand records = source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version) + if last_queried_marker == nil && length(records) > 0 do + DpulCollections.IndexMetricsTracker.register_fresh_index(source_module) + end + new_state = state |> Map.put( diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex index d0a6873c..4d23d1c7 100644 --- a/lib/dpul_collections/indexing_pipeline/index_metric.ex +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -5,7 +5,8 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do schema "index_metrics" do field :type, :string field :measurement_type, :string - field :duration, :decimal + # Duration in seconds + field :duration, :integer timestamps(type: :utc_datetime_usec) end diff --git a/priv/repo/migrations/20241120163247_create_index_metrics.exs b/priv/repo/migrations/20241120163247_create_index_metrics.exs index bd909bc2..8145ae6b 100644 --- a/priv/repo/migrations/20241120163247_create_index_metrics.exs +++ b/priv/repo/migrations/20241120163247_create_index_metrics.exs @@ -5,7 +5,7 @@ defmodule DpulCollections.Repo.Migrations.CreateIndexMetrics do create table(:index_metrics) do add :type, :string add :measurement_type, :string - add :duration, :decimal + add :duration, :integer timestamps(type: :utc_datetime_usec) end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index e7a9855c..93afb080 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -13,7 +13,8 @@ defmodule DpulCollections.IndexMetricsTrackerTest do [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) # Assert - assert metric.duration != 0 + # This is 0 because it takes less than a second to run. + assert metric.duration == 0 end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index a2882f39..09889ce6 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -1,4 +1,5 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource use DpulCollections.DataCase alias DpulCollections.Repo @@ -161,6 +162,9 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Ensure metrics are being sent. assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} + [hydration_metric_1 | _] = IndexMetricsTracker.index_times(HydrationProducerSource) + # This is 0 because hydration production takes less than a second to run. + assert hydration_metric_1.duration == 0 end test "indexes expected fields" do From e079f5c1d6c395393dd06edd55c67f92e4b38b2c Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 09:58:58 -0800 Subject: [PATCH 07/20] Add processor key to ack telemetry event. --- .../indexing_pipeline/database_producer.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index 553979ca..83cf291d 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -141,7 +141,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do }) end - notify_ack(pending_markers |> length()) + notify_ack(pending_markers |> length(), state.source_module.processor_marker_key()) {:noreply, messages, new_state} end @@ -219,12 +219,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # This happens when ack is finished, we listen to this telemetry event in # tests so we know when the Producer's done processing a message. - @spec notify_ack(integer()) :: any() - defp notify_ack(acked_message_count) do + @spec notify_ack(integer(), String.t()) :: any() + defp notify_ack(acked_message_count, processor_marker_key) do :telemetry.execute( [:database_producer, :ack, :done], %{}, - %{acked_count: acked_message_count} + %{acked_count: acked_message_count, processor_marker_key: processor_marker_key} ) end From 07ae87ba35fc109151500c1432bb4cc6dd192a0a Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 10:29:53 -0800 Subject: [PATCH 08/20] Send unacked message count in ack message. --- .../indexing_pipeline/database_producer.ex | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index 83cf291d..9f4f53fe 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -141,7 +141,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do }) end - notify_ack(pending_markers |> length(), state.source_module.processor_marker_key()) + notify_ack( + pending_markers |> length(), + new_state.pulled_records |> length(), + state.source_module.processor_marker_key() + ) + {:noreply, messages, new_state} end @@ -219,12 +224,16 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # This happens when ack is finished, we listen to this telemetry event in # tests so we know when the Producer's done processing a message. - @spec notify_ack(integer(), String.t()) :: any() - defp notify_ack(acked_message_count, processor_marker_key) do + @spec notify_ack(integer(), integer(), String.t()) :: any() + defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do :telemetry.execute( [:database_producer, :ack, :done], %{}, - %{acked_count: acked_message_count, processor_marker_key: processor_marker_key} + %{ + acked_count: acked_message_count, + unacked_count: unacked_count, + processor_marker_key: processor_marker_key + } ) end From 5f69e36807bd46742e716ede58ebb940cda5290e Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 11:54:34 -0800 Subject: [PATCH 09/20] WIP --- .../indexing_pipeline/index_metric.ex | 1 + .../20241120163247_create_index_metrics.exs | 1 + .../index_metrics_tracker_test.exs | 22 +++++++++++++++++++ 3 files changed, 24 insertions(+) diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex index 4d23d1c7..c3b233f3 100644 --- a/lib/dpul_collections/indexing_pipeline/index_metric.ex +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -7,6 +7,7 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do field :measurement_type, :string # Duration in seconds field :duration, :integer + field :records_acked, :integer timestamps(type: :utc_datetime_usec) end diff --git a/priv/repo/migrations/20241120163247_create_index_metrics.exs b/priv/repo/migrations/20241120163247_create_index_metrics.exs index 8145ae6b..9473be67 100644 --- a/priv/repo/migrations/20241120163247_create_index_metrics.exs +++ b/priv/repo/migrations/20241120163247_create_index_metrics.exs @@ -6,6 +6,7 @@ defmodule DpulCollections.Repo.Migrations.CreateIndexMetrics do add :type, :string add :measurement_type, :string add :duration, :integer + add :records_acked, :integer timestamps(type: :utc_datetime_usec) end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index 93afb080..8cf9359c 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -9,12 +9,34 @@ defmodule DpulCollections.IndexMetricsTrackerTest do test "registers index times" do # Act IndexMetricsTracker.register_fresh_index(HydrationProducerSource) + # Send an ack done with acked_count 1 + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) IndexMetricsTracker.register_polling_started(HydrationProducerSource) + # Send an ack done with unacked_count 0, this triggers an index time + # create. + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) # Assert # This is 0 because it takes less than a second to run. assert metric.duration == 0 + assert metric.records_acked == 2 end end end From bda434b7ae2432fb73dd49f945b0ab5a21e60ec8 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 14:33:21 -0800 Subject: [PATCH 10/20] Calculate index metrics from last ack. --- lib/dpul_collections/index_metrics_tracker.ex | 125 ++++++++++++++---- .../indexing_pipeline/index_metric.ex | 6 +- .../indexing_pipeline/metrics.ex | 3 +- .../index_metrics_tracker_test.exs | 16 ++- .../integration/full_integration_test.exs | 3 +- 5 files changed, 121 insertions(+), 32 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 5dd380de..d30edad0 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -4,7 +4,20 @@ defmodule DpulCollections.IndexMetricsTracker do alias DpulCollections.IndexingPipeline.Figgy def start_link(_) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + @impl true + def init(_) do + :ok = + :telemetry.attach( + "metrics-ack-tracker", + [:database_producer, :ack, :done], + &handle_ack_received/4, + nil + ) + + {:ok, %{}} end def register_fresh_index(source) do @@ -19,33 +32,21 @@ defmodule DpulCollections.IndexMetricsTracker do Metrics.index_metrics(source.processor_marker_key(), "full_index") end - @impl true - def init(_) do - {:ok, %{}} - end - @impl true def handle_call({:fresh_index, source}, _, state) do - new_state = put_in(state, [source], %{start_time: :erlang.monotonic_time()}) + new_state = + put_in(state, [source.processor_marker_key()], %{ + start_time: :erlang.monotonic_time(), + acked_count: 0 + }) + {:reply, nil, new_state} end def handle_call({:poll_started, source}, _, state) do - if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do - state = put_in(state, [source, :end_time], :erlang.monotonic_time()) - duration = state[source][:end_time] - state[source][:start_time] - - :telemetry.execute( - [:dpulc, :indexing_pipeline, event(source), :time_to_poll], - %{duration: duration}, - %{source: source} - ) - - Metrics.create_index_metric(%{ - type: source.processor_marker_key(), - measurement_type: "full_index", - duration: System.convert_time_unit(duration, :native, :second) - }) + if get_in(state, [source.processor_marker_key(), :start_time]) != nil && + get_in(state, [source.processor_marker_key(), :end_time]) == nil do + state = put_in(state, [source.processor_marker_key(), :request_end], true) {:reply, nil, state} else @@ -53,15 +54,89 @@ defmodule DpulCollections.IndexMetricsTracker do end end - def event(Figgy.HydrationProducerSource) do + def handle_call( + {:ack_received, metadata = %{processor_marker_key: processor_marker_key}}, + _, + state + ) do + state = + state + |> put_in( + [processor_marker_key], + handle_ack_received(metadata, Map.get(state, processor_marker_key)) + ) + + {:reply, nil, state} + end + + # If there's no stored info yet, do nothing. + defp handle_ack_received(_event, nil), do: nil + # If there's a start and end time, do nothing + defp handle_ack_received( + _event, + processor_state = %{start_time: _start_time, end_time: _end_time} + ), + do: processor_state + + # If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric. + defp handle_ack_received( + %{ + processor_marker_key: processor_marker_key, + acked_count: new_acked_count, + unacked_count: 0 + }, + processor_state = %{ + start_time: _start_time, + request_end: true, + acked_count: old_acked_count + } + ) do + processor_state = + processor_state + |> put_in([:end_time], :erlang.monotonic_time()) + |> Map.delete(:request_end) + |> put_in([:acked_count], old_acked_count + new_acked_count) + + duration = processor_state[:end_time] - processor_state[:start_time] + + :telemetry.execute( + [:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll], + %{duration: duration}, + %{source: processor_marker_key} + ) + + Metrics.create_index_metric(%{ + type: processor_marker_key, + measurement_type: "full_index", + duration: System.convert_time_unit(duration, :native, :second), + records_acked: processor_state[:acked_count] + }) + + processor_state + end + + # If there's a start time, record the acked_count + defp handle_ack_received( + %{acked_count: new_acked_count}, + processor_state = %{start_time: _start_time, acked_count: old_acked_count} + ) do + processor_state + |> put_in([:acked_count], old_acked_count + new_acked_count) + end + + def event("figgy_hydrator") do :hydrator end - def event(Figgy.TransformationProducerSource) do + def event("figgy_transformer") do :transformer end - def event(Figgy.IndexingProducerSource) do + def event("figgy_indexer") do :indexer end + + defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do + GenServer.call(__MODULE__, {:ack_received, metadata}) + end end diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex index c3b233f3..0ae96c4d 100644 --- a/lib/dpul_collections/indexing_pipeline/index_metric.ex +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -7,7 +7,7 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do field :measurement_type, :string # Duration in seconds field :duration, :integer - field :records_acked, :integer + field :records_acked, :integer, default: 0 timestamps(type: :utc_datetime_usec) end @@ -15,7 +15,7 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do @doc false def changeset(index_metric, attrs) do index_metric - |> cast(attrs, [:type, :measurement_type, :duration]) - |> validate_required([:type, :measurement_type, :duration]) + |> cast(attrs, [:type, :measurement_type, :duration, :records_acked]) + |> validate_required([:type, :measurement_type, :duration, :records_acked]) end end diff --git a/lib/dpul_collections/indexing_pipeline/metrics.ex b/lib/dpul_collections/indexing_pipeline/metrics.ex index b3ee2c2e..ce15d958 100644 --- a/lib/dpul_collections/indexing_pipeline/metrics.ex +++ b/lib/dpul_collections/indexing_pipeline/metrics.ex @@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.Metrics do def index_metrics(type, measurement_type) do query = from r in IndexMetric, - where: r.type == ^type and r.measurement_type == ^measurement_type + where: r.type == ^type and r.measurement_type == ^measurement_type, + order_by: [desc: r.inserted_at] Repo.all(query) end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index 8cf9359c..c9a0f575 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -19,7 +19,20 @@ defmodule DpulCollections.IndexMetricsTrackerTest do processor_marker_key: HydrationProducerSource.processor_marker_key() } ) + IndexMetricsTracker.register_polling_started(HydrationProducerSource) + # Send an ack done with unacked_count 1, this tracks ack but doesn't + # finish. + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 1, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + # Send an ack done with unacked_count 0, this triggers an index time # create. :telemetry.execute( @@ -31,12 +44,13 @@ defmodule DpulCollections.IndexMetricsTrackerTest do processor_marker_key: HydrationProducerSource.processor_marker_key() } ) + [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) # Assert # This is 0 because it takes less than a second to run. assert metric.duration == 0 - assert metric.records_acked == 2 + assert metric.records_acked == 3 end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 09889ce6..25cf1515 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -163,8 +163,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Ensure metrics are being sent. assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} [hydration_metric_1 | _] = IndexMetricsTracker.index_times(HydrationProducerSource) - # This is 0 because hydration production takes less than a second to run. - assert hydration_metric_1.duration == 0 + assert hydration_metric_1.duration > 0 end test "indexes expected fields" do From 795050374c15946939769f2fc656a0196cb57485 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 20 Nov 2024 14:41:19 -0800 Subject: [PATCH 11/20] Fix coverage. --- test/dpul_collections/index_metrics_tracker_test.exs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index c9a0f575..e64b9351 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -8,6 +8,18 @@ defmodule DpulCollections.IndexMetricsTrackerTest do describe "index_times/1" do test "registers index times" do # Act + # Send an ack done with acked_count 1, before anything - this should be + # ignored + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 0, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + IndexMetricsTracker.register_fresh_index(HydrationProducerSource) # Send an ack done with acked_count 1 :telemetry.execute( From ea9abb2f7cb19abbb62f33c7c7f3d3b19891f91f Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 09:11:37 -0800 Subject: [PATCH 12/20] Proof of concept for dashboard page. --- .../indexing_pipeline/dashboard_page.ex | 107 ++++++++++++++++++ lib/dpul_collections_web/router.ex | 3 +- 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 lib/dpul_collections/indexing_pipeline/dashboard_page.ex diff --git a/lib/dpul_collections/indexing_pipeline/dashboard_page.ex b/lib/dpul_collections/indexing_pipeline/dashboard_page.ex new file mode 100644 index 00000000..a555ca24 --- /dev/null +++ b/lib/dpul_collections/indexing_pipeline/dashboard_page.ex @@ -0,0 +1,107 @@ +defmodule DpulCollections.IndexingPipeline.DashboardPage do + alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource + alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexMetricsTracker + use Phoenix.LiveDashboard.PageBuilder + + @impl true + def mount(_params, _session, socket) do + socket = + assign(socket, + hydration_times: IndexMetricsTracker.index_times(HydrationProducerSource), + transformation_times: IndexMetricsTracker.index_times(TransformationProducerSource), + indexing_times: IndexMetricsTracker.index_times(IndexingProducerSource) + ) + + {:ok, socket, temporary_assigns: [item_count: nil]} + end + + @impl true + def menu_link(_, _) do + {:ok, "Index Metrics"} + end + + defp hydration_times(_params, _node) do + hydration_times = + IndexMetricsTracker.index_times(HydrationProducerSource) + |> Enum.map(&Map.from_struct/1) + + {hydration_times, length(hydration_times)} + end + + defp transformation_times(_params, _node) do + transformation_times = + IndexMetricsTracker.index_times(TransformationProducerSource) + |> Enum.map(&Map.from_struct/1) + + {transformation_times, length(transformation_times)} + end + + defp indexing_times(_params, _node) do + indexing_times = + IndexMetricsTracker.index_times(IndexingProducerSource) + |> Enum.map(&Map.from_struct/1) + + {indexing_times, length(indexing_times)} + end + + @impl true + def render(assigns) do + ~H""" + <.live_table + id="hydration-table" + dom_id="hydration-table" + page={@page} + title="Hydration Metric Times (1 hour .. 2 days)" + row_fetcher={&hydration_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col field={:duration} header="Duration (s)" /> + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + <.live_table + id="transformation-table" + dom_id="transformation-table" + page={@page} + title="Transformation Metric Times (30 minutes .. 2 hours)" + row_fetcher={&transformation_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col field={:duration} header="Duration (s)" /> + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + <.live_table + id="indexing-table" + dom_id="indexing-table" + page={@page} + title="Indexing Metric Times (10 minutes .. 1 hour)" + row_fetcher={&indexing_times/2} + rows_name="metrics" + > + <:col field={:updated_at} sortable={:desc} /> + <:col field={:duration} header="Duration (s)" /> + <:col field={:records_acked} header="Record Count" /> + <:col :let={record} field={:per_second} header="Records per Second"> + <%= per_second(record) %> + + + """ + end + + defp per_second(%{duration: 0, records_acked: records_acked}) do + records_acked + end + + defp per_second(%{duration: duration, records_acked: records_acked}) do + records_acked / duration + end +end diff --git a/lib/dpul_collections_web/router.ex b/lib/dpul_collections_web/router.ex index 3bcab5e4..38652aa6 100644 --- a/lib/dpul_collections_web/router.ex +++ b/lib/dpul_collections_web/router.ex @@ -50,7 +50,8 @@ defmodule DpulCollectionsWeb.Router do live_dashboard "/dashboard", metrics: DpulCollectionsWeb.Telemetry, additional_pages: [ - broadway: BroadwayDashboard + broadway: BroadwayDashboard, + index_metrics: DpulCollections.IndexingPipeline.DashboardPage ] forward "/mailbox", Plug.Swoosh.MailboxPreview From 3e76b78d90ed10ed56eb47888f65c2f8d76eb0b0 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 09:43:07 -0800 Subject: [PATCH 13/20] Add tests, format duration better. --- .../indexing_pipeline/dashboard_page.ex | 28 +++++++++-- .../indexing_pipeline/dashboard_page_test.exs | 48 +++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 test/dpul_collections/indexing_pipeline/dashboard_page_test.exs diff --git a/lib/dpul_collections/indexing_pipeline/dashboard_page.ex b/lib/dpul_collections/indexing_pipeline/dashboard_page.ex index a555ca24..003ab459 100644 --- a/lib/dpul_collections/indexing_pipeline/dashboard_page.ex +++ b/lib/dpul_collections/indexing_pipeline/dashboard_page.ex @@ -58,7 +58,9 @@ defmodule DpulCollections.IndexingPipeline.DashboardPage do rows_name="metrics" > <:col field={:updated_at} sortable={:desc} /> - <:col field={:duration} header="Duration (s)" /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + <:col field={:records_acked} header="Record Count" /> <:col :let={record} field={:per_second} header="Records per Second"> <%= per_second(record) %> @@ -73,7 +75,9 @@ defmodule DpulCollections.IndexingPipeline.DashboardPage do rows_name="metrics" > <:col field={:updated_at} sortable={:desc} /> - <:col field={:duration} header="Duration (s)" /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + <:col field={:records_acked} header="Record Count" /> <:col :let={record} field={:per_second} header="Records per Second"> <%= per_second(record) %> @@ -88,7 +92,9 @@ defmodule DpulCollections.IndexingPipeline.DashboardPage do rows_name="metrics" > <:col field={:updated_at} sortable={:desc} /> - <:col field={:duration} header="Duration (s)" /> + <:col :let={record} field={:duration} header="Duration (hh:mm:ss)"> + <%= to_hh_mm_ss(record.duration) %> + <:col field={:records_acked} header="Record Count" /> <:col :let={record} field={:per_second} header="Records per Second"> <%= per_second(record) %> @@ -104,4 +110,20 @@ defmodule DpulCollections.IndexingPipeline.DashboardPage do defp per_second(%{duration: duration, records_acked: records_acked}) do records_acked / duration end + + # Pulled from + # https://nickjanetakis.com/blog/formatting-seconds-into-hh-mm-ss-with-elixir-and-python + # and modified to be consistently hh:mm:ss + defp to_hh_mm_ss(0), do: "00:00:00" + + defp to_hh_mm_ss(seconds) do + units = [3600, 60, 1] + # Returns a list of how many hours, minutes, and seconds there are, reducing + # the total seconds by that amount if it's greater than 1. + t = + Enum.map_reduce(units, seconds, fn unit, val -> {div(val, unit), rem(val, unit)} end) + |> elem(0) + + Enum.map_join(t, ":", fn x -> x |> Integer.to_string() |> String.pad_leading(2, "0") end) + end end diff --git a/test/dpul_collections/indexing_pipeline/dashboard_page_test.exs b/test/dpul_collections/indexing_pipeline/dashboard_page_test.exs new file mode 100644 index 00000000..7519e199 --- /dev/null +++ b/test/dpul_collections/indexing_pipeline/dashboard_page_test.exs @@ -0,0 +1,48 @@ +defmodule DpuLCollections.IndexingPipeline.DashboardPageTest do + alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource + alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource + alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource + alias DpulCollections.IndexingPipeline.Metrics + use DpulCollectionsWeb.ConnCase + import Phoenix.LiveViewTest + @endpoint DpulCollectionsWeb.Endpoint + + test "GET /dev/dashboard/index_metrics", %{conn: conn} do + Metrics.create_index_metric(%{ + type: HydrationProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 0, + records_acked: 20 + }) + + Metrics.create_index_metric(%{ + type: TransformationProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 10, + records_acked: 20 + }) + + Metrics.create_index_metric(%{ + type: IndexingProducerSource.processor_marker_key(), + measurement_type: "full_index", + duration: 200, + records_acked: 60 + }) + + {:ok, view, html} = + conn + |> put_req_header("authorization", "Basic " <> Base.encode64("admin:test")) + |> get(~p"/dev/dashboard/index_metrics") + |> live + + assert html =~ "Hydration Metric Times" + assert html =~ "Transformation Metric Times" + assert html =~ "Indexing Metric Times" + assert has_element?(view, "td.hydration-table-per_second", "20") + assert has_element?(view, "td.hydration-table-duration", "00:00:00") + assert has_element?(view, "td.transformation-table-per_second", "2") + assert has_element?(view, "td.transformation-table-duration", "00:00:10") + assert has_element?(view, "td.indexing-table-per_second", "0.3") + assert has_element?(view, "td.indexing-table-duration", "00:03:20") + end +end From 74b4c983ae0219d5ef4edfed64fa7e35f74bd513 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 09:45:36 -0800 Subject: [PATCH 14/20] Remove indexing metrics. --- lib/dpul_collections_web/telemetry.ex | 33 --------------------------- 1 file changed, 33 deletions(-) diff --git a/lib/dpul_collections_web/telemetry.ex b/lib/dpul_collections_web/telemetry.ex index 9d98945a..7e5db1e9 100644 --- a/lib/dpul_collections_web/telemetry.ex +++ b/lib/dpul_collections_web/telemetry.ex @@ -1,7 +1,6 @@ defmodule DpulCollectionsWeb.Telemetry do use Supervisor import Telemetry.Metrics - alias DpulCollections.IndexingPipeline.Figgy def start_link(arg) do Supervisor.start_link(__MODULE__, arg, name: __MODULE__) @@ -80,38 +79,6 @@ defmodule DpulCollectionsWeb.Telemetry do summary("vm.total_run_queue_lengths.total"), summary("vm.total_run_queue_lengths.cpu"), summary("vm.total_run_queue_lengths.io"), - - # Indexing Metrics - last_value( - "Hydrator Full Run", - event_name: "dpulc.indexing_pipeline.hydrator.time_to_poll", - measurement: :duration, - unit: {:native, :second}, - description: "Time from fresh index to poll for the Hydrator", - reporter_options: [ - nav: "Indexing Pipeline" - ] - ), - last_value( - "Transformer Full Run", - event_name: "dpulc.indexing_pipeline.transformer.time_to_poll", - measurement: :duration, - unit: {:native, :second}, - description: "Time from fresh index to poll for the Transformer", - reporter_options: [ - nav: "Indexing Pipeline" - ] - ), - last_value( - "Indexer Full Run", - event_name: "dpulc.indexing_pipeline.indexer.time_to_poll", - measurement: :duration, - unit: {:native, :second}, - description: "Time from fresh index to poll for the Indexer", - reporter_options: [ - nav: "Indexing Pipeline" - ] - ) ] end From be8b41ae133e7a8a82d2e25c8ee3c0c641e14ec9 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 10:30:58 -0800 Subject: [PATCH 15/20] Format. --- lib/dpul_collections_web/telemetry.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dpul_collections_web/telemetry.ex b/lib/dpul_collections_web/telemetry.ex index 7e5db1e9..1a8ee906 100644 --- a/lib/dpul_collections_web/telemetry.ex +++ b/lib/dpul_collections_web/telemetry.ex @@ -78,7 +78,7 @@ defmodule DpulCollectionsWeb.Telemetry do summary("vm.memory.total", unit: {:byte, :kilobyte}), summary("vm.total_run_queue_lengths.total"), summary("vm.total_run_queue_lengths.cpu"), - summary("vm.total_run_queue_lengths.io"), + summary("vm.total_run_queue_lengths.io") ] end From 0c0bc37a2727c02c5344e963d89a7eeb3182922f Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 11:07:04 -0800 Subject: [PATCH 16/20] Reset index metrics tracker before testing it. --- lib/dpul_collections/index_metrics_tracker.ex | 9 +++++++++ test/dpul_collections/index_metrics_tracker_test.exs | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index d30edad0..122d98c8 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -32,6 +32,15 @@ defmodule DpulCollections.IndexMetricsTracker do Metrics.index_metrics(source.processor_marker_key(), "full_index") end + def reset() do + GenServer.call(__MODULE__, {:reset}) + end + + @impl true + def handle_call({:reset}, _, _state) do + {:reply, nil, %{}} + end + @impl true def handle_call({:fresh_index, source}, _, state) do new_state = diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index e64b9351..08b743f9 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -6,6 +6,11 @@ defmodule DpulCollections.IndexMetricsTrackerTest do use DpulCollections.DataCase describe "index_times/1" do + setup do + IndexMetricsTracker.reset() + :ok + end + test "registers index times" do # Act # Send an ack done with acked_count 1, before anything - this should be From 389e95bc8372c79f7be356e86180b60ed39ac0a4 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 12:43:28 -0800 Subject: [PATCH 17/20] Move DashboardPage --- .../indexing_pipeline/dashboard_page.ex | 2 +- lib/dpul_collections_web/router.ex | 2 +- .../indexing_pipeline/dashboard_page_test.exs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename lib/{dpul_collections => dpul_collections_web}/indexing_pipeline/dashboard_page.ex (98%) rename test/{dpul_collections => dpul_collections_web}/indexing_pipeline/dashboard_page_test.exs (96%) diff --git a/lib/dpul_collections/indexing_pipeline/dashboard_page.ex b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex similarity index 98% rename from lib/dpul_collections/indexing_pipeline/dashboard_page.ex rename to lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex index 003ab459..19f6cf62 100644 --- a/lib/dpul_collections/indexing_pipeline/dashboard_page.ex +++ b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex @@ -1,4 +1,4 @@ -defmodule DpulCollections.IndexingPipeline.DashboardPage do +defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource diff --git a/lib/dpul_collections_web/router.ex b/lib/dpul_collections_web/router.ex index 38652aa6..8ddf3cbf 100644 --- a/lib/dpul_collections_web/router.ex +++ b/lib/dpul_collections_web/router.ex @@ -51,7 +51,7 @@ defmodule DpulCollectionsWeb.Router do metrics: DpulCollectionsWeb.Telemetry, additional_pages: [ broadway: BroadwayDashboard, - index_metrics: DpulCollections.IndexingPipeline.DashboardPage + index_metrics: DpulCollectionsWeb.IndexingPipeline.DashboardPage ] forward "/mailbox", Plug.Swoosh.MailboxPreview diff --git a/test/dpul_collections/indexing_pipeline/dashboard_page_test.exs b/test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs similarity index 96% rename from test/dpul_collections/indexing_pipeline/dashboard_page_test.exs rename to test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs index 7519e199..a798649c 100644 --- a/test/dpul_collections/indexing_pipeline/dashboard_page_test.exs +++ b/test/dpul_collections_web/indexing_pipeline/dashboard_page_test.exs @@ -1,4 +1,4 @@ -defmodule DpuLCollections.IndexingPipeline.DashboardPageTest do +defmodule DpuLCollectionsWeb.IndexingPipeline.DashboardPageTest do alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource From 761426436c066021dab9f4509fc36766237ca2cb Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 12:59:03 -0800 Subject: [PATCH 18/20] Add some documentation specs. --- lib/dpul_collections/index_metrics_tracker.ex | 28 ++++++++++++++++--- .../indexing_pipeline/database_producer.ex | 5 ++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 122d98c8..317ce0c6 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -1,7 +1,15 @@ defmodule DpulCollections.IndexMetricsTracker do use GenServer alias DpulCollections.IndexingPipeline.Metrics - alias DpulCollections.IndexingPipeline.Figgy + alias DpulCollections.IndexingPipeline.DatabaseProducer + + @type processor_state :: %{ + start_time: integer(), + end_time: integer(), + polling_started: boolean(), + acked_count: integer() + } + @type state :: %{(processor_key :: String.t()) => processor_state()} def start_link(_) do GenServer.start_link(__MODULE__, %{}, name: __MODULE__) @@ -20,14 +28,17 @@ defmodule DpulCollections.IndexMetricsTracker do {:ok, %{}} end + @spec register_fresh_index(source :: module()) :: term() def register_fresh_index(source) do GenServer.call(__MODULE__, {:fresh_index, source}) end + @spec register_polling_started(source :: module()) :: term() def register_polling_started(source) do GenServer.call(__MODULE__, {:poll_started, source}) end + @spec index_times(source :: module()) :: term() def index_times(source) do Metrics.index_metrics(source.processor_marker_key(), "full_index") end @@ -37,11 +48,13 @@ defmodule DpulCollections.IndexMetricsTracker do end @impl true + @spec handle_call(term(), term(), state()) :: term() def handle_call({:reset}, _, _state) do {:reply, nil, %{}} end @impl true + @spec handle_call(term(), term(), state()) :: term() def handle_call({:fresh_index, source}, _, state) do new_state = put_in(state, [source.processor_marker_key()], %{ @@ -52,10 +65,14 @@ defmodule DpulCollections.IndexMetricsTracker do {:reply, nil, new_state} end + @spec handle_call(term(), term(), state()) :: term() def handle_call({:poll_started, source}, _, state) do + # Record that polling has started if we've recorded a start time but not an + # end time for a source. Then the next the source finishes acknowledgements + # we'll record an end time. if get_in(state, [source.processor_marker_key(), :start_time]) != nil && get_in(state, [source.processor_marker_key(), :end_time]) == nil do - state = put_in(state, [source.processor_marker_key(), :request_end], true) + state = put_in(state, [source.processor_marker_key(), :polling_started], true) {:reply, nil, state} else @@ -63,6 +80,7 @@ defmodule DpulCollections.IndexMetricsTracker do end end + @spec handle_call(term(), term(), state()) :: term() def handle_call( {:ack_received, metadata = %{processor_marker_key: processor_marker_key}}, _, @@ -79,6 +97,8 @@ defmodule DpulCollections.IndexMetricsTracker do end # If there's no stored info yet, do nothing. + @spec handle_ack_received(DatabaseProducer.ack_event_metadata(), processor_state()) :: + processor_state() defp handle_ack_received(_event, nil), do: nil # If there's a start and end time, do nothing defp handle_ack_received( @@ -96,14 +116,14 @@ defmodule DpulCollections.IndexMetricsTracker do }, processor_state = %{ start_time: _start_time, - request_end: true, + polling_started: true, acked_count: old_acked_count } ) do processor_state = processor_state |> put_in([:end_time], :erlang.monotonic_time()) - |> Map.delete(:request_end) + |> Map.delete(:polling_started) |> put_in([:acked_count], old_acked_count + new_acked_count) duration = processor_state[:end_time] - processor_state[:start_time] diff --git a/lib/dpul_collections/indexing_pipeline/database_producer.ex b/lib/dpul_collections/indexing_pipeline/database_producer.ex index 9f4f53fe..41af71e0 100644 --- a/lib/dpul_collections/indexing_pipeline/database_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/database_producer.ex @@ -225,6 +225,11 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do # This happens when ack is finished, we listen to this telemetry event in # tests so we know when the Producer's done processing a message. @spec notify_ack(integer(), integer(), String.t()) :: any() + @type ack_event_metadata :: %{ + acked_count: integer(), + unacked_count: integer(), + processor_marker_key: String.t() + } defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do :telemetry.execute( [:database_producer, :ack, :done], From efbe0f5284c7adcb601f3d9bf773a4d5349d986a Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 13:00:54 -0800 Subject: [PATCH 19/20] index_times -> index_durations --- lib/dpul_collections/index_metrics_tracker.ex | 6 +++--- .../indexing_pipeline/dashboard_page.ex | 12 ++++++------ test/dpul_collections/index_metrics_tracker_test.exs | 4 ++-- .../integration/full_integration_test.exs | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 317ce0c6..18765574 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -38,8 +38,8 @@ defmodule DpulCollections.IndexMetricsTracker do GenServer.call(__MODULE__, {:poll_started, source}) end - @spec index_times(source :: module()) :: term() - def index_times(source) do + @spec index_durations(source :: module()) :: term() + def index_durations(source) do Metrics.index_metrics(source.processor_marker_key(), "full_index") end @@ -68,7 +68,7 @@ defmodule DpulCollections.IndexMetricsTracker do @spec handle_call(term(), term(), state()) :: term() def handle_call({:poll_started, source}, _, state) do # Record that polling has started if we've recorded a start time but not an - # end time for a source. Then the next the source finishes acknowledgements + # end time for a source. Then the next time the source finishes acknowledgements # we'll record an end time. if get_in(state, [source.processor_marker_key(), :start_time]) != nil && get_in(state, [source.processor_marker_key(), :end_time]) == nil do diff --git a/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex index 19f6cf62..9f7f421c 100644 --- a/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex +++ b/lib/dpul_collections_web/indexing_pipeline/dashboard_page.ex @@ -9,9 +9,9 @@ defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do def mount(_params, _session, socket) do socket = assign(socket, - hydration_times: IndexMetricsTracker.index_times(HydrationProducerSource), - transformation_times: IndexMetricsTracker.index_times(TransformationProducerSource), - indexing_times: IndexMetricsTracker.index_times(IndexingProducerSource) + hydration_times: IndexMetricsTracker.index_durations(HydrationProducerSource), + transformation_times: IndexMetricsTracker.index_durations(TransformationProducerSource), + indexing_times: IndexMetricsTracker.index_durations(IndexingProducerSource) ) {:ok, socket, temporary_assigns: [item_count: nil]} @@ -24,7 +24,7 @@ defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do defp hydration_times(_params, _node) do hydration_times = - IndexMetricsTracker.index_times(HydrationProducerSource) + IndexMetricsTracker.index_durations(HydrationProducerSource) |> Enum.map(&Map.from_struct/1) {hydration_times, length(hydration_times)} @@ -32,7 +32,7 @@ defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do defp transformation_times(_params, _node) do transformation_times = - IndexMetricsTracker.index_times(TransformationProducerSource) + IndexMetricsTracker.index_durations(TransformationProducerSource) |> Enum.map(&Map.from_struct/1) {transformation_times, length(transformation_times)} @@ -40,7 +40,7 @@ defmodule DpulCollectionsWeb.IndexingPipeline.DashboardPage do defp indexing_times(_params, _node) do indexing_times = - IndexMetricsTracker.index_times(IndexingProducerSource) + IndexMetricsTracker.index_durations(IndexingProducerSource) |> Enum.map(&Map.from_struct/1) {indexing_times, length(indexing_times)} diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index 08b743f9..abcc63d0 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -5,7 +5,7 @@ defmodule DpulCollections.IndexMetricsTrackerTest do alias Phoenix.ActionClauseError use DpulCollections.DataCase - describe "index_times/1" do + describe "index_durations/1" do setup do IndexMetricsTracker.reset() :ok @@ -62,7 +62,7 @@ defmodule DpulCollections.IndexMetricsTrackerTest do } ) - [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) + [metric = %IndexMetric{}] = IndexMetricsTracker.index_durations(HydrationProducerSource) # Assert # This is 0 because it takes less than a second to run. diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 25cf1515..58a9bf55 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -162,7 +162,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Ensure metrics are being sent. assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} - [hydration_metric_1 | _] = IndexMetricsTracker.index_times(HydrationProducerSource) + [hydration_metric_1 | _] = IndexMetricsTracker.index_durations(HydrationProducerSource) assert hydration_metric_1.duration > 0 end From 019182daae4e1704adc26333130bc4b0496099b4 Mon Sep 17 00:00:00 2001 From: Trey Pendragon Date: Wed, 27 Nov 2024 13:11:33 -0800 Subject: [PATCH 20/20] Relocate function. --- lib/dpul_collections/index_metrics_tracker.ex | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 18765574..d5df41bf 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -153,6 +153,10 @@ defmodule DpulCollections.IndexMetricsTracker do |> put_in([:acked_count], old_acked_count + new_acked_count) end + defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do + GenServer.call(__MODULE__, {:ack_received, metadata}) + end + def event("figgy_hydrator") do :hydrator end @@ -164,8 +168,4 @@ defmodule DpulCollections.IndexMetricsTracker do def event("figgy_indexer") do :indexer end - - defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do - GenServer.call(__MODULE__, {:ack_received, metadata}) - end end