Skip to content

Commit

Permalink
Catch exception of the deserializeNodes method
Browse files Browse the repository at this point in the history
Signed-off-by: Lucas Dias <[email protected]>
  • Loading branch information
lucassdiass committed Dec 10, 2024
2 parents 6e296b7 + 8f1c075 commit 1e8a0aa
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 64 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ include(FindPkgConfig)
include(cmake/CheckAtomic.cmake)

set (opendht_VERSION_MAJOR 2)
set (opendht_VERSION_MINOR 4.2)
set (opendht_VERSION_MINOR 4.3)
set (opendht_VERSION ${opendht_VERSION_MAJOR}.${opendht_VERSION_MINOR})
set (PACKAGE_VERSION ${opendht_VERSION})
set (VERSION "${opendht_VERSION}")
Expand Down
6 changes: 5 additions & 1 deletion c/opendht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ bool dht_infohash_is_zero(const dht_infohash* h) {
}

void dht_infohash_from_hex(dht_infohash* h, const char* dat) {
*h = dht_infohash_to_c(dht::InfoHash(std::string(dat, HASH_LEN*2)));
*h = dht_infohash_to_c(dht::InfoHash(std::string_view(dat, HASH_LEN*2)));
}

void dht_infohash_from_hex_null(dht_infohash* h, const char* dat) {
*h = dht_infohash_to_c(dht::InfoHash(std::string_view(dat)));
}

const char* dht_pkid_print(const dht_pkid* h) {
Expand Down
1 change: 1 addition & 0 deletions c/opendht_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef struct dht_infohash dht_infohash;
OPENDHT_C_PUBLIC void dht_infohash_zero(dht_infohash* h);
OPENDHT_C_PUBLIC void dht_infohash_random(dht_infohash* h);
OPENDHT_C_PUBLIC void dht_infohash_from_hex(dht_infohash* h, const char* dat);
OPENDHT_C_PUBLIC void dht_infohash_from_hex_null(dht_infohash* h, const char* dat);
OPENDHT_C_PUBLIC void dht_infohash_get(dht_infohash* h, const uint8_t* dat, size_t dat_size);
OPENDHT_C_PUBLIC void dht_infohash_get_from_string(dht_infohash* h, const char* str);
OPENDHT_C_PUBLIC const char* dht_infohash_print(const dht_infohash* h);
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
dnl define macros
m4_define([opendht_major_version], 2)
m4_define([opendht_minor_version], 4)
m4_define([opendht_patch_version], 2)
m4_define([opendht_patch_version], 3)
m4_define([opendht_version],
[opendht_major_version.opendht_minor_version.opendht_patch_version])

Expand Down
3 changes: 3 additions & 0 deletions include/opendht/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ struct OPENDHT_PUBLIC Config {
/* If non-0, overrides the default maximum store size. -1 means no limit. */
ssize_t max_store_size {0};

/* If non-0, overrides the default maximum store key count. -1 means no limit. */
ssize_t max_store_keys {0};

/**
* Use appropriate bahavior for a public IP, stable node:
* - No connectivity change triggered when a search fails
Expand Down
2 changes: 1 addition & 1 deletion include/opendht/dht.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
static constexpr unsigned LISTEN_NODES {4};

/* The maximum number of hashes we're willing to track. */
static constexpr unsigned MAX_HASHES {1024 * 1024};
static constexpr unsigned MAX_HASHES {1024 * 1024 * 1024};

/* The maximum number of searches we keep data about. */
static constexpr unsigned MAX_SEARCHES {1024 * 1024};
Expand Down
3 changes: 2 additions & 1 deletion src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,8 @@ Dht::Dht(std::unique_ptr<net::DatagramSocket>&& sock, const Config& config, cons
myid(config.node_id ? config.node_id : InfoHash::getRandom(rd)),
store(),
store_quota(),
max_store_keys(config.max_store_size ? (int)config.max_store_size : MAX_HASHES),
max_store_keys(config.max_store_keys ? (int)config.max_store_keys : MAX_HASHES),
max_store_size(config.max_store_size ? (int)config.max_store_size : DEFAULT_STORAGE_LIMIT),
max_searches(config.max_searches ? (int)config.max_searches : MAX_SEARCHES),
network_engine(myid, fromDhtConfig(config), std::move(sock), logger_, rd, scheduler,
std::bind(&Dht::onError, this, _1, _2),
Expand Down
50 changes: 32 additions & 18 deletions src/network_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,15 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro
throw DhtProtocolException {DhtProtocolException::UNKNOWN_TID, "Can't find socket", msg->id};
node->received(now, {});
onNewNode(node, 2);
deserializeNodes(*msg, from);
rsocket->on_receive(node, std::move(*msg));
try {
deserializeNodes(*msg, from);
rsocket->on_receive(node, std::move(*msg));
} catch (DhtProtocolException &ex) {
if (logIncoming_)
if (logger_)
logger_->ERR("Exception in deserializeNodes: %s, code: %u", ex.getMsg().c_str(), ex.getCode());
return;
}
}
else if (msg->type == MessageType::Error or msg->type == MessageType::Reply) {
auto rsocket = node->getSocket(msg->tid);
Expand Down Expand Up @@ -576,26 +583,32 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro
}
case MessageType::Reply:
if (req) { /* request reply */
auto& r = *req;
if (r.getType() == MessageType::AnnounceValue
or r.getType() == MessageType::Listen
or r.getType() == MessageType::Refresh) {
r.node->authSuccess();
}
r.reply_time = scheduler.time();
try {
deserializeNodes(*msg, from);
auto& r = *req;
if (r.getType() == MessageType::AnnounceValue
or r.getType() == MessageType::Listen
or r.getType() == MessageType::Refresh) {
r.node->authSuccess();
}
r.reply_time = scheduler.time();
deserializeNodes(*msg, from);
r.setDone(std::move(*msg));
} catch (...) {
req->node->authError();
}
} catch (DhtProtocolException &ex) {
if (logIncoming_)
if (logger_)
logger_->ERR("Exception in deserializeNodes: %s, code: %u", ex.getMsg().c_str(), ex.getCode());
return;
}
break;
} else { /* request socket data */
try {
try {
deserializeNodes(*msg, from);
rsocket->on_receive(node, std::move(*msg));
} catch(DhtProtocolException &ex) {
node->authError();
} catch (DhtProtocolException &ex) {
if (logIncoming_)
if (logger_)
logger_->ERR("Exception in deserializeNodes: %s, code: %u", ex.getMsg().c_str(), ex.getCode());
return;
}
}
break;
Expand Down Expand Up @@ -1162,11 +1175,12 @@ NetworkEngine::sendAnnounceValue(const Sp<Node>& n,
msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_map(5+(config.network?1:0));

pk.pack(KEY_A); pk.pack_map((created < scheduler.time() ? 5 : 4));
bool add_created = created < scheduler.time();
pk.pack(KEY_A); pk.pack_map(add_created ? 5 : 4);
pk.pack(KEY_REQ_ID); pk.pack(myid);
pk.pack(KEY_REQ_H); pk.pack(infohash);
auto v = packValueHeader(buffer, {value});
if (created < scheduler.time()) {
if (add_created) {
pk.pack(KEY_REQ_CREATION);
pk.pack(to_time_t(created));
}
Expand Down
2 changes: 1 addition & 1 deletion src/node_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ NodeCache::NodeMap::getNode(const InfoHash& id, const SockAddr& addr, time_point
cleanup();
cleanup_counter = 0;
}
} else if (confirm and node->isOld(now)) {
} else if (confirm or node->isOld(now)) {
node->update(addr);
}
return node;
Expand Down
2 changes: 1 addition & 1 deletion src/search.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ struct Dht::SearchNode {
* @return true if there exists an expired request, else false.
*/
static bool pending(const SyncStatus& status) {
return std::find_if(status.begin(), status.end(),
return std::find_if(status.cbegin(), status.cend(),
[](const SyncStatus::value_type& r){
return r.second and r.second->pending();
}) != status.cend();
Expand Down
32 changes: 23 additions & 9 deletions tests/dhtrunnertester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ DhtRunnerTester::setUp() {
dht::DhtRunner::Config config;
config.dht_config.node_config.max_peer_req_per_sec = -1;
config.dht_config.node_config.max_req_per_sec = -1;
config.dht_config.node_config.max_store_size = -1;
config.dht_config.node_config.max_store_keys = -1;

node1.run(0, config);
node2.run(0, config);
Expand Down Expand Up @@ -105,23 +107,35 @@ DhtRunnerTester::testListen() {
constexpr unsigned N = 2048;
constexpr unsigned SZ = 56 * 1024;

auto ftokena = node1.listen(a, [&](const std::shared_ptr<dht::Value>&) {
valueCount++;
auto ftokena = node1.listen(a, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired) {
if (expired)
valueCount -= values.size();
else
valueCount += values.size();
return true;
});

auto ftokenb = node1.listen(b, [&](const std::shared_ptr<dht::Value>&) {
valueCount++;
auto ftokenb = node1.listen(b, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired) {
if (expired)
valueCount -= values.size();
else
valueCount += values.size();
return false;
});

auto ftokenc = node1.listen(c, [&](const std::shared_ptr<dht::Value>&) {
valueCount++;
auto ftokenc = node1.listen(c, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired) {
if (expired)
valueCount -= values.size();
else
valueCount += values.size();
return true;
});

auto ftokend = node1.listen(d, [&](const std::shared_ptr<dht::Value>&) {
valueCount++;
auto ftokend = node1.listen(d, [&](const std::vector<std::shared_ptr<dht::Value>>& values, bool expired) {
if (expired)
valueCount -= values.size();
else
valueCount += values.size();
return true;
});

Expand All @@ -145,7 +159,7 @@ DhtRunnerTester::testListen() {
});
auto bigVal = std::make_shared<dht::Value>();
bigVal->data = mtu;
node2.put(c, bigVal, [&](bool ok) {
node2.put(c, std::move(bigVal), [&](bool ok) {
std::lock_guard<std::mutex> lock(mutex);
putCount++;
if (ok) putOkCount3++;
Expand Down
83 changes: 53 additions & 30 deletions tools/dhtcnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,20 @@ struct dht_params {
};

static const struct option long_options[] = {
{"help", no_argument , NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"net", required_argument, NULL, 'n'},
{"bootstrap", required_argument, NULL, 'b'},
{"identity", no_argument , NULL, 'i'},
{"verbose", no_argument , NULL, 'v'},
{"service", no_argument , NULL, 's'},
{"peer-discovery", no_argument , NULL, 'D'},
{"no-rate-limit", no_argument , NULL, 'U'},
{"persist", required_argument, NULL, 'f'},
{"logfile", required_argument, NULL, 'l'},
{"syslog", no_argument , NULL, 'L'},
{NULL, 0 , NULL, 0}
{"help", no_argument , NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"net", required_argument, NULL, 'n'},
{"bootstrap", required_argument, NULL, 'b'},
{"identity", no_argument , NULL, 'i'},
{"verbose", no_argument , NULL, 'v'},
{"service", no_argument , NULL, 's'},
{"peer-discovery", no_argument , NULL, 'D'},
{"no-rate-limit", no_argument , NULL, 'U'},
{"persist", required_argument, NULL, 'f'},
{"logfile", required_argument, NULL, 'l'},
{"syslog", no_argument , NULL, 'L'},
{"version", no_argument , NULL, 'V'},
{NULL, 0 , NULL, 0}
};

struct dht_params
Expand Down Expand Up @@ -169,19 +170,35 @@ parse_args(int argc, char **argv) {
case 's':
params.service = true;
break;
case 'V':
params.version = true;
break;
default:
break;
}
}
return params;
}

dht_infohash parse_key(const char* key_str) {
dht_infohash key;
dht_infohash_from_hex_null(&key, key_str);
if (dht_infohash_is_zero(&key)) {
dht_infohash_get_from_string(&key, key_str);
printf("Using h(%s) = %s\n", key_str, dht_infohash_print(&key));
}
return key;
}

int main(int argc, char **argv)
{
printf("OpenDHT version %s\n", dht_version());

struct dht_params params = parse_args(argc, argv);

if (params.version) {
printf("OpenDHT version %s\n", dht_version());
return EXIT_SUCCESS;
}

dht_runner* runner = dht_runner_new();
dht_runner_config dht_config;
dht_runner_config_default(&dht_config);
Expand All @@ -199,6 +216,7 @@ int main(int argc, char **argv)
char cmd[64];
char arg[64];
char value[256];
dht_infohash key;
while (true) {
const char* line_read = readline("> ");
if (!line_read)
Expand All @@ -225,40 +243,45 @@ int main(int argc, char **argv)
free(addrs);
}
continue;
} else if (!strcmp(cmd, "ll")) {
dht_infohash node_id = dht_runner_get_node_id(runner);
printf("DHT node %s running on port %u\n", dht_infohash_print(&node_id), dht_runner_get_bound_port(runner, AF_INET));
continue;
}

dht_infohash key;
dht_infohash_from_hex(&key, arg);
if (dht_infohash_is_zero(&key)) {
dht_infohash_get_from_string(&key, arg);
printf("Using h(%s) = %s\n", arg, dht_infohash_print(&key));
else if (!strcmp(cmd, "ll")) {
dht_infohash key = dht_runner_get_node_id(runner);
printf("DHT node %s running on port %u\n", dht_infohash_print(&key), dht_runner_get_bound_port(runner, AF_INET));
continue;
}
if (!strcmp(cmd, "g")) {
else if (!strcmp(cmd, "g")) {
key = parse_key(arg);
dht_runner_get(runner, &key, dht_get_callback, dht_get_done_callback, runner);
} else if (!strcmp(cmd, "l")) {
}
else if (!strcmp(cmd, "l")) {
key = parse_key(arg);
struct listen_context* ctx = malloc(sizeof(struct listen_context));
ctx->runner = runner;
ctx->count = 0;
ctx->token = dht_runner_listen(runner, &key, dht_value_callback, listen_context_free, ctx);
} else if (!strcmp(cmd, "p")) {
}
else if (!strcmp(cmd, "p")) {
key = parse_key(arg);
dht_value* val = dht_value_new_from_string(value);
dht_runner_put(runner, &key, val, dht_put_done_callback, runner, true);
dht_value_unref(val);
}
else {
printf("Unkown command: %s\n", cmd);
}
}
printf("Stopping..\n");

// Graceful shutdown
printf("Stopping…\n");
struct op_context ctx;
ctx.runner = runner;
atomic_init(&ctx.stop, false);
dht_runner_shutdown(runner, dht_shutdown_callback, &ctx);

// Wait until shutdown callback is called
while (!atomic_load(&ctx.stop)) {
usleep(10000);
}
dht_runner_delete(runner);
return 0;
return EXIT_SUCCESS;
}

0 comments on commit 1e8a0aa

Please sign in to comment.