From 6aae7be53b9b393cb42044d4dcae2d9476f66f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Fri, 1 Nov 2024 01:07:47 +0100 Subject: [PATCH] Add test for stream consumer max offset lag prometheus metric --- .../test/rabbit_prometheus_http_SUITE.erl | 61 ++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index cd66b0e226be..bea19a4d4e6c 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -11,8 +11,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). --compile(export_all). +-compile([export_all, nowarn_export_all]). all() -> [ @@ -70,7 +71,8 @@ groups() -> queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, exchange_bindings_metric, - exchange_names_metric + exchange_names_metric, + stream_pub_sub_metrics ]}, {special_chars, [], [core_metrics_special_chars]}, {authentication, [], [basic_auth]} @@ -739,6 +741,37 @@ exchange_names_metric(Config) -> }, Names), ok. +stream_pub_sub_metrics(Config) -> + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", + MsgPerBatch1 = 2, + publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), + Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2", + MsgPerBatch2 = 3, + publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config), + + %% aggregated metrics + + %% wait for the stream to emit stats + %% (collect_statistics_interval set to 100ms in this test group) + ?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}}, + begin + {_, Body1} = http_get_with_pal(Config, "/metrics", [], 200), + maps:with([rabbitmq_stream_consumer_max_offset_lag], + parse_response(Body1)) + end, + 100), + + %% per-object metrics + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics", + [], 200), + ParsedBody2 = parse_response(Body2), + #{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2, + + ?assertEqual([{#{vhost => "/", queue => Stream1}, [2]}, + {#{vhost => "/", queue => Stream2}, [3]}], + lists:sort(maps:to_list(MaxOffsetLag))), + ok. + core_metrics_special_chars(Config) -> {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200), ?assertMatch(#{rabbitmq_detailed_queue_messages := @@ -784,6 +817,30 @@ basic_auth(Config) -> rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>), rabbit_ct_broker_helpers:delete_user(Config, <<"management">>). +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +publish_via_stream_protocol(Stream, MsgPerBatch, Config) -> + {ok, S, C0} = stream_test_utils:connect(Config, 0), + {ok, C1} = stream_test_utils:create_stream(S, C0, Stream), + PublisherId = 98, + {ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId), + Payloads = lists:duplicate(MsgPerBatch, <<"m1">>), + SequenceFrom1 = 1, + {ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads), + + PublisherId2 = 99, + {ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2), + Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>), + SequenceFrom2 = SequenceFrom1 + MsgPerBatch, + {ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2), + + SubscriptionId = 97, + {ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1), + %% delivery of first batch of messages + {{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6), + ok. http_get(Config, ReqHeaders, CodeExp) -> Path = proplists:get_value(prometheus_path, Config, "/metrics"),