Skip to content

Commit

Permalink
YT-22455: Introduce list jobs continuation token
Browse files Browse the repository at this point in the history
[nodiff:caesar]
commit_hash:d45b3da99e7b19120e02298ca6e87c02cc800ea2
  • Loading branch information
omgronny committed Nov 22, 2024
1 parent 077b35d commit ad90fa2
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 10 deletions.
12 changes: 12 additions & 0 deletions yt/cpp/mapreduce/interface/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,18 @@ struct TListJobsOptions
/// @brief Return only jobs with monitoring descriptor.
FLUENT_FIELD_OPTION(bool, WithMonitoringDescriptor);

///
/// @brief Search for jobs with start time >= `FromTime`.
FLUENT_FIELD_OPTION(TInstant, FromTime);

///
/// @brief Search for jobs with start time <= `ToTime`.
FLUENT_FIELD_OPTION(TInstant, ToTime);

///
/// @brief Search for jobs with filters encoded in token.
FLUENT_FIELD_OPTION(TString, ContinuationToken);

/// @}

///
Expand Down
9 changes: 9 additions & 0 deletions yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,15 @@ TNode SerializeParamsForListJobs(
if (options.WithMonitoringDescriptor_) {
result["with_monitoring_descriptor"] = *options.WithMonitoringDescriptor_;
}
if (options.FromTime_) {
result["from_time"] = ToString(options.FromTime_);
}
if (options.ToTime_) {
result["to_time"] = ToString(options.ToTime_);
}
if (options.ContinuationToken_) {
result["continuation_token"] = *options.ContinuationToken_;
}

if (options.SortField_) {
result["sort_field"] = ToString(*options.SortField_);
Expand Down
101 changes: 101 additions & 0 deletions yt/yt/client/api/operation_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,115 @@
#include <yt/yt/client/job_tracker_client/helpers.h>

#include <yt/yt/core/ytree/fluent.h>
#include <yt/yt/core/ytree/yson_struct.h>

#include <library/cpp/string_utils/base64/base64.h>

namespace NYT::NApi {

using namespace NYTree;
using namespace NJobTrackerClient;
using namespace NYson;

////////////////////////////////////////////////////////////////////////////////

void TListJobsContinuationTokenSerializer::Register(TRegistrar registrar)
{
registrar.ExternalClassParameter("version", &TThat::Version)
.Default(0)
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("job_competition_id", &TListJobsOptions::JobCompetitionId)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("type", &TThat::Type)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("state", &TThat::State)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("address", &TThat::Address)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("with_stderr", &TThat::WithStderr)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("with_fail_context", &TThat::WithFailContext)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("with_spec", &TThat::WithSpec)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("with_competitors", &TThat::WithCompetitors)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("with_monitoring_gescriptor", &TThat::WithMonitoringDescriptor)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("task_name", &TThat::TaskName)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("running_jobs_lookbehind_period", &TThat::RunningJobsLookbehindPeriod)
.Default(TDuration::Max())
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("sort_field", &TThat::SortField)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("sort_order", &TThat::SortOrder)
.Default()
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("offset", &TThat::Offset)
.Default(0)
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("limit", &TThat::Limit)
.Default(1000)
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("include_archive", &TThat::IncludeArchive)
.Default(false)
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("include_cypress", &TThat::IncludeCypress)
.Default(false)
.DontSerializeDefault();

registrar.ExternalBaseClassParameter("include_controller_agent", &TThat::IncludeControllerAgent)
.Default(false)
.DontSerializeDefault();
}

TString EncodeNewToken(TListJobsOptions&& options, int jobCount)
{
options.Offset += jobCount;
options.ContinuationToken.reset();

TListJobsContinuationToken token;
static_cast<TListJobsOptions&>(token) = std::move(options);

auto optionsYson = ConvertToYsonString(token);
return Base64Encode(optionsYson.ToString());
}

TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken)
{
auto optionsYson = TYsonString(Base64StrictDecode(continuationToken));
return ConvertTo<TListJobsContinuationToken>(optionsYson);
}

void Serialize(
const TOperation& operation,
NYson::IYsonConsumer* consumer,
Expand Down
53 changes: 43 additions & 10 deletions yt/yt/client/api/operation_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ struct TPollJobShellResponse
};

DEFINE_ENUM(EJobSortField,
((None) (0))
((Type) (1))
((State) (2))
((StartTime) (3))
((FinishTime) (4))
((Address) (5))
((Duration) (6))
((Progress) (7))
((Id) (8))
((TaskName) (9))
((None) (0))
((Type) (1))
((State) (2))
((StartTime) (3))
((FinishTime) (4))
((Address) (5))
((Duration) (6))
((Progress) (7))
((Id) (8))
((TaskName) (9))
);

DEFINE_ENUM(EJobSortDirection,
Expand Down Expand Up @@ -206,6 +206,11 @@ struct TListJobsOptions
std::optional<bool> WithMonitoringDescriptor;
std::optional<TString> TaskName;

std::optional<TInstant> FromTime;
std::optional<TInstant> ToTime;

std::optional<TString> ContinuationToken;

TDuration RunningJobsLookbehindPeriod = TDuration::Max();

EJobSortField SortField = EJobSortField::None;
Expand All @@ -221,6 +226,32 @@ struct TListJobsOptions
EDataSource DataSource = EDataSource::Auto;
};

struct TListJobsContinuationToken
: public TListJobsOptions
{
int Version = 0;
};

////////////////////////////////////////////////////////////////////////////////

class TListJobsContinuationTokenSerializer
: public virtual NYTree::TExternalizedYsonStruct
{
public:
REGISTER_EXTERNALIZED_YSON_STRUCT(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);

static void Register(TRegistrar registrar);
};

ASSIGN_EXTERNAL_YSON_SERIALIZER(TListJobsContinuationToken, TListJobsContinuationTokenSerializer);

////////////////////////////////////////////////////////////////////////////////

TString EncodeNewToken(TListJobsOptions&& options, int jobCount);
TListJobsOptions DecodeListJobsOptionsFromToken(const TString& continuationToken);

////////////////////////////////////////////////////////////////////////////////

struct TAbandonJobOptions
: public TTimeoutOptions
{ };
Expand Down Expand Up @@ -387,6 +418,8 @@ struct TListJobsResult
TListJobsStatistics Statistics;

std::vector<TError> Errors;

std::optional<TString> ContinuationToken;
};

struct TGetJobStderrResponse
Expand Down
9 changes: 9 additions & 0 deletions yt/yt/client/api/rpc_proxy/client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,15 @@ TFuture<TListJobsResult> TClient::ListJobs(
if (options.TaskName) {
req->set_task_name(*options.TaskName);
}
if (options.FromTime) {
req->set_from_time(NYT::ToProto(*options.FromTime));
}
if (options.ToTime) {
req->set_to_time(NYT::ToProto(*options.ToTime));
}
if (options.ContinuationToken) {
req->set_continuation_token(*options.ContinuationToken);
}

req->set_sort_field(static_cast<NProto::EJobSortField>(options.SortField));
req->set_sort_order(static_cast<NProto::EJobSortDirection>(options.SortOrder));
Expand Down
8 changes: 8 additions & 0 deletions yt/yt/client/api/rpc_proxy/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ void ToProto(
if (result.ArchiveJobCount) {
proto->set_archive_job_count(*result.ArchiveJobCount);
}
if (result.ContinuationToken) {
proto->set_continuation_token(*result.ContinuationToken);
}

ToProto(proto->mutable_statistics(), result.Statistics);
ToProto(proto->mutable_errors(), result.Errors);
Expand All @@ -433,6 +436,11 @@ void FromProto(
} else {
result->ArchiveJobCount.reset();
}
if (proto.has_continuation_token()) {
result->ContinuationToken = proto.continuation_token();
} else {
result->ContinuationToken.reset();
}

FromProto(&result->Statistics, proto.statistics());
FromProto(&result->Errors, proto.errors());
Expand Down
16 changes: 16 additions & 0 deletions yt/yt/client/driver/scheduler_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,21 @@ void TListJobsCommand::Register(TRegistrar registrar)
[] (TThis* command) -> auto& { return command->Options.WithMonitoringDescriptor; })
.Optional(/*init*/ false);

registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
"from_time",
[] (TThis* command) -> auto& { return command->Options.FromTime; })
.Optional(/*init*/ false);

registrar.ParameterWithUniversalAccessor<std::optional<TInstant>>(
"to_time",
[] (TThis* command) -> auto& { return command->Options.ToTime; })
.Optional(/*init*/ false);

registrar.ParameterWithUniversalAccessor<std::optional<TString>>(
"continuation_token",
[] (TThis* command) -> auto& { return command->Options.ContinuationToken; })
.Optional(/*init*/ false);

registrar.ParameterWithUniversalAccessor<TJobId>(
"job_competition_id",
[] (TThis* command) -> auto& { return command->Options.JobCompetitionId; })
Expand Down Expand Up @@ -593,6 +608,7 @@ void TListJobsCommand::DoExecute(ICommandContextPtr context)
}
})
.Item("errors").Value(result.Errors)
.Item("continuation_token").Value(result.ContinuationToken)
.EndMap());
}

Expand Down
1 change: 1 addition & 0 deletions yt/yt/client/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ PEERDIR(
yt/yt/library/quantile_digest
yt/yt_proto/yt/client
library/cpp/json
library/cpp/string_utils/base64
contrib/libs/pfr
)

Expand Down
13 changes: 13 additions & 0 deletions yt/yt/core/ytree/yson_struct-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ void TYsonStructRegistrar<TStruct>::ExternalPostprocessor(TExternalPostprocessor
});
}

template <class TStruct>
template <class TBase, class TValue>
TYsonStructParameter<TValue>& TYsonStructRegistrar<TStruct>::ExternalBaseClassParameter(const TString& key, TValue(TBase::*field))
{
static_assert(std::derived_from<TStruct, TExternalizedYsonStruct>);
static_assert(std::derived_from<typename TStruct::TExternal, TBase>);
auto universalAccessor = [field] (TStruct* serializer) -> auto& {
return serializer->That_->*field;
};

return ParameterWithUniversalAccessor<TValue>(key, universalAccessor);
}

template <class TStruct>
void TYsonStructRegistrar<TStruct>::UnrecognizedStrategy(EUnrecognizedStrategy strategy)
{
Expand Down
3 changes: 3 additions & 0 deletions yt/yt/core/ytree/yson_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ class TYsonStructRegistrar
// requires std::derived_from<TStruct, TExternalizedYsonStruct<TExternal, TStruct>>
TYsonStructParameter<TValue>& ExternalClassParameter(const TString& key, TValue(TExternal::*field));

template <class TBase, class TValue>
TYsonStructParameter<TValue>& ExternalBaseClassParameter(const TString& key, TValue(TBase::*field));

template <class TExternalPreprocessor>
// requires (CInvocable<TExternalPreprocessor, void(typename TStruct::TExternal*)>)
void ExternalPreprocessor(TExternalPreprocessor preprocessor);
Expand Down
7 changes: 7 additions & 0 deletions yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2409,6 +2409,11 @@ message TReqListJobs

optional bool with_monitoring_descriptor = 21;

optional uint64 from_time = 22; // TInstant
optional uint64 to_time = 23; // TInstant

optional string continuation_token = 24;

optional TMasterReadOptions master_read_options = 102;
}

Expand Down Expand Up @@ -3179,6 +3184,8 @@ message TListJobsResult

required TListJobsStatistics statistics = 5;
repeated NYT.NProto.TError errors = 6;

optional string continuation_token = 7;
}

message TJobTraceEvent
Expand Down

0 comments on commit ad90fa2

Please sign in to comment.