From aeb2b07da3d19ea2a12fdb14693e0d54a4a794ca Mon Sep 17 00:00:00 2001 From: Mandar Harshe Date: Sun, 21 Nov 2021 18:44:57 +0100 Subject: [PATCH 1/4] Add compaction_filter --- rocksdb/compaction_filter.pxd | 54 +++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 rocksdb/compaction_filter.pxd diff --git a/rocksdb/compaction_filter.pxd b/rocksdb/compaction_filter.pxd new file mode 100644 index 0000000..cfc901d --- /dev/null +++ b/rocksdb/compaction_filter.pxd @@ -0,0 +1,54 @@ +from libcpp cimport bool as cpp_bool +from libcpp.string cimport string +from libcpp.vector cimport vector +from libcpp.memory cimport unique_ptr +from libc.stdint cimport uint32_t +from .slice_ cimport Slice + +cdef extern from "rocksdb/compaction_filter.h" namespace "rocksdb": + cdef cppclass CompactionFilterContext: + cpp_bool is_full_compaction + cpp_bool is_manual_compaction + + cdef enum ValueType "rocksdb::CompactionFilter::ValueType": + ValueType_kValue "rocksdb::CompactionFilter::ValueType::kValue" + ValueType_kMergeOperand "rocksdb::CompactionFilter::ValueType::kMergeOperand" + ValueType_kBlobIndex "rocksdb::CompactionFilter::ValueType::kBlobIndex" + + cdef enum Decision "rocksdb::CompactionFilter::Decision": + Decision_kKeep "rocksdb::CompactionFilter::Decision::kKeep" + Decision_kRemove "rocksdb::CompactionFilter::Decision::kRemove" + Decision_kChangeValue "rocksdb::CompactionFilter::Decision::kChangeValue" + Decision_kRemoveAndSkipUntil "rocksdb::CompactionFilter::Decision::kRemoveAndSkipUntil" + + cdef enum BlobDecision "rocksdb::CompactionFilter::BlobDecision": + BlobDecision_kKeep "rocksdb::CompactionFilter::BlobDecision::kKeep" + BlobDecision_kChangeValue "rocksdb::CompactionFilter::BlobDecision::kChangeValue" + BlobDecision_kCorruption "rocksdb::CompactionFilter::BlobDecision::kCorruption" + BlobDecision_kIOError "rocksdb::CompactionFilter::BlobDecision::kIOError" + + cdef cppclass Context "rocksdb::CompactionFilter::Context": + cpp_bool is_full_compaction + cpp_bool is_manual_compaction + uint32_t column_family_id + + cdef cppclass CompactionFilter: + cpp_bool Filter(int, const Slice&, + const Slice&, + string*, + cpp_bool*) nogil except+ + cpp_bool FilterMergeOperand(int, const Slice&, + const Slice&) nogil except+ + Decision FilterV2(int level, const Slice&, ValueType, + const Slice&, string*, + string*) + BlobDecision PrepareBlobOutput(const Slice&, + const Slice&, + string*) + cpp_bool IgnoreSnapshots() + const char* Name() + + cdef cppclass CompactionFilterFactory: + unique_ptr[CompactionFilter] CreateCompactionFilter( + const Context&) + const char* Name() From 1192979dbd850180a4d78a6be4ba133e9949d747 Mon Sep 17 00:00:00 2001 From: Mandar Harshe Date: Sun, 21 Nov 2021 18:51:32 +0100 Subject: [PATCH 2/4] Add concurrent task limiter --- rocksdb/concurrent_task_limiter.pxd | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 rocksdb/concurrent_task_limiter.pxd diff --git a/rocksdb/concurrent_task_limiter.pxd b/rocksdb/concurrent_task_limiter.pxd new file mode 100644 index 0000000..b1320b4 --- /dev/null +++ b/rocksdb/concurrent_task_limiter.pxd @@ -0,0 +1,11 @@ +from libcpp.string cimport string +from libc.stdint cimport int32_t + +cdef extern from "rocksdb/concurrent_task_limiter.h" namespace "rocksdb": + cdef cppclass ConcurrentTaskLimiter: + const string& GetName() + void SetMaxOutstandingTask(int32_t) + void ResetMaxOutstandingTask() + int32_t GetOutstandingTask() + + ConcurrentTaskLimiter* NewConcurrentTaskLimiter(const string&, int32_t) From 9a1c7f9e65b7fd83b020623760f03e400af97857 Mon Sep 17 00:00:00 2001 From: Mandar Harshe Date: Sun, 21 Nov 2021 18:55:31 +0100 Subject: [PATCH 3/4] Add compaction_filter to options --- rocksdb/options.pxd | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index 722b71a..9068ffb 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -20,6 +20,7 @@ from .advanced_options cimport CompressionOptions from .advanced_options cimport AdvancedColumnFamilyOptions from .env cimport Env from .types cimport SequenceNumber +from .compaction_filter cimport CompactionFilter, CompactionFilterFactory cdef extern from "rocksdb/options.h" namespace "rocksdb": ctypedef enum CpuPriority: @@ -48,8 +49,8 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": ColumnFamilyOptions* OptimizeUniversalStyleCompaction(uint64_t) const Comparator* comparator shared_ptr[MergeOperator] merge_operator - # TODO: compaction_filter - # TODO: compaction_filter_factory + CompactionFilter* compaction_filter + shared_ptr[CompactionFilterFactory] compaction_filter_factory size_t write_buffer_size advanced_options.CompressionType compression advanced_options.CompressionType bottommost_compression From 1d0ef3a8d9d49e59d2bab1167efe322911a67a8e Mon Sep 17 00:00:00 2001 From: Mandar Harshe Date: Sun, 21 Nov 2021 18:51:47 +0100 Subject: [PATCH 4/4] Add ConcurrentTaskLimiter to options --- rocksdb/options.pxd | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index 9068ffb..75797c3 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -21,6 +21,7 @@ from .advanced_options cimport AdvancedColumnFamilyOptions from .env cimport Env from .types cimport SequenceNumber from .compaction_filter cimport CompactionFilter, CompactionFilterFactory +from .concurrent_task_limiter cimport ConcurrentTaskLimiter cdef extern from "rocksdb/options.h" namespace "rocksdb": ctypedef enum CpuPriority: @@ -66,7 +67,7 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": shared_ptr[TableFactory] table_factory vector[DbPath] cf_paths - # TODO shared_ptr[ConcurrentTaskLimiter] compaction_thread_limiter + shared_ptr[ConcurrentTaskLimiter] compaction_thread_limiter ColumnFamilyOptions() ColumnFamilyOptions(const Options& options) void Dump(Logger*)