Skip to content

Commit 73cadf3

Browse files
committed
Allow message id override on merge
When performing a broadcast merge it might be the case that the application wants to return a new message id that results from the merge and have that broadcast instead of the originally received message id. Support this by handling an extra return tuple from the callback.
1 parent 41f453e commit 73cadf3

3 files changed

+14
-3
lines changed

src/plumtree_broadcast.erl

+10
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,16 @@ handle_broadcast(false, _MessageId, _Message, Mod, _Round, Root, From, State) ->
380380
State1 = add_lazy(From, Root, State),
381381
_ = send({prune, Root, myself()}, Mod, From),
382382
State1;
383+
%% The next clause is designed to allow the callback to override the message id
384+
%% after the merge, suppose node A eager pushed a change to node B, node B would then lazy
385+
%% push it to node C, at this point the message id being sent to C is the one that originated
386+
%% from A. Concurrently B takes an update that subsumes the previous one, now node C receives the
387+
%% lazy push and hasn't seen this message and asks B to graft it, if C now sends the message id
388+
%% that it got from A, node B will not answer the graft since it is deemed stale.
389+
%% With this extra clause the callback is able to return a new message id that resulted from
390+
%% the merge and have that be propagated.
391+
handle_broadcast({true, MessageId}, _OldMessageId, Message, Mod, Round, Root, From, State) ->
392+
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State);
383393
handle_broadcast(true, MessageId, Message, Mod, Round, Root, From, State) -> %% valid msg
384394
%% remove sender from lazy and set as eager
385395
plumtree_util:log(debug, "moving peer ~p from lazy to eager on tree rooted at ~p",

src/plumtree_broadcast_handler.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424

2525
%% Given the message id and payload, merge the message in the local state.
2626
%% If the message has already been received return `false', otherwise return `true'
27-
-callback merge(any(), any()) -> boolean().
27+
%% If a new message id is to be propagated after the merge return `{true, MessageId}`
28+
-callback merge(any(), any()) -> boolean() | {true, any()}.
2829

2930
%% Return true if the message (given the message id) has already been received.
3031
%% `false' otherwise

test/plumtree_test_broadcast_handler.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ broadcast_data({Key, Object}) ->
122122

123123
%% Given the message id and payload, merge the message in the local state.
124124
%% If the message has already been received return `false', otherwise return `true'
125-
-spec merge(any(), any()) -> boolean().
125+
-spec merge(any(), any()) -> boolean() | {true, any()}.
126126
merge({Key, _Context} = MsgId, RemoteObj) ->
127127
Existing = dbread(Key),
128128
lager:info("merge msg id ~p, remote object: ~p, existing object: ~p",
@@ -133,7 +133,7 @@ merge({Key, _Context} = MsgId, RemoteObj) ->
133133
lager:info("merge object has ben reconciled to ~p",
134134
[Reconciled]),
135135
dbwrite(Key, Reconciled),
136-
true
136+
{true, {Key, plumtree_test_object:context(Reconciled)}}
137137
end.
138138

139139
%% Return true if the message (given the message id) has already been received.

0 commit comments

Comments
 (0)