diff --git a/canary/build.gradle b/canary/build.gradle
index d15cd4077..8a3e4dbf3 100644
--- a/canary/build.gradle
+++ b/canary/build.gradle
@@ -9,6 +9,7 @@ plugins {
 }
 
 repositories {
+    mavenLocal()
     mavenCentral()
 }
diff --git a/canary/canary.properties b/canary/canary.properties
index 64d6086db..88d67ead6 100644
--- a/canary/canary.properties
+++ b/canary/canary.properties
@@ -5,12 +5,15 @@ lh.canary.workflow.creation.enable=true
 lh.canary.topic.replicas=1
 lh.canary.topic.partitions=1
 # Aggregator settings
-lh.canary.aggregator.enable=true
+lh.canary.aggregator.enable=false
 # Metronome settings
-lh.canary.metronome.enable=true
+lh.canary.metronome.enable=false
 lh.canary.metronome.worker.enable=true
-lh.canary.metronome.frequency.ms=1000
-lh.canary.metronome.run.threads=1
-lh.canary.metronome.run.requests=300
+lh.canary.metronome.run.frequency.ms=1000
+lh.canary.metronome.run.threads=20
+lh.canary.metronome.run.requests=700
 lh.canary.metronome.run.sample.rate=1
+lh.canary.lhw.num.worker.threads=20
+lh.canary.metronome.server.id=lh
+lh.canary.metronome.server.dataplane.id=localhost
diff --git a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
index f15047a99..53a69b4bd 100644
--- a/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
+++ b/canary/src/main/java/io/littlehorse/canary/metronome/MetronomeWorker.java
@@ -30,7 +30,7 @@ public MetronomeWorker(final BeatProducer producer, final LHConfig lhConfig) {
     @LHTaskMethod(MetronomeWorkflow.TASK_NAME)
     public void executeTask(final long startTime, final boolean sampleIteration, final WorkerContext context) {
         final String id = "%s/%s".formatted(context.getIdempotencyKey(), context.getAttemptNumber());
-        log.debug("Executing task {} {}", MetronomeWorkflow.TASK_NAME, id);
+        log.info("Executing task {} {}", MetronomeWorkflow.TASK_NAME, id);
         if (sampleIteration) {
             producer.send(
                     id, BeatType.TASK_RUN_EXECUTION, Duration.between(Instant.ofEpochMilli(startTime), Instant.now()));
diff --git a/canary/src/main/resources/META-INF/microprofile-config.properties b/canary/src/main/resources/META-INF/microprofile-config.properties
index d2c1db5ec..e409f979e 100644
--- a/canary/src/main/resources/META-INF/microprofile-config.properties
+++ b/canary/src/main/resources/META-INF/microprofile-config.properties
@@ -7,7 +7,7 @@ lh.canary.id=canary-default
 lh.canary.topic.name=canary-beats
 lh.canary.topic.creation.enable=true
 lh.canary.topic.creation.timeout.ms=5000
-lh.canary.topic.replicas=3
+lh.canary.topic.replicas=1
 lh.canary.topic.partitions=12
 lh.canary.workflow.creation.enable=true
 lh.canary.workflow.name=canary-workflow
diff --git a/canary/src/main/resources/log4j2.properties b/canary/src/main/resources/log4j2.properties
index bd719f5e7..fc21b66c1 100644
--- a/canary/src/main/resources/log4j2.properties
+++ b/canary/src/main/resources/log4j2.properties
@@ -38,7 +38,7 @@ appender.grpc.layout.pattern = %d{HH:mm:ss} %highlight{%-5p} [GRPC] %c - %m%n
 rootLogger = ERROR, error
 
 # Kafka logger
-logger.kafka = WARN, kafka
+logger.kafka = INFO, kafka
 logger.kafka.name = org.apache.kafka
 logger.kafka.additivity = false
diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile
index 6f908ff2a..98c7db2ef 100644
--- a/docker/server/Dockerfile
+++ b/docker/server/Dockerfile
@@ -1,5 +1,7 @@
 FROM amazoncorretto:21
 WORKDIR /lh
+RUN amazon-linux-extras install epel -y && yum install jemalloc -y
+ENV LD_PRELOAD=/usr/lib64/libjemalloc.so.1
 COPY ./docker/server/docker-entrypoint.sh /lh
 COPY ./server/build/libs/server-*-all.jar /lh/server.jar
 ENTRYPOINT ["/lh/docker-entrypoint.sh"]
diff --git a/gradle.properties b/gradle.properties
index aa854c956..a6dda7cef 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,6 +1,6 @@
 version=0.0.0-development
 group=io.littlehorse
-kafkaVersion=3.8.0
+kafkaVersion=3.9.0
 lombokVersion=1.18.32
 grpcVersion=1.56.1
 junitVersion=5.9.2
diff --git a/local-dev/configs/server-1.config b/local-dev/configs/server-1.config
index 773f7591d..3413e5316 100644
--- a/local-dev/configs/server-1.config
+++ b/local-dev/configs/server-1.config
@@ -5,12 +5,12 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaState
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=1
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
-LHS_CORE_STREAM_THREADS=2
-LHS_STREAMS_METRICS_LEVEL=debug
-LHS_METRICS_LEVEL=info
+LHS_CORE_STREAM_THREADS=4
+LHS_STREAMS_METRICS_LEVEL=info
+LHS_METRICS_LEVEL=trace
 LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000
diff --git a/local-dev/configs/server-2.config b/local-dev/configs/server-2.config
index 1c4d8583c..37bfe02bc 100644
--- a/local-dev/configs/server-2.config
+++ b/local-dev/configs/server-2.config
@@ -5,10 +5,10 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaStateTwo
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=3
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
-LHS_CORE_STREAM_THREADS=2
+LHS_CORE_STREAM_THREADS=4
 LHS_STREAMS_METRICS_LEVEL=info
 LHS_HEALTH_SERVICE_PORT=1832
diff --git a/local-dev/configs/server-3.config b/local-dev/configs/server-3.config
index 1a21f3806..8e67774db 100644
--- a/local-dev/configs/server-3.config
+++ b/local-dev/configs/server-3.config
@@ -5,10 +5,10 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaStateThree
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=2
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
-LHS_CORE_STREAM_THREADS=2
+LHS_CORE_STREAM_THREADS=4
 LHS_STREAMS_METRICS_LEVEL=info
 LHS_HEALTH_SERVICE_PORT=1842
diff --git a/local-dev/do-canary.sh b/local-dev/do-canary.sh
index 394adb509..45744c3c9 100755
--- a/local-dev/do-canary.sh
+++ b/local-dev/do-canary.sh
@@ -7,5 +7,5 @@ WORK_DIR=$(cd "$SCRIPT_DIR/.." && pwd)
 
 cd "$WORK_DIR"
 
-./gradlew canary:installDist
+#./gradlew canary:installDist
 ./canary/build/install/canary/bin/canary canary/canary.properties
diff --git a/local-dev/do-server.sh b/local-dev/do-server.sh
index 3815b4890..6f8705c7a 100755
--- a/local-dev/do-server.sh
+++ b/local-dev/do-server.sh
@@ -19,5 +19,7 @@ fi
 
 cd "$WORK_DIR"
 
+export LD_PRELOAD="/tmp/jemalloc/libjemalloc.so"
+
 ./gradlew server:installDist -x shadowJar -x test
 ./server/build/install/server/server/server "$CONFIG_PATH"
diff --git a/local-dev/docker-compose.yml b/local-dev/docker-compose.yml
new file mode 100644
index 000000000..b4a2cdac9
--- /dev/null
+++ b/local-dev/docker-compose.yml
@@ -0,0 +1,11 @@
+services:
+  kafka:
+    ports:
+      - "9092:9092"
+    container_name: lh-server-kafka
+    image: apache/kafka:3.8.0
+    deploy:
+      resources:
+        limits:
+          cpus: '3500'
+          memory: '12G'
\ No newline at end of file
diff --git a/local-dev/setup.sh b/local-dev/setup.sh
index a2b40da12..b66ebca18 100755
--- a/local-dev/setup.sh
+++ b/local-dev/setup.sh
@@ -12,6 +12,11 @@ services:
     ports:
       - "9092:9092"
     container_name: lh-server-kafka
     image: apache/kafka:3.8.0
+    deploy:
+      resources:
+        limits:
+          cpus: '3.5'
+          memory: '12G'
 EOF
 )
diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/common/config/LHConfig.java b/sdk-java/src/main/java/io/littlehorse/sdk/common/config/LHConfig.java
index 15e6fd87d..3de4b1fa0 100644
--- a/sdk-java/src/main/java/io/littlehorse/sdk/common/config/LHConfig.java
+++ b/sdk-java/src/main/java/io/littlehorse/sdk/common/config/LHConfig.java
@@ -450,6 +450,6 @@ protected String[] getEnvKeyPrefixes() {
      * @return the number of worker threads to run.
      */
     public int getWorkerThreads() {
-        return Integer.valueOf(getOrSetDefault(NUM_WORKER_THREADS_KEY, "2"));
+        return Integer.valueOf(getOrSetDefault(NUM_WORKER_THREADS_KEY, "3"));
     }
 }
diff --git a/server/build.gradle b/server/build.gradle
index ed4f47415..fb6cb52d4 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -21,6 +21,7 @@ publishing {
 }
 
 repositories {
+    mavenLocal()
     mavenCentral()
     maven {
         url = uri("https://packages.confluent.io/maven/")
diff --git a/server/src/main/java/io/littlehorse/common/LHConstants.java b/server/src/main/java/io/littlehorse/common/LHConstants.java
index 2dac9236e..5cc433717 100644
--- a/server/src/main/java/io/littlehorse/common/LHConstants.java
+++ b/server/src/main/java/io/littlehorse/common/LHConstants.java
@@ -68,7 +68,7 @@ public class LHConstants {
     public static final String PARTITION_METRICS_KEY = "partitionMetrics";
     public static final Duration MAX_INCOMING_REQUEST_IDLE_TIME = Duration.ofSeconds(60);
 
-    public static final int MAX_TASKRUNS_IN_ONE_TASKQUEUE = 20_000;
+    public static final int MAX_TASKRUNS_IN_ONE_TASKQUEUE = 2000;
 
     public static final String STRING_MASK = "*****";
 }
diff --git a/server/src/main/java/io/littlehorse/common/LHProductionExceptionHandler.java b/server/src/main/java/io/littlehorse/common/LHProductionExceptionHandler.java
new file mode 100644
index 000000000..b653f0c2c
--- /dev/null
+++ b/server/src/main/java/io/littlehorse/common/LHProductionExceptionHandler.java
@@ -0,0 +1,24 @@
+package io.littlehorse.common;
+
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TransactionAbortedException;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+
+public class LHProductionExceptionHandler implements ProductionExceptionHandler {
+
+    public LHProductionExceptionHandler() {}
+
+    @Override
+    public ProductionExceptionHandlerResponse handle(ProducerRecord record, Exception exception) {
+        if (exception instanceof TransactionAbortedException) {
+            return ProductionExceptionHandlerResponse.CONTINUE;
+        }
+        return ProductionExceptionHandlerResponse.FAIL;
+    }
+
+    @Override
+    public void configure(Map configs) {
+        // nothing to do
+    }
+}
diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java
index ff8835f02..625c8f7ce 100644
--- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java
+++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java
@@ -45,7 +45,6 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.Cache;
@@ -182,7 +181,6 @@ protected String[] getEnvKeyPrefixes() {
 
     private Admin kafkaAdmin;
     private LHProducer producer;
-    private LHProducer txnProducer;
 
     public int getHotMetadataPartition() {
         return (Utils.toPositive(Utils.murmur2(LHConstants.META_PARTITION_KEY.getBytes())) % getClusterPartitions());
@@ -663,14 +661,6 @@ public List getAdvertisedListeners() {
     public void cleanup() {
         if (this.kafkaAdmin != null) this.kafkaAdmin.close();
         if (this.producer != null) this.producer.close();
-        if (this.txnProducer != null) this.txnProducer.close();
-    }
-
-    public LHProducer getProducer() {
-        if (producer == null) {
-            producer = new LHProducer(this);
-        }
-        return producer;
     }
 
     public boolean shouldCreateTopics() {
@@ -694,7 +684,7 @@ public long getTimerMemtableSize() {
         return Long.valueOf(getOrSetDefault(TIMER_MEMTABLE_SIZE_BYTES_KEY, String.valueOf(1024L * 1024L * 32)));
     }
 
-    public Properties getKafkaProducerConfig(String component) {
+    public Properties getKafkaCommandProducerConfig(String component) {
         Properties conf = new Properties();
         conf.put("client.id", this.getClientId(component));
         conf.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
@@ -708,11 +698,23 @@
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 org.apache.kafka.common.serialization.StringSerializer.class);
         conf.put(ProducerConfig.ACKS_CONFIG, "all");
+        conf.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
+        conf.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000");
         conf.put(ProducerConfig.LINGER_MS_CONFIG, getOrSetDefault(LINGER_MS_KEY, "0"));
         addKafkaSecuritySettings(conf);
         return conf;
     }
 
+    public Properties getKafkaTaskClaimProducer() {
+        Properties conf = getKafkaCommandProducerConfig("task-claim");
+        // conf.put(ProducerConfig.ACKS_CONFIG, "0");
+        // conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        // conf.put(ProducerConfig.LINGER_MS_CONFIG, "200");
+        // conf.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152");
+        // conf.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1000");
+        return conf;
+    }
+
     /*
      * Right now, this only supports mtls auth. We want to:
     * 1. Support other auth types, especially SCRAM-SHA-512
@@ -871,6 +873,7 @@ public Properties getTimerStreamsConfig() {
         props.put("client.id", this.getClientId("timer"));
         props.put("processing.guarantee", "at_least_once");
         props.put("consumer.isolation.level", "read_uncommitted");
+        props.put("state.dir", props.get("state.dir") + File.separator + "timer");
         props.put("num.stream.threads", Integer.valueOf(getOrSetDefault(TIMER_STREAM_THREADS_KEY, "1")));
 
         // The timer topology is ALOS, so we can have a larger commit interval with less of a problem. Looking at the
@@ -953,7 +956,7 @@ private Properties getBaseStreamsConfig() {
 
         // Configs required by KafkaStreams. Some of these are overriden by the application logic itself.
         props.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
-        props.put("default.production.exception.handler", DefaultProductionExceptionHandler.class);
+        props.put("default.production.exception.handler", LHProductionExceptionHandler.class);
         props.put("default.value.serde", Serdes.StringSerde.class.getName());
         props.put("default.key.serde", Serdes.StringSerde.class.getName());
@@ -989,7 +992,16 @@
         // in the case of a server failure while a request is being processed, the resulting
         // `Command` should be processed on a new server within a minute. Issue #479
         // should verify this behavior
-        props.put("consumer.session.timeout.ms", getStreamsSessionTimeout());
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getStreamsSessionTimeout());
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, getStreamsSessionTimeout());
+        // props.put(
+        //         ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
+        //         "120000");
+        // props.put(
+        //         ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
+        //         "60000");
+        // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+        //         "240000");
 
         // In case we need to authenticate to Kafka, this sets it.
         addKafkaSecuritySettings(props);
diff --git a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TaskClaimEvent.java b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TaskClaimEvent.java
index 6073c3042..8f85d0cc2 100644
--- a/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TaskClaimEvent.java
+++ b/server/src/main/java/io/littlehorse/common/model/corecommand/subcommand/TaskClaimEvent.java
@@ -1,5 +1,6 @@
 package io.littlehorse.common.model.corecommand.subcommand;
 
+import com.google.protobuf.Empty;
 import com.google.protobuf.Message;
 import io.grpc.Status;
 import io.littlehorse.common.LHSerializable;
@@ -11,8 +12,6 @@
 import io.littlehorse.common.model.getable.objectId.TaskRunIdModel;
 import io.littlehorse.common.proto.TaskClaimEventPb;
 import io.littlehorse.common.util.LHUtil;
-import io.littlehorse.sdk.common.proto.PollTaskResponse;
-import io.littlehorse.server.streams.taskqueue.PollTaskRequestObserver;
 import io.littlehorse.server.streams.topology.core.ExecutionContext;
 import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext;
 import java.util.Date;
@@ -40,11 +39,11 @@ public class TaskClaimEvent extends CoreSubCommand {
 
     public TaskClaimEvent() {}
 
-    public TaskClaimEvent(ScheduledTaskModel task, PollTaskRequestObserver taskClaimer) {
+    public TaskClaimEvent(ScheduledTaskModel task, String taskWorkerVersion, String taskWorkerId) {
         this.taskRunId = task.getTaskRunId();
         this.time = new Date();
-        this.taskWorkerId = taskClaimer.getClientId();
-        this.taskWorkerVersion = taskClaimer.getTaskWorkerVersion();
+        this.taskWorkerId = taskWorkerId;
+        this.taskWorkerVersion = taskWorkerVersion;
     }
 
     public Class getProtoBaseClass() {
@@ -66,37 +65,35 @@ public TaskClaimEventPb.Builder toProto() {
     }
 
     public boolean hasResponse() {
-        // TaskClaimEvents are always due to a Task Worker's poll request.
-        return true;
+        return false;
     }
 
     @Override
-    public PollTaskResponse process(ProcessorExecutionContext executionContext, LHServerConfig config) {
+    public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) {
         TaskRunModel taskRun = executionContext.getableManager().get(taskRunId);
         if (taskRun == null) {
             log.warn("Got claimTask for non-existent taskRun {}", taskRunId);
             throw new LHApiException(Status.INVALID_ARGUMENT, "Got claimTask for nonexistent taskRun {}" + taskRunId);
         }
-
+        executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
+        taskRun.onTaskAttemptStarted(this);
+        return Empty.newBuilder().build();
         // Needs to be done before we process the event, since processing the event
         // will delete the task schedule request.
-        ScheduledTaskModel scheduledTask = executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
-
-        // It's totally fine for the scheduledTask to be null. That happens when someone already
-        // claimed that task. This happens when a server is recovering from a crash. The fact that it
-        // is null prevents it from being scheduled twice.
+        // ScheduledTaskModel scheduledTask = executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
         //
-        // We shouldn't throw an error on this, we just return an empty optional.
-        if (scheduledTask == null) {
-            log.warn("Processing pollTaskRequest for task {} that was already claimed", taskRunId);
-            return PollTaskResponse.newBuilder().build();
-        } else {
-            taskRun.onTaskAttemptStarted(this);
-            executionContext.getableManager().get(taskRunId.wfRunId).advance(time);
-            return PollTaskResponse.newBuilder()
-                    .setResult(scheduledTask.toProto())
-                    .build();
-        }
+        // // It's totally fine for the scheduledTask to be null. That happens when someone already
+        // // claimed that task. This happens when a server is recovering from a crash. The fact that it
+        // // is null prevents it from being scheduled twice.
+        // //
+        // // We shouldn't throw an error on this, we just return an empty optional.
+        // if (scheduledTask == null) {
+        //     return Empty.newBuilder().build();
+        // } else {
+        //     taskRun.onTaskAttemptStarted(this);
+        //     executionContext.getableManager().get(taskRunId.wfRunId).advance(time);
+        //     return Empty.newBuilder().build();
+        // }
     }
 
     public static TaskClaimEvent fromProto(TaskClaimEventPb proto, ExecutionContext context) {
diff --git a/server/src/main/java/io/littlehorse/common/model/getable/core/taskrun/TaskRunModel.java b/server/src/main/java/io/littlehorse/common/model/getable/core/taskrun/TaskRunModel.java
index 540ef918d..f945f92ec 100644
--- a/server/src/main/java/io/littlehorse/common/model/getable/core/taskrun/TaskRunModel.java
+++ b/server/src/main/java/io/littlehorse/common/model/getable/core/taskrun/TaskRunModel.java
@@ -270,6 +270,27 @@ public void onTaskAttemptStarted(TaskClaimEvent se) {
         attempt.setStatus(TaskStatus.TASK_RUNNING);
     }
 
+    public void onTaskAttemptStarted() {
+        transitionTo(TaskStatus.TASK_RUNNING);
+
+        // create a timer to mark the task is timeout if it does not finish
+        // ReportTaskRunModel taskResult = new ReportTaskRunModel();
+        // taskResult.setTaskRunId(id);
+        // taskResult.setTime(new Date(System.currentTimeMillis() + (1000 * timeoutSeconds)));
+        // taskResult.setStatus(TaskStatus.TASK_TIMEOUT);
+        // CommandModel timerCommand = new CommandModel(taskResult, taskResult.getTime());
+        // LHTimer timer = new LHTimer(timerCommand);
+        // processorContext.getTaskManager().scheduleTimer(timer);
+
+        // Now that that's out of the way, we can mark the TaskRun as running.
+        // Also we need to save the task worker version and client id.
+        TaskAttemptModel attempt = getLatestAttempt();
+        attempt.setTaskWorkerId("eduwer");
+        attempt.setTaskWorkerVersion("camacaro");
+        attempt.setStartTime(new Date());
+        attempt.setStatus(TaskStatus.TASK_RUNNING);
+    }
+
     public void onTaskAttemptResultReported(ReportTaskRunModel ce) {
         if (ce.getAttemptNumber() >= attempts.size()) {
             throw new LHApiException(Status.INVALID_ARGUMENT, "Specified Task Attempt does not exist!");
diff --git a/server/src/main/java/io/littlehorse/common/util/LHProducer.java b/server/src/main/java/io/littlehorse/common/util/LHProducer.java
index aa6618d91..23a1918e2 100644
--- a/server/src/main/java/io/littlehorse/common/util/LHProducer.java
+++ b/server/src/main/java/io/littlehorse/common/util/LHProducer.java
@@ -1,12 +1,13 @@
 package io.littlehorse.common.util;
 
-import io.littlehorse.common.LHServerConfig;
 import io.littlehorse.common.model.AbstractCommand;
 import java.io.Closeable;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
@@ -14,18 +15,18 @@
 
 public class LHProducer implements Closeable {
 
-    private KafkaProducer prod;
+    private final Producer prod;
 
-    public LHProducer(LHServerConfig config) {
-        prod = new KafkaProducer<>(config.getKafkaProducerConfig(config.getLHInstanceName()));
+    public LHProducer(Properties configs) {
+        prod = new KafkaProducer<>(configs);
     }
 
-    public Future send(String key, AbstractCommand t, String topic, Callback cb, Header... headers) {
-        return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
+    public LHProducer(Producer prod) {
+        this.prod = prod;
     }
 
-    public Future send(String key, AbstractCommand t, String topic) {
-        return this.send(key, t, topic, null);
+    public Future send(String key, AbstractCommand t, String topic, Callback cb, Header...
headers) { + return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb); } public Future sendRecord(ProducerRecord record, Callback cb) { diff --git a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java index 471bbba16..46f5d5e59 100644 --- a/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java +++ b/server/src/main/java/io/littlehorse/common/util/RocksConfigSetter.java @@ -1,13 +1,15 @@ package io.littlehorse.common.util; +import static org.rocksdb.RateLimiter.DEFAULT_FAIRNESS; +import static org.rocksdb.RateLimiter.DEFAULT_MODE; +import static org.rocksdb.RateLimiter.DEFAULT_REFILL_PERIOD_MICROS; + import io.littlehorse.common.LHServerConfig; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache; -import org.rocksdb.Cache; -import org.rocksdb.CompactionStyle; -import org.rocksdb.Options; +import org.rocksdb.*; @Slf4j public class RocksConfigSetter implements RocksDBConfigSetter { @@ -62,7 +64,7 @@ public void setConfig(final String storeName, final Options options, final Map listeners; + private final LHProducer commandProducer; private RequestExecutionContext requestContext() { return contextKey.get(); @@ -76,7 +64,12 @@ private RequestExecutionContext requestContext() { public LHServer(LHServerConfig config) throws LHMisconfigurationException { this.metadataCache = new MetadataCache(); this.config = config; - this.taskQueueManager = new TaskQueueManager(this, LHConstants.MAX_TASKRUNS_IN_ONE_TASKQUEUE); + LHProducer taskClaimProducer = new LHProducer(config.getKafkaTaskClaimProducer()); + final TaskQueueCommandProducer taskQueueProducer = + new TaskQueueCommandProducer(taskClaimProducer, config.getCoreCmdTopicName()); + this.taskQueueManager = + new TaskQueueManager(this, LHConstants.MAX_TASKRUNS_IN_ONE_TASKQUEUE, taskQueueProducer); + this.commandProducer = new LHProducer(config.getKafkaCommandProducerConfig(config.getLHInstanceName())); // Kafka Streams Setup if (config.getLHInstanceId().isPresent()) { @@ -106,11 +99,13 @@ public LHServer(LHServerConfig config) throws LHMisconfigurationException { this.healthService = new HealthService(config, coreStreams, timerStreams, taskQueueManager, metadataCache, internalComms); - this.listeners = - config.getListeners().stream().map(this::createListener).toList(); + this.listeners = config.getListeners().stream() + .map(serverListenerConfig -> createListener(serverListenerConfig, taskQueueProducer)) + .toList(); } - private LHServerListener createListener(ServerListenerConfig listenerConfig) { + private LHServerListener createListener( + ServerListenerConfig listenerConfig, TaskQueueCommandProducer taskQueueProducer) { return new LHServerListener( listenerConfig, taskQueueManager, @@ -122,35 +117,15 @@ private LHServerListener createListener(ServerListenerConfig listenerConfig) { new MetricCollectingServerInterceptor(healthService.getMeterRegistry()), new RequestAuthorizer(contextKey, metadataCache, coreStoreProvider, config), listenerConfig.getRequestAuthenticator()), - contextKey); + contextKey, + commandProducer, + taskQueueProducer); } public String getInstanceName() { return config.getLHInstanceName(); } - /* - * Sends a command to Kafka and simultaneously does a waitForProcessing() internal - * grpc call that asynchronously waits for the 
command to be processed. It
-     * infers the request context from the GRPC Context.
-     */
-    public void returnTaskToClient(ScheduledTaskModel scheduledTask, PollTaskRequestObserver client) {
-        TaskClaimEvent claimEvent = new TaskClaimEvent(scheduledTask, client);
-
-        processCommand(
-                new CommandModel(claimEvent),
-                client.getResponseObserver(),
-                PollTaskResponse.class,
-                false,
-                client.getPrincipalId(),
-                client.getTenantId(),
-                client.getRequestContext());
-    }
-
-    public LHProducer getProducer() {
-        return internalComms.getProducer();
-    }
-
     public void onResponseReceived(String commandId, WaitForCommandResponse response) {
         internalComms.onResponseReceived(commandId, response);
     }
@@ -159,79 +134,6 @@ public void sendErrorToClient(String commandId, Throwable caught) {
         internalComms.sendErrorToClientForCommand(commandId, caught);
     }
 
-    /*
-     * Sends a command to Kafka and simultaneously does a waitForProcessing() internal
-     * grpc call that asynchronously waits for the command to be processed.
-     *
-     * Explicit request context. Useful for callers who do not have access to the GRPC
-     * context, for example the `returnTaskToClient()` method. That method is called
-     * from within the CommandProcessor#process() method.
-     *
-     * REFACTOR_SUGGESTION: We should create a CommandSender.java class which is responsible
-     * for sending commands to Kafka and waiting for the execution. That class should
-     * not depend on RequestExecutionContext but rather the AuthorizationContext. The
-     * `TaskClaimEvent#reportTaskToClient()` flow should not go through KafkaStreamsServerImpl
-     * anymore.
-     */
-    private void processCommand(
-            AbstractCommand command,
-            StreamObserver responseObserver,
-            Class responseCls,
-            boolean shouldCompleteStream) {
-        RequestExecutionContext requestContext = requestContext();
-        processCommand(
-                command,
-                responseObserver,
-                responseCls,
-                shouldCompleteStream,
-                requestContext.authorization().principalId(),
-                requestContext.authorization().tenantId(),
-                requestContext);
-    }
-
-    /*
-     * This method is called from within the `CommandProcessor#process()` method (specifically, on the
-     * TaskClaimEvent#process()) method. Therefore, we cannot infer the RequestExecutionContext like
-     * we do in the other places, because the GRPC context does not exist in this case.
-     * Note that this is not a GRPC method that @Override's a super method and takes in
-     * a protobuf + StreamObserver.
-     */
-    private void processCommand(
-            AbstractCommand command,
-            StreamObserver responseObserver,
-            Class responseCls,
-            boolean shouldCompleteStream,
-            PrincipalIdModel principalId,
-            TenantIdModel tenantId,
-            RequestExecutionContext context) {
-        StreamObserver commandObserver = new POSTStreamObserver<>(
-                responseObserver,
-                responseCls,
-                shouldCompleteStream,
-                internalComms,
-                command,
-                context,
-                // Streams Session Timeout is how long it takes to notice that the server is down.
-                // Then we need the rebalance to occur, and the new server must process the command.
-                // So we give it a buffer of 10 additional seconds.
-                Duration.ofMillis(10_000 + config.getStreamsSessionTimeout()),
-                networkThreadpool);
-
-        Callback callback = internalComms.createProducerCommandCallback(command, commandObserver, context);
-
-        command.setCommandId(LHUtil.generateGuid());
-
-        Headers commandMetadata = HeadersUtil.metadataHeadersFor(tenantId, principalId);
-        internalComms
-                .getProducer()
-                .send(
-                        command.getPartitionKey(),
-                        command,
-                        command.getTopic(config),
-                        callback,
-                        commandMetadata.toArray());
-    }
-
     private WfService getServiceFromContext() {
         return requestContext().service();
     }
@@ -279,6 +181,13 @@ public void start() throws IOException {
     public void close() {
         CountDownLatch latch = new CountDownLatch(4 + listeners.size());
 
+        for (LHServerListener listener : listeners) {
+            log.info("Closing listener {}", listener);
+            listener.close();
+            latch.countDown();
+        }
+        commandProducer.close();
+        taskQueueManager.close();
 
         new Thread(() -> {
                     log.info("Closing timer Kafka Streams");
@@ -310,15 +219,6 @@ public void close() {
                 })
                 .start();
 
-        for (LHServerListener listener : listeners) {
-            new Thread(() -> {
-                        log.info("Closing listener {}", listener);
-                        listener.close();
-                        latch.countDown();
-                    })
-                    .start();
-        }
-
         try {
             latch.await();
             log.info("Done shutting down all LHServer threads");
diff --git a/server/src/main/java/io/littlehorse/server/LHServerListener.java b/server/src/main/java/io/littlehorse/server/LHServerListener.java
index c344c4400..b5e4491b3 100644
--- a/server/src/main/java/io/littlehorse/server/LHServerListener.java
+++ b/server/src/main/java/io/littlehorse/server/LHServerListener.java
@@ -17,7 +17,6 @@
 import io.littlehorse.common.LHServerConfig;
 import io.littlehorse.common.exceptions.LHApiException;
 import io.littlehorse.common.model.AbstractCommand;
-import io.littlehorse.common.model.ScheduledTaskModel;
 import io.littlehorse.common.model.corecommand.CommandModel;
 import io.littlehorse.common.model.corecommand.subcommand.AssignUserTaskRunRequestModel;
 import io.littlehorse.common.model.corecommand.subcommand.CancelUserTaskRunRequestModel;
@@ -32,13 +31,11 @@
 import io.littlehorse.common.model.corecommand.subcommand.SaveUserTaskRunProgressRequestModel;
 import io.littlehorse.common.model.corecommand.subcommand.ScheduleWfRequestModel;
 import io.littlehorse.common.model.corecommand.subcommand.StopWfRunRequestModel;
-import io.littlehorse.common.model.corecommand.subcommand.TaskClaimEvent;
 import io.littlehorse.common.model.corecommand.subcommand.TaskWorkerHeartBeatRequestModel;
 import io.littlehorse.common.model.getable.core.events.WorkflowEventModel;
 import io.littlehorse.common.model.getable.core.externalevent.ExternalEventModel;
 import io.littlehorse.common.model.getable.core.noderun.NodeRunModel;
 import io.littlehorse.common.model.getable.core.taskrun.TaskRunModel;
-import io.littlehorse.common.model.getable.core.taskworkergroup.HostModel;
 import io.littlehorse.common.model.getable.core.taskworkergroup.TaskWorkerGroupModel;
 import io.littlehorse.common.model.getable.core.usertaskrun.UserTaskRunModel;
 import io.littlehorse.common.model.getable.core.variable.VariableModel;
@@ -106,7 +103,6 @@
 import io.littlehorse.sdk.common.proto.ExternalEventList;
 import io.littlehorse.sdk.common.proto.GetLatestUserTaskDefRequest;
 import io.littlehorse.sdk.common.proto.GetLatestWfSpecRequest;
-import io.littlehorse.sdk.common.proto.LHHostInfo;
 import io.littlehorse.sdk.common.proto.ListExternalEventsRequest;
 import io.littlehorse.sdk.common.proto.ListNodeRunsRequest;
 import io.littlehorse.sdk.common.proto.ListTaskMetricsRequest;
@@ -200,7 +196,6 @@
 import io.littlehorse.sdk.common.proto.WorkflowEventId;
 import io.littlehorse.sdk.common.proto.WorkflowEventIdList;
 import io.littlehorse.sdk.common.proto.WorkflowEventList;
-import io.littlehorse.server.auth.internalport.InternalCallCredentials;
 import io.littlehorse.server.listener.ServerListenerConfig;
 import io.littlehorse.server.streams.BackendInternalComms;
 import io.littlehorse.server.streams.lhinternalscan.PublicScanReply;
@@ -253,6 +248,7 @@
 import io.littlehorse.server.streams.lhinternalscan.publicsearchreplies.SearchWorkflowEventReply;
 import io.littlehorse.server.streams.taskqueue.ClusterHealthRequestObserver;
 import io.littlehorse.server.streams.taskqueue.PollTaskRequestObserver;
+import io.littlehorse.server.streams.taskqueue.TaskQueueCommandProducer;
 import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
 import io.littlehorse.server.streams.topology.core.CoreStoreProvider;
 import io.littlehorse.server.streams.topology.core.RequestExecutionContext;
@@ -264,16 +260,13 @@
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.time.Duration;
-import java.util.Date;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.processor.TaskId;
 
 /**
  * This class provides the implementation for public RPCs.
@@ -291,6 +284,8 @@ public class LHServerListener extends LittleHorseImplBase implements Closeable {
     private final CoreStoreProvider coreStoreProvider;
     private final ScheduledExecutorService networkThreadpool;
     private final String listenerName;
+    private final LHProducer commandProducer;
+    private final TaskQueueCommandProducer taskQueueProducer;
 
     private Server grpcListener;
 
@@ -306,11 +301,11 @@ public LHServerListener(
             CoreStoreProvider coreStoreProvider,
             MetadataCache metadataCache,
             List interceptors,
-            Context.Key contextKey) {
-
+            Context.Key contextKey,
+            LHProducer commandProducer,
+            TaskQueueCommandProducer taskQueueProducer) {
         // All dependencies are passed in as arguments; nothing is instantiated here,
         // because all listeners share the same threading infrastructure.
-
         this.metadataCache = metadataCache;
         this.serverConfig = listenerConfig.getConfig();
         this.taskQueueManager = taskQueueManager;
@@ -319,9 +314,9 @@ public LHServerListener(
         this.internalComms = internalComms;
         this.listenerName = listenerConfig.getName();
         this.contextKey = contextKey;
-
+        this.commandProducer = commandProducer;
         this.grpcListener = null;
-
+        this.taskQueueProducer = taskQueueProducer;
         ServerBuilder builder = Grpc.newServerBuilderForPort(
                         listenerConfig.getPort(), listenerConfig.getCredentials())
                 .permitKeepAliveTime(15, TimeUnit.SECONDS)
@@ -646,37 +641,8 @@ public void registerTaskWorker(
     @Override
     @Authorize(resources = ACLResource.ACL_TASK, actions = ACLAction.WRITE_METADATA)
     public void reportTask(ReportTaskRun req, StreamObserver ctx) {
-        // There is no need to wait for the ReportTaskRun to actually be processed, because
-        // we would just return a google.protobuf.Empty anyways. All we need to do is wait for
-        // the Command to be persisted into Kafka.
         ReportTaskRunModel reqModel = LHSerializable.fromProto(req, ReportTaskRunModel.class, requestContext());
-
-        TenantIdModel tenantId = requestContext().authorization().tenantId();
-        PrincipalIdModel principalId = requestContext().authorization().principalId();
-        Headers commandMetadata = HeadersUtil.metadataHeadersFor(tenantId, principalId);
-
-        CommandModel command = new CommandModel(reqModel, new Date());
-
-        Callback kafkaProducerCallback = (meta, exn) -> {
-            try {
-                if (exn == null) {
-                    ctx.onNext(Empty.getDefaultInstance());
-                    ctx.onCompleted();
-                } else {
-                    ctx.onError(new LHApiException(Status.UNAVAILABLE, "Failed recording command to Kafka"));
-                }
-            } catch (IllegalStateException e) {
-                log.debug("Call already closed");
-            }
-        };
-
-        LHProducer producer = internalComms.getProducer();
-        producer.send(
-                command.getPartitionKey(),
-                command,
-                command.getTopic(serverConfig),
-                kafkaProducerCallback,
-                commandMetadata.toArray());
+        taskQueueProducer.send(reqModel, requestContext().authorization(), ctx);
     }
 
     @Override
@@ -1086,36 +1052,6 @@ public void getServerVersion(Empty request, StreamObserver ctx) {
         ctx.onCompleted();
     }
 
-    /*
-     * Sends a command to Kafka and simultaneously does a waitForProcessing() internal
-     * grpc call that asynchronously waits for the command to be processed. It
-     * infers the request context from the GRPC Context.
-     */
-    public void returnTaskToClient(ScheduledTaskModel scheduledTask, PollTaskRequestObserver client) {
-        TaskClaimEvent claimEvent = new TaskClaimEvent(scheduledTask, client);
-
-        processCommand(
-                new CommandModel(claimEvent),
-                client.getResponseObserver(),
-                PollTaskResponse.class,
-                false,
-                client.getPrincipalId(),
-                client.getTenantId(),
-                client.getRequestContext());
-    }
-
-    public LHProducer getProducer() {
-        return internalComms.getProducer();
-    }
-
-    public void onResponseReceived(String commandId, WaitForCommandResponse response) {
-        internalComms.onResponseReceived(commandId, response);
-    }
-
-    public void sendErrorToClient(String commandId, Exception caught) {
-        internalComms.sendErrorToClientForCommand(commandId, caught);
-    }
-
     /*
      * Sends a command to Kafka and simultaneously does a waitForProcessing() internal
      * grpc call that asynchronously waits for the command to be processed.
@@ -1184,39 +1120,15 @@ private void processCommand(
         command.setCommandId(LHUtil.generateGuid());
 
         Headers commandMetadata = HeadersUtil.metadataHeadersFor(tenantId, principalId);
-        internalComms
-                .getProducer()
-                .send(
-                        command.getPartitionKey(),
-                        command,
-                        command.getTopic(serverConfig),
-                        callback,
-                        commandMetadata.toArray());
+        commandProducer.send(
+                command.getPartitionKey(),
+                command,
+                command.getTopic(serverConfig),
+                callback,
+                commandMetadata.toArray());
     }
 
     private WfService getServiceFromContext() {
         return requestContext().service();
     }
-
-    public void onTaskScheduled(
-            TaskId streamsTaskId, TaskDefIdModel taskDef, ScheduledTaskModel scheduledTask, TenantIdModel tenantId) {
-        taskQueueManager.onTaskScheduled(streamsTaskId, taskDef, scheduledTask, tenantId);
-    }
-
-    public void drainPartitionTaskQueue(TaskId streamsTaskId) {
-        taskQueueManager.drainPartition(streamsTaskId);
-    }
-
-    public Set getAllInternalHosts() {
-        return internalComms.getAllInternalHosts();
-    }
-
-    public LHHostInfo getAdvertisedHost(
-            HostModel host, String listenerName, InternalCallCredentials internalCredentials) {
-        return internalComms.getAdvertisedHost(host, listenerName, internalCredentials);
-    }
-
-    public void onEventThrown(WorkflowEventModel event) {
-        internalComms.onWorkflowEventThrown(event);
-    }
 }
diff --git a/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java b/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java
index f026c9e60..189a2c130 100644
--- a/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java
+++ b/server/src/main/java/io/littlehorse/server/monitoring/HealthService.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 
 @Slf4j
 public class HealthService implements Closeable, StateRestoreListener, StandbyUpdateListener {
@@ -40,6 +41,7 @@ public class HealthService implements Closeable, StateRestoreListener, StandbyUp
     private KafkaStreams coreStreams;
     private KafkaStreams timerStreams;
+    private final ExecutorThreadPool jettyThreadPool = new ExecutorThreadPool();
 
     public HealthService(
             LHServerConfig config,
@@ -48,9 +50,10 @@ public HealthService(
             TaskQueueManager taskQueueManager,
             MetadataCache metadataCache,
             BackendInternalComms internalComms) {
-        this.prom = new PrometheusMetricExporter(config);
+        this.prom = new PrometheusMetricExporter(config, internalComms.asyncWaiters());
         this.numberOfPartitionPerTopic = config.partitionsByTopic();
-
+        this.jettyThreadPool.setName("javalin-service");
+        this.jettyThreadPool.setMaxThreads(10);
         this.coreState = new InstanceState(coreStreams, internalComms);
         this.prom.bind(
                 coreStreams,
@@ -59,7 +62,7 @@ public HealthService(
                 metadataCache,
                 new StandbyMetrics(standbyStores, config.getLHInstanceName()),
                 coreState);
-        this.server = Javalin.create();
+        this.server = Javalin.create(javalinConfig -> javalinConfig.jetty.threadPool = jettyThreadPool);
 
         this.coreStreams = coreStreams;
         this.timerStreams = timerStreams;
diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/AsyncWaiterMetrics.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/AsyncWaiterMetrics.java
new file mode 100644
index 000000000..3fdd61e68
--- /dev/null
+++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/AsyncWaiterMetrics.java
@@ -0,0 +1,21 @@
+package io.littlehorse.server.monitoring.metrics;
+
+import io.littlehorse.server.streams.util.AsyncWaiters;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.binder.MeterBinder;
+
+public class AsyncWaiterMetrics implements MeterBinder {
+
+    private static final String METRIC_NAME = "async_waiter_size";
+    private final AsyncWaiters asyncWaiters;
+
+    public AsyncWaiterMetrics(AsyncWaiters asyncWaiters) {
+        this.asyncWaiters = asyncWaiters;
+    }
+
+    @Override
+    public void bindTo(MeterRegistry meterRegistry) {
+        Gauge.builder(METRIC_NAME, asyncWaiters, AsyncWaiters::size).register(meterRegistry);
+    }
+}
diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java
index bf4c866fb..c5e5627f9 100644
--- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java
+++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java
@@ -4,6 +4,7 @@
 import io.littlehorse.common.LHServerConfig;
 import io.littlehorse.server.monitoring.StandbyMetrics;
 import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
+import io.littlehorse.server.streams.util.AsyncWaiters;
 import io.littlehorse.server.streams.util.MetadataCache;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Tags;
@@ -27,11 +28,13 @@ public class PrometheusMetricExporter implements Closeable {
     private PrometheusMeterRegistry prometheusRegistry;
     private LHServerConfig config;
     private TaskQueueManagerMetrics taskQueueManagerMetrics;
+    private final AsyncWaiters asyncWaiters;
 
-    public PrometheusMetricExporter(LHServerConfig config) {
+    public PrometheusMetricExporter(LHServerConfig config, AsyncWaiters asyncWaiters) {
         this.config = config;
         this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
         this.prometheusRegistry.config().commonTags("application_id", config.getLHClusterId());
+        this.asyncWaiters = asyncWaiters;
         new ServerMetricFilter(prometheusRegistry, ServerFilterRules.fromLevel(config.getServerMetricLevel()))
                 .initialize();
     }
@@ -65,6 +68,9 @@ public void bind(
         taskQueueManagerMetrics = new TaskQueueManagerMetrics(taskQueueManager);
         taskQueueManagerMetrics.bindTo(prometheusRegistry);
 
+        AsyncWaiterMetrics asyncWaiterMetrics = new AsyncWaiterMetrics(asyncWaiters);
+        asyncWaiterMetrics.bindTo(prometheusRegistry);
+
         JvmMemoryMetrics jvmMeter = new JvmMemoryMetrics();
         jvmMeter.bindTo(prometheusRegistry);
 
diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/TaskQueueManagerMetrics.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/TaskQueueManagerMetrics.java
index 7388de0f6..4775bb48b 100644
--- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/TaskQueueManagerMetrics.java
+++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/TaskQueueManagerMetrics.java
@@ -1,6 +1,6 @@
 package io.littlehorse.server.monitoring.metrics;
 
-import io.littlehorse.server.streams.taskqueue.OneTaskQueue;
+import io.littlehorse.server.streams.taskqueue.TaskQueue;
 import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -17,6 +17,7 @@ public class TaskQueueManagerMetrics implements MeterBinder, Closeable {
     public static final String METRIC_NAME = "lh_in_memory_task_queue_size";
     public static final String TENANT_ID_TAG = "tenant_id";
     public static final String TASK_NAME_TAG = "task_name";
+    public static final String REHYDRATION_COUNT_METRIC_NAME = "taskqueue_rehydration_count";
 
     private final TaskQueueManager taskQueueManager;
     private final ScheduledExecutorService mainExecutor;
@@ -34,20 +35,21 @@ private void updateMetrics(MeterRegistry registry) {
         taskQueueManager.all().stream()
                 .filter(queue -> !wasRegistered(registry, queue))
                 .forEach(queue -> {
-                    log.trace("Adding new metric for queue {}", queue.getTaskDefName());
-                    Gauge.builder(METRIC_NAME, queue, OneTaskQueue::size)
-                            .tag(TENANT_ID_TAG, queue.getTenantId().getId())
-                            .tag(TASK_NAME_TAG, queue.getTaskDefName())
+                    log.trace("Adding new metric for queue {}", queue.taskDefName());
+                    Gauge.builder(METRIC_NAME, queue, TaskQueue::size)
+                            .tag(TENANT_ID_TAG, queue.tenantId().getId())
+                            .tag(TASK_NAME_TAG, queue.taskDefName())
                             .register(registry);
                 });
+        Gauge.builder(REHYDRATION_COUNT_METRIC_NAME, taskQueueManager, TaskQueueManager::rehydrationCount)
+                .register(registry);
     }
 
-    private boolean wasRegistered(MeterRegistry registry, OneTaskQueue queue) {
+    private boolean wasRegistered(MeterRegistry registry, TaskQueue queue) {
         return registry.getMeters().stream()
                 .filter(meter -> meter.getId().getName().equals(METRIC_NAME))
-                .filter(meter ->
-                        queue.getTenantId().getId().equals(meter.getId().getTag(TENANT_ID_TAG)))
-                .anyMatch(meter -> queue.getTaskDefName().equals(meter.getId().getTag(TASK_NAME_TAG)));
+                .filter(meter -> queue.tenantId().getId().equals(meter.getId().getTag(TENANT_ID_TAG)))
+                .anyMatch(meter -> queue.taskDefName().equals(meter.getId().getTag(TASK_NAME_TAG)));
     }
 
     @Override
diff --git a/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java b/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java
index 402d58989..dd6de9ec8 100644
--- a/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java
+++ b/server/src/main/java/io/littlehorse/server/streams/BackendInternalComms.java
@@ -45,7 +45,6 @@
 import io.littlehorse.common.proto.ScanResultTypePb;
 import io.littlehorse.common.proto.WaitForCommandRequest;
 import io.littlehorse.common.proto.WaitForCommandResponse;
-import io.littlehorse.common.util.LHProducer;
 import io.littlehorse.common.util.LHUtil;
 import io.littlehorse.sdk.common.exception.LHMisconfigurationException;
 import io.littlehorse.sdk.common.exception.LHSerdeError;
@@ -116,12 +115,10 @@ public class BackendInternalComms implements Closeable {
     @Getter
     private HostInfo thisHost;
 
-    private LHProducer producer;
-
     private ChannelCredentials clientCreds;
 
     private Map channels;
-    private AsyncWaiters asyncWaiters;
+    private final AsyncWaiters asyncWaiters;
     private ConcurrentHashMap otherHosts;
 
     private final Context.Key contextKey;
@@ -164,7 +161,6 @@ public BackendInternalComms(
                 .build();
         thisHost = new HostInfo(config.getInternalAdvertisedHost(), config.getInternalAdvertisedPort());
-        this.producer = config.getProducer();
         this.asyncWaiters = new AsyncWaiters(networkThreadPool);
     }
 
@@ -237,13 +233,18 @@ public > T getObject(
         }
     }
 
+    public AsyncWaiters asyncWaiters() {
+        return asyncWaiters;
+    }
+
     public ProducerCommandCallback createProducerCommandCallback(
             AbstractCommand command, StreamObserver observer, RequestExecutionContext requestCtx) {
         Function internalStub = (meta) ->
                 getInternalAsyncClient(meta.activeHost(), InternalCallCredentials.forContext(requestCtx));
-        return new ProducerCommandCallback(observer, command, coreStreams, thisHost, internalStub, asyncWaiters);
+        return new ProducerCommandCallback(
+                observer, command, coreStreams, thisHost, internalStub, asyncWaiters, networkThreadPool);
     }
 
     public void waitForCommand(
@@ -355,10 +356,6 @@ private InternalGetAdvertisedHostsResponse getPublicListenersForHost(
         return info;
     }
 
-    public LHProducer getProducer() {
-        return producer;
-    }
-
     public void onWorkflowEventThrown(WorkflowEventModel event) {
         asyncWaiters.registerWorkflowEventHappened(event);
     }
diff --git a/server/src/main/java/io/littlehorse/server/streams/ProducerCommandCallback.java b/server/src/main/java/io/littlehorse/server/streams/ProducerCommandCallback.java
index 37d7048fc..8b742b24f 100644
--- a/server/src/main/java/io/littlehorse/server/streams/ProducerCommandCallback.java
+++ b/server/src/main/java/io/littlehorse/server/streams/ProducerCommandCallback.java
@@ -4,11 +4,14 @@
 import io.grpc.stub.StreamObserver;
 import io.littlehorse.common.exceptions.LHApiException;
 import io.littlehorse.common.model.AbstractCommand;
+import io.littlehorse.common.model.corecommand.CommandModel;
 import io.littlehorse.common.proto.LHInternalsGrpc;
 import io.littlehorse.common.proto.WaitForCommandRequest;
 import io.littlehorse.common.proto.WaitForCommandResponse;
 import io.littlehorse.server.streams.util.AsyncWaiters;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Serdes;
@@ -16,6 +19,7 @@
 import org.apache.kafka.streams.KeyQueryMetadata;
 import org.apache.kafka.streams.state.HostInfo;
 
+@Slf4j
 public class ProducerCommandCallback implements Callback {
     private final StreamObserver observer;
     private final AbstractCommand command;
@@ -23,6 +27,7 @@ public class ProducerCommandCallback implements Callback {
     private final HostInfo thisHost;
     private final Function internalStub;
     private final AsyncWaiters asyncWaiters;
+    private final Executor completionHandlerPool;
 
     public ProducerCommandCallback(
             StreamObserver observer,
@@ -30,26 +35,38 @@ public ProducerCommandCallback(
             KafkaStreams coreStreams,
             HostInfo thisHost,
             Function internalStub,
-            AsyncWaiters asyncWaiters) {
+            AsyncWaiters asyncWaiters,
+            Executor completionHandlerPool) {
         this.observer = observer;
         this.command = command;
         this.coreStreams = coreStreams;
         this.thisHost = thisHost;
         this.internalStub = internalStub;
         this.asyncWaiters = asyncWaiters;
+        this.completionHandlerPool = completionHandlerPool;
     }
 
     @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
-        try {
-            if (exception != null) {
-                observer.onError(new LHApiException(Status.UNAVAILABLE, "Failed recording command to Kafka"));
-            } else {
-                waitForCommand(command, observer);
+        completionHandlerPool.execute(() -> {
+            if (command instanceof CommandModel cmd) {
+                long commandTime = cmd.getTime().getTime();
+                long currentTime = System.currentTimeMillis();
+                long latency = currentTime - commandTime;
+                if (latency > 10) {
+                    log.debug("Latency %s ms".formatted(latency));
+                }
             }
-        } catch (LHApiException ex) {
-            observer.onError(ex);
-        }
+            try {
+                if (exception != null) {
+                    observer.onError(new LHApiException(Status.UNAVAILABLE, "Failed recording command to Kafka"));
+                } else {
+                    waitForCommand(command, observer);
+                }
+            } catch (LHApiException ex) {
+                observer.onError(ex);
+            }
+        });
     }
 
     private void waitForCommand(AbstractCommand command, StreamObserver observer) {
diff --git a/server/src/main/java/io/littlehorse/server/streams/ServerTopology.java b/server/src/main/java/io/littlehorse/server/streams/ServerTopology.java
index 3b1bc3219..4943df010 100644
--- a/server/src/main/java/io/littlehorse/server/streams/ServerTopology.java
+++ b/server/src/main/java/io/littlehorse/server/streams/ServerTopology.java
@@ -2,10 +2,8 @@
 
 import io.littlehorse.common.LHServerConfig;
 import io.littlehorse.common.model.LHTimer;
-import io.littlehorse.common.model.repartitioncommand.RepartitionCommand;
 import io.littlehorse.common.proto.Command;
 import io.littlehorse.common.proto.MetadataCommand;
-import io.littlehorse.common.util.serde.LHDeserializer;
 import io.littlehorse.common.util.serde.LHSerde;
 import io.littlehorse.common.util.serde.ProtobufDeserializer;
 import io.littlehorse.server.LHServer;
@@ -14,7 +12,6 @@
 import io.littlehorse.server.streams.topology.core.processors.CommandProcessor;
 import io.littlehorse.server.streams.topology.core.processors.MetadataGlobalStoreProcessor;
 import io.littlehorse.server.streams.topology.core.processors.MetadataProcessor;
-import io.littlehorse.server.streams.topology.core.processors.RepartitionCommandProcessor;
 import io.littlehorse.server.streams.topology.timer.TimerProcessor;
 import io.littlehorse.server.streams.util.MetadataCache;
 import org.apache.kafka.common.serialization.Serde;
@@ -149,18 +146,18 @@ public static Topology initCoreTopology(
         topo.addStateStore(coreStoreBuilder, CORE_PROCESSOR);
 
         // Repartition sub-topology
-        topo.addSource(
-                CORE_REPARTITION_SOURCE,
-                Serdes.String().deserializer(),
-                new LHDeserializer<>(RepartitionCommand.class),
-                config.getRepartitionTopicName());
-        topo.addProcessor(
-                CORE_REPARTITION_PROCESSOR,
-                () -> new RepartitionCommandProcessor(config, metadataCache),
-                CORE_REPARTITION_SOURCE);
-        StoreBuilder> repartitionedStoreBuilder = Stores.keyValueStoreBuilder(
-                Stores.persistentKeyValueStore(CORE_REPARTITION_STORE), Serdes.String(), Serdes.Bytes());
-        topo.addStateStore(repartitionedStoreBuilder, CORE_REPARTITION_PROCESSOR);
+        // topo.addSource(
+        //         CORE_REPARTITION_SOURCE,
+        //         Serdes.String().deserializer(),
+        //         new LHDeserializer<>(RepartitionCommand.class),
+        //         config.getRepartitionTopicName());
+        // topo.addProcessor(
+        //         CORE_REPARTITION_PROCESSOR,
+        //         () -> new RepartitionCommandProcessor(config, metadataCache),
+        //         CORE_REPARTITION_SOURCE);
+        // StoreBuilder> repartitionedStoreBuilder = Stores.keyValueStoreBuilder(
+        //         Stores.persistentKeyValueStore(CORE_REPARTITION_STORE), Serdes.String(), Serdes.Bytes());
+        // topo.addStateStore(repartitionedStoreBuilder, CORE_REPARTITION_PROCESSOR);
 
         // Metadata Global Store
         StoreBuilder> globalStoreBuilder = Stores.keyValueStoreBuilder(
diff --git a/server/src/main/java/io/littlehorse/server/streams/TaskClaimEventProducerCallback.java b/server/src/main/java/io/littlehorse/server/streams/TaskClaimEventProducerCallback.java
new file mode 100644
index 000000000..a30a40158
--- /dev/null
+++ b/server/src/main/java/io/littlehorse/server/streams/TaskClaimEventProducerCallback.java
@@ -0,0 +1,30 @@
+package io.littlehorse.server.streams;
+
+import io.littlehorse.common.model.ScheduledTaskModel;
+import io.littlehorse.server.streams.taskqueue.PollTaskRequestObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+@Slf4j
+public class TaskClaimEventProducerCallback implements Callback {
+
+    private final ScheduledTaskModel scheduledTask;
+    private final PollTaskRequestObserver client;
+
+    public TaskClaimEventProducerCallback(
+            final ScheduledTaskModel scheduledTask, final PollTaskRequestObserver client) {
+        this.scheduledTask = scheduledTask;
+        this.client = client;
+    }
+
+    @Override
+    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+        if (exception == null) {
+            client.sendResponse(scheduledTask);
+        } else {
+            client.onError(exception);
+            log.error("error", exception);
+        }
+    }
+}
diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/OneTaskQueue.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/OneTaskQueue.java
index 825474dd1..af216c37d 100644
--- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/OneTaskQueue.java
+++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/OneTaskQueue.java
@@ -18,6 +18,8 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import lombok.Getter;
@@ -27,21 +29,22 @@
 // One instance of this class is responsible for coordinating the grpc backend for
 // one specific TaskDef on one LH Server host.
 @Slf4j
-public class OneTaskQueue {
+public class OneTaskQueue implements TaskQueue {
 
-    private Queue hungryClients;
-    private Lock lock;
+    private final Queue hungryClients;
+    private final Lock lock;
 
-    private LinkedBlockingQueue pendingTasks;
-    private TaskQueueManager parent;
+    private final LinkedBlockingQueue pendingTasks;
+    private final TaskQueueManager parent;
 
     @Getter
     private String taskDefName;
 
-    @Getter
-    private TenantIdModel tenantId;
+    private final TenantIdModel tenantId;
 
-    private String instanceName;
+    private final String instanceName;
+    private final AtomicBoolean needsRehydration = new AtomicBoolean(false);
+    private final AtomicLong rehydrationCount = new AtomicLong(0);
 
     private final Map taskTrack = new ConcurrentHashMap<>();
 
@@ -63,7 +66,7 @@ public OneTaskQueue(String taskDefName, TaskQueueManager parent, int capacity, T
      * a clean
      * shutdown (onCompleted()) or connection error (onError()).
      *
-     * @param observer is the TaskQueueStreamObserver for the client whose
+     * @param disconnectedObserver is the TaskQueueStreamObserver for the client whose
      *                 connection is now gone.
      */
    public void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver) {
@@ -75,7 +78,7 @@ public void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver)
         hungryClients.removeIf(thing -> {
             log.trace(
                     "Instance {}: Removing task queue observer for taskdef {} with" + " client id {}: {}",
-                    parent.getBackend().getInstanceName(),
+                    instanceName,
                     taskDefName,
                     disconnectedObserver.getClientId(),
                     disconnectedObserver);
@@ -117,6 +120,9 @@ public boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel schedule
                 instanceName,
                 LHLibUtil.getWfRunId(scheduledTask.getSource().toProto()),
                 hungryClients.isEmpty());
+        if (needsRehydration.get()) {
+            return false;
+        }
 
         PollTaskRequestObserver luckyClient = null;
         try {
@@ -136,7 +142,11 @@ public boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel schedule
                         || trackedPartition.hasMoreDataOnDisk();
                 taskTrack.put(
                         streamsTaskId,
-                        new TrackedPartition(hasMoreTasksOnDisk, trackedPartition.lastRehydratedTask(), scheduledTask));
+                        new TrackedPartition(
+                                hasMoreTasksOnDisk,
+                                trackedPartition.lastRehydratedTask(),
+                                trackedPartition.lastReturnedTask()));
+                needsRehydration.set(hasMoreTasksOnDisk);
                 return !hasMoreTasksOnDisk;
             }
         } finally {
@@ -145,7 +155,7 @@ public boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel schedule
 
         // pull this outside of protected zone for performance.
         if (luckyClient != null) {
-            parent.itsAMatch(scheduledTask, luckyClient);
+            itsAMatch(scheduledTask, luckyClient);
             return true;
         }
         return hungryClients.isEmpty();
@@ -158,6 +168,7 @@ public boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel schedule
      * that talks to the
      * client who made the PollTaskRequest.
      */
+    @Override
    public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext) {
 
         if (taskDefName == null) {
@@ -181,7 +192,7 @@ public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecut
            lock.lock();
             if (pendingTasks.isEmpty()) {
                 for (Map.Entry taskHasMoreDataOnDisk : taskTrack.entrySet()) {
-                    if (taskHasMoreDataOnDisk.getValue().hasMoreDataOnDisk()) {
+                    if (taskHasMoreDataOnDisk.getValue().hasMoreDataOnDisk() || needsRehydration.get()) {
                         rehydrateFromStore(requestContext.getableManager(taskHasMoreDataOnDisk.getKey()));
                     }
                 }
@@ -198,8 +209,7 @@ public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecut
                         poll.streamsTaskId(), new TrackedPartition(true, nextTask.getCreatedAt(), nextTask));
                 taskTrack.put(
                         poll.streamsTaskId(),
-                        new TrackedPartition(
-                                trackedPartition.hasMoreDataOnDisk(), trackedPartition.lastRehydratedTask(), nextTask));
+                        new TrackedPartition(trackedPartition.hasMoreDataOnDisk(), nextTask.getCreatedAt(), nextTask));
             } else {
                 // case 2
                 hungryClients.add(requestObserver);
@@ -209,10 +219,14 @@ public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecut
         }
 
         if (nextTask != null) {
-            parent.itsAMatch(nextTask, requestObserver);
+            itsAMatch(nextTask, requestObserver);
         }
     }
 
+    private void itsAMatch(ScheduledTaskModel scheduledTask, PollTaskRequestObserver luckyClient) {
+        parent.itsAMatch(scheduledTask, luckyClient);
+    }
+
     public boolean hasMoreTasksOnDisk(TaskId streamsTaskId) {
         return taskTrack.get(streamsTaskId).hasMoreDataOnDisk();
     }
@@ -222,6 +236,7 @@ public boolean hasMoreTasksOnDisk(TaskId streamsTaskId) {
      */
     private void rehydrateFromStore(ReadOnlyGetableManager readOnlyGetableManager) {
         log.debug("Rehydrating");
+        rehydrationCount.incrementAndGet();
         if (readOnlyGetableManager.getSpecificTask().isEmpty()) {
             throw new IllegalStateException("Only specific task rehydration is permitted.");
         }
@@ -246,7 +261,7 @@ private void rehydrateFromStore(ReadOnlyGetableManager readOnlyGetableManager) {
             ScheduledTaskModel scheduledTask = readOnlyGetableManager.getScheduledTask(taskRunId);
             if (scheduledTask != null && notRehydratedYet(scheduledTask, lastRehydratedTask, scheduledTaskModel)) {
                 if (!hungryClients.isEmpty()) {
-                    parent.itsAMatch(scheduledTask, hungryClients.remove());
+                    itsAMatch(scheduledTask, hungryClients.remove());
                 } else {
                     queueOutOfCapacity = !pendingTasks.offer(new QueueItem(taskId, scheduledTask));
                     if (!queueOutOfCapacity) {
@@ -257,6 +272,7 @@ private void rehydrateFromStore(ReadOnlyGetableManager readOnlyGetableManager) {
             }
             hasMoreTasksOnDisk = queueOutOfCapacity;
         }
+        needsRehydration.set(hasMoreTasksOnDisk);
         taskTrack.put(taskId, new TrackedPartition(hasMoreTasksOnDisk, lastRehydratedTask, null));
     }
 
@@ -264,23 +280,40 @@ private record TrackedPartition(
             Boolean hasMoreDataOnDisk, Date lastRehydratedTask, ScheduledTaskModel lastReturnedTask) {}
 
     private boolean notRehydratedYet(
-            ScheduledTaskModel scheduledTask, Date lastRehydratedTask, ScheduledTaskModel lastReturnedTask) {
+            ScheduledTaskModel maybe, Date lastRehydratedTask, ScheduledTaskModel lastReturnedTask) {
         if (lastReturnedTask == null) {
             return true;
         }
-        return (lastRehydratedTask == null && !scheduledTask.getTaskRunId().equals(lastReturnedTask.getTaskRunId())
-                || (!scheduledTask.getTaskRunId().equals(lastReturnedTask.getTaskRunId())
-                        && scheduledTask.getCreatedAt().compareTo(lastRehydratedTask) >= 0));
+        return (lastRehydratedTask == null && !maybe.getTaskRunId().equals(lastReturnedTask.getTaskRunId())
+                || (!maybe.getTaskRunId().equals(lastReturnedTask.getTaskRunId())
+                        && maybe.getCreatedAt().compareTo(lastRehydratedTask) >= 0));
     }
 
+    @Override
     public void drainPartition(TaskId partitionToDrain) {
         taskTrack.remove(partitionToDrain);
         pendingTasks.removeIf(queueItem -> queueItem.streamsTaskId().equals(partitionToDrain));
     }
 
+    @Override
     public int size() {
         return pendingTasks.size();
     }
 
+    @Override
+    public TenantIdModel tenantId() {
+        return tenantId;
+    }
+
+    @Override
+    public String taskDefName() {
+        return taskDefName;
+    }
+
+    @Override
+    public long rehydratedCount() {
+        return rehydrationCount.get();
+    }
+
     private record QueueItem(TaskId streamsTaskId, ScheduledTaskModel scheduledTask) {}
 }
diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java
index acb50c6ce..a4e6f5ba1 100644
--- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java
+++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/PollTaskRequestObserver.java
@@ -1,8 +1,10 @@
 package io.littlehorse.server.streams.taskqueue;
 
+import io.grpc.stub.ServerCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import io.littlehorse.common.LHSerializable;
 import io.littlehorse.common.LHServerConfig;
+import io.littlehorse.common.model.ScheduledTaskModel;
 import io.littlehorse.common.model.getable.objectId.PrincipalIdModel;
 import io.littlehorse.common.model.getable.objectId.TaskDefIdModel;
 import io.littlehorse.common.model.getable.objectId.TenantIdModel;
@@ -47,6 +49,9 @@ public PollTaskRequestObserver(
             LHServerConfig config,
             RequestExecutionContext requestContext) {
         this.responseObserver = responseObserver;
+        if
(responseObserver instanceof ServerCallStreamObserver serverCall) { + serverCall.setOnCancelHandler(() -> {}); + } this.taskQueueManager = manager; this.principalId = principalId; this.tenantId = tenantId; @@ -102,6 +107,12 @@ public void onNext(PollTaskRequest req) { taskQueueManager.onPollRequest(this, tenantId, requestContext); } + public void sendResponse(ScheduledTaskModel toExecute) { + PollTaskResponse response = + PollTaskResponse.newBuilder().setResult(toExecute.toProto()).build(); + responseObserver.onNext(response); + } + @Override public void onCompleted() { taskQueueManager.onRequestDisconnected(this, tenantId); diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueue.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueue.java new file mode 100644 index 000000000..9fececba2 --- /dev/null +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueue.java @@ -0,0 +1,60 @@ +package io.littlehorse.server.streams.taskqueue; + +import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.server.streams.topology.core.RequestExecutionContext; +import org.apache.kafka.streams.processor.TaskId; + +public interface TaskQueue { + + /** + * Called when a gRPC client (and its StreamObserver) disconnect, whether due to + * a clean + * shutdown (onCompleted()) or connection error (onError()). + * + * @param disconnectedObserver is the TaskQueueStreamObserver for the client whose + * connection is now gone. + */ + void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver); + + /** + * Called in two places: 1. In the CommandProcessorDaoImpl::scheduleTask() 2. In + * the + * CommandProcessor::init(). + * + *
<p>
+ * Item 1) is quite self-explanatory. + * + *
<p>
+ * For Item 2), remember that the Task Queue Manager system is only in-memory. + * Upon a restart + * or rebalance, we need to rebuild that state. During the init() call, we + * iterate through all + * currently scheduled but not started tasks in the state store. + * + * @param scheduledTask is the ::getObjectId() for the TaskScheduleRequest + * that was just + * scheduled. + * @return True if the task was successfully scheduled, or False if the queue is full. + */ + boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel scheduledTask); + + /** + * Called when a grpc client sends a new PollTaskPb. + * + * @param requestObserver is the grpc StreamObserver representing the channel + * that talks to the + * client who made the PollTaskRequest. + */ + void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext); + + int size(); + + long rehydratedCount(); + + void drainPartition(TaskId partitionToDrain); + + TenantIdModel tenantId(); + + String taskDefName(); +} diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java new file mode 100644 index 000000000..b3d7b8847 --- /dev/null +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducer.java @@ -0,0 +1,99 @@ +package io.littlehorse.server.streams.taskqueue; + +import com.google.protobuf.Empty; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.littlehorse.common.AuthorizationContext; +import io.littlehorse.common.exceptions.LHApiException; +import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.corecommand.CommandModel; +import io.littlehorse.common.model.corecommand.subcommand.ReportTaskRunModel; +import io.littlehorse.common.model.corecommand.subcommand.TaskClaimEvent; +import io.littlehorse.common.model.getable.objectId.PrincipalIdModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.common.util.LHProducer; +import io.littlehorse.common.util.LHUtil; +import io.littlehorse.server.streams.TaskClaimEventProducerCallback; +import io.littlehorse.server.streams.util.HeadersUtil; +import java.io.Closeable; +import java.util.Date; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.common.header.Headers; + +/** + * Everything related to the task protocol + */ +@Slf4j +public class TaskQueueCommandProducer implements Closeable { + + private final LHProducer producer; + private final String commandTopic; + + public TaskQueueCommandProducer(LHProducer producer, String commandTopic) { + this.producer = producer; + this.commandTopic = commandTopic; + } + + /* + * Sends a command to Kafka and simultaneously does a waitForProcessing() internal + * grpc call that asynchronously waits for the command to be processed. It + * infers the request context from the GRPC Context. 
+ */ + public void returnTaskToClient(ScheduledTaskModel scheduledTask, PollTaskRequestObserver client) { + TaskClaimEvent claimEvent = + new TaskClaimEvent(scheduledTask, client.getTaskWorkerVersion(), client.getClientId()); + TaskClaimEventProducerCallback callback = new TaskClaimEventProducerCallback(scheduledTask, client); + processCommand(claimEvent, client.getPrincipalId(), client.getTenantId(), callback); + } + + /* + * This method is called from within the `CommandProcessor#process()` method (specifically, on the + * TaskClaimEvent#process()) method. Therefore, we cannot infer the RequestExecutionContext like + * we do in the other places, because the GRPC context does not exist in this case. + * Note that this is not a GRPC method that @Override's a super method and takes in + * a protobuf + StreamObserver. + */ + private void processCommand( + TaskClaimEvent taskClaimEvent, + PrincipalIdModel principalId, + TenantIdModel tenantId, + TaskClaimEventProducerCallback callback) { + CommandModel command = new CommandModel(taskClaimEvent); + + command.setCommandId(LHUtil.generateGuid()); + + Headers commandMetadata = HeadersUtil.metadataHeadersFor(tenantId, principalId); + producer.send(command.getPartitionKey(), command, commandTopic, callback, commandMetadata.toArray()); + } + + public void send( + ReportTaskRunModel reportTaskRun, AuthorizationContext auth, StreamObserver clientObserver) { + // There is no need to wait for the ReportTaskRun to actually be processed, because + // we would just return a google.protobuf.Empty anyways. All we need to do is wait for + // the Command to be persisted into Kafka. + CommandModel command = new CommandModel(reportTaskRun, new Date()); + + Callback kafkaProducerCallback = (meta, exn) -> { + try { + if (exn == null) { + clientObserver.onNext(Empty.getDefaultInstance()); + clientObserver.onCompleted(); + } else { + clientObserver.onError(new LHApiException(Status.UNAVAILABLE, "Failed recording command to Kafka")); + } + } catch (IllegalStateException e) { + log.debug("Call already closed"); + } + }; + Headers commandMetadata = HeadersUtil.metadataHeadersFor(auth.tenantId(), auth.principalId()); + + producer.send( + command.getPartitionKey(), command, commandTopic, kafkaProducerCallback, commandMetadata.toArray()); + } + + @Override + public void close() { + producer.close(); + } +} diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueImpl.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueImpl.java new file mode 100644 index 000000000..b6b944c21 --- /dev/null +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueImpl.java @@ -0,0 +1,132 @@ +package io.littlehorse.server.streams.taskqueue; + +import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.server.streams.topology.core.RequestExecutionContext; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.processor.TaskId; + +@Slf4j +public class TaskQueueImpl implements TaskQueue { + + private final String taskDefName; + private final TaskQueueManager parent; + private final int capacity; + private final TenantIdModel tenantId; + private 
final Lock lock = new ReentrantLock(); + private final Queue hungryClients = new LinkedList<>(); + private final String instanceName; + private final LinkedBlockingQueue pendingTasks; + private final AtomicBoolean needsRehydration = new AtomicBoolean(false); + + public TaskQueueImpl(String taskDefName, TaskQueueManager parent, int capacity, TenantIdModel tenantId) { + this.taskDefName = taskDefName; + this.parent = parent; + this.capacity = capacity; + this.tenantId = tenantId; + this.instanceName = parent.getBackend().getInstanceName(); + this.pendingTasks = new LinkedBlockingQueue<>(capacity); + } + + @Override + public void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver) { + synchronizedBlock(() -> { + hungryClients.removeIf(thing -> { + log.trace( + "Instance {}: Removing task queue observer for taskdef {} with" + " client id {}: {}", + instanceName, + taskDefName, + disconnectedObserver.getClientId(), + disconnectedObserver); + return thing.equals(disconnectedObserver); + }); + }); + } + + @Override + public boolean onTaskScheduled(TaskId streamTaskId, ScheduledTaskModel scheduledTask) { + boolean outOfCapacity = synchronizedBlock(() -> { + if (needsRehydration.get()) { + return true; + } + boolean added = pendingTasks.offer(new QueueItem(streamTaskId, scheduledTask)); + if (!added) { + needsRehydration.set(true); + } + return !added; + }); + if (!outOfCapacity && !hungryClients.isEmpty()) { + synchronizedBlock(() -> { + PollTaskRequestObserver hungryClient = hungryClients.poll(); + if (hungryClient != null) { + parent.itsAMatch(scheduledTask, hungryClient); + } + }); + } + return outOfCapacity; + } + + @Override + public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext) { + synchronizedBlock(() -> { + QueueItem nextItem = pendingTasks.poll(); + if (nextItem != null) { + parent.itsAMatch(nextItem.scheduledTask(), requestObserver); + } else { + hungryClients.add(requestObserver); + } + }); + } + + @Override + public int size() { + return pendingTasks.size(); + } + + @Override + public long rehydratedCount() { + return 0; + } + + @Override + public void drainPartition(TaskId partitionToDrain) { + pendingTasks.removeIf(queueItem -> queueItem.streamsTaskId().equals(partitionToDrain)); + } + + @Override + public TenantIdModel tenantId() { + return tenantId; + } + + @Override + public String taskDefName() { + return taskDefName; + } + + private record QueueItem(TaskId streamsTaskId, ScheduledTaskModel scheduledTask) {} + + private void synchronizedBlock(Runnable runnable) { + try { + lock.lock(); + runnable.run(); + } finally { + lock.unlock(); + } + } + + private boolean synchronizedBlock(Supplier booleanSupplier) { + try { + lock.lock(); + return booleanSupplier.get(); + } finally { + lock.unlock(); + } + } +} diff --git a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java index 729530a72..237a881b4 100644 --- a/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java +++ b/server/src/main/java/io/littlehorse/server/streams/taskqueue/TaskQueueManager.java @@ -5,6 +5,7 @@ import io.littlehorse.common.model.getable.objectId.TenantIdModel; import io.littlehorse.server.LHServer; import io.littlehorse.server.streams.topology.core.RequestExecutionContext; +import java.io.Closeable; import java.util.Collection; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ 
-13,19 +14,24 @@ import org.apache.kafka.streams.processor.TaskId; @Slf4j -public class TaskQueueManager { +public class TaskQueueManager implements Closeable { - private final ConcurrentHashMap taskQueues; + private final ConcurrentHashMap taskQueues; @Getter private LHServer backend; private final int individualQueueConfiguredCapacity; + private final TaskQueueCommandProducer taskQueueCommandProducer; - public TaskQueueManager(LHServer backend, int individualQueueConfiguredCapacity) { + public TaskQueueManager( + LHServer backend, + int individualQueueConfiguredCapacity, + TaskQueueCommandProducer taskQueueCommandProducer) { this.taskQueues = new ConcurrentHashMap<>(); this.backend = backend; this.individualQueueConfiguredCapacity = individualQueueConfiguredCapacity; + this.taskQueueCommandProducer = taskQueueCommandProducer; } public void onPollRequest( @@ -47,20 +53,31 @@ public void drainPartition(TaskId partitionToDrain) { } public void itsAMatch(ScheduledTaskModel scheduledTask, PollTaskRequestObserver luckyClient) { - backend.returnTaskToClient(scheduledTask, luckyClient); + taskQueueCommandProducer.returnTaskToClient(scheduledTask, luckyClient); } - private OneTaskQueue getSubQueue(TenantTaskName tenantTask) { + private TaskQueue getSubQueue(TenantTaskName tenantTask) { return taskQueues.computeIfAbsent( tenantTask, - taskToCreate -> new OneTaskQueue( + taskToCreate -> new TaskQueueImpl( taskToCreate.taskDefName(), this, individualQueueConfiguredCapacity, taskToCreate.tenantId())); } - public Collection all() { + public Collection all() { return taskQueues.values(); } + public long rehydrationCount() { + return taskQueues.values().stream() + .mapToLong(queue -> queue.rehydratedCount()) + .sum(); + } + + @Override + public void close() { + taskQueueCommandProducer.close(); + } + private record TenantTaskName(TenantIdModel tenantId, String taskDefName) { public TenantTaskName { diff --git a/server/src/main/java/io/littlehorse/server/streams/util/AsyncWaiters.java b/server/src/main/java/io/littlehorse/server/streams/util/AsyncWaiters.java index d6adb30c0..92ab6bc97 100644 --- a/server/src/main/java/io/littlehorse/server/streams/util/AsyncWaiters.java +++ b/server/src/main/java/io/littlehorse/server/streams/util/AsyncWaiters.java @@ -111,6 +111,10 @@ public void registerObserverWaitingForWorkflowEvent( } } + public long size() { + return commandWaiters.size(); + } + public void handleRebalance(Set assignedTasks) { Set assignedPartitions = assignedTasks.stream().map(TaskId::partition).collect(Collectors.toSet()); diff --git a/server/src/main/java/io/littlehorse/server/streams/util/POSTStreamObserver.java b/server/src/main/java/io/littlehorse/server/streams/util/POSTStreamObserver.java index 7876ea2ef..b74a98599 100644 --- a/server/src/main/java/io/littlehorse/server/streams/util/POSTStreamObserver.java +++ b/server/src/main/java/io/littlehorse/server/streams/util/POSTStreamObserver.java @@ -111,6 +111,10 @@ public void onNext(WaitForCommandResponse reply) { ctx.onNext(buildRespFromBytes(reply.getResult())); } catch (IllegalStateException e) { log.debug("Call already closed"); + } catch (StatusRuntimeException sre) { + if (sre.getStatus().getCode().equals(Status.CANCELLED.getCode())) { + log.debug("Call already cancelled"); + } } } else if (reply.hasPartitionMigratedResponse()) { internalComms.waitForCommand(command, this, requestContext); diff --git a/server/src/test/java/io/littlehorse/common/MockLHProducer.java b/server/src/test/java/io/littlehorse/common/MockLHProducer.java new file 
mode 100644 index 000000000..3b597d368 --- /dev/null +++ b/server/src/test/java/io/littlehorse/common/MockLHProducer.java @@ -0,0 +1,28 @@ +package io.littlehorse.common; + +import io.littlehorse.common.util.LHProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; + +public class MockLHProducer extends LHProducer { + private final MockProducer mockProducer; + + private MockLHProducer(MockProducer mockProducer) { + super(mockProducer); + this.mockProducer = mockProducer; + } + + public MockProducer getKafkaProducer() { + return mockProducer; + } + + public static MockLHProducer create() { + return create(true); + } + + public static MockLHProducer create(boolean autoComplete) { + return new MockLHProducer(new MockProducer<>( + autoComplete, Serdes.String().serializer(), Serdes.Bytes().serializer())); + } +} diff --git a/server/src/test/java/io/littlehorse/server/streams/ProducerCommandCallbackTest.java b/server/src/test/java/io/littlehorse/server/streams/ProducerCommandCallbackTest.java index e0392e467..db95c0bef 100644 --- a/server/src/test/java/io/littlehorse/server/streams/ProducerCommandCallbackTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/ProducerCommandCallbackTest.java @@ -5,6 +5,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.netty.shaded.io.netty.util.concurrent.ImmediateExecutor; import io.grpc.stub.StreamObserver; import io.littlehorse.common.exceptions.LHApiException; import io.littlehorse.common.model.AbstractCommand; @@ -38,8 +39,8 @@ class ProducerCommandCallbackTest { private final HostInfo hostInfo = new HostInfo("localhost", 2023); private final LHInternalsGrpc.LHInternalsStub stub = mock(); private final Function stubProvider = (meta) -> stub; - private final ProducerCommandCallback producerCallback = - new ProducerCommandCallback(responseObserver, command, coreStreams, hostInfo, stubProvider, commandWaiters); + private final ProducerCommandCallback producerCallback = new ProducerCommandCallback( + responseObserver, command, coreStreams, hostInfo, stubProvider, commandWaiters, ImmediateExecutor.INSTANCE); private final RecordMetadata metadata = new RecordMetadata(new TopicPartition("my-topic", 2), 0L, 0, 0L, 0, 0); private final WaitForCommandResponse response = mock(); private final KeyQueryMetadata keyQueryMetadata = new KeyQueryMetadata(hostInfo, Collections.emptySet(), 2); diff --git a/server/src/test/java/io/littlehorse/server/streams/taskqueue/OneTaskQueueTest.java b/server/src/test/java/io/littlehorse/server/streams/taskqueue/OneTaskQueueTest.java index dd89ee970..45020f360 100644 --- a/server/src/test/java/io/littlehorse/server/streams/taskqueue/OneTaskQueueTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/taskqueue/OneTaskQueueTest.java @@ -18,6 +18,7 @@ import io.littlehorse.server.streams.topology.core.RequestExecutionContext; import io.littlehorse.server.streams.util.HeadersUtil; import java.util.Date; +import java.util.List; import java.util.Optional; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.MockProcessorContext; @@ -25,6 +26,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Answers; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; @@ -96,35 +98,51 @@ public void shouldRecoverScheduledTaskFromStoreAndKeepTheOriginalOrder() { 
task3.setCreatedAt(new Date(new Date().getTime() + 4000L)); ScheduledTaskModel task4 = TestUtil.scheduledTaskModel("wf-4"); task4.setCreatedAt(new Date(new Date().getTime() + 5000L)); + ScheduledTaskModel task5 = TestUtil.scheduledTaskModel("wf-5"); + task5.setCreatedAt(new Date(new Date().getTime() + 6000L)); TaskRunModel taskRun1 = TestUtil.taskRun(task1.getTaskRunId(), task3.getTaskDefId()); TaskRunModel taskRun2 = TestUtil.taskRun(task2.getTaskRunId(), task3.getTaskDefId()); TaskRunModel taskRun3 = TestUtil.taskRun(task3.getTaskRunId(), task3.getTaskDefId()); TaskRunModel taskRun4 = TestUtil.taskRun(task4.getTaskRunId(), task4.getTaskDefId()); + TaskRunModel taskRun5 = TestUtil.taskRun(task5.getTaskRunId(), task5.getTaskDefId()); processorContext.getableManager().put(taskRun1); processorContext.getableManager().put(taskRun2); processorContext.getableManager().put(taskRun3); processorContext.getableManager().put(taskRun4); + processorContext.getableManager().put(taskRun5); processorContext.getCoreStore().put(task1); processorContext.getCoreStore().put(task2); processorContext.getCoreStore().put(task3); processorContext.getCoreStore().put(task4); + processorContext.getCoreStore().put(task5); + processorContext.getCoreStore().get(task5.getStoreKey(), ScheduledTaskModel.class); processorContext.endExecution(); OneTaskQueue boundedQueue = - new OneTaskQueue(taskName, taskQueueManager, 1, new TenantIdModel(LHConstants.DEFAULT_TENANT)); + new OneTaskQueue(taskName, taskQueueManager, 2, new TenantIdModel(LHConstants.DEFAULT_TENANT)); boundedQueue.onTaskScheduled(streamsTaskId, task1); boundedQueue.onTaskScheduled(streamsTaskId, task2); - Assertions.assertThat(boundedQueue.hasMoreTasksOnDisk(streamsTaskId)).isTrue(); boundedQueue.onTaskScheduled(streamsTaskId, task3); + Assertions.assertThat(boundedQueue.hasMoreTasksOnDisk(streamsTaskId)).isTrue(); + boundedQueue.onPollRequest(mockClient, requestContext); boundedQueue.onTaskScheduled(streamsTaskId, task4); + boundedQueue.onTaskScheduled(streamsTaskId, task5); boundedQueue.onPollRequest(mockClient, requestContext); boundedQueue.onPollRequest(mockClient, requestContext); boundedQueue.onPollRequest(mockClient, requestContext); boundedQueue.onPollRequest(mockClient, requestContext); InOrder inOrder = inOrder(taskQueueManager); - inOrder.verify(taskQueueManager, times(4)).itsAMatch(any(), same(mockClient)); + ArgumentCaptor captor = ArgumentCaptor.forClass(ScheduledTaskModel.class); + inOrder.verify(taskQueueManager, times(5)).itsAMatch(captor.capture(), same(mockClient)); + List scheduledTasks = captor.getAllValues(); + assertThat(scheduledTasks).hasSize(5); + assertThat(scheduledTasks.get(0).getTaskRunId().wfRunId.getId()).isEqualTo("wf-1"); + assertThat(scheduledTasks.get(1).getTaskRunId().wfRunId.getId()).isEqualTo("wf-2"); + assertThat(scheduledTasks.get(2).getTaskRunId().wfRunId.getId()).isEqualTo("wf-3"); + assertThat(scheduledTasks.get(3).getTaskRunId().wfRunId.getId()).isEqualTo("wf-4"); + assertThat(scheduledTasks.get(4).getTaskRunId().wfRunId.getId()).isEqualTo("wf-5"); } private Command commandProto() { diff --git a/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducerTest.java b/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducerTest.java new file mode 100644 index 000000000..5c2333d31 --- /dev/null +++ b/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueCommandProducerTest.java @@ -0,0 +1,109 @@ +package io.littlehorse.server.streams.taskqueue; + 
+import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; +import io.littlehorse.common.AuthorizationContext; +import io.littlehorse.common.LHSerializable; +import io.littlehorse.common.MockLHProducer; +import io.littlehorse.common.model.ScheduledTaskModel; +import io.littlehorse.common.model.corecommand.subcommand.ReportTaskRunModel; +import io.littlehorse.common.model.getable.core.taskrun.TaskRunSourceModel; +import io.littlehorse.common.model.getable.objectId.PrincipalIdModel; +import io.littlehorse.common.model.getable.objectId.TaskDefIdModel; +import io.littlehorse.common.model.getable.objectId.TaskRunIdModel; +import io.littlehorse.common.model.getable.objectId.TenantIdModel; +import io.littlehorse.common.model.getable.objectId.WfRunIdModel; +import io.littlehorse.sdk.common.LHLibUtil; +import io.littlehorse.sdk.common.proto.ReportTaskRun; +import io.littlehorse.sdk.common.proto.TaskRunId; +import io.littlehorse.sdk.common.proto.VariableValue; +import io.littlehorse.sdk.common.proto.WfRunId; +import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext; +import java.util.Date; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TaskQueueCommandProducerTest { + + private final MockLHProducer autoCompleteMockProducer = MockLHProducer.create(); + private final MockLHProducer mockProducer = MockLHProducer.create(false); + private final String commandTopic = "test"; + private final PollTaskRequestObserver requestObserver = mock(); + private final ProcessorExecutionContext processorContext = mock(); + private final AuthorizationContext mockAuthContext = mock(); + private final StreamObserver reportTaskRunObserver = mock(); + + @BeforeEach + void setup() { + when(requestObserver.getPrincipalId()).thenReturn(new PrincipalIdModel("test-principal")); + when(requestObserver.getTenantId()).thenReturn(new TenantIdModel("test-tenant")); + when(requestObserver.getTaskWorkerVersion()).thenReturn("1.0.0"); + when(requestObserver.getClientId()).thenReturn("test-worker"); + when(mockAuthContext.principalId()).thenReturn(new PrincipalIdModel("test-principal")); + when(mockAuthContext.tenantId()).thenReturn(new TenantIdModel("test-tenant")); + } + + @Test + void shouldReturnTaskToClientAfterProducerRecordSent() { + final TaskQueueCommandProducer taskQueueProducer = + new TaskQueueCommandProducer(autoCompleteMockProducer, commandTopic); + ScheduledTaskModel scheduledTask = buildScheduledTask(); + taskQueueProducer.returnTaskToClient(scheduledTask, requestObserver); + verify(requestObserver).sendResponse(scheduledTask); + } + + @Test + void shouldCloseResponseObserverWithErrorOnProducerFailures() { + final RuntimeException expectedException = new RuntimeException("oops"); + final TaskQueueCommandProducer taskQueueProducer = new TaskQueueCommandProducer(mockProducer, commandTopic); + ScheduledTaskModel scheduledTask = buildScheduledTask(); + taskQueueProducer.returnTaskToClient(scheduledTask, requestObserver); + mockProducer.getKafkaProducer().errorNext(expectedException); + verify(requestObserver).onError(expectedException); + } + + @Test + void shouldSendReportTaskRunCommands() { + final 
TaskQueueCommandProducer taskQueueProducer = + new TaskQueueCommandProducer(autoCompleteMockProducer, commandTopic); + ReportTaskRunModel reportTaskRun = buildReportTaskRunModel(); + taskQueueProducer.send(reportTaskRun, mockAuthContext, reportTaskRunObserver); + verify(reportTaskRunObserver).onNext(any()); + verify(reportTaskRunObserver).onCompleted(); + } + + private ScheduledTaskModel buildScheduledTask() { + final TaskDefIdModel taskDefId = new TaskDefIdModel("task-1"); + final WfRunIdModel wfRunId = new WfRunIdModel("wf-run-1"); + final TaskRunIdModel taskRunId = new TaskRunIdModel(wfRunId, "task-run-1"); + ScheduledTaskModel scheduledTask = new ScheduledTaskModel(); + scheduledTask.setVariables(List.of()); + scheduledTask.setAttemptNumber(1); + scheduledTask.setCreatedAt(new Date()); + scheduledTask.setSource(new TaskRunSourceModel()); + scheduledTask.setTaskDefId(taskDefId); + scheduledTask.setTaskRunId(taskRunId); + return scheduledTask; + } + + private ReportTaskRunModel buildReportTaskRunModel() { + ReportTaskRun reportTaskRun = ReportTaskRun.newBuilder() + .setAttemptNumber(1) + .setOutput(VariableValue.newBuilder().setInt(10)) + .setTaskRunId(TaskRunId.newBuilder() + .setWfRunId(WfRunId.newBuilder().setId("test")) + .setTaskGuid("test-guid")) + .setTime(LHLibUtil.fromDate(new Date())) + .build(); + return LHSerializable.fromProto(reportTaskRun, ReportTaskRunModel.class, processorContext); + } +} diff --git a/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueManagerTest.java b/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueManagerTest.java index 1da29f064..413566e4e 100644 --- a/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueManagerTest.java +++ b/server/src/test/java/io/littlehorse/server/streams/taskqueue/TaskQueueManagerTest.java @@ -39,7 +39,8 @@ public class TaskQueueManagerTest { private final LHServer mockServer = Mockito.mock(); - private final TaskQueueManager queueManager = new TaskQueueManager(mockServer, Integer.MAX_VALUE); + private final TaskQueueCommandProducer mockProducer = Mockito.mock(TaskQueueCommandProducer.class); + private final TaskQueueManager queueManager = new TaskQueueManager(mockServer, Integer.MAX_VALUE, mockProducer); private final TaskDefModel taskDef = TestUtil.taskDef("my-task"); private final TaskDefIdModel taskId = taskDef.getId(); private final TaskId streamsTaskId = TaskId.parse("0_1"); @@ -80,18 +81,18 @@ public void shouldSchedulePendingTask() { queueManager.onTaskScheduled(streamsTaskId, taskId, taskToSchedule, tenantId); // Task was scheduled, now we need to verify only one task is returned to the client trackableObserver.onNext(pollTask); - verify(mockServer, times(1)).returnTaskToClient(taskToSchedule, trackableObserver); + verify(mockProducer, times(1)).returnTaskToClient(taskToSchedule, trackableObserver); Mockito.reset(mockServer); trackableObserver.onNext(pollTask); - verify(mockServer, never()).returnTaskToClient(taskToSchedule, trackableObserver); + verify(mockProducer).returnTaskToClient(taskToSchedule, trackableObserver); } @Test public void shouldFeedHungryClientWhenATaskIsScheduled() { trackableObserver.onNext(pollTask); - verify(mockServer, never()).returnTaskToClient(taskToSchedule, trackableObserver); + verify(mockProducer, never()).returnTaskToClient(taskToSchedule, trackableObserver); queueManager.onTaskScheduled(streamsTaskId, taskId, taskToSchedule, tenantId); - verify(mockServer, times(1)).returnTaskToClient(taskToSchedule, trackableObserver); + 
verify(mockProducer, times(1)).returnTaskToClient(taskToSchedule, trackableObserver); } @Test @@ -110,6 +111,6 @@ public void shouldSchedulePendingTaskConcurrently() throws Exception { for (int i = 0; i < numberOfTaskToSchedule; i++) { trackableObserver.onNext(pollTask); } - verify(mockServer, times(numberOfTaskToSchedule)).returnTaskToClient(taskToSchedule, trackableObserver); + verify(mockProducer, times(numberOfTaskToSchedule)).returnTaskToClient(taskToSchedule, trackableObserver); } } diff --git a/test-utils/build.gradle b/test-utils/build.gradle index 6ad5c8b7a..f1d2ad1e8 100644 --- a/test-utils/build.gradle +++ b/test-utils/build.gradle @@ -12,6 +12,7 @@ plugins { } repositories { + mavenLocal() // Use Maven Central for resolving dependencies. mavenCentral() }