From 71af85d00a4f47808bc563d9f276b5b80f56d232 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Wed, 31 Jan 2024 17:06:18 +0100
Subject: [PATCH 01/13] Do not run check on nodedown in cets_disco

When a node goes down, it would be kicked from the other nodes in the
cluster by global. This process is not instant. It happens even if the
node is alive and could reconnect. Reconnecting too fast would interfere
with the prevent_overlapped_partitions logic though.
We still allow this node to reconnect, but only during the regular checks.
The change temporarily puts the node into the unavailable_nodes list though.
---
 src/cets_discovery.erl | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index ced8ccb2..22fd0190 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -107,7 +107,7 @@
     phase := initial | regular,
     results := [join_result()],
     nodes := ordsets:ordset(node()),
-    %% The nodes that returned pang, sorted
+    %% The nodes that returned pang or nodedown, sorted
     unavailable_nodes := ordsets:ordset(node()),
     tables := [atom()],
     backend_module := module(),
@@ -308,8 +308,6 @@ handle_info({nodeup, Node}, State) ->
     {noreply, try_joining(State3)};
 handle_info({nodedown, Node}, State) ->
     State2 = handle_nodedown(Node, State),
-    %% Do another check to update unavailable_nodes list
-    self() ! check,
     {noreply, State2};
 handle_info({start_time, Node, StartTime}, State) ->
     {noreply, handle_receive_start_time(Node, StartTime, State)};
@@ -409,6 +407,9 @@ prune_unavailable_nodes_if_needed(State = #{nodes := Nodes, unavailable_nodes :=
     %% Unavailable nodes is a subset of discovered nodes
     State#{unavailable_nodes := ordsets:intersection(Nodes, UnNodes)}.
 
+%% We should not ping nodes that just got disconnected.
+%% Let the disconnected node reconnect if it restarts on its own,
+%% or reconnect to it after a timeout.
 -spec ping_not_connected_nodes([node()]) -> ok.
 ping_not_connected_nodes(Nodes) ->
     Self = self(),
@@ -482,8 +483,9 @@ flush_all_checks() ->
 
 -spec do_join(atom(), node()) -> join_result().
 do_join(Tab, Node) ->
+    %% Possible race condition: Node got disconnected
     LocalPid = whereis(Tab),
-    %% That would trigger autoconnect for the first time
+    %% That could trigger autoconnect if Node is not connected
     case rpc:call(Node, erlang, whereis, [Tab]) of
         Pid when is_pid(Pid), is_pid(LocalPid) ->
             Result = cets_join:join(cets_discovery, #{table => Tab}, LocalPid, Pid),
@@ -563,7 +565,7 @@ handle_system_info(State) ->
     State#{verify_ready => verify_ready(State)}.
 
 -spec handle_nodedown(node(), state()) -> state().
-handle_nodedown(Node, State) ->
+handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) ->
     State2 = remember_nodedown_timestamp(Node, State),
     {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
     ?LOG_WARNING(
@@ -574,7 +576,8 @@ handle_nodedown(Node, State) ->
             time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
         })
     ),
-    State3.
+    State4 = State3#{unavailable_nodes := ordsets:add_element(Node, UnNodes)},
+    trigger_verify_ready(State4).
 
 -spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) -> From c522d63a97ce77c6d8ea12b384cad4909863e468 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 31 Jan 2024 18:47:11 +0100 Subject: [PATCH 02/13] Do check manually in disco_node_down_timestamp_is_remembered testcase --- test/cets_SUITE.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 6db1c3f9..8a92f1af 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -2654,7 +2654,10 @@ disco_node_down_timestamp_is_remembered(Config) -> disco_logs_nodeup_after_downtime(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config, [wait, netsplit]), - %% At this point cets_disco should reconnect nodes back automatically. + %% At this point cets_disco should reconnect nodes back automatically + %% after retry_type_to_timeout(after_nodedown) time. + %% We want to speed this up for tests though. + Disco ! check, %% Receive a nodeup after the disconnect. %% This nodeup should contain the downtime_millisecond_duration field %% (initial nodeup should not contain this field). From 7cb73163ca32af53857c5223739cea506229e998 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 31 Jan 2024 18:48:50 +0100 Subject: [PATCH 03/13] Add new retry_type after_nodedown This allows us to try to reconnect to the remote node after some period of time after the netsplit. Enters regular phase once we contact the remote node. --- src/cets_discovery.erl | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 22fd0190..5970eaa8 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -88,7 +88,7 @@ -type get_nodes_result() :: {ok, [node()]} | {error, term()}. %% Result of `get_nodes/2' call. --type retry_type() :: initial | after_error | regular. +-type retry_type() :: initial | after_error | regular | after_nodedown. %% Retry logic type. -type from() :: {pid(), reference()}. @@ -455,8 +455,27 @@ choose_retry_type(#{last_get_nodes_result := {error, _}}) -> after_error; choose_retry_type(#{phase := initial}) -> initial; -choose_retry_type(_) -> - regular. +choose_retry_type(State) -> + case last_node_down(State) of + false -> + regular; + Node -> + %% Allow to reconnect after a netsplit but not too quick. + GracePeriod = retry_type_to_timeout(after_nodedown), + case get_downtime(Node, State) < GracePeriod of + true -> + after_nodedown; + false -> + regular + end + end. + +-spec last_node_down(state()) -> false | node(). +last_node_down(#{nodedown_timestamps := Map}) when map_size(Map) =:= 0 -> + false; +last_node_down(#{nodedown_timestamps := Map}) -> + {Node, _TS} = lists:last(lists:keysort(2, maps:to_list(Map))), + Node. %% Returns timeout in milliseconds to retry calling the get_nodes function. %% get_nodes is called after add_table without waiting. @@ -464,7 +483,8 @@ choose_retry_type(_) -> -spec retry_type_to_timeout(retry_type()) -> non_neg_integer(). retry_type_to_timeout(initial) -> timer:seconds(5); retry_type_to_timeout(after_error) -> timer:seconds(1); -retry_type_to_timeout(regular) -> timer:minutes(5). +retry_type_to_timeout(regular) -> timer:minutes(5); +retry_type_to_timeout(after_nodedown) -> timer:seconds(30). -spec cancel_old_timer(state()) -> ok. 
cancel_old_timer(#{timer_ref := OldRef}) when is_reference(OldRef) -> @@ -620,6 +640,7 @@ calculate_uptime(undefined) -> calculate_uptime(StartTime) -> time_since(StartTime). +-spec get_downtime(node(), state()) -> milliseconds(). get_downtime(Node, #{nodedown_timestamps := Map}) -> case maps:get(Node, Map, undefined) of undefined -> @@ -662,4 +683,10 @@ handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Ma %% Restarted node reconnected, this is fine during the rolling updates ok end, - State#{node_start_timestamps := maps:put(Node, StartTime, Map)}. + %% We are in the regular phase, + %% once we get contact with another disco server. + %% It affects the check intervals. + State#{ + node_start_timestamps := maps:put(Node, StartTime, Map), + phase := regular + }. From b0f16dec1452cf8ce0995e1556348634914cb203 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 31 Jan 2024 22:24:24 +0100 Subject: [PATCH 04/13] Print status when wait_for_ready fails in cets_SUITE --- test/cets_SUITE.erl | 50 ++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 8a92f1af..858aad34 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -1450,7 +1450,7 @@ test_multinode_auto_discovery(Config) -> ok = file:write_file(FileName, io_lib:format("~s~n~s~n", [Node1, Node2])), {ok, Disco} = cets_discovery:start(#{tables => [Tab], disco_file => FileName}), %% Disco is async, so we have to wait for the final state - ok = cets_discovery:wait_for_ready(Disco, 5000), + ok = wait_for_ready(Disco, 5000), [Node2] = other_nodes(Node1, Tab), [#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab}] = cets_discovery:info(Disco), @@ -1471,7 +1471,7 @@ test_disco_add_table(Config) -> {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), cets_discovery:add_table(Disco, Tab), %% Disco is async, so we have to wait for the final state - ok = cets_discovery:wait_for_ready(Disco, 5000), + ok = wait_for_ready(Disco, 5000), [Node2] = other_nodes(Node1, Tab), [#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab}] = cets_discovery:info(Disco), @@ -1521,7 +1521,7 @@ test_disco_file_appears(Config) -> ), ok = file:write_file(FileName, io_lib:format("~s~n~s~n", [Node1, Node2])), %% Disco is async, so we have to wait for the final state - ok = cets_discovery:wait_for_ready(Disco, 5000), + ok = wait_for_ready(Disco, 5000), [Node2] = other_nodes(Node1, Tab), [#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab}] = cets_discovery:info(Disco), @@ -1540,7 +1540,7 @@ test_disco_handles_bad_node(Config) -> {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), cets_discovery:add_table(Disco, Tab), %% Check that wait_for_ready would not block forever: - ok = cets_discovery:wait_for_ready(Disco, 5000), + ok = wait_for_ready(Disco, 5000), %% Check if the node sent pang: #{unavailable_nodes := ['badnode@localhost']} = cets_discovery:system_info(Disco), %% Check that other nodes are discovered fine @@ -1556,7 +1556,7 @@ cets_discovery_fun_backend_works(Config) -> F = fun(State) -> {{ok, [Node1, Node2]}, State} end, {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), + ok = wait_for_ready(Disco, 5000), [#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab}] = cets_discovery:info(Disco). 
@@ -1676,7 +1676,7 @@ disco_handles_node_up_and_down(Config) -> Disco ! {nodeup, BadNode}, Disco ! {nodedown, BadNode}, %% Check that wait_for_ready still works - ok = cets_discovery:wait_for_ready(Disco, 5000). + ok = wait_for_ready(Disco, 5000). status_available_nodes(Config) -> Node1 = node(), @@ -1713,7 +1713,7 @@ status_unavailable_nodes(Config) -> Tab = make_name(Config), {ok, _} = start(Node1, Tab), cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), ?assertMatch(#{unavailable_nodes := ['badnode@localhost']}, cets_status:status(DiscoName)). status_unavailable_nodes_is_subset_of_discovery_nodes(Config) -> @@ -1733,7 +1733,7 @@ status_unavailable_nodes_is_subset_of_discovery_nodes(Config) -> Tab = make_name(Config), {ok, _} = start(Node1, Tab), cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), ?assertMatch(#{unavailable_nodes := ['badnode@localhost']}, cets_status:status(DiscoName)), %% Remove badnode from disco meck:expect(BackendModule, get_nodes, GetFn2), @@ -1761,7 +1761,7 @@ status_joined_nodes(Config) -> %% Add table using pids (i.e. no need to do RPCs here) cets_discovery:add_table(Disco1, Tab), cets_discovery:add_table(Disco2, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), cets_test_wait:wait_until(fun() -> maps:get(joined_nodes, cets_status:status(DiscoName)) end, [ Node1, Node2 ]). @@ -1784,7 +1784,7 @@ status_discovery_works(Config) -> %% Add table using pids (i.e. no need to do RPCs here) cets_discovery:add_table(Disco1, Tab), cets_discovery:add_table(Disco2, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), ?assertMatch(#{discovery_works := true}, cets_status:status(DiscoName)). status_discovered_nodes(Config) -> @@ -1802,7 +1802,7 @@ status_discovered_nodes(Config) -> {ok, _} = start(Node2, Tab), %% Add table using pids (i.e. no need to do RPCs here) cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), ?assertMatch(#{discovered_nodes := [Node1, Node2]}, cets_status:status(DiscoName)). status_remote_nodes_without_disco(Config) -> @@ -1818,7 +1818,7 @@ status_remote_nodes_without_disco(Config) -> Tab = make_name(Config), {ok, _} = start(Node1, Tab), cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), ?assertMatch(#{remote_nodes_without_disco := [Node2]}, cets_status:status(DiscoName)). 
status_remote_nodes_with_unknown_tables(Config) -> @@ -1843,7 +1843,7 @@ status_remote_nodes_with_unknown_tables(Config) -> cets_discovery:add_table(Disco1, Tab1), cets_discovery:add_table(Disco2, Tab1), cets_discovery:add_table(Disco2, Tab2), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), cets_test_wait:wait_until( fun() -> maps:get(remote_nodes_with_unknown_tables, cets_status:status(DiscoName)) end, [ Node2 @@ -1876,7 +1876,7 @@ status_remote_nodes_with_missing_nodes(Config) -> cets_discovery:add_table(Disco1, Tab1), cets_discovery:add_table(Disco1, Tab2), cets_discovery:add_table(Disco2, Tab1), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + ok = wait_for_ready(DiscoName, 5000), cets_test_wait:wait_until( fun() -> maps:get(remote_nodes_with_missing_tables, cets_status:status(DiscoName)) end, [ Node2 @@ -1910,7 +1910,8 @@ status_conflict_nodes(Config) -> cets_discovery:add_table(Disco1, Tab2), cets_discovery:add_table(Disco2, Tab1), cets_discovery:add_table(Disco2, Tab2), - ok = cets_discovery:wait_for_ready(DiscoName, 5000), + + ok = wait_for_ready(DiscoName, 5000), set_other_servers(Pid22, []), cets_test_wait:wait_until( fun() -> maps:get(conflict_nodes, cets_status:status(DiscoName)) end, [Node2] @@ -2444,7 +2445,7 @@ disco_connects_to_unconnected_node(Config) -> end, {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000). + ok = wait_for_ready(Disco, 5000). %% Joins from a bad (not fully connected) node %% Join process should check if nodes could contact each other before allowing to join @@ -3070,13 +3071,7 @@ setup_two_nodes_and_discovery(Config, Flags) -> cets_discovery:add_table(Disco, Tab), case lists:member(wait, Flags) of true -> - try - ok = cets_discovery:wait_for_ready(Disco, 5000) - catch - Class:Reason:Stacktrace -> - ct:pal("system_info: ~p", [cets_discovery:system_info(Disco)]), - erlang:raise(Class, Reason, Stacktrace) - end; + wait_for_ready(Disco, 5000); false -> ok end, @@ -3274,3 +3269,12 @@ make_process() -> stop -> stop end end). + +wait_for_ready(Disco, Timeout) -> + try + ok = cets_discovery:wait_for_ready(Disco, Timeout) + catch + Class:Reason:Stacktrace -> + ct:pal("system_info: ~p", [cets_discovery:system_info(Disco)]), + erlang:raise(Class, Reason, Stacktrace) + end. From f704fda4640b8ebcbe1fccee7f9b5a89982f4574 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 31 Jan 2024 23:31:29 +0100 Subject: [PATCH 05/13] Add spec for cets_discovery:get_time() --- src/cets_discovery.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 5970eaa8..0f9ac59d 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -216,7 +216,7 @@ wait_for_get_nodes(Server, Timeout) -> %% @private -spec init(term()) -> {ok, state()}. init(Opts) -> - StartTime = erlang:system_time(millisecond), + StartTime = get_time(), %% Sends nodeup / nodedown ok = net_kernel:monitor_nodes(true), Mod = maps:get(backend_module, Opts, cets_discovery_file), @@ -618,13 +618,13 @@ handle_nodeup(Node, State) -> -spec remember_nodeup_timestamp(node(), state()) -> state(). remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> - Time = erlang:system_time(millisecond), + Time = get_time(), Map2 = Map#{Node => Time}, State#{nodeup_timestamps := Map2}. 
-spec remember_nodedown_timestamp(node(), state()) -> state(). remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) -> - Time = erlang:system_time(millisecond), + Time = get_time(), Map2 = Map#{Node => Time}, State#{nodedown_timestamps := Map2}. @@ -640,7 +640,7 @@ calculate_uptime(undefined) -> calculate_uptime(StartTime) -> time_since(StartTime). --spec get_downtime(node(), state()) -> milliseconds(). +-spec get_downtime(node(), state()) -> milliseconds() | undefined. get_downtime(Node, #{nodedown_timestamps := Map}) -> case maps:get(Node, Map, undefined) of undefined -> @@ -657,8 +657,15 @@ set_defined(Key, Value, Map) -> time_since_startup_in_milliseconds(#{start_time := StartTime}) -> time_since(StartTime). +-spec time_since(integer()) -> integer(). time_since(StartTime) -> - erlang:system_time(millisecond) - StartTime. + %% Dialyzer thinks integer() - integer() could be float. + %% Do round to avoid the warning. + round(get_time() - StartTime). + +-spec get_time() -> milliseconds(). +get_time() -> + erlang:system_time(millisecond). send_start_time_to(Node, #{start_time := StartTime}) -> case erlang:process_info(self(), registered_name) of From 87246cbd46b8b42bb4acb87bbea5a372c28cb037 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 1 Feb 2024 01:17:55 +0100 Subject: [PATCH 06/13] Avoid any retries in global:trans in cets_join --- src/cets_join.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cets_join.erl b/src/cets_join.erl index 36f1f0b0..d8570074 100644 --- a/src/cets_join.erl +++ b/src/cets_join.erl @@ -100,7 +100,7 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) -> LockRequest = {LockKey, self()}, %% Just lock all nodes, no magic here :) Nodes = [node() | nodes()], - Retries = 1, + Retries = 0, %% global could abort the transaction when one of the nodes goes down. %% It could usually abort it during startup or update. case global:trans(LockRequest, F, Nodes, Retries) of From 469f177b2f71b1ef3ac93de05a7d5d64f4b17ae1 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 1 Feb 2024 19:12:12 +0100 Subject: [PATCH 07/13] Pass remote_node and is_leader into handle_down --- src/cets.erl | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/cets.erl b/src/cets.erl index 9cd486d4..4e7e23e2 100644 --- a/src/cets.erl +++ b/src/cets.erl @@ -200,7 +200,16 @@ }. %% Status information returned `info/1'. --type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok). +-type handle_down_fun() :: fun( + ( + #{ + remote_pid := server_pid(), + remote_pid := node(), + table := table_name(), + is_leader := boolean() + } + ) -> ok +). %% Handler function which is called when the remote node goes down. -type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()). @@ -896,10 +905,17 @@ handle_get_info( %% Cleanup -spec call_user_handle_down(server_pid(), state()) -> ok. 
-call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts}) -> +call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts, is_leader := IsLeader}) -> case Opts of #{handle_down := F} -> - FF = fun() -> F(#{remote_pid => RemotePid, table => Tab}) end, + FF = fun() -> + F(#{ + remote_node => node(RemotePid), + remote_pid => RemotePid, + table => Tab, + is_leader => IsLeader + }) + end, Info = #{ task => call_user_handle_down, table => Tab, From 85e91ab1f9ad9391f9d888350e753d6f23ac74a8 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 1 Feb 2024 19:23:03 +0100 Subject: [PATCH 08/13] Wait for get_fn2_called in status_unavailable_nodes_is_subset_of_discovery_nodes --- test/cets_SUITE.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 858aad34..cd20eb6f 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -1718,8 +1718,12 @@ status_unavailable_nodes(Config) -> status_unavailable_nodes_is_subset_of_discovery_nodes(Config) -> Node1 = node(), + Self = self(), GetFn1 = fun(State) -> {{ok, [Node1, 'badnode@localhost']}, State} end, - GetFn2 = fun(State) -> {{ok, [Node1]}, State} end, + GetFn2 = fun(State) -> + Self ! get_fn2_called, + {{ok, [Node1]}, State} + end, %% Setup meck BackendModule = make_name(Config, disco_backend), meck:new(BackendModule, [non_strict]), @@ -1739,6 +1743,7 @@ status_unavailable_nodes_is_subset_of_discovery_nodes(Config) -> meck:expect(BackendModule, get_nodes, GetFn2), %% Force check. Disco ! check, + receive_message(get_fn2_called), %% The unavailable_nodes list is updated CondF = fun() -> maps:get(unavailable_nodes, cets_status:status(DiscoName)) end, cets_test_wait:wait_until(CondF, []). From 43d8ba78eaaf736990cd4545c035eb6d938359c6 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 1 Feb 2024 19:38:15 +0100 Subject: [PATCH 09/13] Pass correct is_leader to handle_down --- src/cets.erl | 7 +++++-- test/cets_SUITE.erl | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/cets.erl b/src/cets.erl index 4e7e23e2..7977bdee 100644 --- a/src/cets.erl +++ b/src/cets.erl @@ -656,9 +656,12 @@ handle_down2(RemotePid, Reason, State = #{other_servers := Servers, ack_pid := A case lists:member(RemotePid, Servers) of true -> cets_ack:send_remote_down(AckPid, RemotePid), - call_user_handle_down(RemotePid, State), Servers2 = lists:delete(RemotePid, Servers), - update_node_down_history(RemotePid, Reason, set_other_servers(Servers2, State)); + State3 = update_node_down_history( + RemotePid, Reason, set_other_servers(Servers2, State) + ), + call_user_handle_down(RemotePid, State3), + State3; false -> %% This should not happen ?LOG_ERROR(#{ diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index cd20eb6f..d05c3534 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -119,6 +119,7 @@ cases() -> get_nodes_request, test_locally, handle_down_is_called, + handle_down_gets_correct_leader_arg_when_leader_goes_down, events_are_applied_in_the_correct_order_after_unpause, pause_multiple_times, unpause_twice, @@ -2029,6 +2030,18 @@ handle_down_is_called(Config) -> after 5000 -> ct:fail(timeout) end. +handle_down_gets_correct_leader_arg_when_leader_goes_down(Config) -> + Parent = self(), + DownFn = fun(#{is_leader := IsLeader}) -> + Parent ! 
{is_leader_arg, IsLeader} + end, + {ok, Pid1} = start_local(make_name(Config, 1), #{handle_down => DownFn}), + {ok, Pid2} = start_local(make_name(Config, 2), #{handle_down => DownFn}), + ok = cets_join:join(lock_name(Config), #{table => [d1, d2]}, Pid1, Pid2), + Leader = cets:get_leader(Pid1), + exit(Leader, oops), + ?assertEqual(true, receive_message_with_arg(is_leader_arg)). + events_are_applied_in_the_correct_order_after_unpause(Config) -> T = make_name(Config), {ok, Pid} = start_local(T), From cec51bf0e82466277898f115c7580657b8f965b5 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 12 Feb 2024 10:57:01 +0100 Subject: [PATCH 10/13] Fix spec for handle_down_fun() --- src/cets.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cets.erl b/src/cets.erl index 7977bdee..ad5c7eae 100644 --- a/src/cets.erl +++ b/src/cets.erl @@ -204,7 +204,7 @@ ( #{ remote_pid := server_pid(), - remote_pid := node(), + remote_node := node(), table := table_name(), is_leader := boolean() } @@ -913,8 +913,8 @@ call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts, is_leader := IsLead #{handle_down := F} -> FF = fun() -> F(#{ - remote_node => node(RemotePid), remote_pid => RemotePid, + remote_node => node(RemotePid), table => Tab, is_leader => IsLeader }) From eddf7565a81d0de3c8dea29304ee3bf949d1a0cc Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 28 Feb 2024 10:17:08 +0100 Subject: [PATCH 11/13] Test retry_type result --- src/cets_discovery.erl | 2 +- test/cets_SUITE.erl | 31 ++++++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 0f9ac59d..e2910e7a 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -582,7 +582,7 @@ has_join_result_for(Node, Table, #{results := Results}) -> -spec handle_system_info(state()) -> system_info(). handle_system_info(State) -> - State#{verify_ready => verify_ready(State)}. + State#{verify_ready => verify_ready(State), retry_type => choose_retry_type(State)}. -spec handle_nodedown(node(), state()) -> state(). handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) -> diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index d05c3534..b5278aca 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -101,6 +101,8 @@ cases() -> test_disco_add_two_tables, disco_retried_if_get_nodes_fail, disco_uses_regular_retry_interval_in_the_regular_phase, + disco_uses_regular_retry_interval_in_the_regular_phase_after_node_down, + disco_uses_regular_retry_interval_in_the_regular_phase_after_expired_node_down, disco_handles_node_up_and_down, status_available_nodes, status_available_nodes_do_not_contain_nodes_with_stopped_disco, @@ -1643,6 +1645,27 @@ disco_retried_if_get_nodes_fail(Config) -> ok. disco_uses_regular_retry_interval_in_the_regular_phase(Config) -> + #{disco := Disco} = generic_disco_uses_regular_retry_interval_in_the_regular_phase(Config), + #{phase := regular, retry_type := regular} = cets_discovery:system_info(Disco). + +%% Similar to disco_uses_regular_retry_interval_in_the_regular_phase, but has nodedown +disco_uses_regular_retry_interval_in_the_regular_phase_after_node_down(Config) -> + SysInfo = generic_disco_uses_regular_retry_interval_in_the_regular_phase(Config), + #{disco := Disco, node2 := Node2} = SysInfo, + Disco ! {nodedown, Node2}, + #{phase := regular, retry_type := after_nodedown} = cets_discovery:system_info(Disco). 
+ +%% Similar to disco_uses_regular_retry_interval_in_the_regular_phase_after_node_down, but we simulate long downtime +disco_uses_regular_retry_interval_in_the_regular_phase_after_expired_node_down(Config) -> + #{disco := Disco, node2 := Node2} = generic_disco_uses_regular_retry_interval_in_the_regular_phase( + Config + ), + Disco ! {nodedown, Node2}, + TestTimestamp = erlang:system_time(millisecond) - timer:seconds(1000), + set_nodedown_timestamp(Disco, Node2, TestTimestamp), + #{phase := regular, retry_type := regular} = cets_discovery:system_info(Disco). + +generic_disco_uses_regular_retry_interval_in_the_regular_phase(Config) -> Node1 = node(), #{ct2 := Node2} = proplists:get_value(nodes, Config), Tab = make_name(Config), @@ -1655,7 +1678,7 @@ disco_uses_regular_retry_interval_in_the_regular_phase(Config) -> cets_test_wait:wait_until( fun() -> maps:get(last_get_nodes_retry_type, cets_discovery:system_info(Disco)) end, regular ), - #{phase := regular} = cets_discovery:system_info(Disco). + #{disco => Disco, node2 => Node2}. disco_handles_node_up_and_down(Config) -> BadNode = 'badnode@localhost', @@ -3296,3 +3319,9 @@ wait_for_ready(Disco, Timeout) -> ct:pal("system_info: ~p", [cets_discovery:system_info(Disco)]), erlang:raise(Class, Reason, Stacktrace) end. + +%% Overwrites nodedown timestamp for the Node in the discovery server state +set_nodedown_timestamp(Disco, Node, NewTimestamp) -> + sys:replace_state(Disco, fun(#{nodedown_timestamps := Map} = State) -> + State#{nodedown_timestamps := maps:put(Node, NewTimestamp, Map)} + end). From 30f0452617450b563d2dc7ded3707c11d6300103 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 29 Feb 2024 11:19:44 +0100 Subject: [PATCH 12/13] Make stopping CETS gen_servers more reliable Call peer to stop the processes Use proc_lib to spawn processes for better error reporting in tests --- src/cets_discovery.erl | 6 +-- test/cets_SUITE.erl | 89 +++++++++++++++++++++++++----------------- 2 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index e2910e7a..5305264e 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -658,10 +658,8 @@ time_since_startup_in_milliseconds(#{start_time := StartTime}) -> time_since(StartTime). -spec time_since(integer()) -> integer(). -time_since(StartTime) -> - %% Dialyzer thinks integer() - integer() could be float. - %% Do round to avoid the warning. - round(get_time() - StartTime). +time_since(StartTime) when is_integer(StartTime) -> + get_time() - StartTime. -spec get_time() -> milliseconds(). 
get_time() -> diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index b5278aca..96ebe447 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -328,7 +328,7 @@ insert_new_works_when_leader_is_back(Config) -> Leader = cets:get_leader(Pid1), NotLeader = not_leader(Pid1, Pid2, Leader), cets:set_leader(Leader, false), - spawn(fun() -> + proc_lib:spawn(fun() -> timer:sleep(100), cets:set_leader(Leader, true) end), @@ -343,7 +343,7 @@ insert_new_when_new_leader_has_joined(Config) -> %% Pause insert into the first segment Leader = cets:get_leader(Pid1), PauseMon = cets:pause(Leader), - spawn(fun() -> + proc_lib:spawn(fun() -> timer:sleep(100), ok = cets_join:join(lock_name(Config), #{}, Pid1, Pid3), cets:unpause(Leader, PauseMon) @@ -365,7 +365,7 @@ insert_new_when_new_leader_has_joined_duplicate(Config) -> %% Pause insert into the first segment Leader = cets:get_leader(Pid1), PauseMon = cets:pause(Leader), - spawn(fun() -> + proc_lib:spawn(fun() -> timer:sleep(100), ok = cets_join:join(insert_new_lock5, #{}, Pid1, Pid3), cets:unpause(Leader, PauseMon) @@ -411,7 +411,7 @@ insert_new_is_retried_when_leader_is_reelected(Config) -> NotLeader = not_leader(Pid1, Pid2, Leader), %% Ask process to reject all the leader operations cets:set_leader(Leader, false), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> wait_till_test_stage(Leader, detected), %% Fix the leader, so it can process our insert_new call cets:set_leader(Leader, true) @@ -440,7 +440,7 @@ insert_new_is_retried_when_leader_is_reelected(Config) -> insert_new_fails_if_the_leader_dies(Config) -> #{pid1 := Pid1, pid2 := Pid2} = given_two_joined_tables(Config), cets:pause(Pid2), - spawn(fun() -> + proc_lib:spawn(fun() -> timer:sleep(100), exit(Pid2, kill) end), @@ -489,14 +489,14 @@ insert_overwrites_data_inconsistently(Config) -> Me = self(), #{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid1, fun(State) -> Me ! replacing_state1, receive_message(continue_test), State end) end), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid2, fun(State) -> Me ! replacing_state2, receive_message(continue_test), @@ -506,11 +506,11 @@ insert_overwrites_data_inconsistently(Config) -> receive_message(replacing_state1), receive_message(replacing_state2), %% Insert at the same time - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets:insert(Tab1, {a, 1}), Me ! inserted1 end), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets:insert(Tab2, {a, 2}), Me ! inserted2 end), @@ -529,14 +529,14 @@ insert_new_does_not_overwrite_data(Config) -> Me = self(), #{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config), Leader = cets:get_leader(Pid1), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid1, fun(State) -> Me ! replacing_state1, receive_message(continue_test), State end) end), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid2, fun(State) -> Me ! replacing_state2, receive_message(continue_test), @@ -546,12 +546,12 @@ insert_new_does_not_overwrite_data(Config) -> receive_message(replacing_state1), receive_message(replacing_state2), %% Insert at the same time - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> true = cets:insert_new(Tab1, {a, 1}), Me ! inserted1 end), wait_till_message_queue_length(Leader, 1), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> false = cets:insert_new(Tab2, {a, 2}), Me ! 
inserted2 end), @@ -572,14 +572,14 @@ insert_serial_overwrites_data_consistently(Config) -> Me = self(), #{pid1 := Pid1, pid2 := Pid2, tab1 := Tab1, tab2 := Tab2} = given_two_joined_tables(Config), Leader = cets:get_leader(Pid1), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid1, fun(State) -> Me ! replacing_state1, receive_message(continue_test), State end) end), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> sys:replace_state(Pid2, fun(State) -> Me ! replacing_state2, receive_message(continue_test), @@ -589,7 +589,7 @@ insert_serial_overwrites_data_consistently(Config) -> receive_message(replacing_state1), receive_message(replacing_state2), %% Insert at the same time - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets:insert_serial(Tab1, {a, 1}), Me ! inserted1 end), @@ -597,7 +597,7 @@ insert_serial_overwrites_data_consistently(Config) -> %% (just to get a predictable value. The value would be still %% consistent in case first insert comes after the second). wait_till_message_queue_length(Leader, 1), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets:insert_serial(Tab2, {a, 2}), Me ! inserted2 end), @@ -617,7 +617,7 @@ insert_serial_works_when_leader_is_back(Config) -> Leader = cets:get_leader(Pid1), NotLeader = not_leader(Pid1, Pid2, Leader), cets:set_leader(Leader, false), - spawn(fun() -> + proc_lib:spawn(fun() -> timer:sleep(100), cets:set_leader(Leader, true) end), @@ -634,7 +634,7 @@ insert_serial_blocks_when_leader_is_not_back(Config) -> Leader = cets:get_leader(Pid1), NotLeader = not_leader(Pid1, Pid2, Leader), cets:set_leader(Leader, false), - InserterPid = spawn(fun() -> + InserterPid = proc_lib:spawn(fun() -> %% Will block indefinetely, because we set is_leader flag manually. ok = cets:insert_serial(NotLeader, {alice, 32}) end), @@ -991,12 +991,12 @@ join_retried_if_lock_is_busy(Config) -> (_) -> ok end, %% Get the lock in a separate process - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> cets_join:join(Lock, #{}, Pid1, Pid2, #{checkpoint_handler => SleepyF}) end), receive_message(join_start), %% We actually would not return from cets_join:join unless we get the lock - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets_join:join(Lock, #{}, Pid1, Pid2, #{checkpoint_handler => F}) end), receive_message(before_retry). @@ -1014,12 +1014,12 @@ join_done_already_while_waiting_for_lock_so_do_nothing(Config) -> F1 = send_join_start_back_and_wait_for_continue_joining(), F2 = fun(_) -> ok end, %% Get the lock in a separate process - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets_join:join(Lock, Info, Pid1, Pid3, #{checkpoint_handler => F1}), Me ! first_join_returns end), JoinPid = receive_message_with_arg(join_start), - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> ok = cets_join:join(Lock, Info, Pid1, Pid3, #{checkpoint_handler => F2}), Me ! second_join_returns end), @@ -1040,7 +1040,7 @@ pause_owner_crashed_is_logged(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), {ok, Pid1} = start_local(make_name(Config, 1)), Me = self(), - PausedByPid = spawn(fun() -> + PausedByPid = proc_lib:spawn(fun() -> cets:pause(Pid1), Me ! paused, error(oops) @@ -1065,7 +1065,7 @@ pause_owner_crashed_is_not_logged_if_reason_is_normal(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), {ok, Pid1} = start_local(make_name(Config, 1)), Me = self(), - PausedByPid = spawn(fun() -> + PausedByPid = proc_lib:spawn(fun() -> cets:pause(Pid1), Me ! 
paused end), @@ -1104,7 +1104,7 @@ shutdown_reason_is_not_logged_in_tracked(_Config) -> Me ! ready, timer:sleep(infinity) end, - Pid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), + Pid = proc_lib:spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), receive_message(ready), exit(Pid, shutdown), wait_for_down(Pid), @@ -1119,7 +1119,7 @@ other_reason_is_logged_in_tracked(_Config) -> Me ! ready, timer:sleep(infinity) end, - Pid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), + Pid = proc_lib:spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), receive_message(ready), exit(Pid, bad_stuff_happened), wait_for_down(Pid), @@ -1259,7 +1259,7 @@ pause_on_remote_node_returns_if_monitor_process_dies(Config) -> JoinPid = make_process(), #{ct2 := Node2} = proplists:get_value(nodes, Config), AllPids = [rpc(Node2, ?MODULE, make_process, [])], - TestPid = spawn(fun() -> + TestPid = proc_lib:spawn(fun() -> %% Would block cets_join:pause_on_remote_node(JoinPid, AllPids) end), @@ -2146,7 +2146,7 @@ ack_process_handles_unknown_remote_server(Config) -> sys:suspend(Pid2), #{ack_pid := AckPid} = cets:info(Pid1), [Pid2] = cets:other_pids(Pid1), - RandomPid = spawn(fun() -> ok end), + RandomPid = proc_lib:spawn(fun() -> ok end), %% Request returns immediately, %% we actually need to send a ping to ensure it has been processed locally R = cets:insert_request(Pid1, {1}), @@ -2299,7 +2299,7 @@ check_could_reach_each_other_fails(_Config) -> unknown_down_message_is_ignored(Config) -> {ok, Pid} = start_local(make_name(Config)), - RandPid = spawn(fun() -> ok end), + RandPid = proc_lib:spawn(fun() -> ok end), Pid ! {'DOWN', make_ref(), process, RandPid, oops}, still_works(Pid). @@ -2390,10 +2390,10 @@ long_call_fails_because_linked_process_dies(_Config) -> Me ! task_started, timer:sleep(infinity) end, - RunPid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), + RunPid = proc_lib:spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), %% To avoid race conditions receive_message(task_started), - spawn(fun() -> + proc_lib:spawn(fun() -> link(RunPid), error(sim_error_in_linked_process) end), @@ -2892,11 +2892,13 @@ start_local(Name, Opts) -> schedule_cleanup(Pid) -> Me = self(), - spawn(fun() -> + proc_lib:spawn(fun() -> Ref = erlang:monitor(process, Me), receive {'DOWN', Ref, process, Me, _} -> - cets:stop(Pid) + %% We do an RPC call, because erlang distribution + %% could not be always reliable (because we test netsplits) + rpc(node_to_peer(node(Pid)), cets, stop, [Pid]) end end). @@ -2976,12 +2978,27 @@ start_node(Sname) -> {ok, Peer, Node} = ?CT_PEER(#{ name => Sname, connection => standard_io, args => extra_args(Sname) }), + %% Register so we can find Peer process later in code + register(node_to_peer(Node), Peer), %% Keep nodes running after init_per_suite is finished unlink(Peer), %% Do RPC using alternative connection method ok = peer:call(Peer, code, add_paths, [code:get_path()]), {Node, Peer}. +%% Returns Peer or Node name which could be used to do RPC-s reliably +%% (regardless if Erlang Distribution works or not) +node_to_peer(Node) when Node =:= node() -> + %% There is no peer for the local CT node + Node; +node_to_peer(Node) when is_atom(Node) -> + case whereis(list_to_atom(atom_to_list(Node) ++ "_peer")) of + Pid when is_pid(Pid) -> + Pid; + undefined -> + ct:fail({node_to_peer_failed, Node}) + end. 
+ receive_message(M) -> receive M -> ok @@ -3279,7 +3296,7 @@ get_disco_timestamp(Disco, MapName, NodeKey) -> Timestamp. make_signalling_process() -> - spawn_link(fun() -> + proc_lib:spawn_link(fun() -> receive stop -> ok end @@ -3305,7 +3322,7 @@ assert_unique(List) -> List. make_process() -> - spawn(fun() -> + proc_lib:spawn(fun() -> receive stop -> stop end From fbaf1d8804d10801eddd62a4a0eec63f6d8c8730 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 29 Feb 2024 18:50:18 +0100 Subject: [PATCH 13/13] Add cleanup table for tests More reliable disconnect_node in tests Link disco process in tests --- test/cets_SUITE.erl | 105 ++++++++++++++++++++++++++++++++------------ 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 96ebe447..ee4cef73 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -231,6 +231,7 @@ cets_seq_no_log_cases() -> ]. init_per_suite(Config) -> + init_cleanup_table(), Names = [ct2, ct3, ct4, ct5, ct6, ct7], {Nodes, Peers} = lists:unzip([start_node(N) || N <- Names]), [ @@ -264,6 +265,7 @@ init_per_testcase_generic(Name, Config) -> [{testcase, Name} | Config]. end_per_testcase(_, _Config) -> + wait_for_cleanup(), ok. %% Modules that use a multiline LOG_ macro @@ -1451,7 +1453,7 @@ test_multinode_auto_discovery(Config) -> ct:pal("Dir ~p", [Dir]), FileName = filename:join(Dir, "disco.txt"), ok = file:write_file(FileName, io_lib:format("~s~n~s~n", [Node1, Node2])), - {ok, Disco} = cets_discovery:start(#{tables => [Tab], disco_file => FileName}), + {ok, Disco} = cets_discovery:start_link(#{tables => [Tab], disco_file => FileName}), %% Disco is async, so we have to wait for the final state ok = wait_for_ready(Disco, 5000), [Node2] = other_nodes(Node1, Tab), @@ -1471,7 +1473,7 @@ test_disco_add_table(Config) -> ct:pal("Dir ~p", [Dir]), FileName = filename:join(Dir, "disco.txt"), ok = file:write_file(FileName, io_lib:format("~s~n~s~n", [Node1, Node2])), - {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), + {ok, Disco} = cets_discovery:start_link(#{tables => [], disco_file => FileName}), cets_discovery:add_table(Disco, Tab), %% Disco is async, so we have to wait for the final state ok = wait_for_ready(Disco, 5000), @@ -1482,7 +1484,9 @@ test_disco_add_table(Config) -> test_disco_delete_table(Config) -> F = fun(State) -> {{ok, []}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), Tab = make_name(Config), cets_discovery:add_table(Disco, Tab), #{tables := [Tab]} = cets_discovery:system_info(Disco), @@ -1491,14 +1495,18 @@ test_disco_delete_table(Config) -> test_disco_delete_unknown_table(Config) -> F = fun(State) -> {{ok, []}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), Tab = make_name(Config), cets_discovery:delete_table(Disco, Tab), #{tables := []} = cets_discovery:system_info(Disco). 
test_disco_delete_table_twice(Config) -> F = fun(State) -> {{ok, []}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), Tab = make_name(Config), cets_discovery:add_table(Disco, Tab), #{tables := [Tab]} = cets_discovery:system_info(Disco), @@ -1516,7 +1524,7 @@ test_disco_file_appears(Config) -> ct:pal("Dir ~p", [Dir]), FileName = filename:join(Dir, "disco3.txt"), file:delete(FileName), - {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), + {ok, Disco} = cets_discovery:start_link(#{tables => [], disco_file => FileName}), cets_discovery:add_table(Disco, Tab), cets_test_wait:wait_until( fun() -> maps:get(last_get_nodes_retry_type, cets_discovery:system_info(Disco)) end, @@ -1540,7 +1548,7 @@ test_disco_handles_bad_node(Config) -> ct:pal("Dir ~p", [Dir]), FileName = filename:join(Dir, "disco_badnode.txt"), ok = file:write_file(FileName, io_lib:format("badnode@localhost~n~s~n~s~n", [Node1, Node2])), - {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), + {ok, Disco} = cets_discovery:start_link(#{tables => [], disco_file => FileName}), cets_discovery:add_table(Disco, Tab), %% Check that wait_for_ready would not block forever: ok = wait_for_ready(Disco, 5000), @@ -1557,7 +1565,9 @@ cets_discovery_fun_backend_works(Config) -> {ok, _Pid1} = start(Node1, Tab), {ok, _Pid2} = start(Node2, Tab), F = fun(State) -> {{ok, [Node1, Node2]}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), cets_discovery:add_table(Disco, Tab), ok = wait_for_ready(Disco, 5000), [#{memory := _, nodes := [Node1, Node2], size := 0, table := Tab}] = @@ -1566,7 +1576,7 @@ cets_discovery_fun_backend_works(Config) -> test_disco_add_table_twice(Config) -> Dir = proplists:get_value(priv_dir, Config), FileName = filename:join(Dir, "disco.txt"), - {ok, Disco} = cets_discovery:start(#{tables => [], disco_file => FileName}), + {ok, Disco} = cets_discovery:start_link(#{tables => [], disco_file => FileName}), Tab = make_name(Config), {ok, _Pid} = start_local(Tab), cets_discovery:add_table(Disco, Tab), @@ -1593,7 +1603,9 @@ test_disco_add_two_tables(Config) -> Me ! 
waited_for_sent_both, {{ok, [Node1, Node2]}, State#{waited => true}} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), %% Add two tables async cets_discovery:add_table(Disco, Tab1), %% After the first table, Disco would get blocked in get_nodes function (see wait_till_test_stage in F above) @@ -1636,7 +1648,9 @@ disco_retried_if_get_nodes_fail(Config) -> F = fun(State) -> {{error, simulate_error}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), cets_discovery:add_table(Disco, Tab), cets_test_wait:wait_until( fun() -> maps:get(last_get_nodes_retry_type, cets_discovery:system_info(Disco)) end, @@ -1672,7 +1686,9 @@ generic_disco_uses_regular_retry_interval_in_the_regular_phase(Config) -> {ok, _} = start(Node1, Tab), {ok, _} = start(Node2, Tab), F = fun(State) -> {{ok, [Node1, Node2]}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), Disco ! enter_regular_phase, cets_discovery:add_table(Disco, Tab), cets_test_wait:wait_until( @@ -1690,7 +1706,9 @@ disco_handles_node_up_and_down(Config) -> F = fun(State) -> {{ok, [Node1, Node2, BadNode]}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), cets_discovery:add_table(Disco, Tab), %% get_nodes call is async, so wait for it cets_test_wait:wait_until( @@ -1951,7 +1969,9 @@ status_conflict_nodes(Config) -> disco_wait_for_get_nodes_works(_Config) -> F = fun(State) -> {{ok, []}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), ok = cets_discovery:wait_for_get_nodes(Disco, 5000). disco_wait_for_get_nodes_blocks_and_returns(Config) -> @@ -1962,7 +1982,9 @@ disco_wait_for_get_nodes_blocks_and_returns(Config) -> wait_for_down(SignallingPid), {{ok, []}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), cets_discovery:add_table(Disco, Tab), %% Enter into a blocking get_nodes function Disco ! check, @@ -1994,7 +2016,7 @@ disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried(Config) -> wait_for_down(SignallingPid2), {{ok, []}, State#{step => 2}} end, - {ok, Disco} = cets_discovery:start(#{ + {ok, Disco} = cets_discovery:start_link(#{ backend_module => cets_discovery_fun, get_nodes_fn => F, step => 1 }), cets_discovery:add_table(Disco, Tab), @@ -2484,7 +2506,9 @@ disco_connects_to_unconnected_node(Config) -> F = fun(State) -> {{ok, [Node1, Node5]}, State} end, - {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + {ok, Disco} = cets_discovery:start_link(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F + }), cets_discovery:add_table(Disco, Tab), ok = wait_for_ready(Disco, 5000). 
@@ -2748,9 +2772,9 @@ disco_logs_node_reconnects_after_downtime(Config) -> disco_nodeup_timestamp_is_updated_after_node_reconnects(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Setup = setup_two_nodes_and_discovery(Config, [wait, disco2]), - #{disco := Disco, node1 := Node1, node2 := Node2, peer2 := Peer2} = Setup, + #{disco := Disco, node2 := Node2} = Setup, OldTimestamp = get_disco_timestamp(Disco, nodeup_timestamps, Node2), - disconnect_node(Peer2, Node1), + disconnect_node_by_name(Config, ct2), wait_for_disco_timestamp_to_be_updated(Disco, nodeup_timestamps, Node2, OldTimestamp). disco_node_start_timestamp_is_updated_after_node_restarts(Config) -> @@ -2892,16 +2916,31 @@ start_local(Name, Opts) -> schedule_cleanup(Pid) -> Me = self(), - proc_lib:spawn(fun() -> + Cleaner = proc_lib:spawn(fun() -> Ref = erlang:monitor(process, Me), receive {'DOWN', Ref, process, Me, _} -> %% We do an RPC call, because erlang distribution %% could not be always reliable (because we test netsplits) - rpc(node_to_peer(node(Pid)), cets, stop, [Pid]) + rpc(node_to_peer(node(Pid)), cets, stop, [Pid]), + ets:delete_object(cleanup_table, {Me, self()}) end + end), + ets:insert(cleanup_table, {Me, Cleaner}). + +init_cleanup_table() -> + spawn(fun() -> + ets:new(cleanup_table, [named_table, public, bag]), + timer:sleep(infinity) end). +%% schedule_cleanup is async, so this function is waiting for it to finish +wait_for_cleanup() -> + [ + wait_for_down(Cleaner) + || {Owner, Cleaner} <- ets:tab2list(cleanup_table), not is_process_alive(Owner) + ]. + start(Node, Tab) -> catch rpc(Node, cets, stop, [Tab]), wait_for_name_to_be_free(Node, Tab), @@ -2979,7 +3018,7 @@ start_node(Sname) -> name => Sname, connection => standard_io, args => extra_args(Sname) }), %% Register so we can find Peer process later in code - register(node_to_peer(Node), Peer), + register(node_to_peer_name(Node), Peer), %% Keep nodes running after init_per_suite is finished unlink(Peer), %% Do RPC using alternative connection method @@ -2992,13 +3031,16 @@ node_to_peer(Node) when Node =:= node() -> %% There is no peer for the local CT node Node; node_to_peer(Node) when is_atom(Node) -> - case whereis(list_to_atom(atom_to_list(Node) ++ "_peer")) of + case whereis(node_to_peer_name(Node)) of Pid when is_pid(Pid) -> Pid; undefined -> ct:fail({node_to_peer_failed, Node}) end. +node_to_peer_name(Node) -> + list_to_atom(atom_to_list(Node) ++ "_peer"). + receive_message(M) -> receive M -> ok @@ -3092,13 +3134,11 @@ setup_two_nodes_and_discovery(Config) -> %% - disco2 - start discovery on Node2 %% - wait - call wait_for_ready/2 setup_two_nodes_and_discovery(Config, Flags) -> - ok = net_kernel:monitor_nodes(true), Me = self(), Node1 = node(), #{ct2 := Peer2} = proplists:get_value(peers, Config), #{ct2 := Node2} = proplists:get_value(nodes, Config), - disconnect_node(Peer2, Node1), - receive_message({nodedown, Node2}), + disconnect_node_by_name(Config, ct2), Tab = make_name(Config), {ok, _Pid1} = start(Node1, Tab), {ok, _Pid2} = start(Peer2, Tab), @@ -3136,7 +3176,7 @@ setup_two_nodes_and_discovery(Config, Flags) -> case lists:member(netsplit, Flags) of true -> %% Simulate a loss of connection between nodes - disconnect_node(Peer2, Node1); + disconnect_node_by_name(Config, ct2); false -> ok end, @@ -3210,6 +3250,17 @@ reconnect_node(Node, Peer) when is_atom(Node), is_pid(Peer) -> disconnect_node(RPCNode, DisconnectNode) -> rpc(RPCNode, erlang, disconnect_node, [DisconnectNode]). 
+disconnect_node_by_name(Config, Id) -> + Peer = maps:get(Id, proplists:get_value(peers, Config)), + Node = maps:get(Id, proplists:get_value(nodes, Config)), + %% We could need to retry to disconnect, if the local node is currently trying to establish a connection + %% with Node2 (could be triggered by the previous tests) + F = fun() -> + disconnect_node(Peer, node()), + lists:member(Node, nodes()) + end, + cets_test_wait:wait_until(F, false). + not_leader(Leader, Other, Leader) -> Other; not_leader(Other, Leader, Leader) ->