Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mas 3.0 dialyzer #800

Merged
merged 8 commits into from
Oct 8, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ dialyzer_warnings
.rebar/
_build/
rebar.lock
.DS_Store
19 changes: 0 additions & 19 deletions dialyzer.ignore-warnings

This file was deleted.

36 changes: 26 additions & 10 deletions src/gen_leader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
-type msg() :: term().
-type reason() :: atom().
-type extra() :: term().
-type server() :: #server{}.

-callback init(list()) -> {ok, state()} | {ignore, term()} | {stop, term()}.
-callback elected(state(), election(), node()) -> {ok, term(), state()}.
Expand Down Expand Up @@ -933,7 +934,7 @@ loop(#server{parent = Parent,
_Msg when Debug == [] ->
handle_msg(Msg, Server, Role, E);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Debug1 = sys:handle_debug(Debug, fun print_event/3,
E#election.name, {in, Msg}),
handle_msg(Msg, Server#server{debug = Debug1}, Role, E)
end
Expand All @@ -949,8 +950,18 @@ system_continue(_Parent, _Debug, [normal, Server, Role, E]) ->
loop(Server, Role, E,{}).

%% @hidden
system_terminate(Reason, _Parent, _Debug, [_Mode, Server, Role, E]) ->
terminate(Reason, [], Server, Role, E).
system_terminate(Reason,
_Parent,
_Debug,
[_Mode,
#server{mod = Mod,
state = State,
debug = Debug},
_Role,
#election{name = Name, cand_timer = Timer}]) ->
_ = timer:cancel(Timer),
R = maybe_log_reason(Mod, Reason, Name, [], State, Debug),
exit(R).

%% @hidden
system_code_change([Mode, Server, Role, E], _Module, OldVsn, Extra) ->
Expand Down Expand Up @@ -1112,33 +1123,38 @@ reply({To, Tag}, Reply, #server{state = State} = Server, Role, E) ->
handle_debug(#server{debug = []} = Server, _Role, _E, _Event) ->
Server;
handle_debug(#server{debug = Debug} = Server, _Role, E, Event) ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Debug1 = sys:handle_debug(Debug, fun print_event/3,
E#election.name, Event),
Server#server{debug = Debug1}.

%%% ---------------------------------------------------
%%% Terminate the server.
%%% ---------------------------------------------------

-spec terminate(reason(), msg(), server(), any(), election()) -> no_return().
terminate(Reason, Msg, #server{mod = Mod,
state = State,
debug = Debug} = _Server, _Role,
#election{name = Name, cand_timer = Timer} = _E) ->
_ = timer:cancel(Timer),
R = maybe_log_reason(Mod, Reason, Name, Msg, State, Debug),
exit(R).

maybe_log_reason(Mod, Reason, Name, Msg, State, Debug) ->
case catch Mod:terminate(Reason, State) of
{'EXIT', R} ->
error_info(R, Name, Msg, State, Debug),
exit(R);
R;
_ ->
case Reason of
normal ->
exit(normal);
ok;
shutdown ->
exit(shutdown);
ok;
_ ->
error_info(Reason, Name, Msg, State, Debug),
exit(Reason)
end
error_info(Reason, Name, Msg, State, Debug)
end,
Reason
end.

%% Maybe we shouldn't do this? We have the crash report...
Expand Down
16 changes: 8 additions & 8 deletions src/riak_core_cluster_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@
-type ranch_transport_messages() :: {atom(), atom(), atom()}.
-record(state, {mode :: atom(),
remote :: remote(),
socket :: port(),
name :: clustername(),
socket :: port() | undefined,
name :: clustername() |undefined,
previous_name="undefined" :: clustername(),
members=[] :: [peer_address() | node_address()],
connection_ref :: reference(),
connection_ref :: reference() | undefined,
connection_timeout :: timeout(),
transport :: atom(),
address :: peer_address(),
connection_props :: proplists:proplist(),
transport_msgs :: ranch_transport_messages(),
proto_version :: {non_neg_integer(), non_neg_integer()} }).
transport :: atom() | undefined,
address :: peer_address() | undefined,
connection_props = [] :: proplists:proplist(),
transport_msgs :: ranch_transport_messages() | undefined,
proto_version :: {non_neg_integer(), non_neg_integer()} | undefined }).
-type state() :: #state{}.

%%%===================================================================
Expand Down
4 changes: 2 additions & 2 deletions src/riak_core_connection_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
is_black_listed = false :: boolean(), %% true after a failed connection attempt
backoff_delay=0 :: counter(), %% incremented on each failure, reset to zero on success
failures = orddict:new() :: orddict:orddict(), %% failure reasons
last_fail_time :: erlang:timestamp(), %% time of last failure since 1970
next_try_secs :: counter() %% time in seconds to next retry attempt
last_fail_time :: erlang:timestamp() | undefined, %% time of last failure since 1970
next_try_secs = 0 :: counter() %% time in seconds to next retry attempt
}).

%% connection request record
Expand Down
4 changes: 3 additions & 1 deletion src/riak_repl.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
ebloom,
riak_core,
riak_kv,
ranch]},
riak_api,
ranch,
gen_fsm_compat]},
{registered, [riak_repl_connector_sup,
riak_repl_leader,
riak_repl_stats,
Expand Down
28 changes: 19 additions & 9 deletions src/riak_repl2_fscoordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
-endif.

-record(stat_cache, {
worker :: {pid(), reference()},
refresh_timer :: reference(),
worker :: {pid(), reference()} | undefined,
refresh_timer :: reference() | undefined,
refresh_interval = app_helper:get_env(riak_repl, fullsync_stat_refresh_interval, ?DEFAULT_STAT_REFRESH_INTERVAL) :: pos_integer(),
last_refresh = riak_core_util:moment() :: 'undefined' | pos_integer(),
stats = [] :: [tuple()]
Expand Down Expand Up @@ -566,9 +566,10 @@ handle_abnormal_exit(ExitType, Pid, _Cause, {value, PartitionWithSource, Running
Dropped = [Partition#partition_info.index | State4#state.dropped],
Purgatory = queue:filter(fun({P, _}) -> P =/= Partition end,
State4#state.purgatory),
{noreply, State4#state{error_exits = ErrorExits1,
purgatory = Purgatory,
dropped = Dropped}};
maybe_complete_fullsync(Running,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more than just a refactoring...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I was a bit lazy combining the two. Shall I revert out the commits associated with #799 and put in a separate PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reference. This is fine.

State4#state{error_exits = ErrorExits1,
purgatory = Purgatory,
dropped = Dropped});
true ->
Now = riak_core_util:moment(),
Purgatory = queue:in({Partition, Now}, State4#state.purgatory),
Expand Down Expand Up @@ -1129,7 +1130,10 @@ test_purgatory_wait_time() ->
FakeNode = '[email protected]',
FakePid = fake_pid(),
PartInfo = #partition_info{running_source= FakePid, index=Index, node=FakeNode},
State = #state{running_sources=[PartInfo], transport=mock_transport},
State =
#state{running_sources=[PartInfo],
transport=mock_transport,
fullsync_start_time = riak_core_util:moment()},
meck:expect(riak_core_util, moment, [],
meck:seq([1, 2, 6])),

Expand Down Expand Up @@ -1185,9 +1189,11 @@ test_retry_count() ->

#state{purgatory=Purgatory,
dropped=Dropped,
error_exits=ErrorExits} = retry(0,
MaxRetriesExpected,
#state{transport=mock_transport}),
error_exits=ErrorExits} =
retry(0,
MaxRetriesExpected,
#state{transport=mock_transport,
fullsync_start_time = riak_core_util:moment()}),

?assertEqual(1, ErrorExits),
?assertEqual(1, length(Dropped)),
Expand Down Expand Up @@ -1228,6 +1234,10 @@ setup() ->
meck:new(riak_core, [passthrough]),
%% make a fake transport
meck:new(mock_transport, [non_strict]),
meck:new(riak_core_connection),
meck:expect(riak_core_connection,
symbolic_clustername,
fun() -> "test_cluster" end),
{OrigRetries, OrigWait}.

teardown({OrigRetries, OrigWait}) ->
Expand Down
2 changes: 1 addition & 1 deletion src/riak_repl2_fssink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
cluster,
fullsync_worker,
work_dir = undefined,
strategy :: keylist | aae,
strategy :: keylist | aae | undefined,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation says:

%% If the protocol version >= 2.0, and the cluster has aae enabled and the replication
%% configuration stanza enables the aae strategy (the default is keylist),

In other words, it assumes a default. I would expect the init function to then implement that default by initialising the state with keylist alternatively to follow the logic.

At this moment there is a possible race condition when the server is started and a call legacy_status comes in, this may crash the server (line 107). Dializer seems not to spot this, though.

proto,
ver % highest common wire protocol in common with fs source
}).
Expand Down
2 changes: 1 addition & 1 deletion src/riak_repl2_leader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
candidates=[] :: [node()], % candidate nodes for leader
workers=[node()] :: [node()], % workers
check_tref :: timer:tref(), % check mailbox timer
elected_mbox_size :: integer() % elected leader box size
elected_mbox_size :: undefined | non_neg_integer() % elected leader box size
}).

%%%===================================================================
Expand Down
5 changes: 2 additions & 3 deletions src/riak_repl2_rtsink_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ bt_dropped(BucketType, #state{bt_drops = BucketDict} = State) ->
-define(REACTIVATE_SOCK_INT_MILLIS_TEST_VAL, 20).
-define(PORT_RANGE, 999999).

-compile(export_all).
-compile([export_all, nowarn_export_all]).

riak_repl2_rtsink_conn_test_() ->
{spawn,
Expand Down Expand Up @@ -520,8 +520,7 @@ cache_peername_test_case() ->
catch(meck:unload(riak_repl_util)),
meck:new(riak_repl_util, [passthrough]),
meck:expect(riak_repl_util, generate_socket_tag, fun(Prefix, _Transport, _Socket) ->
random:seed(now()),
Portnum = random:uniform(?PORT_RANGE),
Portnum = rand:uniform(?PORT_RANGE),
lists:flatten(io_lib:format("~s_~p -> ~p:~p",[
Prefix,
Portnum,
Expand Down
6 changes: 3 additions & 3 deletions src/riak_repl_aae_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
clustername,
socket,
transport,
tree_pid :: pid(), %% pid of the AAE tree
tree_pid :: pid() | undefined, %% pid of the AAE tree
partition,
sender :: pid(),
sender :: pid() | undefined,
owner :: pid() %% our fssource owner
}).

Expand All @@ -43,7 +43,7 @@ init_sync(AAEWorker) ->
%%%===================================================================

init([ClusterName, Transport, Socket, OwnerPid]) ->
lager:info("Starting AAE fullsync sink worker"),
lager:info("Starting AAE fullsync sink worker with Socket ~w", [Socket]),
{ok, #state{clustername=ClusterName, socket=Socket, transport=Transport, owner=OwnerPid}}.

handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket}) ->
Expand Down
37 changes: 19 additions & 18 deletions src/riak_repl_aae_source.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,44 @@

-record(exchange, {mode :: inline | buffered,
buffer :: ets:tid(),
bloom :: reference(), %% ebloom
bloom :: reference() | undefined, %% ebloom
limit :: non_neg_integer(),
count :: non_neg_integer()
}).

-type exchange() :: #exchange{}.
-type version() :: legacy | non_neg_integer().

%% Per state transition timeout used by certain transitions
-define(DEFAULT_ACTION_TIMEOUT, 300000). %% 5 minutes

%% the first this many differences are not put in the bloom
%% filter, but simply sent to the remote site directly.
-define(GET_OBJECT_LIMIT, 1000).

%% Diff percentage needed to use bloom filter
-define(DIFF_PERCENTAGE, 5).

-record(state, {cluster,
client, %% riak:local_client()
transport,
socket,
index :: index(),
indexns :: [index_n()],
tree_pid :: pid(),
tree_mref :: reference(),
tree_version :: version(),
indexns :: [index_n()] | undefined,
tree_pid :: pid() | undefined,
tree_mref :: reference() | undefined,
tree_version :: version() | undefined,
built :: non_neg_integer(),
timeout :: pos_integer(),
wire_ver :: atom(),
timeout = ?DEFAULT_ACTION_TIMEOUT :: pos_integer(),
wire_ver :: atom() | undefined,
diff_batch_size = 1000 :: non_neg_integer(),
estimated_nr_keys :: non_neg_integer(),
estimated_nr_keys :: non_neg_integer() | undefined,
local_lock = false :: boolean(),
owner :: pid(),
proto :: term(),
exchange :: exchange()
exchange :: exchange() | undefined
}).

%% Per state transition timeout used by certain transitions
-define(DEFAULT_ACTION_TIMEOUT, 300000). %% 5 minutes

%% the first this many differences are not put in the bloom
%% filter, but simply sent to the remote site directly.
-define(GET_OBJECT_LIMIT, 1000).

%% Diff percentage needed to use bloom filter
-define(DIFF_PERCENTAGE, 5).

%%%===================================================================
%%% API
Expand Down
2 changes: 1 addition & 1 deletion src/riak_repl_cs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-module(riak_repl_cs).

-ifdef(TEST).
-compile(export_all).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
-endif.

Expand Down
2 changes: 1 addition & 1 deletion src/riak_repl_fullsync_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ handle_call({get, B, K, Transport, Socket, Pool, Ver}, From, State) ->
gen_server:reply(From, ok),
%% do the get and send it to the client
{ok, Client} = riak:local_client(),
case Client:get(B, K, 1, ?REPL_FSM_TIMEOUT) of
case riak_client:get(B, K, 1, ?REPL_FSM_TIMEOUT, Client) of
{ok, RObj} ->
%% we don't actually have the vclock to compare, so just send the
%% key and let the other side sort things out.
Expand Down
Loading