From 3ba82872dd26560fb7b24e114c039c61654bb94e Mon Sep 17 00:00:00 2001
From: sband
Date: Fri, 12 May 2023 17:46:00 +0530
Subject: [PATCH 01/16] fix: Store the resources in S3 buckets (data file and
 revisions offset files) #582
---
 .../org/sirix/io/cloud/CloudPlatform.java     |   7 +
 .../cloud/CloudStorageConnectionFactory.java  |   7 +
 .../org/sirix/io/cloud/ICloudStorage.java     |   9 +
 .../io/cloud/amazon/AmazonS3Storage.java      | 197 ++++++++++++++++++
 .../cloud/amazon/AmazonS3StorageReader.java   | 122 +++++++++++
 .../cloud/amazon/AmazonS3StorageWriter.java   |   5 +
 6 files changed, 347 insertions(+)
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java
 create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java

diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java
new file mode 100644
index 000000000..fdd4ae509
--- /dev/null
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java
@@ -0,0 +1,7 @@
+package org.sirix.io.cloud;
+
+public enum CloudPlatform {
+
+  AWS, GCP, AZURE
+
+}
diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java
new file mode 100644
index 000000000..3dbde9922
--- /dev/null
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java
@@ -0,0 +1,7 @@
+package org.sirix.io.cloud;
+
+public interface CloudStorageConnectionFactory {
+
+
+
+}
diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java
new file mode 100644
index 000000000..95a3c5742
--- /dev/null
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java
@@ -0,0 +1,9 @@
+package org.sirix.io.cloud;
+
+import org.sirix.io.IOStorage;
+
+public interface ICloudStorage extends IOStorage {
+
+
+
+}
diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java
new file mode 100644
index 000000000..4e69ab687
--- /dev/null
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java
@@ -0,0 +1,197 @@
+package org.sirix.io.cloud.amazon;
+
+import java.nio.file.Path;
+
+import org.sirix.access.ResourceConfiguration;
+import org.sirix.io.Reader;
+import org.sirix.io.RevisionFileData;
+import org.sirix.io.Writer;
+import org.sirix.io.bytepipe.ByteHandler;
+import org.sirix.io.bytepipe.ByteHandlerPipeline;
+import org.sirix.io.cloud.ICloudStorage;
+import org.sirix.utils.LogWrapper;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.core.waiters.WaiterResponse;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import
software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; + +/** + * Factory to provide Amazon S3 as storage backend + * + * @Auther Sanket Band (@sband) + **/ + +public class AmazonS3Storage implements ICloudStorage { + + /** + * Data file name. + */ + private static final String FILENAME = "sirix.data"; + + /** + * Revisions file name. + */ + private static final String REVISIONS_FILENAME = "sirix.revisions"; + + /** + * Instance to local storage. + */ + private final Path file; + + /** + * S3 storage bucket name + * + */ + private String bucketName; + + private S3Client s3Client; + + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3Storage.class)); + + /** + * Byte handler pipeline. + */ + private final ByteHandlerPipeline byteHandlerPipeline; + + /** + * Revision file data cache. + */ + private final AsyncCache cache; + + /** + * Support AWS authentication only with .aws credentials file with the required + * profile name from the creds file + */ + public AmazonS3Storage(String bucketName, String awsProfile, + String region, + boolean shouldCreateBucketIfNotExists, final ResourceConfiguration resourceConfig, + AsyncCache cache, + ByteHandlerPipeline byteHandlerPipeline) { + this.bucketName = bucketName; + this.s3Client = this.getS3Client(awsProfile,region); + /* + * If the bucket does not exist, should create a new bucket based on the boolean + */ + /* + * Exit the system if the cloud storage bucket cannot be created Alternatively, + * we could just set a flag that could be checked before creating a reader or + * writer. Return null if the bucket is not created OR does not exist But that + * would keep the user under false impression that the bucket is created OR + * exists already even if it does not exists + */ + if (!isBucketExists(bucketName, s3Client)) { + if (shouldCreateBucketIfNotExists) { + createBucket(bucketName, s3Client); + } else { + LOGGER.error(String.format("Bucket: %s, does not exists on Amazon S3 storage, exiting the system", + bucketName)); + System.exit(1); + } + } + this.cache = cache; + this.byteHandlerPipeline = byteHandlerPipeline; + this.file = resourceConfig.resourcePath; + } + + private void createBucket(String bucketName, S3Client s3Client) { + try { + S3Waiter s3Waiter = s3Client.waiter(); + CreateBucketRequest bucketRequest = CreateBucketRequest.builder().bucket(bucketName).build(); + + s3Client.createBucket(bucketRequest); + HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder().bucket(bucketName).build(); + + WaiterResponse waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait); + if (waiterResponse.matched().response().isPresent()) { + LOGGER.info(String.format("S3 bucket: %s has been created.", bucketName)); + } + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + LOGGER.error(String.format("Bucket: %s could not be created. 
Will not consume S3 storage", bucketName)); + System.exit(1); + } + } + + private boolean isBucketExists(String bucketName, S3Client s3Client) { + HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + + try { + s3Client.headBucket(headBucketRequest); + return true; + } catch (NoSuchBucketException e) { + return false; + } + } + + private S3Client getS3Client(String awsProfile, String region) { + S3Client s3Client = null; + s3Client = S3Client.builder() + .region(Region.of(region)) + .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) + .build(); + return s3Client; + } + + @Override + public Writer createWriter() { + /* + * This would create a writer that connects to the + * remote storagee*/ + return null; + } + + @Override + public Reader createReader() { + /* + * This would create a reader that connects to the + * remote storage on cloud + * */ + return null; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public boolean exists() { + // TODO Auto-generated method stub + return false; + } + + @Override + public ByteHandler getByteHandler() { + return this.byteHandlerPipeline; + } + + /** + * Getting path for data file. + * This path would be used on the local storage + * @return the path for this data file + */ + private Path getDataFilePath() { + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); + } + + /** + * Getting concrete storage for this file. + * + * @return the concrete storage for this database + */ + private Path getRevisionFilePath() { + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); + } +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java new file mode 100644 index 000000000..0409eaa00 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -0,0 +1,122 @@ +package org.sirix.io.cloud.amazon; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.FileSystems; +import java.time.Instant; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.bytepipe.ByteHandler; +import org.sirix.io.file.FileReader; +import org.sirix.page.PagePersister; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.SerializationType; +import org.sirix.page.interfaces.Page; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; + +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +public class AmazonS3StorageReader implements Reader { + + /** + * S3 storage bucket name + * + */ + private String bucketName; + + private S3Client s3Client; + + /** Logger. 
*/ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class)); + + + private FileReader reader; + + public AmazonS3StorageReader(String bucketName, S3Client s3Client, + final RandomAccessFile dataFile, + final RandomAccessFile revisionsOffsetFile, + final ByteHandler byteHandler, + final SerializationType serializationType, + final PagePersister pagePersister, + final Cache cache) { + this.bucketName = bucketName; + this.s3Client = s3Client; + this.reader = new FileReader(dataFile, + revisionsOffsetFile, + byteHandler, + serializationType, + pagePersister, + cache); + } + + private void readObjectDataFromS3(String keyName) { + + try { + GetObjectRequest objectRequest = GetObjectRequest + .builder() + .key(keyName) + .bucket(bucketName) + .build(); + + ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); + byte[] data = objectBytes.asByteArray(); + String path = System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator() + keyName; + // Write the data to a local file. + File myFile = new File(path); + OutputStream os = new FileOutputStream(myFile); + os.write(data); + os.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } catch (S3Exception e) { + System.err.println(e.awsErrorDetails().errorMessage()); + System.exit(1); + } + } + + @Override + public PageReference readUberPageReference() { + return reader.readUberPageReference(); + } + + @Override + public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { + return reader.read(key, pageReadTrx); + } + + @Override + public void close() { + s3Client.close(); + reader.close(); + } + + @Override + public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { + return reader.readRevisionRootPage(revision, pageReadTrx); + } + + @Override + public Instant readRevisionRootPageCommitTimestamp(int revision) { + return reader.readRevisionRootPageCommitTimestamp(revision); + } + + @Override + public RevisionFileData getRevisionFileData(int revision) { + return reader.getRevisionFileData(revision); + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java new file mode 100644 index 000000000..e2f6fd572 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -0,0 +1,5 @@ +package org.sirix.io.cloud.amazon; + +public class AmazonS3StorageWriter { + +} From f42771b8ebebfc0ba2a70928982ad38c48ab78c1 Mon Sep 17 00:00:00 2001 From: sband Date: Tue, 16 May 2023 17:40:37 +0530 Subject: [PATCH 02/16] fix#582: Partial implementation of AWS Storage writer --- .../cloud/amazon/AmazonS3StorageWriter.java | 166 +++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index e2f6fd572..cdd9699b8 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -1,5 +1,169 @@ package org.sirix.io.cloud.amazon; -public class AmazonS3StorageWriter { +import static java.util.Objects.requireNonNull; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; 
+import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.AbstractForwardingReader; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.Writer; +import org.sirix.io.file.FileReader; +import org.sirix.page.PagePersister; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.SerializationType; +import org.sirix.page.interfaces.Page; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.AsyncCache; + +import net.openhft.chronicle.bytes.Bytes; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; + +public class AmazonS3StorageWriter extends AbstractForwardingReader implements Writer { + + /** + * Random access to work on. + */ + private RandomAccessFile dataFile; + + /** + * {@link AmazonS3StorageReader} reference for this writer. + */ + private final AmazonS3StorageReader reader; + + private final SerializationType type; + + private RandomAccessFile revisionsFile; + + private final PagePersister pagePersister; + + private final AsyncCache cache; + + private boolean isFirstUberPage; + + private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); + + private final S3Client s3Client; + + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); + + public AmazonS3StorageWriter (final String dataFileKeyName, final String revisionsOffsetFileKeyName, + final SerializationType serializationType, final PagePersister pagePersister, + final AsyncCache cache, final AmazonS3StorageReader reader, + final S3Client s3Client) throws FileNotFoundException { + this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); + type = requireNonNull(serializationType); + this.revisionsFile = type == SerializationType.DATA ? new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; + this.pagePersister = requireNonNull(pagePersister); + this.cache = cache; + this.reader = requireNonNull(reader); + this.s3Client = s3Client; + } + + /** + * @param bucketName - S3 bucket name on AWS + * @param keyName - Name of the file that includes the full path that is supposed to be used on the local file system + * @param object - File that could be read from the local filesystem that contains the actual information + * to be stored on S3 + * + * The expectation is that user provides a File object which will contain the data that needs to backed up to the remote + * storage i.e. AWS S3 in this case + * */ + protected void writeObjectToS3(String bucketName, String keyName, File object, boolean isDataFile) { + try { + Map metadata = new HashMap<>(); + metadata.put("x-amz-meta-sirix", isDataFile ? 
"data" : "revision"); + PutObjectRequest putOb = PutObjectRequest.builder() + .bucket(bucketName) + .key(keyName) + .metadata(metadata) + .build(); + + s3Client.putObject(putOb, RequestBody.fromFile(object)); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } + } + + @Override + public PageReference readUberPageReference() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void close() { + // TODO Auto-generated method stub + } + + @Override + public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Instant readRevisionRootPageCommitTimestamp(int revision) { + // TODO Auto-generated method stub + return null; + } + + @Override + public RevisionFileData getRevisionFileData(int revision) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, + Bytes bufferedBytes) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Writer truncate() { + // TODO Auto-generated method stub + return null; + } + + @Override + protected Reader delegate() { + // TODO Auto-generated method stub + return null; + } } From c035de764320743579f2b0b39e3484ee832a9bbb Mon Sep 17 00:00:00 2001 From: sband Date: Tue, 16 May 2023 17:42:04 +0530 Subject: [PATCH 03/16] fix#582: Use logger instead of System.err object --- .../cloud/amazon/AmazonS3StorageReader.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index 0409eaa00..2cf02566e 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -6,6 +6,7 @@ import java.io.OutputStream; import java.io.RandomAccessFile; import java.nio.file.FileSystems; +import java.nio.file.Path; import java.time.Instant; import org.checkerframework.checker.nullness.qual.Nullable; @@ -46,7 +47,8 @@ public class AmazonS3StorageReader implements Reader { private FileReader reader; - public AmazonS3StorageReader(String bucketName, S3Client s3Client, + public AmazonS3StorageReader(String bucketName, String keyName, + S3Client s3Client, final RandomAccessFile dataFile, final RandomAccessFile revisionsOffsetFile, final ByteHandler byteHandler, @@ -63,7 +65,12 @@ public AmazonS3StorageReader(String bucketName, S3Client s3Client, cache); } - private void readObjectDataFromS3(String keyName) { + /** + * @param keyName - Key name of the object to be read from S3 storage + * @return path - The location of the local file that contains the data that is written to the file system storage + * in the system temp directory. 
+ */ + protected Path readObjectDataFromS3(String keyName) { try { GetObjectRequest objectRequest = GetObjectRequest @@ -77,15 +84,17 @@ private void readObjectDataFromS3(String keyName) { String path = System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator() + keyName; // Write the data to a local file. File myFile = new File(path); - OutputStream os = new FileOutputStream(myFile); - os.write(data); - os.close(); + try(OutputStream os = new FileOutputStream(myFile)){ + os.write(data); + } + return Path.of(path); } catch (IOException ex) { ex.printStackTrace(); } catch (S3Exception e) { - System.err.println(e.awsErrorDetails().errorMessage()); + LOGGER.error(e.awsErrorDetails().errorMessage()); System.exit(1); } + return null; } @Override From d38b5b0951bd1f58144adb6406f1591af5b9460a Mon Sep 17 00:00:00 2001 From: sband Date: Thu, 18 May 2023 15:15:35 +0530 Subject: [PATCH 04/16] fix#582 : AWS S3 Writer implementation --- .../cloud/amazon/AmazonS3StorageWriter.java | 199 ++++++++++++++---- 1 file changed, 158 insertions(+), 41 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index cdd9699b8..434ba8bd3 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -2,25 +2,36 @@ import static java.util.Objects.requireNonNull; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.file.FileSystems; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; -import org.checkerframework.checker.nullness.qual.Nullable; +import org.jetbrains.annotations.NotNull; import org.sirix.api.PageReadOnlyTrx; +import org.sirix.exception.SirixIOException; import org.sirix.io.AbstractForwardingReader; +import org.sirix.io.IOStorage; import org.sirix.io.Reader; import org.sirix.io.RevisionFileData; import org.sirix.io.Writer; -import org.sirix.io.file.FileReader; +import org.sirix.page.KeyValueLeafPage; import org.sirix.page.PagePersister; import org.sirix.page.PageReference; import org.sirix.page.RevisionRootPage; import org.sirix.page.SerializationType; +import org.sirix.page.UberPage; import org.sirix.page.interfaces.Page; import org.sirix.utils.LogWrapper; import org.slf4j.LoggerFactory; @@ -59,16 +70,21 @@ public class AmazonS3StorageWriter extends AbstractForwardingReader implements W private final S3Client s3Client; + private final String bucketName; + /** Logger. 
*/ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); public AmazonS3StorageWriter (final String dataFileKeyName, final String revisionsOffsetFileKeyName, + final String bucketName, final SerializationType serializationType, final PagePersister pagePersister, final AsyncCache cache, final AmazonS3StorageReader reader, final S3Client s3Client) throws FileNotFoundException { + this.bucketName = bucketName; this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); type = requireNonNull(serializationType); - this.revisionsFile = type == SerializationType.DATA ? new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; + this.revisionsFile = type == SerializationType.DATA ? + new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; this.pagePersister = requireNonNull(pagePersister); this.cache = cache; this.reader = requireNonNull(reader); @@ -84,7 +100,7 @@ public AmazonS3StorageWriter (final String dataFileKeyName, final String revisio * The expectation is that user provides a File object which will contain the data that needs to backed up to the remote * storage i.e. AWS S3 in this case * */ - protected void writeObjectToS3(String bucketName, String keyName, File object, boolean isDataFile) { + protected void writeObjectToS3(String keyName, File object, boolean isDataFile) { try { Map metadata = new HashMap<>(); metadata.put("x-amz-meta-sirix", isDataFile ? "data" : "revision"); @@ -101,69 +117,170 @@ protected void writeObjectToS3(String bucketName, String keyName, File object, b } } - @Override - public PageReference readUberPageReference() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { - // TODO Auto-generated method stub - return null; - } @Override public void close() { - // TODO Auto-generated method stub + try { + if (dataFile != null) { + dataFile.close(); + } + if (revisionsFile != null) { + revisionsFile.close(); + } + if (reader != null) { + reader.close(); + } + this.s3Client.close(); + } catch (final IOException e) { + throw new SirixIOException(e); + } } @Override - public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { - // TODO Auto-generated method stub - return null; + public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { + try { + final long fileSize = dataFile.length(); + long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; + return writePageReference(pageReadOnlyTrx, pageReference, offset); + } catch (final IOException e) { + throw new SirixIOException(e); + } } - @Override - public Instant readRevisionRootPageCommitTimestamp(int revision) { - // TODO Auto-generated method stub - return null; - } - @Override - public RevisionFileData getRevisionFileData(int revision) { - // TODO Auto-generated method stub - return null; + private String getFileKeyName(String fileDescriptorPath) { + return fileDescriptorPath.substring((System.getProperty("java.io.tmpdir")+FileSystems.getDefault().getSeparator()).length()); } + @NotNull + private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, final PageReference pageReference, + long offset) { + // Perform byte operations. + try { + // Serialize page. 
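// Note on the block below: the page is serialized through the byte-handler pipeline,
// prefixed with its serialized length, written to the local data file at an
// alignment-adjusted offset (revision root pages and other page fragments use different
// alignments), and the updated local file is then uploaded to S3 via writeObjectToS3(...).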
+ final Page page = pageReference.getPage(); - @Override - public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { - // TODO Auto-generated method stub - return null; - } + final byte[] serializedPage; + + try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); + final DataOutputStream dataOutput = new DataOutputStream(reader.getByteHandler().serialize(output))) { + pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); + final var byteArray = byteBufferBytes.toByteArray(); + dataOutput.write(byteArray); + dataOutput.flush(); + serializedPage = output.toByteArray(); + } + + byteBufferBytes.clear(); + + final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; + final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); + buffer.putInt(serializedPage.length); + buffer.put(serializedPage); + buffer.flip(); + buffer.get(writtenPage); + + // Getting actual offset and appending to the end of the current file. + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage) { + if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { + offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); + } + } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { + offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); + } + } + dataFile.seek(offset); + dataFile.write(writtenPage); + /*Write the file object to S3*/ + this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); + + // Remember page coordinates. + pageReference.setKey(offset); + + if (page instanceof KeyValueLeafPage keyValueLeafPage) { + pageReference.setHash(keyValueLeafPage.getHashCode()); + } else { + /*TODO : Check for correctness of this*/ + pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); + } + + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage revisionRootPage) { + if (revisionRootPage.getRevision() == 0) { + revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); + } else { + revisionsFile.seek(revisionsFile.length()); + } + revisionsFile.writeLong(offset); + revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); + if (cache != null) { + final long currOffset = offset; + cache.put(revisionRootPage.getRevision(), + CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, + Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); + } + } else if (page instanceof UberPage && isFirstUberPage) { + revisionsFile.seek(0); + revisionsFile.write(serializedPage); + revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); + revisionsFile.write(serializedPage); + } + this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + + return this; + } catch (final IOException e) { + throw new SirixIOException(e); + } + } @Override public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { - // TODO Auto-generated method stub - return null; + isFirstUberPage = true; + writePageReference(pageReadOnlyTrx, pageReference, 0); + isFirstUberPage = false; + writePageReference(pageReadOnlyTrx, pageReference, 100); + return this; } @Override public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { - // TODO Auto-generated method stub - return null; + try { + final var 
dataFileRevisionRootPageOffset = + cache.get(revision, (unused) -> getRevisionFileData(revision)).get(5, TimeUnit.SECONDS).offset(); + + // Read page from file. + dataFile.seek(dataFileRevisionRootPageOffset); + final int dataLength = dataFile.readInt(); + + dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); + } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) { + throw new IllegalStateException(e); + } + + return this; } @Override public Writer truncate() { - // TODO Auto-generated method stub - return null; + try { + dataFile.setLength(0); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); + if (revisionsFile != null) { + revisionsFile.setLength(0); + this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + } catch (final IOException e) { + throw new SirixIOException(e); + } + + return this; } @Override protected Reader delegate() { - // TODO Auto-generated method stub - return null; + return this.reader; } } From ee7cddb586902fd3de5579fdb17562872cd16cd5 Mon Sep 17 00:00:00 2001 From: sband Date: Fri, 19 May 2023 17:15:45 +0530 Subject: [PATCH 05/16] fix#582: Complete implementation of S3 storage, storage reader and writer classes --- .../io/cloud/amazon/AmazonS3Storage.java | 90 +++++++++++-------- .../cloud/amazon/AmazonS3StorageReader.java | 38 +++++--- .../cloud/amazon/AmazonS3StorageWriter.java | 47 +++++++--- .../java/org/sirix/io/file/FileReader.java | 8 ++ 4 files changed, 125 insertions(+), 58 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java index 4e69ab687..649250203 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -1,5 +1,8 @@ package org.sirix.io.cloud.amazon; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; import java.nio.file.Path; import org.sirix.access.ResourceConfiguration; @@ -9,6 +12,8 @@ import org.sirix.io.bytepipe.ByteHandler; import org.sirix.io.bytepipe.ByteHandlerPipeline; import org.sirix.io.cloud.ICloudStorage; +import org.sirix.page.PagePersister; +import org.sirix.page.SerializationType; import org.sirix.utils.LogWrapper; import org.slf4j.LoggerFactory; @@ -17,6 +22,7 @@ import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; @@ -31,7 +37,7 @@ * @Auther Sanket Band (@sband) **/ -public class AmazonS3Storage implements ICloudStorage { +public final class AmazonS3Storage implements ICloudStorage { /** * Data file name. 
@@ -69,6 +75,11 @@ public class AmazonS3Storage implements ICloudStorage { */ private final AsyncCache cache; + private String awsProfile; + private String region; + + private final AmazonS3StorageReader reader; + /** * Support AWS authentication only with .aws credentials file with the required * profile name from the creds file @@ -79,32 +90,22 @@ public AmazonS3Storage(String bucketName, String awsProfile, AsyncCache cache, ByteHandlerPipeline byteHandlerPipeline) { this.bucketName = bucketName; - this.s3Client = this.getS3Client(awsProfile,region); - /* - * If the bucket does not exist, should create a new bucket based on the boolean - */ - /* - * Exit the system if the cloud storage bucket cannot be created Alternatively, - * we could just set a flag that could be checked before creating a reader or - * writer. Return null if the bucket is not created OR does not exist But that - * would keep the user under false impression that the bucket is created OR - * exists already even if it does not exists - */ - if (!isBucketExists(bucketName, s3Client)) { - if (shouldCreateBucketIfNotExists) { - createBucket(bucketName, s3Client); - } else { - LOGGER.error(String.format("Bucket: %s, does not exists on Amazon S3 storage, exiting the system", - bucketName)); - System.exit(1); - } - } + this.awsProfile = awsProfile; + this.region = region; this.cache = cache; this.byteHandlerPipeline = byteHandlerPipeline; this.file = resourceConfig.resourcePath; + this.reader = new AmazonS3StorageReader(bucketName, + s3Client, + getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), + new ByteHandlerPipeline(byteHandlerPipeline), + SerializationType.DATA, + new PagePersister(), + cache.synchronous()); } - private void createBucket(String bucketName, S3Client s3Client) { + void createBucket(String bucketName) { try { S3Waiter s3Waiter = s3Client.waiter(); CreateBucketRequest bucketRequest = CreateBucketRequest.builder().bucket(bucketName).build(); @@ -123,7 +124,7 @@ private void createBucket(String bucketName, S3Client s3Client) { } } - private boolean isBucketExists(String bucketName, S3Client s3Client) { + boolean isBucketExists(String bucketName) { HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); try { @@ -134,7 +135,7 @@ private boolean isBucketExists(String bucketName, S3Client s3Client) { } } - private S3Client getS3Client(String awsProfile, String region) { + S3Client getS3Client() { S3Client s3Client = null; s3Client = S3Client.builder() .region(Region.of(region)) @@ -143,33 +144,50 @@ private S3Client getS3Client(String awsProfile, String region) { return s3Client; } + S3AsyncClient getAsyncS3Client() { + S3AsyncClient s3AsyncClient = S3AsyncClient.builder() + .region(Region.of(region)) + .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) + .build(); + return s3AsyncClient; + } + @Override public Writer createWriter() { - /* - * This would create a writer that connects to the - * remote storagee*/ - return null; + AmazonS3StorageReader reader = new AmazonS3StorageReader(bucketName, + s3Client, + getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), + new ByteHandlerPipeline(byteHandlerPipeline), + SerializationType.DATA, + new PagePersister(), + cache.synchronous()); + return new AmazonS3StorageWriter (getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), + bucketName, + SerializationType.DATA,new 
PagePersister(), + cache,reader, + this.getAsyncS3Client()); } @Override public Reader createReader() { - /* - * This would create a reader that connects to the - * remote storage on cloud - * */ - return null; + return this.reader; } @Override public void close() { - // TODO Auto-generated method stub } @Override public boolean exists() { - // TODO Auto-generated method stub - return false; + Path storage = this.reader.readObjectDataFromS3(getDataFilePath().toAbsolutePath().toString()); + try { + return Files.exists(storage) && Files.size(storage) > 0; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } } @Override diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index 2cf02566e..7fbdf7edb 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -1,6 +1,7 @@ package org.sirix.io.cloud.amazon; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.hash.HashFunction; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.services.s3.S3Client; @@ -47,22 +49,30 @@ public class AmazonS3StorageReader implements Reader { private FileReader reader; - public AmazonS3StorageReader(String bucketName, String keyName, - S3Client s3Client, - final RandomAccessFile dataFile, - final RandomAccessFile revisionsOffsetFile, + public AmazonS3StorageReader(String bucketName, + S3Client s3Client, + String dataFileKeyName, + String revisionsOffsetFileKeyName, final ByteHandler byteHandler, final SerializationType serializationType, final PagePersister pagePersister, final Cache cache) { this.bucketName = bucketName; this.s3Client = s3Client; - this.reader = new FileReader(dataFile, - revisionsOffsetFile, - byteHandler, - serializationType, - pagePersister, - cache); + Path dataFilePath = readObjectDataFromS3(dataFileKeyName); + Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName); + try { + this.reader = new FileReader(new RandomAccessFile(dataFilePath.toFile(), "r"), + new RandomAccessFile(revisionOffsetFilePath.toFile(), "r"), + byteHandler, + serializationType, + pagePersister, + cache); + }catch(IOException io) { + LOGGER.error(io.getMessage()); + System.exit(1); + } + } /** @@ -97,6 +107,14 @@ protected Path readObjectDataFromS3(String keyName) { return null; } + ByteHandler getByteHandler() { + return this.reader.getByteHandler(); + } + + HashFunction getHashFunction() { + return this.reader.getHashFunction(); + } + @Override public PageReference readUberPageReference() { return reader.readUberPageReference(); diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index 434ba8bd3..470c0897b 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -39,9 +39,10 @@ import com.github.benmanes.caffeine.cache.AsyncCache; import net.openhft.chronicle.bytes.Bytes; -import 
software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; public class AmazonS3StorageWriter extends AbstractForwardingReader implements Writer { @@ -68,7 +69,7 @@ public class AmazonS3StorageWriter extends AbstractForwardingReader implements W private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); - private final S3Client s3Client; + private final S3AsyncClient s3Client; private final String bucketName; @@ -79,12 +80,18 @@ public AmazonS3StorageWriter (final String dataFileKeyName, final String revisio final String bucketName, final SerializationType serializationType, final PagePersister pagePersister, final AsyncCache cache, final AmazonS3StorageReader reader, - final S3Client s3Client) throws FileNotFoundException { + final S3AsyncClient s3Client) { this.bucketName = bucketName; - this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); type = requireNonNull(serializationType); - this.revisionsFile = type == SerializationType.DATA ? - new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; + try { + this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); + this.revisionsFile = type == SerializationType.DATA ? + new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; + }catch(IOException io) { + LOGGER.error(String.format("Cannot create S3 storage writer, " + + "please check if DATA path OR Revision offset file path exists. Error details : %s", io.getMessage())); + } + this.pagePersister = requireNonNull(pagePersister); this.cache = cache; this.reader = requireNonNull(reader); @@ -96,9 +103,7 @@ public AmazonS3StorageWriter (final String dataFileKeyName, final String revisio * @param keyName - Name of the file that includes the full path that is supposed to be used on the local file system * @param object - File that could be read from the local filesystem that contains the actual information * to be stored on S3 - * - * The expectation is that user provides a File object which will contain the data that needs to backed up to the remote - * storage i.e. 
AWS S3 in this case + * * */ protected void writeObjectToS3(String keyName, File object, boolean isDataFile) { try { @@ -110,7 +115,25 @@ protected void writeObjectToS3(String keyName, File object, boolean isDataFile) .metadata(metadata) .build(); - s3Client.putObject(putOb, RequestBody.fromFile(object)); + CompletableFuture objectFutureResponse = s3Client.putObject(putOb, + AsyncRequestBody.fromFile(object)); + objectFutureResponse.whenComplete((response, error) -> { + try { + if (response != null) { + LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName,bucketName)); + object.delete(); + } else { + // Handle error + error.printStackTrace(); + LOGGER.error(error.getMessage()); + System.exit(1); + } + } finally { + s3Client.close(); + } + }); + + objectFutureResponse.join(); } catch (S3Exception e) { LOGGER.error(e.awsErrorDetails().errorMessage()); System.exit(1); @@ -147,10 +170,10 @@ public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference } } - private String getFileKeyName(String fileDescriptorPath) { return fileDescriptorPath.substring((System.getProperty("java.io.tmpdir")+FileSystems.getDefault().getSeparator()).length()); } + @NotNull private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, final PageReference pageReference, long offset) { diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java index 0a9e5c6ea..0bbe9dd6d 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java @@ -205,4 +205,12 @@ public void close() { throw new SirixIOException(e); } } + + public ByteHandler getByteHandler() { + return this.byteHandler; + } + + public HashFunction getHashFunction() { + return this.hashFunction; + } } From bb0ee16887f0476861bffbff2dcc6f6ce66b3e4c Mon Sep 17 00:00:00 2001 From: sband Date: Tue, 23 May 2023 14:52:43 +0530 Subject: [PATCH 06/16] fix#582 : Code for Combined Storage --- .../cloud/CloudStorageConnectionFactory.java | 7 -- .../sirix/io/combined/CombinedStorage.java | 48 ++++++++++++ .../io/combined/CombinedStorageReader.java | 74 +++++++++++++++++++ .../io/combined/CombinedStorageWriter.java | 70 ++++++++++++++++++ libraries.gradle | 4 +- 5 files changed, 195 insertions(+), 8 deletions(-) delete mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java create mode 100644 bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java deleted file mode 100644 index 3dbde9922..000000000 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudStorageConnectionFactory.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.sirix.io.cloud; - -public interface CloudStorageConnectionFactory { - - - -} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java new file mode 100644 index 000000000..c1831bc23 --- /dev/null +++ 
b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java @@ -0,0 +1,48 @@ +package org.sirix.io.combined; + +import org.sirix.io.IOStorage; +import org.sirix.io.Reader; +import org.sirix.io.Writer; +import org.sirix.io.bytepipe.ByteHandler; +import org.sirix.io.cloud.ICloudStorage; + +public class CombinedStorage implements IOStorage { + + private final IOStorage localStorage; + + private final ICloudStorage remoteStorage; + + public CombinedStorage(final IOStorage localStorage, + final ICloudStorage remoteStorage) { + this.localStorage = localStorage; + this.remoteStorage = remoteStorage; + } + + @Override + public Writer createWriter() { + return new CombinedStorageWriter(localStorage.createWriter(), remoteStorage.createWriter(), localStorage.createReader()); + } + + @Override + public Reader createReader() { + return new CombinedStorageReader(localStorage.createReader(), remoteStorage.createReader()); + } + + @Override + public void close() { + localStorage.close(); + remoteStorage.close(); + } + + @Override + public boolean exists() { + if(!localStorage.exists()) return remoteStorage.exists(); + return localStorage.exists(); + } + + @Override + public ByteHandler getByteHandler() { + return localStorage.getByteHandler(); + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java new file mode 100644 index 000000000..1307acc9c --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java @@ -0,0 +1,74 @@ +package org.sirix.io.combined; + +import java.time.Instant; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.interfaces.Page; + +public class CombinedStorageReader implements Reader { + + + private Reader localReader, remoteReader; + + public CombinedStorageReader(Reader localReader, Reader remoteReader) { + this.localReader = localReader; + this.remoteReader = remoteReader; + } + + @Override + public PageReference readUberPageReference() { + PageReference pageReference = localReader.readUberPageReference(); + if(pageReference==null) { + pageReference = remoteReader.readUberPageReference(); + } + return pageReference; + } + + @Override + public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { + Page page = localReader.read(key, pageReadTrx); + if(page==null) { + page = remoteReader.read(key, pageReadTrx); + } + return page; + } + + @Override + public void close() { + localReader.close(); + remoteReader.close(); + } + + @Override + public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { + RevisionRootPage revRootPage = localReader.readRevisionRootPage(revision, pageReadTrx); + if(revRootPage==null) { + revRootPage = remoteReader.readRevisionRootPage(revision, pageReadTrx); + } + return revRootPage; + } + + @Override + public Instant readRevisionRootPageCommitTimestamp(int revision) { + Instant revRootPageCommitTS = localReader.readRevisionRootPageCommitTimestamp(revision); + if(revRootPageCommitTS==null) { + revRootPageCommitTS = remoteReader.readRevisionRootPageCommitTimestamp(revision); + } + return revRootPageCommitTS; + } + + @Override + public RevisionFileData getRevisionFileData(int revision) { + RevisionFileData 
revFileData = localReader.getRevisionFileData(revision); + if(revFileData == null) { + revFileData = remoteReader.getRevisionFileData(revision); + } + return revFileData; + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java new file mode 100644 index 000000000..638da07d4 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java @@ -0,0 +1,70 @@ +package org.sirix.io.combined; + +import java.nio.ByteBuffer; +import java.time.Instant; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.AbstractForwardingReader; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.Writer; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.interfaces.Page; + +import net.openhft.chronicle.bytes.Bytes; + +public class CombinedStorageWriter extends AbstractForwardingReader implements Writer { + + private Writer localStorageWriter, remoteStorageWriter; + private Reader storageReader; + + public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader storageReader) { + this.localStorageWriter = localWriter; + this.remoteStorageWriter = remoteWriter; + this.storageReader = storageReader; + } + + @Override + public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { + Writer writer = localStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); + remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); + return writer; + } + + @Override + public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, + Bytes bufferedBytes) { + Writer writer = localStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); + remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); + return writer; + } + + @Override + public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { + Writer writer = localStorageWriter.truncateTo(pageReadOnlyTrx, revision); + remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision); + return writer; + } + + @Override + public Writer truncate() { + Writer writer = localStorageWriter.truncate(); + remoteStorageWriter.truncate(); + return writer; + } + + @Override + public void close() { + localStorageWriter.close(); + remoteStorageWriter.close(); + + } + + @Override + protected Reader delegate() { + return storageReader; + } + +} diff --git a/libraries.gradle b/libraries.gradle index 72dde06da..f7b8d66ec 100644 --- a/libraries.gradle +++ b/libraries.gradle @@ -44,7 +44,9 @@ implLibraries = [ iouring : 'one.jasyncfio:jasyncfio:0.0.7:linux-amd64', roaringbitmap : 'org.roaringbitmap:RoaringBitmap:0.9.36', fastObjectPool : 'cn.danielw:fast-object-pool:2.2.1', - zeroAllocationHashing : 'net.openhft:zero-allocation-hashing:0.16' + zeroAllocationHashing : 'net.openhft:zero-allocation-hashing:0.16', + amazonS3 : 'software.amazon.awssdk:s3:2.20.62' + ] testLibraries = [ From 0fa2031dc0ba5a257cc4030f311b7fc595640e30 Mon Sep 17 00:00:00 2001 From: sband Date: Thu, 25 May 2023 11:41:35 +0530 Subject: [PATCH 07/16] fix#582 : Add async in combind storage writer and write files on local file system without deleting instead of using tmp parition --- bundles/sirix-core/build.gradle | 1 + 
.../io/cloud/amazon/AmazonS3Storage.java | 22 ++++---- .../cloud/amazon/AmazonS3StorageReader.java | 4 +- .../cloud/amazon/AmazonS3StorageWriter.java | 3 +- .../io/combined/CombinedStorageWriter.java | 52 +++++++++++++++---- 5 files changed, 60 insertions(+), 22 deletions(-) diff --git a/bundles/sirix-core/build.gradle b/bundles/sirix-core/build.gradle index 4f6f589f1..816e04fa2 100644 --- a/bundles/sirix-core/build.gradle +++ b/bundles/sirix-core/build.gradle @@ -19,6 +19,7 @@ dependencies { api implLibraries.iouring api implLibraries.lz4 api implLibraries.roaringbitmap + api implLibraries.amazonS3 implementation implLibraries.snappyJava implementation implLibraries.browniesCollections diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java index 649250203..6b6b17877 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -87,19 +87,22 @@ public final class AmazonS3Storage implements ICloudStorage { public AmazonS3Storage(String bucketName, String awsProfile, String region, boolean shouldCreateBucketIfNotExists, final ResourceConfiguration resourceConfig, - AsyncCache cache, - ByteHandlerPipeline byteHandlerPipeline) { + AsyncCache cache) { this.bucketName = bucketName; this.awsProfile = awsProfile; this.region = region; this.cache = cache; - this.byteHandlerPipeline = byteHandlerPipeline; + this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; this.file = resourceConfig.resourcePath; + this.s3Client = getS3Client(); //this client is needed for the below checks, so initialize it here only. + if(!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { + createBucket(bucketName); + } this.reader = new AmazonS3StorageReader(bucketName, s3Client, getDataFilePath().toAbsolutePath().toString(), getRevisionFilePath().toAbsolutePath().toString(), - new ByteHandlerPipeline(byteHandlerPipeline), + new ByteHandlerPipeline(this.byteHandlerPipeline), SerializationType.DATA, new PagePersister(), cache.synchronous()); @@ -125,7 +128,7 @@ void createBucket(String bucketName) { } boolean isBucketExists(String bucketName) { - HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); try { s3Client.headBucket(headBucketRequest); @@ -136,20 +139,17 @@ boolean isBucketExists(String bucketName) { } S3Client getS3Client() { - S3Client s3Client = null; - s3Client = S3Client.builder() + return this.s3Client==null ? 
S3Client.builder() .region(Region.of(region)) .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) - .build(); - return s3Client; + .build() : this.s3Client; } S3AsyncClient getAsyncS3Client() { - S3AsyncClient s3AsyncClient = S3AsyncClient.builder() + return S3AsyncClient.builder() .region(Region.of(region)) .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) .build(); - return s3AsyncClient; } @Override diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index 7fbdf7edb..d8c888c05 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -91,7 +91,9 @@ protected Path readObjectDataFromS3(String keyName) { ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); byte[] data = objectBytes.asByteArray(); - String path = System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator() + keyName; + /*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem + * instead of in the tmp partition*/ + String path = FileSystems.getDefault().getSeparator() + bucketName + FileSystems.getDefault().getSeparator() + keyName; // Write the data to a local file. File myFile = new File(path); try(OutputStream os = new FileOutputStream(myFile)){ diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index 470c0897b..f758924a4 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -121,7 +121,8 @@ protected void writeObjectToS3(String keyName, File object, boolean isDataFile) try { if (response != null) { LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName,bucketName)); - object.delete(); + /*No need to delete/cleanup the file as we are writing on the local file system, so this avoid + * unnecessarily filling up the filesystem space*/ } else { // Handle error error.printStackTrace(); diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java index 638da07d4..0d17de722 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java @@ -1,17 +1,17 @@ package org.sirix.io.combined; import java.nio.ByteBuffer; -import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; -import org.checkerframework.checker.nullness.qual.Nullable; import org.sirix.api.PageReadOnlyTrx; import org.sirix.io.AbstractForwardingReader; import org.sirix.io.Reader; -import org.sirix.io.RevisionFileData; import org.sirix.io.Writer; +import org.sirix.io.cloud.amazon.AmazonS3StorageReader; import org.sirix.page.PageReference; -import org.sirix.page.RevisionRootPage; -import org.sirix.page.interfaces.Page; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; import net.openhft.chronicle.bytes.Bytes; @@ -19,6 +19,8 @@ public class CombinedStorageWriter extends 
AbstractForwardingReader implements W private Writer localStorageWriter, remoteStorageWriter; private Reader storageReader; + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(CombinedStorageWriter.class)); public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader storageReader) { this.localStorageWriter = localWriter; @@ -29,7 +31,15 @@ public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader sto @Override public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { Writer writer = localStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); - remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); + CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes)); + if (writer == null) { + try { + writer = remoteWriterTask.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Could not complete remote write operation, please check the error details"); + e.printStackTrace(); + } + } return writer; } @@ -37,21 +47,45 @@ public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { Writer writer = localStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); - remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); + CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes)); + if (writer == null) { + try { + writer = remoteWriterTask.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Could not complete remote write operation, please check the error details"); + e.printStackTrace(); + } + } return writer; } @Override public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { Writer writer = localStorageWriter.truncateTo(pageReadOnlyTrx, revision); - remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision); + CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision)); + if (writer == null) { + try { + writer = remoteWriterTask.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Could not complete remote write operation, please check the error details"); + e.printStackTrace(); + } + } return writer; } @Override public Writer truncate() { Writer writer = localStorageWriter.truncate(); - remoteStorageWriter.truncate(); + CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncate()); + if (writer == null) { + try { + writer = remoteWriterTask.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Could not complete remote write operation, please check the error details"); + e.printStackTrace(); + } + } return writer; } From 22c6419aea2061431f6dc0a068582c1c73466dd2 Mon Sep 17 00:00:00 2001 From: sband Date: Fri, 26 May 2023 15:24:50 +0530 Subject: [PATCH 08/16] fix#582: Review comments implemented --- .../sirix/access/ResourceConfiguration.java | 105 ++++++++++++++---- .../main/java/org/sirix/io/StorageType.java | 11 ++ .../io/cloud/amazon/AmazonS3Storage.java | 41 +++---- .../cloud/amazon/AmazonS3StorageReader.java 
| 17 +-- 4 files changed, 119 insertions(+), 55 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index 36b81cc06..507beb77a 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -28,11 +28,23 @@ package org.sirix.access; -import com.google.common.base.MoreObjects; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; -import net.openhft.hashing.LongHashFunction; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + import org.checkerframework.checker.index.qual.NonNegative; +import org.json.JSONObject; import org.sirix.BinaryEncodingVersion; import org.sirix.access.trx.node.HashType; import org.sirix.exception.SirixIOException; @@ -46,20 +58,11 @@ import org.sirix.settings.VersioningType; import org.sirix.utils.OS; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import com.google.common.base.MoreObjects; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; +import net.openhft.hashing.LongHashFunction; /** * Holds the settings for a resource which acts as a base for session that can not change. This @@ -165,6 +168,36 @@ public static int compareStructure(final Path file) { } } + public static final class AWSStorageInformation { + private final String awsProfile; + private final String awsRegion; + private final String bucketName; //this should be same as the database name + private final boolean shouldCreateBucketIfNotExists; + + public AWSStorageInformation(String awsProfile, String awsRegion, String bucketName, + boolean shouldCreateBucketIfNotExists) { + this.awsProfile = awsProfile; + this.awsRegion = awsRegion; + this.bucketName = bucketName; + this.shouldCreateBucketIfNotExists = shouldCreateBucketIfNotExists; + } + + public String getAwsProfile() { + return awsProfile; + } + + public String getAwsRegion() { + return awsRegion; + } + + public String getBucketName() { + return bucketName; + } + + public boolean shouldCreateBucketIfNotExists() { + return shouldCreateBucketIfNotExists; + } + } // FIXED STANDARD FIELDS /** * Standard storage. @@ -297,6 +330,14 @@ public static int compareStructure(final Path file) { // END MEMBERS FOR FIXED FIELDS + /* + * Optional AWS Credentials + * */ + /* + * This could be improved in future to make it more sophisticated in terms setting the credentials + * for creating the cloud client connection + * */ + public final AWSStorageInformation awsStoreInfo; /** * Get a new builder instance. 
* @@ -330,6 +371,7 @@ private ResourceConfiguration(final ResourceConfiguration.Builder builder) { customCommitTimestamps = builder.customCommitTimestamps; storeNodeHistory = builder.storeNodeHistory; binaryVersion = builder.binaryEncodingVersion; + awsStoreInfo = builder.awsStoreInfo; } public BinaryEncodingVersion getBinaryEncodingVersion() { @@ -448,7 +490,7 @@ public boolean storeNodeHistory() { private static final String[] JSONNAMES = { "binaryEncoding", "revisioning", "revisioningClass", "numbersOfRevisiontoRestore", "byteHandlerClasses", "storageKind", "hashKind", "hashFunction", "compression", "pathSummary", "resourceID", "deweyIDsStored", - "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount" }; + "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount", "awsStoreInfo" }; /** * Serialize the configuration. @@ -596,7 +638,21 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE name = jsonReader.nextName(); assert name.equals(JSONNAMES[16]); final boolean storeChildCount = jsonReader.nextBoolean(); - + name = jsonReader.nextName(); + assert name.equals(JSONNAMES[17]); + /*Begin object to read the nested json properties required aws connection*/ + jsonReader.beginObject(); + AWSStorageInformation awsStoreInfo=null; + if(jsonReader.hasNext()) { + final String awsProfile=jsonReader.nextString(); + final String awsRegion=jsonReader.nextString(); + final String bucketName=jsonReader.nextString(); + final boolean shouldCreateBucketIfNotExists=jsonReader.nextBoolean(); + awsStoreInfo = new AWSStorageInformation(awsProfile, + awsRegion, bucketName, shouldCreateBucketIfNotExists); + } + jsonReader.endObject(); + /*End object to end reading the nested json properties*/ jsonReader.endObject(); jsonReader.close(); fileReader.close(); @@ -619,7 +675,8 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE .storeDiffs(storeDiffs) .storeChildCount(storeChildCount) .customCommitTimestamps(customCommitTimestamps) - .storeNodeHistory(storeNodeHistory); + .storeNodeHistory(storeNodeHistory) + .awsStoreInfo(awsStoreInfo); // Deserialized instance. final ResourceConfiguration config = new ResourceConfiguration(builder); @@ -713,6 +770,8 @@ public static final class Builder { private BinaryEncodingVersion binaryEncodingVersion = BINARY_ENCODING_VERSION; + private AWSStorageInformation awsStoreInfo; + /** * Constructor, setting the mandatory fields. 
* @@ -880,6 +939,12 @@ public Builder binaryEncodingVersion(BinaryEncodingVersion binaryEncodingVersion return this; } + /*Since this is an optional config parameter, null check is not needed*/ + public Builder awsStoreInfo(final AWSStorageInformation awsStoreInfo) { + this.awsStoreInfo = awsStoreInfo; + return this; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java b/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java index 2dd9f68c7..bff9bfb22 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java @@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.sirix.access.ResourceConfiguration; import org.sirix.exception.SirixIOException; +import org.sirix.io.cloud.amazon.AmazonS3Storage; import org.sirix.io.file.FileStorage; import org.sirix.io.filechannel.FileChannelStorage; import org.sirix.io.iouring.IOUringStorage; @@ -113,6 +114,16 @@ public IOStorage getInstance(final ResourceConfiguration resourceConf) { storage.loadRevisionFileDataIntoMemory(cache); return storage; } + }, + + CLOUD { + @Override + public IOStorage getInstance(final ResourceConfiguration resourceConf) { + final AsyncCache cache = + getIntegerRevisionFileDataAsyncCache(resourceConf); + final var storage = new AmazonS3Storage(resourceConf, cache); + return storage; + } }; public static final ConcurrentMap> CACHE_REPOSITORY = diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java index 6b6b17877..c832533c2 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -54,12 +54,6 @@ public final class AmazonS3Storage implements ICloudStorage { */ private final Path file; - /** - * S3 storage bucket name - * - */ - private String bucketName; - private S3Client s3Client; /** Logger. */ @@ -75,26 +69,24 @@ public final class AmazonS3Storage implements ICloudStorage { */ private final AsyncCache cache; - private String awsProfile; - private String region; + private ResourceConfiguration.AWSStorageInformation awsStorageInfo; private final AmazonS3StorageReader reader; + /** * Support AWS authentication only with .aws credentials file with the required * profile name from the creds file */ - public AmazonS3Storage(String bucketName, String awsProfile, - String region, - boolean shouldCreateBucketIfNotExists, final ResourceConfiguration resourceConfig, + public AmazonS3Storage(final ResourceConfiguration resourceConfig, AsyncCache cache) { - this.bucketName = bucketName; - this.awsProfile = awsProfile; - this.region = region; + this.awsStorageInfo = resourceConfig.awsStoreInfo; this.cache = cache; this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; this.file = resourceConfig.resourcePath; this.s3Client = getS3Client(); //this client is needed for the below checks, so initialize it here only. 
+ String bucketName = awsStorageInfo.getBucketName(); + boolean shouldCreateBucketIfNotExists = awsStorageInfo.shouldCreateBucketIfNotExists(); if(!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { createBucket(bucketName); } @@ -105,7 +97,8 @@ public AmazonS3Storage(String bucketName, String awsProfile, new ByteHandlerPipeline(this.byteHandlerPipeline), SerializationType.DATA, new PagePersister(), - cache.synchronous()); + cache.synchronous(), + resourceConfig); } void createBucket(String bucketName) { @@ -140,31 +133,23 @@ boolean isBucketExists(String bucketName) { S3Client getS3Client() { return this.s3Client==null ? S3Client.builder() - .region(Region.of(region)) - .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) + .region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) .build() : this.s3Client; } S3AsyncClient getAsyncS3Client() { return S3AsyncClient.builder() - .region(Region.of(region)) - .credentialsProvider(ProfileCredentialsProvider.create(awsProfile)) + .region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) .build(); } @Override public Writer createWriter() { - AmazonS3StorageReader reader = new AmazonS3StorageReader(bucketName, - s3Client, - getDataFilePath().toAbsolutePath().toString(), - getRevisionFilePath().toAbsolutePath().toString(), - new ByteHandlerPipeline(byteHandlerPipeline), - SerializationType.DATA, - new PagePersister(), - cache.synchronous()); return new AmazonS3StorageWriter (getDataFilePath().toAbsolutePath().toString(), getRevisionFilePath().toAbsolutePath().toString(), - bucketName, + awsStorageInfo.getBucketName(), SerializationType.DATA,new PagePersister(), cache,reader, this.getAsyncS3Client()); diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index d8c888c05..b1c96382e 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -1,7 +1,6 @@ package org.sirix.io.cloud.amazon; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -11,6 +10,7 @@ import java.time.Instant; import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.access.ResourceConfiguration; import org.sirix.api.PageReadOnlyTrx; import org.sirix.io.Reader; import org.sirix.io.RevisionFileData; @@ -39,10 +39,11 @@ public class AmazonS3StorageReader implements Reader { * S3 storage bucket name * */ - private String bucketName; + private final String bucketName; - private S3Client s3Client; + private final S3Client s3Client; + private final ResourceConfiguration resourceConfig; /** Logger. 
*/ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class)); @@ -56,9 +57,11 @@ public AmazonS3StorageReader(String bucketName, final ByteHandler byteHandler, final SerializationType serializationType, final PagePersister pagePersister, - final Cache cache) { + final Cache cache, + ResourceConfiguration resourceConfig) { this.bucketName = bucketName; this.s3Client = s3Client; + this.resourceConfig = resourceConfig; Path dataFilePath = readObjectDataFromS3(dataFileKeyName); Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName); try { @@ -93,13 +96,13 @@ protected Path readObjectDataFromS3(String keyName) { byte[] data = objectBytes.asByteArray(); /*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem * instead of in the tmp partition*/ - String path = FileSystems.getDefault().getSeparator() + bucketName + FileSystems.getDefault().getSeparator() + keyName; + Path path = resourceConfig.resourcePath; // Write the data to a local file. - File myFile = new File(path); + File myFile = path.toFile(); try(OutputStream os = new FileOutputStream(myFile)){ os.write(data); } - return Path.of(path); + return path; } catch (IOException ex) { ex.printStackTrace(); } catch (S3Exception e) { From ee428445f184f5495d51b24980a1b18f27050cfc Mon Sep 17 00:00:00 2001 From: sband Date: Fri, 26 May 2023 15:40:50 +0530 Subject: [PATCH 09/16] fix#582: Review comments implemented --- .../io/combined/CombinedStorageWriter.java | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java index 0d17de722..aea138543 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java @@ -31,15 +31,7 @@ public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader sto @Override public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { Writer writer = localStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); - CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes)); - if (writer == null) { - try { - writer = remoteWriterTask.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Could not complete remote write operation, please check the error details"); - e.printStackTrace(); - } - } + CompletableFuture.supplyAsync(() -> remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes)); return writer; } @@ -47,30 +39,14 @@ public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { Writer writer = localStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); - CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes)); - if (writer == null) { - try { - writer = remoteWriterTask.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Could not complete remote write operation, please check the 
error details"); - e.printStackTrace(); - } - } + CompletableFuture.supplyAsync(() -> remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes)); return writer; } @Override public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { Writer writer = localStorageWriter.truncateTo(pageReadOnlyTrx, revision); - CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision)); - if (writer == null) { - try { - writer = remoteWriterTask.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Could not complete remote write operation, please check the error details"); - e.printStackTrace(); - } - } + CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision)); return writer; } @@ -78,14 +54,6 @@ public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { public Writer truncate() { Writer writer = localStorageWriter.truncate(); CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncate()); - if (writer == null) { - try { - writer = remoteWriterTask.get(); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Could not complete remote write operation, please check the error details"); - e.printStackTrace(); - } - } return writer; } From c7cb0184bd35a434ca4369450947cd6ec478ed2b Mon Sep 17 00:00:00 2001 From: sband Date: Fri, 26 May 2023 16:25:11 +0530 Subject: [PATCH 10/16] fix#582 : add to serialize aws config info --- .../java/org/sirix/access/ResourceConfiguration.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index 507beb77a..bd833a409 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -490,7 +490,8 @@ public boolean storeNodeHistory() { private static final String[] JSONNAMES = { "binaryEncoding", "revisioning", "revisioningClass", "numbersOfRevisiontoRestore", "byteHandlerClasses", "storageKind", "hashKind", "hashFunction", "compression", "pathSummary", "resourceID", "deweyIDsStored", - "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount", "awsStoreInfo" }; + "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount", "awsStoreInfo", + "awsProfile","awsRegion","bucketName","shouldCreateBucketIfNotExists"}; /** * Serialize the configuration. @@ -542,6 +543,15 @@ public static void serialize(final ResourceConfiguration config) throws SirixIOE jsonWriter.name(JSONNAMES[15]).value(config.storeNodeHistory); // Child count. 
jsonWriter.name(JSONNAMES[16]).value(config.storeChildCount); + + jsonWriter.name(JSONNAMES[17]); + jsonWriter.beginObject(); + jsonWriter.name(JSONNAMES[18]).value(config.awsStoreInfo.getAwsProfile()); + jsonWriter.name(JSONNAMES[19]).value(config.awsStoreInfo.getAwsRegion()); + jsonWriter.name(JSONNAMES[20]).value(config.awsStoreInfo.getBucketName()); + jsonWriter.name(JSONNAMES[21]).value(config.awsStoreInfo.shouldCreateBucketIfNotExists()); + jsonWriter.endObject(); + jsonWriter.endObject(); } catch (final IOException e) { throw new SirixIOException(e); From 290452399ebe0c45ca17ebe5cb253930aecd4fe1 Mon Sep 17 00:00:00 2001 From: sband Date: Mon, 29 May 2023 15:00:26 +0530 Subject: [PATCH 11/16] fix#582: Test case failing - fix --- .../sirix/access/ResourceConfiguration.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index 3038a2709..8ca01b4b1 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -31,6 +31,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import java.io.EOFException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; @@ -44,7 +45,6 @@ import java.util.Objects; import org.checkerframework.checker.index.qual.NonNegative; -import org.json.JSONObject; import org.sirix.BinaryEncodingVersion; import org.sirix.access.trx.node.HashType; import org.sirix.exception.SirixIOException; @@ -337,7 +337,7 @@ public boolean shouldCreateBucketIfNotExists() { * This could be improved in future to make it more sophisticated in terms setting the credentials * for creating the cloud client connection * */ - public final AWSStorageInformation awsStoreInfo; + public AWSStorageInformation awsStoreInfo; /** * Get a new builder instance. 
* @@ -649,21 +649,26 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE assert name.equals(JSONNAMES[16]); final boolean storeChildCount = jsonReader.nextBoolean(); name = jsonReader.nextName(); - assert name.equals(JSONNAMES[17]); - /*Begin object to read the nested json properties required aws connection*/ - jsonReader.beginObject(); AWSStorageInformation awsStoreInfo=null; - if(jsonReader.hasNext()) { - final String awsProfile=jsonReader.nextString(); - final String awsRegion=jsonReader.nextString(); - final String bucketName=jsonReader.nextString(); - final boolean shouldCreateBucketIfNotExists=jsonReader.nextBoolean(); - awsStoreInfo = new AWSStorageInformation(awsProfile, - awsRegion, bucketName, shouldCreateBucketIfNotExists); + try { + assert name.equals(JSONNAMES[17]); + /*Since awsStore information is optional, it is important that we add a check on this instead + * of test cases failing because of this change*/ + /*Begin object to read the nested json properties required aws connection*/ + jsonReader.beginObject(); + if(jsonReader.hasNext()) { + final String awsProfile=jsonReader.nextString(); + final String awsRegion=jsonReader.nextString(); + final String bucketName=jsonReader.nextString(); + final boolean shouldCreateBucketIfNotExists=jsonReader.nextBoolean(); + awsStoreInfo = new AWSStorageInformation(awsProfile,awsRegion, bucketName, shouldCreateBucketIfNotExists); + } + jsonReader.endObject(); + /*End object to end reading the nested json properties*/ + }catch(SirixIOException | EOFException io) { + /*Ignore exception, as this information is optional*/ } jsonReader.endObject(); - /*End object to end reading the nested json properties*/ - jsonReader.endObject(); jsonReader.close(); fileReader.close(); From 50c807f3e2cf6c92f37fdb3a8de8a1a604e4a931 Mon Sep 17 00:00:00 2001 From: sband Date: Wed, 31 May 2023 14:14:00 +0530 Subject: [PATCH 12/16] fix#582: Test case failing - fix --- .../main/java/org/sirix/access/ResourceConfiguration.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index 8ca01b4b1..c5af841a1 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -544,13 +544,12 @@ public static void serialize(final ResourceConfiguration config) throws SirixIOE // Child count. 
jsonWriter.name(JSONNAMES[16]).value(config.storeChildCount); - jsonWriter.name(JSONNAMES[17]); - jsonWriter.beginObject(); + jsonWriter.name(JSONNAMES[17]).beginObject(); jsonWriter.name(JSONNAMES[18]).value(config.awsStoreInfo.getAwsProfile()); jsonWriter.name(JSONNAMES[19]).value(config.awsStoreInfo.getAwsRegion()); jsonWriter.name(JSONNAMES[20]).value(config.awsStoreInfo.getBucketName()); jsonWriter.name(JSONNAMES[21]).value(config.awsStoreInfo.shouldCreateBucketIfNotExists()); - jsonWriter.endObject(); + jsonWriter.name(JSONNAMES[17]).endObject(); jsonWriter.endObject(); } catch (final IOException e) { From 37fa795fe7a1de7d65fdcc980cd7abcd2dce9247 Mon Sep 17 00:00:00 2001 From: sband Date: Wed, 31 May 2023 14:17:53 +0530 Subject: [PATCH 13/16] fix#582 : Add null check for aws store info --- .../org/sirix/access/ResourceConfiguration.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index c5af841a1..6b24ddf33 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -543,13 +543,14 @@ public static void serialize(final ResourceConfiguration config) throws SirixIOE jsonWriter.name(JSONNAMES[15]).value(config.storeNodeHistory); // Child count. jsonWriter.name(JSONNAMES[16]).value(config.storeChildCount); - - jsonWriter.name(JSONNAMES[17]).beginObject(); - jsonWriter.name(JSONNAMES[18]).value(config.awsStoreInfo.getAwsProfile()); - jsonWriter.name(JSONNAMES[19]).value(config.awsStoreInfo.getAwsRegion()); - jsonWriter.name(JSONNAMES[20]).value(config.awsStoreInfo.getBucketName()); - jsonWriter.name(JSONNAMES[21]).value(config.awsStoreInfo.shouldCreateBucketIfNotExists()); - jsonWriter.name(JSONNAMES[17]).endObject(); + if(config.awsStoreInfo != null) { + jsonWriter.name(JSONNAMES[17]).beginObject(); + jsonWriter.name(JSONNAMES[18]).value(config.awsStoreInfo.getAwsProfile()); + jsonWriter.name(JSONNAMES[19]).value(config.awsStoreInfo.getAwsRegion()); + jsonWriter.name(JSONNAMES[20]).value(config.awsStoreInfo.getBucketName()); + jsonWriter.name(JSONNAMES[21]).value(config.awsStoreInfo.shouldCreateBucketIfNotExists()); + jsonWriter.name(JSONNAMES[17]).endObject(); + } jsonWriter.endObject(); } catch (final IOException e) { From d7de235536fe0496c965a9bfe1ec99cf3ada9509 Mon Sep 17 00:00:00 2001 From: sband Date: Thu, 8 Jun 2023 13:27:28 +0530 Subject: [PATCH 14/16] fix #582: Replace FileReader with FileChannelReader --- .../io/cloud/amazon/AmazonS3Storage.java | 89 ++-- .../cloud/amazon/AmazonS3StorageReader.java | 91 ++-- .../cloud/amazon/AmazonS3StorageWriter.java | 407 +++++++++--------- .../java/org/sirix/io/file/FileReader.java | 8 - .../io/filechannel/FileChannelReader.java | 7 + 5 files changed, 293 insertions(+), 309 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java index c832533c2..66e97dea3 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -48,57 +48,49 @@ public final class AmazonS3Storage implements ICloudStorage { * Revisions file name. 
*/ private static final String REVISIONS_FILENAME = "sirix.revisions"; - - /** - * Instance to local storage. - */ - private final Path file; + + /** + * Instance to local storage. + */ + private final Path file; private S3Client s3Client; /** Logger. */ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3Storage.class)); - + /** * Byte handler pipeline. - */ - private final ByteHandlerPipeline byteHandlerPipeline; + */ + private final ByteHandlerPipeline byteHandlerPipeline; /** * Revision file data cache. - */ - private final AsyncCache cache; - - private ResourceConfiguration.AWSStorageInformation awsStorageInfo; + */ + private final AsyncCache cache; - private final AmazonS3StorageReader reader; + private ResourceConfiguration.AWSStorageInformation awsStorageInfo; + private final AmazonS3StorageReader reader; /** * Support AWS authentication only with .aws credentials file with the required * profile name from the creds file */ - public AmazonS3Storage(final ResourceConfiguration resourceConfig, - AsyncCache cache) { + public AmazonS3Storage(final ResourceConfiguration resourceConfig, AsyncCache cache) { this.awsStorageInfo = resourceConfig.awsStoreInfo; this.cache = cache; - this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; + this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; this.file = resourceConfig.resourcePath; - this.s3Client = getS3Client(); //this client is needed for the below checks, so initialize it here only. + this.s3Client = getS3Client(); // this client is needed for the below checks, so initialize it here only. String bucketName = awsStorageInfo.getBucketName(); boolean shouldCreateBucketIfNotExists = awsStorageInfo.shouldCreateBucketIfNotExists(); - if(!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { + if (!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { createBucket(bucketName); } - this.reader = new AmazonS3StorageReader(bucketName, - s3Client, - getDataFilePath().toAbsolutePath().toString(), - getRevisionFilePath().toAbsolutePath().toString(), - new ByteHandlerPipeline(this.byteHandlerPipeline), - SerializationType.DATA, - new PagePersister(), - cache.synchronous(), - resourceConfig); + this.reader = new AmazonS3StorageReader(bucketName, s3Client, getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), new ByteHandlerPipeline(this.byteHandlerPipeline), + SerializationType.DATA, new PagePersister(), cache.synchronous(), resourceConfig); } void createBucket(String bucketName) { @@ -121,38 +113,33 @@ void createBucket(String bucketName) { } boolean isBucketExists(String bucketName) { - HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); try { s3Client.headBucket(headBucketRequest); return true; } catch (NoSuchBucketException e) { return false; - } + } } S3Client getS3Client() { - return this.s3Client==null ? S3Client.builder() - .region(Region.of(awsStorageInfo.getAwsRegion())) - .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) - .build() : this.s3Client; + return this.s3Client == null + ? 
S3Client.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build() + : this.s3Client; } S3AsyncClient getAsyncS3Client() { - return S3AsyncClient.builder() - .region(Region.of(awsStorageInfo.getAwsRegion())) - .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) - .build(); + return S3AsyncClient.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build(); } @Override public Writer createWriter() { - return new AmazonS3StorageWriter (getDataFilePath().toAbsolutePath().toString(), - getRevisionFilePath().toAbsolutePath().toString(), - awsStorageInfo.getBucketName(), - SerializationType.DATA,new PagePersister(), - cache,reader, - this.getAsyncS3Client()); + return new AmazonS3StorageWriter(getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), awsStorageInfo.getBucketName(), + SerializationType.DATA, new PagePersister(), cache, reader, this.getAsyncS3Client()); } @Override @@ -169,10 +156,10 @@ public void close() { public boolean exists() { Path storage = this.reader.readObjectDataFromS3(getDataFilePath().toAbsolutePath().toString()); try { - return Files.exists(storage) && Files.size(storage) > 0; - } catch (final IOException e) { - throw new UncheckedIOException(e); - } + return Files.exists(storage) && Files.size(storage) > 0; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } } @Override @@ -181,12 +168,12 @@ public ByteHandler getByteHandler() { } /** - * Getting path for data file. - * This path would be used on the local storage + * Getting path for data file. 
This path would be used on the local storage + * * @return the path for this data file */ private Path getDataFilePath() { - return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); } /** @@ -195,6 +182,6 @@ private Path getDataFilePath() { * @return the concrete storage for this database */ private Path getRevisionFilePath() { - return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); } } diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index b1c96382e..ca36e3aec 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.io.OutputStream; import java.io.RandomAccessFile; -import java.nio.file.FileSystems; import java.nio.file.Path; import java.time.Instant; @@ -16,6 +15,7 @@ import org.sirix.io.RevisionFileData; import org.sirix.io.bytepipe.ByteHandler; import org.sirix.io.file.FileReader; +import org.sirix.io.filechannel.FileChannelReader; import org.sirix.page.PagePersister; import org.sirix.page.PageReference; import org.sirix.page.RevisionRootPage; @@ -34,11 +34,11 @@ import software.amazon.awssdk.services.s3.model.S3Exception; public class AmazonS3StorageReader implements Reader { - + /** * S3 storage bucket name * - */ + */ private final String bucketName; private final S3Client s3Client; @@ -46,71 +46,60 @@ public class AmazonS3StorageReader implements Reader { private final ResourceConfiguration resourceConfig; /** Logger. 
*/ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class)); - - - private FileReader reader; - - public AmazonS3StorageReader(String bucketName, - S3Client s3Client, - String dataFileKeyName, - String revisionsOffsetFileKeyName, - final ByteHandler byteHandler, - final SerializationType serializationType, - final PagePersister pagePersister, - final Cache cache, - ResourceConfiguration resourceConfig) { + + private FileChannelReader reader; + + public AmazonS3StorageReader(String bucketName, S3Client s3Client, String dataFileKeyName, + String revisionsOffsetFileKeyName, final ByteHandler byteHandler, final SerializationType serializationType, + final PagePersister pagePersister, final Cache cache, + ResourceConfiguration resourceConfig) { this.bucketName = bucketName; this.s3Client = s3Client; this.resourceConfig = resourceConfig; Path dataFilePath = readObjectDataFromS3(dataFileKeyName); Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName); try { - this.reader = new FileReader(new RandomAccessFile(dataFilePath.toFile(), "r"), - new RandomAccessFile(revisionOffsetFilePath.toFile(), "r"), - byteHandler, - serializationType, - pagePersister, - cache); - }catch(IOException io) { + this.reader = new FileChannelReader(new RandomAccessFile(dataFilePath.toFile(), "r").getChannel(), + new RandomAccessFile(revisionOffsetFilePath.toFile(), "r").getChannel(), byteHandler, serializationType, + pagePersister, cache); + } catch (IOException io) { LOGGER.error(io.getMessage()); System.exit(1); } } - + /** * @param keyName - Key name of the object to be read from S3 storage - * @return path - The location of the local file that contains the data that is written to the file system storage - * in the system temp directory. + * @return path - The location of the local file that contains the data that is + * written to the file system storage in the system temp directory. */ protected Path readObjectDataFromS3(String keyName) { - + try { - GetObjectRequest objectRequest = GetObjectRequest - .builder() - .key(keyName) - .bucket(bucketName) - .build(); - - ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); - byte[] data = objectBytes.asByteArray(); - /*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem - * instead of in the tmp partition*/ - Path path = resourceConfig.resourcePath; - // Write the data to a local file. - File myFile = path.toFile(); - try(OutputStream os = new FileOutputStream(myFile)){ - os.write(data); - } - return path; - } catch (IOException ex) { - ex.printStackTrace(); - } catch (S3Exception e) { - LOGGER.error(e.awsErrorDetails().errorMessage()); - System.exit(1); - } + GetObjectRequest objectRequest = GetObjectRequest.builder().key(keyName).bucket(bucketName).build(); + + ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); + byte[] data = objectBytes.asByteArray(); + /* + * As the bucketName has to be same as the database name, it makes sense to + * use/create file on the local filesystem instead of in the tmp partition + */ + Path path = resourceConfig.resourcePath; + // Write the data to a local file. 
+ File myFile = path.toFile(); + try (OutputStream os = new FileOutputStream(myFile)) { + os.write(data); + } + return path; + } catch (IOException ex) { + ex.printStackTrace(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } return null; - } + } ByteHandler getByteHandler() { return this.reader.getByteHandler(); diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index f758924a4..3b8d34c61 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -48,259 +48,268 @@ public class AmazonS3StorageWriter extends AbstractForwardingReader implements Writer { /** - * Random access to work on. - */ - private RandomAccessFile dataFile; + * Random access to work on. + */ + private RandomAccessFile dataFile; - /** - * {@link AmazonS3StorageReader} reference for this writer. - */ - private final AmazonS3StorageReader reader; + /** + * {@link AmazonS3StorageReader} reference for this writer. + */ + private final AmazonS3StorageReader reader; - private final SerializationType type; + private final SerializationType type; - private RandomAccessFile revisionsFile; + private RandomAccessFile revisionsFile; - private final PagePersister pagePersister; + private final PagePersister pagePersister; - private final AsyncCache cache; + private final AsyncCache cache; - private boolean isFirstUberPage; + private boolean isFirstUberPage; - private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); + private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); - private final S3AsyncClient s3Client; + private final S3AsyncClient s3Client; - private final String bucketName; + private final String bucketName; - /** Logger. */ - private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); - public AmazonS3StorageWriter (final String dataFileKeyName, final String revisionsOffsetFileKeyName, - final String bucketName, - final SerializationType serializationType, final PagePersister pagePersister, - final AsyncCache cache, final AmazonS3StorageReader reader, - final S3AsyncClient s3Client) { + public AmazonS3StorageWriter(final String dataFileKeyName, final String revisionsOffsetFileKeyName, + final String bucketName, final SerializationType serializationType, final PagePersister pagePersister, + final AsyncCache cache, final AmazonS3StorageReader reader, + final S3AsyncClient s3Client) { this.bucketName = bucketName; type = requireNonNull(serializationType); try { - this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); - this.revisionsFile = type == SerializationType.DATA ? - new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; - }catch(IOException io) { - LOGGER.error(String.format("Cannot create S3 storage writer, " - + "please check if DATA path OR Revision offset file path exists. 
Error details : %s", io.getMessage())); + this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(), + "rw"); + this.revisionsFile = type == SerializationType.DATA + ? new RandomAccessFile( + requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(), "rw") + : null; + } catch (IOException io) { + LOGGER.error(String.format( + "Cannot create S3 storage writer, " + + "please check if DATA path OR Revision offset file path exists. Error details : %s", + io.getMessage())); } - this.pagePersister = requireNonNull(pagePersister); - this.cache = cache; - this.reader = requireNonNull(reader); - this.s3Client = s3Client; + this.pagePersister = requireNonNull(pagePersister); + this.cache = cache; + this.reader = requireNonNull(reader); + this.s3Client = s3Client; } /** * @param bucketName - S3 bucket name on AWS - * @param keyName - Name of the file that includes the full path that is supposed to be used on the local file system - * @param object - File that could be read from the local filesystem that contains the actual information - * to be stored on S3 - * - * */ + * @param keyName - Name of the file that includes the full path that is + * supposed to be used on the local file system + * @param object - File that could be read from the local filesystem that + * contains the actual information to be stored on S3 + * + */ protected void writeObjectToS3(String keyName, File object, boolean isDataFile) { try { - Map metadata = new HashMap<>(); - metadata.put("x-amz-meta-sirix", isDataFile ? "data" : "revision"); - PutObjectRequest putOb = PutObjectRequest.builder() - .bucket(bucketName) - .key(keyName) - .metadata(metadata) - .build(); - - CompletableFuture objectFutureResponse = s3Client.putObject(putOb, - AsyncRequestBody.fromFile(object)); - objectFutureResponse.whenComplete((response, error) -> { - try { - if (response != null) { - LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName,bucketName)); - /*No need to delete/cleanup the file as we are writing on the local file system, so this avoid - * unnecessarily filling up the filesystem space*/ - } else { - // Handle error - error.printStackTrace(); - LOGGER.error(error.getMessage()); - System.exit(1); - } - } finally { - s3Client.close(); - } - }); - - objectFutureResponse.join(); - } catch (S3Exception e) { - LOGGER.error(e.awsErrorDetails().errorMessage()); - System.exit(1); - } + Map metadata = new HashMap<>(); + metadata.put("x-amz-meta-sirix", isDataFile ? 
"data" : "revision"); + PutObjectRequest putOb = PutObjectRequest.builder().bucket(bucketName).key(keyName).metadata(metadata) + .build(); + + CompletableFuture objectFutureResponse = s3Client.putObject(putOb, + AsyncRequestBody.fromFile(object)); + objectFutureResponse.whenComplete((response, error) -> { + try { + if (response != null) { + LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName, bucketName)); + /* + * No need to delete/cleanup the file as we are writing on the local file + * system, so this avoid unnecessarily filling up the filesystem space + */ + } else { + // Handle error + error.printStackTrace(); + LOGGER.error(error.getMessage()); + System.exit(1); + } + } finally { + s3Client.close(); + } + }); + + objectFutureResponse.join(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } } - @Override public void close() { try { - if (dataFile != null) { - dataFile.close(); - } - if (revisionsFile != null) { - revisionsFile.close(); - } - if (reader != null) { - reader.close(); - } - this.s3Client.close(); - } catch (final IOException e) { - throw new SirixIOException(e); - } + if (dataFile != null) { + dataFile.close(); + } + if (revisionsFile != null) { + revisionsFile.close(); + } + if (reader != null) { + reader.close(); + } + this.s3Client.close(); + } catch (final IOException e) { + throw new SirixIOException(e); + } } @Override public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { try { - final long fileSize = dataFile.length(); - long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; - return writePageReference(pageReadOnlyTrx, pageReference, offset); - } catch (final IOException e) { - throw new SirixIOException(e); - } + final long fileSize = dataFile.length(); + long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; + return writePageReference(pageReadOnlyTrx, pageReference, offset); + } catch (final IOException e) { + throw new SirixIOException(e); + } } private String getFileKeyName(String fileDescriptorPath) { - return fileDescriptorPath.substring((System.getProperty("java.io.tmpdir")+FileSystems.getDefault().getSeparator()).length()); + return fileDescriptorPath + .substring((System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator()).length()); } @NotNull - private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, final PageReference pageReference, - long offset) { - // Perform byte operations. - try { - // Serialize page. - final Page page = pageReference.getPage(); - - final byte[] serializedPage; - - try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); - final DataOutputStream dataOutput = new DataOutputStream(reader.getByteHandler().serialize(output))) { - pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); - final var byteArray = byteBufferBytes.toByteArray(); - dataOutput.write(byteArray); - dataOutput.flush(); - serializedPage = output.toByteArray(); - } - - byteBufferBytes.clear(); - - final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; - final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); - buffer.putInt(serializedPage.length); - buffer.put(serializedPage); - buffer.flip(); - buffer.get(writtenPage); - - // Getting actual offset and appending to the end of the current file. 
- if (type == SerializationType.DATA) { - if (page instanceof RevisionRootPage) { - if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { - offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); - } - } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { - offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); - } - } - dataFile.seek(offset); - dataFile.write(writtenPage); - /*Write the file object to S3*/ - this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); - - // Remember page coordinates. - pageReference.setKey(offset); - - if (page instanceof KeyValueLeafPage keyValueLeafPage) { - pageReference.setHash(keyValueLeafPage.getHashCode()); - } else { - /*TODO : Check for correctness of this*/ - pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); - } - - if (type == SerializationType.DATA) { - if (page instanceof RevisionRootPage revisionRootPage) { - if (revisionRootPage.getRevision() == 0) { - revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); - } else { - revisionsFile.seek(revisionsFile.length()); - } - revisionsFile.writeLong(offset); - revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); - if (cache != null) { - final long currOffset = offset; - cache.put(revisionRootPage.getRevision(), - CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, - Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); - } - } else if (page instanceof UberPage && isFirstUberPage) { - revisionsFile.seek(0); - revisionsFile.write(serializedPage); - revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); - revisionsFile.write(serializedPage); - } - this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE); - } - - return this; - } catch (final IOException e) { - throw new SirixIOException(e); - } - } + private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, + final PageReference pageReference, long offset) { + // Perform byte operations. + try { + // Serialize page. + final Page page = pageReference.getPage(); + + final byte[] serializedPage; + + try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); + final DataOutputStream dataOutput = new DataOutputStream( + reader.getByteHandler().serialize(output))) { + pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); + final var byteArray = byteBufferBytes.toByteArray(); + dataOutput.write(byteArray); + dataOutput.flush(); + serializedPage = output.toByteArray(); + } + + byteBufferBytes.clear(); + + final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; + final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); + buffer.putInt(serializedPage.length); + buffer.put(serializedPage); + buffer.flip(); + buffer.get(writtenPage); + + // Getting actual offset and appending to the end of the current file. 
+ if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage) { + if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { + offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); + } + } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { + offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); + } + } + dataFile.seek(offset); + dataFile.write(writtenPage); + /* Write the file object to S3 */ + this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), + new File(dataFile.getFD().toString()), Boolean.TRUE); + + // Remember page coordinates. + pageReference.setKey(offset); + + if (page instanceof KeyValueLeafPage keyValueLeafPage) { + pageReference.setHash(keyValueLeafPage.getHashCode()); + } else { + pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); + } + + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage revisionRootPage) { + if (revisionRootPage.getRevision() == 0) { + revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); + } else { + revisionsFile.seek(revisionsFile.length()); + } + revisionsFile.writeLong(offset); + revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); + if (cache != null) { + final long currOffset = offset; + cache.put(revisionRootPage.getRevision(), + CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, + Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); + } + } else if (page instanceof UberPage && isFirstUberPage) { + revisionsFile.seek(0); + revisionsFile.write(serializedPage); + revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); + revisionsFile.write(serializedPage); + } + this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), + new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + + return this; + } catch (final IOException e) { + throw new SirixIOException(e); + } + } @Override public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { isFirstUberPage = true; - writePageReference(pageReadOnlyTrx, pageReference, 0); - isFirstUberPage = false; - writePageReference(pageReadOnlyTrx, pageReference, 100); - return this; + writePageReference(pageReadOnlyTrx, pageReference, 0); + isFirstUberPage = false; + writePageReference(pageReadOnlyTrx, pageReference, 100); + return this; } @Override public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { try { - final var dataFileRevisionRootPageOffset = - cache.get(revision, (unused) -> getRevisionFileData(revision)).get(5, TimeUnit.SECONDS).offset(); - - // Read page from file. - dataFile.seek(dataFileRevisionRootPageOffset); - final int dataLength = dataFile.readInt(); - - dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength); - this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); - } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) { - throw new IllegalStateException(e); - } + final var dataFileRevisionRootPageOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)) + .get(5, TimeUnit.SECONDS).offset(); + + // Read page from file. 
+      dataFile.seek(dataFileRevisionRootPageOffset);
+      final int dataLength = dataFile.readInt();
+
+      dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength);
+      this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()),
+          Boolean.TRUE);
+    } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) {
+      throw new IllegalStateException(e);
+    }
 
-    return this;
+    return this;
   }
 
   @Override
   public Writer truncate() {
     try {
-      dataFile.setLength(0);
-      this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE);
-      if (revisionsFile != null) {
-        revisionsFile.setLength(0);
-        this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE);
-      }
-    } catch (final IOException e) {
-      throw new SirixIOException(e);
-    }
-
-    return this;
+      dataFile.setLength(0);
+      this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()),
+          Boolean.TRUE);
+      if (revisionsFile != null) {
+        revisionsFile.setLength(0);
+        this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()),
+            new File(revisionsFile.getFD().toString()), Boolean.FALSE);
+      }
+    } catch (final IOException e) {
+      throw new SirixIOException(e);
+    }
+
+    return this;
   }
 
   @Override
diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java
index 0bbe9dd6d..0a9e5c6ea 100644
--- a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java
@@ -205,12 +205,4 @@ public void close() {
       throw new SirixIOException(e);
     }
   }
-
-  public ByteHandler getByteHandler() {
-    return this.byteHandler;
-  }
-
-  public HashFunction getHashFunction() {
-    return this.hashFunction;
-  }
 }
diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java
index 195cc0074..9a32ac3cc 100644
--- a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java
+++ b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java
@@ -167,4 +167,11 @@ public RevisionFileData getRevisionFileData(int revision) {
   public void close() {
   }
 
+  public ByteHandler getByteHandler() {
+    return this.byteHandler;
+  }
+
+  public HashFunction getHashFunction() {
+    return this.hashFunction;
+  }
 }

From 1e922229eb11e207299aabc82e3d8758547e06df Mon Sep 17 00:00:00 2001
From: sband
Date: Sun, 11 Jun 2023 16:59:09 +0530
Subject: [PATCH 15/16] fix #582: Rectify failing test

---
 .../org/sirix/access/ResourceConfiguration.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java
index 6b24ddf33..85958f2eb 100644
--- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java
+++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java
@@ -59,6 +59,7 @@
 import org.sirix.utils.OS;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
 
@@ -371,7 +372,9 @@ private ResourceConfiguration(final ResourceConfiguration.Builder builder) {
     customCommitTimestamps = builder.customCommitTimestamps;
     storeNodeHistory = builder.storeNodeHistory;
     binaryVersion = builder.binaryEncodingVersion;
-    awsStoreInfo = builder.awsStoreInfo;
+    if(builder.awsStoreInfo != null) {
+      awsStoreInfo = builder.awsStoreInfo;
+    }
   }
 
   public BinaryEncodingVersion getBinaryEncodingVersion() {
@@ -648,13 +651,11 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE
       name = jsonReader.nextName();
       assert name.equals(JSONNAMES[16]);
       final boolean storeChildCount = jsonReader.nextBoolean();
-      name = jsonReader.nextName();
+      //name = jsonReader.nextName();
       AWSStorageInformation awsStoreInfo=null;
       try {
+        name = jsonReader.nextName();
         assert name.equals(JSONNAMES[17]);
-        /*Since awsStore information is optional, it is important that we add a check on this instead
-         * of test cases failing because of this change*/
-        /*Begin object to read the nested json properties required aws connection*/
         jsonReader.beginObject();
         if(jsonReader.hasNext()) {
           final String awsProfile=jsonReader.nextString();
@@ -664,9 +665,8 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE
           awsStoreInfo = new AWSStorageInformation(awsProfile,awsRegion, bucketName, shouldCreateBucketIfNotExists);
         }
         jsonReader.endObject();
-        /*End object to end reading the nested json properties*/
-      }catch(SirixIOException | EOFException io) {
-        /*Ignore exception, as this information is optional*/
+      }catch(SirixIOException | EOFException | IllegalStateException io) {
+
       }
       jsonReader.endObject();
       jsonReader.close();

From d5fd21a8191cb8c53fc13b6f4d0e2c32ed615b5d Mon Sep 17 00:00:00 2001
From: sband
Date: Sun, 18 Jun 2023 17:41:43 +0530
Subject: [PATCH 16/16] fix #582: Test case

---
 bundles/sirix-core/build.gradle               |  1 +
 .../io/cloud/amazon/AWSS3StorageTest.java     | 76 +++++++++++++++++++
 libraries.gradle                              |  3 +-
 3 files changed, 79 insertions(+), 1 deletion(-)
 create mode 100644 bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java

diff --git a/bundles/sirix-core/build.gradle b/bundles/sirix-core/build.gradle
index 816e04fa2..6ccbd7afa 100644
--- a/bundles/sirix-core/build.gradle
+++ b/bundles/sirix-core/build.gradle
@@ -43,6 +43,7 @@ dependencies {
     testImplementation testLibraries.commonsCollections4
    testImplementation testLibraries.commonsCollections4Tests
     testImplementation testLibraries.assertjCore
+    testImplementation testLibraries.s3Mock
 }
 
 description = 'SirixDB is a hybrid on-disk and in-memory document oriented, versioned database system. It has a ' +
diff --git a/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java b/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java
new file mode 100644
index 000000000..811774895
--- /dev/null
+++ b/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java
@@ -0,0 +1,76 @@
+package org.sirix.io.cloud.amazon;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URI;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.sirix.XmlTestHelper;
+import org.sirix.XmlTestHelper.PATHS;
+import org.sirix.access.ResourceConfiguration;
+import org.sirix.api.Database;
+import org.sirix.api.xml.XmlResourceSession;
+import org.sirix.io.StorageType;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+public class AWSS3StorageTest {
+
+  private AmazonS3Storage awsStorage;
+  private S3Client s3Client;
+  private AmazonS3StorageWriter cloudWriter;
+  private AmazonS3StorageReader cloudReader;
+
+  @Before
+  public void setup() {
+    final ResourceConfiguration.Builder resourceConfig = new ResourceConfiguration.Builder(XmlTestHelper.RESOURCE);
+    resourceConfig.storageType(StorageType.CLOUD);
+    Database<XmlResourceSession> xmlDatabase = XmlTestHelper.getDatabase(PATHS.PATH1.getFile());
+    resourceConfig.awsStoreInfo(new ResourceConfiguration.AWSStorageInformation("default",
+        Region.US_EAST_1.id(), xmlDatabase.getName(), true));
+    ResourceConfiguration testResources = resourceConfig.build();
+    S3Mock api = S3Mock.create(8001, ".");
+    api.start();
+    s3Client = S3Client.builder().region(Region.of(testResources.awsStoreInfo.getAwsRegion()))
+        .credentialsProvider(AnonymousCredentialsProvider.create())
+        .dualstackEnabled(true)
+        .endpointOverride(URI.create("http://127.0.0.1:8001"))
+        .build();
+    testResources.resourcePath = PATHS.PATH1.getFile();
+
+    awsStorage = (AmazonS3Storage)StorageType.CLOUD.getInstance(testResources);
+    awsStorage.setS3Client(s3Client);
+    cloudWriter = (AmazonS3StorageWriter)awsStorage.createWriter();
+    cloudReader = (AmazonS3StorageReader)awsStorage.createReader();
+  }
+
+  @Test
+  public void testS3StorageWriterNotNull() {
+    assertNotNull(cloudWriter);
+  }
+
+  @Test
+  public void testS3StorageReaderNotNull() {
+    assertNotNull(cloudReader);
+  }
+
+  @Test
+  public void testCreateBucket() {
+    awsStorage.createBucket();
+    assertTrue(awsStorage.isBucketExists());
+  }
+
+
+  @After
+  public void tearDown() {
+    XmlTestHelper.deleteEverything();
+    s3Client.close();
+  }
+
+
+}
\ No newline at end of file
diff --git a/libraries.gradle b/libraries.gradle
index f7b8d66ec..9cad91620 100644
--- a/libraries.gradle
+++ b/libraries.gradle
@@ -67,5 +67,6 @@ testLibraries = [
     kotestAssertions         : 'io.kotest:kotest-assertions-core-jvm:4.0.5',
     commonsCollections4      : 'org.apache.commons:commons-collections4:4.3',
     commonsCollections4Tests : 'org.apache.commons:commons-collections4:4.3:tests',
-    assertjCore              : 'org.assertj:assertj-core:3.23.1'
+    assertjCore              : 'org.assertj:assertj-core:3.23.1',
+    s3Mock                   : 'io.findify:s3mock_2.13:0.2.6'
 ]
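
Usage sketch (illustrative, not part of the patches above): AWSS3StorageTest wires the new backend against a local S3Mock endpoint; against a real bucket the same configuration path should apply. The sketch below assumes only the API surface visible in this series (ResourceConfiguration.Builder, storageType, awsStoreInfo, AWSStorageInformation, StorageType.CLOUD.getInstance, createWriter, createReader). The class name, resource name, profile, region, bucket name and resource path are placeholders, assigning resourcePath directly mirrors the test setup, and since no mock client is injected via setS3Client, working credentials for the named AWS profile would presumably be required.

    import java.nio.file.Paths;

    import org.sirix.access.ResourceConfiguration;
    import org.sirix.io.StorageType;

    import software.amazon.awssdk.regions.Region;

    public class S3CloudStorageUsageSketch {

      public static void main(String[] args) {
        // Select the cloud (S3) storage backend and describe the target bucket.
        // Profile, region, bucket and resource names are placeholders.
        final ResourceConfiguration.Builder builder = new ResourceConfiguration.Builder("my-resource");
        builder.storageType(StorageType.CLOUD);
        builder.awsStoreInfo(new ResourceConfiguration.AWSStorageInformation(
            "default",               // profile name in the local AWS credentials file (placeholder)
            Region.US_EAST_1.id(),   // AWS region id, e.g. "us-east-1"
            "my-sirix-bucket",       // target S3 bucket name (placeholder)
            true));                  // create the bucket if it does not already exist
        final ResourceConfiguration config = builder.build();

        // Assigned directly here, as in the test above; placeholder local path for the resource.
        config.resourcePath = Paths.get("local", "sirix-data", "my-resource");

        // Resolve the S3-backed storage and obtain a writer and reader for the resource.
        final var storage = StorageType.CLOUD.getInstance(config);
        final var writer = storage.createWriter();
        final var reader = storage.createReader();
        // writer and reader can now be used like any other sirix storage writer/reader.
      }
    }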