From ad90fa290f53649ab40d4cbb0c1d2a8ed253c429 Mon Sep 17 00:00:00 2001 From: omgronny Date: Fri, 22 Nov 2024 15:50:21 +0300 Subject: [PATCH] YT-22455: Introduce list jobs continuation token [nodiff:caesar] commit_hash:d45b3da99e7b19120e02298ca6e87c02cc800ea2 --- yt/cpp/mapreduce/interface/operation.h | 12 +++ .../rpc_parameters_serialization.cpp | 9 ++ yt/yt/client/api/operation_client.cpp | 101 ++++++++++++++++++ yt/yt/client/api/operation_client.h | 53 +++++++-- yt/yt/client/api/rpc_proxy/client_impl.cpp | 9 ++ yt/yt/client/api/rpc_proxy/helpers.cpp | 8 ++ yt/yt/client/driver/scheduler_commands.cpp | 16 +++ yt/yt/client/ya.make | 1 + yt/yt/core/ytree/yson_struct-inl.h | 13 +++ yt/yt/core/ytree/yson_struct.h | 3 + .../api/rpc_proxy/proto/api_service.proto | 7 ++ 11 files changed, 222 insertions(+), 10 deletions(-) diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 76c4eabdf..a87ddc30b 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -2855,6 +2855,18 @@ struct TListJobsOptions /// @brief Return only jobs with monitoring descriptor. FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor); + /// + /// @brief Search for jobs with start time >= `FromTime`. + FLUENT_FIELD_OPTION(TInstant, FromTime); + + /// + /// @brief Search for jobs with start time <= `ToTime`. + FLUENT_FIELD_OPTION(TInstant, ToTime); + + /// + /// @brief Search for jobs with filters encoded in token. 
+ FLUENT_FIELD_OPTION(TString, ContinuationToken); + /// @} /// diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index bebc59ec9..3a78ed331 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -562,6 +562,15 @@ TNode SerializeParamsForListJobs( if (options.WithMonitoringDescriptor_) { result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_; } + if (options.FromTime_) { + result["from_time"] = ToString(*options.FromTime_); + } + if (options.ToTime_) { + result["to_time"] = ToString(*options.ToTime_); + } + if (options.ContinuationToken_) { + result["continuation_token"] = *options.ContinuationToken_; + } if (options.SortField_) { result["sort_field"] = ToString(*options.SortField_); diff --git a/yt/yt/client/api/operation_client.cpp b/yt/yt/client/api/operation_client.cpp index 8f8dbb355..07253e5ee 100644 --- a/yt/yt/client/api/operation_client.cpp +++ b/yt/yt/client/api/operation_client.cpp @@ -3,14 +3,115 @@ #include #include +#include + +#include namespace NYT::NApi { using namespace NYTree; using namespace NJobTrackerClient; +using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +void TListJobsContinuationTokenSerializer::Register(TRegistrar registrar) +{ + registrar.ExternalClassParameter("version", &TThat::Version) + .Default(0) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("job_competition_id", &TListJobsOptions::JobCompetitionId) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("type", &TThat::Type) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("state", &TThat::State) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("address", &TThat::Address) + .Default() + .DontSerializeDefault(); + + 
registrar.ExternalBaseClassParameter("with_stderr", &TThat::WithStderr) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_fail_context", &TThat::WithFailContext) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_spec", &TThat::WithSpec) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_competitors", &TThat::WithCompetitors) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("with_monitoring_descriptor", &TThat::WithMonitoringDescriptor) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("task_name", &TThat::TaskName) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("running_jobs_lookbehind_period", &TThat::RunningJobsLookbehindPeriod) + .Default(TDuration::Max()) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("sort_field", &TThat::SortField) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("sort_order", &TThat::SortOrder) + .Default() + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("offset", &TThat::Offset) + .Default(0) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("limit", &TThat::Limit) + .Default(1000) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_archive", &TThat::IncludeArchive) + .Default(false) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_cypress", &TThat::IncludeCypress) + .Default(false) + .DontSerializeDefault(); + + registrar.ExternalBaseClassParameter("include_controller_agent", &TThat::IncludeControllerAgent) + .Default(false) + .DontSerializeDefault(); +} + +TString EncodeNewToken(TListJobsOptions&& options, int jobCount) +{ + options.Offset += jobCount; + options.ContinuationToken.reset(); + + TListJobsContinuationToken token; + static_cast<TListJobsOptions&>(token) = std::move(options); + + auto optionsYson = 
ConvertToYsonString(token); + return Base64Encode(optionsYson.ToString()); +} + +TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken) +{ + auto optionsYson = TYsonString(Base64StrictDecode(continuationToken)); + return ConvertTo(optionsYson); +} + void Serialize( const TOperation& operation, NYson::IYsonConsumer* consumer, diff --git a/yt/yt/client/api/operation_client.h b/yt/yt/client/api/operation_client.h index 19cea82ad..ae11054bd 100644 --- a/yt/yt/client/api/operation_client.h +++ b/yt/yt/client/api/operation_client.h @@ -166,16 +166,16 @@ struct TPollJobShellResponse }; DEFINE_ENUM(EJobSortField, - ((None) (0)) - ((Type) (1)) - ((State) (2)) - ((StartTime) (3)) - ((FinishTime) (4)) - ((Address) (5)) - ((Duration) (6)) - ((Progress) (7)) - ((Id) (8)) - ((TaskName) (9)) + ((None) (0)) + ((Type) (1)) + ((State) (2)) + ((StartTime) (3)) + ((FinishTime) (4)) + ((Address) (5)) + ((Duration) (6)) + ((Progress) (7)) + ((Id) (8)) + ((TaskName) (9)) ); DEFINE_ENUM(EJobSortDirection, @@ -206,6 +206,11 @@ struct TListJobsOptions std::optional WithMonitoringDescriptor; std::optional TaskName; + std::optional FromTime; + std::optional ToTime; + + std::optional ContinuationToken; + TDuration RunningJobsLookbehindPeriod = TDuration::Max(); EJobSortField SortField = EJobSortField::None; @@ -221,6 +226,32 @@ struct TListJobsOptions EDataSource DataSource = EDataSource::Auto; }; +struct TListJobsContinuationToken + : public TListJobsOptions +{ + int Version = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TListJobsContinuationTokenSerializer + : public virtual NYTree::TExternalizedYsonStruct +{ +public: + REGISTER_EXTERNALIZED_YSON_STRUCT(TListJobsContinuationToken, TListJobsContinuationTokenSerializer); + + static void Register(TRegistrar registrar); +}; + +ASSIGN_EXTERNAL_YSON_SERIALIZER(TListJobsContinuationToken, TListJobsContinuationTokenSerializer); + 
+//////////////////////////////////////////////////////////////////////////////// + +TString EncodeNewToken(TListJobsOptions&& options, int jobCount); +TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken); + +//////////////////////////////////////////////////////////////////////////////// + struct TAbandonJobOptions : public TTimeoutOptions { }; @@ -387,6 +418,8 @@ struct TListJobsResult TListJobsStatistics Statistics; std::vector Errors; + + std::optional ContinuationToken; }; struct TGetJobStderrResponse diff --git a/yt/yt/client/api/rpc_proxy/client_impl.cpp b/yt/yt/client/api/rpc_proxy/client_impl.cpp index a763772b7..f7397d2f3 100644 --- a/yt/yt/client/api/rpc_proxy/client_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/client_impl.cpp @@ -1486,6 +1486,15 @@ TFuture TClient::ListJobs( if (options.TaskName) { req->set_task_name(*options.TaskName); } + if (options.FromTime) { + req->set_from_time(NYT::ToProto(*options.FromTime)); + } + if (options.ToTime) { + req->set_to_time(NYT::ToProto(*options.ToTime)); + } + if (options.ContinuationToken) { + req->set_continuation_token(*options.ContinuationToken); + } req->set_sort_field(static_cast(options.SortField)); req->set_sort_order(static_cast(options.SortOrder)); diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 88c12aa79..dba328684 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -407,6 +407,9 @@ void ToProto( if (result.ArchiveJobCount) { proto->set_archive_job_count(*result.ArchiveJobCount); } + if (result.ContinuationToken) { + proto->set_continuation_token(*result.ContinuationToken); + } ToProto(proto->mutable_statistics(), result.Statistics); ToProto(proto->mutable_errors(), result.Errors); @@ -433,6 +436,11 @@ void FromProto( } else { result->ArchiveJobCount.reset(); } + if (proto.has_continuation_token()) { + result->ContinuationToken = proto.continuation_token(); + } else { + 
result->ContinuationToken.reset(); + } FromProto(&result->Statistics, proto.statistics()); FromProto(&result->Errors, proto.errors()); diff --git a/yt/yt/client/driver/scheduler_commands.cpp b/yt/yt/client/driver/scheduler_commands.cpp index 53f82c7bf..d64f69d33 100644 --- a/yt/yt/client/driver/scheduler_commands.cpp +++ b/yt/yt/client/driver/scheduler_commands.cpp @@ -490,6 +490,21 @@ void TListJobsCommand::Register(TRegistrar registrar) [] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; }) .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor>( + "from_time", + [] (TThis* command) -> auto& { return command->Options.FromTime; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor>( + "to_time", + [] (TThis* command) -> auto& { return command->Options.ToTime; }) + .Optional(/*init*/ false); + + registrar.ParameterWithUniversalAccessor>( + "continuation_token", + [] (TThis* command) -> auto& { return command->Options.ContinuationToken; }) + .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor( "job_competition_id", [] (TThis* command) -> auto& { return command->Options.JobCompetitionId; }) @@ -593,6 +608,7 @@ void TListJobsCommand::DoExecute(ICommandContextPtr context) } }) .Item("errors").Value(result.Errors) + .Item("continuation_token").Value(result.ContinuationToken) .EndMap()); } diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index fef0c1093..2b384fe81 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -220,6 +220,7 @@ PEERDIR( yt/yt/library/quantile_digest yt/yt_proto/yt/client library/cpp/json + library/cpp/string_utils/base64 contrib/libs/pfr ) diff --git a/yt/yt/core/ytree/yson_struct-inl.h b/yt/yt/core/ytree/yson_struct-inl.h index 92bd2d65f..447105658 100644 --- a/yt/yt/core/ytree/yson_struct-inl.h +++ b/yt/yt/core/ytree/yson_struct-inl.h @@ -248,6 +248,19 @@ void TYsonStructRegistrar::ExternalPostprocessor(TExternalPostprocessor }); } +template 
+template +TYsonStructParameter& TYsonStructRegistrar::ExternalBaseClassParameter(const TString& key, TValue(TBase::*field)) +{ + static_assert(std::derived_from); + static_assert(std::derived_from); + auto universalAccessor = [field] (TStruct* serializer) -> auto& { + return serializer->That_->*field; + }; + + return ParameterWithUniversalAccessor(key, universalAccessor); +} + template void TYsonStructRegistrar::UnrecognizedStrategy(EUnrecognizedStrategy strategy) { diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h index ae23668f0..097addf65 100644 --- a/yt/yt/core/ytree/yson_struct.h +++ b/yt/yt/core/ytree/yson_struct.h @@ -313,6 +313,9 @@ class TYsonStructRegistrar // requires std::derived_from> TYsonStructParameter& ExternalClassParameter(const TString& key, TValue(TExternal::*field)); + template + TYsonStructParameter& ExternalBaseClassParameter(const TString& key, TValue(TBase::*field)); + template // requires (CInvocable) void ExternalPreprocessor(TExternalPreprocessor preprocessor); diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 00d0104d4..2ea1b46e2 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -2409,6 +2409,11 @@ message TReqListJobs optional bool with_monitoring_descriptor = 21; + optional uint64 from_time = 22; // TInstant + optional uint64 to_time = 23; // TInstant + + optional string continuation_token = 24; + optional TMasterReadOptions master_read_options = 102; } @@ -3179,6 +3184,8 @@ message TListJobsResult required TListJobsStatistics statistics = 5; repeated NYT.NProto.TError errors = 6; + + optional string continuation_token = 7; } message TJobTraceEvent