diff --git a/rebar.config b/rebar.config index f98d561..cdd649e 100644 --- a/rebar.config +++ b/rebar.config @@ -22,6 +22,12 @@ ]}, {docs, [ {deps, [edown]} + ]}, + {test, [ + {deps, [ + {dvv, ".*", + {git, "https://github.com/ricardobcl/Dotted-Version-Vectors.git", {tag, "1.0"}}} + ]} ]} ]}. diff --git a/src/plumtree_broadcast.erl b/src/plumtree_broadcast.erl index 19740b2..fa0293c 100644 --- a/src/plumtree_broadcast.erl +++ b/src/plumtree_broadcast.erl @@ -137,8 +137,6 @@ start_link() -> %% list is empty) InitEagers = Members, InitLazys = [], - plumtree_util:log(debug, "init peers, eager: ~p, lazy: ~p", - [InitEagers, InitLazys]), Mods = app_helper:get_env(plumtree, broadcast_mods, []), Res = start_link(Members, InitEagers, InitLazys, Mods, [{lazy_tick_period, LazyTickPeriod}, @@ -256,6 +254,8 @@ debug_get_tree(Root, Nodes) -> %% @private -spec init([[any()], ...]) -> {ok, state()}. init([AllMembers, InitEagers, InitLazys, Mods, Opts]) -> + plumtree_util:log(debug, "init ~p peers, eager: ~p, lazy: ~p", + [AllMembers, InitEagers, InitLazys]), LazyTickPeriod = proplists:get_value(lazy_tick_period, Opts), ExchangeTickPeriod = proplists:get_value(exchange_tick_period, Opts), schedule_lazy_tick(LazyTickPeriod), @@ -300,7 +300,8 @@ handle_cast({broadcast, MessageId, Message, Mod, Round, Root, From}, State) -> {noreply, State1}; handle_cast({prune, Root, From}, State) -> plumtree_util:log(debug, "received ~p", [{prune, Root, From}]), - plumtree_util:log(debug, "moving peer ~p from eager to lazy", [From]), + plumtree_util:log(debug, "moving peer ~p from eager to lazy on tree rooted at ~p", + [From, Root]), State1 = add_lazy(From, Root, State), {noreply, State1}; handle_cast({i_have, MessageId, Mod, Round, Root, From}, State) -> @@ -374,12 +375,25 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== handle_broadcast(false, _MessageId, _Message, Mod, _Round, Root, From, State) -> %% stale msg %% remove sender from eager and set as lazy - plumtree_util:log(debug, "moving peer ~p from eager to lazy", [From]), + plumtree_util:log(debug, "moving peer ~p from eager to lazy on tree rooted at ~p, requesting it to also do the same", + [From, Root]), State1 = add_lazy(From, Root, State), _ = send({prune, Root, myself()}, Mod, From), State1; +%% The next clause is designed to allow the callback to override the message id +%% after the merge, suppose node A eager pushed a change to node B, node B would then lazy +%% push it to node C, at this point the message id being sent to C is the one that originated +%% from A. Concurrently B takes an update that subsumes the previous one, now node C receives the +%% lazy push and hasn't seen this message and asks B to graft it, if C now sends the message id +%% that it got from A, node B will not answer the graft since it is deemed stale. +%% With this extra clause the callback is able to return a new message id that resulted from +%% the merge and have that be propagated. +handle_broadcast({true, MessageId}, _OldMessageId, Message, Mod, Round, Root, From, State) -> + handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State); handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg %% remove sender from lazy and set as eager + plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p", + [From, Root]), State1 = add_eager(From, Root, State), State2 = eager_push(MessageId, Message, Mod, Round+1, Root, From, State1), schedule_lazy_push(MessageId, Mod, Round+1, Root, From, State2). @@ -390,6 +404,8 @@ handle_ihave(true, MessageId, Mod, Round, Root, From, State) -> %% stale i_have handle_ihave(false, MessageId, Mod, Round, Root, From, State) -> %% valid i_have %% TODO: don't graft immediately _ = send({graft, MessageId, Mod, Round, Root, myself()}, Mod, From), + plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p, graft requested from ~p", + [From, Root, From]), add_eager(From, Root, State). handle_graft(stale, MessageId, Mod, Round, Root, From, State) -> @@ -401,6 +417,8 @@ handle_graft({ok, Message}, MessageId, Mod, Round, Root, From, State) -> %% we don't ack outstanding here because the broadcast may fail to be delivered %% instead we will allow the i_have to be sent once more and let the subsequent %% ignore serve as the ack. + plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p", + [From, Root]), State1 = add_eager(From, Root, State), _ = send({broadcast, MessageId, Message, Mod, Round, Root, myself()}, Mod, From), State1; @@ -451,12 +469,12 @@ send_lazy(#state{outstanding=Outstanding}) -> [send_lazy(Peer, Messages) || {Peer, Messages} <- orddict:to_list(Outstanding)]. send_lazy(Peer, Messages) -> + plumtree_util:log(debug, "flushing ~p outstanding lazy pushes to peer ~p", + [ordsets:size(Messages), Peer]), [send_lazy(MessageId, Mod, Round, Root, Peer) || {MessageId, Mod, Round, Root} <- ordsets:to_list(Messages)]. send_lazy(MessageId, Mod, Round, Root, Peer) -> - plumtree_util:log(debug, "sending lazy push ~p", - [{i_have, MessageId, Mod, Round, Root, myself()}]), send({i_have, MessageId, Mod, Round, Root, myself()}, Mod, Peer). maybe_exchange(State) -> @@ -645,7 +663,7 @@ send(Msg, Mod, P) -> partisan_peer_service), PeerServiceManager = PeerService:manager(), instrument_transmission(Msg, Mod), - PeerServiceManager:forward_message(P, ?SERVER, Msg). + ok = PeerServiceManager:forward_message(P, ?SERVER, Msg). %% TODO: add debug logging %% gen_server:cast({?SERVER, P}, Msg). diff --git a/src/plumtree_broadcast_handler.erl b/src/plumtree_broadcast_handler.erl index 9e5bba7..a077907 100644 --- a/src/plumtree_broadcast_handler.erl +++ b/src/plumtree_broadcast_handler.erl @@ -24,7 +24,8 @@ %% Given the message id and payload, merge the message in the local state. %% If the message has already been received return `false', otherwise return `true' --callback merge(any(), any()) -> boolean(). +%% If a new message id is to be propagated after the merge return `{true, MessageId}` +-callback merge(any(), any()) -> boolean() | {true, any()}. %% Return true if the message (given the message id) has already been received. %% `false' otherwise diff --git a/test/plumtree_SUITE.erl b/test/plumtree_SUITE.erl index 6b03d33..638447d 100644 --- a/test/plumtree_SUITE.erl +++ b/test/plumtree_SUITE.erl @@ -161,8 +161,8 @@ membership_test(Config) -> lists:foreach(fun(_) -> {_, Node} = plumtree_test_utils:select_random(Nodes), ok = rpc:call(Node, - plumtree_broadcast, broadcast, - [{k, rand_compat:uniform()}, plumtree_test_broadcast_handler]) + plumtree_test_broadcast_handler, put, + [k, rand_compat:uniform()]) end, lists:seq(1, BroadcastRounds1)), %% allow 100ms per broadcast to settle timer:sleep(100 * BroadcastRounds1), @@ -177,8 +177,8 @@ membership_test(Config) -> lists:foreach(fun(_) -> {_, Node} = plumtree_test_utils:select_random(Nodes), ok = rpc:call(Node, - plumtree_broadcast, broadcast, - [{k, rand_compat:uniform()}, plumtree_test_broadcast_handler]) + plumtree_test_broadcast_handler, put, + [k, rand_compat:uniform()]) end, lists:seq(1, BroadcastRounds2)), %% allow 100ms per broadcast to settle timer:sleep(100 * BroadcastRounds1), @@ -225,8 +225,8 @@ broadcast_test(Config) -> lists:foreach(fun(_) -> {_, Node} = plumtree_test_utils:select_random(Nodes), ok = rpc:call(Node, - plumtree_broadcast, broadcast, - [{k, rand_compat:uniform()}, plumtree_test_broadcast_handler]) + plumtree_test_broadcast_handler, put, + [k, rand_compat:uniform()]) end, lists:seq(1, BroadcastRounds1)), %% allow 500ms per broadcast to settle timer:sleep(200 * BroadcastRounds1), @@ -238,13 +238,13 @@ broadcast_test(Config) -> Rand = rand_compat:uniform(), {_, RandomNode} = plumtree_test_utils:select_random(Nodes), ok = rpc:call(RandomNode, - plumtree_broadcast, broadcast, - [{k, Rand}, plumtree_test_broadcast_handler]), + plumtree_test_broadcast_handler, put, + [k, Rand]), ct:pal("requested node ~p to broadcast {k, ~p}", [RandomNode, Rand]), VerifyFun = fun(Node, Rand0) -> - case rpc:call(Node, plumtree_test_broadcast_handler, read, [k]) of + case rpc:call(Node, plumtree_test_broadcast_handler, get, [k]) of {error, not_found} -> {false, not_found}; {ok, NodeRand} when NodeRand =:= Rand0 -> true; @@ -406,7 +406,9 @@ start(_Case, Config, Options) -> StartFun = fun({_Name, Node}) -> %% Start plumtree. - {ok, _} = rpc:call(Node, plumtree, start, []) + {ok, _} = rpc:call(Node, plumtree, start, []), + %% set debug log level for test run + ok = rpc:call(Node, lager, set_loglevel, [{lager_file_backend,"log/console.log"}, debug]) end, lists:foreach(StartFun, Nodes), diff --git a/test/plumtree_test_broadcast_handler.erl b/test/plumtree_test_broadcast_handler.erl index 0bccc32..0e61432 100644 --- a/test/plumtree_test_broadcast_handler.erl +++ b/test/plumtree_test_broadcast_handler.erl @@ -30,6 +30,7 @@ is_stale/1, graft/1, exchange/1]). + %% gen_server callbacks -export([init/1, handle_call/3, @@ -40,10 +41,11 @@ %% API -export([start_link/0, - read/1]). + get/1, + put/2]). -record(state, {}). --type state() :: #state{}. +-type state() :: #state{}. -spec start_link() -> ok. start_link() -> @@ -51,19 +53,23 @@ start_link() -> [], []), ok. --spec read(Key :: any()) -> {ok, any()} | {error, not_found}. -read(Key) -> - case ets:lookup(?MODULE, Key) of - [{Key, Value}] -> - % lager:info("read key ~p: ~p", - % [Key, Value]), - {ok, Value}; - _ -> - lager:info("unable to find key: ~p", - [Key]), - {error, not_found} +-spec get(Key :: any()) -> {error, not_found} | {ok, any()}. +get(Key) -> + case dbread(Key) of + undefined -> {error, not_found}; + Obj -> + {ok, plumtree_test_object:value(Obj)} end. +-spec put(Key :: any(), + Value :: any()) -> ok. +put(Key, Value) -> + Existing = dbread(Key), + UpdatedObj = plumtree_test_object:modify(Existing, Value, this_server_id()), + dbwrite(Key, UpdatedObj), + plumtree_broadcast:broadcast({Key, UpdatedObj}, plumtree_test_broadcast_handler), + ok. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -71,9 +77,6 @@ read(Key) -> %% @private -spec init([[any()], ...]) -> {ok, state()}. init([]) -> - msgs_seen = ets:new(msgs_seen, [named_table, set, public, - {keypos, 1}, - {read_concurrency, true}]), ?MODULE = ets:new(?MODULE, [named_table, set, public, {keypos, 1}, {read_concurrency, true}]), @@ -111,61 +114,58 @@ code_change(_OldVsn, State, _Extra) -> %% Return a two-tuple of message id and payload from a given broadcast -spec broadcast_data(any()) -> {any(), any()}. -broadcast_data({Key, _Value} = Data) -> - MsgId = erlang:phash2(Data), +broadcast_data({Key, Object}) -> + MsgId = {Key, plumtree_test_object:context(Object)}, lager:info("broadcast_data(~p), msg id: ~p", - [Data, MsgId]), - true = ets:insert(msgs_seen, {MsgId, Key}), - true = ets:insert(?MODULE, Data), - {MsgId, Data}. + [Object, MsgId]), + {MsgId, Object}. %% Given the message id and payload, merge the message in the local state. %% If the message has already been received return `false', otherwise return `true' --spec merge(any(), any()) -> boolean(). -merge(MsgId, {Key, _Value} = Payload) -> - case ets:lookup(msgs_seen, MsgId) of - [{MsgId, _}] -> - lager:info("msg with id ~p has already been seen", - [MsgId]), - false; - _ -> - lager:info("merging(~p, ~p) in local state", - [MsgId, Payload]), - %% insert the message in the local state - true = ets:insert(?MODULE, Payload), - %% mark this message as been seen - true = ets:insert_new(msgs_seen, {MsgId, Key}), - true +-spec merge(any(), any()) -> boolean() | {true, any()}. +merge({Key, _Context} = MsgId, RemoteObj) -> + Existing = dbread(Key), + lager:info("merge msg id ~p, remote object: ~p, existing object: ~p", + [MsgId, RemoteObj, Existing]), + case plumtree_test_object:reconcile(RemoteObj, Existing) of + false -> false; + {true, Reconciled} -> + lager:info("merge object has ben reconciled to ~p", + [Reconciled]), + dbwrite(Key, Reconciled), + {true, {Key, plumtree_test_object:context(Reconciled)}} end. %% Return true if the message (given the message id) has already been received. %% `false' otherwise -spec is_stale(any()) -> boolean(). -is_stale(MsgId) -> - case ets:lookup(msgs_seen, MsgId) of - [{MsgId, _}] -> - lager:info("is_stale(~p): ~p", - [MsgId, true]), - true; - _ -> - lager:info("is_stale(~p): ~p", - [MsgId, false]), - false +is_stale({Key, Context}) -> + case dbread(Key) of + undefined -> false; + Existing -> + plumtree_test_object:is_stale(Context, Existing) end. %% Return the message associated with the given message id. In some cases a message %% has already been sent with information that subsumes the message associated with the given %% message id. In this case, `stale' is returned. -spec graft(any()) -> stale | {ok, any()} | {error, any()}. -graft(MsgId) -> - % lager:info("graft(~p)", - % [MsgId]), - case ets:lookup(msgs_seen, MsgId) of - [{MsgId, Key}] -> - [{Key,Msg}] = ets:lookup(?MODULE, Key), - {ok, {Key, Msg}}; - _ -> - {error, not_found} +graft({Key, Context}) -> + case dbread(Key) of + undefined -> + %% this *really* should not happen + lager:alert("unable to graft key ~p, could not find it", + [Key]), + {error, not_found}; + Object -> + LocalContext = plumtree_test_object:context(Object), + case LocalContext =:= Context of + true -> {ok, Object}; + false -> + lager:info("graft({~p, ~p}), context provided does not match local context ~p", + [Key, Context, LocalContext]), + stale + end end. %% Trigger an exchange between the local handler and the handler on the given node. @@ -176,3 +176,24 @@ graft(MsgId) -> -spec exchange(node()) -> {ok, pid()} | {error, term()}. exchange(_Node) -> {ok, self()}. + +%% @private +-spec dbread(Key :: any()) -> any() | undefined. +dbread(Key) -> + case ets:lookup(?MODULE, Key) of + [{Key, Object}] -> + Object; + _ -> + undefined + end. + +%% @private +-spec dbwrite(Key :: any(), + Value :: any()) -> any(). +dbwrite(Key, Object) -> + ets:insert(?MODULE, {Key, Object}), + Object. + +%% @private +this_server_id() -> node(). + diff --git a/test/plumtree_test_object.erl b/test/plumtree_test_object.erl new file mode 100644 index 0000000..e9c9b19 --- /dev/null +++ b/test/plumtree_test_object.erl @@ -0,0 +1,118 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(plumtree_test_object). + +%% API +-export([context/1, + value/1, + values/1, + modify/3, + modify/4, + is_stale/2, + reconcile/2]). + +-type test_object() :: dvvset:clock(). +-type test_context() :: dvvset:vector(). +-type test_value() :: any(). + +%% @doc returns a single value. if the object holds more than one value an error is generated +%% @see values/2 +-spec value(test_object()) -> test_value(). +value(Object) -> + [Value] = values(Object), + Value. + +%% @doc returns a list of values held in the object +-spec values(test_object()) -> [test_value()]. +values(Object) -> + [Value || {Value, _Ts} <- dvvset:values(Object)]. + +%% @doc returns the context (opaque causal history) for the given object +-spec context(test_object()) -> test_context(). +context(Object) -> + dvvset:join(Object). + +%% @doc modifies a potentially existing object, setting its value and updating +%% the causual history. +-spec modify(test_object() | undefined, + test_value(), + term()) -> test_object(). +modify(undefined, Value, ServerId) -> + modify(undefined, undefined, Value, ServerId); +modify(Object, Value, ServerId) -> + modify(Object, context(Object), Value, ServerId). + +-spec modify(test_object() | undefined, + test_context(), + test_value(), + term()) -> test_object(). +modify(undefined, _Context, Value, ServerId) -> + %% Ignore the context since we dont have a value, its invalid if not + %% empty anyways, so give it a valid one + NewRecord = dvvset:new(timestamped_value(Value)), + dvvset:update(NewRecord, ServerId); +modify(Existing, Context, Value, ServerId) -> + InsertRec = dvvset:new(Context, timestamped_value(Value)), + dvvset:update(InsertRec, Existing, ServerId). + +%% @doc Determines if the given context (version vector) is causually newer than +%% an existing object. If the object missing or if the context does not represent +%% an ancestor of the current key, false is returned. Otherwise, when the context +%% does represent an ancestor of the existing object or the existing object itself, +%% true is returned +%% @private +is_stale(RemoteContext, Obj) -> + LocalContext = context(Obj), + %% returns true (stale) when local context is causally newer or equal to remote context + descends(LocalContext, RemoteContext). + +%% @doc Reconciles a remote object received during replication or anti-entropy +%% with a local object. If the remote object is an anscestor of or is equal to the local one +%% `false' is returned, otherwise the reconciled object is returned as the second +%% element of the two-tuple +reconcile(RemoteObj, undefined) -> + {true, RemoteObj}; +reconcile(undefined, _LocalObj) -> + false; +reconcile(RemoteObj, LocalObj) -> + Less = dvvset:less(RemoteObj, LocalObj), + Equal = dvvset:equal(RemoteObj, LocalObj), + case not (Equal or Less) of + false -> false; + true -> + LWW = fun ({_,TS1}, {_,TS2}) -> TS1 =< TS2 end, + {true, dvvset:lww(LWW, dvvset:sync([LocalObj, RemoteObj]))} + end. + +%% @private +descends(_, []) -> + true; +descends(Ca, Cb) -> + [{NodeB, CtrB} | RestB] = Cb, + case lists:keyfind(NodeB, 1, Ca) of + false -> false; + {_, CtrA} -> + (CtrA >= CtrB) andalso descends(Ca, RestB) + end. + +%% @private +timestamped_value(Value) -> + {Value, os:timestamp()}. +