From 47adfbcd986cf1fdb7476741d3c36ace98189fdc Mon Sep 17 00:00:00 2001 From: Saurabh Vishwas Joshi Date: Thu, 31 Oct 2024 11:30:28 -0700 Subject: [PATCH] [push_manager] change fulfillment of push request from round robin to FIFO --- src/ray/object_manager/push_manager.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ray/object_manager/push_manager.cc b/src/ray/object_manager/push_manager.cc index 55d988be43c2..4bbc54e64f92 100644 --- a/src/ray/object_manager/push_manager.cc +++ b/src/ray/object_manager/push_manager.cc @@ -63,18 +63,19 @@ void PushManager::OnChunkComplete(const NodeID &dest_id, const ObjectID &obj_id) void PushManager::ScheduleRemainingPushes() { bool keep_looping = true; - // Loop over all active pushes for approximate round-robin prioritization. - // TODO(ekl) this isn't the best implementation of round robin, we should - // consider tracking the number of chunks active per-push and balancing those. + + // Keep looping while we have capacity and are making progress while (chunks_in_flight_ < max_chunks_in_flight_ && keep_looping) { - // Loop over each active push and try to send another chunk. + // Loop over the list of active pushes and try to send another chunk auto it = push_requests_with_chunks_to_send_.begin(); keep_looping = false; + // Walk through each push request in the list while (it != push_requests_with_chunks_to_send_.end() && chunks_in_flight_ < max_chunks_in_flight_) { auto push_id = it->first; auto &info = it->second; - if (info->SendOneChunk()) { + while (chunks_in_flight_ < max_chunks_in_flight_ && info->SendOneChunk()) { + // finish as many chunks of the head request as possible chunks_in_flight_ += 1; keep_looping = true; RAY_LOG(DEBUG) << "Sending chunk " << info->next_chunk_id << " of " @@ -83,6 +84,7 @@ void PushManager::ScheduleRemainingPushes() { << " / " << max_chunks_in_flight_ << " max, remaining chunks: " << NumChunksRemaining(); } + // If all chunks are sent for the current request, remove it from the list if (info->NoChunksToSend()) { it = push_requests_with_chunks_to_send_.erase(it); } else {