diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index d3e3b4332337..17653822b18c 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -9,8 +9,10 @@ #include #include +#include #include #include +#include #include #include @@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped { void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&); void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&); void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&); + void Handle(const NMon::TEvHttpInfo::TPtr&); void DeleteConsumer(const ConsumerSessionKey& key); void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); void UpdateMetrics(); - void PrintInternalState(); + TString GetInternalState(); STRICT_STFUNC( StateFunc, { @@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped { hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle); hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle); hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle); + hFunc(NMon::TEvHttpInfo, Handle); }) }; @@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() { Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing()); Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics()); Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog()); + + NActors::TMon* mon = NKikimr::AppData()->Mon; + if (mon) { + ::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); + mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false, + TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); + } } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) { @@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() { } } -void TRowDispatcher::PrintInternalState() { +TString TRowDispatcher::GetInternalState() { TStringStream str; str << "Statistics:\n"; for (auto& [key, sessionsInfo] : TopicSessions) { @@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() { } } } - LOG_ROW_DISPATCHER_DEBUG(str.Str()); + return str.Str(); } void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { @@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) { } void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) { - PrintInternalState(); + LOG_ROW_DISPATCHER_DEBUG(GetInternalState()); Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog()); } +void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) { + TStringStream str; + HTML(str) { + PRE() { + str << "Current state:" << Endl; + str << GetInternalState() << Endl; + str << Endl; + } + } + Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); +} + void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender); const auto& key = ev->Get()->Stat.SessionKey; diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index 405fe5f3cae4..45eb3c9e0fdf 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -14,11 +14,13 @@ SRCS( PEERDIR( contrib/libs/fmt contrib/libs/simdjson + ydb/core/base ydb/core/fq/libs/actors/logging ydb/core/fq/libs/config/protos ydb/core/fq/libs/row_dispatcher/events ydb/core/fq/libs/shared_resources ydb/core/fq/libs/ydb + ydb/core/mon ydb/library/actors/core ydb/library/security ydb/library/yql/dq/actors/common