Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Measure Hydration/Transformation/Indexing Times #153

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/dpul_collections/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule DpulCollections.Application do
# Start a worker by calling: DpulCollections.Worker.start_link(arg)
# {DpulCollections.Worker, arg},
# Start to serve requests, typically the last entry
DpulCollectionsWeb.Endpoint
DpulCollectionsWeb.Endpoint,
DpulCollections.IndexMetricsTracker
] ++ environment_children(Application.fetch_env!(:dpul_collections, :current_env))

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
151 changes: 151 additions & 0 deletions lib/dpul_collections/index_metrics_tracker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
defmodule DpulCollections.IndexMetricsTracker do
use GenServer
alias DpulCollections.IndexingPipeline.Metrics
alias DpulCollections.IndexingPipeline.Figgy

# Starts the tracker as a locally-named singleton with empty state; the
# supervisor-provided argument is ignored.
def start_link(_opts) do
  GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

@impl true
# Attaches a telemetry handler so every DatabaseProducer ack-done event is
# forwarded into this process (see handle_ack_received/4 below), then starts
# with an empty state map keyed by processor marker key.
# NOTE(review): this attaches a local function capture; the telemetry docs
# recommend remote captures for performance and warn that captures are
# detached when the module is reloaded — confirm this is acceptable here.
def init(_) do
:ok =
:telemetry.attach(
"metrics-ack-tracker",
[:database_producer, :ack, :done],
&handle_ack_received/4,
nil
)

{:ok, %{}}
end

@doc """
Records that a fresh full-index run has started for `source` (a module
exposing `processor_marker_key/0`), resetting its timing state.
"""
def register_fresh_index(source) do
  GenServer.call(__MODULE__, {:fresh_index, source})
end

@doc """
Records that the producer for `source` has started polling for new records,
which marks the end boundary of an in-flight full-index run.
"""
def register_polling_started(source) do
  GenServer.call(__MODULE__, {:poll_started, source})
end

# Fetches the persisted full-index metrics recorded for the given source
# module, newest first.
def index_times(source) do
  source.processor_marker_key()
  |> Metrics.index_metrics("full_index")
end

# Clears all in-memory tracking state.
def reset do
  GenServer.call(__MODULE__, {:reset})
end

@impl true
def handle_call({:reset}, _from, _state), do: {:reply, nil, %{}}

@impl true
def handle_call({:fresh_index, source}, _from, state) do
  # A fresh index restarts the clock and the ack counter for this processor.
  entry = %{start_time: :erlang.monotonic_time(), acked_count: 0}
  {:reply, nil, Map.put(state, source.processor_marker_key(), entry)}
end

def handle_call({:poll_started, source}, _from, state) do
  key = source.processor_marker_key()

  # Only flag an end-of-run measurement while an index run is in flight: a
  # start time has been recorded but no end time yet. The :request_end flag
  # is consumed by handle_ack_received/2 once all outstanding messages ack.
  state =
    if get_in(state, [key, :start_time]) != nil && get_in(state, [key, :end_time]) == nil do
      put_in(state, [key, :request_end], true)
    else
      state
    end

  {:reply, nil, state}
end

def handle_call({:ack_received, metadata = %{processor_marker_key: key}}, _from, state) do
  # Delegate to the 2-arity handler, which returns the updated per-processor
  # entry (or nil when nothing is being tracked for this processor).
  updated = handle_ack_received(metadata, Map.get(state, key))
  {:reply, nil, Map.put(state, key, updated)}
end

# If there's no stored info yet, do nothing.
# (No register_fresh_index/1 call has been seen for this processor.)
defp handle_ack_received(_event, nil), do: nil
# If there's a start and end time, do nothing
# (the full-index run has already been measured and persisted).
defp handle_ack_received(
_event,
processor_state = %{start_time: _start_time, end_time: _end_time}
),
do: processor_state

# If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric.
defp handle_ack_received(
%{
processor_marker_key: processor_marker_key,
acked_count: new_acked_count,
unacked_count: 0
},
processor_state = %{
start_time: _start_time,
request_end: true,
acked_count: old_acked_count
}
) do
# Close out the run: stamp the end time, drop the one-shot :request_end
# flag (set by the :poll_started call), and fold in the final ack count.
processor_state =
processor_state
|> put_in([:end_time], :erlang.monotonic_time())
|> Map.delete(:request_end)
|> put_in([:acked_count], old_acked_count + new_acked_count)

# Duration stays in native time units for the telemetry event; it is only
# converted to seconds for the persisted IndexMetric below.
duration = processor_state[:end_time] - processor_state[:start_time]

:telemetry.execute(
[:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll],
%{duration: duration},
%{source: processor_marker_key}
)

Metrics.create_index_metric(%{
type: processor_marker_key,
measurement_type: "full_index",
duration: System.convert_time_unit(duration, :native, :second),
records_acked: processor_state[:acked_count]
})

processor_state
end

# While a run is in progress, accumulate the newly acked record count.
defp handle_ack_received(
       %{acked_count: newly_acked},
       processor_state = %{start_time: _start_time, acked_count: acked_so_far}
     ) do
  %{processor_state | acked_count: acked_so_far + newly_acked}
end

# Maps a processor marker key to the atom used in telemetry event names.
def event("figgy_hydrator"), do: :hydrator
def event("figgy_transformer"), do: :transformer
def event("figgy_indexer"), do: :indexer

# Telemetry handler for [:database_producer, :ack, :done]; attached in init/1.
# It runs in the emitting producer's process, so it forwards the event
# metadata into this GenServer to serialize all state updates.
defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do
  GenServer.call(__MODULE__, {:ack_received, metadata})
end
end
129 changes: 129 additions & 0 deletions lib/dpul_collections/indexing_pipeline/dashboard_page.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
defmodule DpulCollections.IndexingPipeline.DashboardPage do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this should be in DpulCollectionsWeb somewhere, is there a particular reason it's here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put it here because it didn't really seem like it was part of the app, but happy to move it.

alias DpulCollections.IndexingPipeline.Figgy.IndexingProducerSource
alias DpulCollections.IndexingPipeline.Figgy.TransformationProducerSource
alias DpulCollections.IndexingPipeline.Figgy.HydrationProducerSource
alias DpulCollections.IndexMetricsTracker
use Phoenix.LiveDashboard.PageBuilder

@impl true
def mount(_params, _session, socket) do
  # Preload one snapshot of each stage's metrics; the live_tables rendered
  # below re-fetch on their own via row_fetcher callbacks.
  socket =
    assign(socket,
      hydration_times: IndexMetricsTracker.index_times(HydrationProducerSource),
      transformation_times: IndexMetricsTracker.index_times(TransformationProducerSource),
      indexing_times: IndexMetricsTracker.index_times(IndexingProducerSource)
    )

  {:ok, socket, temporary_assigns: [item_count: nil]}
end

@impl true
# Label shown for this page in the LiveDashboard menu.
def menu_link(_, _), do: {:ok, "Index Metrics"}

# Row fetchers for the live_tables below; each returns {rows, total_count}.
defp hydration_times(_params, _node), do: fetch_times(HydrationProducerSource)

defp transformation_times(_params, _node), do: fetch_times(TransformationProducerSource)

defp indexing_times(_params, _node), do: fetch_times(IndexingProducerSource)

# Shared fetcher: loads the stored metrics for a producer source and shapes
# them into the plain maps Phoenix.LiveDashboard's live_table expects.
defp fetch_times(source) do
  times =
    source
    |> IndexMetricsTracker.index_times()
    |> Enum.map(&Map.from_struct/1)

  {times, length(times)}
end

@impl true
# Renders one live_table per pipeline stage (hydration, transformation,
# indexing), each sortable by :updated_at and showing duration, record count,
# and records-per-second throughput.
# NOTE(review): the parenthesized ranges in the titles look like expected
# run-duration ranges per stage — confirm with the pipeline owners.
def render(assigns) do
~H"""
<.live_table
id="hydration-table"
dom_id="hydration-table"
page={@page}
title="Hydration Metric Times (1 hour .. 2 days)"
row_fetcher={&hydration_times/2}
rows_name="metrics"
>
<:col field={:updated_at} sortable={:desc} />
<:col :let={record} field={:duration} header="Duration (hh:mm:ss)">
<%= to_hh_mm_ss(record.duration) %>
</:col>
<:col field={:records_acked} header="Record Count" />
<:col :let={record} field={:per_second} header="Records per Second">
<%= per_second(record) %>
</:col>
</.live_table>
<.live_table
id="transformation-table"
dom_id="transformation-table"
page={@page}
title="Transformation Metric Times (30 minutes .. 2 hours)"
row_fetcher={&transformation_times/2}
rows_name="metrics"
>
<:col field={:updated_at} sortable={:desc} />
<:col :let={record} field={:duration} header="Duration (hh:mm:ss)">
<%= to_hh_mm_ss(record.duration) %>
</:col>
<:col field={:records_acked} header="Record Count" />
<:col :let={record} field={:per_second} header="Records per Second">
<%= per_second(record) %>
</:col>
</.live_table>
<.live_table
id="indexing-table"
dom_id="indexing-table"
page={@page}
title="Indexing Metric Times (10 minutes .. 1 hour)"
row_fetcher={&indexing_times/2}
rows_name="metrics"
>
<:col field={:updated_at} sortable={:desc} />
<:col :let={record} field={:duration} header="Duration (hh:mm:ss)">
<%= to_hh_mm_ss(record.duration) %>
</:col>
<:col field={:records_acked} header="Record Count" />
<:col :let={record} field={:per_second} header="Records per Second">
<%= per_second(record) %>
</:col>
</.live_table>
"""
end

# Throughput in records per second; a zero duration reports the raw count
# instead of dividing by zero.
defp per_second(%{duration: 0, records_acked: count}), do: count
defp per_second(%{duration: duration, records_acked: count}), do: count / duration

# Pulled from
# https://nickjanetakis.com/blog/formatting-seconds-into-hh-mm-ss-with-elixir-and-python
# and modified to be consistently hh:mm:ss
defp to_hh_mm_ss(0), do: "00:00:00"

defp to_hh_mm_ss(seconds) do
  # Split the total seconds into hour/minute/second buckets, then zero-pad
  # each piece to two digits and join with colons.
  hours = div(seconds, 3600)
  minutes = seconds |> rem(3600) |> div(60)
  secs = rem(seconds, 60)

  Enum.map_join([hours, minutes, secs], ":", fn part ->
    part |> Integer.to_string() |> String.pad_leading(2, "0")
  end)
end
end
22 changes: 18 additions & 4 deletions lib/dpul_collections/indexing_pipeline/database_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
records =
source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version)

if last_queried_marker == nil && length(records) > 0 do
DpulCollections.IndexMetricsTracker.register_fresh_index(source_module)
end

new_state =
state
|> Map.put(
Expand All @@ -82,6 +86,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
DpulCollections.IndexMetricsTracker.register_polling_started(source_module)
Process.send_after(self(), :check_for_updates, 50)
end

Expand Down Expand Up @@ -136,7 +141,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
})
end

notify_ack(pending_markers |> length())
notify_ack(
pending_markers |> length(),
new_state.pulled_records |> length(),
state.source_module.processor_marker_key()
)

{:noreply, messages, new_state}
end

Expand Down Expand Up @@ -214,12 +224,16 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# This happens when ack is finished, we listen to this telemetry event in
# tests so we know when the Producer's done processing a message.
@spec notify_ack(integer()) :: any()
defp notify_ack(acked_message_count) do
@spec notify_ack(integer(), integer(), String.t()) :: any()
defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do
:telemetry.execute(
[:database_producer, :ack, :done],
%{},
%{acked_count: acked_message_count}
%{
acked_count: acked_message_count,
unacked_count: unacked_count,
processor_marker_key: processor_marker_key
}
)
end

Expand Down
21 changes: 21 additions & 0 deletions lib/dpul_collections/indexing_pipeline/index_metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule DpulCollections.IndexingPipeline.IndexMetric do
@moduledoc """
Ecto schema for a single recorded indexing-pipeline measurement, such as the
duration of one full-index run for a given processor.
"""
use Ecto.Schema
import Ecto.Changeset

schema "index_metrics" do
# Processor marker key the metric belongs to (e.g. "figgy_hydrator").
field :type, :string
# Kind of measurement; "full_index" is the only value written so far.
field :measurement_type, :string
# Duration in seconds
field :duration, :integer
field :records_acked, :integer, default: 0

timestamps(type: :utc_datetime_usec)
end

@doc false
def changeset(index_metric, attrs) do
index_metric
|> cast(attrs, [:type, :measurement_type, :duration, :records_acked])
|> validate_required([:type, :measurement_type, :duration, :records_acked])
end
end
29 changes: 29 additions & 0 deletions lib/dpul_collections/indexing_pipeline/metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule DpulCollections.IndexingPipeline.Metrics do
  import Ecto.Query, warn: false
  alias DpulCollections.Repo
  alias DpulCollections.IndexingPipeline.IndexMetric

  @doc """
  Creates an IndexMetric
  """
  def create_index_metric(attrs \\ %{}) do
    changeset = IndexMetric.changeset(%IndexMetric{}, attrs)
    # Assertive insert: invalid attrs are a programmer error, so crash here.
    {:ok, index_metric} = Repo.insert(changeset)
    index_metric
  end

  @doc """
  Get index metrics by type
  """
  def index_metrics(type, measurement_type) do
    IndexMetric
    |> where([r], r.type == ^type and r.measurement_type == ^measurement_type)
    |> order_by([r], desc: r.inserted_at)
    |> Repo.all()
  end
end
Loading