Skip to content

Commit

Permalink
Feat: add certificate and key changes event (#590)
Browse files Browse the repository at this point in the history
feat: add event bridge handling and multi-thread S3 upload
  • Loading branch information
GiuMontesano authored Jan 10, 2025
1 parent 0005c63 commit 095615e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 45 deletions.
3 changes: 3 additions & 0 deletions src/infra/modules/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@

| Name | Type |
|------|------|
| [aws_cloudwatch_event_rule.cert_key_changes](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule) | resource |
| [aws_cloudwatch_event_target.metadata_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target) | resource |
| [aws_cloudwatch_log_group.ecs_core](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_cloudwatch_metric_alarm.dlq_assertions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_metric_alarm) | resource |
| [aws_cloudwatch_metric_alarm.dlq_sessions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_metric_alarm) | resource |
Expand All @@ -176,6 +178,7 @@
| [aws_iam_role_policy_attachment.deploy_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_iam_role_policy_attachment.switch_region](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
| [aws_lambda_event_source_mapping.trigger](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_event_source_mapping) | resource |
| [aws_lambda_permission.cert_key_changes](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
| [aws_pipes_pipe.sessions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/pipes_pipe) | resource |
| [aws_s3_bucket_notification.bucket_notification](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_notification) | resource |
| [aws_security_group_rule.metadata_vpc_tls](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/security_group_rule) | resource |
Expand Down
39 changes: 39 additions & 0 deletions src/infra/modules/backend/lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,45 @@ resource "aws_lambda_event_source_mapping" "trigger" {
enabled = true
}

resource "aws_cloudwatch_event_rule" "cert_key_changes" {
name = "capture-cert-key-change"
description = "Capture each cert.pem and key.pem changes"

event_pattern = jsonencode(
{
"source" : [
"aws.ssm"
],
"detail-type" : [
"Parameter Store Change"
],
"detail" : {
"name" : [
"cert.pem",
"key.pem"
],
"operation" : [
"Create",
"Update",
"Delete",
"LabelParameterVersion"
]
}
})
}

resource "aws_cloudwatch_event_target" "metadata_lambda" {
rule = aws_cloudwatch_event_rule.cert_key_changes.name
arn = module.metadata_lambda.lambda_function_arn
}

resource "aws_lambda_permission" "cert_key_changes" {
action = "lambda:InvokeFunction"
function_name = module.metadata_lambda.lambda_function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.cert_key_changes.arn
}

## Lambda idp_metadata

data "aws_iam_policy_document" "idp_metadata_lambda" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.quarkus.logging.Log;
import it.pagopa.oneid.common.model.Client;
import it.pagopa.oneid.common.model.exception.OneIdentityException;
Expand All @@ -13,8 +16,14 @@
import jakarta.inject.Inject;
import jakarta.ws.rs.core.MediaType;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
Expand All @@ -37,7 +46,7 @@
import software.amazon.awssdk.services.s3.model.S3Exception;

@CustomLogging
public class ServiceMetadata implements RequestHandler<DynamodbEvent, String> {
public class ServiceMetadata implements RequestHandler<Object, String> {

@Inject
Map<String, Client> clientsMap;
Expand Down Expand Up @@ -67,67 +76,96 @@ public static String getStringValue(Element element) throws SAMLUtilsException {
return result.getWriter().toString();
}

private boolean hasMetadataChanged(DynamodbStreamRecord record) {

String acsIndexOld = record.getDynamodb().getOldImage().get("acsIndex").getN();
String acsIndexNew = record.getDynamodb().getNewImage().get("acsIndex").getN();
@Override
public String handleRequest(Object event, Context context) {

try {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
String json = ow.writeValueAsString(event);
JsonMapper mapper = JsonMapper.builder().build();
JsonNode eventNode = mapper.readTree(json);

// DynamodbEvent
JsonNode records = eventNode.get("Records");
if (records != null) {
records.forEach(record -> {
if (record.get("eventName").asText().equals("MODIFY") && !hasMetadataChanged(record)) {
Log.info("SPID and CIE metadata didn't change");
} else {
processMetadataAndUpload();
Log.info("SPID and CIE metadata uploaded successfully");
}
});
return "SPID and CIE metadata uploaded successfully";
}

//ScheduledEvent
processMetadataAndUpload();
Log.info("SPID and CIE metadata uploaded successfully");


} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}

return "SPID and CIE metadata uploaded successfully";
}


private boolean hasMetadataChanged(JsonNode nodeRecord) {

String acsIndexOld = nodeRecord.get("dynamodb").get("OldImage").get("acsIndex").get("N")
.asText();
String acsIndexNew = nodeRecord.get("dynamodb").get("NewImage").get("acsIndex").get("N")
.asText();
if (!acsIndexNew.equals(acsIndexOld)) {
return true;
}
String friendlyNameOld = record.getDynamodb().getOldImage().get("friendlyName").getS();
String friendlyNameNew = record.getDynamodb().getNewImage().get("friendlyName").getS();
String friendlyNameOld = nodeRecord.get("dynamodb").get("OldImage").get("friendlyName").get("S")
.asText();
String friendlyNameNew = nodeRecord.get("dynamodb").get("NewImage").get("friendlyName").get("S")
.asText();
if (!friendlyNameNew.equals(friendlyNameOld)) {
return true;
}

List<String> requestedParametersOld = record.getDynamodb().getOldImage()
.get("requestedParameters").getSS();
List<String> requestedParametersNew = record.getDynamodb().getNewImage()
.get("requestedParameters").getSS();
List<String> requestedParametersOld = new ArrayList<>();
nodeRecord.get("dynamodb").get("OldImage")
.get("requestedParameters").get("SS").forEach(parameterOld -> {
requestedParametersOld.add(parameterOld.asText());
});
List<String> requestedParametersNew = new ArrayList<>();
nodeRecord.get("dynamodb").get("NewImage")
.get("requestedParameters").get("SS").forEach(parameterNew -> {
requestedParametersNew.add(parameterNew.asText());
});
if (!requestedParametersNew.equals(requestedParametersOld)) {
return true;
}
String authLevelOld = record.getDynamodb().getOldImage().get("authLevel").getS();
String authLevelNew = record.getDynamodb().getNewImage().get("authLevel").getS();
String authLevelOld = nodeRecord.get("dynamodb").get("OldImage").get("authLevel").get("S")
.asText();
String authLevelNew = nodeRecord.get("dynamodb").get("NewImage").get("authLevel").get("S")
.asText();
if (!authLevelNew.equals(authLevelOld)) {
return true;
}
String attributeIndexOld = record.getDynamodb().getOldImage().get("attributeIndex")
.getN();
String attributeIndexNew = record.getDynamodb().getNewImage().get("attributeIndex")
.getN();
String attributeIndexOld = nodeRecord.get("dynamodb").get("OldImage").get("attributeIndex")
.get("N")
.asText();
String attributeIndexNew = nodeRecord.get("dynamodb").get("NewImage").get("attributeIndex")
.get("N")
.asText();
if (!attributeIndexNew.equals(attributeIndexOld)) {
return true;
}
boolean isActiveOld = record.getDynamodb().getOldImage().get("active").getBOOL();
boolean isActiveNew = record.getDynamodb().getNewImage().get("active").getBOOL();
boolean isActiveOld = nodeRecord.get("dynamodb").get("OldImage").get("active").get("BOOL")
.asBoolean();
boolean isActiveNew = nodeRecord.get("dynamodb").get("NewImage").get("active").get("BOOL")
.asBoolean();
return isActiveNew != isActiveOld;
}

@Override
public String handleRequest(DynamodbEvent event, Context context) {
for (DynamodbStreamRecord record : event.getRecords()) {
try {

if (record.getEventName().equals("MODIFY") && !hasMetadataChanged(record)) {
return "SPID and CIE metadata didn't change";
}

//TODO: consider using a thread pool
String spidMetadata = generateMetadata(IdType.spid);
String cieMetadata = generateMetadata(IdType.cie);

uploadToS3("spid.xml", spidMetadata);
uploadToS3("cie.xml", cieMetadata);
} catch (Exception e) {
Log.error("Error processing DynamoDB Event: " + e.getMessage());
throw new RuntimeException(e);
}
}

return "SPID and CIE metadata uploaded successfully";
}

private void uploadToS3(String objectKey, String content) {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucketName)
Expand All @@ -144,6 +182,45 @@ private void uploadToS3(String objectKey, String content) {
}
}

private void processMetadataAndUpload() {

try (ExecutorService executorService = Executors.newFixedThreadPool(2)) {

// Generate metadata for SPID and CIE in parallel
CompletableFuture<String> spidMetadataFuture = CompletableFuture.supplyAsync(() -> {
try {
return generateMetadata(IdType.spid);
} catch (OneIdentityException e) {
throw new CompletionException(e);
}
}, executorService);
CompletableFuture<String> cieMetadataFuture = CompletableFuture.supplyAsync(() -> {
try {
return generateMetadata(IdType.cie);
} catch (OneIdentityException e) {
throw new CompletionException(e);
}
}, executorService);

// Upload spidMetadata to S3 once generated
CompletableFuture<Void> spidUpload = spidMetadataFuture.thenAcceptAsync(spidMetadata -> {
uploadToS3("spid.xml", spidMetadata);
}, executorService);

// Upload cieMetadata to S3 once generated
CompletableFuture<Void> cieUpload = cieMetadataFuture.thenAcceptAsync(cieMetadata -> {
uploadToS3("cie.xml", cieMetadata);
}, executorService);

// Wait for both uploads to finish
CompletableFuture.allOf(spidUpload, cieUpload).join();

} catch (Exception e) {
Log.error("Error processing event: " + e.getMessage());
throw new RuntimeException(e);
}
}

private String generateMetadata(IdType idType) throws OneIdentityException {

EntityDescriptor entityDescriptor = samlUtils.buildSAMLObject(EntityDescriptor.class);
Expand Down Expand Up @@ -176,7 +253,7 @@ private String generateMetadata(IdType idType) throws OneIdentityException {

Element plaintextElement;
try {
plaintextElement = out.marshall(entityDescriptor);
plaintextElement = Objects.requireNonNull(out).marshall(entityDescriptor);
} catch (MarshallingException e) {
throw new OneIdentityException(e);
}
Expand Down

0 comments on commit 095615e

Please sign in to comment.