Skip to content

Commit

Permalink
async waiters metric
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Dec 19, 2024
1 parent 745d67e commit 4673dac
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ public Properties getKafkaCommandProducerConfig(String component) {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
conf.put(ProducerConfig.ACKS_CONFIG, "all");
conf.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
conf.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000");
conf.put(ProducerConfig.LINGER_MS_CONFIG, getOrSetDefault(LINGER_MS_KEY, "0"));
addKafkaSecuritySettings(conf);
return conf;
Expand All @@ -707,7 +709,7 @@ public Properties getKafkaTaskClaimProducer() {
Properties conf = getKafkaCommandProducerConfig("task-claim");
// conf.put(ProducerConfig.ACKS_CONFIG, "0");
// conf.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
// conf.put(ProducerConfig.LINGER_MS_CONFIG, "0");
// conf.put(ProducerConfig.LINGER_MS_CONFIG, "200");
// conf.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152");
// conf.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1000");
return conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public HealthService(
TaskQueueManager taskQueueManager,
MetadataCache metadataCache,
BackendInternalComms internalComms) {
this.prom = new PrometheusMetricExporter(config);
this.prom = new PrometheusMetricExporter(config, internalComms.asyncWaiters());
this.numberOfPartitionPerTopic = config.partitionsByTopic();
this.jettyThreadPool.setName("javalin-service");
this.jettyThreadPool.setMaxThreads(10);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.littlehorse.server.monitoring.metrics;

import io.littlehorse.server.streams.util.AsyncWaiters;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

public class AsyncWaiterMetrics implements MeterBinder {

private static final String METRIC_NAME = "async_waiter_size";
private final AsyncWaiters asyncWaiters;

public AsyncWaiterMetrics(AsyncWaiters asyncWaiters) {
this.asyncWaiters = asyncWaiters;
}

@Override
public void bindTo(MeterRegistry meterRegistry) {
Gauge.builder(METRIC_NAME, asyncWaiters, AsyncWaiters::size).register(meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.littlehorse.common.LHServerConfig;
import io.littlehorse.server.monitoring.StandbyMetrics;
import io.littlehorse.server.streams.taskqueue.TaskQueueManager;
import io.littlehorse.server.streams.util.AsyncWaiters;
import io.littlehorse.server.streams.util.MetadataCache;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
Expand All @@ -27,11 +28,13 @@ public class PrometheusMetricExporter implements Closeable {
private PrometheusMeterRegistry prometheusRegistry;
private LHServerConfig config;
private TaskQueueManagerMetrics taskQueueManagerMetrics;
private final AsyncWaiters asyncWaiters;

public PrometheusMetricExporter(LHServerConfig config) {
public PrometheusMetricExporter(LHServerConfig config, AsyncWaiters asyncWaiters) {
this.config = config;
this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.prometheusRegistry.config().commonTags("application_id", config.getLHClusterId());
this.asyncWaiters = asyncWaiters;
new ServerMetricFilter(prometheusRegistry, ServerFilterRules.fromLevel(config.getServerMetricLevel()))
.initialize();
}
Expand Down Expand Up @@ -65,6 +68,9 @@ public void bind(
taskQueueManagerMetrics = new TaskQueueManagerMetrics(taskQueueManager);
taskQueueManagerMetrics.bindTo(prometheusRegistry);

AsyncWaiterMetrics asyncWaiterMetrics = new AsyncWaiterMetrics(asyncWaiters);
asyncWaiterMetrics.bindTo(prometheusRegistry);

JvmMemoryMetrics jvmMeter = new JvmMemoryMetrics();
jvmMeter.bindTo(prometheusRegistry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class TaskQueueManagerMetrics implements MeterBinder, Closeable {
public static final String METRIC_NAME = "lh_in_memory_task_queue_size";
public static final String TENANT_ID_TAG = "tenant_id";
public static final String TASK_NAME_TAG = "task_name";
public static final String REHYDRATION_COUNT_METRIC_NAME = "taskqueue_rehydration_count";
private final TaskQueueManager taskQueueManager;
private final ScheduledExecutorService mainExecutor;

Expand All @@ -40,6 +41,8 @@ private void updateMetrics(MeterRegistry registry) {
.tag(TASK_NAME_TAG, queue.getTaskDefName())
.register(registry);
});
Gauge.builder(REHYDRATION_COUNT_METRIC_NAME, taskQueueManager, TaskQueueManager::rehydrationCount)
.register(registry);
}

private boolean wasRegistered(MeterRegistry registry, OneTaskQueue queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class BackendInternalComms implements Closeable {
private ChannelCredentials clientCreds;

private Map<String, ManagedChannel> channels;
private AsyncWaiters asyncWaiters;
private final AsyncWaiters asyncWaiters;
private ConcurrentHashMap<HostInfo, InternalGetAdvertisedHostsResponse> otherHosts;

private final Context.Key<RequestExecutionContext> contextKey;
Expand Down Expand Up @@ -233,6 +233,10 @@ public <U extends Message, T extends AbstractGetable<U>> T getObject(
}
}

/**
 * Exposes the {@link AsyncWaiters} instance held by this object.
 *
 * @return the async waiters held in the {@code asyncWaiters} field
 */
public AsyncWaiters asyncWaiters() {
    return this.asyncWaiters;
}

public ProducerCommandCallback createProducerCommandCallback(
AbstractCommand<?> command,
StreamObserver<WaitForCommandResponse> observer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
Expand All @@ -42,6 +43,7 @@ public class OneTaskQueue {
private TenantIdModel tenantId;

private String instanceName;
private AtomicLong rehydrationCount = new AtomicLong(0);

private final Map<TaskId, TrackedPartition> taskTrack = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -226,6 +228,7 @@ public boolean hasMoreTasksOnDisk(TaskId streamsTaskId) {
*/
private void rehydrateFromStore(ReadOnlyGetableManager readOnlyGetableManager) {
log.debug("Rehydrating");
rehydrationCount.incrementAndGet();
if (readOnlyGetableManager.getSpecificTask().isEmpty()) {
throw new IllegalStateException("Only specific task rehydration is permitted.");
}
Expand Down Expand Up @@ -286,5 +289,9 @@ public int size() {
return pendingTasks.size();
}

/**
 * Reads the current value of the {@code rehydrationCount} counter.
 *
 * @return the counter's current value
 */
public long rehydratedCount() {
    final long currentCount = this.rehydrationCount.get();
    return currentCount;
}

// Pairs a scheduled task with a streams TaskId.
// NOTE(review): presumably the TaskId identifies the Kafka Streams task the scheduled task came from — confirm.
private record QueueItem(TaskId streamsTaskId, ScheduledTaskModel scheduledTask) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public Collection<OneTaskQueue> all() {
return taskQueues.values();
}

/**
 * Sums {@link OneTaskQueue#rehydratedCount()} over every queue currently held in
 * {@code taskQueues}.
 *
 * @return the total across all queues, or {@code 0} when there are none
 */
public long rehydrationCount() {
    long total = 0L;
    for (OneTaskQueue queue : taskQueues.values()) {
        total += queue.rehydratedCount();
    }
    return total;
}

@Override
public void close() {
taskQueueCommandProducer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public void registerObserverWaitingForWorkflowEvent(
}
}

/**
 * Reports how many entries {@code commandWaiters} currently holds.
 *
 * @return the size of {@code commandWaiters}
 */
public long size() {
    final long commandWaiterCount = commandWaiters.size();
    return commandWaiterCount;
}

public void handleRebalance(Set<TaskId> assignedTasks) {
Set<Integer> assignedPartitions =
assignedTasks.stream().map(TaskId::partition).collect(Collectors.toSet());
Expand Down

0 comments on commit 4673dac

Please sign in to comment.