diff --git a/gradle.properties b/gradle.properties index eceb723ed..a6dda7cef 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ version=0.0.0-development group=io.littlehorse -kafkaVersion=3.8.1 +kafkaVersion=3.9.0 lombokVersion=1.18.32 grpcVersion=1.56.1 junitVersion=5.9.2 diff --git a/local-dev/configs/server-1.config b/local-dev/configs/server-1.config index fdf254e52..3413e5316 100644 --- a/local-dev/configs/server-1.config +++ b/local-dev/configs/server-1.config @@ -8,9 +8,9 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8 LHS_STREAMS_NUM_STANDBY_REPLICAS=1 LHS_STREAMS_SESSION_TIMEOUT=10000 LHS_SHOULD_CREATE_TOPICS=true -LHS_CORE_STREAM_THREADS=2 -LHS_STREAMS_METRICS_LEVEL=debug -LHS_METRICS_LEVEL=info +LHS_CORE_STREAM_THREADS=4 +LHS_STREAMS_METRICS_LEVEL=info +LHS_METRICS_LEVEL=trace LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000 diff --git a/local-dev/configs/server-2.config b/local-dev/configs/server-2.config index f7905a48a..37bfe02bc 100644 --- a/local-dev/configs/server-2.config +++ b/local-dev/configs/server-2.config @@ -8,7 +8,7 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8 LHS_STREAMS_NUM_STANDBY_REPLICAS=3 LHS_STREAMS_SESSION_TIMEOUT=10000 LHS_SHOULD_CREATE_TOPICS=true -LHS_CORE_STREAM_THREADS=2 +LHS_CORE_STREAM_THREADS=4 LHS_STREAMS_METRICS_LEVEL=info LHS_HEALTH_SERVICE_PORT=1832 diff --git a/local-dev/configs/server-3.config b/local-dev/configs/server-3.config index 57103f201..8e67774db 100644 --- a/local-dev/configs/server-3.config +++ b/local-dev/configs/server-3.config @@ -8,7 +8,7 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8 LHS_STREAMS_NUM_STANDBY_REPLICAS=2 LHS_STREAMS_SESSION_TIMEOUT=10000 LHS_SHOULD_CREATE_TOPICS=true -LHS_CORE_STREAM_THREADS=2 +LHS_CORE_STREAM_THREADS=4 LHS_STREAMS_METRICS_LEVEL=info LHS_HEALTH_SERVICE_PORT=1842 diff --git a/local-dev/do-server.sh b/local-dev/do-server.sh index 3815b4890..6f8705c7a 100755 --- a/local-dev/do-server.sh +++ b/local-dev/do-server.sh @@ -19,5 +19,7 @@ fi cd "$WORK_DIR" +export LD_PRELOAD="/tmp/jemalloc/libjemalloc.so" + ./gradlew server:installDist -x shadowJar -x test ./server/build/install/server/server/server "$CONFIG_PATH" diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java index 812377f0a..911a4774c 100644 --- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java +++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java @@ -181,7 +181,6 @@ protected String[] getEnvKeyPrefixes() { private Admin kafkaAdmin; private LHProducer producer; - private LHProducer txnProducer; public int getHotMetadataPartition() { return (Utils.toPositive(Utils.murmur2(LHConstants.META_PARTITION_KEY.getBytes())) % getClusterPartitions()); @@ -662,7 +661,6 @@ public List getAdvertisedListeners() { public void cleanup() { if (this.kafkaAdmin != null) this.kafkaAdmin.close(); if (this.producer != null) this.producer.close(); - if (this.txnProducer != null) this.txnProducer.close(); } public boolean shouldCreateTopics() { @@ -873,6 +871,7 @@ public Properties getTimerStreamsConfig() { props.put("client.id", this.getClientId("timer")); props.put("processing.guarantee", "at_least_once"); props.put("consumer.isolation.level", "read_uncommitted"); + props.put("state.dir", props.get("state.dir") + File.separator + "timer"); props.put("num.stream.threads", Integer.valueOf(getOrSetDefault(TIMER_STREAM_THREADS_KEY, "1"))); // The timer topology is ALOS, so we can have a larger commit interval with less of a problem. Looking at the diff --git a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java index 471bbba16..c6470e7b0 100644 --- a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java +++ b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java @@ -8,6 +8,11 @@ import org.rocksdb.Cache; import org.rocksdb.CompactionStyle; import org.rocksdb.Options; +import org.rocksdb.RateLimiter; + +import static org.rocksdb.RateLimiter.DEFAULT_FAIRNESS; +import static org.rocksdb.RateLimiter.DEFAULT_MODE; +import static org.rocksdb.RateLimiter.DEFAULT_REFILL_PERIOD_MICROS; @Slf4j public class RocksConfigSetter implements RocksDBConfigSetter { @@ -43,38 +48,42 @@ public class RocksConfigSetter implements RocksDBConfigSetter { @Override public void setConfig(final String storeName, final Options options, final Map configs) { - log.trace("Overriding rocksdb settings for store {}", storeName); - - LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY); - - BlockBasedTableConfigWithAccessibleCache tableConfig = - (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig(); - if (serverConfig.getGlobalRocksdbBlockCache() != null) { - // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need - // to .close() it to avoid leaks so that we can provide a global one. - Cache oldCache = tableConfig.blockCache(); - tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache()); - oldCache.close(); - } - - tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY); - tableConfig.setBlockSize(BLOCK_SIZE); - options.setTableFormatConfig(tableConfig); - - options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS); - options.setCompactionStyle(CompactionStyle.LEVEL); - - options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads()); - - // Memtable size - options.setWriteBufferSize( - isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize()); - - if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) { - options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager()); - } +// log.trace("Overriding rocksdb settings for store {}", storeName); +// +// LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY); +// +// BlockBasedTableConfigWithAccessibleCache tableConfig = +// (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig(); +// if (serverConfig.getGlobalRocksdbBlockCache() != null) { +// // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need +// // to .close() it to avoid leaks so that we can provide a global one. +// Cache oldCache = tableConfig.blockCache(); +// tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache()); +// oldCache.close(); +// } +// +// tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY); +// tableConfig.setBlockSize(BLOCK_SIZE); +// options.setTableFormatConfig(tableConfig); +// +// options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS); +// options.setCompactionStyle(CompactionStyle.LEVEL); +// +// options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads()); +// +// // Memtable size +// options.setWriteBufferSize( +// isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize()); +// +// if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) { +// options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager()); +// } // Streams default is 3 - options.setMaxWriteBufferNumber(5); +// options.setMaxWriteBufferNumber(5); + long rateBytesPerSecond = mbToBytes(120); + log.info("Rate bytes per second = {}", rateBytesPerSecond); + options.setRateLimiter(new RateLimiter(rateBytesPerSecond, DEFAULT_REFILL_PERIOD_MICROS, DEFAULT_FAIRNESS, + DEFAULT_MODE, true)); // Future Work: Enable larger scaling by using Partitioned Index Filters // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters @@ -86,6 +95,10 @@ public void setConfig(final String storeName, final Options options, final Map { log.info("Closing timer Kafka Streams"); @@ -212,15 +221,6 @@ public void close() { }) .start(); - for (LHServerListener listener : listeners) { - new Thread(() -> { - log.info("Closing listener {}", listener); - listener.close(); - latch.countDown(); - }) - .start(); - } - try { latch.await(); log.info("Done shutting down all LHServer threads"); diff --git a/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java b/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java index f026c9e60..5ef4bd524 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.StandbyUpdateListener; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; @Slf4j public class HealthService implements Closeable, StateRestoreListener, StandbyUpdateListener { @@ -40,6 +41,7 @@ public class HealthService implements Closeable, StateRestoreListener, StandbyUp private KafkaStreams coreStreams; private KafkaStreams timerStreams; + private final ExecutorThreadPool jettyThreadPool = new ExecutorThreadPool(); public HealthService( LHServerConfig config, @@ -50,7 +52,8 @@ public HealthService( BackendInternalComms internalComms) { this.prom = new PrometheusMetricExporter(config); this.numberOfPartitionPerTopic = config.partitionsByTopic(); - + this.jettyThreadPool.setName("javalin-service"); + this.jettyThreadPool.setMaxThreads(10); this.coreState = new InstanceState(coreStreams, internalComms); this.prom.bind( coreStreams, @@ -59,7 +62,7 @@ public HealthService( metadataCache, new StandbyMetrics(standbyStores, config.getLHInstanceName()), coreState); - this.server = Javalin.create(); + this.server = Javalin.create(javalinConfig -> javalinConfig.jetty.threadPool = jettyThreadPool); this.coreStreams = coreStreams; this.timerStreams = timerStreams; diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java index 784ad8990..b7cc51551 100644 --- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java @@ -1,5 +1,6 @@ package io.littlehorse.server.streams.taskqueue; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import io.littlehorse.common.LHSerializable; import io.littlehorse.common.LHServerConfig; @@ -48,6 +49,9 @@ public PollTaskRequestObserver( LHServerConfig config, RequestExecutionContext requestContext) { this.responseObserver = responseObserver; + if(responseObserver instanceof ServerCallStreamObserver serverCall) { + serverCall.setOnCancelHandler(() -> {}); + } this.taskQueueManager = manager; this.principalId = principalId; this.tenantId = tenantId; diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java index e92d24549..a1b197698 100644 --- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java @@ -15,6 +15,9 @@ import io.littlehorse.common.util.LHUtil; import io.littlehorse.server.streams.TaskClaimEventProducerCallback; import io.littlehorse.server.streams.util.HeadersUtil; + +import java.io.Closeable; +import java.io.IOException; import java.util.Date; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Callback; @@ -24,7 +27,7 @@ * Everything related to the task protocol */ @Slf4j -public class TaskQueueCommandProducer { +public class TaskQueueCommandProducer implements Closeable { private final LHProducer producer; private final String commandTopic; @@ -90,4 +93,9 @@ public void send( producer.send( command.getPartitionKey(), command, commandTopic, kafkaProducerCallback, commandMetadata.toArray()); } + + @Override + public void close() { + producer.close(); + } } diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java index 618c3609e..e3c7b8854 100644 --- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java @@ -5,6 +5,9 @@ import io.littlehorse.common.model.getable.objectId.TenantIdModel; import io.littlehorse.server.LHServer; import io.littlehorse.server.streams.topology.core.RequestExecutionContext; + +import java.io.Closeable; +import java.io.IOException; import java.util.Collection; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -13,7 +16,7 @@ import org.apache.kafka.streams.processor.TaskId; @Slf4j -public class TaskQueueManager { +public class TaskQueueManager implements Closeable { private final ConcurrentHashMap taskQueues; @@ -66,6 +69,11 @@ public Collection all() { return taskQueues.values(); } + @Override + public void close() { + taskQueueCommandProducer.close(); + } + private record TenantTaskName(TenantIdModel tenantId, String taskDefName) { public TenantTaskName {