Skip to content

Commit

Permalink
Merge branch '626-e2e-tests-multiple-servers' into 478-intelligently-…
Browse files Browse the repository at this point in the history
…allow-user-to-set-streams-configurations-with-smart-guardrails
  • Loading branch information
coltmcnealy-lh committed Jan 30, 2024
2 parents 92ab25b + 572bcc5 commit 57a1365
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 18 deletions.
47 changes: 47 additions & 0 deletions server/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Set to debug or trace if log4j initialization is failing
status = ERROR

# Name of the configuration
name = LHServer

# Console root error appender configuration
appender.error.type = Console
appender.error.name = error
appender.error.layout.type = PatternLayout
appender.error.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} %c{1} - %m%n

# Console kafka appender configuration
appender.kafka.type = Console
appender.kafka.name = kafka
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [KAFKA] %c{1} - %m%n

# Console server appender configuration
appender.server.type = Console
appender.server.name = server
appender.server.layout.type = PatternLayout
appender.server.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [LH] %c{1} - %m%n

# Console grpc appender configuration
appender.grpc.type = Console
appender.grpc.name = grpc
appender.grpc.layout.type = PatternLayout
appender.grpc.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [GRPC] %c{1} - %m%n

# Root logger level
rootLogger = ERROR, error

# Kafka logger
logger.kafka = WARN, kafka
logger.kafka.name = org.apache.kafka
logger.kafka.additivity = false

# Server logger
logger.server = DEBUG, server
logger.server.name = io.littlehorse
logger.server.additivity = false

# gRPC logger
logger.grpc = WARN, grpc
logger.grpc.name = io.grpc
logger.grpc.additivity = false
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.littlehorse.test.exception.LHTestInitializationException;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand All @@ -21,11 +23,13 @@ public class StandaloneTestBootstrapper implements TestBootstrapper {

private LHConfig workerConfig;
private LittleHorseBlockingStub client;
private static final long MEGABYTE = 1024L * 1024L;

private KafkaContainer kafka;
private KafkaStreamsServerImpl server;
private List<KafkaStreamsServerImpl> servers;

public StandaloneTestBootstrapper() {
this.servers = new ArrayList<>();
try {
setup();
} catch (Exception e) {
Expand All @@ -36,37 +40,45 @@ public StandaloneTestBootstrapper() {
public void setup() throws Exception {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
kafka.start();

kafka.withKraft()
.withEnv("KAFKA_NUM_NETWORK_THREADS", "2")
.withEnv("KAFKA_NUM_BACKGROUND_THREADS", "2")
.withEnv("KAFKA_NUM_IO_THREADS", "2")
.start();
workerConfig = new LHConfig();
client = workerConfig.getBlockingStub();
startServer();
}

private void startServer() throws Exception {
Properties serverProperties = new Properties();
serverProperties.put(LHServerConfig.KAFKA_BOOTSTRAP_KEY, kafka.getBootstrapServers());
serverProperties.put(LHServerConfig.KAFKA_STATE_DIR_KEY, "/tmp/" + UUID.randomUUID());
serverProperties.put(LHServerConfig.CLUSTER_PARTITIONS_KEY, "3");
Properties props1 = getServerProps(2023, 2011, 1822);
Properties props2 = getServerProps(2024, 2012, 1833);

LHServerConfig serverConfig = new LHServerConfig(serverProperties);
LHServerConfig server1Config = new LHServerConfig(props1);
LHServerConfig server2Config = new LHServerConfig(props2);

for (NewTopic topic : serverConfig.getAllTopics()) {
serverConfig.createKafkaTopic(topic);
for (NewTopic topic : server1Config.getAllTopics()) {
server1Config.createKafkaTopic(topic);
}

// wait until topics are created
TimeUnit.SECONDS.sleep(3);

// run the server in another thread
server = new KafkaStreamsServerImpl(serverConfig);

new Thread(() -> {
try {
server.start();
} catch (IOException exn) {
throw new RuntimeException(exn);
}
})
.start();
servers.add(new KafkaStreamsServerImpl(server1Config));
servers.add(new KafkaStreamsServerImpl(server2Config));

for (KafkaStreamsServerImpl server : servers) {
new Thread(() -> {
try {
server.start();
} catch (IOException exn) {
throw new RuntimeException(exn);
}
})
.start();
}

// wait until the server is up
Awaitility.await()
Expand All @@ -78,6 +90,26 @@ private void startServer() throws Exception {
});
}

private Properties getServerProps(int externalPort, int internalPort, int metricsPort) {
Properties serverProperties = new Properties();
serverProperties.put(LHServerConfig.KAFKA_BOOTSTRAP_KEY, kafka.getBootstrapServers());
serverProperties.put(LHServerConfig.KAFKA_STATE_DIR_KEY, "/tmp/" + UUID.randomUUID());
serverProperties.put(LHServerConfig.CLUSTER_PARTITIONS_KEY, "3");
serverProperties.put(LHServerConfig.ROCKSDB_TOTAL_BLOCK_CACHE_BYTES_KEY, String.valueOf(MEGABYTE * 32));
serverProperties.put(LHServerConfig.ROCKSDB_TOTAL_MEMTABLE_BYTES_KEY, String.valueOf(MEGABYTE * 32));
serverProperties.put(LHServerConfig.TIMER_STATESTORE_CACHE_BYTES_KEY, String.valueOf(MEGABYTE * 32));
serverProperties.put(LHServerConfig.CORE_STATESTORE_CACHE_BYTES_KEY, String.valueOf(MEGABYTE * 32));

serverProperties.put(LHServerConfig.INTERNAL_BIND_PORT_KEY, String.valueOf(internalPort));
serverProperties.put(LHServerConfig.INTERNAL_ADVERTISED_PORT_KEY, String.valueOf(internalPort));
serverProperties.put(LHServerConfig.INTERNAL_ADVERTISED_HOST_KEY, "localhost");
serverProperties.put(LHServerConfig.LISTENERS_KEY, "PLAIN:%d".formatted(externalPort));
serverProperties.put(LHServerConfig.ADVERTISED_LISTENERS_KEY, "PLAIN://localhost:%d".formatted(externalPort));
serverProperties.put(LHServerConfig.HEALTH_SERVICE_PORT_KEY, metricsPort);

return serverProperties;
}

@Override
public LHConfig getWorkerConfig() {
return workerConfig;
Expand Down

0 comments on commit 57a1365

Please sign in to comment.