Skip to content

Commit

Permalink
search: combine callbacks in case of multiple equivalent puts
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Nov 19, 2024
1 parent c4857bf commit 1d3185a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Dht::shutdown(ShutdownCallback cb, bool stop)
r.second.done_cb(false, {});
sr.second->callbacks.clear();
for (const auto& a : sr.second->announce) {
if (a.callback) a.callback(false, {});
for (auto& cb : a.callbacks) cb(false, {});
}
sr.second->announce.clear();
sr.second->listeners.clear();
Expand Down
43 changes: 25 additions & 18 deletions src/search.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct Dht::Announce {
bool permanent;
Sp<Value> value;
time_point created;
DoneCallback callback;
std::vector<DoneCallback> callbacks;
};

struct Dht::SearchNode {
Expand Down Expand Up @@ -442,8 +442,9 @@ struct Dht::Search {
g.second.done_cb = {};
}
for (auto& a : announce) {
a.callback(false, {});
a.callback = {};
for (auto& cb : a.callbacks)
cb(false, {});
a.callbacks.clear();
}
}

Expand Down Expand Up @@ -627,7 +628,9 @@ struct Dht::Search {
return a.value->id == value->id;
});
if (a_sr == announce.end()) {
announce.emplace_back(Announce {permanent, value, created, callback});
auto& a = announce.emplace_back(Announce {permanent, value, created, {}} );
if (callback)
a.callbacks.emplace_back(std::move(callback));
for (auto& n : nodes) {
n->probe_query.reset();
n->acked[value->id].req.reset();
Expand All @@ -636,23 +639,25 @@ struct Dht::Search {
a_sr->permanent = permanent;
a_sr->created = created;
if (a_sr->value != value) {
// Value is updated, previous ops are failed
a_sr->value = value;
for (auto& n : nodes) {
n->acked[value->id].req.reset();
n->probe_query.reset();
}
}
if (isAnnounced(value->id)) {
if (a_sr->callback)
a_sr->callback(true, {});
a_sr->callback = {};
for (auto& cb: a_sr->callbacks)
cb(false, {});
a_sr->callbacks.clear();
if (callback)
a_sr->callbacks.emplace_back(std::move(callback));
} else if (isAnnounced(value->id)) {
// Same value, already announced
if (callback)
callback(true, {});
return;
} else {
if (a_sr->callback)
a_sr->callback(false, {});
a_sr->callback = callback;
// Same value, not announced yet
if (callback)
a_sr->callbacks.emplace_back(std::move(callback));
}
}
}
Expand Down Expand Up @@ -722,8 +727,8 @@ struct Dht::Search {
std::vector<DoneCallback> a_cbs;
a_cbs.reserve(announce.size());
for (auto ait = announce.begin() ; ait != announce.end(); ) {
if (ait->callback)
a_cbs.emplace_back(std::move(ait->callback));
a_cbs.insert(a_cbs.end(), std::make_move_iterator(ait->callbacks.begin()), std::make_move_iterator(ait->callbacks.end()));
ait->callbacks.clear();
if (not ait->permanent)
ait = announce.erase(ait);
else
Expand All @@ -747,9 +752,11 @@ struct Dht::Search {
if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid))
return true;
if (isAnnounced(a.value->id)) {
if (a.callback) {
a.callback(true, getNodes());
a.callback = nullptr;
if (!a.callbacks.empty()) {
const auto& nodes = getNodes();
for (auto& cb : a.callbacks)
cb(true, nodes);
a.callbacks.clear();
}
if (not a.permanent)
return false;
Expand Down

0 comments on commit 1d3185a

Please sign in to comment.