diff --git a/include/dpp/cluster.h b/include/dpp/cluster.h
index fcae83b18e..9b193f8098 100644
--- a/include/dpp/cluster.h
+++ b/include/dpp/cluster.h
@@ -272,11 +272,11 @@ class DPP_EXPORT cluster {
	 * @param maxclusters The total number of clusters that are active, which may be on separate processes or even separate machines.
	 * @param compressed Whether or not to use compression for shards on this cluster. Saves a ton of bandwidth at the cost of some CPU
	 * @param policy Set the caching policy for the cluster, either lazy (only cache users/members when they message the bot) or aggressive (request whole member lists on seeing new guilds too)
-	 * @param request_threads The number of threads to allocate for making HTTP requests to Discord. This defaults to 12. You can increase this at runtime via the object returned from get_rest().
-	 * @param request_threads_raw The number of threads to allocate for making HTTP requests to sites outside of Discord. This defaults to 1. You can increase this at runtime via the object returned from get_raw_rest().
+	 * @param pool_threads The number of threads to allocate for the thread pool. This defaults to half your system concurrency; values less than 4 are raised to 4.
+	 * All callbacks and events are placed into the thread pool. The larger you make this pool (generally no larger than your number of CPU cores), the better your bot will scale.
	 * @throw dpp::exception Thrown on windows, if WinSock fails to initialise, or on any other system if a dpp::request_queue fails to construct
	 */
-	cluster(const std::string& token, uint32_t intents = i_default_intents, uint32_t shards = 0, uint32_t cluster_id = 0, uint32_t maxclusters = 1, bool compressed = true, cache_policy_t policy = cache_policy::cpol_default, uint32_t request_threads = 12, uint32_t request_threads_raw = 1);
+	cluster(const std::string& token, uint32_t intents = i_default_intents, uint32_t shards = 0, uint32_t cluster_id = 0, uint32_t maxclusters = 1, bool compressed = true, cache_policy_t policy = cache_policy::cpol_default, uint32_t pool_threads = std::thread::hardware_concurrency() / 2);

	/**
	 * @brief Place some arbitrary work into the thread pool for execution when time permits.
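Illustration (not part of the patch): with the signature above, a bot can rely on the default pool size or pass it explicitly as the final constructor argument. A minimal sketch, assuming only the constructor shown here and the usual dpp::cluster::start entry point:

	#include <dpp/dpp.h>

	int main() {
		/* Let the pool default to half the hardware concurrency (minimum 4)... */
		dpp::cluster defaulted("token");

		/* ...or pin the callback/event pool to 8 threads explicitly. */
		dpp::cluster sized("token", dpp::i_default_intents, 0, 0, 1, true, dpp::cache_policy::cpol_default, 8);

		sized.start(dpp::st_wait);
	}
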
diff --git a/include/dpp/coro/awaitable.h b/include/dpp/coro/awaitable.h
index e8974c0a88..296ace2490 100644
--- a/include/dpp/coro/awaitable.h
+++ b/include/dpp/coro/awaitable.h
@@ -44,6 +44,7 @@ struct awaitable_dummy {
 #include
 #include
 #include
+#include

 namespace dpp {

diff --git a/include/dpp/discordvoiceclient.h b/include/dpp/discordvoiceclient.h
index b902208d4c..14a0f63ec8 100644
--- a/include/dpp/discordvoiceclient.h
+++ b/include/dpp/discordvoiceclient.h
@@ -271,7 +271,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Last connect time of voice session
	 */
-	time_t connect_time;
+	time_t connect_time{};

	/*
	 * @brief For mixing outgoing voice data.
@@ -286,12 +286,12 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Port number of UDP/RTP endpoint
	 */
-	uint16_t port;
+	uint16_t port{};

	/**
	 * @brief SSRC value
	 */
-	uint64_t ssrc;
+	uint64_t ssrc{};

	/**
	 * @brief List of supported audio encoding modes
@@ -301,7 +301,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Timescale in nanoseconds
	 */
-	uint64_t timescale;
+	uint64_t timescale{};

	/**
	 * @brief Output buffer
@@ -428,14 +428,14 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief If true, audio packet sending is paused
	 */
-	bool paused;
+	bool paused{};

	/**
	 * @brief Whether has sent 5 frame of silence before stopping on pause.
	 *
	 * This is to avoid unintended Opus interpolation with subsequent transmissions.
	 */
-	bool sent_stop_frames;
+	bool sent_stop_frames{};

	/**
	 * @brief Number of times we have tried to reconnect in the last few seconds
@@ -451,13 +451,13 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief libopus encoder
	 */
-	OpusEncoder* encoder;
+	OpusEncoder* encoder{};

	/**
	 * @brief libopus repacketizer
	 * (merges frames into one packet)
	 */
-	OpusRepacketizer* repacketizer;
+	OpusRepacketizer* repacketizer{};

	/**
	 * @brief This holds the state information for DAVE E2EE.
@@ -499,14 +499,14 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief File descriptor for UDP connection
	 */
-	dpp::socket fd;
+	dpp::socket fd{};

	/**
	 * @brief Secret key for encrypting voice.
	 * If it has been sent, this contains a sequence of exactly 32 bytes
	 * (secret_key_size) and has_secret_key is set to true.
	 */
-	std::array<uint8_t, secret_key_size> secret_key;
+	std::array<uint8_t, secret_key_size> secret_key{};

	/**
	 * @brief True if the voice client has a secret key
@@ -517,21 +517,21 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	 * @brief Sequence number of outbound audio. This is incremented
	 * once per frame sent.
	 */
-	uint16_t sequence;
+	uint16_t sequence{};

	/**
	 * @brief Last received sequence from gateway.
	 *
	 * Needed for heartbeat and resume payload.
	 */
-	int32_t receive_sequence;
+	int32_t receive_sequence{};

	/**
	 * @brief Timestamp value used in outbound audio. Each packet
	 * has the timestamp value which is incremented to match
	 * how many frames are sent.
	 */
-	uint32_t timestamp;
+	uint32_t timestamp{};

	/**
	 * @brief Each packet should have a nonce, a 32-bit incremental
@@ -542,7 +542,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	 *
	 * Current initial value is hardcoded to 1.
	 */
-	uint32_t packet_nonce;
+	uint32_t packet_nonce{};

	/**
	 * @brief Last sent packet high-resolution timestamp
@@ -552,7 +552,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Fraction of the sleep that was not executed after the last audio packet was sent
	 */
-	std::chrono::nanoseconds last_sleep_remainder;
+	std::chrono::nanoseconds last_sleep_remainder{};

	/**
	 * @brief Maps receiving ssrc to user id
@@ -564,7 +564,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	 * When this moves from false to true, this causes the
	 * client to send the 'talking' notification to the websocket.
	 */
-	bool sending;
+	bool sending{};

	/**
	 * @brief Number of track markers in the buffer. For example if there
@@ -575,7 +575,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	 * If the buffer is empty, there are zero tracks in the
	 * buffer.
	 */
-	uint32_t tracks;
+	uint32_t tracks{};

	/**
	 * @brief Meta data associated with each track.
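The `{}` initializers added throughout this class guarantee value-initialization (zero/null) of members that previously started with indeterminate values. A tiny standalone illustration of the difference, independent of the patch:

	#include <cassert>
	#include <cstdint>

	struct without_init { std::uint32_t tracks; };   /* indeterminate until explicitly assigned */
	struct with_init    { std::uint32_t tracks{}; }; /* always starts at zero */

	int main() {
		with_init w;
		assert(w.tracks == 0); /* guaranteed by the brace initializer */
	}
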
@@ -587,7 +587,7 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Encoding buffer for opus repacketizer and encode
	 */
-	uint8_t encode_buffer[65536];
+	uint8_t encode_buffer[65536]{};

	/**
	 * @brief DAVE - Discord Audio Visual Encryption
@@ -708,37 +708,37 @@ class DPP_EXPORT discord_voice_client : public websocket_client
	/**
	 * @brief Owning cluster
	 */
-	class dpp::cluster* creator;
+	class dpp::cluster* creator{};

	/**
	 * @brief True when the thread is shutting down
	 */
-	bool terminating;
+	bool terminating{};

	/**
	 * @brief The gain value for the end of the current voice iteration.
	 */
-	float end_gain;
+	float end_gain{};

	/**
	 * @brief The gain value for the current voice iteration.
	 */
-	float current_gain;
+	float current_gain{};

	/**
	 * @brief The amount to increment each successive sample for, for the current voice iteration.
	 */
-	float increment;
+	float increment{};

	/**
	 * @brief Heartbeat interval for sending heartbeat keepalive
	 */
-	uint32_t heartbeat_interval;
+	uint32_t heartbeat_interval{};

	/**
	 * @brief Last voice channel websocket heartbeat
	 */
-	time_t last_heartbeat;
+	time_t last_heartbeat{};

	/**
	 * @brief Discord voice session token
diff --git a/include/dpp/queues.h b/include/dpp/queues.h
index bd43db6a72..f54cb9c7a9 100644
--- a/include/dpp/queues.h
+++ b/include/dpp/queues.h
@@ -25,11 +25,10 @@
 #include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
-#include
 #include
 #include

@@ -172,8 +171,11 @@ struct DPP_EXPORT http_request_completion_t {
 * @brief Results of HTTP requests are called back to these std::function types.
 *
 * @note Returned http_completion_events are called ASYNCHRONOUSLY in your
- * code which means they execute in a separate thread. The completion events
- * arrive in order.
+ * code, which means they execute in a separate thread, with the result of each
+ * request being delivered via a dpp::thread_pool. Completion events may not arrive
+ * in order if one request takes longer than another. Using callbacks or coroutines
+ * correctly ensures that the order in which results arrive does not negatively
+ * affect your code.
 */
typedef std::function<void(const http_request_completion_t&)> http_completion_event;
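Since completion order is no longer guaranteed, ordering requirements should be expressed in the callbacks themselves or by awaiting sequentially. A minimal sketch, assuming the usual dpp::cluster::request and co_request helpers and hypothetical example URLs:

	/* Callback style: the second request is only issued once the first completes. */
	bot.request("https://example.com/first", dpp::m_get, [&bot](const dpp::http_request_completion_t& first) {
		bot.request("https://example.com/second", dpp::m_get, [](const dpp::http_request_completion_t& second) {
			/* Both replies are available here, in a known order. */
		});
	});

	/* Coroutine style: co_await imposes the same ordering without nesting. */
	dpp::job ordered(dpp::cluster& bot) {
		dpp::http_request_completion_t first  = co_await bot.co_request("https://example.com/first", dpp::m_get);
		dpp::http_request_completion_t second = co_await bot.co_request("https://example.com/second", dpp::m_get);
	}
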

@@ -377,7 +379,7 @@ class DPP_EXPORT http_request {
	 * @brief Execute the HTTP request and mark the request complete.
	 * @param owner creating cluster
	 */
-	http_request_completion_t run(class in_thread* processor, class cluster* owner);
+	http_request_completion_t run(class request_concurrency_queue* processor, class cluster* owner);

	/** @brief Returns true if the request is complete */
	bool is_completed();
@@ -416,27 +418,32 @@ struct DPP_EXPORT bucket_t {

/**
- * @brief Represents a thread in the thread pool handling requests to HTTP(S) servers.
- * There are several of these, the total defined by a constant in queues.cpp, and each
+ * @brief Represents a timer instance in a pool handling requests to HTTP(S) servers.
+ * There are several of these, the total defined by a constant in cluster.cpp, and each
 * one will always receive requests for the same rate limit bucket based on its endpoint
 * portion of the url. This makes rate limit handling reliable and easy to manage.
- * Each of these also has its own mutex, so that requests are less likely to block while
- * waiting for internal containers to be usable.
+ * Each of these also has its own mutex, making it thread-safe to call and use these
+ * from anywhere in the code.
 */
-class DPP_EXPORT in_thread {
+class DPP_EXPORT request_concurrency_queue {
 public:
+	/**
+	 * @brief Queue index
+	 */
+	int in_index{0};
+
	/**
	 * @brief True if ending.
	 */
	std::atomic<bool> terminating;

	/**
-	 * @brief Request queue that owns this in_thread.
+	 * @brief Request queue that owns this request_concurrency_queue.
	 */
	class request_queue* requests;

	/**
-	 * @brief The cluster that owns this in_thread.
+	 * @brief The cluster that owns this request_concurrency_queue.
	 */
	class cluster* creator;

@@ -446,14 +453,12 @@ class DPP_EXPORT in_thread {
	std::shared_mutex in_mutex;

	/**
-	 * @brief Inbound queue thread.
+	 * @brief Inbound queue timer. The timer ticks every second and, when it wakes,
+	 * checks for requests pending in the queue. If there are any and we are not
+	 * waiting on a rate limit, it sends them; otherwise it waits for the rate
+	 * limit to expire.
	 */
-	std::thread* in_thr;
-
-	/**
-	 * @brief Inbound queue condition, signalled when there are requests to fulfill.
-	 */
-	std::condition_variable in_ready;
+	dpp::timer in_timer;

	/**
	 * @brief Rate-limit bucket counters.
@@ -466,34 +471,34 @@ class DPP_EXPORT in_thread {
	std::vector<std::unique_ptr<http_request>> requests_in;

	/**
-	 * @brief Inbound queue thread loop.
-	 * @param index Thread index
+	 * @brief Timer callback
+	 * @param index Index ID for this timer
	 */
-	void in_loop(uint32_t index);
+	void tick_and_deliver_requests(uint32_t index);

	/**
-	 * @brief Construct a new in thread object
+	 * @brief Construct a new concurrency queue object
	 *
	 * @param owner Owning cluster
	 * @param req_q Owning request queue
-	 * @param index Thread index number
+	 * @param index Queue index number, uniquely identifies this queue for hashing
	 */
-	in_thread(class cluster* owner, class request_queue* req_q, uint32_t index);
+	request_concurrency_queue(class cluster* owner, class request_queue* req_q, uint32_t index);

	/**
-	 * @brief Destroy the in thread object
-	 * This will end the thread that is owned by this object by joining it.
+	 * @brief Destroy the concurrency queue object.
+	 * This will stop the timer.
	 */
-	~in_thread();
+	~request_concurrency_queue();

	/**
-	 * @brief Terminates the thread
-	 * This will end the thread that is owned by this object, but will not join it.
+	 * @brief Flags the queue as terminating.
+	 * This sets the internal atomic bool indicating that this queue is to accept no more requests.
	 */
	void terminate();

	/**
-	 * @brief Post a http_request to this thread.
+	 * @brief Post a http_request to this queue.
	 *
	 * @param req http_request to post. The pointer will be freed when it has
	 * been executed.
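Each queue is driven by a one-second dpp::timer on the owning cluster instead of a dedicated thread. The same pattern is available to user code; a minimal sketch, assuming only cluster::start_timer/stop_timer as used later in this patch (my_queue is a hypothetical user container):

	/* Poll a user-defined work queue once per second from a cluster timer. */
	dpp::timer poller = bot.start_timer([&my_queue](dpp::timer handle) {
		my_queue.drain(); /* hypothetical; stands in for checking requests_in */
	}, 1);

	/* Later, when shutting the feature down: */
	bot.stop_timer(poller);
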
@@ -507,22 +512,24 @@
 *
 * It ensures asynchronous delivery of events and queueing of requests.
 *
- * It will spawn two threads, one to make outbound HTTP requests and push the returned
- * results into a queue, and the second to call the callback methods with these results.
- * They are separated so that if the user decides to take a long time processing a reply
- * in their callback it won't affect when other requests are sent, and if a HTTP request
- * takes a long time due to latency, it won't hold up user processing.
+ * It will spawn multiple timers to make outbound HTTP requests and then call the callbacks
+ * of those requests on completion within the cluster's dpp::thread_pool.
+ * If the user takes a long time processing a reply in their callback it won't affect
+ * when other requests are sent, and if an HTTP request takes a long time due to latency,
+ * it won't hold up user processing.
 *
 * There are usually two request_queue objects in each dpp::cluster, one of which is used
 * internally for the various REST methods to Discord such as sending messages, and the other
- * used to support user REST calls via dpp::cluster::request().
+ * used to support user REST calls via dpp::cluster::request(). They are separated so that the
+ * one for user requests can be specifically configured to never send the Discord token
+ * unless it is explicitly placed into the request, for security reasons.
 */
class DPP_EXPORT request_queue {
public:
	/**
-	 * @brief Required so in_thread can access these member variables
+	 * @brief Required so request_concurrency_queue can access these member variables
	 */
-	friend class in_thread;
+	friend class request_concurrency_queue;

	/**
	 * @brief The cluster that owns this request_queue
@@ -545,66 +552,69 @@ class DPP_EXPORT request_queue {
	};

	/**
-	 * @brief A vector of inbound request threads forming a pool.
-	 * There are a set number of these defined by a constant in queues.cpp. A request is always placed
+	 * @brief A vector of timers forming a pool.
+	 *
+	 * There are a set number of these defined by a constant in cluster.cpp. A request is always placed
	 * on the same element in this vector, based upon its url, so that two conditions are satisfied:
-	 * 1) Any requests for the same ratelimit bucket are handled by the same thread in the pool so that
+	 *
+	 * 1) Any requests for the same ratelimit bucket are handled by the same concurrency queue in the pool so that
	 * they do not create unnecessary 429 errors,
	 * 2) Requests for different endpoints go into different buckets, so that they may be requested in parallel
-	 * A global ratelimit event pauses all threads in the pool. These are few and far between.
+	 * A global ratelimit event pauses all timers in the pool. These are few and far between.
	 */
-	std::vector<std::unique_ptr<in_thread>> requests_in;
+	std::vector<std::unique_ptr<request_concurrency_queue>> requests_in;

	/**
-	 * @brief Set to true if the threads should terminate
+	 * @brief Set to true if the timers should terminate.
+	 * When this is set to true no further requests are accepted into the queues.
	 */
	std::atomic<bool> terminating;

	/**
-	 * @brief True if globally rate limited - makes the entire request thread wait
+	 * @brief True if globally rate limited
+	 *
+	 * When globally rate limited, the concurrency queues associated with this request queue
+	 * will not process any requests in their timers until the global rate limit expires.
	 */
	bool globally_ratelimited;

	/**
-	 * @brief How many seconds we are globally rate limited for
+	 * @brief When we are globally rate limited until (unix epoch)
	 *
-	 * @note Only if globally_ratelimited is true.
+	 * @note Only valid if globally_ratelimited is true. If we are globally rate limited,
+	 * queues in this class will not process requests until the current unix epoch time
+	 * is greater than this time.
	 */
-	uint64_t globally_limited_for;
+	time_t globally_limited_until;

	/**
-	 * @brief Number of request threads in the thread pool
+	 * @brief Number of request queues in the pool. This is the direct size of the
+	 * requests_in vector.
	 */
-	uint32_t in_thread_pool_size;
+	uint32_t in_queue_pool_size;
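Storing an absolute expiry epoch rather than a remaining duration keeps the per-tick check to a single comparison. An illustrative sketch of the intended logic, using the members above (retry_after stands in for the value taken from a 429 response):

	/* When a globally rate limited response arrives: */
	globally_limited_until = retry_after + time(nullptr);
	globally_ratelimited = true;

	/* Each one-second tick then only needs a comparison before sending anything: */
	if (globally_ratelimited && time(nullptr) > globally_limited_until) {
		globally_limited_until = 0;
		globally_ratelimited = false; /* resume sending on the next pass */
	}
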

	/**
	 * @brief constructor
	 * @param owner The creating cluster.
-	 * @param request_threads The number of http request threads to allocate to the threadpool.
-	 * By default eight threads are allocated.
-	 * Side effects: Creates threads for the queue
-	 */
-	request_queue(class cluster* owner, uint32_t request_threads = 8);
-
-	/**
-	 * @brief Add more request threads to the library at runtime.
-	 * @note You should do this at a quiet time when there are few requests happening.
-	 * This will reorganise the hashing used to place requests into the thread pool so if you do
-	 * this while the bot is busy there is a small chance of receiving "429 rate limited" errors.
-	 * @param request_threads Number of threads to add. It is not possible to scale down at runtime.
-	 * @return reference to self
+	 * @param request_concurrency The number of http request queues to allocate.
+	 * Each request queue is a dpp::timer which ticks every second looking for new
+	 * requests to run. The timer will hold back requests if we are waiting, so as to comply
+	 * with rate limits. Adding a request to this class causes the queue it is placed in
+	 * to run immediately, but this cannot override rate limits.
+	 * By default eight concurrency queues are allocated.
+	 * Side effects: Creates timers for the queue
	 */
-	request_queue& add_request_threads(uint32_t request_threads);
+	request_queue(class cluster* owner, uint32_t request_concurrency = 8);

	/**
-	 * @brief Get the request thread count
-	 * @return uint32_t number of request threads that are active
+	 * @brief Get the request queue concurrency count
+	 * @return uint32_t number of request queues that are active
	 */
-	uint32_t get_request_thread_count() const;
+	uint32_t get_request_queue_count() const;

	/**
	 * @brief Destroy the request queue object.
-	 * Side effects: Joins and deletes queue threads
+	 * Side effects: Ends and deletes concurrency timers
	 */
	~request_queue();
diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp
index 3d41f311a9..99926b3f37 100644
--- a/src/dpp/cluster.cpp
+++ b/src/dpp/cluster.cpp
@@ -84,16 +84,17 @@ template bool DPP_EXPORT validate_configuration();
 template bool DPP_EXPORT validate_configuration();

-cluster::cluster(const std::string &_token, uint32_t _intents, uint32_t _shards, uint32_t _cluster_id, uint32_t _maxclusters, bool comp, cache_policy_t policy, uint32_t request_threads, uint32_t request_threads_raw)
+cluster::cluster(const std::string &_token, uint32_t _intents, uint32_t _shards, uint32_t _cluster_id, uint32_t _maxclusters, bool comp, cache_policy_t policy, uint32_t pool_threads)
	: default_gateway("gateway.discord.gg"), rest(nullptr), raw_rest(nullptr), compressed(comp), start_time(0), token(_token), last_identify(time(nullptr) - 5), intents(_intents),
	numshards(_shards), cluster_id(_cluster_id), maxclusters(_maxclusters), rest_ping(0.0), cache_policy(policy), ws_mode(ws_json)
 {
	socketengine = create_socket_engine(this);
-	pool = std::make_unique<thread_pool>(this, request_threads);
+	pool = std::make_unique<thread_pool>(this, pool_threads > 4 ? pool_threads : 4);

	/* Instantiate REST request queues */
	try {
-		rest = new request_queue(this, request_threads);
-		raw_rest = new request_queue(this, request_threads_raw);
+		/* NOTE: These no longer use threads. This instantiates 16+4 dpp::timer instances. */
+		rest = new request_queue(this, 16);
+		raw_rest = new request_queue(this, 4);
	}
	catch (std::bad_alloc&) {
		delete rest;
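For clarity on the clamp above: on a machine reporting 4 hardware threads, std::thread::hardware_concurrency() / 2 yields 2, which the constructor raises to the minimum of 4; on a 16-thread machine the default pool is 8. An illustrative helper (not part of the patch) that computes the same effective size:

	#include <algorithm>
	#include <cstdint>
	#include <thread>

	/* Mirrors the cluster's behaviour: default to half the hardware concurrency, never below 4. */
	std::uint32_t effective_pool_threads(std::uint32_t requested = std::thread::hardware_concurrency() / 2) {
		return std::max<std::uint32_t>(requested, 4);
	}
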
diff --git a/src/dpp/queues.cpp b/src/dpp/queues.cpp
index f1f34f4cba..0b76a4833b 100644
--- a/src/dpp/queues.cpp
+++ b/src/dpp/queues.cpp
@@ -20,16 +20,20 @@
 *
 ************************************************************************************/
 #include
-#ifdef _WIN32
-/* Central point for forcing inclusion of winsock library for all socket code */
-#include
-#endif
 #include
 #include
 #include
+#ifdef _WIN32
+	#include
+#endif

 namespace dpp {

+/**
+ * @brief List of possible request verbs.
+ *
+ * This MUST MATCH the size of the dpp::http_method enum!
+ */
 constexpr std::array request_verb {
	"GET",
	"POST",
@@ -151,7 +155,7 @@ bool http_request::is_completed()
 }

 /* Execute a HTTP request */
-http_request_completion_t http_request::run(in_thread* processor, cluster* owner) {
+http_request_completion_t http_request::run(request_concurrency_queue* processor, cluster* owner) {
	http_request_completion_t rv;
	double start = dpp::utility::time_f();
@@ -247,7 +251,8 @@ http_request_completion_t http_request::run(in_thread* processor, cluster* owner
	newbucket.timestamp = time(nullptr);
	processor->requests->globally_ratelimited = rv.ratelimit_global;
	if (processor->requests->globally_ratelimited) {
-		processor->requests->globally_limited_for = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after);
+		/* We are globally rate limited - the user is up to shenanigans */
+		processor->requests->globally_limited_until = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after) + newbucket.timestamp;
	}

	processor->buckets[this->endpoint] = newbucket;
@@ -269,141 +274,131 @@ void http_request::stash_self(std::unique_ptr self) {
	me = std::move(self);
 }

-request_queue::request_queue(class cluster* owner, uint32_t request_threads) : creator(owner), terminating(false), globally_ratelimited(false), globally_limited_for(0), in_thread_pool_size(request_threads)
+request_queue::request_queue(class cluster* owner, uint32_t request_concurrency) : creator(owner), terminating(false), globally_ratelimited(false), globally_limited_until(0), in_queue_pool_size(request_concurrency)
 {
-	for (uint32_t in_alloc = 0; in_alloc < in_thread_pool_size; ++in_alloc) {
-		requests_in.push_back(std::make_unique<in_thread>(owner, this, in_alloc));
+	/* Create request_concurrency timer instances */
+	for (uint32_t in_alloc = 0; in_alloc < in_queue_pool_size; ++in_alloc) {
+		requests_in.push_back(std::make_unique<request_concurrency_queue>(owner, this, in_alloc));
	}
 }

-request_queue& request_queue::add_request_threads(uint32_t request_threads)
+uint32_t request_queue::get_request_queue_count() const
 {
-	for (uint32_t in_alloc_ex = 0; in_alloc_ex < request_threads; ++in_alloc_ex) {
-		requests_in.push_back(std::make_unique<in_thread>(creator, this, in_alloc_ex + in_thread_pool_size));
-	}
-	in_thread_pool_size += request_threads;
-	return *this;
+	return in_queue_pool_size;
 }

-uint32_t request_queue::get_request_thread_count() const
+request_concurrency_queue::request_concurrency_queue(class cluster* owner, class request_queue* req_q, uint32_t index) : in_index(index), terminating(false), requests(req_q), creator(owner)
 {
-	return in_thread_pool_size;
+	in_timer = creator->start_timer([this](auto timer_handle) {
+		tick_and_deliver_requests(in_index);
+	}, 1);
 }

-in_thread::in_thread(class cluster* owner, class request_queue* req_q, uint32_t index) : terminating(false), requests(req_q), creator(owner)
-{
-	this->in_thr = new std::thread(&in_thread::in_loop, this, index);
 }
-
-in_thread::~in_thread()
+request_concurrency_queue::~request_concurrency_queue()
 {
	terminate();
-	in_thr->join();
-	delete in_thr;
+	creator->stop_timer(in_timer);
 }

-void in_thread::terminate()
+void request_concurrency_queue::terminate()
 {
	terminating.store(true, std::memory_order_relaxed);
-	in_ready.notify_one();
 }

 request_queue::~request_queue()
 {
	terminating.store(true, std::memory_order_relaxed);
	for (auto& in_thr : requests_in) {
-		in_thr->terminate(); // signal all of them here, otherwise they will all join 1 by 1 and it will take forever
+		/* Note: We don't need to set the atomic to make timers quit, this is purely
+		 * to prevent additional requests going into the queue while it is being destructed
+		 * from other threads.
+		 */
+		in_thr->terminate();
	}
 }

-void in_thread::in_loop(uint32_t index)
+void request_concurrency_queue::tick_and_deliver_requests(uint32_t index)
 {
-	utility::set_thread_name(std::string("http_req/") + std::to_string(index));
-	while (!terminating.load(std::memory_order_relaxed)) {
-		std::mutex mtx;
-		std::unique_lock lock{ mtx };
-		in_ready.wait_for(lock, std::chrono::seconds(1));
-		/* New request to be sent! */
+	if (terminating) {
+		return;
+	}

-		if (!requests->globally_ratelimited) {
+	if (!requests->globally_ratelimited) {

-			std::vector<http_request*> requests_view;
-			{
-				/* Gather all the requests first within a mutex */
-				std::shared_lock lock(in_mutex);
-				if (requests_in.empty()) {
-					/* Nothing to copy, wait again */
-					continue;
-				}
-				requests_view.reserve(requests_in.size());
-				std::transform(requests_in.begin(), requests_in.end(), std::back_inserter(requests_view), [](const std::unique_ptr<http_request> &r) {
-					return r.get();
-				});
+		std::vector<http_request*> requests_view;
+		{
+			/* Gather all the requests first within a mutex */
+			std::shared_lock lock(in_mutex);
+			if (requests_in.empty()) {
+				/* Nothing to copy, check again when we call the timer in a second */
+				return;
			}
+			requests_view.reserve(requests_in.size());
+			std::transform(requests_in.begin(), requests_in.end(), std::back_inserter(requests_view), [](const std::unique_ptr<http_request> &r) {
+				return r.get();
+			});
+		}

-			for (auto& request_view : requests_view) {
-				const std::string &key = request_view->endpoint;
-				http_request_completion_t rv;
-				auto currbucket = buckets.find(key);
-
-				if (currbucket != buckets.end()) {
-					/* There's a bucket for this request. Check its status. If the bucket says to wait,
-					 * skip all requests in this bucket till its ok.
-					 */
-					if (currbucket->second.remaining < 1) {
-						uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after);
-						if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) {
-							/* Time has passed, we can process this bucket again. send its request. */
-							request_view->run(this, creator);
-						} else {
-							if (!request_view->waiting) {
-								request_view->waiting = true;
-							}
-							/* Time not up yet, wait more */
-							break;
-						}
-					} else {
-						/* There's limit remaining, we can just run the request */
+		for (auto& request_view : requests_view) {
+			const std::string &key = request_view->endpoint;
+			http_request_completion_t rv;
+			auto currbucket = buckets.find(key);
+
+			if (currbucket != buckets.end()) {
+				/* There's a bucket for this request. Check its status. If the bucket says to wait,
+				 * skip all requests until the timer value indicates the rate limit won't be hit
+				 */
+				if (currbucket->second.remaining < 1) {
+					uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after);
+					if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) {
+						/* Time has passed, we can process this bucket again. Send its request. */
						request_view->run(this, creator);
+					} else {
+						if (!request_view->waiting) {
+							request_view->waiting = true;
+						}
+						/* Time not up yet, wait more */
+						break;
					}
				} else {
-					/* No bucket for this endpoint yet. Just send it, and make one from its reply */
+					/* We aren't at the limit, so we can just run the request */
					request_view->run(this, creator);
+				} else {
+					/* No bucket for this endpoint yet. Just send it, and make one from its reply */
+					request_view->run(this, creator);
+				}

-				/* Remove from inbound requests */
-				std::unique_ptr<http_request> rq;
-				{
-					/* Find the owned pointer in requests_in */
-					std::scoped_lock lock1{in_mutex};
-
-					const std::string &key = request_view->endpoint;
-					auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});
-					for (auto it = begin; it != end; ++it) {
-						if (it->get() == request_view) {
-							/* Grab and remove */
-							// NOTE: Where to move this to?!
-							request_view->stash_self(std::move(*it));
-							requests_in.erase(it);
-							break;
-						}
+			/* Remove from inbound requests */
+			std::unique_ptr<http_request> rq;
+			{
+				/* Find the owned pointer in requests_in */
+				std::scoped_lock lock1{in_mutex};
+
+				const std::string &key = request_view->endpoint;
+				auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});
+				for (auto it = begin; it != end; ++it) {
+					if (it->get() == request_view) {
+						/* Grab and remove */
+						request_view->stash_self(std::move(*it));
+						requests_in.erase(it);
+						break;
					}
				}
			}
+		}

-		} else {
-			if (requests->globally_limited_for > 0) {
-				std::this_thread::sleep_for(std::chrono::seconds(requests->globally_limited_for));
-				requests->globally_limited_for = 0;
+	} else {
+		/* If we are globally rate limited, do nothing until we are not */
+		if (time(nullptr) > requests->globally_limited_until) {
+			requests->globally_limited_until = 0;
			requests->globally_ratelimited = false;
-			in_ready.notify_one();
		}
	}
 }

 /* Post a http_request into the queue */
-void in_thread::post_request(std::unique_ptr<http_request> req)
+void request_concurrency_queue::post_request(std::unique_ptr<http_request> req)
 {
	{
		std::scoped_lock lock(in_mutex);
@@ -411,17 +406,21 @@ void in_thread::post_request(std::unique_ptr<http_request> req)
		auto where = std::lower_bound(requests_in.begin(), requests_in.end(), req->endpoint, compare_request{});
		requests_in.emplace(where, std::move(req));
	}
-	in_ready.notify_one();
+	/* Immediately trigger requests in this queue */
+	tick_and_deliver_requests(in_index);
 }

-/* Simple hash function for hashing urls into thread pool values,
- * ensuring that the same url always ends up on the same thread,
+/* @brief Simple hash function for hashing urls into request pool values,
+ * ensuring that the same url always ends up in the same queue,
 * which means that it will be part of the same ratelimit bucket.
 * I did consider std::hash for this, but std::hash returned even
 * numbers for absolutely every string i passed it on g++ 10.0,
 * so this was a no-no. There are also much bigger more complex
 * hash functions that claim to be really fast, but this is
 * readable and small and fits the requirement exactly.
+ *
+ * @param s String to hash
+ * @return Hash value
 */
inline uint32_t hash(const char *s)
{
@@ -435,7 +434,9 @@ inline uint32_t hash(const char *s)

 /* Post a http_request into a request queue */
 request_queue& request_queue::post_request(std::unique_ptr<http_request> req)
 {
-	requests_in[hash(req->endpoint.c_str()) % in_thread_pool_size]->post_request(std::move(req));
+	if (!terminating) {
+		requests_in[hash(req->endpoint.c_str()) % in_queue_pool_size]->post_request(std::move(req));
+	}
	return *this;
 }
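Because the queue index is derived purely from the endpoint string, every request for a given endpoint lands in the same concurrency queue and therefore shares its rate limit bucket. A standalone sketch of the idea, using a stand-in FNV-1a hash rather than the library's own function:

	#include <cstdint>
	#include <string>

	/* Stand-in hash; the library uses its own small hash with the same purpose. */
	std::uint32_t endpoint_hash(const std::string& s) {
		std::uint32_t h = 2166136261u;
		for (unsigned char c : s) {
			h = (h ^ c) * 16777619u;
		}
		return h;
	}

	std::uint32_t queue_for(const std::string& endpoint, std::uint32_t pool_size) {
		return endpoint_hash(endpoint) % pool_size; /* same endpoint -> same queue, every time */
	}
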
diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp
index c330cd4255..7418f6c7fd 100644
--- a/src/dpp/socketengines/epoll.cpp
+++ b/src/dpp/socketengines/epoll.cpp
@@ -85,7 +85,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
			}

			const int fd = eh->fd;
-			if (fd == INVALID_SOCKET) {
+			if (fd == INVALID_SOCKET || eh->flags & WANT_DELETION) {
				continue;
			}

diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp
index ce2da57653..9240d2e17b 100644
--- a/src/dpp/socketengines/kqueue.cpp
+++ b/src/dpp/socketengines/kqueue.cpp
@@ -68,7 +68,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
		for (int j = 0; j < i; j++) {
			const struct kevent& kev = ke_list[j];
			auto* eh = reinterpret_cast<socket_events*>(kev.udata);
-			if (eh == nullptr) {
+			if (eh == nullptr || eh->flags & WANT_DELETION) {
				continue;
			}

diff --git a/src/dpp/socketengines/poll.cpp b/src/dpp/socketengines/poll.cpp
index dd89c6efda..a336a20954 100644
--- a/src/dpp/socketengines/poll.cpp
+++ b/src/dpp/socketengines/poll.cpp
@@ -96,6 +96,10 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
			}

			socket_events *eh = iter->second.get();
+			if (eh == nullptr || eh->flags & WANT_DELETION) {
+				continue;
+			}
+
			try {

				if ((revents & POLLHUP) != 0) {
diff --git a/src/dpp/sslclient.cpp b/src/dpp/sslclient.cpp
index 4e81c0f887..454f9517c2 100644
--- a/src/dpp/sslclient.cpp
+++ b/src/dpp/sslclient.cpp
@@ -547,8 +547,8 @@ void ssl_client::close()
		SSL_free(ssl->ssl);
		ssl->ssl = nullptr;
	}
-	close_socket(sfd);
	owner->socketengine->delete_socket(sfd);
+	close_socket(sfd);
	sfd = INVALID_SOCKET;
	obuffer.clear();
	buffer.clear();
@@ -565,6 +565,7 @@ ssl_client::~ssl_client()
	cleanup();
	if (timer_handle) {
		owner->stop_timer(timer_handle);
+		timer_handle = 0;
	}
 }

diff --git a/src/unittest/test.cpp b/src/unittest/test.cpp
index bded8f1efd..9b2e0778ab 100644
--- a/src/unittest/test.cpp
+++ b/src/unittest/test.cpp
@@ -1178,19 +1178,25 @@ Markdown lol \\|\\|spoiler\\|\\| \\~\\~strikethrough\\~\\~ \\`small \\*code\\* b
		}

		void set_pin_tested() {
-			assert(!pin_tested);
+			if (pin_tested) {
+				return;
+			}
			pin_tested = true;
			delete_message_if_done();
		}

		void set_thread_tested() {
-			assert(!thread_tested);
+			if (thread_tested) {
+				return;
+			}
			thread_tested = true;
			delete_message_if_done();
		}

		void set_file_tested(size_t index) {
-			assert(!files_tested[index]);
+			if (files_tested[index]) {
+				return;
+			}
			files_tested[index] = true;
			if (files_tested == std::array{true, true, true}) {
				set_test(MESSAGEFILE, files_success == std::array{true, true, true});
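All three socket engines now skip handlers already flagged for deletion, so a callback cannot fire between a handler being marked dead and being physically removed. A minimal standalone sketch of that deferred-deletion pattern (names are illustrative, not the library's):

	#include <algorithm>
	#include <cstdint>
	#include <functional>
	#include <vector>

	constexpr std::uint8_t WANT_DELETION_FLAG = 1 << 0; /* stand-in for the engine's flag */

	struct handler {
		std::uint8_t flags{};
		std::function<void()> on_event;
	};

	/* One event-loop pass: marked handlers are skipped now and reaped afterwards. */
	void dispatch(std::vector<handler>& handlers) {
		for (handler& h : handlers) {
			if (h.flags & WANT_DELETION_FLAG) {
				continue; /* never call into a handler that is about to be destroyed */
			}
			if (h.on_event) {
				h.on_event();
			}
		}
		handlers.erase(std::remove_if(handlers.begin(), handlers.end(),
			[](const handler& h) { return (h.flags & WANT_DELETION_FLAG) != 0; }), handlers.end());
	}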