Skip to content

Commit

Permalink
fix: Fix the memory reclaim bytes for hash join (#11642)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11642

Both hash build and hash probe do coordinated spill, so we shouldn't report the reclaimed bytes from a single node
but should report them from the plan node. Also, a probe side spill might spill the built table from the join side, and the memory is
then actually reclaimed from the build side pool instead of the probe side.

This PR also removes the unused wait-for-spill state from hash build.

Reviewed By: bikramSingh91, tanjialiang

Differential Revision: D66437719

fbshipit-source-id: 4ac7bb9cf87b4346d234ef5f7c04ed64ee12d249
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Nov 28, 2024
1 parent 4a4b4a0 commit 1ce3c7a
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 43 deletions.
6 changes: 3 additions & 3 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1140,14 +1140,14 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kWaitForMemory";
case BlockingReason::kWaitForConnector:
return "kWaitForConnector";
case BlockingReason::kWaitForSpill:
return "kWaitForSpill";
case BlockingReason::kYield:
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
default:
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
}
VELOX_UNREACHABLE();
}

DriverThreadContext* driverThreadContext() {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ enum class BlockingReason {
kWaitForConnector,
/// Build operator is blocked waiting for all its peers to stop to run group
/// spill on all of them.
///
/// TODO: remove this after Prestissimo is updated.
kWaitForSpill,
/// Some operators (like Table Scan) may run long loops and can 'voluntarily'
/// exit them because Task requested to yield or stop or after a certain time.
Expand Down
13 changes: 0 additions & 13 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ BlockingReason fromStateToBlockingReason(HashBuild::State state) {
return BlockingReason::kNotBlocked;
case HashBuild::State::kYield:
return BlockingReason::kYield;
case HashBuild::State::kWaitForSpill:
return BlockingReason::kWaitForSpill;
case HashBuild::State::kWaitForBuild:
return BlockingReason::kWaitForJoinBuild;
case HashBuild::State::kWaitForProbe:
Expand Down Expand Up @@ -944,13 +942,6 @@ BlockingReason HashBuild::isBlocked(ContinueFuture* future) {
break;
case State::kFinish:
break;
case State::kWaitForSpill:
if (!future_.valid()) {
setRunning();
VELOX_CHECK_NOT_NULL(input_);
addInput(std::move(input_));
}
break;
case State::kWaitForBuild:
[[fallthrough]];
case State::kWaitForProbe:
Expand Down Expand Up @@ -1003,8 +994,6 @@ void HashBuild::checkStateTransition(State state) {
break;
case State::kWaitForBuild:
[[fallthrough]];
case State::kWaitForSpill:
[[fallthrough]];
case State::kWaitForProbe:
[[fallthrough]];
case State::kFinish:
Expand All @@ -1022,8 +1011,6 @@ std::string HashBuild::stateName(State state) {
return "RUNNING";
case State::kYield:
return "YIELD";
case State::kWaitForSpill:
return "WAIT_FOR_SPILL";
case State::kWaitForBuild:
return "WAIT_FOR_BUILD";
case State::kWaitForProbe:
Expand Down
9 changes: 3 additions & 6 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,14 @@ class HashBuild final : public Operator {
/// The yield state that voluntarily yield cpu after running too long when
/// processing input from spilled file.
kYield = 2,
/// The state that waits for the pending group spill to finish. This state
/// only applies if disk spilling is enabled.
kWaitForSpill = 3,
/// The state that waits for the hash tables to be merged together.
kWaitForBuild = 4,
kWaitForBuild = 3,
/// The state that waits for the hash probe to finish before start to build
/// the hash table for one of previously spilled partition. This state only
/// applies if disk spilling is enabled.
kWaitForProbe = 5,
kWaitForProbe = 4,
/// The finishing state.
kFinish = 6,
kFinish = 5,
};
static std::string stateName(State state);

Expand Down
24 changes: 14 additions & 10 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,19 +382,20 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
uint64_t targetBytes,
uint64_t maxWaitMs,
memory::MemoryReclaimer::Stats& stats) {
const auto prevNodeReservedMemory = pool->reservedBytes();

// The flags to track if we have reclaimed from both build and probe operators
// under a hash join node.
bool hasReclaimedFromBuild{false};
bool hasReclaimedFromProbe{false};
uint64_t reclaimedBytes{0};
pool->visitChildren([&](memory::MemoryPool* child) {
VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
const bool isBuild = isHashBuildMemoryPool(*child);
if (isBuild) {
if (!hasReclaimedFromBuild) {
// We just need to reclaim from any one of the hash build operator.
hasReclaimedFromBuild = true;
reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
child->reclaim(targetBytes, maxWaitMs, stats);
}
return !hasReclaimedFromProbe;
}
Expand All @@ -403,22 +404,25 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
// The same as build operator, we only need to reclaim from any one of the
// hash probe operator.
hasReclaimedFromProbe = true;
reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
child->reclaim(targetBytes, maxWaitMs, stats);
}
return !hasReclaimedFromBuild;
});
if (reclaimedBytes != 0) {
return reclaimedBytes;

auto currNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(currNodeReservedMemory, prevNodeReservedMemory);
if (currNodeReservedMemory < prevNodeReservedMemory) {
return prevNodeReservedMemory - currNodeReservedMemory;
}

auto joinBridge = joinBridge_.lock();
if (joinBridge == nullptr) {
return reclaimedBytes;
return 0;
}
const auto oldNodeReservedMemory = pool->reservedBytes();
joinBridge->reclaim();
const auto newNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(newNodeReservedMemory, oldNodeReservedMemory);
return oldNodeReservedMemory - newNodeReservedMemory;
currNodeReservedMemory = pool->reservedBytes();
VELOX_CHECK_LE(currNodeReservedMemory, prevNodeReservedMemory);
return prevNodeReservedMemory - currNodeReservedMemory;
}

bool isHashBuildMemoryPool(const memory::MemoryPool& pool) {
Expand Down
6 changes: 2 additions & 4 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

VELOX_CHECK_EQ(
static_cast<int>(state_),
static_cast<int>(kRunning),
"Task has already finished processing.");
state_, TaskState::kRunning, "Task has already finished processing.");

// On first call, create the drivers.
if (driverFactories_.empty()) {
Expand Down Expand Up @@ -1480,7 +1478,7 @@ void Task::noMoreSplits(const core::PlanNodeId& planNodeId) {
}

if (allFinished) {
terminate(kFinished);
terminate(TaskState::kFinished);
}
}

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,13 @@ class Task : public std::enable_shared_from_this<Task> {
/// realized when the last thread stops running for 'this'. This is used to
/// mark cancellation by the user.
ContinueFuture requestCancel() {
  // Use the qualified enumerator: it is valid for both the unscoped and the
  // scoped (enum class) definition of TaskState, whereas a bare 'kCanceled'
  // only compiles with the unscoped one.
  return terminate(TaskState::kCanceled);
}

/// Like requestCancel but sets end state to kAborted. This is for stopping
/// Tasks due to failures of other parts of the query.
ContinueFuture requestAbort() {
  // Use the qualified enumerator: it is valid for both the unscoped and the
  // scoped (enum class) definition of TaskState, whereas a bare 'kAborted'
  // only compiles with the unscoped one.
  return terminate(TaskState::kAborted);
}

void requestYield() {
Expand Down
28 changes: 27 additions & 1 deletion velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,24 @@ class MergeSource;
class MergeJoinSource;
struct Split;

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
/// Legacy unscoped variant of TaskState, selected when
/// VELOX_ENABLE_BACKWARD_COMPATIBILITY is defined. Its enumerators can be
/// referenced without the 'TaskState::' qualifier. The numeric values match
/// the scoped variant below.
enum TaskState {
  kRunning = 0,
  kFinished = 1,
  kCanceled = 2,
  kAborted = 3,
  kFailed = 4
};
#else
/// Corresponds to Presto TaskState, needed for reporting query completion.
///
/// Scoped enum: enumerators must be qualified as 'TaskState::kXxx' and do not
/// implicitly convert to int.
enum class TaskState : int {
  kRunning = 0,
  kFinished = 1,
  kCanceled = 2,
  kAborted = 3,
  kFailed = 4
};
#endif

std::string taskStateString(TaskState state);

Expand Down Expand Up @@ -139,3 +155,13 @@ struct SplitGroupState {
};

} // namespace facebook::velox::exec

/// fmt formatter specialization for TaskState. Renders a state by first
/// converting it to a string with taskStateString() and then delegating to
/// the std::string formatter this struct inherits from.
template <>
struct fmt::formatter<facebook::velox::exec::TaskState>
    : formatter<std::string> {
  auto format(facebook::velox::exec::TaskState state, format_context& ctx)
      const {
    // Convert to the textual form, then hand off to the base formatter.
    const std::string stateText =
        facebook::velox::exec::taskStateString(state);
    return formatter<std::string>::format(stateText, ctx);
  }
};
Loading

0 comments on commit 1ce3c7a

Please sign in to comment.