Use dotted version vectors on common tests #11

Open · wants to merge 5 commits into base: master
6 changes: 6 additions & 0 deletions rebar.config
@@ -22,6 +22,12 @@
]},
{docs, [
{deps, [edown]}
]},
{test, [
{deps, [
{dvv, ".*",
{git, "https://github.com/ricardobcl/Dotted-Version-Vectors.git", {tag, "1.0"}}}
]}
]}
]}.

32 changes: 25 additions & 7 deletions src/plumtree_broadcast.erl
@@ -137,8 +137,6 @@ start_link() ->
%% list is empty)
InitEagers = Members,
InitLazys = [],
plumtree_util:log(debug, "init peers, eager: ~p, lazy: ~p",
[InitEagers, InitLazys]),
Mods = app_helper:get_env(plumtree, broadcast_mods, []),
Res = start_link(Members, InitEagers, InitLazys, Mods,
[{lazy_tick_period, LazyTickPeriod},
@@ -256,6 +254,8 @@ debug_get_tree(Root, Nodes) ->
%% @private
-spec init([[any()], ...]) -> {ok, state()}.
init([AllMembers, InitEagers, InitLazys, Mods, Opts]) ->
plumtree_util:log(debug, "init ~p peers, eager: ~p, lazy: ~p",
[AllMembers, InitEagers, InitLazys]),
LazyTickPeriod = proplists:get_value(lazy_tick_period, Opts),
ExchangeTickPeriod = proplists:get_value(exchange_tick_period, Opts),
schedule_lazy_tick(LazyTickPeriod),
@@ -300,7 +300,8 @@ handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) ->
{noreply, State1};
handle_cast({prune, Root, From}, State) ->
plumtree_util:log(debug, "received ~p", [{prune, Root, From}]),
plumtree_util:log(debug, "moving peer ~p from eager to lazy", [From]),
plumtree_util:log(debug, "moving peer ~p from eager to lazy on tree rooted at ~p",
[From, Root]),
State1 = add_lazy(From, Root, State),
{noreply, State1};
handle_cast({i_have, MessageId, Mod, Round, Root, From}, State) ->
@@ -374,12 +375,25 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
handle_broadcast(false, _MessageId, _Message, Mod, _Round, Root, From, State) -> %% stale msg
%% remove sender from eager and set as lazy
plumtree_util:log(debug, "moving peer ~p from eager to lazy", [From]),
plumtree_util:log(debug, "moving peer ~p from eager to lazy on tree rooted at ~p, requesting it to also do the same",
[From, Root]),
State1 = add_lazy(From, Root, State),
_ = send({prune, Root, myself()}, Mod, From),
State1;
%% The next clause allows the callback to override the message id after the merge.
%% Suppose node A eager-pushes a change to node B, and B then lazy-pushes it to node C;
%% at that point the message id sent to C is the one that originated at A. Concurrently,
%% B takes an update that subsumes the previous one. When C receives the lazy push it has
%% not seen this message and asks B to graft it, but if C sends the message id it got from
%% A, B will not answer the graft because that id is now deemed stale.
%% With this extra clause the callback can return a new message id that resulted from the
%% merge and have that id be the one propagated.
handle_broadcast({true, MessageId}, _OldMessageId, Message, Mod, Round, Root, From, State) ->
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State);
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
%% remove sender from lazy and set as eager
plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p",
[From, Root]),
State1 = add_eager(From, Root, State),
State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1),
schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2).
@@ -390,6 +404,8 @@ handle_ihave(true, MessageId, Mod, Round, Root, From, State) -> %% stale i_have
handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have
%% TODO: don't graft immediately
_ = send({graft, MessageId, Mod, Round, Root, myself()}, Mod, From),
plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p, graft requested from ~p",
[From, Root, From]),
add_eager(From, Root, State).

handle_graft(stale, MessageId, Mod, Round, Root, From, State) ->
@@ -401,6 +417,8 @@ handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) ->
%% we don't ack outstanding here because the broadcast may fail to be delivered
%% instead we will allow the i_have to be sent once more and let the subsequent
%% ignore serve as the ack.
plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p",
[From, Root]),
State1 = add_eager(From, Root, State),
_ = send({broadcast, MessageId, Message, Mod, Round, Root, myself()}, Mod, From),
State1;
@@ -451,12 +469,12 @@ send_lazy(#state{outstanding=Outstanding}) ->
[send_lazy(Peer, Messages) || {Peer, Messages} <- orddict:to_list(Outstanding)].

send_lazy(Peer, Messages) ->
plumtree_util:log(debug, "flushing ~p outstanding lazy pushes to peer ~p",
[ordsets:size(Messages), Peer]),
[send_lazy(MessageId, Mod, Round, Root, Peer) ||
{MessageId, Mod, Round, Root} <- ordsets:to_list(Messages)].

send_lazy(MessageId, Mod, Round, Root, Peer) ->
plumtree_util:log(debug, "sending lazy push ~p",
[{i_have, MessageId, Mod, Round, Root, myself()}]),
send({i_have, MessageId, Mod, Round, Root, myself()}, Mod, Peer).

maybe_exchange(State) ->
@@ -645,7 +663,7 @@ send(Msg, Mod, P) ->
partisan_peer_service),
PeerServiceManager = PeerService:manager(),
instrument_transmission(Msg, Mod),
PeerServiceManager:forward_message(P, ?SERVER, Msg).
ok = PeerServiceManager:forward_message(P, ?SERVER, Msg).
Member: If this fails, will it crash? Is that right?

Author: It will; the idea is to not silently forward messages to failed peers.

Member: But won't this crash the caller? I'm wondering if this should return an error instead?

Author: It doesn't crash the caller, but it does crash the plumtree_broadcast worker process. Returning an error here doesn't really help, since it is being ignored on all invocations.

Author: Another alternative is reporting an error so it has some visibility. Thoughts?
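As a rough sketch of that alternative (not code from this PR; it assumes plumtree_util:log/3 accepts an error level and that forward_message/3 returns something other than ok on failure):

```erlang
%% inside send/3: report the failure instead of crashing the
%% plumtree_broadcast worker with a badmatch
case PeerServiceManager:forward_message(P, ?SERVER, Msg) of
    ok ->
        ok;
    Error ->
        plumtree_util:log(error, "failed to forward message to peer ~p via ~p: ~p",
                          [P, Mod, Error]),
        ok
end.
```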

%% TODO: add debug logging
%% gen_server:cast({?SERVER, P}, Msg).

3 changes: 2 additions & 1 deletion src/plumtree_broadcast_handler.erl
@@ -24,7 +24,8 @@

%% Given the message id and payload, merge the message in the local state.
%% If the message has already been received return `false', otherwise return `true'
-callback merge(any(), any()) -> boolean().
%% If a new message id is to be propagated after the merge, return `{true, MessageId}'.
-callback merge(any(), any()) -> boolean() | {true, any()}.

%% Return true if the message (given the message id) has already been received.
%% `false' otherwise
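To illustrate the widened contract, here is a sketch of a merge/2 that returns a new message id (hypothetical handler; store_reconcile/2 and object_context/1 are assumed helpers). The test handler changed later in this PR does essentially this with dotted version vectors:

```erlang
%% Hypothetical callback: when the merge produces an object whose causal context
%% differs from the one the message arrived with, return the new id so that it,
%% rather than the now-stale incoming id, is what gets propagated onward.
merge({Key, _IncomingContext}, RemoteObj) ->
    case store_reconcile(Key, RemoteObj) of
        false ->
            %% remote object added nothing new; treat the message as already seen
            false;
        {true, MergedObj} ->
            {true, {Key, object_context(MergedObj)}}
    end.
```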
22 changes: 12 additions & 10 deletions test/plumtree_SUITE.erl
@@ -161,8 +161,8 @@ membership_test(Config) ->
lists:foreach(fun(_) ->
{_, Node} = plumtree_test_utils:select_random(Nodes),
ok = rpc:call(Node,
plumtree_broadcast, broadcast,
[{k, rand_compat:uniform()}, plumtree_test_broadcast_handler])
plumtree_test_broadcast_handler, put,
[k, rand_compat:uniform()])
end, lists:seq(1, BroadcastRounds1)),
%% allow 100ms per broadcast to settle
timer:sleep(100 * BroadcastRounds1),
@@ -177,8 +177,8 @@ membership_test(Config) ->
lists:foreach(fun(_) ->
{_, Node} = plumtree_test_utils:select_random(Nodes),
ok = rpc:call(Node,
plumtree_broadcast, broadcast,
[{k, rand_compat:uniform()}, plumtree_test_broadcast_handler])
plumtree_test_broadcast_handler, put,
[k, rand_compat:uniform()])
end, lists:seq(1, BroadcastRounds2)),
%% allow 100ms per broadcast to settle
timer:sleep(100 * BroadcastRounds1),
@@ -225,8 +225,8 @@ broadcast_test(Config) ->
lists:foreach(fun(_) ->
{_, Node} = plumtree_test_utils:select_random(Nodes),
ok = rpc:call(Node,
plumtree_broadcast, broadcast,
[{k, rand_compat:uniform()}, plumtree_test_broadcast_handler])
plumtree_test_broadcast_handler, put,
[k, rand_compat:uniform()])
end, lists:seq(1, BroadcastRounds1)),
%% allow 500ms per broadcast to settle
timer:sleep(200 * BroadcastRounds1),
@@ -238,13 +238,13 @@ broadcast_test(Config) ->
Rand = rand_compat:uniform(),
{_, RandomNode} = plumtree_test_utils:select_random(Nodes),
ok = rpc:call(RandomNode,
plumtree_broadcast, broadcast,
[{k, Rand}, plumtree_test_broadcast_handler]),
plumtree_test_broadcast_handler, put,
[k, Rand]),
ct:pal("requested node ~p to broadcast {k, ~p}",
[RandomNode, Rand]),

VerifyFun = fun(Node, Rand0) ->
case rpc:call(Node, plumtree_test_broadcast_handler, read, [k]) of
case rpc:call(Node, plumtree_test_broadcast_handler, get, [k]) of
{error, not_found} ->
{false, not_found};
{ok, NodeRand} when NodeRand =:= Rand0 -> true;
@@ -406,7 +406,9 @@ start(_Case, Config, Options) ->

StartFun = fun({_Name, Node}) ->
%% Start plumtree.
{ok, _} = rpc:call(Node, plumtree, start, [])
{ok, _} = rpc:call(Node, plumtree, start, []),
%% set debug log level for test run
ok = rpc:call(Node, lager, set_loglevel, [{lager_file_backend,"log/console.log"}, debug])
end,
lists:foreach(StartFun, Nodes),

133 changes: 77 additions & 56 deletions test/plumtree_test_broadcast_handler.erl
@@ -30,6 +30,7 @@
is_stale/1,
graft/1,
exchange/1]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
@@ -40,40 +41,42 @@

%% API
-export([start_link/0,
read/1]).
get/1,
put/2]).

-record(state, {}).
-type state() :: #state{}.

-spec start_link() -> ok.
start_link() ->
{ok, _} = gen_server:start_link({local, ?SERVER}, ?MODULE,
[], []),
ok.

-spec read(Key :: any()) -> {ok, any()} | {error, not_found}.
read(Key) ->
case ets:lookup(?MODULE, Key) of
[{Key, Value}] ->
% lager:info("read key ~p: ~p",
% [Key, Value]),
{ok, Value};
_ ->
lager:info("unable to find key: ~p",
[Key]),
{error, not_found}
-spec get(Key :: any()) -> {error, not_found} | {ok, any()}.
get(Key) ->
case dbread(Key) of
undefined -> {error, not_found};
Obj ->
{ok, plumtree_test_object:value(Obj)}
end.

-spec put(Key :: any(),
Value :: any()) -> ok.
put(Key, Value) ->
Existing = dbread(Key),
UpdatedObj = plumtree_test_object:modify(Existing, Value, this_server_id()),
dbwrite(Key, UpdatedObj),
plumtree_broadcast:broadcast({Key, UpdatedObj}, plumtree_test_broadcast_handler),
ok.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
-spec init([[any()], ...]) -> {ok, state()}.
init([]) ->
msgs_seen = ets:new(msgs_seen, [named_table, set, public,
{keypos, 1},
{read_concurrency, true}]),
?MODULE = ets:new(?MODULE, [named_table, set, public,
{keypos, 1},
{read_concurrency, true}]),
@@ -111,61 +114,58 @@ code_change(_OldVsn, State, _Extra) ->

%% Return a two-tuple of message id and payload from a given broadcast
-spec broadcast_data(any()) -> {any(), any()}.
broadcast_data({Key, _Value} = Data) ->
MsgId = erlang:phash2(Data),
broadcast_data({Key, Object}) ->
MsgId = {Key, plumtree_test_object:context(Object)},
lager:info("broadcast_data(~p), msg id: ~p",
[Data, MsgId]),
true = ets:insert(msgs_seen, {MsgId, Key}),
true = ets:insert(?MODULE, Data),
{MsgId, Data}.
[Object, MsgId]),
{MsgId, Object}.

%% Given the message id and payload, merge the message in the local state.
%% If the message has already been received return `false', otherwise return `true'
-spec merge(any(), any()) -> boolean().
merge(MsgId, {Key, _Value} = Payload) ->
case ets:lookup(msgs_seen, MsgId) of
[{MsgId, _}] ->
lager:info("msg with id ~p has already been seen",
[MsgId]),
false;
_ ->
lager:info("merging(~p, ~p) in local state",
[MsgId, Payload]),
%% insert the message in the local state
true = ets:insert(?MODULE, Payload),
%% mark this message as been seen
true = ets:insert_new(msgs_seen, {MsgId, Key}),
true
-spec merge(any(), any()) -> boolean() | {true, any()}.
merge({Key, _Context} = MsgId, RemoteObj) ->
Existing = dbread(Key),
lager:info("merge msg id ~p, remote object: ~p, existing object: ~p",
[MsgId, RemoteObj, Existing]),
case plumtree_test_object:reconcile(RemoteObj, Existing) of
false -> false;
{true, Reconciled} ->
lager:info("merge object has ben reconciled to ~p",
[Reconciled]),
dbwrite(Key, Reconciled),
{true, {Key, plumtree_test_object:context(Reconciled)}}
end.

%% Return true if the message (given the message id) has already been received.
%% `false' otherwise
-spec is_stale(any()) -> boolean().
is_stale(MsgId) ->
case ets:lookup(msgs_seen, MsgId) of
[{MsgId, _}] ->
lager:info("is_stale(~p): ~p",
[MsgId, true]),
true;
_ ->
lager:info("is_stale(~p): ~p",
[MsgId, false]),
false
is_stale({Key, Context}) ->
case dbread(Key) of
undefined -> false;
Existing ->
plumtree_test_object:is_stale(Context, Existing)
end.

%% Return the message associated with the given message id. In some cases a message
%% has already been sent with information that subsumes the message associated with the given
%% message id. In this case, `stale' is returned.
-spec graft(any()) -> stale | {ok, any()} | {error, any()}.
graft(MsgId) ->
% lager:info("graft(~p)",
% [MsgId]),
case ets:lookup(msgs_seen, MsgId) of
[{MsgId, Key}] ->
[{Key,Msg}] = ets:lookup(?MODULE, Key),
{ok, {Key, Msg}};
_ ->
{error, not_found}
graft({Key, Context}) ->
case dbread(Key) of
undefined ->
%% this *really* should not happen
lager:alert("unable to graft key ~p, could not find it",
[Key]),
{error, not_found};
Object ->
LocalContext = plumtree_test_object:context(Object),
case LocalContext =:= Context of
true -> {ok, Object};
false ->
lager:info("graft({~p, ~p}), context provided does not match local context ~p",
[Key, Context, LocalContext]),
stale
end
end.

%% Trigger an exchange between the local handler and the handler on the given node.
Expand All @@ -176,3 +176,24 @@ graft(MsgId) ->
-spec exchange(node()) -> {ok, pid()} | {error, term()}.
exchange(_Node) ->
{ok, self()}.

%% @private
-spec dbread(Key :: any()) -> any() | undefined.
dbread(Key) ->
case ets:lookup(?MODULE, Key) of
[{Key, Object}] ->
Object;
_ ->
undefined
end.

%% @private
-spec dbwrite(Key :: any(),
Value :: any()) -> any().
dbwrite(Key, Object) ->
ets:insert(?MODULE, {Key, Object}),
Object.

%% @private
this_server_id() -> node().
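For context: plumtree_test_object is used throughout this handler but is not among the files shown here; presumably it wraps the dvv dependency added to rebar.config. Below is a loose sketch of the interface used above, written against the dvvset module that library provides; the module and function names at tag 1.0 are assumptions, not verified, and is_stale/2 is omitted.

```erlang
%% Sketch only: the real plumtree_test_object is not part of this diff, and the
%% dvv library API used below (dvvset) is assumed rather than verified.
-module(plumtree_test_object_sketch).
-export([value/1, context/1, modify/3, reconcile/2]).

%% currently visible value(s); concurrent writes can yield siblings
value(Clock) -> dvvset:values(Clock).

%% causal context of the object, used here as part of the broadcast message id
context(Clock) -> dvvset:join(Clock).

%% write a value, advancing the clock with the local actor id
modify(undefined, Value, Actor) ->
    dvvset:update(dvvset:new(Value), Actor);
modify(Existing, Value, Actor) ->
    Context = dvvset:join(Existing),
    dvvset:update(dvvset:new(Context, Value), Existing, Actor).

%% merge a remote object into the local one; false when the remote adds nothing new
reconcile(RemoteObj, undefined) ->
    {true, RemoteObj};
reconcile(RemoteObj, Existing) ->
    case dvvset:less(RemoteObj, Existing) orelse dvvset:equal(RemoteObj, Existing) of
        true  -> false;
        false -> {true, dvvset:sync([Existing, RemoteObj])}
    end.
```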
