diff --git a/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/App.java b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/App.java index 0c3fbb5..bb85ed7 100644 --- a/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/App.java +++ b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/App.java @@ -6,19 +6,27 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.levio.awsdemo.resumerequestpreprocessor.service.S3Service; import com.levio.awsdemo.resumerequestpreprocessor.service.SqsProducerService; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; + +import java.util.Map; public class App implements RequestHandler { private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JodaModule()); private final SqsProducerService sqsProducerService; + private final S3Service s3Service; + public App() { this.sqsProducerService = new SqsProducerService(); + this.s3Service = new S3Service(); } - public App(SqsProducerService sqsProducerService) { + public App(SqsProducerService sqsProducerService, S3Service s3Service) { this.sqsProducerService = sqsProducerService; + this.s3Service = s3Service; } public Void handleRequest(final S3EventNotification input, final Context context) { @@ -29,14 +37,26 @@ public Void handleRequest(final S3EventNotification input, final Context context throw new RuntimeException(e); } - input.getRecords().forEach(s3EventNotificationRecord -> - sqsProducerService.send(extractEmailId(s3EventNotificationRecord.getS3().getObject().getKey())) - ); + input.getRecords().forEach(s3EventNotificationRecord -> { + String key = s3EventNotificationRecord.getS3().getObject().getKey(); + String keyId = extractKeyIdFromKey(key); + String email = s3Service.getEmailMetadata("resume/attachment/" + keyId + ".mp3"); + Map messageAttributes = null; + if (email != null) { + messageAttributes = Map.of( + "Email", MessageAttributeValue.builder() + .dataType("String") + .stringValue(email) + .build() + ); + } + sqsProducerService.send(keyId, messageAttributes); + }); return null; } - private String extractEmailId(String key) { + private String extractKeyIdFromKey(String key) { int lastDotIndex = key.lastIndexOf('.'); int lastSlashIndex = key.lastIndexOf('/', lastDotIndex - 1); return key.substring(lastSlashIndex + 1, lastDotIndex); diff --git a/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/S3Service.java b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/S3Service.java new file mode 100644 index 0000000..f32e36e --- /dev/null +++ b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/S3Service.java @@ -0,0 +1,27 @@ +package com.levio.awsdemo.resumerequestpreprocessor.service; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +public class S3Service { + + private static final String BUCKET_NAME = System.getenv("BUCKET_NAME"); + + private final S3Client s3 = S3Client.builder() + .region(Region.US_EAST_1) + .build(); + + public String getEmailMetadata(String key) { + HeadObjectRequest objectRequest = HeadObjectRequest + .builder() + .key(key) + .bucket(BUCKET_NAME) + .build(); + HeadObjectResponse objectHead = s3.headObject(objectRequest); + + return objectHead.metadata().get("email"); + } + +} diff --git a/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/SqsProducerService.java b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/SqsProducerService.java index 5540571..fd8de88 100644 --- a/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/SqsProducerService.java +++ b/lambdas/ResumeProcessor/ResumeRequestPreProcessorFunction/src/main/java/com/levio/awsdemo/resumerequestpreprocessor/service/SqsProducerService.java @@ -2,20 +2,24 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import java.util.Map; + public class SqsProducerService { private static final String QUEUE_URL = System.getenv("QUEUE_URL"); private final SqsClient sqs = SqsClient.builder() .region(Region.US_EAST_1) .build(); - public void send(String emailId) { + public void send(String keyId, Map messageAttributes) { SendMessageRequest sendMessageRequest = SendMessageRequest.builder() .queueUrl(QUEUE_URL) - .messageBody(emailId) - .messageGroupId(emailId) - .messageDeduplicationId(emailId) + .messageBody(keyId) + .messageGroupId(keyId) + .messageDeduplicationId(keyId) + .messageAttributes(messageAttributes) .build(); sqs.sendMessage(sendMessageRequest);