Skip to content

Commit

Permalink
Merge pull request rabbitmq#12553 from rabbitmq/amqpl-topic-permissions
Browse files Browse the repository at this point in the history
Check topic permissions of CC and BCC headers
  • Loading branch information
michaelklishin authored Oct 21, 2024
2 parents 97512b0 + cbe5551 commit c800f42
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 29 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/topic_permission_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/rabbit_common:erlang_app"],
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "transactions_SUITE_beam_files",
Expand Down
11 changes: 8 additions & 3 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,13 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).

check_write_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write).
check_write_permitted_on_topics(#exchange{type = topic} = Resource, User, Mc, AuthzContext) ->
lists:foreach(
fun(RoutingKey) ->
check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, write)
end, mc:routing_keys(Mc));
check_write_permitted_on_topics(_, _, _, _) ->
ok.

check_read_permitted_on_topic(Resource, User, RoutingKey, AuthzContext) ->
check_topic_authorisation(Resource, User, RoutingKey, AuthzContext, read).
Expand Down Expand Up @@ -1182,7 +1187,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_write_permitted(ExchangeName, User, AuthzContext),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
Expand All @@ -1208,6 +1212,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{error, Reason} ->
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
{ok, Message0} ->
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
Message = rabbit_message_interceptor:intercept(Message0),
check_user_id_header(Message, User),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
Expand Down
149 changes: 124 additions & 25 deletions deps/rabbit/test/topic_permission_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,127 @@

-module(topic_permission_SUITE).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile(export_all).
-compile([export_all, nowarn_export_all]).

all() ->
[
{group, sequential_tests}
{group, sequential_tests}
].

groups() -> [
{sequential_tests, [], [
topic_permission_database_access,
topic_permission_checks
]}
groups() ->
[
{sequential_tests, [],
[
amqpl_cc_headers,
amqpl_bcc_headers,
topic_permission_database_access,
topic_permission_checks
]}
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
Config1 = rabbit_ct_helpers:set_config(
Config,
[{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

init_per_group(_, Config) -> Config.
end_per_group(_, Config) -> Config.
end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, clear_tables, []),
rabbit_ct_helpers:testcase_started(Config, Testcase).

clear_tables() ->
ok = rabbit_db_vhost:clear(),
ok = rabbit_db_user:clear().

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

amqpl_cc_headers(Config) ->
amqpl_headers(<<"CC">>, Config).

amqpl_bcc_headers(Config) ->
amqpl_headers(<<"BCC">>, Config).

amqpl_headers(Header, Config) ->
QName1 = <<"q1">>,
QName2 = <<"q2">>,
Ch1 = rabbit_ct_client_helpers:open_channel(Config),

ok = set_topic_permissions(Config, "^a", ".*"),

#'queue.declare_ok'{} = amqp_channel:call(Ch1, #'queue.declare'{queue = QName1}),
#'queue.declare_ok'{} = amqp_channel:call(Ch1, #'queue.declare'{queue = QName2}),
#'queue.bind_ok'{} = amqp_channel:call(Ch1, #'queue.bind'{queue = QName1,
exchange = <<"amq.topic">>,
routing_key = <<"a.1">>}),
#'queue.bind_ok'{} = amqp_channel:call(Ch1, #'queue.bind'{queue = QName2,
exchange = <<"amq.topic">>,
routing_key = <<"a.2">>}),

amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),

%% We have permissions to send to both topics.
%% Therefore, m1 should be sent to both queues.
amqp_channel:call(
Ch1,
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = <<"a.1">>},
#amqp_msg{payload = <<"m1">>,
props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}),
receive #'basic.ack'{} -> ok
after 5000 -> ct:fail({missing_confirm, ?LINE})
end,

monitor(process, Ch1),
amqp_channel:call(
Ch1,
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = <<"x.1">>},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = [{Header, array, [{longstr, <<"a.2">>}]}]}}),
ok = assert_channel_down(
Ch1,
<<"ACCESS_REFUSED - write access to topic 'x.1' in exchange "
"'amq.topic' in vhost '/' refused for user 'guest'">>),

Ch2 = rabbit_ct_client_helpers:open_channel(Config),
monitor(process, Ch2),
amqp_channel:call(
Ch2,
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = <<"a.1">>},
#amqp_msg{payload = <<"m3">>,
props = #'P_basic'{headers = [{Header, array, [{longstr, <<"x.2">>}]}]}}),
ok = assert_channel_down(
Ch2,
<<"ACCESS_REFUSED - write access to topic 'x.2' in exchange "
"'amq.topic' in vhost '/' refused for user 'guest'">>),

Ch3 = rabbit_ct_client_helpers:open_channel(Config),
?assertEqual(#'queue.delete_ok'{message_count = 1},
amqp_channel:call(Ch3, #'queue.delete'{queue = QName1})),
?assertEqual(#'queue.delete_ok'{message_count = 1},
amqp_channel:call(Ch3, #'queue.delete'{queue = QName2})),
ok = rabbit_ct_client_helpers:close_channel(Ch3),
ok = clear_topic_permissions(Config).

topic_permission_database_access(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, ?MODULE, clear_tables, []),
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, topic_permission_database_access1, [Config]).

Expand Down Expand Up @@ -134,6 +209,7 @@ topic_permission_database_access1(_Config) ->
ok.

topic_permission_checks(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, ?MODULE, clear_tables, []),
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, topic_permission_checks1, [Config]).

Expand Down Expand Up @@ -228,3 +304,26 @@ topic_permission_checks1(_Config) ->
) || Perm <- Permissions],

ok.

clear_tables() ->
ok = rabbit_db_vhost:clear(),
ok = rabbit_db_user:clear().

set_topic_permissions(Config, WritePat, ReadPat) ->
ok = rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_auth_backend_internal, set_topic_permissions,
[<<"guest">>, <<"/">>, <<"amq.topic">>, WritePat, ReadPat, <<"acting-user">>]).

clear_topic_permissions(Config) ->
ok = rabbit_ct_broker_helpers:rpc(
Config, 0, rabbit_auth_backend_internal, clear_topic_permissions,
[<<"guest">>, <<"/">>, <<"acting-user">>]).

assert_channel_down(Ch, Reason) ->
receive {'DOWN', _MonitorRef, process, Ch,
{shutdown,
{server_initiated_close, 403, Reason}}} ->
ok
after 5000 ->
ct:fail({did_not_receive, Reason})
end.

0 comments on commit c800f42

Please sign in to comment.