From ae981ff0444c6356106d8f3d29cade430a340d60 Mon Sep 17 00:00:00 2001 From: chgl Date: Fri, 3 Jan 2025 23:45:27 +0100 Subject: [PATCH] fix: switch to aws s3 sdk for better ceph compatibility (#134) * fix: switch to aws s3 sdk for better ceph compatibility * retry retryable * rm delete bundles from mock test data [skip ci] --- build.gradle | 3 +- hack/delete-bundles.ndjson | 3 + .../miracum/streams/fhirtoserver/Config.java | 35 +++-- .../streams/fhirtoserver/S3BundleStore.java | 129 +++++++++--------- .../streams/fhirtoserver/S3Config.java | 4 + .../fhirtoserver/SendToServerProcessor.java | 12 +- src/main/resources/application.yml | 3 + 7 files changed, 103 insertions(+), 86 deletions(-) create mode 100644 hack/delete-bundles.ndjson diff --git a/build.gradle b/build.gradle index deaec4b..5117ee4 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,8 @@ dependencies { implementation 'org.fhir:ucum:1.0.8' - implementation 'io.minio:minio:8.5.13' + implementation platform("software.amazon.awssdk:bom:2.29.43") + implementation "software.amazon.awssdk:s3" testImplementation 'org.springframework.boot:spring-boot-starter-test' } diff --git a/hack/delete-bundles.ndjson b/hack/delete-bundles.ndjson new file mode 100644 index 0000000..6a17a7a --- /dev/null +++ b/hack/delete-bundles.ndjson @@ -0,0 +1,3 @@ +{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Encounter/cid.108"}}]} +{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Observation/id-6457799fed46dffc"}}]} +{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Patient/pid.999"}}]} diff --git a/src/main/java/org/miracum/streams/fhirtoserver/Config.java b/src/main/java/org/miracum/streams/fhirtoserver/Config.java index 6a05dc4..f078bd2 100644 --- a/src/main/java/org/miracum/streams/fhirtoserver/Config.java +++ b/src/main/java/org/miracum/streams/fhirtoserver/Config.java @@ -5,10 +5,7 @@ import ca.uhn.fhir.okhttp.client.OkHttpRestfulClientFactory; import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.interceptor.BasicAuthInterceptor; -import io.minio.MinioClient; -import io.minio.credentials.AwsEnvironmentProvider; -import io.minio.credentials.Provider; -import io.minio.credentials.StaticProvider; +import java.net.URISyntaxException; import java.time.Duration; import okhttp3.OkHttpClient; import org.hl7.fhir.r4.hapi.fluentpath.FhirPathR4; @@ -16,6 +13,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.services.s3.*; @Configuration public class Config { @@ -64,19 +66,26 @@ IFhirPath fhirPath(FhirContext ctx) { @Bean @ConditionalOnProperty(prefix = "s3", name = "enabled", havingValue = "true") - MinioClient minioClient(S3Config config) { - Provider credentialsProvider; + S3Client s3Client(S3Config config) throws URISyntaxException { + AwsCredentialsProvider credentialsProvider = null; if (config.accessKey().isPresent() && config.secretKey().isPresent()) { - credentialsProvider = - new StaticProvider(config.accessKey().get(), config.secretKey().get(), null); + var credentials = + AwsBasicCredentials.create(config.accessKey().get(), config.secretKey().get()); + credentialsProvider = StaticCredentialsProvider.create(credentials); } else { - credentialsProvider = new AwsEnvironmentProvider(); + credentialsProvider = EnvironmentVariableCredentialsProvider.create(); } - return MinioClient.builder() - .credentialsProvider(credentialsProvider) - .endpoint(config.endpointUrl()) - .build(); + var builder = + S3Client.builder() + .region(config.region()) + .endpointOverride(config.endpointUrl().toURI()) + .forcePathStyle(config.forcePathStyle()) + .overrideConfiguration( + b -> b.apiCallTimeout(Duration.ofSeconds(config.timeoutSeconds()))) + .credentialsProvider(credentialsProvider); + + return builder.build(); } } diff --git a/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java b/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java index d95d842..8bfd496 100644 --- a/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java +++ b/src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java @@ -4,19 +4,8 @@ import ca.uhn.fhir.parser.DataFormatException; import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.util.BundleUtil; -import io.minio.MinioClient; -import io.minio.PutObjectArgs; -import io.minio.errors.ErrorResponseException; -import io.minio.errors.InsufficientDataException; -import io.minio.errors.InternalException; -import io.minio.errors.InvalidResponseException; -import io.minio.errors.ServerException; -import io.minio.errors.XmlParserException; -import java.io.ByteArrayInputStream; +import io.micrometer.common.lang.Nullable; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.util.List; import java.util.stream.Collectors; @@ -26,52 +15,49 @@ import org.hl7.fhir.r4.model.Bundle.BundleType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; @Service public class S3BundleStore { private static final Logger LOG = LoggerFactory.getLogger(S3BundleStore.class); - private MinioClient minioClient; private FhirBundleMerger merger; private FhirContext fhirContext; private FhirBundleMergerConfig mergerConfig; private S3Config config; + private S3Client s3Client; public S3BundleStore( - @Nullable MinioClient minioClient, + @Nullable S3Client s3Client, S3Config config, FhirBundleMerger merger, FhirContext fhirContext, FhirBundleMergerConfig mergerConfig) { - this.minioClient = minioClient; + this.s3Client = s3Client; this.config = config; this.merger = merger; this.fhirContext = fhirContext; this.mergerConfig = mergerConfig; } - public Void storeBatch(List bundles) - throws DataFormatException, - IOException, - InvalidKeyException, - ErrorResponseException, - InsufficientDataException, - InternalException, - InvalidResponseException, - NoSuchAlgorithmException, - ServerException, - XmlParserException { - // start by merging all those bundles + public Void storeBatch(List bundles) throws DataFormatException, IOException { + // start by merging all those bundles split into POST/PUT and DELETE bundles var mergedBundle = merger.mergeSeperateDeleteBundles( bundles, mergerConfig.entryUniquenessFhirpathExpression()); - // extract all POST/PUT bundle entries + // extract all POST/PUT bundle entries (mergedBundle.deleteBundle() contains the DELETE entries) var resources = BundleUtil.toListOfResources(fhirContext, mergedBundle.bundle()); var grouped = resources.stream().collect(Collectors.groupingBy(IBaseResource::fhirType)); + LOG.debug( + "Storing {} resources in {} buckets ({})", + resources.size(), + grouped.size(), + grouped.keySet()); + var parser = fhirContext.newJsonParser(); for (var entry : grouped.entrySet()) { @@ -87,60 +73,75 @@ public Void storeBatch(List bundles) parser.encodeResourceToWriter(resource, stringWriter); } - var bais = - new ByteArrayInputStream(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); var prefix = config.objectNamePrefix().orElse(""); - var putArgs = - PutObjectArgs.builder() - .bucket(config.bucketName()) - .object( - String.format( - "%s%s/bundle-%s.ndjson", - prefix, resourceType, Instant.now().toEpochMilli())) - .stream(bais, bais.available(), 0) - .contentType(Constants.CT_FHIR_NDJSON) - .build(); - - minioClient.putObject(putArgs); + + var objectKey = + String.format( + "%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli()); + + LOG.debug( + "Storing {} resources of type {} as object {}", + entry.getValue().size(), + entry.getKey(), + objectKey); + + var body = RequestBody.fromString(stringWriter.toString()); + s3Client.putObject( + request -> + request + .bucket(config.bucketName()) + .key(objectKey) + .contentType(Constants.CT_FHIR_NDJSON), + body); } } + // now, deal with the DELETE entries + // extract the bundles grouped by the resource type of the DELETE request var groupedDeletes = mergedBundle.deletBundle().getEntry().stream() .collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0])); + LOG.debug( + "Storing {} delete requests in buckets ({})", + groupedDeletes.size(), + groupedDeletes.keySet()); + + // each entry is one resource type for (var entry : groupedDeletes.entrySet()) { var deleteBundle = new Bundle(); - deleteBundle.setType(BundleType.BATCH); + deleteBundle.setType(BundleType.TRANSACTION); - // turns all entries off the merged bundles into a single one + // turns all entries of the merged bundles into a single one // per resource type for (var bundleEntry : entry.getValue()) { deleteBundle.addEntry().setRequest(bundleEntry.getRequest()); } - try (var stringWriter = new StringBuilderWriter()) { - var resourceType = entry.getKey(); + var resourceType = entry.getKey(); - parser.encodeResourceToWriter(deleteBundle, stringWriter); + var deleteBundleJson = parser.encodeResourceToString(deleteBundle); - var bais = - new ByteArrayInputStream(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); - var prefix = config.objectNamePrefix().orElse(""); - var putArgs = - PutObjectArgs.builder() - .bucket(config.bucketName()) - .object( - String.format( - "%s%s/_delete/bundle-%s.json", - prefix, resourceType, Instant.now().toEpochMilli())) - .stream(bais, bais.available(), 0) - .contentType(Constants.CT_FHIR_NDJSON) - .build(); - - minioClient.putObject(putArgs); - } + var prefix = config.objectNamePrefix().orElse(""); + + var objectKey = + String.format( + "%s%s/_delete/bundle-%s.json", prefix, resourceType, Instant.now().toEpochMilli()); + + LOG.debug( + "Storing delete bundle with {} entries as object {}", + deleteBundle.getEntry().size(), + objectKey); + + var body = RequestBody.fromString(deleteBundleJson); + s3Client.putObject( + request -> + request + .bucket(config.bucketName()) + .key(objectKey) + .contentType(Constants.CT_FHIR_JSON_NEW), + body); } return null; } diff --git a/src/main/java/org/miracum/streams/fhirtoserver/S3Config.java b/src/main/java/org/miracum/streams/fhirtoserver/S3Config.java index ebde87e..d6f1ad8 100644 --- a/src/main/java/org/miracum/streams/fhirtoserver/S3Config.java +++ b/src/main/java/org/miracum/streams/fhirtoserver/S3Config.java @@ -5,13 +5,17 @@ import java.util.Optional; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import software.amazon.awssdk.regions.Region; @ConfigurationProperties(prefix = "s3") @Validated public record S3Config( boolean enabled, + boolean forcePathStyle, + long timeoutSeconds, URL endpointUrl, Optional accessKey, Optional secretKey, @NotEmpty String bucketName, + Region region, Optional objectNamePrefix) {} diff --git a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java index a4abf93..3b0feec 100644 --- a/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java +++ b/src/main/java/org/miracum/streams/fhirtoserver/SendToServerProcessor.java @@ -13,10 +13,6 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; -import io.minio.errors.ErrorResponseException; -import io.minio.errors.InternalException; -import io.minio.errors.ServerException; -import io.minio.errors.XmlParserException; import java.io.IOException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -44,6 +40,8 @@ import org.springframework.stereotype.Service; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.client.ResourceAccessException; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.RetryableException; @Configuration @Service @@ -131,14 +129,12 @@ public SendToServerProcessor( retryableExceptions.put(InternalErrorException.class, true); retryableExceptions.put(ResourceNotFoundException.class, false); retryableExceptions.put(ResourceVersionConflictException.class, false); - retryableExceptions.put(XmlParserException.class, false); - retryableExceptions.put(ServerException.class, false); retryableExceptions.put(NoSuchAlgorithmException.class, false); - retryableExceptions.put(InternalException.class, true); - retryableExceptions.put(ErrorResponseException.class, true); retryableExceptions.put(DataFormatException.class, false); retryableExceptions.put(InvalidKeyException.class, false); retryableExceptions.put(IOException.class, true); + retryableExceptions.put(AwsServiceException.class, true); + retryableExceptions.put(RetryableException.class, true); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(Integer.MAX_VALUE, retryableExceptions)); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1f1df05..2b7c45a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -95,6 +95,9 @@ s3: secret-key: "" bucket-name: "fhir" object-name-prefix: "" + region: "eu-central-1" + force-path-style: true + timeout-seconds: 120 logging: pattern: