From 155d3a3cd293e3ee08caf1a415dc46790f1500db Mon Sep 17 00:00:00 2001
From: chgl
Date: Wed, 8 Jan 2025 22:00:07 +0100
Subject: [PATCH] feat!: changed object keys to help with deterministic replay

---
 .../streams/fhirtoserver/S3BundleStore.java   | 148 ++++++++++++++----
 .../fhirtoserver/SendToServerProcessor.java   |   9 +-
 2 files changed, 120 insertions(+), 37 deletions(-)

diff --git a/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java b/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
index 8bfd496..1f4a7c5 100644
--- a/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
+++ b/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
@@ -8,13 +8,17 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.commons.io.output.StringBuilderWriter;
 import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.hl7.fhir.r4.model.Bundle;
+import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
 import org.hl7.fhir.r4.model.Bundle.BundleType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.MessageHeaders;
 import org.springframework.stereotype.Service;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -58,42 +62,30 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
         grouped.size(),
         grouped.keySet());
 
-    var parser = fhirContext.newJsonParser();
-
     for (var entry : grouped.entrySet()) {
-      try (var stringWriter = new StringBuilderWriter()) {
-        var resourceType = entry.getKey();
-        boolean isFirstResource = true;
-        for (var resource : entry.getValue()) {
-          if (!(isFirstResource)) {
-            stringWriter.write("\n");
-          }
-          isFirstResource = false;
-
-          parser.encodeResourceToWriter(resource, stringWriter);
-        }
+      var resourceType = entry.getKey();
+      var ndjson = listOfResourcesToNdjson(entry.getValue());
 
-        var prefix = config.objectNamePrefix().orElse("");
-
-        var objectKey =
-            String.format(
-                "%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());
-
-        LOG.debug(
-            "Storing {} resources of type {} as object {}",
-            entry.getValue().size(),
-            entry.getKey(),
-            objectKey);
-
-        var body = RequestBody.fromString(stringWriter.toString());
-        s3Client.putObject(
-            request ->
-                request
-                    .bucket(config.bucketName())
-                    .key(objectKey)
-                    .contentType(Constants.CT_FHIR_NDJSON),
-            body);
-      }
+      var prefix = config.objectNamePrefix().orElse("");
+
+      var objectKey =
+          String.format(
+              "%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());
+
+      LOG.debug(
+          "Storing {} resources of type {} as object {}",
+          entry.getValue().size(),
+          entry.getKey(),
+          objectKey);
+
+      var body = RequestBody.fromString(ndjson);
+      s3Client.putObject(
+          request ->
+              request
+                  .bucket(config.bucketName())
+                  .key(objectKey)
+                  .contentType(Constants.CT_FHIR_NDJSON),
+          body);
     }
 
     // now, deal with the DELETE entries
@@ -103,18 +95,46 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
     mergedBundle.deletBundle().getEntry().stream()
         .collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0]));
 
+    storeDeleteBundles(groupedDeletes);
+
+    return null;
+  }
+
+  private String listOfResourcesToNdjson(List<IBaseResource> resources)
+      throws DataFormatException, IOException {
+    var parser = fhirContext.newJsonParser();
+    boolean isFirstResource = true;
+    try (var stringWriter = new StringBuilderWriter()) {
+      for (var resource : resources) {
+        if (!(isFirstResource)) {
+          stringWriter.write("\n");
+        }
+        isFirstResource = false;
+
+        parser.encodeResourceToWriter(resource, stringWriter);
+      }
+      return stringWriter.toString();
+    }
+  }
+
+  private void storeDeleteBundles(Map<String, List<BundleEntryComponent>> groupedDeletes) {
     LOG.debug(
         "Storing {} delete requests in buckets ({})",
         groupedDeletes.size(),
         groupedDeletes.keySet());
 
+    var parser = fhirContext.newJsonParser();
+
     // each entry is one resource type
     for (var entry : groupedDeletes.entrySet()) {
+      LOG.debug("Processing resource type {}", entry.getKey());
+
       var deleteBundle = new Bundle();
       deleteBundle.setType(BundleType.TRANSACTION);
 
       // turns all entries of the merged bundles into a single one
       // per resource type
+      // Note: this is stored as a regular JSON bundle rather than NDJSON
       for (var bundleEntry : entry.getValue()) {
         deleteBundle.addEntry().setRequest(bundleEntry.getRequest());
       }
@@ -143,6 +163,66 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
                   .contentType(Constants.CT_FHIR_JSON_NEW),
           body);
     }
+  }
+
+  public Void storeSingleBundle(Bundle bundle, MessageHeaders messageHeaders)
+      throws DataFormatException, IOException {
+    var mergedBundle =
+        merger.mergeSeperateDeleteBundles(
+            List.of(bundle), mergerConfig.entryUniquenessFhirpathExpression());
+
+    // extract all POST/PUT bundle entries (mergedBundle.deletBundle() contains the DELETE entries)
+    var resources = BundleUtil.toListOfResources(fhirContext, mergedBundle.bundle());
+
+    var grouped = resources.stream().collect(Collectors.groupingBy(IBaseResource::fhirType));
+
+    for (var entry : grouped.entrySet()) {
+      var resourceType = entry.getKey();
+      var ndjson = listOfResourcesToNdjson(entry.getValue());
+
+      var prefix = config.objectNamePrefix().orElse("");
+
+      var timestamp = messageHeaders.get(KafkaHeaders.RECEIVED_TIMESTAMP);
+      var partition = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION);
+      var offset = messageHeaders.get(KafkaHeaders.OFFSET);
+
+      var objectKey =
+          String.format("%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);
+
+      LOG.debug(
+          "Storing {} resources of type {} as object {}",
+          entry.getValue().size(),
+          entry.getKey(),
+          objectKey);
+
+      var body = RequestBody.fromString(ndjson);
+      var metadata =
+          Map.of(
+              "kafka-timestamp",
+              timestamp.toString(),
+              "kafka-partition",
+              partition.toString(),
+              "kafka-offset",
+              offset.toString(),
+              "kafka-topic",
+              messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_TOPIC, "").toString(),
+              "kafka-key",
+              messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_KEY, "").toString(),
+              "kafka-group-id",
+              messageHeaders.getOrDefault(KafkaHeaders.GROUP_ID, "").toString());
+
+      s3Client.putObject(
+          request ->
+              request
+                  .bucket(config.bucketName())
+                  .key(objectKey)
+                  .metadata(metadata)
+                  .contentType(Constants.CT_FHIR_NDJSON),
+          body);
+    }
+
+    // TODO: DELETE bundle entries are not handled here yet
+
+    return null;
+  }
 }
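The S3BundleStore change above is the heart of the breaking change: `storeSingleBundle` derives the object key from the Kafka record's receive timestamp, partition, and offset instead of `Instant.now()`, so re-consuming the same record always maps to the same key. The following sketch is not part of the patch; the helper names and sample values are invented purely to illustrate the difference between the two key schemes.

```java
// Not part of the patch: a self-contained sketch of why the new key scheme is
// deterministic under replay while the old wall-clock scheme is not.
import java.time.Instant;

public class ObjectKeyDemo {

  // Old scheme: every invocation bakes in the current wall-clock time, so a
  // replayed record lands under a brand-new key and duplicates the object.
  static String wallClockKey(String prefix, String resourceType) {
    return String.format(
        "%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());
  }

  // New scheme: the Kafka coordinates of the record (receive timestamp,
  // partition, offset) are stable across replays, so the same record always
  // produces the same object key.
  static String kafkaCoordinateKey(
      String prefix, String resourceType, long timestamp, int partition, long offset) {
    return String.format(
        "%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);
  }

  public static void main(String[] args) throws InterruptedException {
    // Processing the same record (partition 3, offset 42) twice, e.g. after a
    // consumer-group offset reset, yields the identical key.
    var first = kafkaCoordinateKey("fhir/", "Patient", 1736370007000L, 3, 42L);
    var second = kafkaCoordinateKey("fhir/", "Patient", 1736370007000L, 3, 42L);
    System.out.println(first.equals(second)); // true

    // The old scheme keys on processing time, not record identity.
    var a = wallClockKey("fhir/", "Patient");
    Thread.sleep(5);
    var b = wallClockKey("fhir/", "Patient");
    System.out.println(a.equals(b)); // false: a replay would duplicate data
  }
}
```

Because an S3 `PutObject` on an existing key overwrites the object (last write wins), replaying a partition becomes idempotent: each record maps to exactly one object in the bucket.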
diff --git a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java
index 1f4f87b..d8558e0 100644
--- a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java
+++ b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java
@@ -31,6 +31,7 @@
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
 import org.springframework.retry.RetryCallback;
 import org.springframework.retry.RetryContext;
 import org.springframework.retry.RetryListener;
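The consumer change below is what makes those Kafka coordinates available in the first place: `sinkSingle` now receives a Spring `Message<Resource>` rather than the bare payload, so the headers populated by the Spring Cloud Stream Kafka binder travel with it. This standalone sketch is not from the patch; the topic name and coordinates are made up, and it assumes spring-messaging and spring-kafka on the classpath. It builds a message by hand to stand in for what the binder delivers and shows the header-access pattern:

```java
// Illustrative only: simulates the headers the Kafka binder attaches to each
// message, so the access pattern used by sinkSingle() can be run standalone.
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class HeaderAccessDemo {
  public static void main(String[] args) {
    // In production these headers are set by the binder; here we fake a record
    // from partition 3, offset 42 of a topic named "fhir-resources".
    Message<String> message =
        MessageBuilder.withPayload("{\"resourceType\":\"Patient\"}")
            .setHeader(KafkaHeaders.RECEIVED_TOPIC, "fhir-resources")
            .setHeader(KafkaHeaders.RECEIVED_TIMESTAMP, 1736370007000L)
            .setHeader(KafkaHeaders.RECEIVED_PARTITION, 3)
            .setHeader(KafkaHeaders.OFFSET, 42L)
            .build();

    var headers = message.getHeaders();

    // The same coordinates storeSingleBundle() formats into the object key.
    var timestamp = headers.get(KafkaHeaders.RECEIVED_TIMESTAMP);
    var partition = headers.get(KafkaHeaders.RECEIVED_PARTITION);
    var offset = headers.get(KafkaHeaders.OFFSET);

    System.out.printf("key suffix: %s-%s-%s.ndjson%n", timestamp, partition, offset);
  }
}
```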
@@ -239,8 +240,10 @@ Consumer<List<Resource>> sinkBatch() {
   }
 
   @Bean
-  Consumer<Resource> sinkSingle() {
-    return resource -> {
+  Consumer<Message<Resource>> sinkSingle() {
+    return message -> {
+      var resource = message.getPayload();
+
       if (resource == null) {
         LOG.warn("resource is null. Ignoring.");
         messageNullCounter.increment();
@@ -264,7 +267,7 @@ Consumer<Resource> sinkSingle() {
       if (s3Config.enabled()) {
         LOG.debug("Sending bundle to object storage");
         try {
-          retryTemplate.execute(context -> s3Store.storeBatch(List.of(bundle)));
+          retryTemplate.execute(context -> s3Store.storeSingleBundle(bundle, message.getHeaders()));
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
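For completeness, here is a minimal sketch of the NDJSON serialization that the new `listOfResourcesToNdjson` helper performs, rewritten with a stream join instead of the patch's writer loop. It is not from the patch; it assumes HAPI FHIR R4 structures on the classpath, and the two Patient resources are made up:

```java
// A standalone sketch of NDJSON encoding: one compact JSON document per line.
import ca.uhn.fhir.context.FhirContext;
import java.util.List;
import java.util.stream.Collectors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;

public class NdjsonDemo {
  public static void main(String[] args) {
    var fhirContext = FhirContext.forR4();
    // HAPI's JSON parser emits compact single-line output by default, which is
    // exactly what NDJSON requires; pretty-printing would break the format.
    var parser = fhirContext.newJsonParser();

    var resources =
        List.<IBaseResource>of(new Patient().setActive(true), new Patient().setActive(false));

    // Equivalent to the helper's writer loop: newline-joined single-line JSON.
    var ndjson =
        resources.stream()
            .map(parser::encodeResourceToString)
            .collect(Collectors.joining("\n"));

    System.out.println(ndjson);
  }
}
```

Keeping one resource per line is what lets downstream tooling treat each stored object as a streamable batch, reading and parsing it line by line without loading a whole bundle into memory.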