diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 581aad3270a9..fa98829b153b 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -230,6 +230,16 @@ used to make the difference between a request (0) and a response (1). Example fo |0x001c |Yes +|<> +|Client +|0x001d +|Yes + +|<> +|Client +|0x001e +|Yes + |=== === DeclarePublisher @@ -754,6 +764,31 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats Value => int64 ``` +=== CreateSuperStream + +``` +CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments + Key => uint16 // 0x001d + Version => uint16 + CorrelationId => uint32 + Name => string + Partition => string + BindingKey => string + Arguments => [Argument] + Argument => Key Value + Key => string + Value => string +``` + +=== DeleteSuperStream + +``` +Delete => Key Version CorrelationId Name + Key => uint16 // 0x001e + Version => uint16 + CorrelationId => uint32 + Name => string +``` == Authentication diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3abd74cc05a3..b3a9b454edfa 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -73,7 +73,7 @@ create_super_stream(VirtualHost, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username) -> gen_server:call(?MODULE, {create_super_stream, @@ -81,7 +81,7 @@ create_super_stream(VirtualHost, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username}). -spec delete_super_stream(binary(), binary(), binary()) -> @@ -226,10 +226,10 @@ handle_call({create_super_stream, Name, Partitions, Arguments, - RoutingKeys, + BindingKeys, Username}, _From, State) -> - case validate_super_stream_creation(VirtualHost, Name, Partitions) of + case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of {error, Reason} -> {reply, {error, Reason}, State}; ok -> @@ -273,7 +273,7 @@ handle_call({create_super_stream, add_super_stream_bindings(VirtualHost, Name, Partitions, - RoutingKeys, + BindingKeys, Username), case BindingsResult of ok -> @@ -445,8 +445,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, end catch exit:Error -> - rabbit_log:error("Error while looking up exchange ~tp, ~tp", - [rabbit_misc:rs(ExchangeName), Error]), + rabbit_log:warning("Error while looking up exchange ~tp, ~tp", + [rabbit_misc:rs(ExchangeName), Error]), {error, stream_not_found} end, {reply, Res, State}; @@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) -> {error, stream_not_found} end. -validate_super_stream_creation(VirtualHost, Name, Partitions) -> +validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys) + when length(Partitions) =/= length(BindingKeys) -> + {error, {validation_failed, "There must be the same number of partitions and binding keys"}}; +validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) -> case exchange_exists(VirtualHost, Name) of {error, validation_failed} -> {error, @@ -758,15 +761,15 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> add_super_stream_bindings(VirtualHost, Name, Partitions, - RoutingKeys, + BindingKeys, Username) -> - PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys), + PartitionsBindingKeys = lists:zip(Partitions, BindingKeys), BindingsResult = - lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> + lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) -> case add_super_stream_binding(VirtualHost, Name, Partition, - RoutingKey, + BindingKey, Order, Username) of @@ -778,7 +781,7 @@ add_super_stream_bindings(VirtualHost, (_, {{error, _Reason}, _Order} = Acc) -> Acc end, - {ok, 0}, PartitionsRoutingKeys), + {ok, 0}, PartitionsBindingKeys), case BindingsResult of {ok, _} -> ok; @@ -789,7 +792,7 @@ add_super_stream_bindings(VirtualHost, add_super_stream_binding(VirtualHost, SuperStream, Partition, - RoutingKey, + BindingKey, Order, Username) -> {ok, ExchangeNameBin} = @@ -806,7 +809,7 @@ add_super_stream_binding(VirtualHost, Order), case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, - key = RoutingKey, + key = BindingKey, args = Arguments}, fun (_X, Q) when ?is_amqqueue(Q) -> try diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 744e6e2501a9..e21325ba5190 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -179,10 +179,6 @@ open/3, close_sent/3]). - %% not called by gen_statem since gen_statem:enter_loop/4 is used - - %% states - callback_mode() -> [state_functions, state_enter]. @@ -1747,7 +1743,7 @@ handle_frame_post_auth(Transport, {declare_publisher, PublisherId, WriterRef, Stream}}) -> case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection0), - User, #{}) + User) of ok -> case {maps:is_key(PublisherId, Publishers0), @@ -1876,7 +1872,6 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, - virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, @@ -1890,15 +1885,8 @@ handle_frame_post_auth(Transport, message_counters = Counters} = Publisher, increase_messages_received(Counters, MessageCount), - case rabbit_stream_utils:check_write_permitted(#resource{name = - Stream, - kind = - queue, - virtual_host - = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection), + User) of ok -> rabbit_stream_utils:write_messages(Version, Leader, Reference, @@ -2294,18 +2282,11 @@ handle_frame_post_auth(Transport, {Connection, State} end; handle_frame_post_auth(_Transport, - #stream_connection{virtual_host = VirtualHost, - user = User} = - Connection, + #stream_connection{user = User} = Connection, State, {store_offset, Reference, Stream, Offset}) -> - case rabbit_stream_utils:check_write_permitted(#resource{name = - Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_write_permitted(stream_r(Stream, Connection), + User) of ok -> case lookup_leader(Stream, Connection) of {error, Error} -> @@ -2398,24 +2379,13 @@ handle_frame_post_auth(Transport, end; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = - Connection, + user = #user{username = Username} = User} = Connection, State, {request, CorrelationId, {create_stream, Stream, Arguments}}) -> case rabbit_stream_utils:enforce_correct_name(Stream) of {ok, StreamName} -> - case rabbit_stream_utils:check_configure_permitted(#resource{name = - StreamName, - kind = - queue, - virtual_host - = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_configure_permitted(stream_r(StreamName, Connection), User) of ok -> case rabbit_stream_manager:create(VirtualHost, StreamName, @@ -2489,19 +2459,10 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, - user = - #user{username = Username} = - User} = - Connection, + user = #user{username = Username} = User} = Connection, State, {request, CorrelationId, {delete_stream, Stream}}) -> - case rabbit_stream_utils:check_configure_permitted(#resource{name = - Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, #{}) - of + case rabbit_stream_utils:check_configure_permitted(stream_r(Stream, Connection), User) of ok -> case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of {ok, deleted} -> @@ -2917,6 +2878,154 @@ handle_frame_post_auth(Transport, Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), send(Transport, S, Frame), {Connection, State}; +handle_frame_post_auth(Transport, + #stream_connection{virtual_host = VirtualHost, + user = #user{username = Username} = User} = Connection, + State, + {request, CorrelationId, + {create_super_stream, SuperStream, Partitions, BindingKeys, Arguments}}) -> + case rabbit_stream_utils:enforce_correct_name(SuperStream) of + {ok, SuperStreamName} -> + case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost, + SuperStreamName, + Partitions, + User) of + ok -> + case rabbit_stream_manager:create_super_stream(VirtualHost, + SuperStreamName, + Partitions, + Arguments, + BindingKeys, + Username) of + ok -> + rabbit_log:debug("Created super stream ~tp", [SuperStreamName]), + response_ok(Transport, + Connection, + create_super_stream, + CorrelationId), + {Connection, State}; + {error, {validation_failed, Msg}} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Msg]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State}; + {error, {reference_already_exists, Msg}} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Msg]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_ALREADY_EXISTS, + 1), + {Connection, State}; + {error, Error} -> + rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", + [SuperStreamName, Error]), + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_INTERNAL_ERROR), + rabbit_global_counters:increase_protocol_counter(stream, + ?INTERNAL_ERROR, + 1), + {Connection, State} + end; + error -> + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {Connection, State} + end; + _ -> + response(Transport, + Connection, + create_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State} + end; +handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = #user{username = Username} = User} = Connection, + State, + {request, CorrelationId, {delete_super_stream, SuperStream}}) -> + Partitions = case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of + {ok, Ps} -> + Ps; + _ -> + [] + end, + case rabbit_stream_utils:check_super_stream_management_permitted(VirtualHost, + SuperStream, + Partitions, + User) of + ok -> + case rabbit_stream_manager:delete_super_stream(VirtualHost, SuperStream, Username) of + ok -> + response_ok(Transport, + Connection, + delete_super_stream, + CorrelationId), + {Connection1, State1} = clean_state_after_super_stream_deletion(Partitions, + Connection, + State, + Transport, S), + {Connection1, State1}; + {error, stream_not_found} -> + response(Transport, + Connection, + delete_super_stream, + CorrelationId, + ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_DOES_NOT_EXIST, + 1), + {Connection, State}; + {error, Error} -> + rabbit_log:warning("Error while trying to delete super stream ~tp: ~tp", + [SuperStream, Error]), + response(Transport, + Connection, + delete_super_stream, + CorrelationId, + ?RESPONSE_CODE_PRECONDITION_FAILED), + rabbit_global_counters:increase_protocol_counter(stream, + ?PRECONDITION_FAILED, + 1), + {Connection, State} + + end; + error -> + response(Transport, + Connection, + delete_super_stream, + CorrelationId, + ?RESPONSE_CODE_ACCESS_REFUSED), + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {Connection, State} + end; handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, @@ -3248,6 +3357,27 @@ stream_r(Stream, #stream_connection{virtual_host = VHost}) -> kind = queue, virtual_host = VHost}. +clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport, S) -> + lists:foldl(fun(Partition, {Conn, St}) -> + case + clean_state_after_stream_deletion_or_failure(undefined, Partition, + Conn, + St) + of + {cleaned, NewConnection, NewState} -> + Command = {metadata_update, Partition, + ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, + Frame = rabbit_stream_core:frame(Command), + send(Transport, S, Frame), + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {NewConnection, NewState}; + {not_cleaned, SameConnection, SameState} -> + {SameConnection, SameState} + end + end, {Connection, State}, Partitions). + clean_state_after_stream_deletion_or_failure(MemberPid, Stream, #stream_connection{virtual_host = VirtualHost, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 74fb467e23bf..4f5d86ec53bc 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -16,14 +16,16 @@ -module(rabbit_stream_utils). +-feature(maybe_expr, enable). + %% API -export([enforce_correct_name/1, write_messages/5, parse_map/2, auth_mechanisms/1, auth_mechanism_to_module/2, - check_configure_permitted/3, - check_write_permitted/3, + check_configure_permitted/2, + check_write_permitted/2, check_read_permitted/3, extract_stream_list/2, sort_partitions/1, @@ -32,7 +34,8 @@ filter_defined/1, filter_spec/1, command_versions/0, - filtering_supported/0]). + filtering_supported/0, + check_super_stream_management_permitted/4]). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -190,15 +193,49 @@ check_resource_access(User, Resource, Perm, Context) -> end end. -check_configure_permitted(Resource, User, Context) -> - check_resource_access(User, Resource, configure, Context). +check_configure_permitted(Resource, User) -> + check_resource_access(User, Resource, configure, #{}). -check_write_permitted(Resource, User, Context) -> - check_resource_access(User, Resource, write, Context). +check_write_permitted(Resource, User) -> + check_resource_access(User, Resource, write, #{}). check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). +-spec check_super_stream_management_permitted(rabbit_types:vhost(), binary(), [binary()], rabbit_types:user()) -> + ok | error. +check_super_stream_management_permitted(VirtualHost, SuperStream, Partitions, User) -> + Exchange = e(VirtualHost, SuperStream), + maybe + %% exchange creation + ok ?= check_configure_permitted(Exchange, User), + %% stream creations + ok ?= check_streams_permissions(fun check_configure_permitted/2, + VirtualHost, Partitions, + User), + %% binding from exchange + ok ?= check_read_permitted(Exchange, User, #{}), + %% binding to streams + check_streams_permissions(fun check_write_permitted/2, + VirtualHost, Partitions, + User) + end. + +check_streams_permissions(Fun, VirtualHost, List, User) -> + case lists:all(fun(S) -> + case Fun(q(VirtualHost, S), User) of + ok -> + true; + _ -> + false + end + end, List) of + true -> + ok; + _ -> + error + end. + extract_stream_list(<<>>, Streams) -> Streams; extract_stream_list(<>, @@ -291,7 +328,15 @@ command_versions() -> {heartbeat, ?VERSION_1, ?VERSION_1}, {route, ?VERSION_1, ?VERSION_1}, {partitions, ?VERSION_1, ?VERSION_1}, - {stream_stats, ?VERSION_1, ?VERSION_1}]. + {stream_stats, ?VERSION_1, ?VERSION_1}, + {create_super_stream, ?VERSION_1, ?VERSION_1}, + {delete_super_stream, ?VERSION_1, ?VERSION_1}]. filtering_supported() -> rabbit_feature_flags:is_enabled(stream_filtering). + +q(VirtualHost, Name) -> + rabbit_misc:r(VirtualHost, queue, Name). + +e(VirtualHost, Name) -> + rabbit_misc:r(VirtualHost, exchange, Name). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index fce0c170b61d..17cf5dbc5890 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -25,7 +25,7 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand'). -define(COMMAND_ADD_SUPER_STREAM, 'Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand'). --define(COMMAND_DELETE_SUPER_STREAM, +-define(COMMAND_DELETE_SUPER_STREAM_CLI, 'Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteSuperStreamCommand'). -define(COMMAND_LIST_CONSUMER_GROUPS, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand'). @@ -719,15 +719,15 @@ add_super_stream_validate(_Config) -> delete_super_stream_merge_defaults(_Config) -> ?assertMatch({[<<"super-stream">>], #{vhost := <<"/">>}}, - ?COMMAND_DELETE_SUPER_STREAM:merge_defaults([<<"super-stream">>], + ?COMMAND_DELETE_SUPER_STREAM_CLI:merge_defaults([<<"super-stream">>], #{})), ok. delete_super_stream_validate(_Config) -> ?assertMatch({validation_failure, not_enough_args}, - ?COMMAND_DELETE_SUPER_STREAM:validate([], #{})), + ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([], #{})), ?assertMatch({validation_failure, too_many_args}, - ?COMMAND_DELETE_SUPER_STREAM:validate([<<"a">>, <<"b">>], + ?COMMAND_DELETE_SUPER_STREAM_CLI:validate([<<"a">>, <<"b">>], #{})), ?assertEqual(ok, ?COMMAND_ADD_SUPER_STREAM:validate([<<"a">>], #{})), ok. @@ -748,7 +748,7 @@ add_delete_super_stream_run(Config) -> [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, partitions(Config, <<"invoices">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ?assertEqual({error, stream_not_found}, partitions(Config, <<"invoices">>)), @@ -763,7 +763,7 @@ add_delete_super_stream_run(Config) -> <<"invoices-apac">>]}, partitions(Config, <<"invoices">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ?assertEqual({error, stream_not_found}, partitions(Config, <<"invoices">>)), @@ -797,7 +797,7 @@ add_delete_super_stream_run(Config) -> rabbit_misc:table_lookup(Args, <<"x-queue-type">>)), ?assertMatch({ok, _}, - ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?COMMAND_DELETE_SUPER_STREAM_CLI:run([<<"invoices">>], Opts)), ok. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 762ce22fa504..ad5b83306171 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -26,6 +26,8 @@ -compile(nowarn_export_all). -compile(export_all). +-import(rabbit_stream_core, [frame/1]). + -define(WAIT, 5000). all() -> @@ -37,6 +39,7 @@ groups() -> test_stream, test_stream_tls, test_publish_v2, + test_super_stream_creation_deletion, test_gc_consumers, test_gc_publishers, test_update_secret, @@ -306,6 +309,62 @@ test_publish_v2(Config) -> closed = wait_for_socket_close(Transport, S, 10), ok. + +test_super_stream_creation_deletion(Config) -> + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C = rabbit_stream_core:init(0), + test_peer_properties(T, S, C), + test_authenticate(T, S, C), + + Ss = atom_to_binary(?FUNCTION_NAME, utf8), + Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, 2)], + Bks = [integer_to_binary(N) || N <- lists:seq(0, 2)], + SsCreationFrame = frame({request, 1, {create_super_stream, Ss, Partitions, Bks, #{}}}), + ok = T:send(S, SsCreationFrame), + {Cmd1, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}}, + Cmd1), + + PartitionsFrame = frame({request, 1, {partitions, Ss}}), + ok = T:send(S, PartitionsFrame), + {Cmd2, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}}, + Cmd2), + [begin + RouteFrame = frame({request, 1, {route, Rk, Ss}}), + ok = T:send(S, RouteFrame), + {Command, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {route, ?RESPONSE_CODE_OK, _}}, Command), + {response, 1, {route, ?RESPONSE_CODE_OK, [P]}} = Command, + ?assertEqual(unicode:characters_to_binary([Ss, <<"-">>, Rk]), P) + end || Rk <- Bks], + + SsDeletionFrame = frame({request, 1, {delete_super_stream, Ss}}), + ok = T:send(S, SsDeletionFrame), + {Cmd3, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}}, + Cmd3), + + ok = T:send(S, PartitionsFrame), + {Cmd4, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []}}, + Cmd4), + + %% not the same number of partitions and binding keys + SsCreationBadFrame = frame({request, 1, {create_super_stream, Ss, + [<<"s1">>, <<"s2">>], [<<"bk1">>], #{}}}), + ok = T:send(S, SsCreationBadFrame), + {Cmd5, _} = receive_commands(T, S, C), + ?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}}, + Cmd5), + + test_close(T, S, C), + closed = wait_for_socket_close(T, S, 10), + ok. + test_metadata(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), Transport = gen_tcp, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 7ccc2deb4685..2b7a7f502e6b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -99,14 +99,14 @@ lookup_member(Config) -> ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). manage_super_stream(Config) -> - % create super stream + %% create super stream ?assertEqual(ok, create_super_stream(Config, <<"invoices">>, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>], [<<"0">>, <<"1">>, <<"2">>])), - % get the correct partitions + %% get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, partitions(Config, <<"invoices">>)), @@ -117,7 +117,7 @@ manage_super_stream(Config) -> <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, {<<"invoices-2">>, <<"2">>}]], - % get an error if trying to re-create it + %% get an error if trying to re-create it ?assertMatch({error, _}, create_super_stream(Config, <<"invoices">>, @@ -125,13 +125,13 @@ manage_super_stream(Config) -> <<"invoices-2">>], [<<"0">>, <<"1">>, <<"2">>])), - % can delete it + %% can delete it ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), - % create a stream with the same name as a potential partition + %% create a stream with the same name as a potential partition ?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)), - % cannot create the super stream because a partition already exists + %% cannot create the super stream because a partition already exists ?assertMatch({error, _}, create_super_stream(Config, <<"invoices">>, @@ -140,6 +140,14 @@ manage_super_stream(Config) -> [<<"0">>, <<"1">>, <<"2">>])), ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), + + %% not the same number of partitions and binding keys + ?assertMatch({error, {validation_failed, _}}, + create_super_stream(Config, + <<"invoices">>, + [<<"invoices-0">>, <<"invoices-1">>], + [<<"0">>])), + ok. partition_index(Config) -> diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index 82c173be0e6d..d465d551e3cc 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -26,6 +26,8 @@ -define(COMMAND_CONSUMER_UPDATE, 26). -define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27). -define(COMMAND_STREAM_STATS, 28). +-define(COMMAND_CREATE_SUPER_STREAM, 29). +-define(COMMAND_DELETE_SUPER_STREAM, 30). -define(REQUEST, 0). -define(RESPONSE, 1). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index e7343ff1a99f..29555686b543 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -115,7 +115,9 @@ {exchange_command_versions, [{Command :: atom(), MinVersion :: command_version(), MaxVersion :: command_version()}]} | - {stream_stats, Stream :: binary()}} | + {stream_stats, Stream :: binary()} | + {create_super_stream, stream_name(), Partitions :: [binary()], BindingKeys :: [binary()], Args :: #{binary() => binary()}} | + {delete_super_stream, stream_name()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -124,7 +126,9 @@ create_stream | delete_stream | close | - sasl_authenticate, + sasl_authenticate | + create_super_stream | + delete_super_stream, response_code()} | {query_publisher_sequence, response_code(), sequence()} | {open, response_code(), #{binary() => binary()}} | @@ -561,9 +565,7 @@ request_body({create_stream = Tag, Stream, Args}) -> request_body({delete_stream = Tag, Stream}) -> {Tag, <>}; request_body({metadata = Tag, Streams}) -> - StreamsBin = - lists:foldr(fun(Stream, Acc) -> [<> | Acc] end, [], - Streams), + StreamsBin = generate_list(Streams), {Tag, [<<(length(Streams)):32>>, StreamsBin]}; request_body({peer_properties = Tag, Props}) -> PropsBin = generate_map(Props), @@ -605,7 +607,16 @@ request_body({exchange_command_versions = Tag, CommandVersions}) -> CommandVersionsLength = length(CommandVersions), {Tag, [<>, CommandVersionsBin]}; request_body({stream_stats = Tag, Stream}) -> - {Tag, <>}. + {Tag, <>}; +request_body({create_super_stream = Tag, SuperStream, Partitions, BindingKeys, Args}) -> + PartitionsBin = generate_list(Partitions), + BindingKeysBin = generate_list(BindingKeys), + ArgsBin = generate_map(Args), + {Tag, [<>, PartitionsBin, + <<(length(BindingKeys)):32>>, BindingKeysBin, + <<(map_size(Args)):32>>, ArgsBin]}; +request_body({delete_super_stream = Tag, SuperStream}) -> + {Tag, <>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -885,6 +896,23 @@ parse_request(<>) -> request(CorrelationId, {stream_stats, Stream}); +parse_request(<>) -> + {Partitions, <>} = list_of_strings(PartitionsCount, Rest0), + {BindingKeys, <<_ArgumentsCount:32, Rest2/binary>>} = list_of_strings(BindingKeysCount, Rest1), + Args = parse_map(Rest2, #{}), + request(CorrelationId, {create_super_stream, Stream, Partitions, BindingKeys, Args}); +parse_request(<>) -> + request(CorrelationId, {delete_super_stream, SuperStream}); parse_request(Bin) -> {unknown, Bin}. @@ -1052,10 +1080,24 @@ parse_int_map(<<>>, Acc) -> parse_int_map(<>, Acc) -> parse_int_map(Rem, Acc#{Key => Value}). +generate_list(List) -> + lists:foldr(fun(E, Acc) -> [<> | Acc] end, [], + List). + generate_map(Map) -> maps:fold(fun(K, V, Acc) -> [<> | Acc] end, [], Map). +list_of_strings(Count, Bin) -> + list_of_strings(Count, [], Bin). + +list_of_strings(_, Acc, <<>>) -> + {lists:reverse(Acc), <<>>}; +list_of_strings(0, Acc, Rest) -> + {lists:reverse(Acc), Rest}; +list_of_strings(Count, Acc, <>) -> + list_of_strings(Count - 1, [String | Acc], Rem). + list_of_strings(<<>>) -> []; list_of_strings(<>) -> @@ -1135,7 +1177,11 @@ command_id(consumer_update) -> command_id(exchange_command_versions) -> ?COMMAND_EXCHANGE_COMMAND_VERSIONS; command_id(stream_stats) -> - ?COMMAND_STREAM_STATS. + ?COMMAND_STREAM_STATS; +command_id(create_super_stream) -> + ?COMMAND_CREATE_SUPER_STREAM; +command_id(delete_super_stream) -> + ?COMMAND_DELETE_SUPER_STREAM. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -1192,7 +1238,11 @@ parse_command_id(?COMMAND_CONSUMER_UPDATE) -> parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) -> exchange_command_versions; parse_command_id(?COMMAND_STREAM_STATS) -> - stream_stats. + stream_stats; +parse_command_id(?COMMAND_CREATE_SUPER_STREAM) -> + create_super_stream; +parse_command_id(?COMMAND_DELETE_SUPER_STREAM) -> + delete_super_stream. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index 4934b02affa8..2e859b99d1fa 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -110,6 +110,15 @@ roundtrip(_Config) -> {exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_1}]}}), test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}), + test_roundtrip({request, 99, + {create_super_stream, <<"hello">>, + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"bk1">>, <<"bk2">>, <<"bk3">>], + Args}}), + test_roundtrip({request, 99, + {create_super_stream, <<"super_stream_name">>, + [<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"bk1">>, <<"bk2">>, <<"bk3">>], + #{}}}), + test_roundtrip({request, 99, {delete_super_stream, <<"super_stream_name">>}}), %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) @@ -120,6 +129,8 @@ roundtrip(_Config) -> unsubscribe, create_stream, delete_stream, + create_super_stream, + delete_super_stream, open, close]],