feat!: default to single message processing and change s3 object keys (#136)

BREAKING CHANGE: default back to single message processing

Batch processing can be enabled by setting `SPRING_PROFILES_ACTIVE=prod,batch`

* fix: only expose health and metrics endpoints

BREAKING CHANGE: changed object keys to help with deterministic replay

* fix: NPE (null pointer exception)
chgl authored Jan 8, 2025
1 parent 505d67c commit ca5227a
Showing 6 changed files with 181 additions and 51 deletions.
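One of the breaking changes above replaces the wall-clock timestamp in the S3 object key with the Kafka coordinates of the consumed message, so a replayed message always maps to the same object key. A minimal sketch of the two key formats, with hypothetical prefix, resource type, and Kafka coordinate values (in the actual code these come from the application config and the message headers shown in the diff below):

import java.time.Instant;

class ObjectKeySketch {
  public static void main(String[] args) {
    var prefix = "staging/"; // hypothetical value of config.objectNamePrefix()
    var resourceType = "Patient"; // fhirType() of the grouped resources

    // old scheme: wall-clock based, different on every run
    var oldKey =
        String.format("%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

    // new scheme: derived from Kafka coordinates, stable across replays
    long timestamp = 1736294400000L; // KafkaHeaders.RECEIVED_TIMESTAMP (hypothetical)
    int partition = 3; // KafkaHeaders.RECEIVED_PARTITION (hypothetical)
    long offset = 42L; // KafkaHeaders.OFFSET (hypothetical)
    var newKey =
        String.format("%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);

    System.out.println(oldKey); // e.g. staging/Patient/bundle-1736294487123.ndjson
    System.out.println(newKey); // staging/Patient/1736294400000-3-42.ndjson
  }
}
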
148 changes: 114 additions & 34 deletions src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
@@ -8,13 +8,17 @@
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.io.output.StringBuilderWriter;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
@@ -58,42 +62,30 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
grouped.size(),
grouped.keySet());

var parser = fhirContext.newJsonParser();

for (var entry : grouped.entrySet()) {
try (var stringWriter = new StringBuilderWriter()) {
var resourceType = entry.getKey();
boolean isFirstResource = true;
for (var resource : entry.getValue()) {
if (!(isFirstResource)) {
stringWriter.write("\n");
}
isFirstResource = false;

parser.encodeResourceToWriter(resource, stringWriter);
}
var resourceType = entry.getKey();
var ndjson = listOfResourcesToNdjson(entry.getValue());

var prefix = config.objectNamePrefix().orElse("");

var objectKey =
String.format(
"%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(stringWriter.toString());
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}
var prefix = config.objectNamePrefix().orElse("");

var objectKey =
String.format(
"%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(ndjson);
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}

// now, deal with the DELETE entries
@@ -103,18 +95,46 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
mergedBundle.deletBundle().getEntry().stream()
.collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0]));

storeDeleteBundles(groupedDeletes);

return null;
}

private String listOfResourcesToNdjson(List<IBaseResource> resources)
throws DataFormatException, IOException {
var parser = fhirContext.newJsonParser();
boolean isFirstResource = true;
try (var stringWriter = new StringBuilderWriter()) {
for (var resource : resources) {
if (!(isFirstResource)) {
stringWriter.write("\n");
}
isFirstResource = false;

parser.encodeResourceToWriter(resource, stringWriter);
}
return stringWriter.toString();
}
}

private void storeDeleteBundles(Map<String, List<BundleEntryComponent>> groupedDeletes) {
LOG.debug(
"Storing {} delete requests in buckets ({})",
groupedDeletes.size(),
groupedDeletes.keySet());

var parser = fhirContext.newJsonParser();

// each entry is one resource type
for (var entry : groupedDeletes.entrySet()) {
LOG.debug("Processing resource type {}", entry.getKey());

var deleteBundle = new Bundle();
deleteBundle.setType(BundleType.TRANSACTION);

// turns all entries of the merged bundles into a single one
// per resource type
// Note that this is not an NDJSON but instead a regular bundle
for (var bundleEntry : entry.getValue()) {
deleteBundle.addEntry().setRequest(bundleEntry.getRequest());
}
@@ -143,6 +163,66 @@ public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOExcep
.contentType(Constants.CT_FHIR_JSON_NEW),
body);
}
}

public Void storeSingleBundle(Bundle bundle, MessageHeaders messageHeaders)
throws DataFormatException, IOException {
var mergedBundle =
merger.mergeSeperateDeleteBundles(
List.of(bundle), mergerConfig.entryUniquenessFhirpathExpression());

// extract all POST/PUT bundle entries (mergedBundle.deleteBundle() contains the DELETE entries)
var resources = BundleUtil.toListOfResources(fhirContext, mergedBundle.bundle());

var grouped = resources.stream().collect(Collectors.groupingBy(IBaseResource::fhirType));

for (var entry : grouped.entrySet()) {
var resourceType = entry.getKey();
var ndjson = listOfResourcesToNdjson(entry.getValue());

var prefix = config.objectNamePrefix().orElse("");

var timestamp = messageHeaders.get(KafkaHeaders.RECEIVED_TIMESTAMP);
var partition = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION);
var offset = messageHeaders.get(KafkaHeaders.OFFSET);

var objectKey =
String.format("%s%s/%s-%s-%s.ndjson", prefix, resourceType, timestamp, partition, offset);

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(ndjson);
var metadata =
Map.of(
"kafka-timestamp",
timestamp.toString(),
"kafka-partition",
partition.toString(),
"kafka-offset",
offset.toString(),
"kafka-topic",
messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_TOPIC, "").toString(),
"kafka-key",
messageHeaders.getOrDefault(KafkaHeaders.RECEIVED_KEY, "").toString(),
"kafka-group-id",
messageHeaders.getOrDefault(KafkaHeaders.GROUP_ID, "").toString());

s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.metadata(metadata)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}

// TODO: DELETE bundle entries are not handled here yet

return null;
}
}
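Because each object now carries its Kafka coordinates both in the key and as user metadata, downstream tooling can identify and re-order objects deterministically. A rough sketch of reading that metadata back with the AWS SDK v2; the bucket name and object key are hypothetical, and the metadata keys match the ones written by storeSingleBundle above:

import java.util.Map;
import software.amazon.awssdk.services.s3.S3Client;

class ReplayMetadataSketch {
  public static void main(String[] args) {
    try (S3Client s3 = S3Client.create()) {
      // hypothetical bucket and object key following the new naming scheme
      var head =
          s3.headObject(r -> r.bucket("fhir-bundles").key("Patient/1736294400000-3-42.ndjson"));

      Map<String, String> metadata = head.metadata();
      // user-defined metadata is returned without the x-amz-meta- prefix
      int partition = Integer.parseInt(metadata.get("kafka-partition"));
      long offset = Long.parseLong(metadata.get("kafka-offset"));

      System.out.printf("object was produced from partition %d at offset %d%n", partition, offset);
    }
  }
}
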
@@ -31,6 +31,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
@@ -164,7 +165,7 @@ public <T, E extends Throwable> void onError(
}

@Bean
Consumer<List<Resource>> sink() {
Consumer<List<Resource>> sinkBatch() {
return resourceBatch -> {
if (resourceBatch == null) {
LOG.warn("resource is null. Ignoring.");
@@ -213,10 +214,10 @@ Consumer<List<Resource>> sink() {
fhirBundleMerger.partitionBundle(
mergedBundle, batchMergingConfig.bundleMaxSize().get());
for (var bundle : partitionedBundles) {
sendSingleBundle(bundle);
sendSingleBundleToServer(bundle);
}
} else {
sendSingleBundle(mergedBundle);
sendSingleBundleToServer(mergedBundle);
}
} else {
LOG.debug("Sending all bundles in batch one by one");
@@ -232,13 +233,52 @@ Consumer<List<Resource>> sink() {
MDC.put("bundleFirstEntryType", firstEntryResource.getResourceType().name());
}

sendSingleBundle(bundle);
sendSingleBundleToServer(bundle);
}
}
};
}

void sendSingleBundle(Bundle bundle) {
@Bean
Consumer<Message<Resource>> sinkSingle() {
return message -> {
if (message == null) {
LOG.warn("message is null. Ignoring.");
messageNullCounter.increment();
return;
}

var resource = message.getPayload();

if (!(resource instanceof Bundle bundle)) {
LOG.warn("Can only process resources of type Bundle. Ignoring.");
unsupportedResourceTypeCounter.increment();
return;
}

if (bundle.getEntry().isEmpty()) {
LOG.warn("received batch is empty. Ignoring.");
messageEmptyCounter.increment();
return;
}

LOG.debug("Processing single bundle {}", kv("bundleId", bundle.getId()));

if (s3Config.enabled()) {
LOG.debug("Sending bundle to object storage");
try {
retryTemplate.execute(context -> s3Store.storeSingleBundle(bundle, message.getHeaders()));
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
LOG.debug("Sending single bundle");
sendSingleBundleToServer(bundle);
}
};
}

void sendSingleBundleToServer(Bundle bundle) {
if (overrideBundleType != null) {
bundle.setType(overrideBundleType);
}
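The new sinkSingle bean consumes a Message<Resource> rather than a bare Resource because the object key and metadata are derived from the Kafka message headers. A hedged sketch of assembling such a message by hand, for example in a test; the header values are made up, and in production Spring Cloud Stream populates them from the consumed record:

import java.util.Map;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Resource;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

class SingleMessageSketch {
  public static void main(String[] args) {
    var bundle = new Bundle();
    bundle.setType(Bundle.BundleType.TRANSACTION);
    bundle.addEntry().setResource(new Patient());

    // headers sinkSingle reads when building the deterministic object key (hypothetical values)
    Map<String, Object> kafkaHeaders =
        Map.of(
            KafkaHeaders.RECEIVED_TIMESTAMP, 1736294400000L,
            KafkaHeaders.RECEIVED_PARTITION, 3,
            KafkaHeaders.OFFSET, 42L,
            KafkaHeaders.RECEIVED_TOPIC, "fhir-msg",
            KafkaHeaders.RECEIVED_KEY, "patient-123");
    var headers = new MessageHeaders(kafkaHeaders);

    Resource payload = bundle;
    Message<Resource> message = MessageBuilder.createMessage(payload, headers);

    // the message could now be handed to the sinkSingle consumer, e.g. in a unit test
    System.out.println(message.getHeaders());
  }
}
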
4 changes: 4 additions & 0 deletions src/main/resources/application-batch.yml
@@ -0,0 +1,4 @@
spring:
cloud:
function:
definition: sinkBatch
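
This new profile-specific config only switches the bound function from the default sinkSingle to sinkBatch, which is why batch processing is now opt-in via SPRING_PROFILES_ACTIVE=prod,batch as described in the commit message. A hypothetical programmatic equivalent (the launcher class name is made up; the real application class lives in org.miracum.streams.fhirtoserver):

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;

@SpringBootApplication
class BatchProfileLauncher {
  public static void main(String[] args) {
    // activating the "batch" profile pulls in application-batch.yml, which rebinds
    // spring.cloud.function.definition from sinkSingle to sinkBatch
    new SpringApplicationBuilder(BatchProfileLauncher.class)
        .profiles("prod", "batch")
        .run(args);
  }
}
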
8 changes: 1 addition & 7 deletions src/main/resources/application-dev.yml
@@ -1,12 +1,6 @@
spring:
kafka:
bootstrapServers: localhost:9094
cloud:
stream:
function.definition: sink
bindings:
sink-in-0:
destination: fhir-msg
fhir:
url: http://localhost:8082/fhir
filter:
@@ -22,7 +16,7 @@ fhir:
bundle-max-size: 25

s3:
enabled: false
enabled: true
endpoint-url: "http://localhost:9000"
access-key: "admin"
secret-key: "miniopass"
19 changes: 15 additions & 4 deletions src/main/resources/application.yml
@@ -27,10 +27,17 @@ spring:
request.timeout.ms: ${REQUEST_TIMEOUT_MS:900000}
cloud:
function:
definition: sink
definition: sinkSingle
stream:
bindings:
sink-in-0:
sinkSingle-in-0:
consumer:
concurrency: ${CONSUMER_CONCURRENCY:1}
batch-mode: false
use-native-decoding: true
destination: ${TOPIC:fhir-msg}
group: ${GROUP_ID:fhir-to-server}
sinkBatch-in-0:
consumer:
concurrency: ${CONSUMER_CONCURRENCY:1}
# this can't be set to `false` unless we change the signature of the
@@ -49,7 +56,11 @@ spring:
key.serializer: org.springframework.kafka.support.serializer.JsonSerializer
value.serializer: org.miracum.kafka.serializers.KafkaFhirSerializer
bindings:
sink-in-0:
sinkSingle-in-0:
consumer:
configuration:
value.deserializer: org.miracum.kafka.serializers.KafkaFhirDeserializer
sinkBatch-in-0:
consumer:
configuration:
value.deserializer: org.miracum.kafka.serializers.KafkaFhirDeserializer
@@ -65,7 +76,7 @@ management:
endpoints:
web:
exposure:
include: "*"
include: "health,prometheus"
health:
livenessstate:
enabled: true
@@ -28,6 +28,7 @@ public SendToServerProcessorTests() {

@Test
void process_withNullResource_shouldNotThrow() {
assertDoesNotThrow(() -> sut.sink().accept(null));
assertDoesNotThrow(() -> sut.sinkBatch().accept(null));
assertDoesNotThrow(() -> sut.sinkSingle().accept(null));
}
}
