
fix: missing metadata for deletes
chgl committed Jan 10, 2025
1 parent 2e5b944 commit 407feb6
Showing 1 changed file with 24 additions and 20 deletions.
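Note on the header access pattern in this change: in a Spring Kafka batch listener, per-record headers such as KafkaHeaders.RECEIVED_TIMESTAMP arrive as a list with one element per record, which is why the changed code casts them to ArrayList and reads getFirst(). A minimal sketch of that pattern, assuming a hypothetical topic name, group id, and listener wiring not taken from this repository:

import java.util.ArrayList;
import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;

class BatchHeaderSketch {
  // Hypothetical listener; the topic name and group id are made up.
  @KafkaListener(topics = "fhir.example-topic", groupId = "example-group", batch = "true")
  void onBatch(List<String> records, MessageHeaders headers) {
    // In batch mode each per-record header is a list with one element per record.
    var timestamps = headers.get(KafkaHeaders.RECEIVED_TIMESTAMP, ArrayList.class);
    var startTimestamp = (Long) timestamps.getFirst();
    // The changed code relies on all records of one batch sharing a partition
    // (per its own comment), so the first entry is representative of the batch.
    var partition =
        (Integer) headers.get(KafkaHeaders.RECEIVED_PARTITION, ArrayList.class).getFirst();
    System.out.printf(
        "batch of %d records, partition %d, first timestamp %d%n",
        records.size(), partition, startTimestamp);
  }
}

getFirst() requires Java 21 (SequencedCollection); on older JDKs, get(0) is the equivalent.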
44 changes: 24 additions & 20 deletions src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
@@ -64,17 +64,30 @@ public Void storeBatch(List<Bundle> bundles, MessageHeaders headers)
         grouped.size(),
         grouped.keySet());
 
+    var startTimestamp =
+        (Long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP, ArrayList.class).getFirst();
+    // in Spring Kafka all messages in a batch are from the same partition
+    var partition =
+        (Integer) headers.get(KafkaHeaders.RECEIVED_PARTITION, ArrayList.class).getFirst();
+    var startOffset = (Long) headers.get(KafkaHeaders.OFFSET, ArrayList.class).getFirst();
+    var metadata =
+        Map.of(
+            "kafka-timestamp",
+            startTimestamp.toString(),
+            "kafka-partition",
+            partition.toString(),
+            "kafka-offset",
+            startOffset.toString(),
+            "kafka-topic",
+            (String) headers.get(KafkaHeaders.RECEIVED_TOPIC, ArrayList.class).getFirst(),
+            "kafka-group-id",
+            headers.getOrDefault(KafkaHeaders.GROUP_ID, "").toString());
+
     for (var entry : grouped.entrySet()) {
       var resourceType = entry.getKey();
       var ndjson = listOfResourcesToNdjson(entry.getValue());
 
       var prefix = config.objectNamePrefix().orElse("");
-      var startTimestamp =
-          (Long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP, ArrayList.class).getFirst();
-      // in Spring Kafka all messages in a batch are from the same partition
-      var partition =
-          (Integer) headers.get(KafkaHeaders.RECEIVED_PARTITION, ArrayList.class).getFirst();
-      var startOffset = (Long) headers.get(KafkaHeaders.OFFSET, ArrayList.class).getFirst();
 
       var objectKey =
           String.format(
@@ -87,18 +100,6 @@ public Void storeBatch(List<Bundle> bundles, MessageHeaders headers)
             objectKey);
 
       var body = RequestBody.fromString(ndjson);
-      var metadata =
-          Map.of(
-              "kafka-timestamp",
-              startTimestamp.toString(),
-              "kafka-partition",
-              partition.toString(),
-              "kafka-offset",
-              startOffset.toString(),
-              "kafka-topic",
-              (String) headers.get(KafkaHeaders.RECEIVED_TOPIC, ArrayList.class).getFirst(),
-              "kafka-group-id",
-              headers.getOrDefault(KafkaHeaders.GROUP_ID, "").toString());
 
       s3Client.putObject(
           request ->
@@ -117,7 +118,7 @@ public Void storeBatch(List<Bundle> bundles, MessageHeaders headers)
             mergedBundle.deletBundle().getEntry().stream()
                 .collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0]));
 
-    storeDeleteBundles(groupedDeletes, headers);
+    storeDeleteBundles(groupedDeletes, headers, metadata);
 
     return null;
   }
@@ -140,7 +141,9 @@ private String listOfResourcesToNdjson(List<IBaseResource> resources)
   }
 
   private void storeDeleteBundles(
-      Map<String, List<BundleEntryComponent>> groupedDeletes, MessageHeaders headers) {
+      Map<String, List<BundleEntryComponent>> groupedDeletes,
+      MessageHeaders headers,
+      Map<String, String> metadata) {
     LOG.debug(
         "Storing {} delete requests in buckets ({})",
         groupedDeletes.size(),
@@ -191,6 +194,7 @@ private void storeDeleteBundles(
                 request
                     .bucket(config.bucketName())
                     .key(objectKey)
+                    .metadata(metadata)
                     .contentType(Constants.CT_FHIR_JSON_NEW),
             body);
   }
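For reference, the collector feeding storeDeleteBundles keys each delete request by its resource type, i.e. the URL segment before the first slash. A small sketch of the same grouping idea, with invented sample URLs:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupDeletesSketch {
  public static void main(String[] args) {
    var urls = List.of("Patient/123", "Patient/456", "Observation/789");
    // Same idea as the diff's collector: key by the segment before the first '/'.
    Map<String, List<String>> grouped =
        urls.stream().collect(Collectors.groupingBy(u -> u.split("/")[0]));
    // e.g. {Patient=[Patient/123, Patient/456], Observation=[Observation/789]}
    // (iteration order of the resulting map is unspecified)
    System.out.println(grouped);
  }
}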
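The added .metadata(metadata) call stores the Kafka provenance as S3 user-defined metadata on the delete-bundle objects, matching what the NDJSON puts already do. A sketch of how such metadata round-trips through the AWS SDK v2, with bucket, key, and values as placeholders rather than values from this repository:

import java.util.Map;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;

class MetadataRoundTripSketch {
  public static void main(String[] args) {
    try (var s3 = S3Client.create()) {
      var metadata = Map.of("kafka-topic", "example-topic", "kafka-offset", "42");
      // User-defined metadata is stored with the object as x-amz-meta-* headers.
      s3.putObject(
          req -> req.bucket("example-bucket").key("example.ndjson").metadata(metadata),
          RequestBody.fromString("{}"));
      // A HEAD request returns it, with the x-amz-meta- prefix stripped by the SDK.
      var head = s3.headObject(req -> req.bucket("example-bucket").key("example.ndjson"));
      System.out.println(head.metadata()); // {kafka-offset=42, kafka-topic=example-topic}
    }
  }
}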
