From 663de0ad84e53d8fce675b5c202e529d47a49d4a Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Wed, 30 Aug 2023 18:45:44 +0800 Subject: [PATCH] chore: dlq-fixes (#18) * chore: dlq-fixes * chore: version upgrade * chore: fix tests --- build.gradle | 4 ++-- .../firehose/config/GCSConfig.java | 3 +++ .../gotocompany/firehose/config/S3Config.java | 3 +++ .../blobstorage/gcs/GoogleCloudStorage.java | 21 +++++++++++-------- .../sink/common/blobstorage/s3/S3.java | 12 +++++++++-- .../dlq/blobstorage/BlobStorageDlqWriter.java | 3 ++- .../sink/dlq/blobstorage/DlqMessage.java | 2 ++ .../common/gcs/GoogleCloudStorageTest.java | 17 ++++++++++++++- .../blobstorage/BlobStorageDlqWriterTest.java | 16 +++++++------- 9 files changed, 58 insertions(+), 23 deletions(-) diff --git a/build.gradle b/build.gradle index 08da203b9..0b6a05a4c 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.8.12' +version '0.8.13' def projName = "firehose" @@ -101,7 +101,7 @@ dependencies { implementation platform('com.google.cloud:libraries-bom:20.5.0') implementation 'com.google.cloud:google-cloud-storage:2.20.1' implementation 'org.apache.logging.log4j:log4j-core:2.20.0' - implementation group: 'com.gotocompany', name: 'depot', version: '0.4.9' + implementation group: 'com.gotocompany', name: 'depot', version: '0.6.0' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' testImplementation group: 'junit', name: 'junit', version: '4.11' diff --git a/src/main/java/com/gotocompany/firehose/config/GCSConfig.java b/src/main/java/com/gotocompany/firehose/config/GCSConfig.java index e194a4cc4..2d178596e 100644 --- a/src/main/java/com/gotocompany/firehose/config/GCSConfig.java +++ b/src/main/java/com/gotocompany/firehose/config/GCSConfig.java @@ -13,6 +13,9 @@ public interface GCSConfig extends Config { @Key("${GCS_TYPE}_GCS_BUCKET_NAME") String getGCSBucketName(); + @Key("${GCS_TYPE}_GCS_DIRECTORY_PREFIX") + String getGCSDirectoryPrefix(); + @Key("${GCS_TYPE}_GCS_CREDENTIAL_PATH") String getGCSCredentialPath(); diff --git a/src/main/java/com/gotocompany/firehose/config/S3Config.java b/src/main/java/com/gotocompany/firehose/config/S3Config.java index e0b487e54..2f6c3335f 100644 --- a/src/main/java/com/gotocompany/firehose/config/S3Config.java +++ b/src/main/java/com/gotocompany/firehose/config/S3Config.java @@ -9,6 +9,9 @@ public interface S3Config extends Config { @Key("${S3_TYPE}_S3_BUCKET_NAME") String getS3BucketName(); + @Key("${S3_TYPE}_S3_DIRECTORY_PREFIX") + String getS3DirectoryPrefix(); + @Key("${S3_TYPE}_S3_ACCESS_KEY") String getS3AccessKey(); diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/gcs/GoogleCloudStorage.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/gcs/GoogleCloudStorage.java index 191be9143..2facf6b4c 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/gcs/GoogleCloudStorage.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/gcs/GoogleCloudStorage.java @@ -17,11 +17,9 @@ import org.threeten.bp.Duration; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Date; public class GoogleCloudStorage implements BlobStorage { private static final Logger LOGGER = LoggerFactory.getLogger(GoogleCloudStorage.class); @@ -29,7 +27,7 @@ public class GoogleCloudStorage implements BlobStorage { private final Storage storage; public GoogleCloudStorage(GCSConfig gcsConfig) throws IOException { - this(gcsConfig, GoogleCredentials.fromStream(new FileInputStream(gcsConfig.getGCSCredentialPath()))); + this(gcsConfig, GoogleCredentials.fromStream(Files.newInputStream(Paths.get(gcsConfig.getGCSCredentialPath())))); checkBucket(); logRetentionPolicy(); } @@ -73,29 +71,34 @@ private void logRetentionPolicy() { Storage.BucketGetOption.fields(Storage.BucketField.RETENTION_POLICY), Storage.BucketGetOption.userProject(gcsConfig.getGCloudProjectID())); LOGGER.info("Retention Policy for {}", bucketName); - LOGGER.info("Retention Period: {}", bucket.getRetentionPeriod()); + LOGGER.info("Retention Period: {}", bucket.getRetentionPeriodDuration()); if (bucket.retentionPolicyIsLocked() != null && bucket.retentionPolicyIsLocked()) { LOGGER.info("Retention Policy is locked"); } - if (bucket.getRetentionEffectiveTime() != null) { - LOGGER.info("Effective Time: {}", new Date(bucket.getRetentionEffectiveTime())); - } } @Override public void store(String objectName, String filePath) throws BlobStorageException { + String finalPath = createPath(objectName); try { byte[] content = Files.readAllBytes(Paths.get(filePath)); - store(objectName, content); + store(finalPath, content); } catch (IOException e) { LOGGER.error("Failed to read local file {}", filePath); throw new BlobStorageException("file_io_error", "File Read failed", e); } } + private String createPath(String objectName) { + String prefix = gcsConfig.getGCSDirectoryPrefix(); + return prefix == null || prefix.isEmpty() + ? objectName : Paths.get(prefix, objectName).toString(); + } + @Override public void store(String objectName, byte[] content) throws BlobStorageException { - BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(gcsConfig.getGCSBucketName(), objectName)).build(); + String finalPath = createPath(objectName); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(gcsConfig.getGCSBucketName(), finalPath)).build(); String blobPath = String.join(File.separator, blobInfo.getBucket(), blobInfo.getName()); try { storage.create(blobInfo, content, Storage.BlobTargetOption.userProject(gcsConfig.getGCloudProjectID())); diff --git a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/s3/S3.java b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/s3/S3.java index c22c24c6a..8638b1e79 100644 --- a/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/s3/S3.java +++ b/src/main/java/com/gotocompany/firehose/sink/common/blobstorage/s3/S3.java @@ -72,9 +72,10 @@ private void checkBucket() { @Override public void store(String objectName, String filePath) throws BlobStorageException { + String finalPath = createPath(objectName); try { byte[] content = Files.readAllBytes(Paths.get(filePath)); - store(objectName, content); + store(finalPath, content); } catch (IOException e) { LOGGER.error("Failed to read local file {}", filePath); throw new BlobStorageException("file_io_error", "File Read failed", e); @@ -83,10 +84,11 @@ public void store(String objectName, String filePath) throws BlobStorageExceptio @Override public void store(String objectName, byte[] content) throws BlobStorageException { + String finalPath = createPath(objectName); try { PutObjectRequest putObject = PutObjectRequest.builder() .bucket(s3Config.getS3BucketName()) - .key(objectName) + .key(finalPath) .build(); s3Client.putObject(putObject, RequestBody.fromBytes(content)); LOGGER.info("Created object in S3 {}", objectName); @@ -95,4 +97,10 @@ public void store(String objectName, byte[] content) throws BlobStorageException throw new BlobStorageException(ase.getMessage(), ase.getMessage(), ase); } } + + private String createPath(String objectName) { + String prefix = s3Config.getS3DirectoryPrefix(); + return prefix == null || prefix.isEmpty() + ? objectName : Paths.get(prefix, objectName).toString(); + } } diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java index e7aa2dbb7..3fde54710 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java @@ -60,7 +60,8 @@ private String convertToString(Message message) { message.getPartition(), message.getOffset(), message.getTimestamp(), - message.getErrorInfo().toString())); + message.getErrorInfo().toString(), + message.getErrorInfo().getErrorType().name())); } catch (JsonProcessingException e) { log.warn("Not able to convert message into json", e); return ""; diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqMessage.java b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqMessage.java index 256f00fcb..67bb59781 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqMessage.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/DlqMessage.java @@ -21,4 +21,6 @@ public class DlqMessage { private long timestamp; @JsonProperty("error") private String error; + @JsonProperty("error_type") + private String errorType; } diff --git a/src/test/java/com/gotocompany/firehose/sink/common/gcs/GoogleCloudStorageTest.java b/src/test/java/com/gotocompany/firehose/sink/common/gcs/GoogleCloudStorageTest.java index 86945294c..8041d5e4e 100644 --- a/src/test/java/com/gotocompany/firehose/sink/common/gcs/GoogleCloudStorageTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/common/gcs/GoogleCloudStorageTest.java @@ -18,7 +18,22 @@ public class GoogleCloudStorageTest { @Test - public void shouldCallStorage() throws BlobStorageException { + public void shouldCallStorageWithPrefix() throws BlobStorageException { + GCSConfig config = ConfigFactory.create(GCSConfig.class, new HashMap() {{ + put("GCS_TYPE", "SOME_TYPE"); + put("SOME_TYPE_GCS_BUCKET_NAME", "TestBucket"); + put("SOME_TYPE_GCS_GOOGLE_CLOUD_PROJECT_ID", "projectID"); + put("SOME_TYPE_GCS_DIRECTORY_PREFIX", "some-name"); + }}); + Storage storage = Mockito.mock(Storage.class); + GoogleCloudStorage gcs = new GoogleCloudStorage(config, storage); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("TestBucket", "some-name/test")).build(); + gcs.store("test", new byte[]{}); + Mockito.verify(storage, Mockito.times(1)).create(blobInfo, new byte[]{}, Storage.BlobTargetOption.userProject("projectID")); + } + + @Test + public void shouldCallStorageWithoutPrefix() throws BlobStorageException { GCSConfig config = ConfigFactory.create(GCSConfig.class, new HashMap() {{ put("GCS_TYPE", "SOME_TYPE"); put("SOME_TYPE_GCS_BUCKET_NAME", "TestBucket"); diff --git a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java index c55d6eed2..d2942b30b 100644 --- a/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java +++ b/src/test/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriterTest.java @@ -48,11 +48,11 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); verify(blobStorage).store(contains("booking/2020-01-02"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes())); + eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}").getBytes())); verify(blobStorage).store(contains("booking/2020-01-01"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes())); + eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}").getBytes())); } @Test @@ -69,11 +69,11 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size()); verify(blobStorage).store(contains("booking/2020-01-02"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes())); + eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\",\"error_type\":\"SINK_UNKNOWN_ERROR\"}").getBytes())); verify(blobStorage).store(contains("booking/2020-01-01"), - eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n" - + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes())); + eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n" + + "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\",\"error_type\":\"SINK_UNKNOWN_ERROR\"}").getBytes())); } @Test