Skip to content

Commit

Permalink
[#26332] docdb: Vector index compaction support in yb-admin
Browse files Browse the repository at this point in the history
Summary:
The code base is updated to allow vector index compaction to be triggered by the following commands:
```
yb-admin compact_table <vector index name>
yb-admin compact_table_by_id <vector index id>
```
These commands trigger only vector index compaction and no compaction is triggered for the indexable
table. On the other hand, triggering compaction on the indexable table will not trigger compaction on
vector indexes -- this approach is selected to prevent undesirable effects of long vector index
compactions (because they are not yet optimized).

Additionally, `yb-admin flush_table <vector index>` is changed to trigger a flush only for the vector
index; no flushes are triggered for the indexable table. Previously, the given command
acted as if the flush had been triggered for the indexable table.

**Upgrade/Rollback safety:**
It is safe to upgrade and roll back as the feature is not released. Also, the presence of the values
is ignored by old releases, which is acceptable behaviour (same as without the change).
Jira: DB-15678

Test Plan: Jenkins

Reviewers: sergei, slingam

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D42404
  • Loading branch information
arybochkin committed Mar 10, 2025
1 parent 4bd8417 commit 12d3543
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 58 deletions.
4 changes: 4 additions & 0 deletions src/yb/common/entity_ids_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

#pragma once

#include <memory>
#include <string>
#include <vector>
#include <unordered_set>

#include "yb/util/strongly_typed_string.h"
Expand Down Expand Up @@ -52,5 +54,7 @@ using FlushRequestId = std::string;
using RedisConfigKey = std::string;

using TableIdSet = std::unordered_set<TableId>;
using TableIds = std::vector<TableId>;
using TableIdsPtr = std::shared_ptr<TableIds>;

} // namespace yb
5 changes: 5 additions & 0 deletions src/yb/docdb/doc_vector_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class DocVectorIndexImpl : public DocVectorIndex {
return EncodeDistance(lsm_.Distance(lhs_vec, rhs_vec));
}

// Placeholder for vector index compaction: no compaction work is performed here yet.
// Logs a warning and returns OK, so a compaction request is effectively a no-op.
Status Compact() override {
LOG_WITH_FUNC(WARNING) << "Vector index compaction is not supported yet";
return Status::OK();
}

// Delegates the flush request to lsm_ and returns its status.
// NOTE(review): the `false` argument presumably means "do not wait for completion" (the
// interface has a separate WaitForFlush()) -- confirm against the signature of lsm_.Flush().
Status Flush() override {
return lsm_.Flush(false);
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/docdb/doc_vector_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class DocVectorIndex {
virtual Result<DocVectorIndexSearchResult> Search(
Slice vector, const vector_index::SearchOptions& options) = 0;
virtual Result<EncodedDistance> Distance(Slice lhs, Slice rhs) = 0;
virtual Status Compact() = 0;
virtual Status Flush() = 0;
virtual Status WaitForFlush() = 0;
virtual docdb::ConsensusFrontierPtr GetFlushedFrontier() = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/yb/integration-tests/compaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,8 @@ class FullCompactionMonitoringTest : public CompactionTest {
for (const auto& tablet_peer : tablet_peers) {
workload_tablet_ptrs.push_back(tablet_peer->shared_tablet());
}
ASSERT_OK(ts_tablet_manager->TriggerAdminCompaction(workload_tablet_ptrs, should_wait));
ASSERT_OK(ts_tablet_manager->TriggerAdminCompaction(
workload_tablet_ptrs, AdminCompactionOptions{ should_wait }));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class TServerFullCompactionStatusMetricsHeartbeatDataProviderITest
test_tablet_ptrs.push_back(tablet_peer->shared_tablet());
}
ASSERT_OK(ts_tablet_manager->TriggerAdminCompaction(
test_tablet_ptrs, should_wait /* should_wait */));
test_tablet_ptrs, AdminCompactionOptions { should_wait }));
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/yb/master/async_flush_tablets_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ bool AsyncFlushTablets::SendRequest(int attempt) {
}
req.set_regular_only(regular_only_);

if (table()->is_vector_index()) {
req.add_vector_index_ids(table()->id());
}

response_handling_ = false;
ts_admin_proxy_->FlushTabletsAsync(req, &resp_, &rpc_, BindRpcCallback());
VLOG(1) << "Send flush tablets request to " << permanent_uuid_
Expand Down
7 changes: 6 additions & 1 deletion src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,11 @@ bool TableInfo::is_unique_index() const {
: l->pb.is_unique_index();
}

// Reports whether this table is a vector index. The answer is read from the persistent table
// info obtained through LockForRead(); the lock object lives until the expression completes.
bool TableInfo::is_vector_index() const {
  return LockForRead()->is_vector_index();
}

// Returns the result of GetPgsqlTableOid() applied to this table's id.
Result<uint32_t> TableInfo::GetPgRelfilenodeOid() const {
return GetPgsqlTableOid(id());
}
Expand Down Expand Up @@ -1197,7 +1202,7 @@ bool PersistentTableInfo::is_index() const {
}

bool PersistentTableInfo::is_vector_index() const {
return pb.index_info().has_vector_idx_options();
return pb.has_index_info() && pb.index_info().has_vector_idx_options();
}

const std::string& PersistentTableInfo::indexed_table_id() const {
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ class TableInfo : public RefCountedThreadSafe<TableInfo>,
// For index table
bool is_local_index() const;
bool is_unique_index() const;
bool is_vector_index() const;

void set_is_system() { is_system_ = true; }
bool is_system() const { return is_system_; }
Expand Down
30 changes: 18 additions & 12 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4548,8 +4548,7 @@ Status Tablet::TriggerManualCompactionIfNeeded(rocksdb::CompactionReason compact
std::bind(&Tablet::TriggerManualCompactionSync, this, compaction_reason));
}

Status Tablet::TriggerAdminFullCompactionIfNeededHelper(
std::function<void()> on_compaction_completion) {
Status Tablet::TriggerAdminFullCompactionIfNeeded(const AdminCompactionOptions& options) {
if (!admin_triggered_compaction_pool_ || state_ != State::kOpen) {
return STATUS(ServiceUnavailable, "Admin triggered compaction thread pool unavailable.");
}
Expand All @@ -4560,19 +4559,26 @@ Status Tablet::TriggerAdminFullCompactionIfNeededHelper(
admin_triggered_compaction_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
}

return admin_full_compaction_task_pool_token_->SubmitFunc([this, on_compaction_completion]() {
TriggerManualCompactionSync(rocksdb::CompactionReason::kAdminCompaction);
on_compaction_completion();
return admin_full_compaction_task_pool_token_->SubmitFunc([this, options]() {
// TODO(vector_index): since full vector index compaction is not optimized and may take a
// significant amount of time, let's trigger it separately from regular manual compaction.
// This logic should be revised later.
if (options.vector_index_ids) {
TriggerVectorIndexCompactionSync(*options.vector_index_ids);
} else {
TriggerManualCompactionSync(rocksdb::CompactionReason::kAdminCompaction);
}
if (options.compaction_completion_callback) {
options.compaction_completion_callback();
}
});
}

Status Tablet::TriggerAdminFullCompactionIfNeeded() {
return TriggerAdminFullCompactionIfNeededHelper();
}

Status Tablet::TriggerAdminFullCompactionWithCallbackIfNeeded(
std::function<void()> on_compaction_completion) {
return TriggerAdminFullCompactionIfNeededHelper(on_compaction_completion);
void Tablet::TriggerVectorIndexCompactionSync(const TableIds& vector_index_ids) {
LOG_WITH_PREFIX_AND_FUNC(INFO) << "vectors index ids: " << AsString(vector_index_ids);
tablet::VectorIndexList{
vector_index_ids.empty() ? vector_indexes().List() : vector_indexes().Collect(vector_index_ids)
}.Compact();
}

void Tablet::TriggerManualCompactionSync(rocksdb::CompactionReason reason) {
Expand Down
15 changes: 7 additions & 8 deletions src/yb/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ using AddTableListener = std::function<Status(const TableInfo&)>;
YB_STRONGLY_TYPED_BOOL(AllowBootstrappingState);
YB_STRONGLY_TYPED_BOOL(ResetSplit);

// Options for an admin-triggered full compaction of a tablet.
struct AdminCompactionOptions {
// Optional. When set, it is invoked by the compaction task once the compaction call returns.
std::function<void()> compaction_completion_callback;
// Optional. When non-null, only vector indexes are compacted (an empty list selects all
// vector indexes of the tablet) and the regular admin compaction is skipped. When null, the
// regular admin compaction runs.
TableIdsPtr vector_index_ids;
};

struct TabletScopedRWOperationPauses {
ScopedRWOperationPause blocking_rocksdb_shutdown_start;
ScopedRWOperationPause not_blocking_rocksdb_shutdown_start;
Expand Down Expand Up @@ -812,11 +817,7 @@ class Tablet : public AbstractTablet,
Status TriggerManualCompactionIfNeeded(rocksdb::CompactionReason reason);

// Triggers an admin full compaction on this tablet.
Status TriggerAdminFullCompactionIfNeeded();
// Triggers an admin full compaction on this tablet with a callback to execute once the compaction
// completes.
Status TriggerAdminFullCompactionWithCallbackIfNeeded(
std::function<void()> on_compaction_completion);
Status TriggerAdminFullCompactionIfNeeded(const AdminCompactionOptions& options);

bool HasActiveFullCompaction();

Expand Down Expand Up @@ -1043,6 +1044,7 @@ class Tablet : public AbstractTablet,
size_t row_count) const;

void TriggerManualCompactionSync(rocksdb::CompactionReason reason);
void TriggerVectorIndexCompactionSync(const TableIds& vector_index_ids);

Status ForceRocksDBCompact(
const rocksdb::CompactRangeOptions& regular_options,
Expand All @@ -1066,9 +1068,6 @@ class Tablet : public AbstractTablet,
template <class PB>
Result<IsolationLevel> DoGetIsolationLevel(const PB& transaction);

Status TriggerAdminFullCompactionIfNeededHelper(
std::function<void()> on_compaction_completion = []() {});

Status GetTabletKeyRanges(
Slice lower_bound_key, Slice upper_bound_key, uint64_t max_num_ranges,
uint64_t range_size_bytes, Direction direction, uint32_t max_key_length,
Expand Down
39 changes: 37 additions & 2 deletions src/yb/tablet/tablet_vector_indexes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ void TabletVectorIndexes::CompleteShutdown(std::vector<std::string>& out_paths)
// Wait actual shutdown.
}

docdb::DocVectorIndexPtr TabletVectorIndexes::IndexForTable(const TableId& table_id) const {
SharedLock lock(vector_indexes_mutex_);

docdb::DocVectorIndexPtr TabletVectorIndexes::IndexForTableUnlocked(const TableId& table_id) const {
auto it = vector_indexes_map_.find(table_id);
if (it != vector_indexes_map_.end()) {
return it->second;
Expand All @@ -402,6 +402,31 @@ docdb::DocVectorIndexPtr TabletVectorIndexes::IndexForTable(const TableId& table
return nullptr;
}

// Looks up the vector index for the given table id while holding a shared lock on
// vector_indexes_mutex_; the lookup itself is delegated to IndexForTableUnlocked().
docdb::DocVectorIndexPtr TabletVectorIndexes::IndexForTable(const TableId& table_id) const {
SharedLock lock(vector_indexes_mutex_);
return IndexForTableUnlocked(table_id);
}

// Gathers the vector indexes for every id in table_ids, in the order the ids are given.
// Returns nullptr when table_ids is empty, when has_vector_indexes_ is false, or when any
// requested id cannot be resolved (the request is all-or-nothing).
docdb::DocVectorIndexesPtr TabletVectorIndexes::Collect(const std::vector<TableId>& table_ids) {
  if (table_ids.empty()) {
    return nullptr;
  }
  if (!has_vector_indexes_.load(std::memory_order_acquire)) {
    return nullptr;
  }

  auto collected = std::make_shared<docdb::DocVectorIndexes>();
  collected->reserve(table_ids.size());
  {
    // All lookups happen under a single shared lock.
    SharedLock lock(vector_indexes_mutex_);
    for (const auto& id : table_ids) {
      if (auto found = IndexForTableUnlocked(id)) {
        collected->push_back(std::move(found));
      } else {
        // A single unknown id invalidates the whole request.
        return nullptr;
      }
    }
  }
  return collected;
}

docdb::DocVectorIndexesPtr TabletVectorIndexes::List() const {
if (!has_vector_indexes_.load(std::memory_order_acquire)) {
return nullptr;
Expand Down Expand Up @@ -495,6 +520,16 @@ void VectorIndexList::Flush() {
}
}

// Calls Compact() on every vector index in the list; a null list is a no-op.
// Each returned status is handed to WARN_NOT_OK.
void VectorIndexList::Compact() {
  if (list_) {
    for (const auto& index : *list_) {
      WARN_NOT_OK(index->Compact(), "Compact vector index");
    }
  }
}

// Renders the list for logging: the pointed-to collection when a list is present, otherwise
// the (null) pointer itself.
std::string VectorIndexList::ToString() const {
  if (list_) {
    return AsString(*list_);
  }
  return AsString(list_);
}
Expand Down
11 changes: 11 additions & 0 deletions src/yb/tablet/tablet_vector_indexes.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class VectorIndexList {
VectorIndexList() = default;
explicit VectorIndexList(docdb::DocVectorIndexesPtr list) : list_(std::move(list)) {}

void Compact();
void Flush();
Status WaitForFlush();

Expand Down Expand Up @@ -62,7 +63,14 @@ class TabletVectorIndexes : public TabletComponent {
Status CreateIndex(
const TableInfo& index_table, const TableInfoPtr& indexed_table, bool bootstrap)
EXCLUDES(vector_indexes_mutex_);

// Returns a collection of vector indexes for the given vector index table ids. Returns nullptr
// if table_ids is empty or if at least one vector index is not found by the given table id.
// The order of vector indexes in the returned collection is not guaranteed to be preserved.
docdb::DocVectorIndexesPtr Collect(const std::vector<TableId>& table_ids);

docdb::DocVectorIndexesPtr List() const EXCLUDES(vector_indexes_mutex_);

void LaunchBackfillsIfNecessary();
void CompleteShutdown(std::vector<std::string>& out_paths);
std::optional<google::protobuf::RepeatedPtrField<std::string>> FinishedBackfills();
Expand Down Expand Up @@ -90,6 +98,9 @@ class TabletVectorIndexes : public TabletComponent {
const TableInfo& index_table, const TableInfoPtr& indexed_table, bool allow_inplace_insert)
REQUIRES(vector_indexes_mutex_);

docdb::DocVectorIndexPtr IndexForTableUnlocked(
const TableId& table_id) const REQUIRES_SHARED(vector_indexes_mutex_);

const VectorIndexThreadPoolProvider thread_pool_provider_;

std::atomic<bool> has_vector_indexes_{false};
Expand Down
Loading

0 comments on commit 12d3543

Please sign in to comment.