From 38454eaf6eae027b04cec050c3018dfae8262e21 Mon Sep 17 00:00:00 2001 From: AmnaSnene Date: Tue, 9 Jul 2024 16:42:14 -0400 Subject: [PATCH] peer discovery: add a timer to schedule `connectivityChanged` with a maximum interval of one minute --- include/opendht/peer_discovery.h | 35 +++++++++++++++++++------------- src/dhtrunner.cpp | 4 ++++ src/peer_discovery.cpp | 35 ++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/include/opendht/peer_discovery.h b/include/opendht/peer_discovery.h index 626aabd59..a22f42180 100644 --- a/include/opendht/peer_discovery.h +++ b/include/opendht/peer_discovery.h @@ -23,7 +23,9 @@ #include "sockaddr.h" #include "infohash.h" #include "logger.h" +#include "utils.h" +#include #include namespace asio { @@ -36,31 +38,34 @@ class OPENDHT_PUBLIC PeerDiscovery { public: static constexpr in_port_t DEFAULT_PORT = 8888; - using ServiceDiscoveredCallback = std::function; + using ServiceDiscoveredCallback = std::function; - PeerDiscovery(in_port_t port = DEFAULT_PORT, std::shared_ptr ioContext = {}, std::shared_ptr logger = {}); + PeerDiscovery(in_port_t port = DEFAULT_PORT, + std::shared_ptr ioContext = {}, + std::shared_ptr logger = {}); ~PeerDiscovery(); /** - * startDiscovery - Keep Listening data from the sender until node is joinned or stop is called - */ + * startDiscovery - Keep Listening data from the sender until node is joinned + * or stop is called + */ void startDiscovery(const std::string &type, ServiceDiscoveredCallback callback); - template - void startDiscovery(const std::string &type, std::function cb) { - startDiscovery(type, [cb](msgpack::object&& ob, SockAddr&& addr) { + template + void startDiscovery(const std::string &type, std::function cb) { + startDiscovery(type, [cb](msgpack::object &&ob, SockAddr &&addr) { cb(ob.as(), std::move(addr)); }); } /** * startPublish - Keeping sending data until node is joinned or stop is called - */ + */ void startPublish(const std::string &type, const msgpack::sbuffer &pack_buf); void startPublish(sa_family_t domain, const std::string &type, const msgpack::sbuffer &pack_buf); - template - void startPublish(const std::string &type, const T& object) { + template + void startPublish(const std::string &type, const T &object) { msgpack::sbuffer buf; msgpack::pack(buf, object); startPublish(type, buf); @@ -68,22 +73,24 @@ class OPENDHT_PUBLIC PeerDiscovery /** * Thread Stopper - */ + */ void stop(); /** * Remove possible callBack to discovery - */ + */ bool stopDiscovery(const std::string &type); /** * Remove different serivce message to send - */ + */ bool stopPublish(const std::string &type); bool stopPublish(sa_family_t domain, const std::string &type); void connectivityChanged(); + void stopConnectivityChanged(); + private: class DomainPeerDiscovery; std::unique_ptr peerDiscovery4_; @@ -92,4 +99,4 @@ class OPENDHT_PUBLIC PeerDiscovery std::thread ioRunnner_; }; -} +} // namespace dht diff --git a/src/dhtrunner.cpp b/src/dhtrunner.cpp index d83fba204..b30092c06 100644 --- a/src/dhtrunner.cpp +++ b/src/dhtrunner.cpp @@ -269,6 +269,10 @@ DhtRunner::run(const Config& config, Context&& context) if (status4 == NodeStatus::Disconnected && status6 == NodeStatus::Disconnected) { peerDiscovery_->connectivityChanged(); } + else if (status4 != NodeStatus::Connected || status6 != NodeStatus::Connected) { + peerDiscovery_->stopConnectivityChanged(); + } + }); } #endif diff --git a/src/peer_discovery.cpp b/src/peer_discovery.cpp index d020bf4fd..22fe8ce6f 100644 --- a/src/peer_discovery.cpp +++ b/src/peer_discovery.cpp @@ -52,6 +52,7 @@ class PeerDiscovery::DomainPeerDiscovery void connectivityChanged(); + void stopConnectivityChanged(); private: Sp logger_; //dmtx_ for callbackmap_ and drunning_ (write) @@ -59,6 +60,14 @@ class PeerDiscovery::DomainPeerDiscovery //mtx_ for messages_ and lrunning (listen) std::mutex mtx_; std::shared_ptr ioContext_; + + static constexpr dht::duration PeerDiscovery_PERIOD_MAX{ + std::chrono::minutes(1)}; + static constexpr std::chrono::seconds PeerDiscovery_PERIOD{10}; + asio::steady_timer peerDiscoveryTimer; + std::chrono::steady_clock::duration peerDiscovery_period{ + PeerDiscovery_PERIOD}; + asio::ip::udp::socket sockFd_; asio::ip::udp::endpoint sockAddrSend_; @@ -86,6 +95,7 @@ class PeerDiscovery::DomainPeerDiscovery PeerDiscovery::DomainPeerDiscovery::DomainPeerDiscovery(asio::ip::udp domain, in_port_t port, Sp ioContext, Sp logger) : logger_(logger) , ioContext_(ioContext) + , peerDiscoveryTimer(*ioContext_) , sockFd_(*ioContext_, domain) , sockAddrSend_(asio::ip::address::from_string(domain.family() == AF_INET ? MULTICAST_ADDRESS_IPV4 : MULTICAST_ADDRESS_IPV6), port) @@ -323,6 +333,24 @@ PeerDiscovery::DomainPeerDiscovery::connectivityChanged() }); if (logger_) logger_->d("PeerDiscovery: connectivity changed"); + + if (peerDiscovery_period == PeerDiscovery_PERIOD_MAX ){ + peerDiscovery_period = PeerDiscovery_PERIOD; + } + else{ + peerDiscoveryTimer.expires_after(peerDiscovery_period); + peerDiscoveryTimer.async_wait([this](const asio::error_code& ec) { + if (ec == asio::error::operation_aborted) + return; + connectivityChanged(); + }); + peerDiscovery_period = std::min(peerDiscovery_period * 2, PeerDiscovery_PERIOD_MAX); + } +} + +void PeerDiscovery::DomainPeerDiscovery::stopConnectivityChanged() { + peerDiscoveryTimer.cancel(); + peerDiscovery_period = PeerDiscovery_PERIOD; } PeerDiscovery::PeerDiscovery(in_port_t port, Sp ioContext, Sp logger) @@ -435,4 +463,11 @@ PeerDiscovery::connectivityChanged() peerDiscovery6_->connectivityChanged(); } +void PeerDiscovery::stopConnectivityChanged() { + if (peerDiscovery4_) + peerDiscovery4_->stopConnectivityChanged(); + if (peerDiscovery6_) + peerDiscovery6_->stopConnectivityChanged(); +} + } /* namespace dht */