Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lb): Add a callback for checking core node compatibility #176

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/mria_config.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,9 @@
%% Names of the callbacks that can be registered in the mria config.
%%
%% Load-balancer related callbacks (used by `mria_lb'):
%%
%%  - `lb_custom_info': zero-arity fun; its return value is advertised
%%    to other nodes as the `custom_info' field of the LB node info.
%%
%%  - `lb_custom_info_check': unary fun that receives the `custom_info'
%%    advertised by a core node (or `undefined' when absent) and must
%%    return a boolean telling whether that node is compatible.
-type callback() :: start
                  | stop
                  | {start | stop, mria_rlog:shard()}
                  | core_node_discovery
                  | lb_custom_info
                  | lb_custom_info_check.

-type callback_function() :: fun(() -> term()) |
fun((term()) -> term()).
Expand Down
70 changes: 51 additions & 19 deletions src/mria_lb.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@
, protocol_version := non_neg_integer()
, db_nodes => [node()]
, shard_badness => [{mria_rlog:shard(), float()}]
, custom_info => _
}.

-define(update, update).
Expand Down Expand Up @@ -192,29 +193,55 @@ find_clusters(NodeInfo) ->
%% @doc For every shard pick the compatible node that reports the
%% lowest badness.
%%
%% `NodeInfo' maps a node to the load-balancer info it reported.  Nodes
%% rejected by `verify_node_compatibility/1' are ignored, and so are
%% entries that carry no `shard_badness' field (the key is optional in
%% `node_info()').  When two nodes report the same badness for a shard,
%% the one visited first by the fold is kept.
-spec shard_badness(#{node() => node_info()}) -> #{mria_rlog:shard() => {node(), Badness}}
              when Badness :: float().
shard_badness(NodeInfo) ->
    maps:fold(
      fun(Node, LbInfo = #{shard_badness := Shards}, Acc) ->
              case verify_node_compatibility(LbInfo) of
                  true ->
                      lists:foldl(
                        fun({Shard, Badness}, Acc1) ->
                                maps:update_with(Shard,
                                                 %% Replace the current candidate only when
                                                 %% this node is strictly better:
                                                 fun({_OldNode, OldBadness}) when OldBadness > Badness ->
                                                         {Node, Badness};
                                                    (Old) ->
                                                         Old
                                                 end,
                                                 {Node, Badness},
                                                 Acc1)
                        end,
                        Acc,
                        Shards);
                  false ->
                      Acc
              end;
         (_Node, _LbInfo, Acc) ->
              %% No shard badness reported by this node: nothing to contribute.
              Acc
      end,
      #{},
      NodeInfo).

%% @doc Tell whether the node described by `LbInfo' is compatible with
%% the local node.
%%
%% A node is compatible when both conditions hold:
%%
%%  1. Its `protocol_version' is exactly equal to the local one.
%%
%%  2. The `lb_custom_info_check' callback (when configured) returns
%%     `true' for the node's `custom_info' (`undefined' when the field
%%     is absent).  Without a configured callback every node passes.
%%
%% The custom callback is only invoked when the protocol versions
%% match.  A callback that raises, or returns a non-boolean, makes the
%% node incompatible and emits the
%% `mria_failed_to_check_upstream_compatibility' trace point.
verify_node_compatibility(LbInfo = #{protocol_version := ProtoVsn}) ->
    ProtoVsn =:= mria_rlog:get_protocol_version() andalso
        verify_custom_compatibility(LbInfo).

%% Run the user-supplied `lb_custom_info_check' callback against the
%% custom info advertised in `LbInfo'; always returns a boolean.
verify_custom_compatibility(LbInfo) ->
    CustomCheckFun =
        case mria_config:callback(lb_custom_info_check) of
            {ok, Fun} ->
                Fun;
            undefined ->
                fun(_) -> true end
        end,
    CustomInfo = maps:get(custom_info, LbInfo, undefined),
    try
        Result = CustomCheckFun(CustomInfo),
        %% Anything but a boolean is a bug in the callback; treat it
        %% the same way as a crash:
        is_boolean(Result) orelse
            error({non_boolean_result, Result}),
        Result
    catch
        %% TODO: this can get spammy:
        EC:Err:Stack ->
            ?tp(error, mria_failed_to_check_upstream_compatibility,
                #{lb_info => LbInfo, EC => Err, stacktrace => Stack}),
            false
    end.

start_timer(LastUpdateTime) ->
%% Leave at least 100 ms between updates to leave some time to
%% process other events:
Expand Down Expand Up @@ -287,11 +314,16 @@ lb_callback() ->
{ok, Vsn} -> Vsn;
undefined -> undefined
end,
CustomInfo = case mria_config:callback(lb_custom_info) of
{ok, CB} -> CB();
thalesmg marked this conversation as resolved.
Show resolved Hide resolved
undefined -> undefined
end,
BasicInfo =
#{ running => IsRunning
, version => Version
, whoami => Whoami
, protocol_version => mria_rlog:get_protocol_version()
, custom_info => CustomInfo
},
MoreInfo =
case Whoami of
Expand Down
26 changes: 25 additions & 1 deletion test/mria_lb_SUITE.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2021, 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2019-2021, 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("mria_rlog.hrl").

all() ->
mria_ct:all(?MODULE).
Expand Down Expand Up @@ -233,6 +234,29 @@ t_core_node_leave(_Config) ->
mria_ct:teardown_cluster(Cluster)
end, []).

%% Check that the custom compatibility callbacks steer upstream
%% selection: the shared environment installs an `lb_custom_info_check'
%% that only accepts `chosen_one', and only the third core node
%% advertises `chosen_one' via `lb_custom_info'.  The replicant is
%% therefore expected to report that third core as its core node for
%% the meta shard.
t_custom_compat_check(_Config) ->
    CommonEnv = mria_mnesia_test_util:common_env(),
    Env = [{mria, {callback, lb_custom_info_check}, fun(Val) -> Val =:= chosen_one end}
           | CommonEnv],
    ChosenCore = {core, [{mria, {callback, lb_custom_info},
                          fun() -> chosen_one end}]},
    Cluster = mria_ct:cluster([core, core, ChosenCore, replicant], Env),
    ?check_trace(
       #{timetrap => 15000},
       try
           [_C1, _C2, C3, R1] = mria_ct:start_cluster(mria, Cluster),
           ?assertEqual({ok, C3},
                        erpc:call( R1
                                 , mria_status, get_core_node, [?mria_meta_shard, infinity]
                                 , infinity
                                 ))
       after
           mria_ct:teardown_cluster(Cluster)
       end,
       []).

clear_core_node_list(Replicant) ->
MaybeOldCallback = erpc:call(Replicant, mria_config, callback, [core_node_discovery]),
try
Expand Down