Skip to content

Commit

Permalink
disables config setters
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Dec 5, 2024
1 parent 1cefa45 commit 687e548
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 52 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version=0.0.0-development
group=io.littlehorse
kafkaVersion=3.8.1
kafkaVersion=3.9.0
lombokVersion=1.18.32
grpcVersion=1.56.1
junitVersion=5.9.2
6 changes: 3 additions & 3 deletions local-dev/configs/server-1.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8
LHS_STREAMS_NUM_STANDBY_REPLICAS=1
LHS_STREAMS_SESSION_TIMEOUT=10000
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_STREAMS_METRICS_LEVEL=debug
LHS_METRICS_LEVEL=info
LHS_CORE_STREAM_THREADS=4
LHS_STREAMS_METRICS_LEVEL=info
LHS_METRICS_LEVEL=trace

LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000

Expand Down
2 changes: 1 addition & 1 deletion local-dev/configs/server-2.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8
LHS_STREAMS_NUM_STANDBY_REPLICAS=3
LHS_STREAMS_SESSION_TIMEOUT=10000
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_CORE_STREAM_THREADS=4
LHS_STREAMS_METRICS_LEVEL=info

LHS_HEALTH_SERVICE_PORT=1832
Expand Down
2 changes: 1 addition & 1 deletion local-dev/configs/server-3.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ LHS_STREAMS_NUM_WARMUP_REPLICAS=8
LHS_STREAMS_NUM_STANDBY_REPLICAS=2
LHS_STREAMS_SESSION_TIMEOUT=10000
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_CORE_STREAM_THREADS=4
LHS_STREAMS_METRICS_LEVEL=info

LHS_HEALTH_SERVICE_PORT=1842
Expand Down
2 changes: 2 additions & 0 deletions local-dev/do-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ fi

cd "$WORK_DIR"

export LD_PRELOAD="/tmp/jemalloc/libjemalloc.so"

./gradlew server:installDist -x shadowJar -x test
./server/build/install/server/server/server "$CONFIG_PATH"
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ protected String[] getEnvKeyPrefixes() {

private Admin kafkaAdmin;
private LHProducer producer;
private LHProducer txnProducer;

public int getHotMetadataPartition() {
return (Utils.toPositive(Utils.murmur2(LHConstants.META_PARTITION_KEY.getBytes())) % getClusterPartitions());
Expand Down Expand Up @@ -662,7 +661,6 @@ public List<AdvertisedListenerConfig> getAdvertisedListeners() {
public void cleanup() {
if (this.kafkaAdmin != null) this.kafkaAdmin.close();
if (this.producer != null) this.producer.close();
if (this.txnProducer != null) this.txnProducer.close();
}

public boolean shouldCreateTopics() {
Expand Down Expand Up @@ -873,6 +871,7 @@ public Properties getTimerStreamsConfig() {
props.put("client.id", this.getClientId("timer"));
props.put("processing.guarantee", "at_least_once");
props.put("consumer.isolation.level", "read_uncommitted");
props.put("state.dir", props.get("state.dir") + File.separator + "timer");
props.put("num.stream.threads", Integer.valueOf(getOrSetDefault(TIMER_STREAM_THREADS_KEY, "1")));

// The timer topology is ALOS, so we can have a larger commit interval with less of a problem. Looking at the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
import org.rocksdb.Cache;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;
import org.rocksdb.RateLimiter;

import static org.rocksdb.RateLimiter.DEFAULT_FAIRNESS;
import static org.rocksdb.RateLimiter.DEFAULT_MODE;
import static org.rocksdb.RateLimiter.DEFAULT_REFILL_PERIOD_MICROS;

@Slf4j
public class RocksConfigSetter implements RocksDBConfigSetter {
Expand Down Expand Up @@ -43,38 +48,42 @@ public class RocksConfigSetter implements RocksDBConfigSetter {

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
log.trace("Overriding rocksdb settings for store {}", storeName);

LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY);

BlockBasedTableConfigWithAccessibleCache tableConfig =
(BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
if (serverConfig.getGlobalRocksdbBlockCache() != null) {
// Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need
// to .close() it to avoid leaks so that we can provide a global one.
Cache oldCache = tableConfig.blockCache();
tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache());
oldCache.close();
}

tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY);
tableConfig.setBlockSize(BLOCK_SIZE);
options.setTableFormatConfig(tableConfig);

options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS);
options.setCompactionStyle(CompactionStyle.LEVEL);

options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads());

// Memtable size
options.setWriteBufferSize(
isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize());

if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) {
options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager());
}
// log.trace("Overriding rocksdb settings for store {}", storeName);
//
// LHServerConfig serverConfig = (LHServerConfig) configs.get(LH_SERVER_CONFIG_KEY);
//
// BlockBasedTableConfigWithAccessibleCache tableConfig =
// (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
// if (serverConfig.getGlobalRocksdbBlockCache() != null) {
// // Streams provisions a *NON-shared* 50MB cache for every RocksDB instance. Need
// // to .close() it to avoid leaks so that we can provide a global one.
// Cache oldCache = tableConfig.blockCache();
// tableConfig.setBlockCache(serverConfig.getGlobalRocksdbBlockCache());
// oldCache.close();
// }
//
// tableConfig.setOptimizeFiltersForMemory(OPTIMIZE_FILTERS_FOR_MEMORY);
// tableConfig.setBlockSize(BLOCK_SIZE);
// options.setTableFormatConfig(tableConfig);
//
// options.setOptimizeFiltersForHits(OPTIMIZE_FILTERS_FOR_HITS);
// options.setCompactionStyle(CompactionStyle.LEVEL);
//
// options.setIncreaseParallelism(serverConfig.getRocksDBCompactionThreads());
//
// // Memtable size
// options.setWriteBufferSize(
// isCoreStore(storeName) ? serverConfig.getCoreMemtableSize() : serverConfig.getTimerMemtableSize());
//
// if (serverConfig.getGlobalRocksdbWriteBufferManager() != null) {
// options.setWriteBufferManager(serverConfig.getGlobalRocksdbWriteBufferManager());
// }
// Streams default is 3
options.setMaxWriteBufferNumber(5);
// options.setMaxWriteBufferNumber(5);
long rateBytesPerSecond = mbToBytes(120);
log.info("Rate bytes per second = {}", rateBytesPerSecond);
options.setRateLimiter(new RateLimiter(rateBytesPerSecond, DEFAULT_REFILL_PERIOD_MICROS, DEFAULT_FAIRNESS,
DEFAULT_MODE, true));

// Future Work: Enable larger scaling by using Partitioned Index Filters
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
Expand All @@ -86,6 +95,10 @@ public void setConfig(final String storeName, final Options options, final Map<S
@Override
public void close(final String storeName, final Options options) {}

private long mbToBytes(long mb){
return mb * 1024 * 1024;
}

private boolean isCoreStore(String storeName) {
return !storeName.contains("timer");
}
Expand Down
18 changes: 9 additions & 9 deletions server/src/main/java/io/littlehorse/server/LHServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.littlehorse.server.auth.internalport.InternalCallCredentials;
import io.littlehorse.server.listener.ServerListenerConfig;
import io.littlehorse.server.monitoring.HealthService;
import io.littlehorse.server.monitoring.metrics.MetricsCollectorRestoreListener;
import io.littlehorse.server.streams.BackendInternalComms;
import io.littlehorse.server.streams.ServerTopology;
import io.littlehorse.server.streams.taskqueue.TaskQueueCommandProducer;
Expand All @@ -29,6 +30,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -181,6 +183,13 @@ public void start() throws IOException {

public void close() {
CountDownLatch latch = new CountDownLatch(4 + listeners.size());
for (LHServerListener listener : listeners) {
log.info("Closing listener {}", listener);
listener.close();
latch.countDown();
}
commandProducer.close();
taskQueueManager.close();

new Thread(() -> {
log.info("Closing timer Kafka Streams");
Expand Down Expand Up @@ -212,15 +221,6 @@ public void close() {
})
.start();

for (LHServerListener listener : listeners) {
new Thread(() -> {
log.info("Closing listener {}", listener);
listener.close();
latch.countDown();
})
.start();
}

try {
latch.await();
log.info("Done shutting down all LHServer threads");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;

@Slf4j
public class HealthService implements Closeable, StateRestoreListener, StandbyUpdateListener {
Expand All @@ -40,6 +41,7 @@ public class HealthService implements Closeable, StateRestoreListener, StandbyUp

private KafkaStreams coreStreams;
private KafkaStreams timerStreams;
private final ExecutorThreadPool jettyThreadPool = new ExecutorThreadPool();

public HealthService(
LHServerConfig config,
Expand All @@ -50,7 +52,8 @@ public HealthService(
BackendInternalComms internalComms) {
this.prom = new PrometheusMetricExporter(config);
this.numberOfPartitionPerTopic = config.partitionsByTopic();

this.jettyThreadPool.setName("javalin-service");
this.jettyThreadPool.setMaxThreads(10);
this.coreState = new InstanceState(coreStreams, internalComms);
this.prom.bind(
coreStreams,
Expand All @@ -59,7 +62,7 @@ public HealthService(
metadataCache,
new StandbyMetrics(standbyStores, config.getLHInstanceName()),
coreState);
this.server = Javalin.create();
this.server = Javalin.create(javalinConfig -> javalinConfig.jetty.threadPool = jettyThreadPool);

this.coreStreams = coreStreams;
this.timerStreams = timerStreams;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.littlehorse.server.streams.taskqueue;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.littlehorse.common.LHSerializable;
import io.littlehorse.common.LHServerConfig;
Expand Down Expand Up @@ -48,6 +49,9 @@ public PollTaskRequestObserver(
LHServerConfig config,
RequestExecutionContext requestContext) {
this.responseObserver = responseObserver;
if(responseObserver instanceof ServerCallStreamObserver<PollTaskResponse> serverCall) {
serverCall.setOnCancelHandler(() -> {});
}
this.taskQueueManager = manager;
this.principalId = principalId;
this.tenantId = tenantId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import io.littlehorse.common.util.LHUtil;
import io.littlehorse.server.streams.TaskClaimEventProducerCallback;
import io.littlehorse.server.streams.util.HeadersUtil;

import java.io.Closeable;
import java.io.IOException;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
Expand All @@ -24,7 +27,7 @@
* Everything related to the task protocol
*/
@Slf4j
public class TaskQueueCommandProducer {
public class TaskQueueCommandProducer implements Closeable {

private final LHProducer producer;
private final String commandTopic;
Expand Down Expand Up @@ -90,4 +93,9 @@ public void send(
producer.send(
command.getPartitionKey(), command, commandTopic, kafkaProducerCallback, commandMetadata.toArray());
}

@Override
public void close() {
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.server.LHServer;
import io.littlehorse.server.streams.topology.core.RequestExecutionContext;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -13,7 +16,7 @@
import org.apache.kafka.streams.processor.TaskId;

@Slf4j
public class TaskQueueManager {
public class TaskQueueManager implements Closeable {

private final ConcurrentHashMap<TenantTaskName, OneTaskQueue> taskQueues;

Expand Down Expand Up @@ -66,6 +69,11 @@ public Collection<OneTaskQueue> all() {
return taskQueues.values();
}

@Override
public void close() {
taskQueueCommandProducer.close();
}

private record TenantTaskName(TenantIdModel tenantId, String taskDefName) {

public TenantTaskName {
Expand Down

0 comments on commit 687e548

Please sign in to comment.