feat!: changed object keys to help with deterministic replay
chgl committed Jan 8, 2025
1 parent 1616df6 commit 155d3a3
Showing 2 changed files with 120 additions and 37 deletions.
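
What changed, in a nutshell: the batch path (storeBatch) names objects after the wall-clock write time, so re-consuming the same Kafka records produces fresh object keys on every run. The new single-message path (storeSingleBundle) derives the key from the record's Kafka coordinates instead, so a replay rewrites the same object. A minimal sketch of the two naming schemes; the prefix and the concrete numbers are invented example values, not taken from the commit:

import java.time.Instant;

public class ObjectKeySketch {
  public static void main(String[] args) {
    String prefix = "staging/"; // invented example prefix
    String resourceType = "Patient";

    // Old scheme (storeBatch): depends on when the write happens,
    // so replaying the same records creates new, duplicate objects.
    String oldKey =
        String.format("%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

    // New scheme (storeSingleBundle): depends only on the consumed record
    // (timestamp-partition-offset), so a replay overwrites the same object.
    long timestamp = 1736300000000L; // invented Kafka record timestamp
    int partition = 3;               // invented
    long offset = 42;                // invented
    String newKey =
        String.format("%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);

    System.out.println(oldKey); // staging/Patient/bundle-<now>.ndjson
    System.out.println(newKey); // staging/Patient/1736300000000-3-42.ndjson
  }
}
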
148 changes: 114 additions & 34 deletions src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
@@ -8,13 +8,17 @@
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.output.StringBuilderWriter;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
@@ -58,42 +62,30 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
grouped.size(),
grouped.keySet());

var parser = fhirContext.newJsonParser();

for (var entry : grouped.entrySet()) {
try (var stringWriter = new StringBuilderWriter()) {
var resourceType = entry.getKey();
boolean isFirstResource = true;
for (var resource : entry.getValue()) {
if (!(isFirstResource)) {
stringWriter.write("\n");
}
isFirstResource = false;

parser.encodeResourceToWriter(resource, stringWriter);
}
var resourceType = entry.getKey();
var ndjson = listOfResourcesToNdjson(entry.getValue());

var prefix = config.objectNamePrefix().orElse("");

var objectKey =
String.format(
"%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(stringWriter.toString());
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}
var prefix = config.objectNamePrefix().orElse("");

var objectKey =
String.format(
"%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(ndjson);
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}

// now, deal with the DELETE entries
@@ -103,18 +95,46 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
mergedBundle.deletBundle().getEntry().stream()
.collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0]));

storeDeleteBundles(groupedDeletes);

return null;
}

private String listOfResourcesToNdjson(List<IBaseResource> resources)
throws DataFormatException, IOException {
var parser = fhirContext.newJsonParser();
boolean isFirstResource = true;
try (var stringWriter = new StringBuilderWriter()) {
for (var resource : resources) {
if (!(isFirstResource)) {
stringWriter.write("\n");
}
isFirstResource = false;

parser.encodeResourceToWriter(resource, stringWriter);
}
return stringWriter.toString();
}
}
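
The extracted helper writes each resource as one compact JSON line, with a newline between resources and none at the end. A self-contained sketch of the same serialization, assuming HAPI FHIR R4 is on the classpath; the two Patient resources are invented, and java.io.StringWriter stands in for the StringBuilderWriter used above:

import ca.uhn.fhir.context.FhirContext;
import java.io.StringWriter;
import java.util.List;
import org.hl7.fhir.r4.model.Patient;

public class NdjsonSketch {
  public static void main(String[] args) throws Exception {
    var fhirContext = FhirContext.forR4();
    var parser = fhirContext.newJsonParser(); // compact by default -> one JSON line per resource

    var p1 = new Patient();
    p1.setId("Patient/1"); // invented
    var p2 = new Patient();
    p2.setId("Patient/2"); // invented

    var writer = new StringWriter();
    boolean isFirstResource = true;
    for (var resource : List.of(p1, p2)) {
      if (!isFirstResource) {
        writer.write("\n"); // newline between resources, none at the end
      }
      isFirstResource = false;
      parser.encodeResourceToWriter(resource, writer);
    }

    // Prints two lines, e.g.:
    // {"resourceType":"Patient","id":"1"}
    // {"resourceType":"Patient","id":"2"}
    System.out.println(writer);
  }
}
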

private void storeDeleteBundles(Map<String, List<BundleEntryComponent>> groupedDeletes) {
LOG.debug(
"Storing {} delete requests in buckets ({})",
groupedDeletes.size(),
groupedDeletes.keySet());

var parser = fhirContext.newJsonParser();

// each entry is one resource type
for (var entry : groupedDeletes.entrySet()) {
LOG.debug("Processing resource type {}", entry.getKey());

var deleteBundle = new Bundle();
deleteBundle.setType(BundleType.TRANSACTION);

// turns all entries of the merged bundles into a single one
// per resource type
// Note that this is not an NDJSON but instead a regular bundle
for (var bundleEntry : entry.getValue()) {
deleteBundle.addEntry().setRequest(bundleEntry.getRequest());
}
@@ -143,6 +163,66 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
.contentType(Constants.CT_FHIR_JSON_NEW),
body);
}
}
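
Unlike the create/update path, deletes are stored as a regular FHIR transaction Bundle rather than NDJSON, so a downstream server can apply them directly. A minimal sketch of building such a bundle; the two resource URLs are invented:

import java.util.List;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;

public class DeleteBundleSketch {
  public static void main(String[] args) {
    var deleteBundle = new Bundle();
    deleteBundle.setType(BundleType.TRANSACTION);

    // One DELETE request entry per resource, mirroring storeDeleteBundles.
    for (var url : List.of("Patient/1", "Patient/2")) { // invented IDs
      deleteBundle.addEntry().getRequest().setMethod(HTTPVerb.DELETE).setUrl(url);
    }
  }
}
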

public Void storeSingleBundle(Bundle bundle, MessageHeaders messageHeaders)
throws DataFormatException, IOException {
var mergedBundle =
merger.mergeSeperateDeleteBundles(
List.of(bundle), mergerConfig.entryUniquenessFhirpathExpression());

// extract all POST/PUT bundle entries (mergedBundle.deleteBundle() contains the DELETE entries)
var resources = BundleUtil.toListOfResources(fhirContext, mergedBundle.bundle());

var grouped = resources.stream().collect(Collectors.groupingBy(IBaseResource::fhirType));

for (var entry : grouped.entrySet()) {
var resourceType = entry.getKey();
var ndjson = listOfResourcesToNdjson(entry.getValue());

var prefix = config.objectNamePrefix().orElse("");

var timestamp = messageHeaders.get(KafkaHeaders.RECEIVED_TIMESTAMP);
var partition = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION);
var offset = messageHeaders.get(KafkaHeaders.OFFSET);

var objectKey =
String.format("%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(ndjson);
var metadata =
Map.of(
"kafka-timestamp",
timestamp.toString(),
"kafka-partition",
partition.toString(),
"kafka-offset",
offset.toString(),
"kafka-topic",
messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_TOPIC, "").toString(),
"kafka-key",
messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_KEY, "").toString(),
"kafka-group-id",
messageHeaders.getOrDefault(KafkaHeaders.GROUP_ID, "").toString());

s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.metadata(metadata)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}

// TODO: DELETE bundle entries are not handled here yet

return null;
}
}
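
Because the Kafka coordinates are also attached as user-defined S3 metadata, an object's provenance can be inspected without downloading it. A sketch using the AWS SDK v2 already used by this class; the bucket name and object key are placeholder values:

import software.amazon.awssdk.services.s3.S3Client;

public class HeadObjectSketch {
  public static void main(String[] args) {
    try (var s3 = S3Client.create()) {
      // Placeholder bucket/key following the new <timestamp>-<partition>-<offset> scheme.
      var head =
          s3.headObject(req -> req.bucket("my-bucket").key("Patient/1736300000000-3-42.ndjson"));

      // S3 returns user metadata keys lower-cased, without the x-amz-meta- prefix.
      System.out.println(head.metadata().get("kafka-offset")); // e.g. "42"
    }
  }
}
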
9 changes: 6 additions & 3 deletions (second changed file)
@@ -31,6 +31,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
@@ -239,8 +240,10 @@ Consumer<List<Resource>> sinkBatch() {
}

@Bean
Consumer<Resource> sinkSingle() {
return resource -> {
Consumer<Message<Resource>> sinkSingle() {
return message -> {
var resource = message.getPayload();

if (resource == null) {
LOG.warn("resource is null. Ignoring.");
messageNullCounter.increment();
@@ -264,7 +267,7 @@ Consumer<Resource> sinkSingle() {
if (s3Config.enabled()) {
LOG.debug("Sending bundle to object storage");
try {
retryTemplate.execute(context -> s3Store.storeBatch(List.of(bundle)));
retryTemplate.execute(context -> s3Store.storeSingleBundle(bundle, message.getHeaders()));
} catch (Exception e) {
throw new RuntimeException(e);
}
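
The bean signature change from Consumer<Resource> to Consumer<Message<Resource>> is the standard Spring Cloud Stream way to reach the consumed record's headers; the binder still hands over the payload unchanged. A trimmed-down sketch of that pattern, with String standing in for Resource to keep it self-contained:

import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Configuration
public class SinkSketch {
  // Wrapping the payload in Message<T> exposes the record's Kafka headers,
  // which storeSingleBundle needs for its deterministic object keys.
  @Bean
  Consumer<Message<String>> sinkSingleSketch() {
    return message -> {
      var payload = message.getPayload();
      var offset = message.getHeaders().get(KafkaHeaders.OFFSET);
      System.out.printf("payload=%s, offset=%s%n", payload, offset);
    };
  }
}
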
