Skip to content

Commit

Permalink
fix: switch to aws s3 sdk for better ceph compatibility (#134)
Browse files Browse the repository at this point in the history
* fix: switch to aws s3 sdk for better ceph compatibility

* retry retryable

* rm delete bundles from mock test data

[skip ci]
  • Loading branch information
chgl authored Jan 3, 2025
1 parent 01d95a7 commit ae981ff
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 86 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ dependencies {

implementation 'org.fhir:ucum:1.0.8'

implementation 'io.minio:minio:8.5.13'
implementation platform("software.amazon.awssdk:bom:2.29.43")
implementation "software.amazon.awssdk:s3"

testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
Expand Down
3 changes: 3 additions & 0 deletions hack/delete-bundles.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Encounter/cid.108"}}]}
{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Observation/id-6457799fed46dffc"}}]}
{"resourceType":"Bundle","type":"batch","entry":[{"request":{"method":"DELETE","url":"Patient/pid.999"}}]}
35 changes: 22 additions & 13 deletions src/main/java/org/miracum/streams/fhirtoserver/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
import ca.uhn.fhir.okhttp.client.OkHttpRestfulClientFactory;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.BasicAuthInterceptor;
import io.minio.MinioClient;
import io.minio.credentials.AwsEnvironmentProvider;
import io.minio.credentials.Provider;
import io.minio.credentials.StaticProvider;
import java.net.URISyntaxException;
import java.time.Duration;
import okhttp3.OkHttpClient;
import org.hl7.fhir.r4.hapi.fluentpath.FhirPathR4;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.s3.*;

@Configuration
public class Config {
Expand Down Expand Up @@ -64,19 +66,26 @@ IFhirPath fhirPath(FhirContext ctx) {

@Bean
@ConditionalOnProperty(prefix = "s3", name = "enabled", havingValue = "true")
MinioClient minioClient(S3Config config) {
Provider credentialsProvider;
S3Client s3Client(S3Config config) throws URISyntaxException {
AwsCredentialsProvider credentialsProvider = null;

if (config.accessKey().isPresent() && config.secretKey().isPresent()) {
credentialsProvider =
new StaticProvider(config.accessKey().get(), config.secretKey().get(), null);
var credentials =
AwsBasicCredentials.create(config.accessKey().get(), config.secretKey().get());
credentialsProvider = StaticCredentialsProvider.create(credentials);
} else {
credentialsProvider = new AwsEnvironmentProvider();
credentialsProvider = EnvironmentVariableCredentialsProvider.create();
}

return MinioClient.builder()
.credentialsProvider(credentialsProvider)
.endpoint(config.endpointUrl())
.build();
var builder =
S3Client.builder()
.region(config.region())
.endpointOverride(config.endpointUrl().toURI())
.forcePathStyle(config.forcePathStyle())
.overrideConfiguration(
b -> b.apiCallTimeout(Duration.ofSeconds(config.timeoutSeconds())))
.credentialsProvider(credentialsProvider);

return builder.build();
}
}
129 changes: 65 additions & 64 deletions src/main/java/org/miracum/streams/fhirtoserver/S3BundleStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,8 @@
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BundleUtil;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.errors.ErrorResponseException;
import io.minio.errors.InsufficientDataException;
import io.minio.errors.InternalException;
import io.minio.errors.InvalidResponseException;
import io.minio.errors.ServerException;
import io.minio.errors.XmlParserException;
import java.io.ByteArrayInputStream;
import io.micrometer.common.lang.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -26,52 +15,49 @@
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;

@Service
public class S3BundleStore {
private static final Logger LOG = LoggerFactory.getLogger(S3BundleStore.class);
private MinioClient minioClient;
private FhirBundleMerger merger;
private FhirContext fhirContext;
private FhirBundleMergerConfig mergerConfig;
private S3Config config;
private S3Client s3Client;

public S3BundleStore(
@Nullable MinioClient minioClient,
@Nullable S3Client s3Client,
S3Config config,
FhirBundleMerger merger,
FhirContext fhirContext,
FhirBundleMergerConfig mergerConfig) {
this.minioClient = minioClient;
this.s3Client = s3Client;
this.config = config;
this.merger = merger;
this.fhirContext = fhirContext;
this.mergerConfig = mergerConfig;
}

public Void storeBatch(List<Bundle> bundles)
throws DataFormatException,
IOException,
InvalidKeyException,
ErrorResponseException,
InsufficientDataException,
InternalException,
InvalidResponseException,
NoSuchAlgorithmException,
ServerException,
XmlParserException {
// start by merging all those bundles
public Void storeBatch(List<Bundle> bundles) throws DataFormatException, IOException {
// start by merging all those bundles split into POST/PUT and DELETE bundles
var mergedBundle =
merger.mergeSeperateDeleteBundles(
bundles, mergerConfig.entryUniquenessFhirpathExpression());

// extract all POST/PUT bundle entries
// extract all POST/PUT bundle entries (mergedBundle.deleteBundle() contains the DELETE entries)
var resources = BundleUtil.toListOfResources(fhirContext, mergedBundle.bundle());

var grouped = resources.stream().collect(Collectors.groupingBy(IBaseResource::fhirType));

LOG.debug(
"Storing {} resources in {} buckets ({})",
resources.size(),
grouped.size(),
grouped.keySet());

var parser = fhirContext.newJsonParser();

for (var entry : grouped.entrySet()) {
Expand All @@ -87,60 +73,75 @@ public Void storeBatch(List<Bundle> bundles)
parser.encodeResourceToWriter(resource, stringWriter);
}

var bais =
new ByteArrayInputStream(stringWriter.toString().getBytes(StandardCharsets.UTF_8));
var prefix = config.objectNamePrefix().orElse("");
var putArgs =
PutObjectArgs.builder()
.bucket(config.bucketName())
.object(
String.format(
"%s%s/bundle-%s.ndjson",
prefix, resourceType, Instant.now().toEpochMilli()))
.stream(bais, bais.available(), 0)
.contentType(Constants.CT_FHIR_NDJSON)
.build();

minioClient.putObject(putArgs);

var objectKey =
String.format(
"%s%s/bundle-%s.ndjson", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing {} resources of type {} as object {}",
entry.getValue().size(),
entry.getKey(),
objectKey);

var body = RequestBody.fromString(stringWriter.toString());
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_NDJSON),
body);
}
}

// now, deal with the DELETE entries

// extract the bundles grouped by the resource type of the DELETE request
var groupedDeletes =
mergedBundle.deletBundle().getEntry().stream()
.collect(Collectors.groupingBy(e -> e.getRequest().getUrl().split("/")[0]));

LOG.debug(
"Storing {} delete requests in buckets ({})",
groupedDeletes.size(),
groupedDeletes.keySet());

// each entry is one resource type
for (var entry : groupedDeletes.entrySet()) {
var deleteBundle = new Bundle();
deleteBundle.setType(BundleType.BATCH);
deleteBundle.setType(BundleType.TRANSACTION);

// turns all entries off the merged bundles into a single one
// turns all entries of the merged bundles into a single one
// per resource type
for (var bundleEntry : entry.getValue()) {
deleteBundle.addEntry().setRequest(bundleEntry.getRequest());
}

try (var stringWriter = new StringBuilderWriter()) {
var resourceType = entry.getKey();
var resourceType = entry.getKey();

parser.encodeResourceToWriter(deleteBundle, stringWriter);
var deleteBundleJson = parser.encodeResourceToString(deleteBundle);

var bais =
new ByteArrayInputStream(stringWriter.toString().getBytes(StandardCharsets.UTF_8));
var prefix = config.objectNamePrefix().orElse("");
var putArgs =
PutObjectArgs.builder()
.bucket(config.bucketName())
.object(
String.format(
"%s%s/_delete/bundle-%s.json",
prefix, resourceType, Instant.now().toEpochMilli()))
.stream(bais, bais.available(), 0)
.contentType(Constants.CT_FHIR_NDJSON)
.build();

minioClient.putObject(putArgs);
}
var prefix = config.objectNamePrefix().orElse("");

var objectKey =
String.format(
"%s%s/_delete/bundle-%s.json", prefix, resourceType, Instant.now().toEpochMilli());

LOG.debug(
"Storing delete bundle with {} entries as object {}",
deleteBundle.getEntry().size(),
objectKey);

var body = RequestBody.fromString(deleteBundleJson);
s3Client.putObject(
request ->
request
.bucket(config.bucketName())
.key(objectKey)
.contentType(Constants.CT_FHIR_JSON_NEW),
body);
}
return null;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/miracum/streams/fhirtoserver/S3Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import java.util.Optional;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import software.amazon.awssdk.regions.Region;

@ConfigurationProperties(prefix = "s3")
@Validated
public record S3Config(
boolean enabled,
boolean forcePathStyle,
long timeoutSeconds,
URL endpointUrl,
Optional<String> accessKey,
Optional<String> secretKey,
@NotEmpty String bucketName,
Region region,
Optional<String> objectNamePrefix) {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.minio.errors.ErrorResponseException;
import io.minio.errors.InternalException;
import io.minio.errors.ServerException;
import io.minio.errors.XmlParserException;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -44,6 +40,8 @@
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.ResourceAccessException;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.RetryableException;

@Configuration
@Service
Expand Down Expand Up @@ -131,14 +129,12 @@ public SendToServerProcessor(
retryableExceptions.put(InternalErrorException.class, true);
retryableExceptions.put(ResourceNotFoundException.class, false);
retryableExceptions.put(ResourceVersionConflictException.class, false);
retryableExceptions.put(XmlParserException.class, false);
retryableExceptions.put(ServerException.class, false);
retryableExceptions.put(NoSuchAlgorithmException.class, false);
retryableExceptions.put(InternalException.class, true);
retryableExceptions.put(ErrorResponseException.class, true);
retryableExceptions.put(DataFormatException.class, false);
retryableExceptions.put(InvalidKeyException.class, false);
retryableExceptions.put(IOException.class, true);
retryableExceptions.put(AwsServiceException.class, true);
retryableExceptions.put(RetryableException.class, true);

retryTemplate.setRetryPolicy(new SimpleRetryPolicy(Integer.MAX_VALUE, retryableExceptions));

Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ s3:
secret-key: ""
bucket-name: "fhir"
object-name-prefix: ""
region: "eu-central-1"
force-path-style: true
timeout-seconds: 120

logging:
pattern:
Expand Down

0 comments on commit ae981ff

Please sign in to comment.