diff --git a/rocksdb/_rocksdb.pyx b/rocksdb/_rocksdb.pyx index 618586e..c9ae11e 100644 --- a/rocksdb/_rocksdb.pyx +++ b/rocksdb/_rocksdb.pyx @@ -35,10 +35,10 @@ from . cimport universal_compaction from .universal_compaction cimport kCompactionStopStyleSimilarSize from .universal_compaction cimport kCompactionStopStyleTotalSize -from .options cimport kCompactionStyleLevel -from .options cimport kCompactionStyleUniversal -from .options cimport kCompactionStyleFIFO -from .options cimport kCompactionStyleNone +from .advanced_options cimport kCompactionStyleLevel +from .advanced_options cimport kCompactionStyleUniversal +from .advanced_options cimport kCompactionStyleFIFO +from .advanced_options cimport kCompactionStyleNone from .slice_ cimport Slice from .status cimport Status @@ -905,7 +905,6 @@ cdef class ColumnFamilyOptions(object): self.copts.min_write_buffer_number_to_merge = value property compression_opts: - # FIXME: add missing fields. def __get__(self): cdef dict ret_ob = {} @@ -913,6 +912,9 @@ cdef class ColumnFamilyOptions(object): ret_ob['level'] = self.copts.compression_opts.level ret_ob['strategy'] = self.copts.compression_opts.strategy ret_ob['max_dict_bytes'] = self.copts.compression_opts.max_dict_bytes + ret_ob['zstd_max_train_bytes'] = self.copts.compression_opts.zstd_max_train_bytes + ret_ob['parallel_threads'] = self.copts.compression_opts.parallel_threads + ret_ob['enabled'] = self.copts.compression_opts.enabled return ret_ob @@ -928,28 +930,65 @@ cdef class ColumnFamilyOptions(object): copts.strategy = value['strategy'] if 'max_dict_bytes' in value: copts.max_dict_bytes = value['max_dict_bytes'] + if 'zstd_max_train_bytes' in value: + copts.zstd_max_train_bytes = value['zstd_max_train_bytes'] + if 'parallel_threads' in value: + copts.parallel_threads = value['parallel_threads'] + if 'enabled' in value: + copts.enabled = value['enabled'] - # FIXME: add bottommost_compression_opts + property bottommost_compression_opts: + def __get__(self): + cdef dict ret_ob = {} + + ret_ob['window_bits'] = self.copts.bottommost_compression_opts.window_bits + ret_ob['level'] = self.copts.bottommost_compression_opts.level + ret_ob['strategy'] = self.copts.bottommost_compression_opts.strategy + ret_ob['max_dict_bytes'] = self.copts.bottommost_compression_opts.max_dict_bytes + ret_ob['zstd_max_train_bytes'] = self.copts.bottommost_compression_opts.zstd_max_train_bytes + ret_ob['parallel_threads'] = self.copts.bottommost_compression_opts.parallel_threads + ret_ob['enabled'] = self.copts.bottommost_compression_opts.enabled + + return ret_ob + + def __set__(self, dict value): + cdef options.CompressionOptions* copts + copts = cython.address(self.copts.bottommost_compression_opts) + # CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes) + if 'window_bits' in value: + copts.window_bits = value['window_bits'] + if 'level' in value: + copts.level = value['level'] + if 'strategy' in value: + copts.strategy = value['strategy'] + if 'max_dict_bytes' in value: + copts.max_dict_bytes = value['max_dict_bytes'] + if 'zstd_max_train_bytes' in value: + copts.zstd_max_train_bytes = value['zstd_max_train_bytes'] + if 'parallel_threads' in value: + copts.parallel_threads = value['parallel_threads'] + if 'enabled' in value: + copts.enabled = value['enabled'] property compaction_pri: def __get__(self): - if self.copts.compaction_pri == options.kByCompensatedSize: + if self.copts.compaction_pri == options.advanced_options.kByCompensatedSize: return CompactionPri.by_compensated_size - if self.copts.compaction_pri == options.kOldestLargestSeqFirst: + if self.copts.compaction_pri == options.advanced_options.kOldestLargestSeqFirst: return CompactionPri.oldest_largest_seq_first - if self.copts.compaction_pri == options.kOldestSmallestSeqFirst: + if self.copts.compaction_pri == options.advanced_options.kOldestSmallestSeqFirst: return CompactionPri.oldest_smallest_seq_first - if self.copts.compaction_pri == options.kMinOverlappingRatio: + if self.copts.compaction_pri == options.advanced_options.kMinOverlappingRatio: return CompactionPri.min_overlapping_ratio def __set__(self, value): if value == CompactionPri.by_compensated_size: - self.copts.compaction_pri = options.kByCompensatedSize + self.copts.compaction_pri = options.advanced_options.kByCompensatedSize elif value == CompactionPri.oldest_largest_seq_first: - self.copts.compaction_pri = options.kOldestLargestSeqFirst + self.copts.compaction_pri = options.advanced_options.kOldestLargestSeqFirst elif value == CompactionPri.oldest_smallest_seq_first: - self.copts.compaction_pri = options.kOldestSmallestSeqFirst + self.copts.compaction_pri = options.advanced_options.kOldestSmallestSeqFirst elif value == CompactionPri.min_overlapping_ratio: - self.copts.compaction_pri = options.kMinOverlappingRatio + self.copts.compaction_pri = options.advanced_options.kMinOverlappingRatio else: raise TypeError("Unknown compaction pri: %s" % value) @@ -1328,6 +1367,18 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.max_open_files = value + property max_file_opening_threads: + def __get__(self): + return self.opts.max_file_opening_threads + def __set__(self, value): + self.opts.max_file_opening_threads = value + + property max_total_wal_size: + def __get__(self): + return self.opts.max_total_wal_size + def __set__(self, value): + self.opts.max_total_wal_size = value + property use_fsync: def __get__(self): return self.opts.use_fsync @@ -1352,17 +1403,29 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.delete_obsolete_files_period_micros = value + property max_background_jobs: + def __get__(self): + return self.opts.max_background_jobs + def __set__(self, value): + self.opts.max_background_jobs = value + + property base_background_compactions: + def __get__(self): + return self.opts.base_background_compactions + def __set__(self, value): + self.opts.base_background_compactions = value + property max_background_compactions: def __get__(self): return self.opts.max_background_compactions def __set__(self, value): self.opts.max_background_compactions = value - property max_background_jobs: + property max_subcompactions: def __get__(self): - return self.opts.max_background_jobs + return self.opts.max_subcompactions def __set__(self, value): - self.opts.max_background_jobs = value + self.opts.max_subcompactions = value property max_background_flushes: def __get__(self): @@ -1388,6 +1451,12 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.keep_log_file_num = value + property recycle_log_file_num: + def __get__(self): + return self.opts.recycle_log_file_num + def __set__(self, value): + self.opts.recycle_log_file_num = value + property max_manifest_file_size: def __get__(self): return self.opts.max_manifest_file_size @@ -1418,18 +1487,6 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.manifest_preallocation_size = value - property enable_write_thread_adaptive_yield: - def __get__(self): - return self.opts.enable_write_thread_adaptive_yield - def __set__(self, value): - self.opts.enable_write_thread_adaptive_yield = value - - property allow_concurrent_memtable_write: - def __get__(self): - return self.opts.allow_concurrent_memtable_write - def __set__(self, value): - self.opts.allow_concurrent_memtable_write = value - property allow_mmap_reads: def __get__(self): return self.opts.allow_mmap_reads @@ -1442,6 +1499,24 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.allow_mmap_writes = value + property use_direct_reads: + def __get__(self): + return self.opts.use_direct_reads + def __set__(self, value): + self.opts.use_direct_reads = value + + property use_direct_io_for_flush_and_compaction: + def __get__(self): + return self.opts.use_direct_io_for_flush_and_compaction + def __set__(self, value): + self.opts.use_direct_io_for_flush_and_compaction = value + + property allow_fallocate: + def __get__(self): + return self.opts.allow_fallocate + def __set__(self, value): + self.opts.allow_fallocate = value + property is_fd_close_on_exec: def __get__(self): return self.opts.is_fd_close_on_exec @@ -1460,12 +1535,36 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.stats_dump_period_sec = value + property stats_persist_period_sec: + def __get__(self): + return self.opts.stats_persist_period_sec + def __set__(self, value): + self.opts.stats_persist_period_sec = value + + property persist_stats_to_disk: + def __get__(self): + return self.opts.persist_stats_to_disk + def __set__(self, value): + self.opts.persist_stats_to_disk = value + + property stats_history_buffer_size: + def __get__(self): + return self.opts.stats_history_buffer_size + def __set__(self, value): + self.opts.stats_history_buffer_size = value + property advise_random_on_open: def __get__(self): return self.opts.advise_random_on_open def __set__(self, value): self.opts.advise_random_on_open = value + property db_write_buffer_size: + def __get__(self): + return self.opts.db_write_buffer_size + def __set__(self, value): + self.opts.db_write_buffer_size = value + # TODO: need to remove -Wconversion to make this work # property access_hint_on_compaction_start: # def __get__(self): @@ -1473,6 +1572,30 @@ cdef class Options(ColumnFamilyOptions): # def __set__(self, AccessHint value): # self.opts.access_hint_on_compaction_start = value + property new_table_reader_for_compaction_inputs: + def __get__(self): + return self.opts.new_table_reader_for_compaction_inputs + def __set__(self, value): + self.opts.new_table_reader_for_compaction_inputs = value + + property compaction_readahead_size: + def __get__(self): + return self.opts.compaction_readahead_size + def __set__(self, value): + self.opts.compaction_readahead_size = value + + property random_access_max_buffer_size: + def __get__(self): + return self.opts.random_access_max_buffer_size + def __set__(self, value): + self.opts.random_access_max_buffer_size = value + + property writable_file_max_buffer_size: + def __get__(self): + return self.opts.writable_file_max_buffer_size + def __set__(self, value): + self.opts.writable_file_max_buffer_size = value + property use_adaptive_mutex: def __get__(self): return self.opts.use_adaptive_mutex @@ -1485,6 +1608,90 @@ cdef class Options(ColumnFamilyOptions): def __set__(self, value): self.opts.bytes_per_sync = value + property wal_bytes_per_sync: + def __get__(self): + return self.opts.wal_bytes_per_sync + def __set__(self, value): + self.opts.wal_bytes_per_sync = value + + property strict_bytes_per_sync: + def __get__(self): + return self.opts.strict_bytes_per_sync + def __set__(self, value): + self.opts.strict_bytes_per_sync = value + + property enable_thread_tracking: + def __get__(self): + return self.opts.enable_thread_tracking + def __set__(self, value): + self.opts.enable_thread_tracking = value + + property delayed_write_rate: + def __get__(self): + return self.opts.delayed_write_rate + def __set__(self, value): + self.opts.delayed_write_rate = value + + property enable_pipelined_write: + def __get__(self): + return self.opts.enable_pipelined_write + def __set__(self, value): + self.opts.enable_pipelined_write = value + + property unordered_write: + def __get__(self): + return self.opts.unordered_write + def __set__(self, value): + self.opts.unordered_write = value + + property allow_concurrent_memtable_write: + def __get__(self): + return self.opts.allow_concurrent_memtable_write + def __set__(self, value): + self.opts.allow_concurrent_memtable_write = value + + property enable_write_thread_adaptive_yield: + def __get__(self): + return self.opts.enable_write_thread_adaptive_yield + def __set__(self, value): + self.opts.enable_write_thread_adaptive_yield = value + + property max_write_batch_group_size_bytes: + def __get__(self): + return self.opts.max_write_batch_group_size_bytes + def __set__(self, value): + self.opts.max_write_batch_group_size_bytes = value + + property write_thread_max_yield_usec: + def __get__(self): + return self.opts.write_thread_max_yield_usec + def __set__(self, value): + self.opts.write_thread_max_yield_usec = value + + property write_thread_slow_yield_usec: + def __get__(self): + return self.opts.write_thread_slow_yield_usec + def __set__(self, value): + self.opts.write_thread_slow_yield_usec = value + + property skip_stats_update_on_db_open: + def __get__(self): + return self.opts.skip_stats_update_on_db_open + def __set__(self, value): + self.opts.skip_stats_update_on_db_open = value + + property skip_checking_sst_file_sizes_on_db_open: + def __get__(self): + return self.opts.skip_checking_sst_file_sizes_on_db_open + def __set__(self, value): + self.opts.skip_checking_sst_file_sizes_on_db_open = value + + property allow_2pc: + def __get__(self): + return self.opts.allow_2pc + def __set__(self, value): + self.opts.allow_2pc = value + property row_cache: def __get__(self): return self.py_row_cache @@ -1499,6 +1706,84 @@ cdef class Options(ColumnFamilyOptions): self.py_row_cache = value self.opts.row_cache = self.py_row_cache.get_cache() + property fail_if_options_file_error: + def __get__(self): + return self.opts.fail_if_options_file_error + def __set__(self, value): + self.opts.fail_if_options_file_error = value + + property dump_malloc_stats: + def __get__(self): + return self.opts.dump_malloc_stats + def __set__(self, value): + self.opts.dump_malloc_stats = value + + property avoid_flush_during_recovery: + def __get__(self): + return self.opts.avoid_flush_during_recovery + def __set__(self, value): + self.opts.avoid_flush_during_recovery = value + + property avoid_flush_during_shutdown: + def __get__(self): + return self.opts.avoid_flush_during_shutdown + def __set__(self, value): + self.opts.avoid_flush_during_shutdown = value + + property allow_ingest_behind: + def __get__(self): + return self.opts.allow_ingest_behind + def __set__(self, value): + self.opts.allow_ingest_behind = value + + property preserve_deletes: + def __get__(self): + return self.opts.preserve_deletes + def __set__(self, value): + self.opts.preserve_deletes = value + + property two_write_queues: + def __get__(self): + return self.opts.two_write_queues + def __set__(self, value): + self.opts.two_write_queues = value + + property manual_wal_flush: + def __get__(self): + return self.opts.manual_wal_flush + def __set__(self, value): + self.opts.manual_wal_flush = value + + property atomic_flush: + def __get__(self): + return self.opts.atomic_flush + def __set__(self, value): + self.opts.atomic_flush = value + + property avoid_unnecessary_blocking_io: + def __get__(self): + return self.opts.avoid_unnecessary_blocking_io + def __set__(self, value): + self.opts.avoid_unnecessary_blocking_io = value + + property write_dbid_to_manifest: + def __get__(self): + return self.opts.write_dbid_to_manifest + def __set__(self, value): + self.opts.write_dbid_to_manifest = value + + property log_readahead_size: + def __get__(self): + return self.opts.log_readahead_size + def __set__(self, value): + self.opts.log_readahead_size = value + + property best_efforts_recovery: + def __get__(self): + return self.opts.best_efforts_recovery + def __set__(self, value): + self.opts.best_efforts_recovery = value + # Forward declaration cdef class Snapshot diff --git a/rocksdb/advanced_options.pxd b/rocksdb/advanced_options.pxd new file mode 100644 index 0000000..a237cb1 --- /dev/null +++ b/rocksdb/advanced_options.pxd @@ -0,0 +1,121 @@ +from libcpp cimport bool as cpp_bool +from libcpp.string cimport string +from libcpp.vector cimport vector +from libc.stdint cimport uint64_t +from libc.stdint cimport uint32_t +from libc.stdint cimport int64_t +from libc.stdint cimport int32_t +from .std_memory cimport shared_ptr +from .comparator cimport Comparator +from .merge_operator cimport MergeOperator +from .logger cimport Logger +from .slice_ cimport Slice +from .snapshot cimport Snapshot +from .slice_transform cimport SliceTransform +from .table_factory cimport TableFactory +from .memtablerep cimport MemTableRepFactory +from .universal_compaction cimport CompactionOptionsUniversal +from .cache cimport Cache +from .options cimport Options +from .options cimport CompressionType +from .table_properties cimport TablePropertiesCollectorFactory + +cdef extern from "rocksdb/advanced_options.h" namespace "rocksdb": + ctypedef enum CompactionStyle: + kCompactionStyleLevel + kCompactionStyleUniversal + kCompactionStyleFIFO + kCompactionStyleNone + + ctypedef enum CompactionPri: + kByCompensatedSize + kOldestLargestSeqFirst + kOldestSmallestSeqFirst + kMinOverlappingRatio + + cdef cppclass CompactionOptionsFIFO: + uint64_t max_table_files_size + cpp_bool allow_compaction + CompactionOptionsFIFO() + CompactionOptionsFIFO(uint64_t, cpp_bool) + + cdef cppclass CompressionOptions: + int window_bits; + int level; + int strategy; + uint32_t max_dict_bytes + uint32_t zstd_max_train_bytes + uint32_t parallel_threads + cpp_bool enabled + CompressionOptions() except + + CompressionOptions(int, int, int, int, + int, int, cpp_bool) except + + + cdef enum UpdateStatus: + UPDATE_FAILED + UPDATED_INPLACE + UPDATED + + cdef cppclass AdvancedColumnFamilyOptions: + int max_write_buffer_number + int min_write_buffer_number_to_merge + int max_write_buffer_number_to_maintain + int64_t max_write_buffer_size_to_maintain + cpp_bool inplace_update_support + size_t inplace_update_num_locks + + UpdateStatus (*inplace_callback)(char*, + uint32_t*, + Slice, + string*) + double memtable_prefix_bloom_size_ratio + cpp_bool memtable_whole_key_filtering + size_t memtable_huge_page_size + shared_ptr[const SliceTransform] memtable_insert_with_hint_prefix_extractor + uint32_t bloom_locality + size_t arena_block_size + vector[CompressionType] compression_per_level + int num_levels + int level0_slowdown_writes_trigger + int level0_stop_writes_trigger + uint64_t target_file_size_base + int target_file_size_multiplier + cpp_bool level_compaction_dynamic_level_bytes + double max_bytes_for_level_multiplier + + vector[int] max_bytes_for_level_multiplier_additional + uint64_t max_compaction_bytes + uint64_t soft_pending_compaction_bytes_limit + uint64_t hard_pending_compaction_bytes_limit + + CompactionStyle compaction_style + CompactionPri compaction_pri + CompactionOptionsUniversal compaction_options_universal + + CompactionOptionsFIFO compaction_options_fifo + + uint64_t max_sequential_skip_in_iterations + shared_ptr[MemTableRepFactory] memtable_factory + vector[shared_ptr[TablePropertiesCollectorFactory]] table_properties_collector_factories + + size_t max_successive_merges + cpp_bool optimize_filters_for_hits + + cpp_bool paranoid_file_checks + cpp_bool force_consistency_checks + + cpp_bool report_bg_io_stats + uint64_t ttl + + uint64_t periodic_compaction_seconds + uint64_t sample_for_compression + + AdvancedColumnFamilyOptions(); + AdvancedColumnFamilyOptions(const Options&); + # ---------------- OPTIONS NOT SUPPORTED ANYMORE ---------------- + # But kept for compatibality as they are still in the header files. + int max_mem_compaction_level + double soft_rate_limit + double hard_rate_limit + unsigned int rate_limit_delay_max_milliseconds + cpp_bool purge_redundant_kvs_while_flush diff --git a/rocksdb/db.pxd b/rocksdb/db.pxd index e5c239c..6cc4d11 100644 --- a/rocksdb/db.pxd +++ b/rocksdb/db.pxd @@ -4,6 +4,7 @@ from .status cimport Status from libcpp cimport bool as cpp_bool from libcpp.string cimport string from libcpp.vector cimport vector +from .types cimport SequenceNumber from .slice_ cimport Slice from .snapshot cimport Snapshot from .iterator cimport Iterator @@ -39,7 +40,6 @@ cdef extern from "cpp/write_batch_iter_helper.hpp" namespace "py_rocks": cdef extern from "rocksdb/db.h" namespace "rocksdb": - ctypedef uint64_t SequenceNumber string kDefaultColumnFamilyName cdef struct LiveFileMetaData: diff --git a/rocksdb/options.pxd b/rocksdb/options.pxd index a6e4d1d..722b71a 100644 --- a/rocksdb/options.pxd +++ b/rocksdb/options.pxd @@ -11,27 +11,22 @@ from .slice_ cimport Slice from .snapshot cimport Snapshot from .slice_transform cimport SliceTransform from .table_factory cimport TableFactory -#from .statistics cimport Statistics +from .statistics cimport Statistics from .memtablerep cimport MemTableRepFactory from .universal_compaction cimport CompactionOptionsUniversal from .cache cimport Cache +from . cimport advanced_options +from .advanced_options cimport CompressionOptions +from .advanced_options cimport AdvancedColumnFamilyOptions +from .env cimport Env +from .types cimport SequenceNumber cdef extern from "rocksdb/options.h" namespace "rocksdb": - cdef cppclass CompressionOptions: - int window_bits; - int level; - int strategy; - uint32_t max_dict_bytes - # FIXME: add missing fields: max_dict_bytes, zstd_max_train_bytes, - # parallel_threads, enabled - CompressionOptions() except + - CompressionOptions(int, int, int, int) except + - - ctypedef enum CompactionStyle: - kCompactionStyleLevel - kCompactionStyleUniversal - kCompactionStyleFIFO - kCompactionStyleNone + ctypedef enum CpuPriority: + kIdle + kLow + kNormal + kHigh ctypedef enum CompressionType: kNoCompression @@ -45,38 +40,80 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": kZSTDNotFinalCompression kDisableCompressionOption - ctypedef enum ReadTier: - kReadAllTier - kBlockCacheTier + cdef cppclass ColumnFamilyOptions(AdvancedColumnFamilyOptions): + ColumnFamilyOptions* OldDefaults(int, int) + ColumnFamilyOptions* OptimizeForSmallDb(shared_ptr[Cache]*) + ColumnFamilyOptions* OptimizeForPointLookup(uint64_t) + ColumnFamilyOptions* OptimizeLevelStyleCompaction(uint64_t) + ColumnFamilyOptions* OptimizeUniversalStyleCompaction(uint64_t) + const Comparator* comparator + shared_ptr[MergeOperator] merge_operator + # TODO: compaction_filter + # TODO: compaction_filter_factory + size_t write_buffer_size + advanced_options.CompressionType compression + advanced_options.CompressionType bottommost_compression + CompressionOptions bottommost_compression_opts + advanced_options.CompactionPri compaction_pri + CompressionOptions compression_opts + int level0_file_num_compaction_trigger + shared_ptr[SliceTransform] prefix_extractor + uint64_t max_bytes_for_level_base + # Deprecated but kept here since it is in the header. + uint64_t snap_refresh_nanos + cpp_bool disable_auto_compactions + shared_ptr[TableFactory] table_factory - ctypedef enum CompactionPri: - kByCompensatedSize - kOldestLargestSeqFirst - kOldestSmallestSeqFirst - kMinOverlappingRatio + vector[DbPath] cf_paths + # TODO shared_ptr[ConcurrentTaskLimiter] compaction_thread_limiter + ColumnFamilyOptions() + ColumnFamilyOptions(const Options& options) + void Dump(Logger*) # This needs to be in _rocksdb.pxd so it will export into python - #cpdef enum AccessHint "rocksdb::DBOptions::AccessHint": - # NONE, - # NORMAL, - # SEQUENTIAL, - # WILLNEED + cpdef enum AccessHint "rocksdb::DBOptions::AccessHint": + NONE, + NORMAL, + SEQUENTIAL, + WILLNEED + + cpdef enum WALRecoveryMode: + kTolerateCorruptedTailRecords + kAbsoluteConsistency + kPointInTimeRecovery + kSkipAnyCorruptedRecords + + cdef cppclass DbPath: + string path + uint64_t target_size + + DbPath() except + + DbPath(const string&, uint64_t) except + cdef cppclass DBOptions: + DBOptions* OldDefaults(int, int) nogil except+ + DBOptions* OptimizeForSmallDb(shared_ptr[Cache]*) nogil except+ + void IncreaseParallelism(int) nogil except+ cpp_bool create_if_missing cpp_bool create_missing_column_families cpp_bool error_if_exists cpp_bool paranoid_checks - # TODO: env + Env* env + # TODO shared_ptr[RateLimiter] rate_limiter + # TODO shared_ptr[SstFileManager] sst_file_manager shared_ptr[Logger] info_log + # TODO InfoLogLevel info_log_level int max_open_files int max_file_opening_threads - #shared_ptr[Statistics] statistics + uint64_t max_total_wal_size + shared_ptr[Statistics] statistics cpp_bool use_fsync + vector[DbPath] db_paths string db_log_dir string wal_dir uint64_t delete_obsolete_files_period_micros int max_background_jobs + int base_background_compactions int max_background_compactions uint32_t max_subcompactions int max_background_flushes @@ -84,7 +121,6 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": size_t log_file_time_to_roll size_t keep_log_file_num size_t recycle_log_file_num - size_t stats_history_buffer_size uint64_t max_manifest_file_size int table_cache_numshardbits uint64_t WAL_ttl_seconds @@ -98,70 +134,96 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": cpp_bool is_fd_close_on_exec cpp_bool skip_log_error_on_recovery unsigned int stats_dump_period_sec + unsigned int stats_persist_period_sec + cpp_bool persist_stats_to_disk + size_t stats_history_buffer_size cpp_bool advise_random_on_open size_t db_write_buffer_size - # AccessHint access_hint_on_compaction_start + # TODO shared_ptr[WriteBufferManager] write_buffer_manager + AccessHint access_hint_on_compaction_start + cpp_bool new_table_reader_for_compaction_inputs + size_t compaction_readahead_size + size_t random_access_max_buffer_size + size_t writable_file_max_buffer_size cpp_bool use_adaptive_mutex + DBOptions() nogil except+ + DBOptions(const Options&) nogil except+ + void Dump(Logger*) nogil except+ + uint64_t bytes_per_sync + uint64_t wal_bytes_per_sync + cpp_bool strict_bytes_per_sync + # TODO vector[shared_ptr[EventListener]] listeners + cpp_bool enable_thread_tracking + uint64_t delayed_write_rate + cpp_bool enable_pipelined_write + cpp_bool unordered_write cpp_bool allow_concurrent_memtable_write cpp_bool enable_write_thread_adaptive_yield + uint64_t max_write_batch_group_size_bytes + uint64_t write_thread_max_yield_usec + uint64_t write_thread_slow_yield_usec + cpp_bool skip_stats_update_on_db_open + cpp_bool skip_checking_sst_file_sizes_on_db_open + WALRecoveryMode wal_recovery_mode + cpp_bool allow_2pc shared_ptr[Cache] row_cache - void IncreaseParallelism(int) nogil except+ - - cdef cppclass ColumnFamilyOptions: - ColumnFamilyOptions() - ColumnFamilyOptions(const Options& options) - const Comparator* comparator - shared_ptr[MergeOperator] merge_operator - # TODO: compaction_filter - # TODO: compaction_filter_factory - size_t write_buffer_size - int max_write_buffer_number - int min_write_buffer_number_to_merge - CompressionType compression - CompactionPri compaction_pri - # TODO: compression_per_level - shared_ptr[SliceTransform] prefix_extractor - int num_levels - int level0_file_num_compaction_trigger - int level0_slowdown_writes_trigger - int level0_stop_writes_trigger - int max_mem_compaction_level - uint64_t target_file_size_base - int target_file_size_multiplier - uint64_t max_bytes_for_level_base - double max_bytes_for_level_multiplier - vector[int] max_bytes_for_level_multiplier_additional - int expanded_compaction_factor - int source_compaction_factor - int max_grandparent_overlap_factor - cpp_bool disableDataSync - double soft_rate_limit - double hard_rate_limit - unsigned int rate_limit_delay_max_milliseconds - size_t arena_block_size - # TODO: PrepareForBulkLoad() - cpp_bool disable_auto_compactions - cpp_bool purge_redundant_kvs_while_flush - cpp_bool allow_os_buffer - cpp_bool verify_checksums_in_compaction - CompactionStyle compaction_style - CompactionOptionsUniversal compaction_options_universal - cpp_bool filter_deletes - uint64_t max_sequential_skip_in_iterations - shared_ptr[MemTableRepFactory] memtable_factory - shared_ptr[TableFactory] table_factory - # TODO: table_properties_collectors - cpp_bool inplace_update_support - size_t inplace_update_num_locks - # TODO: remove options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor from document - uint64_t max_compaction_bytes - CompressionOptions compression_opts - cpp_bool optimize_filters_for_hits - cpp_bool paranoid_file_checks + # TODO WalFilter* wal_filter + cpp_bool fail_if_options_file_error + cpp_bool dump_malloc_stats + cpp_bool avoid_flush_during_recovery + cpp_bool avoid_flush_during_shutdown + cpp_bool allow_ingest_behind + cpp_bool preserve_deletes + cpp_bool two_write_queues + cpp_bool manual_wal_flush + cpp_bool atomic_flush + cpp_bool avoid_unnecessary_blocking_io + cpp_bool write_dbid_to_manifest + size_t log_readahead_size + # TODO shared_ptr[FileChecksumGenFactory] file_checksum_gen_factory + cpp_bool best_efforts_recovery cdef cppclass Options(DBOptions, ColumnFamilyOptions): - pass + Options() except+ + Options(const DBOptions&, const ColumnFamilyOptions&) except+ + Options* OldDefaults(int, int) nogil except+ + void Dump(Logger*) nogil except+ + void DumpCFOptions(Logger*) nogil except+ + Options* PrepareForBulkLoad() nogil except+ + Options* OptimizeForSmallDb() nogil except+ + + ctypedef enum ReadTier: + kReadAllTier + kBlockCacheTier + kPersistedTier + kMemtableTier + + cdef cppclass ReadOptions: + const Snapshot* snapshot + const Slice* iterate_lower_bound + const Slice* iterate_upper_bound + size_t readahead_size + uint64_t max_skippable_internal_keys + ReadTier read_tier + cpp_bool verify_checksums + cpp_bool fill_cache + cpp_bool tailing + cpp_bool managed + cpp_bool total_order_seek + cpp_bool auto_prefix_mode + cpp_bool prefix_same_as_start + cpp_bool pin_data + cpp_bool background_purge_on_iterator_cleanup + cpp_bool ignore_range_deletions + # TODO std::function table_filter + SequenceNumber iter_start_seqnum + const Slice* timestamp + const Slice* iter_start_ts + # TODO std::chrono::microseconds deadline + uint64_t value_size_soft_limit + ReadOptions() nogil except+ + ReadOptions(cpp_bool, cpp_bool) nogil except+ cdef cppclass WriteOptions: cpp_bool sync @@ -169,23 +231,62 @@ cdef extern from "rocksdb/options.h" namespace "rocksdb": cpp_bool ignore_missing_column_families cpp_bool no_slowdown cpp_bool low_pri - - cdef cppclass ReadOptions: - cpp_bool verify_checksums - cpp_bool fill_cache - const Snapshot* snapshot - ReadTier read_tier + cpp_bool memtable_insert_hint_per_batch + const Slice* timestamp + WriteOptions() nogil except+ cdef cppclass FlushOptions: cpp_bool wait + cpp_bool allow_write_stall + FlushOptions() nogil except+ + + cdef cppclass CompactionOptions: + CompressionType compression + uint64_t output_file_size_limit + uint32_t max_subcompactions + CompactionOptions() nogil except+ ctypedef enum BottommostLevelCompaction: blc_skip "rocksdb::BottommostLevelCompaction::kSkip" blc_is_filter "rocksdb::BottommostLevelCompaction::kIfHaveCompactionFilter" blc_force "rocksdb::BottommostLevelCompaction::kForce" + blc_force_optimized "rocksdb::BottommostLevelCompaction::kForceOptimized" cdef cppclass CompactRangeOptions: + cpp_bool exclusive_manual_compaction cpp_bool change_level int target_level uint32_t target_path_id BottommostLevelCompaction bottommost_level_compaction + cpp_bool allow_write_stall + uint32_t max_subcompactions + + cdef cppclass IngestExternalFileOptions: + cpp_bool move_files + cpp_bool failed_move_fall_back_to_copy + cpp_bool snapshot_consistency + cpp_bool allow_global_seqno + cpp_bool allow_blocking_flush + cpp_bool ingest_behind + cpp_bool write_global_seqno + cpp_bool verify_checksums_before_ingest + size_t verify_checksums_readahead_size + cpp_bool verify_file_checksum + + ctypedef enum TraceFilterType: + kTraceFilterNone + kTraceFilterGet + kTraceFilterWrite + + cdef cppclass TraceOptions: + uint64_t max_trace_file_size + uint64_t sampling_frequency + uint64_t filter + + cdef cppclass ImportColumnFamilyOptions: + cpp_bool move_files + + cdef cppclass SizeApproximationOptions: + cpp_bool include_memtabtles + cpp_bool include_files + double files_size_error_margin diff --git a/rocksdb/statistics.pxd b/rocksdb/statistics.pxd index 1028c8a..fa519b1 100644 --- a/rocksdb/statistics.pxd +++ b/rocksdb/statistics.pxd @@ -1,5 +1,9 @@ -from libc.stdint cimport uint32_t, uint8_t -from .std_memory cimport shared_ptr +from libc.stdint cimport uint32_t, uint8_t, uint64_t +from libcpp.memory cimport shared_ptr +from libcpp.string cimport string +from libcpp cimport bool as cpp_bool +from libcpp.map cimport map +from .status cimport Status cdef extern from "rocksdb/statistics.h" namespace "rocksdb": ctypedef enum StatsLevel: @@ -9,5 +13,32 @@ cdef extern from "rocksdb/statistics.h" namespace "rocksdb": kExceptTimeForMutex kAll + cdef cppclass HistogramData: + double median + double percentile95 + double percentile99 + double average + double standard_deviation + double max + uint64_t count + uint64_t sum + double min + cdef cppclass Statistics: + const char* Type() nogil except+ + uint64_t getTickerCount(uint32_t) nogil except+ + void histogramData(uint32_t type, + HistogramData* const) nogil except+ + string getHistogramString(uint32_t) nogil except+ + void recordTick(uint32_t, uint64_t) nogil except+ + void setTickerCount(uint32_t tickerType, uint64_t count) nogil except+ + uint64_t getAndResetTickerCount(uint32_t) nogil except+ + void reportTimeToHistogram(uint32_t, uint64_t) nogil except+ + void measureTime(uint32_t, uint64_t) nogil except+ + void recordInHistogram(uint32_t, uint64_t) nogil except+ + Status Reset() nogil except+ + string ToString() nogil except+ + cpp_bool getTickerMap(map[string, uint64_t]*) nogil except+ + cpp_bool HistEnabledForType(uint32_t type) nogil except+ void set_stats_level(StatsLevel) nogil except+ + StatsLevel get_stats_level() nogil except+ diff --git a/rocksdb/table_properties.pxd b/rocksdb/table_properties.pxd new file mode 100644 index 0000000..ef649cc --- /dev/null +++ b/rocksdb/table_properties.pxd @@ -0,0 +1,75 @@ +from libc.stdint cimport uint32_t, uint64_t +from libcpp cimport bool as cpp_bool +from libcpp.string cimport string +from libcpp.vector cimport vector +from libcpp.map cimport map + +from .slice_ cimport Slice +from .status cimport Status +from .types cimport SequenceNumber, EntryType + + +cdef extern from "rocksdb/table_properties.h" namespace "rocksdb": + ctypedef map[string, string] UserCollectedProperties + + cdef cppclass TablePropertiesCollector: + Status Add(const Slice&, const Slice&) + Status AddUserKey(const Slice&, const Slice&, + EntryType, SequenceNumber, + uint64_t) + void BlockAdd(uint64_t, + uint64_t, + uint64_t) + + Status Finish(UserCollectedProperties*) + UserCollectedProperties GetReadableProperties() + const char* Name() + cpp_bool NeedCompact() + + cdef cppclass TablePropertiesCollectorFactory_Context "rocksdb::TablePropertiesCollectorFactory::Context": + uint32_t column_family_id + uint32_t kUnknownColumnFamily + + cdef cppclass TablePropertiesCollectorFactory: + TablePropertiesCollector* CreateTablePropertiesCollector( + TablePropertiesCollectorFactory_Context context) + const char* Name() + string ToString() + + cdef cppclass TableProperties: + uint64_t data_size + uint64_t index_size + uint64_t index_partitions + uint64_t top_level_index_size + uint64_t index_key_is_user_key + uint64_t index_value_is_delta_encoded + uint64_t filter_size + uint64_t raw_key_size + uint64_t raw_value_size + uint64_t num_data_blocks + uint64_t num_entries + uint64_t num_deletions + uint64_t num_merge_operands + uint64_t num_range_deletions + uint64_t format_version + uint64_t fixed_key_len + uint64_t column_family_id + uint64_t creation_time + uint64_t oldest_key_time + uint64_t file_creation_time + + string column_family_name + string filter_policy_name + string comparator_name + string merge_operator_name + string prefix_extractor_name + string property_collectors_names + string compression_name + string compression_options + UserCollectedProperties user_collected_properties + UserCollectedProperties readable_properties + map[string, uint64_t] properties_offsets + string ToString(const string&, + const string&) + void Add(const TableProperties&) + diff --git a/rocksdb/tests/test_options.py b/rocksdb/tests/test_options.py index 127f56b..1642c3f 100644 --- a/rocksdb/tests/test_options.py +++ b/rocksdb/tests/test_options.py @@ -37,17 +37,16 @@ class TestOptions(unittest.TestCase): # opts.merge_operator = "not an operator" # FIXME: travis test should include the latest version of rocksdb - # def test_compaction_pri(self): - # opts = rocksdb.Options() + def test_compaction_pri(self): + opts = rocksdb.Options() # default compaction_pri - # self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size) - # self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio) - # opts.compaction_pri = rocksdb.CompactionPri.by_compensated_size - # self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size) - # opts.compaction_pri = rocksdb.CompactionPri.oldest_largest_seq_first - # self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.oldest_largest_seq_first) - # opts.compaction_pri = rocksdb.CompactionPri.min_overlapping_ratio - # self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio) + self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio) + opts.compaction_pri = rocksdb.CompactionPri.by_compensated_size + self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.by_compensated_size) + opts.compaction_pri = rocksdb.CompactionPri.oldest_largest_seq_first + self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.oldest_largest_seq_first) + opts.compaction_pri = rocksdb.CompactionPri.min_overlapping_ratio + self.assertEqual(opts.compaction_pri, rocksdb.CompactionPri.min_overlapping_ratio) def test_enable_write_thread_adaptive_yield(self): opts = rocksdb.Options() @@ -67,20 +66,58 @@ def test_compression_opts(self): # default value self.assertEqual(isinstance(compression_opts, dict), True) self.assertEqual(compression_opts['window_bits'], -14) - # This doesn't match rocksdb latest - # self.assertEqual(compression_opts['level'], -1) + self.assertEqual(compression_opts['level'], 2**15 - 1) self.assertEqual(compression_opts['strategy'], 0) self.assertEqual(compression_opts['max_dict_bytes'], 0) + self.assertEqual(compression_opts['zstd_max_train_bytes'], 0) + self.assertEqual(compression_opts['parallel_threads'], 1) + self.assertEqual(compression_opts['enabled'], False) with self.assertRaises(TypeError): - opts.compression_opts = list(1,2) + opts.compression_opts = list(1, 2) + + new_opts = { + 'window_bits': 1, + 'level': 2, + 'strategy': 3, + 'max_dict_bytes': 4, + 'zstd_max_train_bytes': 15, + 'parallel_threads': 4, + 'enabled': True} + opts.compression_opts = new_opts + self.assertIsNot(new_opts, opts.compression_opts) + for key, value in new_opts.items(): + self.assertEqual(opts.compression_opts[key], value) + + def test_bottommost_compression_opts(self): + opts = rocksdb.Options() + bottommost_compression_opts = opts.bottommost_compression_opts + # default value + self.assertEqual(isinstance(bottommost_compression_opts, dict), True) + self.assertEqual(bottommost_compression_opts['window_bits'], -14) + self.assertEqual(bottommost_compression_opts['level'], 2**15 - 1) + self.assertEqual(bottommost_compression_opts['strategy'], 0) + self.assertEqual(bottommost_compression_opts['max_dict_bytes'], 0) + self.assertEqual(bottommost_compression_opts['zstd_max_train_bytes'], 0) + self.assertEqual(bottommost_compression_opts['parallel_threads'], 1) + self.assertEqual(bottommost_compression_opts['enabled'], False) - opts.compression_opts = {'window_bits': 1, 'level': 2, 'strategy': 3, 'max_dict_bytes': 4} - compression_opts = opts.compression_opts - self.assertEqual(compression_opts['window_bits'], 1) - self.assertEqual(compression_opts['level'], 2) - self.assertEqual(compression_opts['strategy'], 3) - self.assertEqual(compression_opts['max_dict_bytes'], 4) + with self.assertRaises(TypeError): + opts.compression_opts = list(1, 2) + + new_opts = { + 'window_bits': 1, + 'level': 2, + 'strategy': 3, + 'max_dict_bytes': 4, + 'zstd_max_train_bytes': 15, + 'parallel_threads': 4, + 'enabled': True, + } + opts.bottommost_compression_opts = new_opts + self.assertIsNot(new_opts, opts.bottommost_compression_opts) + for key, value in new_opts.items(): + self.assertEqual(opts.bottommost_compression_opts[key], value) def test_simple(self): opts = rocksdb.Options() @@ -165,8 +202,84 @@ def test_compaction_opts_universal(self): self.assertEqual(2, uopts['min_merge_width']) self.assertEqual(30, uopts['max_merge_width']) - def test_row_cache(self): - opts = rocksdb.Options() - self.assertIsNone(opts.row_cache) - opts.row_cache = cache = rocksdb.LRUCache(2*1024*1024) - self.assertEqual(cache, opts.row_cache) + def test_rocksdb_options(self): + NOTNONE = object() + UNSETTABLE = object() + for option, def_value, new_value in ( + ('max_open_files', NOTNONE, 10), + ('row_cache', None, rocksdb.LRUCache(2*1024*1024)), + ('max_file_opening_threads', NOTNONE, 10), + ('max_total_wal_size', NOTNONE, 10), + ('max_background_jobs', NOTNONE, 10), + ('base_background_compactions', NOTNONE, 10), + ('max_background_compactions', NOTNONE, 10), + ('max_subcompactions', NOTNONE, 10), + ('max_background_flushes', NOTNONE, 10), + ('max_log_file_size', NOTNONE, 10), + ('log_file_time_to_roll', NOTNONE, 10), + ('keep_log_file_num', 1000, 10), + ('recycle_log_file_num', NOTNONE, 10), + ('max_manifest_file_size', NOTNONE, 10), + ('table_cache_numshardbits', NOTNONE, 10), + ('wal_ttl_seconds', NOTNONE, 10), + ('wal_size_limit_mb', NOTNONE, 10), + ('manifest_preallocation_size', NOTNONE, 10), + ('allow_mmap_reads', False, True), + ('allow_mmap_writes', False, True), + ('use_direct_reads', False, True), + ('use_direct_io_for_flush_and_compaction', False, True), + ('allow_fallocate', True, False), + ('is_fd_close_on_exec', True, False), + ('skip_log_error_on_recovery', False, True), + ('stats_dump_period_sec', 600, 3600), + ('stats_persist_period_sec', 600, 3600), + ('persist_stats_to_disk', False, True), + ('stats_history_buffer_size', 1024*1024, 1024), + ('advise_random_on_open', True, False), + ('db_write_buffer_size', 0, 100), + ('new_table_reader_for_compaction_inputs', False, True), + ('compaction_readahead_size', 0, 10), + ('random_access_max_buffer_size', 1024*1024, 100), + ('writable_file_max_buffer_size', 1024*1024, 100), + ('use_adaptive_mutex', False, True), + ('bytes_per_sync', 0, 10), + ('wal_bytes_per_sync', 0, 10), + ('strict_bytes_per_sync', False, True), + ('enable_thread_tracking', False, True), + ('delayed_write_rate', 0, 10), + ('enable_pipelined_write', False, True), + ('unordered_write', False, True), + ('allow_concurrent_memtable_write', True, False), + ('enable_write_thread_adaptive_yield', True, False), + ('max_write_batch_group_size_bytes', 1 << 20, 10), + ('write_thread_max_yield_usec', 100, 200), + ('write_thread_slow_yield_usec', 3, 2000), + ('skip_stats_update_on_db_open', False, True), + ('skip_checking_sst_file_sizes_on_db_open', False, True), + ('allow_2pc', False, True), + ('fail_if_options_file_error', False, True), + ('dump_malloc_stats', False, True), + ('avoid_flush_during_recovery', False, True), + ('avoid_flush_during_shutdown', False, True), + ('allow_ingest_behind', False, True), + ('preserve_deletes', False, True), + ('two_write_queues', False, True), + ('manual_wal_flush', False, True), + ('atomic_flush', False, True), + ('avoid_unnecessary_blocking_io', False, True), + ('write_dbid_to_manifest', False, True), + ('log_readahead_size', 0, 10), + ('best_efforts_recovery', False, True), + ): + with self.subTest(option=option): + opts = rocksdb.Options() + if def_value is NOTNONE: + self.assertIsNotNone(getattr(opts, option)) + else: + self.assertEqual(def_value, getattr(opts, option)) + if new_value is UNSETTABLE: + self.assertRaises( + Exception, setattr, opts, option, new_value) + else: + setattr(opts, option, new_value) + self.assertEqual(getattr(opts, option), new_value) diff --git a/rocksdb/types.pxd b/rocksdb/types.pxd new file mode 100644 index 0000000..b8c9ca9 --- /dev/null +++ b/rocksdb/types.pxd @@ -0,0 +1,28 @@ +from libc.stdint cimport uint64_t, uint32_t +from .slice_ cimport Slice +from libcpp.string cimport string +from libcpp cimport bool as cpp_bool + +cdef extern from "rocksdb/types.h" namespace "rocksdb": + ctypedef uint64_t SequenceNumber + + cdef enum EntryType: + kEntryPut + kEntryDelete + kEntrySingleDelete + kEntryMerge + kEntryRangeDeletion + kEntryBlobIndex + kEntryOther + + cdef cppclass FullKey: + Slice user_key + SequenceNumber sequence + EntryType type + + FullKey() except+ + FullKey(const Slice&, const SequenceNumber&, EntryType) except+ + string DebugString(cpp_bool hex) nogil except+ + void clear() nogil except+ + + cpp_bool ParseFullKey(const Slice&, FullKey*)