diff --git a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java index 25abdcb..1fb78d7 100644 --- a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java +++ b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java @@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.security.InvalidKeyException; @@ -19,6 +20,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.logging.log4j.util.Strings; @@ -85,6 +88,17 @@ public class SendToServerProcessor { private static final DistributionSummary sendBundleSizeDistribution = Metrics.globalRegistry.summary("fhirtoserver.fhir.batch.bundle.size"); + private static final Counter totalMessagesReadCounter = + Metrics.globalRegistry.counter("fhirtoserver.fhir.messages.read.total"); + + private static final Set distinctBundleIds = + Metrics.globalRegistry.gaugeCollectionSize( + "fhirtoserver.fhir.bundle.distinct.ids", Tags.empty(), ConcurrentHashMap.newKeySet()); + + private static final Set distinctResourceIds = + Metrics.globalRegistry.gaugeCollectionSize( + "fhirtoserver.fhir.resource.distinct.ids", Tags.empty(), ConcurrentHashMap.newKeySet()); + private final IGenericClient client; private final RetryTemplate retryTemplate; private final String fhirPathFilterExpression; @@ -95,7 +109,7 @@ public class SendToServerProcessor { private final FhirBundleMergerConfig batchMergingConfig; private final S3Config s3Config; - private S3BundleStore s3Store; + private final S3BundleStore s3Store; public SendToServerProcessor( IGenericClient fhirClient, @@ -245,6 +259,7 @@ Consumer>> sinkBatch() { @Bean Consumer> sinkSingle() { return message -> { + totalMessagesReadCounter.increment(); if (message == null) { LOG.warn("message is null. Ignoring."); messageNullCounter.increment(); @@ -259,12 +274,22 @@ Consumer> sinkSingle() { return; } + if (bundle.hasId()) { + distinctBundleIds.add(bundle.getId()); + } + if (bundle.getEntry().isEmpty()) { LOG.warn("received batch is empty. Ignoring."); messageEmptyCounter.increment(); return; } + // TODO: assumes entry with resource vs DELETE requests + // only for debugging purposes now. + for (var entry : bundle.getEntry()) { + distinctResourceIds.add(entry.getResource().getId()); + } + LOG.debug("Processing single bundle {}", kv("bundleId", bundle.getId())); if (s3Config.enabled()) {