Skip to content

Commit

Permalink
initial commit. example of exposing stream consumer metric to prometh…
Browse files Browse the repository at this point in the history
…eus.
  • Loading branch information
markus812498 committed Nov 15, 2023
1 parent 9eecc15 commit cd42069
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

{stream_consumer_metrics, [
{2, undefined, consumed, counter, "Total number of messages consumed on connection", consumed},
{2, undefined, offset, counter, "Total offset of consuming connection", offset},
{2, undefined, offset_lag, counter, "Total offset lag of consuming connection", offset_lag}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -497,6 +503,15 @@ get_data(connection_metrics = Table, false, _, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
get_data(stream_consumer_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(credits, Props), A1),
sum(proplists:get_value(consumed, Props), A2),
sum(proplists:get_value(offset, Props), A3),
sum(proplists:get_value(offset_lag, Props), A4)}
end, empty(Table), Table),
[{Table, [{credits, A1}, {consumed, A2}, {offset, A3}, {offset_lag, A4}]}];
get_data(channel_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_stream/include/rabbit_stream_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-define(TABLE_CONSUMER, rabbit_stream_consumer_created).
-define(TABLE_PUBLISHER, rabbit_stream_publisher_created).
-define(TABLE_CONSUMER_METRIC, stream_consumer_metrics).

-define(STREAM_DOES_NOT_EXIST, ?NUM_PROTOCOL_COUNTERS + 1).
-define(SUBSCRIPTION_ID_ALREADY_EXISTS, ?NUM_PROTOCOL_COUNTERS + 2).
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbitmq_stream/src/rabbit_stream_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
init() ->
_ = rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}),
_ = rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}),
_ = rabbit_core_metrics:create_table({?TABLE_CONSUMER_METRIC, set}),
ok.

consumer_created(Connection,
Expand All @@ -54,6 +55,8 @@ consumer_created(Connection,
{properties, Properties}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
ets:insert(?TABLE_CONSUMER_METRIC,
{Connection, Values}),
rabbit_global_counters:consumer_created(stream),
rabbit_core_metrics:consumer_created(Connection,
consumer_tag(SubscriptionId),
Expand Down Expand Up @@ -91,6 +94,8 @@ consumer_updated(Connection,
{properties, Properties}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
ets:insert(?TABLE_CONSUMER_METRIC,
{Connection, Values}),
rabbit_core_metrics:consumer_updated(Connection,
consumer_tag(SubscriptionId),
false,
Expand All @@ -107,6 +112,8 @@ consumer_updated(Connection,
consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
ets:delete(?TABLE_CONSUMER_METRIC,
{{Connection}}),
rabbit_global_counters:consumer_deleted(stream),
rabbit_core_metrics:consumer_deleted(Connection,
consumer_tag(SubscriptionId),
Expand Down

0 comments on commit cd42069

Please sign in to comment.