Skip to content

Commit

Permalink
rollback rocksdb config setter
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduwer Camacaro authored and Eduwer Camacaro committed Dec 12, 2024
1 parent 687e548 commit 7336810
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lh.canary.id=canary-default
lh.canary.topic.name=canary-beats
lh.canary.topic.creation.enable=true
lh.canary.topic.creation.timeout.ms=5000
lh.canary.topic.replicas=3
lh.canary.topic.replicas=1
lh.canary.topic.partitions=12
lh.canary.workflow.creation.enable=true
lh.canary.workflow.name=canary-workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.rocksdb.Cache;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;
import org.rocksdb.RateLimiter;
import org.rocksdb.*;

import static org.rocksdb.RateLimiter.DEFAULT_FAIRNESS;
import static org.rocksdb.RateLimiter.DEFAULT_MODE;
Expand Down Expand Up @@ -48,42 +45,42 @@ public class RocksConfigSetter implements RocksDBConfigSetter {

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
// log.trace("Overriding rocksdb settings for store {}", storeName);
//
// LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY);
//
// BlockBasedTableConfigWithAccessibleCache tableConfig =
// (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
// if (serverConfig.getGlobalRocksdbBlockCache() != null) {
// // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need
// // to .close() it to avoid leaks so that we can provide a global one.
// Cache oldCache = tableConfig.blockCache();
// tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache());
// oldCache.close();
// }
//
// tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY);
// tableConfig.setBlockSize(BLOCK_SIZE);
// options.setTableFormatConfig(tableConfig);
//
// options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS);
// options.setCompactionStyle(CompactionStyle.LEVEL);
//
// options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads());
//
// // Memtable size
// options.setWriteBufferSize(
// isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize());
//
// if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) {
// options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager());
// }
// Streams default is 3
// options.setMaxWriteBufferNumber(5);
long rateBytesPerSecond = mbToBytes(120);
log.trace("Overriding rocksdb settings for store {}", storeName);

LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY);

BlockBasedTableConfigWithAccessibleCache tableConfig =
(BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
if (serverConfig.getGlobalRocksdbBlockCache() != null) {
// Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need
// to .close() it to avoid leaks so that we can provide a global one.
Cache oldCache = tableConfig.blockCache();
tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache());
oldCache.close();
}

tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY);
tableConfig.setBlockSize(BLOCK_SIZE);
options.setTableFormatConfig(tableConfig);

options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS);
options.setCompactionStyle(CompactionStyle.UNIVERSAL);

options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads());

// Memtable size
options.setWriteBufferSize(
isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize());

if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) {
options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager());
}
// Streams default is 3
options.setMaxWriteBufferNumber(5);
long rateBytesPerSecond = mbToBytes(10);
log.info("Rate bytes per second = {}", rateBytesPerSecond);
options.setRateLimiter(new RateLimiter(rateBytesPerSecond, DEFAULT_REFILL_PERIOD_MICROS, DEFAULT_FAIRNESS,
DEFAULT_MODE, true));
DEFAULT_MODE, false));

// Future Work: Enable larger scaling by using Partitioned Index Filters
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
Expand Down

0 comments on commit 7336810

Please sign in to comment.