
Add new parameters CompactionToMergedThreshold and CompactionToMergedThresholdSSD.

The thresholds are in bytes: if the compaction data size is less than the threshold, the data is written to the mixed channel; otherwise it is written to the merged channel.
agalibin committed Feb 12, 2025
1 parent 0b3e0e0 commit 237b5ee
Showing 11 changed files with 568 additions and 50 deletions.
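The rule this commit implements, shown in isolation: a blob produced by compaction is routed by its size. The sketch below is illustrative only and is not code from this diff; `PickChannelDataKind` is a hypothetical name, while the real decision is made inline in `CompleteRangeCompaction` (see `part_actor_compaction.cpp` below).

#include <cstdint>

enum class EChannelDataKind { Mixed, Merged };

// Hypothetical helper mirroring the rule this commit adds: compaction data
// smaller than the threshold is written to the mixed channel, everything
// else to the merged channel. With the default threshold of 0 no blob is
// smaller than the threshold, so the previous behavior (always merged)
// is preserved.
EChannelDataKind PickChannelDataKind(
    std::uint64_t blobSizeBytes,
    std::uint32_t thresholdBytes)
{
    if (blobSizeBytes < thresholdBytes) {
        return EChannelDataKind::Mixed;
    }
    return EChannelDataKind::Merged;
}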
1 change: 1 addition & 0 deletions .gitignore
@@ -73,6 +73,7 @@ test-results
# Test binaries and generated dirs
*-ut
**/tests/tests
**/tests/**/cloud-blockstore-tests-loadtest-compaction

# Autogenerated util, library and contrib files
/util/all_*.cpp
7 changes: 7 additions & 0 deletions cloud/blockstore/config/storage.proto
@@ -1113,4 +1113,11 @@ message TStorageServiceConfig

// Timeout for TDestroyVolumeActor (in milliseconds)
optional uint32 DestroyVolumeTimeout = 405;

// Minimum compaction data size (in bytes) at which the data is written to
// blobstorage as a merged blob; smaller data is written to the mixed channel.
optional uint32 CompactionToMergedThreshold = 406;

// Overrides CompactionToMergedThreshold for SSD volumes.
optional uint32 CompactionToMergedThresholdSSD = 407;
}
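For illustration, a hedged sketch of how the new fields might be set programmatically. It assumes the message lives in the NCloud::NBlockStore::NProto namespace and that the Arcadia-style CamelCase protobuf accessors (Set<FieldName>) are available, as they are for other fields in this codebase; the 64 KiB / 128 KiB values are arbitrary examples, not defaults from this commit.

// Example values only (64 KiB generic, 128 KiB for SSD); the committed
// defaults are 0, which keeps compaction writing merged blobs as before.
NCloud::NBlockStore::NProto::TStorageServiceConfig storageConfig;
storageConfig.SetCompactionToMergedThreshold(64 * 1024);
storageConfig.SetCompactionToMergedThresholdSSD(128 * 1024);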
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
@@ -531,6 +531,8 @@ TDuration MSeconds(ui32 value)
xxx(CalculateSplittedUsedQuotaMetric, bool, false )\
\
xxx(DestroyVolumeTimeout, TDuration, Seconds(30) )\
xxx(CompactionToMergedThreshold, ui32, 0 )\
xxx(CompactionToMergedThresholdSSD, ui32, 0 )\
// BLOCKSTORE_STORAGE_CONFIG_RW

#define BLOCKSTORE_STORAGE_CONFIG(xxx) \
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/core/config.h
@@ -635,6 +635,9 @@ class TStorageConfig
bool GetAutomaticallyEnableBufferCopyingAfterChecksumMismatch() const;
[[nodiscard]] bool GetNonReplicatedVolumeDirectAcquireEnabled() const;
[[nodiscard]] TDuration GetDestroyVolumeTimeout() const;

ui32 GetCompactionToMergedThreshold() const;
ui32 GetCompactionToMergedThresholdSSD() const;
};

ui64 GetAllocationUnit(
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
@@ -287,6 +287,17 @@ ui32 GetWriteBlobThreshold(
return config.GetWriteBlobThreshold();
}

ui32 GetCompactionToMergedThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind)
{
if (mediaKind == NCloud::NProto::STORAGE_MEDIA_SSD) {
return config.GetCompactionToMergedThresholdSSD();
}

return config.GetCompactionToMergedThreshold();
}

bool CompareVolumeConfigs(
const NKikimrBlockStore::TVolumeConfig& prevConfig,
const NKikimrBlockStore::TVolumeConfig& newConfig)
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
@@ -152,6 +152,10 @@ ui32 GetWriteBlobThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind);

ui32 GetCompactionToMergedThreshold(
const TStorageConfig& config,
const NCloud::NProto::EStorageMediaKind mediaKind);

inline bool RequiresCheckpointSupport(const NProto::TReadBlocksRequest& request)
{
return !request.GetCheckpointId().empty();
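A self-contained sketch of the dispatch the new helper performs; the enum and struct here are simplified stand-ins for illustration, not the real types from cloud/blockstore/libs/storage/core.

#include <cstdint>

enum class EStorageMediaKind { HDD, Hybrid, SSD };  // simplified stand-in

struct TCompactionThresholds {
    std::uint32_t Generic = 0;  // CompactionToMergedThreshold
    std::uint32_t SSD = 0;      // CompactionToMergedThresholdSSD
};

// Mirrors GetCompactionToMergedThreshold above: SSD volumes take the SSD
// override, every other media kind takes the generic value.
std::uint32_t ResolveCompactionToMergedThreshold(
    const TCompactionThresholds& thresholds,
    EStorageMediaKind mediaKind)
{
    return mediaKind == EStorageMediaKind::SSD
        ? thresholds.SSD
        : thresholds.Generic;
}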
134 changes: 84 additions & 50 deletions cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
@@ -6,6 +6,7 @@
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/core/proto_helpers.h>

#include <cloud/storage/core/libs/common/alloc.h>
#include <cloud/storage/core/libs/common/block_buffer.h>
@@ -40,6 +41,7 @@ struct TRangeCompactionInfo
const ui32 BlobsSkippedByCompaction;
const ui32 BlocksSkippedByCompaction;
const TVector<ui32> BlockChecksums;
const EChannelDataKind ChannelDataKind;

TGuardedBuffer<TBlockBuffer> BlobContent;
TVector<ui32> ZeroBlocks;
@@ -59,6 +61,7 @@
ui32 blobsSkippedByCompaction,
ui32 blocksSkippedByCompaction,
TVector<ui32> blockChecksums,
EChannelDataKind channelDataKind,
TBlockBuffer blobContent,
TVector<ui32> zeroBlocks,
TAffectedBlobs affectedBlobs,
@@ -72,6 +75,7 @@
, BlobsSkippedByCompaction(blobsSkippedByCompaction)
, BlocksSkippedByCompaction(blocksSkippedByCompaction)
, BlockChecksums(std::move(blockChecksums))
, ChannelDataKind(channelDataKind)
, BlobContent(std::move(blobContent))
, ZeroBlocks(std::move(zeroBlocks))
, AffectedBlobs(std::move(affectedBlobs))
@@ -549,6 +553,7 @@ void TCompactionActor::WriteBlobs(const TActorContext& ctx)

void TCompactionActor::AddBlobs(const TActorContext& ctx)
{
TVector<TAddMixedBlob> mixedBlobs;
TVector<TAddMergedBlob> mergedBlobs;
TVector<TMergedBlobCompactionInfo> blobCompactionInfos;
TAffectedBlobs affectedBlobs;
@@ -560,7 +565,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
TBlockMask skipMask,
const TVector<ui32>& blockChecksums,
ui32 blobsSkipped,
ui32 blocksSkipped)
ui32 blocksSkipped,
EChannelDataKind channelDataKind)
{
while (skipMask.Get(range.End - range.Start)) {
Y_ABORT_UNLESS(range.End > range.Start);
@@ -571,9 +577,27 @@
--range.End;
}

mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);

blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
if (channelDataKind == EChannelDataKind::Merged) {
mergedBlobs.emplace_back(blobId, range, skipMask, blockChecksums);
blobCompactionInfos.push_back({blobsSkipped, blocksSkipped});
} else if (channelDataKind == EChannelDataKind::Mixed) {
TVector<ui32> blockIndexes(Reserve(range.Size()));
for (auto blockIndex = range.Start; blockIndex <= range.End;
++blockIndex)
{
if (!skipMask.Get(blockIndex - range.Start)) {
blockIndexes.emplace_back(blockIndex);
}
}
mixedBlobs.emplace_back(blobId, std::move(blockIndexes), blockChecksums);
} else {
LOG_ERROR(
ctx,
TBlockStoreComponents::PARTITION,
"[%lu] unexpected channel data kind %u",
TabletId,
static_cast<ui32>(channelDataKind));
}
};

for (auto& rc: RangeCompactionInfos) {
Expand All @@ -584,7 +608,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
rc.DataBlobSkipMask,
rc.BlockChecksums,
rc.BlobsSkippedByCompaction,
rc.BlocksSkippedByCompaction);
rc.BlocksSkippedByCompaction,
rc.ChannelDataKind);
}

if (rc.ZeroBlobId) {
@@ -602,7 +627,8 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
rc.ZeroBlobSkipMask,
rc.BlockChecksums,
blobsSkipped,
blocksSkipped);
blocksSkipped,
rc.ChannelDataKind);
}

if (rc.DataBlobId && rc.ZeroBlobId) {
Expand Down Expand Up @@ -664,7 +690,7 @@ void TCompactionActor::AddBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvAddBlobsRequest>(
RequestInfo->CallContext,
CommitId,
TVector<TAddMixedBlob>(),
std::move(mixedBlobs),
std::move(mergedBlobs),
TVector<TAddFreshBlob>(),
ADD_COMPACTION_RESULT,
@@ -1481,9 +1507,7 @@ namespace {

void PrepareRangeCompaction(
const TStorageConfig& config,
const TString& cloudId,
const TString& folderId,
const TString& diskId,
const bool incrementalCompactionEnabled,
const ui64 commitId,
const bool fullCompaction,
const TActorContext& ctx,
@@ -1494,19 +1518,10 @@ void PrepareRangeCompaction(
TPartitionState& state,
TTxPartition::TRangeCompaction& args)
{
const bool incrementalCompactionEnabledForCloud =
config.IsIncrementalCompactionFeatureEnabled(cloudId, folderId, diskId);
const bool incrementalCompactionEnabled =
config.GetIncrementalCompactionEnabled()
|| incrementalCompactionEnabledForCloud;

TCompactionBlockVisitor visitor(args, commitId);
state.FindFreshBlocks(visitor, args.BlockRange, commitId);
visitor.KeepTrackOfAffectedBlocks = true;
ready &= state.FindMixedBlocksForCompaction(
db,
visitor,
args.RangeIdx);
ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx);
visitor.KeepTrackOfAffectedBlocks = false;
ready &= db.FindMergedBlocks(
visitor,
@@ -1528,10 +1543,7 @@ void PrepareRangeCompaction(
}
}

if (ready
&& incrementalCompactionEnabled
&& !fullCompaction)
{
if (ready && incrementalCompactionEnabled && !fullCompaction) {
THashMap<TPartialBlobId, ui32, TPartialBlobIdHash> liveBlocks;
for (const auto& m: args.BlockMarks) {
if (m.CommitId && m.BlobId) {
@@ -1546,21 +1558,19 @@ void PrepareRangeCompaction(
}

Sort(
blobIds.begin(),
blobIds.end(),
[&] (const TPartialBlobId& l, const TPartialBlobId& r) {
return liveBlocks[l] < liveBlocks[r];
}
);
blobIds,
[&](const TPartialBlobId& l, const TPartialBlobId& r)
{ return liveBlocks[l] < liveBlocks[r]; });

auto it = blobIds.begin();
args.BlobsSkipped = blobIds.size();
ui32 blocks = 0;

while (it != blobIds.end()) {
const auto bytes = blocks * state.GetBlockSize();
const auto blobCountOk = args.BlobsSkipped
<= config.GetMaxSkippedBlobsDuringCompaction();
const auto blobCountOk =
args.BlobsSkipped <=
config.GetMaxSkippedBlobsDuringCompaction();
const auto byteCountOk =
bytes >= config.GetTargetCompactionBytesPerOp();

@@ -1608,6 +1618,7 @@ void PrepareRangeCompaction(
}

if (liveBlocks.size()) {
// TODO: add unit tests
TAffectedBlocks affectedBlocks;
for (const auto& b: args.AffectedBlocks) {
if (!skippedBlockIndices.contains(b.BlockIndex)) {
@@ -1652,6 +1663,7 @@

void CompleteRangeCompaction(
const bool blobPatchingEnabled,
const ui32 compactionThreshold,
const ui64 commitId,
TTabletStorageInfo& tabletStorageInfo,
TPartitionState& state,
@@ -1666,11 +1678,14 @@ void CompleteRangeCompaction(

// at first we count number of data blocks
size_t dataBlocksCount = 0, zeroBlocksCount = 0;

for (const auto& mark: args.BlockMarks) {
if (mark.CommitId) {
const bool isFresh = !mark.BlockContent.Empty();
const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId);
// there could be fresh block OR merged/mixed block
Y_ABORT_UNLESS(!(mark.BlockContent && !IsDeletionMarker(mark.BlobId)));
if (mark.BlockContent || !IsDeletionMarker(mark.BlobId)) {
Y_ABORT_UNLESS(!(isFresh && isMixedOrMerged));
if (isFresh || isMixedOrMerged) {
++dataBlocksCount;
} else {
++zeroBlocksCount;
@@ -1682,19 +1697,26 @@ void CompleteRangeCompaction(
TPartialBlobId dataBlobId, zeroBlobId;
TBlockMask dataBlobSkipMask, zeroBlobSkipMask;

auto channelDataKind = EChannelDataKind::Merged;
if (dataBlocksCount) {
ui32 skipped = 0;
for (const auto& mark: args.BlockMarks) {
if (!mark.BlockContent && IsDeletionMarker(mark.BlobId)) {
const bool isFresh = !mark.BlockContent.Empty();
const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId);
if (!isFresh && !isMixedOrMerged) {
++skipped;
}
}

const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize();
if (blobSize < compactionThreshold) {
channelDataKind = EChannelDataKind::Mixed;
}
dataBlobId = state.GenerateBlobId(
EChannelDataKind::Merged,
channelDataKind,
compactionPermissions,
commitId,
(args.BlockRange.Size() - skipped) * state.GetBlockSize(),
blobSize,
result.size());
}

Expand All @@ -1706,7 +1728,7 @@ void CompleteRangeCompaction(
// compaction range but for the last actual block that's referenced by
// the corresponding blob
zeroBlobId = state.GenerateBlobId(
EChannelDataKind::Merged,
channelDataKind,
compactionPermissions,
commitId,
0,
@@ -1789,14 +1811,15 @@ void CompleteRangeCompaction(
zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
}

if (!patchingCandidate
&& blobPatchingEnabled
&& mark.BlobId.BlobSize() == dataBlobId.BlobSize())
{
patchingCandidate = mark.BlobId;
++patchingCandidateChangedBlockCount;
} else if (patchingCandidate == mark.BlobId) {
++patchingCandidateChangedBlockCount;
if (blobPatchingEnabled) {
if (!patchingCandidate &&
mark.BlobId.BlobSize() == dataBlobId.BlobSize())
{
patchingCandidate = mark.BlobId;
++patchingCandidateChangedBlockCount;
} else if (patchingCandidate == mark.BlobId) {
++patchingCandidateChangedBlockCount;
}
}
} else {
dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start);
@@ -1877,6 +1900,7 @@ void CompleteRangeCompaction(
args.BlobsSkipped,
args.BlocksSkipped,
std::move(blockChecksums),
channelDataKind,
std::move(blobContent),
std::move(zeroBlocks),
std::move(args.AffectedBlobs),
Expand All @@ -1901,18 +1925,25 @@ bool TPartitionActor::PrepareCompaction(
TRequestScope timer(*args.RequestInfo);
TPartitionDatabase db(tx.DB);

const bool incrementalCompactionEnabled =
Config->GetIncrementalCompactionEnabled() ||
Config->IsIncrementalCompactionFeatureEnabled(
PartitionConfig.GetCloudId(),
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId());
const bool fullCompaction =
args.CompactionOptions.test(ToBit(ECompactionOption::Full));

bool ready = true;

THashSet<TPartialBlobId, TPartialBlobIdHash> affectedBlobIds;

for (auto& rangeCompaction: args.RangeCompactions) {
PrepareRangeCompaction(
*Config,
PartitionConfig.GetCloudId(),
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId(),
incrementalCompactionEnabled,
args.CommitId,
args.CompactionOptions.test(ToBit(ECompactionOption::Full)),
fullCompaction,
ctx,
TabletID(),
affectedBlobIds,
@@ -1967,6 +1998,9 @@ void TPartitionActor::CompleteCompaction(

CompleteRangeCompaction(
blobPatchingEnabled,
GetCompactionToMergedThreshold(
*Config,
PartitionConfig.GetStorageMediaKind()),
args.CommitId,
*Info(),
*State,