Skip to content

Commit

Permalink
taskqueue refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Dec 20, 2024
1 parent 7c13d26 commit d540530
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class LHConstants {
public static final String PARTITION_METRICS_KEY = "partitionMetrics";

public static final Duration MAX_INCOMING_REQUEST_IDLE_TIME = Duration.ofSeconds(60);
public static final int MAX_TASKRUNS_IN_ONE_TASKQUEUE = 500;
public static final int MAX_TASKRUNS_IN_ONE_TASKQUEUE = 2000;

public static final String STRING_MASK = "*****";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.littlehorse.server.monitoring.metrics;

import io.littlehorse.server.streams.taskqueue.OneTaskQueue;
import io.littlehorse.server.streams.taskqueue.TaskQueue;
import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -35,22 +35,21 @@ private void updateMetrics(MeterRegistry registry) {
taskQueueManager.all().stream()
.filter(queue -> !wasRegistered(registry, queue))
.forEach(queue -> {
log.trace("Adding new metric for queue {}", queue.getTaskDefName());
Gauge.builder(METRIC_NAME, queue, OneTaskQueue::size)
.tag(TENANT_ID_TAG, queue.getTenantId().getId())
.tag(TASK_NAME_TAG, queue.getTaskDefName())
log.trace("Adding new metric for queue {}", queue.taskDefName());
Gauge.builder(METRIC_NAME, queue, TaskQueue::size)
.tag(TENANT_ID_TAG, queue.tenantId().getId())
.tag(TASK_NAME_TAG, queue.taskDefName())
.register(registry);
});
Gauge.builder(REHYDRATION_COUNT_METRIC_NAME, taskQueueManager, TaskQueueManager::rehydrationCount)
.register(registry);
}

private boolean wasRegistered(MeterRegistry registry, OneTaskQueue queue) {
private boolean wasRegistered(MeterRegistry registry, TaskQueue queue) {
return registry.getMeters().stream()
.filter(meter -> meter.getId().getName().equals(METRIC_NAME))
.filter(meter ->
queue.getTenantId().getId().equals(meter.getId().getTag(TENANT_ID_TAG)))
.anyMatch(meter -> queue.getTaskDefName().equals(meter.getId().getTag(TASK_NAME_TAG)));
.filter(meter -> queue.tenantId().getId().equals(meter.getId().getTag(TENANT_ID_TAG)))
.anyMatch(meter -> queue.taskDefName().equals(meter.getId().getTag(TASK_NAME_TAG)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
// One instance of this class is responsible for coordinating the grpc backend for
// one specific TaskDef on one LH Server host.
@Slf4j
public class OneTaskQueue {
public class OneTaskQueue implements TaskQueue {

private final Queue<PollTaskRequestObserver> hungryClients;
private final Lock lock;
Expand All @@ -40,7 +40,6 @@ public class OneTaskQueue {
@Getter
private String taskDefName;

@Getter
private final TenantIdModel tenantId;

private final String instanceName;
Expand Down Expand Up @@ -169,6 +168,7 @@ public boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel schedule
* that talks to the
* client who made the PollTaskRequest.
*/
@Override
public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext) {

if (taskDefName == null) {
Expand All @@ -192,7 +192,7 @@ public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecut
lock.lock();
if (pendingTasks.isEmpty()) {
for (Map.Entry<TaskId, TrackedPartition> taskHasMoreDataOnDisk : taskTrack.entrySet()) {
if (taskHasMoreDataOnDisk.getValue().hasMoreDataOnDisk()) {
if (taskHasMoreDataOnDisk.getValue().hasMoreDataOnDisk() || needsRehydration.get()) {
rehydrateFromStore(requestContext.getableManager(taskHasMoreDataOnDisk.getKey()));
}
}
Expand Down Expand Up @@ -289,15 +289,28 @@ private boolean notRehydratedYet(
&& maybe.getCreatedAt().compareTo(lastRehydratedTask) >= 0));
}

/**
 * Drops everything this queue holds for the given Kafka Streams task.
 *
 * @param partitionToDrain id of the streams task whose entries are discarded.
 */
@Override
public void drainPartition(TaskId partitionToDrain) {
// Forget the tracking entry kept for this streams task.
taskTrack.remove(partitionToDrain);
// Discard every pending item whose streams task id equals the drained one.
pendingTasks.removeIf(queueItem -> queueItem.streamsTaskId().equals(partitionToDrain));
}

/**
 * @return the number of elements currently in {@code pendingTasks}.
 */
@Override
public int size() {
return pendingTasks.size();
}

/**
 * Explicit accessor for the {@code tenantId} field, as required by {@link TaskQueue}.
 *
 * @return the value of the {@code tenantId} field.
 */
@Override
public TenantIdModel tenantId() {
return tenantId;
}

/**
 * Explicit accessor for the {@code taskDefName} field, as required by {@link TaskQueue}.
 *
 * @return the value of the {@code taskDefName} field.
 */
@Override
public String taskDefName() {
return taskDefName;
}

/**
 * @return the current value of the {@code rehydrationCount} counter.
 */
@Override
public long rehydratedCount() {
return rehydrationCount.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.littlehorse.server.streams.taskqueue;

import io.littlehorse.common.model.ScheduledTaskModel;
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.server.streams.topology.core.RequestExecutionContext;
import org.apache.kafka.streams.processor.TaskId;

/**
 * Contract for the per-TaskDef, per-tenant queue that pairs scheduled tasks with
 * the gRPC clients polling for them.
 */
public interface TaskQueue {

    /**
     * @return the tenant this queue belongs to.
     */
    TenantIdModel tenantId();

    /**
     * @return the name of the TaskDef this queue serves.
     */
    String taskDefName();

    /**
     * @return the number of tasks currently held by this queue.
     */
    int size();

    /**
     * @return the rehydration counter reported by this queue. NOTE(review): presumably
     *         the number of times the queue was reloaded from the store; confirm with
     *         the implementations.
     */
    long rehydratedCount();

    /**
     * Invoked for every newly scheduled task. There are two call sites:
     *
     * <ol>
     * <li>CommandProcessorDaoImpl::scheduleTask(), when a task is scheduled.
     * <li>CommandProcessor::init(). The Task Queue Manager system lives only in
     * memory, so after a restart or rebalance its state has to be rebuilt; init()
     * walks all tasks in the state store that are scheduled but not yet started.
     * </ol>
     *
     * @param streamsTaskId is the Kafka Streams task the scheduled task is attributed
     *                      to; see {@link #drainPartition(TaskId)}.
     * @param scheduledTask is the ::getObjectId() for the TaskScheduleRequest that was
     *                      just scheduled.
     * @return True if the task was successfully scheduled, or False if the queue is full.
     */
    boolean onTaskScheduled(TaskId streamsTaskId, ScheduledTaskModel scheduledTask);

    /**
     * Invoked when a grpc client sends a new PollTaskPb.
     *
     * @param requestObserver is the grpc StreamObserver representing the channel that
     *                        talks to the client who made the PollTaskRequest.
     * @param requestContext  is the execution context of the poll request.
     */
    void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext);

    /**
     * Invoked when a gRPC client (and its StreamObserver) disconnects, either through
     * a clean shutdown (onCompleted()) or a connection error (onError()).
     *
     * @param disconnectedObserver is the TaskQueueStreamObserver for the client whose
     *                             connection is now gone.
     */
    void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver);

    /**
     * Discards the tasks this queue holds for the given Kafka Streams task.
     *
     * @param partitionToDrain is the streams task whose queued tasks are removed.
     */
    void drainPartition(TaskId partitionToDrain);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package io.littlehorse.server.streams.taskqueue;

import io.littlehorse.common.model.ScheduledTaskModel;
import io.littlehorse.common.model.getable.objectId.TenantIdModel;
import io.littlehorse.server.streams.topology.core.RequestExecutionContext;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.processor.TaskId;

/**
 * In-memory {@link TaskQueue} backed by a bounded {@link LinkedBlockingQueue}.
 *
 * <p>
 * A scheduled task is either handed straight to a client that is already waiting
 * ("hungry") or buffered in {@code pendingTasks} until a client polls. All decisions
 * that touch {@code hungryClients} are taken while holding {@code lock}.
 *
 * <p>
 * NOTE(review): {@code needsRehydration} is set when the buffer overflows but nothing
 * in this class clears it, so once the queue has been full it rejects every later
 * task. Rehydration from the store is not implemented here ({@link #rehydratedCount()}
 * always reports 0).
 */
@Slf4j
public class TaskQueueImpl implements TaskQueue {

    private final String taskDefName;
    private final TaskQueueManager parent;
    private final int capacity;
    private final TenantIdModel tenantId;
    // Guards hungryClients (a non-thread-safe LinkedList) and keeps the
    // "hand off or enqueue" decision atomic with respect to concurrent polls.
    private final Lock lock = new ReentrantLock();
    private final Queue<PollTaskRequestObserver> hungryClients = new LinkedList<>();
    private final String instanceName;
    private final LinkedBlockingQueue<QueueItem> pendingTasks;
    // Becomes true the first time a task cannot be buffered because the queue is full.
    private final AtomicBoolean needsRehydration = new AtomicBoolean(false);

    /**
     * @param taskDefName name of the TaskDef served by this queue.
     * @param parent      manager used to deliver matched tasks and to look up the instance name.
     * @param capacity    maximum number of tasks buffered in memory.
     * @param tenantId    tenant this queue belongs to.
     */
    public TaskQueueImpl(String taskDefName, TaskQueueManager parent, int capacity, TenantIdModel tenantId) {
        this.taskDefName = taskDefName;
        this.parent = parent;
        this.capacity = capacity;
        this.tenantId = tenantId;
        this.instanceName = parent.getBackend().getInstanceName();
        this.pendingTasks = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Removes a disconnected client from the list of waiting clients.
     *
     * @param disconnectedObserver observer of the client whose connection is gone.
     */
    @Override
    public void onRequestDisconnected(PollTaskRequestObserver disconnectedObserver) {
        synchronizedBlock(() -> {
            boolean removed = hungryClients.removeIf(waiting -> waiting.equals(disconnectedObserver));
            // Log only when something was actually removed, instead of once per waiting client.
            if (removed) {
                log.trace(
                        "Instance {}: Removing task queue observer for taskdef {} with client id {}: {}",
                        instanceName,
                        taskDefName,
                        disconnectedObserver.getClientId(),
                        disconnectedObserver);
            }
        });
    }

    /**
     * Accepts a newly scheduled task.
     *
     * <p>
     * If a client is already waiting, the task is delivered to it directly and is NOT
     * buffered, so it cannot be dispatched a second time by a later poll. Otherwise the
     * task is buffered; if the buffer is full the queue is flagged as needing rehydration.
     *
     * @param streamTaskId  Kafka Streams task the scheduled task is attributed to.
     * @param scheduledTask the task that was just scheduled.
     * @return true if the task was delivered or buffered, false if the queue is out of
     *         capacity (now or previously), matching the {@link TaskQueue} contract.
     */
    @Override
    public boolean onTaskScheduled(TaskId streamTaskId, ScheduledTaskModel scheduledTask) {
        return synchronizedBlock(() -> {
            if (needsRehydration.get()) {
                // An earlier task was already rejected; keep rejecting as before.
                return false;
            }
            // Clients only become hungry when pendingTasks is empty (see onPollRequest),
            // so a direct hand-off does not jump ahead of any buffered task.
            PollTaskRequestObserver hungryClient = hungryClients.poll();
            if (hungryClient != null) {
                parent.itsAMatch(scheduledTask, hungryClient);
                return true;
            }
            boolean added = pendingTasks.offer(new QueueItem(streamTaskId, scheduledTask));
            if (!added) {
                needsRehydration.set(true);
            }
            return added;
        });
    }

    /**
     * Serves a poll request: delivers the oldest buffered task if there is one,
     * otherwise parks the client until a task is scheduled.
     *
     * @param requestObserver observer of the polling client.
     * @param requestContext  execution context of the request (not used by this implementation).
     */
    @Override
    public void onPollRequest(PollTaskRequestObserver requestObserver, RequestExecutionContext requestContext) {
        synchronizedBlock(() -> {
            QueueItem nextItem = pendingTasks.poll();
            if (nextItem != null) {
                parent.itsAMatch(nextItem.scheduledTask(), requestObserver);
            } else {
                hungryClients.add(requestObserver);
            }
        });
    }

    /**
     * @return the number of tasks currently buffered in memory.
     */
    @Override
    public int size() {
        return pendingTasks.size();
    }

    /**
     * @return always 0; this implementation does not count rehydrations.
     */
    @Override
    public long rehydratedCount() {
        return 0;
    }

    /**
     * Discards every buffered task that is attributed to the given Kafka Streams task.
     *
     * @param partitionToDrain streams task whose buffered tasks are removed.
     */
    @Override
    public void drainPartition(TaskId partitionToDrain) {
        pendingTasks.removeIf(queueItem -> queueItem.streamsTaskId().equals(partitionToDrain));
    }

    @Override
    public TenantIdModel tenantId() {
        return tenantId;
    }

    @Override
    public String taskDefName() {
        return taskDefName;
    }

    // A buffered task together with the streams task it is attributed to.
    private record QueueItem(TaskId streamsTaskId, ScheduledTaskModel scheduledTask) {}

    // Runs the action while holding the lock.
    private void synchronizedBlock(Runnable runnable) {
        // Acquire before entering try: if lock() fails, finally must not unlock an unheld lock.
        lock.lock();
        try {
            runnable.run();
        } finally {
            lock.unlock();
        }
    }

    // Evaluates the supplier while holding the lock and returns its result.
    private boolean synchronizedBlock(Supplier<Boolean> booleanSupplier) {
        // Acquire before entering try: if lock() fails, finally must not unlock an unheld lock.
        lock.lock();
        try {
            return booleanSupplier.get();
        } finally {
            lock.unlock();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@Slf4j
public class TaskQueueManager implements Closeable {

private final ConcurrentHashMap<TenantTaskName, OneTaskQueue> taskQueues;
private final ConcurrentHashMap<TenantTaskName, TaskQueue> taskQueues;

@Getter
private LHServer backend;
Expand Down Expand Up @@ -56,14 +56,14 @@ public void itsAMatch(ScheduledTaskModel scheduledTask, PollTaskRequestObserver
taskQueueCommandProducer.returnTaskToClient(scheduledTask, luckyClient);
}

private OneTaskQueue getSubQueue(TenantTaskName tenantTask) {
private TaskQueue getSubQueue(TenantTaskName tenantTask) {
return taskQueues.computeIfAbsent(
tenantTask,
taskToCreate -> new OneTaskQueue(
taskToCreate -> new TaskQueueImpl(
taskToCreate.taskDefName(), this, individualQueueConfiguredCapacity, taskToCreate.tenantId()));
}

public Collection<OneTaskQueue> all() {
public Collection<TaskQueue> all() {
return taskQueues.values();
}

Expand Down

0 comments on commit d540530

Please sign in to comment.