Skip to content

Commit

Permalink
fix optimizer signals (#4915)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored May 28, 2024
1 parent b8e7cb5 commit 0ca1ac0
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
#pragma once
#include "counters.h"

#include <ydb/core/formats/arrow/reader/position.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

#include <ydb/library/accessor/accessor.h>

#include <util/generic/hash.h>
#include <util/system/types.h>
#include <util/generic/hash_set.h>
#include <ydb/core/formats/arrow/reader/position.h>
#include <util/system/types.h>

namespace NKikimr::NOlap::NStorageOptimizer::NBuckets {

Expand All @@ -30,6 +31,7 @@ class TSimplePortionsGroupInfo {
YDB_READONLY(i64, Bytes, 0);
YDB_READONLY(i64, Count, 0);
YDB_READONLY(i64, RecordsCount, 0);

public:
NJson::TJsonValue SerializeToJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
Expand Down Expand Up @@ -62,11 +64,10 @@ class TPortionsGroupInfo: public TSimplePortionsGroupInfo {
private:
using TBase = TSimplePortionsGroupInfo;
std::shared_ptr<TPortionCategoryCounters> Signals;

public:
TPortionsGroupInfo(const std::shared_ptr<TPortionCategoryCounters>& signals)
: Signals(signals)
{

: Signals(signals) {
}

void AddPortion(const std::shared_ptr<TPortionInfo>& p) {
Expand Down Expand Up @@ -286,13 +287,13 @@ class TPortionsPool {

TPortionsPool(const std::shared_ptr<TCounters>& counters, const TDuration futureDetector)
: Counters(counters)
, FutureDetector(futureDetector)
{
, FutureDetector(futureDetector) {
}

~TPortionsPool() {
for (auto&& i : Actuals) {
Counters->PortionsForMerge->RemovePortion(i.second);
Counters->ActualPortions->RemovePortion(i.second);
}
for (auto&& f : Futures) {
for (auto&& i : f.second) {
Expand Down Expand Up @@ -433,7 +434,7 @@ class TPortionsPool {
}
}

bool Remove(const std::shared_ptr<TPortionInfo>& portion) Y_WARN_UNUSED_RESULT {
[[nodiscard]] bool Remove(const std::shared_ptr<TPortionInfo>& portion) {
portion->AddRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized);
if (RemovePreActual(portion)) {
return true;
Expand Down Expand Up @@ -660,12 +661,12 @@ class TPortionsBucket: public TMoveOnly {
TPortionsBucket& Owner;
const bool IsEmptyOthers = false;
const bool HasNextBorder = false;

public:
TModificationGuard(TPortionsBucket& owner)
: Owner(owner)
, IsEmptyOthers(Owner.Others.ActualsEmpty())
, HasNextBorder(Owner.NextBorder)
{
, HasNextBorder(Owner.NextBorder) {
AFL_VERIFY_DEBUG(Owner.Validate());
}

Expand Down Expand Up @@ -710,8 +711,7 @@ class TPortionsBucket: public TMoveOnly {
TPortionsBucket(const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TCounters>& counters)
: MainPortion(portion)
, Counters(counters)
, Others(Counters, GetCommonFreshnessCheckDuration())
{
, Others(Counters, GetCommonFreshnessCheckDuration()) {
if (MainPortion) {
Counters->PortionsAlone->AddPortion(MainPortion);
}
Expand Down Expand Up @@ -995,13 +995,14 @@ class TPortionBuckets {
}
AddBucketToRating(insertInfo.first->second);
}

public:
TPortionBuckets(const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<TCounters>& counters)
TPortionBuckets(const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::shared_ptr<IStoragesManager>& storagesManager,
const std::shared_ptr<TCounters>& counters)
: PrimaryKeysSchema(primaryKeysSchema)
, StoragesManager(storagesManager)
, LeftBucket(std::make_shared<TPortionsBucket>(nullptr, counters))
, Counters(counters)
{
, Counters(counters) {
AddBucketToRating(LeftBucket);
}

Expand Down Expand Up @@ -1128,6 +1129,7 @@ class TOptimizerPlanner: public IOptimizerPlanner {
std::shared_ptr<TCounters> Counters;
TPortionBuckets Buckets;
const std::shared_ptr<IStoragesManager> StoragesManager;

protected:
virtual bool DoIsLocked(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const override {
return Buckets.IsLocked(dataLocksManager);
Expand Down Expand Up @@ -1156,7 +1158,6 @@ class TOptimizerPlanner: public IOptimizerPlanner {
}
virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {
return Buckets.BuildOptimizationTask(granule, locksManager);

}
virtual void DoActualize(const TInstant currentInstant) override {
Buckets.Actualize(currentInstant);
Expand All @@ -1175,6 +1176,7 @@ class TOptimizerPlanner: public IOptimizerPlanner {
virtual NJson::TJsonValue DoSerializeToJsonVisual() const override {
return Buckets.SerializeToJson();
}

public:
virtual std::vector<NArrow::NMerger::TSortableBatchPosition> GetBucketPositions() const override {
return Buckets.GetBucketPositions();
Expand All @@ -1184,9 +1186,8 @@ class TOptimizerPlanner: public IOptimizerPlanner {
: TBase(pathId)
, Counters(std::make_shared<TCounters>())
, Buckets(primaryKeysSchema, storagesManager, Counters)
, StoragesManager(storagesManager)
{
, StoragesManager(storagesManager) {
}
};

}
} // namespace NKikimr::NOlap::NStorageOptimizer::NBuckets

0 comments on commit 0ca1ac0

Please sign in to comment.