Skip to content

Commit

Permalink
Merge pull request rabbitmq#9813 from rabbitmq/super-stream-frames
Browse files Browse the repository at this point in the history
Add super stream creation/deletion commands
  • Loading branch information
michaelklishin authored Nov 15, 2023
2 parents 2adec32 + 5e1155c commit 6d9e9b9
Show file tree
Hide file tree
Showing 10 changed files with 436 additions and 93 deletions.
35 changes: 35 additions & 0 deletions deps/rabbitmq_stream/docs/PROTOCOL.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ used to make the difference between a request (0) and a response (1). Example fo
|0x001c
|Yes

|<<createsuperstream>>
|Client
|0x001d
|Yes

|<<deletesuperstream>>
|Client
|0x001e
|Yes

|===

=== DeclarePublisher
Expand Down Expand Up @@ -754,6 +764,31 @@ StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
Value => int64
```

=== CreateSuperStream

```
CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
Key => uint16 // 0x001d
Version => uint16
CorrelationId => uint32
Name => string
Partition => string
BindingKey => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```

=== DeleteSuperStream

```
Delete => Key Version CorrelationId Name
Key => uint16 // 0x001e
Version => uint16
CorrelationId => uint32
Name => string
```

== Authentication

Expand Down
33 changes: 18 additions & 15 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ create_super_stream(VirtualHost,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username) ->
gen_server:call(?MODULE,
{create_super_stream,
VirtualHost,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username}).

-spec delete_super_stream(binary(), binary(), binary()) ->
Expand Down Expand Up @@ -226,10 +226,10 @@ handle_call({create_super_stream,
Name,
Partitions,
Arguments,
RoutingKeys,
BindingKeys,
Username},
_From, State) ->
case validate_super_stream_creation(VirtualHost, Name, Partitions) of
case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of
{error, Reason} ->
{reply, {error, Reason}, State};
ok ->
Expand Down Expand Up @@ -273,7 +273,7 @@ handle_call({create_super_stream,
add_super_stream_bindings(VirtualHost,
Name,
Partitions,
RoutingKeys,
BindingKeys,
Username),
case BindingsResult of
ok ->
Expand Down Expand Up @@ -445,8 +445,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
end
catch
exit:Error ->
rabbit_log:error("Error while looking up exchange ~tp, ~tp",
[rabbit_misc:rs(ExchangeName), Error]),
rabbit_log:warning("Error while looking up exchange ~tp, ~tp",
[rabbit_misc:rs(ExchangeName), Error]),
{error, stream_not_found}
end,
{reply, Res, State};
Expand Down Expand Up @@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) ->
{error, stream_not_found}
end.

validate_super_stream_creation(VirtualHost, Name, Partitions) ->
validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
when length(Partitions) =/= length(BindingKeys) ->
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
case exchange_exists(VirtualHost, Name) of
{error, validation_failed} ->
{error,
Expand Down Expand Up @@ -758,15 +761,15 @@ declare_super_stream_exchange(VirtualHost, Name, Username) ->
add_super_stream_bindings(VirtualHost,
Name,
Partitions,
RoutingKeys,
BindingKeys,
Username) ->
PartitionsRoutingKeys = lists:zip(Partitions, RoutingKeys),
PartitionsBindingKeys = lists:zip(Partitions, BindingKeys),
BindingsResult =
lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) ->
lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) ->
case add_super_stream_binding(VirtualHost,
Name,
Partition,
RoutingKey,
BindingKey,
Order,
Username)
of
Expand All @@ -778,7 +781,7 @@ add_super_stream_bindings(VirtualHost,
(_, {{error, _Reason}, _Order} = Acc) ->
Acc
end,
{ok, 0}, PartitionsRoutingKeys),
{ok, 0}, PartitionsBindingKeys),
case BindingsResult of
{ok, _} ->
ok;
Expand All @@ -789,7 +792,7 @@ add_super_stream_bindings(VirtualHost,
add_super_stream_binding(VirtualHost,
SuperStream,
Partition,
RoutingKey,
BindingKey,
Order,
Username) ->
{ok, ExchangeNameBin} =
Expand All @@ -806,7 +809,7 @@ add_super_stream_binding(VirtualHost,
Order),
case rabbit_binding:add(#binding{source = ExchangeName,
destination = QueueName,
key = RoutingKey,
key = BindingKey,
args = Arguments},
fun (_X, Q) when ?is_amqqueue(Q) ->
try
Expand Down
Loading

0 comments on commit 6d9e9b9

Please sign in to comment.