Refactor command producers #1120

Draft: eduwercamacaro wants to merge 25 commits into master from refactor-command-producers

Changes shown from 6 commits (25 commits total)

Commits
624ce84
adds extra producer for task claim events
eduwercamacaro Oct 4, 2024
1131eef
async producer callback completion
eduwercamacaro Oct 4, 2024
d11d34b
refactor poll task protocol
eduwercamacaro Oct 9, 2024
a7dffed
refactor task claims
eduwercamacaro Oct 10, 2024
9ac2da9
configs
eduwercamacaro Oct 30, 2024
6659a7b
merge master
eduwercamacaro Nov 11, 2024
8d13df0
merge master
eduwercamacaro Nov 18, 2024
847b8b8
spotless
eduwercamacaro Nov 18, 2024
1272b18
refactor task claim producer
eduwercamacaro Nov 18, 2024
96e874b
adds unit test for task queue producer
eduwercamacaro Nov 19, 2024
4a9e040
merge master
eduwercamacaro Nov 25, 2024
82451b0
fix compilation errors
eduwercamacaro Nov 25, 2024
06ec897
Merge branch 'master' into refactor-command-producers
eduwercamacaro Nov 27, 2024
4a98d7d
installs jemalloc on server dockerfile
eduwercamacaro Dec 5, 2024
1cefa45
Merge branch 'server-imports-jemalloc' into refactor-command-producers
eduwercamacaro Dec 5, 2024
687e548
disables config setters
eduwercamacaro Dec 5, 2024
7336810
rollback rocksdb config setter
Dec 12, 2024
bf52f34
spotless fix
eduwercamacaro Dec 12, 2024
cb0ca35
increase rates per second
eduwercamacaro Dec 12, 2024
e42182b
merge master
eduwercamacaro Dec 17, 2024
745d67e
merge master
eduwercamacaro Dec 18, 2024
4673dac
async waiters metric
eduwercamacaro Dec 19, 2024
c35bb0f
Merge branch 'master' into refactor-command-producers
eduwercamacaro Dec 19, 2024
7c13d26
preserve one task queue ordering
eduwercamacaro Dec 19, 2024
d540530
taskqueue refactor
eduwercamacaro Dec 20, 2024
1 change: 1 addition & 0 deletions canary/build.gradle

@@ -9,6 +9,7 @@ plugins {
 }

 repositories {
+    mavenLocal()
     mavenCentral()
 }

Review comment (Member) on the mavenLocal() line: :porg:
11 changes: 6 additions & 5 deletions canary/canary.properties

@@ -5,12 +5,13 @@ lh.canary.workflow.creation.enable=true
 lh.canary.topic.replicas=1
 lh.canary.topic.partitions=1
 # Aggregator settings
-lh.canary.aggregator.enable=true
+lh.canary.aggregator.enable=false

 # Metronome settings
-lh.canary.metronome.enable=true
+lh.canary.metronome.enable=false
 lh.canary.metronome.worker.enable=true
-lh.canary.metronome.frequency.ms=1000
-lh.canary.metronome.run.threads=1
-lh.canary.metronome.run.requests=300
+lh.canary.metronome.run.frequency.ms=1000
+lh.canary.metronome.run.threads=20
+lh.canary.metronome.run.requests=700
 lh.canary.metronome.run.sample.rate=1
+lh.canary.lhw.num.worker.threads=20
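Taken together, the canary changes read like a local load-testing setup for this branch: the aggregator and metronome master switches are flipped off, while run.threads goes from 1 to 20 and run.requests from 300 to 700 (consistent with the "increase rates per second" commit above). Presumably these values are tuned for benchmarking the new producers rather than intended as new defaults.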
MetronomeWorker.java

@@ -30,7 +30,7 @@ public MetronomeWorker(final BeatProducer producer, final LHConfig lhConfig) {
     @LHTaskMethod(MetronomeWorkflow.TASK_NAME)
     public void executeTask(final long startTime, final boolean sampleIteration, final WorkerContext context) {
         final String id = "%s/%s".formatted(context.getIdempotencyKey(), context.getAttemptNumber());
-        log.debug("Executing task {} {}", MetronomeWorkflow.TASK_NAME, id);
+        log.info("Executing task {} {}", MetronomeWorkflow.TASK_NAME, id);
         if (sampleIteration) {
             producer.send(
                     id, BeatType.TASK_RUN_EXECUTION, Duration.between(Instant.ofEpochMilli(startTime), Instant.now()));
2 changes: 1 addition & 1 deletion gradle.properties

@@ -1,6 +1,6 @@
 version=0.0.0-development
 group=io.littlehorse
-kafkaVersion=3.8.0
+kafkaVersion=3.8.1
 lombokVersion=1.18.32
 grpcVersion=1.56.1
 junitVersion=5.9.2
2 changes: 1 addition & 1 deletion local-dev/configs/server-1.config

@@ -5,7 +5,7 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaState
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=1
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
 LHS_CORE_STREAM_THREADS=2
2 changes: 1 addition & 1 deletion local-dev/configs/server-2.config

@@ -5,7 +5,7 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaStateTwo
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=3
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
 LHS_CORE_STREAM_THREADS=2
2 changes: 1 addition & 1 deletion local-dev/configs/server-3.config

@@ -5,7 +5,7 @@ LHS_REPLICATION_FACTOR=1
 LHS_CLUSTER_PARTITIONS=12
 LHS_STATE_DIR=/tmp/kafkaStateThree
 LHS_STREAMS_NUM_WARMUP_REPLICAS=8
-LHS_STREAMS_NUM_STANDBY_REPLICAS=0
+LHS_STREAMS_NUM_STANDBY_REPLICAS=2
 LHS_STREAMS_SESSION_TIMEOUT=10000
 LHS_SHOULD_CREATE_TOPICS=true
 LHS_CORE_STREAM_THREADS=2
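All three local-dev servers stop running with zero standby replicas. Assuming LHS_STREAMS_NUM_STANDBY_REPLICAS maps onto Kafka Streams' num.standby.replicas (the names suggest it, though the wiring is not shown in this diff), each standby keeps a warm copy of another instance's state stores, trading extra disk and replication traffic for much faster failover. A minimal sketch of that assumed mapping:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class StandbyConfigSketch {
    // Assumed mapping, not this PR's literal wiring: forward the LHS_ value
    // into the Streams setting that controls shadow copies of state stores.
    static Properties withStandbys(Properties props, int numStandbyReplicas) {
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
        return props;
    }
}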
2 changes: 1 addition & 1 deletion local-dev/do-canary.sh

@@ -7,5 +7,5 @@ WORK_DIR=$(cd "$SCRIPT_DIR/.." && pwd)

 cd "$WORK_DIR"

-./gradlew canary:installDist
+#./gradlew canary:installDist
 ./canary/build/install/canary/bin/canary canary/canary.properties
11 changes: 11 additions & 0 deletions local-dev/docker-compose.yml

@@ -0,0 +1,11 @@
+services:
+  kafka:
+    ports:
+      - "9092:9092"
+    container_name: lh-server-kafka
+    image: apache/kafka:3.8.0
+    deploy:
+      resources:
+        limits:
+          cpus: '3500'
+          memory: '12G'
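Note that Compose's cpus limit is a (possibly fractional) number of CPUs, so the '3500' here looks like it was meant to be '3.5', which is the value the equivalent block in local-dev/setup.sh below uses.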
5 changes: 5 additions & 0 deletions local-dev/setup.sh

@@ -12,6 +12,11 @@ services:
       - "9092:9092"
     container_name: lh-server-kafka
     image: apache/kafka:3.8.0
+    deploy:
+      resources:
+        limits:
+          cpus: '3.5'
+          memory: '12G'
 EOF
 )
@@ -450,6 +450,6 @@ protected String[] getEnvKeyPrefixes() {
      * @return the number of worker threads to run.
      */
     public int getWorkerThreads() {
-        return Integer.valueOf(getOrSetDefault(NUM_WORKER_THREADS_KEY, "2"));
+        return Integer.valueOf(getOrSetDefault(NUM_WORKER_THREADS_KEY, "3"));
     }
 }
1 change: 1 addition & 0 deletions server/build.gradle

@@ -21,6 +21,7 @@ publishing {
 }

 repositories {
+    mavenLocal()
     mavenCentral()
     maven {
         url = uri("https://packages.confluent.io/maven/")
LHProductionExceptionHandler.java (new file)

@@ -0,0 +1,27 @@
+package io.littlehorse.common;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TransactionAbortedException;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+
+import java.util.Map;
+
+public class LHProductionExceptionHandler implements ProductionExceptionHandler {
+
+    public LHProductionExceptionHandler() {}
+
+    @Override
+    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
+        if (exception instanceof TransactionAbortedException) {
+            return ProductionExceptionHandlerResponse.CONTINUE;
+        }
+        return ProductionExceptionHandlerResponse.FAIL;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to do
+    }
+}
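The intent here is narrow: under exactly-once semantics, a rebalance or producer fencing can abort an in-flight transaction, and the aborted batch will be retried once the transaction is rolled back, so crashing the Streams thread on a TransactionAbortedException is unnecessary. A quick sanity check one might write for the handler (a sketch, not a test included in this PR; the topic name is illustrative):

import io.littlehorse.common.LHProductionExceptionHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

class LHProductionExceptionHandlerSketch {
    public static void main(String[] args) {
        LHProductionExceptionHandler handler = new LHProductionExceptionHandler();
        ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("core-commands", new byte[0]);
        // Aborted transactions are an expected part of EOS retries: keep going.
        assert handler.handle(rec, new TransactionAbortedException())
                == ProductionExceptionHandlerResponse.CONTINUE;
        // Any other production failure should still stop the Streams thread.
        assert handler.handle(rec, new RuntimeException("broker unreachable"))
                == ProductionExceptionHandlerResponse.FAIL;
    }
}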
35 changes: 24 additions & 11 deletions server/src/main/java/io/littlehorse/common/LHServerConfig.java

@@ -628,13 +628,6 @@ public void cleanup() {
         if (this.txnProducer != null) this.txnProducer.close();
     }

-    public LHProducer getProducer() {
-        if (producer == null) {
-            producer = new LHProducer(this);
-        }
-        return producer;
-    }
-
     public boolean shouldCreateTopics() {
         return Boolean.valueOf(getOrSetDefault(SHOULD_CREATE_TOPICS_KEY, "true"));
     }

@@ -656,9 +649,8 @@ public long getTimerMemtableSize() {
         return Long.valueOf(getOrSetDefault(TIMER_MEMTABLE_SIZE_BYTES_KEY, String.valueOf(1024L * 1024L * 32)));
     }

-    public Properties getKafkaProducerConfig(String component) {
+    public Properties getKafkaCommandProducerConfig(String component) {
         Properties conf = new Properties();
-        conf.put("client.id", this.getClientId(component));
         conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
         conf.put(

@@ -674,6 +666,16 @@ public Properties getKafkaProducerConfig(String component) {
         return conf;
     }

+    public Properties getKafkaTaskClaimProducer() {
+        Properties conf = getKafkaCommandProducerConfig("task-claim");
+        // conf.put(ProducerConfig.ACKS_CONFIG, "0");
+        // conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        // conf.put(ProducerConfig.LINGER_MS_CONFIG, "0");
+        // conf.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152");
+        // conf.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1000");
+        return conf;
+    }
+
     /*
      * Right now, this only supports mtls auth. We want to:
      * 1. Support other auth types, especially SCRAM-SHA-512

@@ -908,7 +910,7 @@ private Properties getBaseStreamsConfig() {

         // Configs required by KafkaStreams. Some of these are overriden by the application logic itself.
         props.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
-        props.put("default.production.exception.handler", DefaultProductionExceptionHandler.class);
+        props.put("default.production.exception.handler", LHProductionExceptionHandler.class);
         props.put("default.value.serde", Serdes.StringSerde.class.getName());
         props.put("default.key.serde", Serdes.StringSerde.class.getName());

@@ -944,7 +946,18 @@
         // in the case of a server failure while a request is being processed, the resulting
         // `Command` should be processed on a new server within a minute. Issue #479
         // should verify this behavior
-        props.put("consumer.session.timeout.ms", getStreamsSessionTimeout());
+        props.put(
+                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getStreamsSessionTimeout());
+        props.put(
+                ProducerConfig.MAX_BLOCK_MS_CONFIG, getStreamsSessionTimeout());
+        // props.put(
+        //         ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
+        //         "120000");
+        // props.put(
+        //         ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
+        //         "60000");
+        // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+        //         "240000");

         // In case we need to authenticate to Kafka, this sets it.
         addKafkaSecuritySettings(props);
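getKafkaTaskClaimProducer() currently just clones the command-producer settings, but the commented lines hint at where this is headed: a fire-and-forget producer for task claims, where losing an occasional claim is cheaper than the latency of acked, idempotent sends. If those hints were enabled, it might look like the sketch below (the values come from the commented-out lines above; this is not behavior the draft turns on):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.common.util.LHProducer;

class TaskClaimProducerSketch {
    static LHProducer buildFireAndForget(LHServerConfig serverConfig) {
        Properties conf = serverConfig.getKafkaTaskClaimProducer();
        // Mirrors the commented-out hints: no acks, no idempotence,
        // no lingering, larger batches, many in-flight requests.
        conf.put(ProducerConfig.ACKS_CONFIG, "0");
        conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        conf.put(ProducerConfig.LINGER_MS_CONFIG, "0");
        conf.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152");
        conf.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1000");
        return new LHProducer(conf);
    }
}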
TaskClaimEvent.java

@@ -1,5 +1,6 @@
 package io.littlehorse.common.model.corecommand.subcommand;

+import com.google.protobuf.Empty;
 import com.google.protobuf.Message;
 import io.grpc.Status;
 import io.littlehorse.common.LHSerializable;

@@ -11,7 +12,6 @@
 import io.littlehorse.common.model.getable.objectId.TaskRunIdModel;
 import io.littlehorse.common.proto.TaskClaimEventPb;
 import io.littlehorse.common.util.LHUtil;
-import io.littlehorse.sdk.common.proto.PollTaskResponse;
 import io.littlehorse.server.streams.taskqueue.PollTaskRequestObserver;
 import io.littlehorse.server.streams.topology.core.ExecutionContext;
 import io.littlehorse.server.streams.topology.core.ProcessorExecutionContext;

@@ -66,37 +66,35 @@ public TaskClaimEventPb.Builder toProto() {
     }

     public boolean hasResponse() {
-        // TaskClaimEvents are always due to a Task Worker's poll request.
-        return true;
+        return false;
     }

     @Override
-    public PollTaskResponse process(ProcessorExecutionContext executionContext, LHServerConfig config) {
+    public Empty process(ProcessorExecutionContext executionContext, LHServerConfig config) {
         TaskRunModel taskRun = executionContext.getableManager().get(taskRunId);
         if (taskRun == null) {
             log.warn("Got claimTask for non-existent taskRun {}", taskRunId);
             throw new LHApiException(Status.INVALID_ARGUMENT, "Got claimTask for nonexistent taskRun {}" + taskRunId);
         }

-        // Needs to be done before we process the event, since processing the event
-        // will delete the task schedule request.
-        ScheduledTaskModel scheduledTask = executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
-
-        // It's totally fine for the scheduledTask to be null. That happens when someone already
-        // claimed that task. This happens when a server is recovering from a crash. The fact that it
-        // is null prevents it from being scheduled twice.
-        //
-        // We shouldn't throw an error on this, we just return an empty optional.
-        if (scheduledTask == null) {
-            log.warn("Processing pollTaskRequest for task {} that was already claimed", taskRunId);
-            return PollTaskResponse.newBuilder().build();
-        } else {
-            taskRun.onTaskAttemptStarted(this);
-            executionContext.getableManager().get(taskRunId.wfRunId).advance(time);
-            return PollTaskResponse.newBuilder()
-                    .setResult(scheduledTask.toProto())
-                    .build();
-        }
+        executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
+        taskRun.onTaskAttemptStarted(this);
+        return Empty.newBuilder().build();
+        // ScheduledTaskModel scheduledTask = executionContext.getTaskManager().markTaskAsScheduled(taskRunId);
+        // // It's totally fine for the scheduledTask to be null. That happens when someone already
+        // // claimed that task. This happens when a server is recovering from a crash. The fact that it
+        // // is null prevents it from being scheduled twice.
+        // //
+        // // We shouldn't throw an error on this, we just return an empty optional.
+        // if (scheduledTask == null) {
+        //     return Empty.newBuilder().build();
+        // } else {
+        //     taskRun.onTaskAttemptStarted(this);
+        //     executionContext.getableManager().get(taskRunId.wfRunId).advance(time);
+        //     return Empty.newBuilder().build();
+        // }
     }

     public static TaskClaimEvent fromProto(TaskClaimEventPb proto, ExecutionContext context) {
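This is the heart of the protocol change: the poll response no longer flows back through the command processor. hasResponse() now returns false, process() returns Empty, and (per the commit list) the matched task is handed to the worker through the refactored poll-task protocol and the dedicated task-claim producer instead. A hedged sketch of what that flag gates on the produce side; hasResponse() and LHProducer.send(...) come from this PR, while the dispatch method, the partition key parameter, and surfacing hasResponse() on AbstractCommand are assumptions:

import org.apache.kafka.clients.producer.Callback;
import io.littlehorse.common.model.AbstractCommand;
import io.littlehorse.common.util.LHProducer;

class CommandDispatchSketch {
    static void produce(LHProducer producer, AbstractCommand<?> cmd, String partitionKey, String topic, Callback onAck) {
        if (cmd.hasResponse()) {
            // Response-bearing commands register completion handling so the
            // processor's answer can be routed back to the blocked client call.
            producer.send(partitionKey, cmd, topic, onAck);
        } else {
            // TaskClaimEvent now takes this branch: fire-and-forget, no waiter.
            producer.send(partitionKey, cmd, topic, null);
        }
    }
}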
TaskRunModel.java

@@ -270,6 +270,27 @@ public void onTaskAttemptStarted(TaskClaimEvent se) {
         attempt.setStatus(TaskStatus.TASK_RUNNING);
     }

+    public void onTaskAttemptStarted() {
+        transitionTo(TaskStatus.TASK_RUNNING);
+
+        // create a timer to mark the task is timeout if it does not finish
+        // ReportTaskRunModel taskResult = new ReportTaskRunModel();
+        // taskResult.setTaskRunId(id);
+        // taskResult.setTime(new Date(System.currentTimeMillis() + (1000 * timeoutSeconds)));
+        // taskResult.setStatus(TaskStatus.TASK_TIMEOUT);
+        // CommandModel timerCommand = new CommandModel(taskResult, taskResult.getTime());
+        // LHTimer timer = new LHTimer(timerCommand);
+        // processorContext.getTaskManager().scheduleTimer(timer);
+
+        // Now that that's out of the way, we can mark the TaskRun as running.
+        // Also we need to save the task worker version and client id.
+        TaskAttemptModel attempt = getLatestAttempt();
+        attempt.setTaskWorkerId("eduwer");
+        attempt.setTaskWorkerVersion("camacaro");
+        attempt.setStartTime(new Date());
+        attempt.setStatus(TaskStatus.TASK_RUNNING);
+    }
+
     public void onTaskAttemptResultReported(ReportTaskRunModel ce) {
         if (ce.getAttemptNumber() >= attempts.size()) {
             throw new LHApiException(Status.INVALID_ARGUMENT, "Specified Task Attempt does not exist!");
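The hardcoded "eduwer" / "camacaro" worker id and version are clearly draft placeholders; presumably the real values get threaded in from the claim, the way the existing TaskClaimEvent overload above does. A sketch of what that would look like inside TaskRunModel (assumed parameters, not code in this PR):

    public void onTaskAttemptStarted(String taskWorkerId, String taskWorkerVersion) {
        transitionTo(TaskStatus.TASK_RUNNING);
        // Mark the latest attempt as running and record who is executing it.
        TaskAttemptModel attempt = getLatestAttempt();
        attempt.setTaskWorkerId(taskWorkerId);
        attempt.setTaskWorkerVersion(taskWorkerVersion);
        attempt.setStartTime(new Date());
        attempt.setStatus(TaskStatus.TASK_RUNNING);
    }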
12 changes: 4 additions & 8 deletions server/src/main/java/io/littlehorse/common/util/LHProducer.java

@@ -1,9 +1,9 @@
 package io.littlehorse.common.util;

-import io.littlehorse.common.LHServerConfig;
 import io.littlehorse.common.model.AbstractCommand;
 import java.io.Closeable;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;

@@ -14,20 +14,16 @@

 public class LHProducer implements Closeable {

-    private KafkaProducer<String, Bytes> prod;
+    private final KafkaProducer<String, Bytes> prod;

-    public LHProducer(LHServerConfig config) {
-        prod = new KafkaProducer<>(config.getKafkaProducerConfig(config.getLHInstanceName()));
+    public LHProducer(Properties configs) {
+        prod = new KafkaProducer<>(configs);
     }

     public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic, Callback cb, Header... headers) {
         return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
     }

-    public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic) {
-        return this.send(key, t, topic, null);
-    }
-
     public Future<RecordMetadata> sendRecord(ProducerRecord<String, Bytes> record, Callback cb) {
         return (cb != null) ? prod.send(record, cb) : prod.send(record);
     }
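With the LHServerConfig.getProducer() singleton gone and the callback-less send() overload removed, callers now own both the producer's Properties and the completion handling, which is what the "async producer callback completion" commit points at. A usage sketch under this PR's API (the wrapper class and the "core-command" component name are illustrative, not part of the PR):

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.RecordMetadata;
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.common.model.AbstractCommand;
import io.littlehorse.common.util.LHProducer;

class ProducerUsageSketch {
    static LHProducer buildCommandProducer(LHServerConfig config) {
        // The caller, not LHServerConfig, now decides which Properties to use.
        return new LHProducer(config.getKafkaCommandProducerConfig("core-command"));
    }

    static CompletableFuture<RecordMetadata> produce(LHProducer producer, AbstractCommand<?> cmd, String key, String topic) {
        CompletableFuture<RecordMetadata> acked = new CompletableFuture<>();
        // Removing the callback-less overload forces an explicit choice here:
        // complete a future when the broker acks the record (or the send fails).
        producer.send(key, cmd, topic, (metadata, exception) -> {
            if (exception != null) acked.completeExceptionally(exception);
            else acked.complete(metadata);
        });
        return acked;
    }
}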