Skip to content

Commit

Permalink
feat: aded metrics for processed resource counts for debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
chgl committed Jan 10, 2025
1 parent d5892ff commit 108b573
Showing 1 changed file with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.util.Strings;
Expand Down Expand Up @@ -85,6 +88,17 @@ public class SendToServerProcessor {
private static final DistributionSummary sendBundleSizeDistribution =
Metrics.globalRegistry.summary("fhirtoserver.fhir.batch.bundle.size");

private static final Counter totalMessagesReadCounter =
Metrics.globalRegistry.counter("fhirtoserver.fhir.messages.read.total");

private static final Set<String> distinctBundleIds =
Metrics.globalRegistry.gaugeCollectionSize(
"fhirtoserver.fhir.bundle.distinct.ids", Tags.empty(), ConcurrentHashMap.newKeySet());

private static final Set<String> distinctResourceIds =
Metrics.globalRegistry.gaugeCollectionSize(
"fhirtoserver.fhir.resource.distinct.ids", Tags.empty(), ConcurrentHashMap.newKeySet());

private final IGenericClient client;
private final RetryTemplate retryTemplate;
private final String fhirPathFilterExpression;
Expand All @@ -95,7 +109,7 @@ public class SendToServerProcessor {
private final FhirBundleMergerConfig batchMergingConfig;

private final S3Config s3Config;
private S3BundleStore s3Store;
private final S3BundleStore s3Store;

public SendToServerProcessor(
IGenericClient fhirClient,
Expand Down Expand Up @@ -245,6 +259,7 @@ Consumer<Message<List<Resource>>> sinkBatch() {
@Bean
Consumer<Message<Resource>> sinkSingle() {
return message -> {
totalMessagesReadCounter.increment();
if (message == null) {
LOG.warn("message is null. Ignoring.");
messageNullCounter.increment();
Expand All @@ -259,12 +274,20 @@ Consumer<Message<Resource>> sinkSingle() {
return;
}

distinctBundleIds.add(bundle.getId());

if (bundle.getEntry().isEmpty()) {
LOG.warn("received batch is empty. Ignoring.");
messageEmptyCounter.increment();
return;
}

// TODO: assumes entry with resource vs DELETE requests
// only for debugging purposes now.
for (var entry : bundle.getEntry()) {
distinctResourceIds.add(entry.getResource().getId());
}

LOG.debug("Processing single bundle {}", kv("bundleId", bundle.getId()));

if (s3Config.enabled()) {
Expand Down

0 comments on commit 108b573

Please sign in to comment.