diff --git a/canary/src/main/resources/META-INF/microprofile-config.properties b/canary/src/main/resources/META-INF/microprofile-config.properties index d2c1db5ec..e409f979e 100644 --- a/canary/src/main/resources/META-INF/microprofile-config.properties +++ b/canary/src/main/resources/META-INF/microprofile-config.properties @@ -7,7 +7,7 @@ lh.canary.id=canary-default lh.canary.topic.name=canary-beats lh.canary.topic.creation.enable=true lh.canary.topic.creation.timeout.ms=5000 -lh.canary.topic.replicas=3 +lh.canary.topic.replicas=1 lh.canary.topic.partitions=12 lh.canary.workflow.creation.enable=true lh.canary.workflow.name=canary-workflow diff --git a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java index c6470e7b0..06b8b877f 100644 --- a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java +++ b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java @@ -5,10 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache; -import org.rocksdb.Cache; -import org.rocksdb.CompactionStyle; -import org.rocksdb.Options; -import org.rocksdb.RateLimiter; +import org.rocksdb.*; import static org.rocksdb.RateLimiter.DEFAULT_FAIRNESS; import static org.rocksdb.RateLimiter.DEFAULT_MODE; @@ -48,42 +45,42 @@ public class RocksConfigSetter implements RocksDBConfigSetter { @Override public void setConfig(final String storeName, final Options options, final Map configs) { -// log.trace("Overriding rocksdb settings for store {}", storeName); -// -// LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY); -// -// BlockBasedTableConfigWithAccessibleCache tableConfig = -// (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig(); -// if (serverConfig.getGlobalRocksdbBlockCache() != null) { -// // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need -// // to .close() it to avoid leaks so that we can provide a global one. -// Cache oldCache = tableConfig.blockCache(); -// tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache()); -// oldCache.close(); -// } -// -// tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY); -// tableConfig.setBlockSize(BLOCK_SIZE); -// options.setTableFormatConfig(tableConfig); -// -// options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS); -// options.setCompactionStyle(CompactionStyle.LEVEL); -// -// options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads()); -// -// // Memtable size -// options.setWriteBufferSize( -// isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize()); -// -// if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) { -// options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager()); -// } - // Streams default is 3 -// options.setMaxWriteBufferNumber(5); - long rateBytesPerSecond = mbToBytes(120); + log.trace("Overriding rocksdb settings for store {}", storeName); + + LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY); + + BlockBasedTableConfigWithAccessibleCache tableConfig = + (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig(); + if (serverConfig.getGlobalRocksdbBlockCache() != null) { + // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need + // to .close() it to avoid leaks so that we can provide a global one. + Cache oldCache = tableConfig.blockCache(); + tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache()); + oldCache.close(); + } + + tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY); + tableConfig.setBlockSize(BLOCK_SIZE); + options.setTableFormatConfig(tableConfig); + + options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS); + options.setCompactionStyle(CompactionStyle.UNIVERSAL); + + options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads()); + + // Memtable size + options.setWriteBufferSize( + isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize()); + + if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) { + options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager()); + } +// Streams default is 3 + options.setMaxWriteBufferNumber(5); + long rateBytesPerSecond = mbToBytes(10); log.info("Rate bytes per second = {}", rateBytesPerSecond); options.setRateLimiter(new RateLimiter(rateBytesPerSecond, DEFAULT_REFILL_PERIOD_MICROS, DEFAULT_FAIRNESS, - DEFAULT_MODE, true)); + DEFAULT_MODE, false)); // Future Work: Enable larger scaling by using Partitioned Index Filters // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters