From 5b30a8ab4d44d34f79c8fcad7ea1e3fe59559244 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Wed, 12 Feb 2025 20:39:48 +0300 Subject: [PATCH 1/5] Fix busywait on adding to full async input buffer DoExecute->PollAsyncInput->source.PollAsyncInput->ContinueExecution->DoExecute... --- .../yql/dq/actors/compute/dq_compute_actor_async_input_helper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 22aa7e6f586e..1f59859ea1a5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -98,7 +98,7 @@ struct TComputeActorAsyncInputHelper { } else { CA_LOG_T("Skip polling async input[" << Index << "]: no free space: " << freeSpace); - return EResumeSource::CAPollAsyncNoSpace; // If there is no free space in buffer, => we have something to process + // If there is no free space in buffer, => we have something to process } return {}; } From f09ceb169fda186e2ee4eae8fb8e2496117d903c Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 14 Feb 2025 18:06:18 +0300 Subject: [PATCH 2/5] Revert "Fix busywait on adding to full async input buffer" This reverts commit 5b30a8ab4d44d34f79c8fcad7ea1e3fe59559244. --- .../yql/dq/actors/compute/dq_compute_actor_async_input_helper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 1f59859ea1a5..22aa7e6f586e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -98,7 +98,7 @@ struct TComputeActorAsyncInputHelper { } else { CA_LOG_T("Skip polling async input[" << Index << "]: no free space: " << freeSpace); - // If there is no free space in buffer, => we have something to process + return EResumeSource::CAPollAsyncNoSpace; // If there is no free space in buffer, => we have something to process } return {}; } From c73b4ced58d4c72a908873cabfa6bdfb7f1fdd18 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 14 Feb 2025 20:05:48 +0300 Subject: [PATCH 3/5] try2: if last run tried to output, but cannot, don't retry poll on full input --- .../yql/dq/actors/compute/dq_async_compute_actor.cpp | 8 +++++++- .../yql/dq/actors/compute/dq_compute_actor_impl.h | 10 +++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 167a9490323d..1058b8c88846 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -696,7 +696,13 @@ class TDqAsyncComputeActor : public TDqComputeActorBase } } - void PollAsyncInput() { + void PollAsyncInput(bool continueExecuteOnFull = true) { if (!Running) { CA_LOG_T("Skip polling inputs and sources because not running"); return; @@ -1450,7 +1450,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped CA_LOG_T("Poll inputs"); for (auto& [inputIndex, transform] : InputTransformsMap) { if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { - ContinueExecute(*resume); + if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) { + ContinueExecute(*resume); + } } } @@ -1463,7 +1465,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped CA_LOG_T("Poll sources"); for (auto& [inputIndex, source] : SourcesMap) { if (auto resume = source.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { - ContinueExecute(*resume); + if (*resume != EResumeSource::CAPollAsyncNoSpace || continueExecuteOnFull) { + ContinueExecute(*resume); + } } } } From ba8b5fd75b4d45ea796a9d8318fa1269a375ffc0 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Mon, 17 Feb 2025 20:13:13 +0300 Subject: [PATCH 4/5] LastRunStatus was always overwritten (cherry picked from commit 7cb4eb1d3f9d9983880aad7a50fe84153386e625) --- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 757b8af2ef62..50183c67d3c5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -367,13 +367,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } void ProcessOutputsImpl(ERunStatus status) { - ProcessOutputsState.LastRunStatus = status; - CA_LOG_T("ProcessOutputsState.Inflight: " << ProcessOutputsState.Inflight); if (ProcessOutputsState.Inflight == 0) { ProcessOutputsState = TProcessOutputsState(); } + ProcessOutputsState.LastRunStatus = status; + for (auto& entry : OutputChannelsMap) { const ui64 channelId = entry.first; TOutputChannelInfo& outputChannel = entry.second; From fd1892357293f15b5eca611b87c728c6408bdd96 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Tue, 18 Feb 2025 08:29:58 +0000 Subject: [PATCH 5/5] Explicitly track isFull --- .../actors/compute/dq_async_compute_actor.cpp | 21 ++++++++++++------- .../dq/actors/compute/dq_compute_actor_impl.h | 3 ++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 1058b8c88846..a7b1500ad49e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -528,6 +528,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseShouldSkipData(outputChannel.ChannelId); const bool hasFreeMemory = Channels->HasFreeMemoryInChannel(outputChannel.ChannelId); UpdateBlocked(outputChannel, !hasFreeMemory); + if (!hasFreeMemory) { + ProcessOutputsState.IsFull = true; + } if (!shouldSkipData && !outputChannel.EarlyFinish && !hasFreeMemory) { CA_LOG_T("DrainOutputChannel return because No free memory in channel, channel: " << outputChannel.ChannelId); @@ -555,6 +558,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGetFreeSpace(); + if (sinkFreeSpaceBeforeSend <= 0) { + ProcessOutputsState.IsFull = true; + } i64 toSend = sinkFreeSpaceBeforeSend + allowedOvercommit; CA_LOG_T("About to drain sink " << outputIndex @@ -696,13 +702,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseHasFreeMemoryInChannel(outputChannel.ChannelId)) { + ProcessOutputsState.IsFull = true; + } + ProcessOutputsState.DataWasSent |= asyncData.Changed; ProcessOutputsState.AllOutputsFinished = @@ -1024,6 +1028,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGetFreeSpace()) << ", sent data from buffer: " << dataSize); + if (sinkInfo.AsyncOutput->GetFreeSpace() <= 0) { + ProcessOutputsState.IsFull = true; + } ProcessOutputsState.DataWasSent |= dataWasSent; ProcessOutputsState.AllOutputsFinished = diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 50183c67d3c5..81babad5b4c8 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -438,7 +438,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return; } - if (status != ERunStatus::Finished) { + if (status == ERunStatus::PendingInput) { for (auto& [id, inputTransform] : InputTransformsMap) { if (!inputTransform.Buffer->Empty()) { ContinueExecute(EResumeSource::CAPendingInput); @@ -2030,6 +2030,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped bool AllOutputsFinished = true; ERunStatus LastRunStatus = ERunStatus::PendingInput; bool LastPopReturnedNoData = false; + bool IsFull = false; }; TProcessOutputsState ProcessOutputsState;