From 2f629e88d41459c3566ec861c1dc51bd4b176899 Mon Sep 17 00:00:00 2001 From: abc3 Date: Mon, 7 Aug 2023 21:27:07 +0200 Subject: [PATCH 1/5] feat: metric endpoint per tenant --- VERSION | 2 +- lib/supavisor.ex | 1 + lib/supavisor/application.ex | 1 + lib/supavisor/monitoring/prom_ex.ex | 25 ++++++++++++ lib/supavisor/syn_handler.ex | 1 + lib/supavisor/tenants_metrics.ex | 40 +++++++++++++++++++ .../controllers/metrics_controller.ex | 22 ++++++++++ lib/supavisor_web/router.ex | 1 + 8 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 lib/supavisor/tenants_metrics.ex diff --git a/VERSION b/VERSION index 8cf7c24c..770cb090 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.25 +0.2.26 diff --git a/lib/supavisor.ex b/lib/supavisor.ex index ab7e2d04..bdabc020 100644 --- a/lib/supavisor.ex +++ b/lib/supavisor.ex @@ -107,6 +107,7 @@ defmodule Supavisor do @spec del_all_cache(String.t(), String.t()) :: map() def del_all_cache(tenant, user) do %{secrets: Cachex.del(Supavisor.Cache, {:secrets, tenant, user})} + %{metrics: Cachex.del(Supavisor.Cache, {:metrics, tenant})} end @spec get_local_pool(String.t(), String.t()) :: pid() | nil diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index d834821a..53627ea9 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -73,6 +73,7 @@ defmodule Supavisor.Application do }, Supavisor.Vault, {Cachex, name: Supavisor.Cache}, + Supavisor.TenantsMetrics, # Start the Endpoint (http/https) SupavisorWeb.Endpoint ] diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index 30c56c47..c6196ffb 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -82,6 +82,31 @@ defmodule Supavisor.Monitoring.PromEx do metrics end + @spec do_cache_tenants_metrics() :: :ok + def do_cache_tenants_metrics() do + metrics = get_metrics() + + Registry.select(Supavisor.Registry.TenantSups, [{{:"$1", :_, :_}, [], [:"$1"]}]) + |> Enum.uniq() + |> Enum.each(fn tenant -> + filtered = + metrics + |> String.split("\n") + |> Enum.filter(&String.contains?(&1, "tenant=\"#{tenant}\"")) + |> Enum.join("\n") + + Cachex.put(Supavisor.Cache, {:metrics, tenant}, filtered) + end) + end + + @spec get_tenant_metrics(String.t()) :: String.t() + def get_tenant_metrics(tenant) do + case Cachex.get(Supavisor.Cache, {:metrics, tenant}) do + {_, metrics} when is_binary(metrics) -> metrics + _ -> "" + end + end + @spec parse_and_add_tags(String.t(), String.t()) :: String.t() defp parse_and_add_tags(line, def_tags) do case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do diff --git a/lib/supavisor/syn_handler.ex b/lib/supavisor/syn_handler.ex index 61ee5d33..ed5a0ff4 100644 --- a/lib/supavisor/syn_handler.ex +++ b/lib/supavisor/syn_handler.ex @@ -16,6 +16,7 @@ defmodule Supavisor.SynHandler do # remove all Prometheus metrics for the specified tenant PromEx.remove_metrics(tenant, user_alias) + Supavisor.del_all_cache(tenant, user_alias) end def resolve_registry_conflict( diff --git a/lib/supavisor/tenants_metrics.ex b/lib/supavisor/tenants_metrics.ex new file mode 100644 index 00000000..48de4702 --- /dev/null +++ b/lib/supavisor/tenants_metrics.ex @@ -0,0 +1,40 @@ +defmodule Supavisor.TenantsMetrics do + @moduledoc false + use GenServer, restart: :transient + require Logger + + alias Supavisor.Monitoring.PromEx + + @check_timeout 10_000 + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + ## Callbacks + + @impl true + def init(_args) do + send(self(), :check_metrics) + {:ok, %{check_ref: make_ref()}} + end + + @impl true + def handle_info(:check_metrics, state) do + Process.cancel_timer(state.check_ref) + + PromEx.do_cache_tenants_metrics() + + {:noreply, %{state | check_ref: check_metrics()}} + end + + ## Internal functions + + defp check_metrics() do + Process.send_after( + self(), + :check_metrics, + @check_timeout + ) + end +end diff --git a/lib/supavisor_web/controllers/metrics_controller.ex b/lib/supavisor_web/controllers/metrics_controller.ex index a049413f..47b0c9ba 100644 --- a/lib/supavisor_web/controllers/metrics_controller.ex +++ b/lib/supavisor_web/controllers/metrics_controller.ex @@ -17,6 +17,16 @@ defmodule SupavisorWeb.MetricsController do |> send_resp(200, cluster_metrics) end + def tenant(conn, %{"external_id" => ext_id}) do + cluster_metrics = fetch_cluster_metrics(ext_id) + + code = if cluster_metrics == "", do: 404, else: 200 + + conn + |> put_resp_content_type("text/plain") + |> send_resp(code, cluster_metrics) + end + @spec fetch_cluster_metrics() :: String.t() defp fetch_cluster_metrics() do Node.list() @@ -29,6 +39,18 @@ defmodule SupavisorWeb.MetricsController do {node, :rpc.call(node, PromEx, :get_metrics, [], 10_000)} end + @spec fetch_cluster_metrics(String.t()) :: String.t() + defp fetch_cluster_metrics(tenant) do + Node.list() + |> Task.async_stream(&fetch_node_metrics(&1, tenant), timeout: :infinity) + |> Enum.reduce(PromEx.get_tenant_metrics(tenant), &merge_node_metrics/2) + end + + @spec fetch_node_metrics(atom(), String.t()) :: {atom(), term()} + defp fetch_node_metrics(node, tenant) do + {node, :rpc.call(node, PromEx, :get_tenant_metrics, [tenant], 10_000)} + end + defp merge_node_metrics({_, {node, {:badrpc, reason}}}, acc) do Logger.error("Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)}") acc diff --git a/lib/supavisor_web/router.ex b/lib/supavisor_web/router.ex index f0f93ab5..f82124e2 100644 --- a/lib/supavisor_web/router.ex +++ b/lib/supavisor_web/router.ex @@ -56,6 +56,7 @@ defmodule SupavisorWeb.Router do pipe_through(:metrics) get("/", MetricsController, :index) + get("/:external_id", MetricsController, :tenant) end # Other scopes may use custom stacks. From 80966f22c030e538f4942078598ba597f858d8f3 Mon Sep 17 00:00:00 2001 From: abc3 Date: Mon, 7 Aug 2023 21:37:57 +0200 Subject: [PATCH 2/5] update do_cache_tenants_metrics --- lib/supavisor/monitoring/prom_ex.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index c6196ffb..7ea8718d 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -84,14 +84,13 @@ defmodule Supavisor.Monitoring.PromEx do @spec do_cache_tenants_metrics() :: :ok def do_cache_tenants_metrics() do - metrics = get_metrics() + metrics = get_metrics() |> String.split("\n") Registry.select(Supavisor.Registry.TenantSups, [{{:"$1", :_, :_}, [], [:"$1"]}]) |> Enum.uniq() |> Enum.each(fn tenant -> filtered = metrics - |> String.split("\n") |> Enum.filter(&String.contains?(&1, "tenant=\"#{tenant}\"")) |> Enum.join("\n") From c74c3207d998aa6fe62fb45859bdeb3a31d77242 Mon Sep 17 00:00:00 2001 From: abc3 Date: Mon, 7 Aug 2023 21:51:14 +0200 Subject: [PATCH 3/5] update do_cache_tenants_metrics --- lib/supavisor/monitoring/prom_ex.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index 7ea8718d..2ba8ffd5 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -91,7 +91,7 @@ defmodule Supavisor.Monitoring.PromEx do |> Enum.each(fn tenant -> filtered = metrics - |> Enum.filter(&String.contains?(&1, "tenant=\"#{tenant}\"")) + |> Stream.filter(&String.contains?(&1, "tenant=\"#{tenant}\"")) |> Enum.join("\n") Cachex.put(Supavisor.Cache, {:metrics, tenant}, filtered) From 452c6e41c5fb7f8de89184c3234c21d7c4a47d2d Mon Sep 17 00:00:00 2001 From: abc3 Date: Wed, 9 Aug 2023 16:02:18 +0200 Subject: [PATCH 4/5] update do_cache_tenants_metrics --- lib/supavisor/monitoring/prom_ex.ex | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index 2ba8ffd5..64c7ce69 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -88,13 +88,14 @@ defmodule Supavisor.Monitoring.PromEx do Registry.select(Supavisor.Registry.TenantSups, [{{:"$1", :_, :_}, [], [:"$1"]}]) |> Enum.uniq() - |> Enum.each(fn tenant -> - filtered = - metrics - |> Stream.filter(&String.contains?(&1, "tenant=\"#{tenant}\"")) - |> Enum.join("\n") + |> Enum.reduce(metrics, fn tenant, acc -> + {matched, rest} = Enum.split_with(acc, &String.contains?(&1, "tenant=\"#{tenant}\"")) - Cachex.put(Supavisor.Cache, {:metrics, tenant}, filtered) + if matched != [] do + Cachex.put(Supavisor.Cache, {:metrics, tenant}, Enum.join(matched, "\n")) + end + + rest end) end From e00b25a107b98a127fb7b4fb79186a95dd0da363 Mon Sep 17 00:00:00 2001 From: abc3 Date: Wed, 9 Aug 2023 17:12:39 +0200 Subject: [PATCH 5/5] remove a debug message --- lib/supavisor/monitoring/prom_ex.ex | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index 64c7ce69..28ecde0c 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -69,8 +69,6 @@ defmodule Supavisor.Monitoring.PromEx do Application.fetch_env!(:supavisor, :metrics_tags) |> Enum.map_join(",", fn {k, v} -> "#{k}=\"#{v}\"" end) - Logger.debug("Default prom tags: #{def_tags}") - metrics = PromEx.get_metrics(__MODULE__) |> String.split("\n")