perf(server): use executor for returning tasks to client
This commit needs extensive testing. StreamObserver is not considered thread-safe; I haven't spent enough time with this commit to know whether we are opening up the chance of multiple concurrent calls to a client StreamObserver.
coltmcnealy-lh committed Oct 7, 2024
1 parent 1dc2c16 commit 4460dca
Showing 5 changed files with 32 additions and 17 deletions.
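
The thread-safety concern in the commit message is that gRPC StreamObserver instances must not be called from multiple threads at once, so routing completions through a thread pool could, in principle, interleave calls to the same client observer. The class below is a minimal sketch, not part of this commit, of one way to serialize such calls; the class name is illustrative.

import io.grpc.stub.StreamObserver;

// Illustrative sketch (not from this commit): wraps a delegate StreamObserver and
// synchronizes every call so that work submitted from multiple executor threads
// cannot produce concurrent onNext/onError/onCompleted invocations.
public class SerializedStreamObserver<T> implements StreamObserver<T> {

    private final StreamObserver<T> delegate;

    public SerializedStreamObserver(StreamObserver<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void onNext(T value) {
        delegate.onNext(value);
    }

    @Override
    public synchronized void onError(Throwable t) {
        delegate.onError(t);
    }

    @Override
    public synchronized void onCompleted() {
        delegate.onCompleted();
    }
}

An alternative with the same effect is to give each observer its own single-threaded executor so calls are queued rather than contended.
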
13 changes: 2 additions & 11 deletions server/src/main/java/io/littlehorse/common/util/LHProducer.java
@@ -21,22 +21,13 @@ public LHProducer(LHServerConfig config) {
}

public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic, Callback cb, Header... headers) {
- return sendRecord(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
+ return doSend(new ProducerRecord<>(topic, null, key, new Bytes(t.toBytes()), List.of(headers)), cb);
}

public Future<RecordMetadata> send(String key, AbstractCommand<?> t, String topic) {
return this.send(key, t, topic, null);
}

- public Future<RecordMetadata> sendRecord(ProducerRecord<String, Bytes> record, Callback cb) {
+ private Future<RecordMetadata> doSend(ProducerRecord<String, Bytes> record, Callback cb) {
return (cb != null) ? prod.send(record, cb) : prod.send(record);
}

- public Future<RecordMetadata> sendToPartition(String key, AbstractCommand<?> val, String topic, int partition) {
- Bytes valBytes = val == null ? null : new Bytes(val.toBytes());
- return sendRecord(new ProducerRecord<>(topic, partition, key, valBytes), null);
- }

public void close() {
this.prod.close();
}
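
The LHProducer change above collapses the public sendRecord and sendToPartition entry points into a single private doSend that treats the callback as optional. Below is a minimal, self-contained sketch of that same pattern with a plain String producer; the class name and serializers are illustrative, not LittleHorse code.

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative sketch of the "optional callback" send path.
public class SimpleSender {

    private final KafkaProducer<String, String> producer;

    public SimpleSender(Properties props) {
        this.producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    }

    public Future<RecordMetadata> send(String topic, String key, String value, Callback cb) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        // Passing a null Callback to KafkaProducer#send is also legal, so the branch is
        // mostly a readability choice; both paths return the same Future.
        return (cb != null) ? producer.send(record, cb) : producer.send(record);
    }
}
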
6 changes: 5 additions & 1 deletion server/src/main/java/io/littlehorse/server/LHServer.java
@@ -46,6 +46,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.header.Headers;
@@ -67,7 +68,10 @@ public class LHServer {
private Context.Key<RequestExecutionContext> contextKey = Context.key("executionContextKey");
private final MetadataCache metadataCache;
private final CoreStoreProvider coreStoreProvider;

+ @Getter
private final ScheduledExecutorService networkThreadpool;

private final List<LHServerListener> listeners;

private RequestExecutionContext requestContext() {
@@ -338,6 +342,6 @@ public LHHostInfo getAdvertisedHost(
}

public void onEventThrown(WorkflowEventModel event) {
- internalComms.onWorkflowEventThrown(event);
+ networkThreadpool.submit(() -> internalComms.onWorkflowEventThrown(event));
}
}
LHTaskManager.java
@@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+ import java.util.concurrent.ExecutorService;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -29,6 +30,7 @@ public class LHTaskManager {
private final String timerTopicName;
private final String commandTopicName;
private final AuthorizationContext authContext;
+ private final ExecutorService networkThreadPool;

private final ProcessorContext<String, CommandProcessorOutput> processorContext;
private final TaskQueueManager taskQueueManager;
@@ -40,13 +42,15 @@ public LHTaskManager(
AuthorizationContext authContext,
ProcessorContext<String, CommandProcessorOutput> processorContext,
TaskQueueManager taskQueueManager,
- TenantScopedStore coreStore) {
+ TenantScopedStore coreStore,
+ ExecutorService networkThreadPool) {
this.timerTopicName = timerTopicName;
this.commandTopicName = commandTopicName;
this.authContext = authContext;
this.processorContext = processorContext;
this.taskQueueManager = taskQueueManager;
this.coreStore = coreStore;
+ this.networkThreadPool = networkThreadPool;
}

/**
@@ -89,8 +93,8 @@ void forwardPendingTasks() {
ScheduledTaskModel scheduledTask = entry.getValue();
if (scheduledTask != null) {
this.coreStore.put(scheduledTask);
- taskQueueManager.onTaskScheduled(
- taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId());
+ networkThreadPool.submit(() -> taskQueueManager.onTaskScheduled(
+ taskId, scheduledTask.getTaskDefId(), scheduledTask, authContext.tenantId()));
} else {
this.coreStore.delete(scheduledTaskId, StoreableType.SCHEDULED_TASK);
}
(fourth changed file; path not shown)
@@ -102,7 +102,8 @@ public LHTaskManager getTaskManager() {
authContext,
processorContext,
globalTaskQueueManager,
- coreStore);
+ coreStore,
+ server.getNetworkThreadpool());
return currentTaskManager;
}

CommandProcessor.java
@@ -35,6 +35,7 @@
import io.littlehorse.server.streams.util.MetadataCache;
import java.time.Duration;
import java.util.Date;
+ import java.util.concurrent.ExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Headers;
@@ -53,6 +54,7 @@ public class CommandProcessor implements Processor<String, Command, String, CommandProcessorOutput>
private final LHServer server;
private final MetadataCache metadataCache;
private final TaskQueueManager globalTaskQueueManager;
+ private final ExecutorService networkThreadPool;

private KeyValueStore<String, Bytes> nativeStore;
private KeyValueStore<String, Bytes> globalStore;
@@ -70,6 +72,7 @@ public CommandProcessor(
this.metadataCache = metadataCache;
this.globalTaskQueueManager = globalTaskQueueManager;
this.exceptionHandler = new LHProcessingExceptionHandler(server);
+ this.networkThreadPool = server.getNetworkThreadpool();
}

@Override
@@ -105,7 +108,19 @@ private void processHelper(final Record<String, Command> commandRecord) {
.setResult(response.toByteString())
.build();

- server.onResponseReceived(command.getCommandId(), cmdReply);
+ // The 'onResponseReceived' method can involve waiting on a lock in the AsyncWaiters class;
+ // we don't want to do that here so submit to an executor for async processing.
+ //
+ // LHServer#onResponseReceived()
+ // BackendInternalComms#onResponseReceived()
+ // AsyncWaiters#registerCommandProcessed()
+ // CommandWaiter#setResponseAndMaybeComplete()
+ //
+ // The CommandWaiter method involves blocking on a lock. No need to hold up the stream thread
+ // for this.
+ networkThreadPool.submit(() -> {
+ server.onResponseReceived(command.getCommandId(), cmdReply);
+ });
}
} catch (KafkaException ke) {
throw ke;
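
The pattern in the CommandProcessor hunk above (and in LHTaskManager and LHServer) is the core of the commit: potentially blocking notification work is submitted to the shared network thread pool so the Kafka Streams processing thread is not held up on the AsyncWaiters lock. Below is a stripped-down sketch of that hand-off, with hypothetical names (ResponseNotifier, notifyWaiter) rather than the real LittleHorse classes.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Illustrative sketch of offloading a blocking completion from a hot processing
// loop onto a shared executor. Class and method names are hypothetical.
public class ResponseNotifier {

    private final ScheduledExecutorService networkThreadpool = Executors.newScheduledThreadPool(4);

    // Called from the stream thread for every processed record.
    public void onRecordProcessed(String commandId, byte[] reply) {
        // The waiter hand-off may block on a lock, so it runs on the pool instead of
        // the caller's thread. Note that a multi-threaded pool does not preserve
        // per-client ordering, which is the risk the commit message flags.
        networkThreadpool.submit(() -> notifyWaiter(commandId, reply));
    }

    private void notifyWaiter(String commandId, byte[] reply) {
        // Placeholder: complete whatever future/observer is waiting on commandId.
    }
}
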
