From 15f93f338c374e854c1836dd3fa1433678dc5426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 27 Oct 2023 16:56:31 +0200 Subject: [PATCH 01/10] Add super stream creation/deletion commands in spec --- deps/rabbitmq_stream/docs/PROTOCOL.adoc | 35 +++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 581aad3270a9..6fb87769ed6d 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -230,6 +230,16 @@ used to make the difference between a request (0) and a response (1). Example fo |0x001c |Yes +|<> +|Client +|0x001d +|Yes + +|<> +|Client +|0x001e +|Yes + |=== === DeclarePublisher @@ -754,6 +764,31 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats Value => int64 ``` +=== CreateSuperStream + +``` +CreateSuperStream => Key Version CorrelationId Name [Partition] [RoutingKey] Arguments + Key => uint16 // 0x001d + Version => uint16 + CorrelationId => uint32 + Name => string + Partition => string + RoutingKey => string + Arguments => [Argument] + Argument => Key Value + Key => string + Value => string +``` + +=== DeleteSuperStream + +``` +Delete => Key Version CorrelationId Name + Key => uint16 // 0x001e + Version => uint16 + CorrelationId => uint32 + Name => string +``` == Authentication From 388608143dff48e0e6d2798860860aa48dae080d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Nov 2023 16:56:18 +0100 Subject: [PATCH 02/10] Add super stream add/delete protocol commands --- .../src/rabbit_stream_reader.erl | 228 ++++++++++++++---- .../src/rabbit_stream_utils.erl | 71 +++++- deps/rabbitmq_stream/test/commands_SUITE.erl | 14 +- .../include/rabbit_stream.hrl | 2 + .../src/rabbit_stream_core.erl | 66 ++++- .../test/rabbit_stream_core_SUITE.erl | 11 + 6 files changed, 321 insertions(+), 71 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 744e6e2501a9..b3258d186f32 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -179,10 +179,6 @@ open/3, close_sent/3]). - %% not called by gen_statem since gen_statem:enter_loop/4 is used - - %% states - callback_mode() -> [state_functions, state_enter]. @@ -1747,7 +1743,7 @@ handle_frame_post_auth(Transport, {declare_publisher, PublisherId, WriterRef, Stream}}) -> case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection0), - User, #{}) + User) of ok -> case {maps:is_key(PublisherId, Publishers0), @@ -1876,7 +1872,6 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, - virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, @@ -1890,15 +1885,8 @@ handle_frame_post_auth(Transport, message_counters = Counters} = Publisher, increase_messages_received(Counters, MessageCount), - case rabbit_stream_utils:check_write_permitted(#resource{name = - Stream, - kind = - queue, - virtual_host - = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection), + User) of ok -> rabbit_stream_utils:write_messages(Version, Leader, Reference, @@ -2294,18 +2282,11 @@ handle_frame_post_auth(Transport, {Connection, State} end; handle_frame_post_auth(_Transport, - #stream_connection{virtual_host = VirtualHost, - user = User} = - Connection, + #stream_connection{user = User} = Connection, State, {store_offset, Reference, Stream, Offset}) -> - case rabbit_stream_utils:check_write_permitted(#resource{name = - Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection), + User) of ok -> case lookup_leader(Stream, Connection) of {error, Error} -> @@ -2398,24 +2379,13 @@ handle_frame_post_auth(Transport, end; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = - Connection, + user = #user{username = Username} = User} = Connection, State, {request, CorrelationId, {create_stream, Stream, Arguments}}) -> case rabbit_stream_utils:enforce_correct_name(Stream) of {ok, StreamName} -> - case rabbit_stream_utils:check_configure_permitted(#resource{name = - StreamName, - kind = - queue, - virtual_host - = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_configure_permitted(stream_r(StreamName, Connection), User) of ok -> case rabbit_stream_manager:create(VirtualHost, StreamName, @@ -2489,19 +2459,10 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = - Connection, + user = #user{username = Username} = User} = Connection, State, {request, CorrelationId, {delete_stream, Stream}}) -> - case rabbit_stream_utils:check_configure_permitted(#resource{name = - Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_configure_permitted(stream_r(Stream, Connection), User) of ok -> case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of {ok, deleted} -> @@ -2917,6 +2878,154 @@ handle_frame_post_auth(Transport, Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), send(Transport, S, Frame), {Connection, State}; +handle_frame_post_auth(Transport, + #stream_connection{virtual_host = VirtualHost, + user = #user{username = Username} = User} = Connection, + State, + {request, CorrelationId, + {create_super_stream, SuperStream, Partitions, RoutingKeys, Arguments}}) -> + case rabbit_stream_utils:enforce_correct_name(SuperStream) of + {ok, SuperStreamName} -> + case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost, + SuperStreamName, + Partitions, + User) of + ok -> + case rabbit_stream_manager:create_super_stream(VirtualHost, + SuperStreamName, + Partitions, + Arguments, + RoutingKeys, + Username) of + ok -> + rabbit_log:debug("Created super stream ~tp", [SuperStreamName]), + response_ok(Transport, + Connection, + create_super_stream, + CorrelationId), + {Connection, State}; + {error, {validation_failed, Msg}} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Msg]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State}; + {error, {reference_already_exists, Msg}} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Msg]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_ALREADY_EXISTS, + 1), + {Connection, State}; + {error, Error} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Error]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_INTERNAL_ERROR), + rabbit_global_counters:increase_protocol_counter(stream, + ?INTERNAL_ERROR, + 1), + {Connection, State} + end; + error -> + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {Connection, State} + end; + _ -> + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State} + end; +handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = #user{username = Username} = User} = Connection, + State, + {request, CorrelationId, {delete_super_stream, SuperStream}}) -> + Partitions = case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of + {ok, Ps} -> + Ps; + _ -> + [] + end, + case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost, + SuperStream, + Partitions, + User) of + ok -> + case rabbit_stream_manager:delete_super_stream(VirtualHost, SuperStream, Username) of + ok -> + response_ok(Transport, + Connection, + delete_super_stream, + CorrelationId), + {Connection1, State1} = clean_state_after_super_stream_deletion(Partitions, + Connection, + State, + Transport, S), + {Connection1, State1}; + {error, stream_not_found} -> + response(Transport, + Connection, + delete_super_stream, + CorrelationId, + ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_DOES_NOT_EXIST, + 1), + {Connection, State}; + {error, Error} -> + rabbit_log:warning("Error while trying to delete super stream ~tp: ~tp", + [SuperStream, Error]), + response(Transport, + Connection, + delete_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State} + + end; + error -> + response(Transport, + Connection, + delete_stream, + CorrelationId, + ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {Connection, State} + end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, @@ -3248,6 +3357,27 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) -> kind = queue, virtual_host = VHost}. +clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport, S) -> + lists:foldl(fun(Partition, {Conn, St}) -> + case + clean_state_after_stream_deletion_or_failure(undefined, Partition, + Conn, + St) + of + {cleaned, NewConnection, NewState} -> + Command = {metadata_update, Partition, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, + Frame = rabbit_stream_core:frame(Command), + send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {NewConnection, NewState}; + {not_cleaned, SameConnection, SameState} -> + {SameConnection, SameState} + end + end, {Connection, State}, Partitions). + clean_state_after_stream_deletion_or_failure(MemberPid, Stream, #stream_connection{virtual_host = VirtualHost, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 74fb467e23bf..3b7f1c654bb1 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -22,8 +22,8 @@ parse_map/2, auth_mechanisms/1, auth_mechanism_to_module/2, - check_configure_permitted/3, - check_write_permitted/3, + check_configure_permitted/2, + check_write_permitted/2, check_read_permitted/3, extract_stream_list/2, sort_partitions/1, @@ -32,7 +32,8 @@ filter_defined/1, filter_spec/1, command_versions/0, - filtering_supported/0]). + filtering_supported/0, + check_super_stream_management_permitted/4]). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -190,15 +191,62 @@ check_resource_access(User, Resource, Perm, Context) -> end end. -check_configure_permitted(Resource, User, Context) -> - check_resource_access(User, Resource, configure, Context). +check_configure_permitted(Resource, User) -> + check_resource_access(User, Resource, configure, #{}). -check_write_permitted(Resource, User, Context) -> - check_resource_access(User, Resource, write, Context). +check_write_permitted(Resource, User) -> + check_resource_access(User, Resource, write, #{}). check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). +check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, User) -> + Exchange = e(VirtualHost, SuperStream), + %% exchange creation + case check_configure_permitted(Exchange, User) of + ok -> + %% stream creations + case check_streams_permissions(fun check_configure_permitted/2, + VirtualHost, Partitions, + User) of + ok -> + %% binding from exchange + case check_read_permitted(Exchange, User, #{}) of + ok -> + %% binding to streams + case check_streams_permissions(fun check_write_permitted/2, + VirtualHost, Partitions, + User) of + ok -> + ok; + _ -> + error + end; + _ -> + error + end; + _ -> + error + end; + _ -> + error + end. + +check_streams_permissions(Fun, VirtualHost, List, User) -> + case lists:all(fun(S) -> + case Fun(q(VirtualHost, S), User) of + ok -> + true; + _ -> + false + end + end, List) of + true -> + ok; + _ -> + error + end. + extract_stream_list(<<>>, Streams) -> Streams; extract_stream_list(<>, @@ -295,3 +343,12 @@ command_versions() -> filtering_supported() -> rabbit_feature_flags:is_enabled(stream_filtering). + +q(VirtualHost, Name) -> + r(VirtualHost, Name, queue). + +e(VirtualHost, Name) -> + r(VirtualHost, Name, exchange). + +r(VirtualHost, Name, Kind) when Kind =:= exchange orelse Kind =:= queue -> + #resource{virtual_host = VirtualHost, name = Name, kind = Kind}. diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index fce0c170b61d..17cf5dbc5890 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -25,7 +25,7 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand'). -define(COMMAND_ADD_SUPER_STREAM, 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). --define(COMMAND_DELETE_SUPER_STREAM, +-define(COMMAND_DELETE_SUPER_STREAM_CLI, 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). -define(COMMAND_LIST_CONSUMER_GROUPS, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand'). @@ -719,15 +719,15 @@ add_super_stream_validate(_Config) -> delete_super_stream_merge_defaults(_Config) -> ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, - ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>], + ?COMMAND_DELETE_SUPER_STREAM_CLI:merge_defaults([<<"super-stream">>], #{})), ok. delete_super_stream_validate(_Config) -> ?assertMatch({validation_failure, not_enough_args}, - ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})), + ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([], #{})), ?assertMatch({validation_failure, too_many_args}, - ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>], + ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>, <<"b">>], #{})), ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})), ok. @@ -748,7 +748,7 @@ add_delete_super_stream_run(Config) -> [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, partitions(Config, <<"invoices">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ?assertEqual({error, stream_not_found}, partitions(Config, <<"invoices">>)), @@ -763,7 +763,7 @@ add_delete_super_stream_run(Config) -> <<"invoices-apac">>]}, partitions(Config, <<"invoices">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ?assertEqual({error, stream_not_found}, partitions(Config, <<"invoices">>)), @@ -797,7 +797,7 @@ add_delete_super_stream_run(Config) -> rabbit_misc:table_lookup(Args, <<"x-queue-type">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ok. diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index 82c173be0e6d..d465d551e3cc 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -26,6 +26,8 @@ -define(COMMAND_CONSUMER_UPDATE, 26). -define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27). -define(COMMAND_STREAM_STATS, 28). +-define(COMMAND_CREATE_SUPER_STREAM, 29). +-define(COMMAND_DELETE_SUPER_STREAM, 30). -define(REQUEST, 0). -define(RESPONSE, 1). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index e7343ff1a99f..db9814b3db3a 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -115,7 +115,9 @@ {exchange_command_versions, [{Command :: atom(), MinVersion :: command_version(), MaxVersion :: command_version()}]} | - {stream_stats, Stream :: binary()}} | + {stream_stats, Stream :: binary()}, + {create_super_stream, stream_name(), Partitions :: [binary()], RoutingKeys :: [binary()], Args :: #{binary() => binary()}} | + {delete_super_stream, stream_name()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -124,7 +126,9 @@ create_stream | delete_stream | close | - sasl_authenticate, + sasl_authenticate | + create_super_stream, + delete_super_stream, response_code()} | {query_publisher_sequence, response_code(), sequence()} | {open, response_code(), #{binary() => binary()}} | @@ -561,9 +565,7 @@ request_body({create_stream = Tag, Stream, Args}) -> request_body({delete_stream = Tag, Stream}) -> {Tag, <>}; request_body({metadata = Tag, Streams}) -> - StreamsBin = - lists:foldr(fun(Stream, Acc) -> [<> | Acc] end, [], - Streams), + StreamsBin = generate_list(Streams), {Tag, [<<(length(Streams)):32>>, StreamsBin]}; request_body({peer_properties = Tag, Props}) -> PropsBin = generate_map(Props), @@ -605,7 +607,16 @@ request_body({exchange_command_versions = Tag, CommandVersions}) -> CommandVersionsLength = length(CommandVersions), {Tag, [<>, CommandVersionsBin]}; request_body({stream_stats = Tag, Stream}) -> - {Tag, <>}. + {Tag, <>}; +request_body({create_super_stream = Tag, SuperStream, Partitions, RoutingKeys, Args}) -> + PartitionsBin = generate_list(Partitions), + RoutingKeysBin = generate_list(RoutingKeys), + ArgsBin = generate_map(Args), + {Tag, [<>, PartitionsBin, + <<(length(RoutingKeys)):32>>, RoutingKeysBin, + <<(map_size(Args)):32>>, ArgsBin]}; +request_body({delete_super_stream = Tag, SuperStream}) -> + {Tag, <>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -885,6 +896,23 @@ parse_request(<>) -> request(CorrelationId, {stream_stats, Stream}); +parse_request(<>) -> + {Partitions, <>} = list_of_strings(PartitionsCount, Rest0), + {RoutingKeys, <<_ArgumentsCount:32, Rest2/binary>>} = list_of_strings(RoutingKeysCount, Rest1), + Args = parse_map(Rest2, #{}), + request(CorrelationId, {create_super_stream, Stream, Partitions, RoutingKeys, Args}); +parse_request(<>) -> + request(CorrelationId, {delete_super_stream, SuperStream}); parse_request(Bin) -> {unknown, Bin}. @@ -1052,10 +1080,24 @@ parse_int_map(<<>>, Acc) -> parse_int_map(<>, Acc) -> parse_int_map(Rem, Acc#{Key => Value}). +generate_list(List) -> + lists:foldr(fun(E, Acc) -> [<> | Acc] end, [], + List). + generate_map(Map) -> maps:fold(fun(K, V, Acc) -> [<> | Acc] end, [], Map). +list_of_strings(Count, Bin) -> + list_of_strings(Count, [], Bin). + +list_of_strings(_, Acc, <<>>) -> + {lists:reverse(Acc), <<>>}; +list_of_strings(0, Acc, Rest) -> + {lists:reverse(Acc), Rest}; +list_of_strings(Count, Acc, <>) -> + list_of_strings(Count - 1, [String | Acc], Rem). + list_of_strings(<<>>) -> []; list_of_strings(<>) -> @@ -1135,7 +1177,11 @@ command_id(consumer_update) -> command_id(exchange_command_versions) -> ?COMMAND_EXCHANGE_COMMAND_VERSIONS; command_id(stream_stats) -> - ?COMMAND_STREAM_STATS. + ?COMMAND_STREAM_STATS; +command_id(create_super_stream) -> + ?COMMAND_CREATE_SUPER_STREAM; +command_id(delete_super_stream) -> + ?COMMAND_DELETE_SUPER_STREAM. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -1192,7 +1238,11 @@ parse_command_id(?COMMAND_CONSUMER_UPDATE) -> parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) -> exchange_command_versions; parse_command_id(?COMMAND_STREAM_STATS) -> - stream_stats. + stream_stats; +parse_command_id(?COMMAND_CREATE_SUPER_STREAM) -> + create_super_stream; +parse_command_id(?COMMAND_DELETE_SUPER_STREAM) -> + delete_super_stream. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index 4934b02affa8..2d5f0e07ef90 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -110,6 +110,15 @@ roundtrip(_Config) -> {exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_1}]}}), test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}), + test_roundtrip({request, 99, + {create_super_stream, <<"hello">>, + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"rk1">>, <<"rk2">>, <<"rk3">>], + Args}}), + test_roundtrip({request, 99, + {create_super_stream, <<"super_stream_name">>, + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"rk1">>, <<"rk2">>, <<"rk3">>], + #{}}}), + test_roundtrip({request, 99, {delete_super_stream, <<"super_stream_name">>}}), %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) @@ -120,6 +129,8 @@ roundtrip(_Config) -> unsubscribe, create_stream, delete_stream, + create_super_stream, + delete_super_stream, open, close]], From d04ccdb0ce06b6975f9cd0814e3817db92b1a2e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 8 Nov 2023 14:35:33 +0100 Subject: [PATCH 03/10] Return correct frame name --- deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 4 ++-- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 2 +- deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3abd74cc05a3..70068cb099bb 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -445,8 +445,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, end catch exit:Error -> - rabbit_log:error("Error while looking up exchange ~tp, ~tp", - [rabbit_misc:rs(ExchangeName), Error]), + rabbit_log:warning("Error while looking up exchange ~tp, ~tp", + [rabbit_misc:rs(ExchangeName), Error]), {error, stream_not_found} end, {reply, Res, State}; diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index b3258d186f32..77fe67e8529d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -3018,7 +3018,7 @@ handle_frame_post_auth(Transport, error -> response(Transport, Connection, - delete_stream, + delete_super_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), rabbit_global_counters:increase_protocol_counter(stream, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 3b7f1c654bb1..32914ffc1c27 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -339,7 +339,9 @@ command_versions() -> {heartbeat, ?VERSION_1, ?VERSION_1}, {route, ?VERSION_1, ?VERSION_1}, {partitions, ?VERSION_1, ?VERSION_1}, - {stream_stats, ?VERSION_1, ?VERSION_1}]. + {stream_stats, ?VERSION_1, ?VERSION_1}, + {create_super_stream, ?VERSION_1, ?VERSION_1}, + {delete_super_stream, ?VERSION_1, ?VERSION_1}]. filtering_supported() -> rabbit_feature_flags:is_enabled(stream_filtering). From c352eed64a8fdbdcda04dbf00a563f4c22f81141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 8 Nov 2023 15:33:36 +0100 Subject: [PATCH 04/10] Fix specification --- deps/rabbitmq_stream_common/src/rabbit_stream_core.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index db9814b3db3a..61f0de8bbdea 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -115,7 +115,7 @@ {exchange_command_versions, [{Command :: atom(), MinVersion :: command_version(), MaxVersion :: command_version()}]} | - {stream_stats, Stream :: binary()}, + {stream_stats, Stream :: binary()} | {create_super_stream, stream_name(), Partitions :: [binary()], RoutingKeys :: [binary()], Args :: #{binary() => binary()}} | {delete_super_stream, stream_name()}} | {response, correlation_id(), @@ -127,7 +127,7 @@ delete_stream | close | sasl_authenticate | - create_super_stream, + create_super_stream | delete_super_stream, response_code()} | {query_publisher_sequence, response_code(), sequence()} | From 043ddcba95145d6af0fa95a7e33f113cc721deda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 8 Nov 2023 17:10:48 +0100 Subject: [PATCH 05/10] Add super stream creation/deletion test --- .../test/rabbit_stream_SUITE.erl | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 762ce22fa504..f10f3415f74a 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -26,6 +26,8 @@ -compile(nowarn_export_all). -compile(export_all). +-import(rabbit_stream_core, [frame/1]). + -define(WAIT, 5000). all() -> @@ -37,6 +39,7 @@ groups() -> test_stream, test_stream_tls, test_publish_v2, + test_super_stream_creation_deletion, test_gc_consumers, test_gc_publishers, test_update_secret, @@ -306,6 +309,54 @@ test_publish_v2(Config) -> closed = wait_for_socket_close(Transport, S, 10), ok. + +test_super_stream_creation_deletion(Config) -> + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C = rabbit_stream_core:init(0), + test_peer_properties(T, S, C), + test_authenticate(T, S, C), + + Ss = atom_to_binary(?FUNCTION_NAME, utf8), + Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, 2)], + Rks = [integer_to_binary(N) || N <- lists:seq(0, 2)], + SsCreationFrame = frame({request, 1, {create_super_stream, Ss, Partitions, Rks, #{}}}), + ok = T:send(S, SsCreationFrame), + {Cmd1, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}}, + Cmd1), + + PartitionsFrame = frame({request, 1, {partitions, Ss}}), + ok = T:send(S, PartitionsFrame), + {Cmd2, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}}, + Cmd2), + [begin + RouteFrame = frame({request, 1, {route, Rk, Ss}}), + ok = T:send(S, RouteFrame), + {Command, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {route, ?RESPONSE_CODE_OK, _}}, Command), + {response, 1, {route, ?RESPONSE_CODE_OK, [P]}} = Command, + ?assertEqual(unicode:characters_to_binary([Ss, <<"-">>, Rk]), P) + end || Rk <- Rks], + + SsDeletionFrame = frame({request, 1, {delete_super_stream, Ss}}), + ok = T:send(S, SsDeletionFrame), + {Cmd3, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}}, + Cmd3), + + ok = T:send(S, PartitionsFrame), + {Cmd4, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []}}, + Cmd4), + + test_close(T, S, C), + closed = wait_for_socket_close(T, S, 10), + ok. + test_metadata(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), Transport = gen_tcp, From 809e97fd38ce88ea650bf9fecb56685acc1edd6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 10:29:33 +0100 Subject: [PATCH 06/10] Use maybe syntax for super stream management permissions --- .../src/rabbit_stream_utils.erl | 49 +++++++------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 32914ffc1c27..ed87006003a7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -16,6 +16,8 @@ -module(rabbit_stream_utils). +-feature(maybe_expr, enable). + %% API -export([enforce_correct_name/1, write_messages/5, @@ -202,32 +204,20 @@ check_read_permitted(Resource, User, Context) -> check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, User) -> Exchange = e(VirtualHost, SuperStream), - %% exchange creation - case check_configure_permitted(Exchange, User) of - ok -> - %% stream creations - case check_streams_permissions(fun check_configure_permitted/2, - VirtualHost, Partitions, - User) of - ok -> - %% binding from exchange - case check_read_permitted(Exchange, User, #{}) of - ok -> - %% binding to streams - case check_streams_permissions(fun check_write_permitted/2, - VirtualHost, Partitions, - User) of - ok -> - ok; - _ -> - error - end; - _ -> - error - end; - _ -> - error - end; + maybe + %% exchange creation + ok ?= check_configure_permitted(Exchange, User), + %% stream creations + ok ?= check_streams_permissions(fun check_configure_permitted/2, + VirtualHost, Partitions, + User), + %% binding from exchange + ok ?= check_read_permitted(Exchange, User, #{}), + %% binding to streams + ok ?= check_streams_permissions(fun check_write_permitted/2, + VirtualHost, Partitions, + User) + else _ -> error end. @@ -347,10 +337,7 @@ filtering_supported() -> rabbit_feature_flags:is_enabled(stream_filtering). q(VirtualHost, Name) -> - r(VirtualHost, Name, queue). + rabbit_misc:r(VirtualHost, queue, Name). e(VirtualHost, Name) -> - r(VirtualHost, Name, exchange). - -r(VirtualHost, Name, Kind) when Kind =:= exchange orelse Kind =:= queue -> - #resource{virtual_host = VirtualHost, name = Name, kind = Kind}. + rabbit_misc:r(VirtualHost, exchange, Name). From 9beda5702eaa518419434e7ef52bfadf83f2a310 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 14:54:45 +0100 Subject: [PATCH 07/10] Use "binding key" lingo for super stream creation Instead of "routing key". --- deps/rabbitmq_stream/docs/PROTOCOL.adoc | 4 ++-- .../src/rabbit_stream_manager.erl | 22 +++++++++---------- .../src/rabbit_stream_reader.erl | 4 ++-- .../test/rabbit_stream_SUITE.erl | 6 ++--- .../src/rabbit_stream_core.erl | 14 ++++++------ .../test/rabbit_stream_core_SUITE.erl | 4 ++-- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 6fb87769ed6d..fa98829b153b 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -767,13 +767,13 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats === CreateSuperStream ``` -CreateSuperStream => Key Version CorrelationId Name [Partition] [RoutingKey] Arguments +CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments Key => uint16 // 0x001d Version => uint16 CorrelationId => uint32 Name => string Partition => string - RoutingKey => string + BindingKey => string Arguments => [Argument] Argument => Key Value Key => string diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 70068cb099bb..ed39e532af2e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -73,7 +73,7 @@ create_super_stream(VirtualHost, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username) -> gen_server:call(?MODULE, {create_super_stream, @@ -81,7 +81,7 @@ create_super_stream(VirtualHost, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username}). -spec delete_super_stream(binary(), binary(), binary()) -> @@ -226,7 +226,7 @@ handle_call({create_super_stream, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username}, _From, State) -> case validate_super_stream_creation(VirtualHost, Name, Partitions) of @@ -273,7 +273,7 @@ handle_call({create_super_stream, add_super_stream_bindings(VirtualHost, Name, Partitions, - RoutingKeys, + BindingKeys, Username), case BindingsResult of ok -> @@ -758,15 +758,15 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> add_super_stream_bindings(VirtualHost, Name, Partitions, - RoutingKeys, + BindingKeys, Username) -> - PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys), + PartitionsBindingKeys = lists:zip(Partitions, BindingKeys), BindingsResult = - lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> + lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) -> case add_super_stream_binding(VirtualHost, Name, Partition, - RoutingKey, + BindingKey, Order, Username) of @@ -778,7 +778,7 @@ add_super_stream_bindings(VirtualHost, (_, {{error, _Reason}, _Order} = Acc) -> Acc end, - {ok, 0}, PartitionsRoutingKeys), + {ok, 0}, PartitionsBindingKeys), case BindingsResult of {ok, _} -> ok; @@ -789,7 +789,7 @@ add_super_stream_bindings(VirtualHost, add_super_stream_binding(VirtualHost, SuperStream, Partition, - RoutingKey, + BindingKey, Order, Username) -> {ok, ExchangeNameBin} = @@ -806,7 +806,7 @@ add_super_stream_binding(VirtualHost, Order), case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, - key = RoutingKey, + key = BindingKey, args = Arguments}, fun (_X, Q) when ?is_amqqueue(Q) -> try diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 77fe67e8529d..e21325ba5190 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2883,7 +2883,7 @@ handle_frame_post_auth(Transport, user = #user{username = Username} = User} = Connection, State, {request, CorrelationId, - {create_super_stream, SuperStream, Partitions, RoutingKeys, Arguments}}) -> + {create_super_stream, SuperStream, Partitions, BindingKeys, Arguments}}) -> case rabbit_stream_utils:enforce_correct_name(SuperStream) of {ok, SuperStreamName} -> case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost, @@ -2895,7 +2895,7 @@ handle_frame_post_auth(Transport, SuperStreamName, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username) of ok -> rabbit_log:debug("Created super stream ~tp", [SuperStreamName]), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index f10f3415f74a..75c2ea8d8209 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -321,8 +321,8 @@ test_super_stream_creation_deletion(Config) -> Ss = atom_to_binary(?FUNCTION_NAME, utf8), Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, 2)], - Rks = [integer_to_binary(N) || N <- lists:seq(0, 2)], - SsCreationFrame = frame({request, 1, {create_super_stream, Ss, Partitions, Rks, #{}}}), + Bks = [integer_to_binary(N) || N <- lists:seq(0, 2)], + SsCreationFrame = frame({request, 1, {create_super_stream, Ss, Partitions, Bks, #{}}}), ok = T:send(S, SsCreationFrame), {Cmd1, _} = receive_commands(T, S, C), ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}}, @@ -340,7 +340,7 @@ test_super_stream_creation_deletion(Config) -> ?assertMatch({response, 1, {route, ?RESPONSE_CODE_OK, _}}, Command), {response, 1, {route, ?RESPONSE_CODE_OK, [P]}} = Command, ?assertEqual(unicode:characters_to_binary([Ss, <<"-">>, Rk]), P) - end || Rk <- Rks], + end || Rk <- Bks], SsDeletionFrame = frame({request, 1, {delete_super_stream, Ss}}), ok = T:send(S, SsDeletionFrame), diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index 61f0de8bbdea..29555686b543 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -116,7 +116,7 @@ [{Command :: atom(), MinVersion :: command_version(), MaxVersion :: command_version()}]} | {stream_stats, Stream :: binary()} | - {create_super_stream, stream_name(), Partitions :: [binary()], RoutingKeys :: [binary()], Args :: #{binary() => binary()}} | + {create_super_stream, stream_name(), Partitions :: [binary()], BindingKeys :: [binary()], Args :: #{binary() => binary()}} | {delete_super_stream, stream_name()}} | {response, correlation_id(), {declare_publisher | @@ -608,12 +608,12 @@ request_body({exchange_command_versions = Tag, CommandVersions}) -> {Tag, [<>, CommandVersionsBin]}; request_body({stream_stats = Tag, Stream}) -> {Tag, <>}; -request_body({create_super_stream = Tag, SuperStream, Partitions, RoutingKeys, Args}) -> +request_body({create_super_stream = Tag, SuperStream, Partitions, BindingKeys, Args}) -> PartitionsBin = generate_list(Partitions), - RoutingKeysBin = generate_list(RoutingKeys), + BindingKeysBin = generate_list(BindingKeys), ArgsBin = generate_map(Args), {Tag, [<>, PartitionsBin, - <<(length(RoutingKeys)):32>>, RoutingKeysBin, + <<(length(BindingKeys)):32>>, BindingKeysBin, <<(map_size(Args)):32>>, ArgsBin]}; request_body({delete_super_stream = Tag, SuperStream}) -> {Tag, <>}. @@ -903,10 +903,10 @@ parse_request(<>) -> - {Partitions, <>} = list_of_strings(PartitionsCount, Rest0), - {RoutingKeys, <<_ArgumentsCount:32, Rest2/binary>>} = list_of_strings(RoutingKeysCount, Rest1), + {Partitions, <>} = list_of_strings(PartitionsCount, Rest0), + {BindingKeys, <<_ArgumentsCount:32, Rest2/binary>>} = list_of_strings(BindingKeysCount, Rest1), Args = parse_map(Rest2, #{}), - request(CorrelationId, {create_super_stream, Stream, Partitions, RoutingKeys, Args}); + request(CorrelationId, {create_super_stream, Stream, Partitions, BindingKeys, Args}); parse_request(< test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}), test_roundtrip({request, 99, {create_super_stream, <<"hello">>, - [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"rk1">>, <<"rk2">>, <<"rk3">>], + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"bk1">>, <<"bk2">>, <<"bk3">>], Args}}), test_roundtrip({request, 99, {create_super_stream, <<"super_stream_name">>, - [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"rk1">>, <<"rk2">>, <<"rk3">>], + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"bk1">>, <<"bk2">>, <<"bk3">>], #{}}}), test_roundtrip({request, 99, {delete_super_stream, <<"super_stream_name">>}}), From 2eb9b5f87b51c960b96506c88f38ad16c29d9471 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 15:48:23 +0100 Subject: [PATCH 08/10] Make sure partitions and binding keys counts are equal --- .../src/rabbit_stream_manager.erl | 7 +++++-- .../test/rabbit_stream_SUITE.erl | 12 +++++++++-- .../test/rabbit_stream_manager_SUITE.erl | 20 +++++++++++++------ 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index ed39e532af2e..b3a9b454edfa 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -229,7 +229,7 @@ handle_call({create_super_stream, BindingKeys, Username}, _From, State) -> - case validate_super_stream_creation(VirtualHost, Name, Partitions) of + case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of {error, Reason} -> {reply, {error, Reason}, State}; ok -> @@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) -> {error, stream_not_found} end. -validate_super_stream_creation(VirtualHost, Name, Partitions) -> +validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys) + when length(Partitions) =/= length(BindingKeys) -> + {error, {validation_failed, "There must be the same number of partitions and binding keys"}}; +validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) -> case exchange_exists(VirtualHost, Name) of {error, validation_failed} -> {error, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 75c2ea8d8209..ad5b83306171 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -332,7 +332,7 @@ test_super_stream_creation_deletion(Config) -> ok = T:send(S, PartitionsFrame), {Cmd2, _} = receive_commands(T, S, C), ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}}, - Cmd2), + Cmd2), [begin RouteFrame = frame({request, 1, {route, Rk, Ss}}), ok = T:send(S, RouteFrame), @@ -351,7 +351,15 @@ test_super_stream_creation_deletion(Config) -> ok = T:send(S, PartitionsFrame), {Cmd4, _} = receive_commands(T, S, C), ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []}}, - Cmd4), + Cmd4), + + %% not the same number of partitions and binding keys + SsCreationBadFrame = frame({request, 1, {create_super_stream, Ss, + [<<"s1">>, <<"s2">>], [<<"bk1">>], #{}}}), + ok = T:send(S, SsCreationBadFrame), + {Cmd5, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}}, + Cmd5), test_close(T, S, C), closed = wait_for_socket_close(T, S, 10), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 7ccc2deb4685..2b7a7f502e6b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -99,14 +99,14 @@ lookup_member(Config) -> ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). manage_super_stream(Config) -> - % create super stream + %% create super stream ?assertEqual(ok, create_super_stream(Config, <<"invoices">>, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>], [<<"0">>, <<"1">>, <<"2">>])), - % get the correct partitions + %% get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, partitions(Config, <<"invoices">>)), @@ -117,7 +117,7 @@ manage_super_stream(Config) -> <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, {<<"invoices-2">>, <<"2">>}]], - % get an error if trying to re-create it + %% get an error if trying to re-create it ?assertMatch({error, _}, create_super_stream(Config, <<"invoices">>, @@ -125,13 +125,13 @@ manage_super_stream(Config) -> <<"invoices-2">>], [<<"0">>, <<"1">>, <<"2">>])), - % can delete it + %% can delete it ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), - % create a stream with the same name as a potential partition + %% create a stream with the same name as a potential partition ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)), - % cannot create the super stream because a partition already exists + %% cannot create the super stream because a partition already exists ?assertMatch({error, _}, create_super_stream(Config, <<"invoices">>, @@ -140,6 +140,14 @@ manage_super_stream(Config) -> [<<"0">>, <<"1">>, <<"2">>])), ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), + + %% not the same number of partitions and binding keys + ?assertMatch({error, {validation_failed, _}}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>], + [<<"0">>])), + ok. partition_index(Config) -> From 22698a4adc810a9a33b63112b4316931343d2075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 15:55:08 +0100 Subject: [PATCH 09/10] Simplify maybe block --- deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index ed87006003a7..3ee7a6fe78e5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -202,6 +202,8 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). +-spec check_super_stream_management_permitted(binary(), binary(), [binary()], binary()) -> + ok | error. check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, User) -> Exchange = e(VirtualHost, SuperStream), maybe @@ -214,12 +216,9 @@ check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, Us %% binding from exchange ok ?= check_read_permitted(Exchange, User, #{}), %% binding to streams - ok ?= check_streams_permissions(fun check_write_permitted/2, - VirtualHost, Partitions, - User) - else - _ -> - error + check_streams_permissions(fun check_write_permitted/2, + VirtualHost, Partitions, + User) end. check_streams_permissions(Fun, VirtualHost, List, User) -> From 5e1155c52340ea55ecfa9213b6dd71b16d7e1cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 13 Nov 2023 16:30:37 +0100 Subject: [PATCH 10/10] Fix function spec --- deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 3ee7a6fe78e5..4f5d86ec53bc 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -202,7 +202,7 @@ check_write_permitted(Resource, User) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). --spec check_super_stream_management_permitted(binary(), binary(), [binary()], binary()) -> +-spec check_super_stream_management_permitted(rabbit_types:vhost(), binary(), [binary()], rabbit_types:user()) -> ok | error. check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, User) -> Exchange = e(VirtualHost, SuperStream),