Skip to content

Commit

Permalink
search: combine callbacks in case of multiple equivalent puts
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Nov 20, 2024
1 parent c3bb16e commit c73c8c7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Dht::shutdown(ShutdownCallback cb, bool stop)
r.second.done_cb(false, {});
sr.second->callbacks.clear();
for (const auto& a : sr.second->announce) {
if (a.callback) a.callback(false, {});
for (auto& cb : a.callbacks) cb(false, {});
}
sr.second->announce.clear();
sr.second->listeners.clear();
Expand Down
45 changes: 26 additions & 19 deletions src/search.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct Dht::Announce {
bool permanent;
Sp<Value> value;
time_point created;
DoneCallback callback;
std::vector<DoneCallback> callbacks;
};

struct Dht::SearchNode {
Expand Down Expand Up @@ -442,8 +442,9 @@ struct Dht::Search {
g.second.done_cb = {};
}
for (auto& a : announce) {
a.callback(false, {});
a.callback = {};
for (auto& cb : a.callbacks)
cb(false, {});
a.callbacks.clear();
}
}

Expand Down Expand Up @@ -627,32 +628,36 @@ struct Dht::Search {
return a.value->id == value->id;
});
if (a_sr == announce.end()) {
announce.emplace_back(Announce {permanent, value, created, callback});
auto& a = announce.emplace_back(Announce {permanent, value, created, {}} );
if (callback)
a.callbacks.emplace_back(std::move(callback));
for (auto& n : nodes) {
n->probe_query.reset();
n->acked[value->id].req.reset();
}
} else {
a_sr->permanent = permanent;
a_sr->created = created;
if (a_sr->value != value) {
if (a_sr->value != value && *a_sr->value != *value) {
// Value is updated, previous ops are failed
a_sr->value = value;
for (auto& n : nodes) {
n->acked[value->id].req.reset();
n->probe_query.reset();
}
}
if (isAnnounced(value->id)) {
if (a_sr->callback)
a_sr->callback(true, {});
a_sr->callback = {};
for (auto& cb: a_sr->callbacks)
cb(false, {});
a_sr->callbacks.clear();
if (callback)
a_sr->callbacks.emplace_back(std::move(callback));
} else if (isAnnounced(value->id)) {
// Same value, already announced
if (callback)
callback(true, {});
return;
} else {
if (a_sr->callback)
a_sr->callback(false, {});
a_sr->callback = callback;
// Same value, not announced yet
if (callback)
a_sr->callbacks.emplace_back(std::move(callback));
}
}
}
Expand Down Expand Up @@ -722,8 +727,8 @@ struct Dht::Search {
std::vector<DoneCallback> a_cbs;
a_cbs.reserve(announce.size());
for (auto ait = announce.begin() ; ait != announce.end(); ) {
if (ait->callback)
a_cbs.emplace_back(std::move(ait->callback));
a_cbs.insert(a_cbs.end(), std::make_move_iterator(ait->callbacks.begin()), std::make_move_iterator(ait->callbacks.end()));
ait->callbacks.clear();
if (not ait->permanent)
ait = announce.erase(ait);
else
Expand All @@ -747,9 +752,11 @@ struct Dht::Search {
if (vid != Value::INVALID_ID and (!a.value || a.value->id != vid))
return true;
if (isAnnounced(a.value->id)) {
if (a.callback) {
a.callback(true, getNodes());
a.callback = nullptr;
if (!a.callbacks.empty()) {
const auto& nodes = getNodes();
for (auto& cb : a.callbacks)
cb(true, nodes);
a.callbacks.clear();
}
if (not a.permanent)
return false;
Expand Down
28 changes: 28 additions & 0 deletions tests/dhtrunnertester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ DhtRunnerTester::testPutDuplicate() {
}


void
DhtRunnerTester::testPutOverride() {
auto key = dht::InfoHash::get("123");
auto val = std::make_shared<dht::Value>("meh");
val->id = 42;
auto val2 = std::make_shared<dht::Value>("hey");
val2->id = 42;
CPPUNIT_ASSERT_EQUAL(val->id, val2->id);
auto val_data = val2->data;
std::promise<bool> p1;
std::promise<bool> p2;
node2.put(key, val, [&](bool ok){
p1.set_value(ok);
});
node2.put(key, val2, [&](bool ok){
p2.set_value(ok);
});
auto p1ret = p1.get_future().get();
auto p2ret = p2.get_future().get();
CPPUNIT_ASSERT(!p1ret);
CPPUNIT_ASSERT(p2ret);
auto vals = node1.get(key).get();
CPPUNIT_ASSERT(not vals.empty());
CPPUNIT_ASSERT(vals.size() == 1);
CPPUNIT_ASSERT(vals.front()->data == val_data);
}


void
DhtRunnerTester::testListen() {
std::mutex mutex;
Expand Down
5 changes: 5 additions & 0 deletions tests/dhtrunnertester.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DhtRunnerTester : public CppUnit::TestFixture {
CPPUNIT_TEST(testConstructors);
CPPUNIT_TEST(testGetPut);
CPPUNIT_TEST(testPutDuplicate);
CPPUNIT_TEST(testPutOverride);
CPPUNIT_TEST(testListen);
CPPUNIT_TEST(testListenLotOfBytes);
CPPUNIT_TEST(testIdOps);
Expand Down Expand Up @@ -60,6 +61,10 @@ class DhtRunnerTester : public CppUnit::TestFixture {
* Test get and multiple put
*/
void testPutDuplicate();
/**
* Test get and multiple put with changing value
*/
void testPutOverride();
/**
* Test listen method
*/
Expand Down

0 comments on commit c73c8c7

Please sign in to comment.