From 084324cec29d969d23284fa7c440db15848e103d Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 3 Sep 2024 12:02:17 +0530 Subject: [PATCH] adding an extension to archive workflows in S3 bucket --- workflow-event-listener/build.gradle | 1 + ...rchivingWorkflowListenerConfiguration.java | 3 + .../ArchivingWorkflowListenerProperties.java | 55 ++++++ .../archive/ArchivingWorkflowToS3.java | 181 ++++++++++++++++++ 4 files changed, 240 insertions(+) create mode 100644 workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowToS3.java diff --git a/workflow-event-listener/build.gradle b/workflow-event-listener/build.gradle index 3445f4795..75bf9541e 100644 --- a/workflow-event-listener/build.gradle +++ b/workflow-event-listener/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation group: 'javax.inject', name: 'javax.inject', version: '1' implementation "org.apache.commons:commons-lang3:" implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14' + implementation "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}" compileOnly 'org.springframework.boot:spring-boot-starter' compileOnly 'org.springframework.boot:spring-boot-starter-web' diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java index d468ba960..60979d6c8 100644 --- a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerConfiguration.java @@ -30,6 +30,9 @@ public WorkflowStatusListener getWorkflowStatusListener( ExecutionDAOFacade executionDAOFacade, ArchivingWorkflowListenerProperties properties) { if (properties.getTtlDuration().getSeconds() > 0) { return new ArchivingWithTTLWorkflowStatusListener(executionDAOFacade, properties); + } else if (properties.getWorkflowArchivalType() + == ArchivingWorkflowListenerProperties.ArchivalType.S3) { + return new ArchivingWorkflowToS3(executionDAOFacade, properties); } else { return new ArchivingWorkflowStatusListener(executionDAOFacade); } diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java index 7e2fad5cb..04815e017 100644 --- a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowListenerProperties.java @@ -30,6 +30,29 @@ public ArchivingWorkflowListenerProperties(Environment environment) { this.environment = environment; } + /** Type of archival */ + public enum ArchivalType { + DEFAULT, + S3 + } + + /** Archival type that we need in place. By Default the value is default */ + private ArchivalType workflowArchivalType = ArchivalType.DEFAULT; + + /** name of the S3 bucket where we want to archive the workflow */ + private String workflowS3ArchivalDefaultBucketName = ""; + + /** region of the S3 bucket where we want to archive the workflow */ + private String workflowS3ArchivalBucketRegion = "us-east-1"; + + /** + * Set this variable to true if you want to archive only the workflows that didn't succeed. When + * true, only unsuccessful workflows will be archived, while both successful and unsuccessful + * workflows will be deleted from the datastore. This helps manage storage costs on S3 and keeps + * only the failed workflows for debugging. + */ + private Boolean workflowArchiveUnsuccessfulOnly = false; + /** * The time to live in seconds for workflow archiving module. Currently, only RedisExecutionDAO * supports this @@ -56,6 +79,38 @@ public void setDelayQueueWorkerThreadCount(int delayQueueWorkerThreadCount) { this.delayQueueWorkerThreadCount = delayQueueWorkerThreadCount; } + public ArchivalType getWorkflowArchivalType() { + return workflowArchivalType; + } + + public void setWorkflowArchivalType(ArchivalType workflowArchivalType) { + this.workflowArchivalType = workflowArchivalType; + } + + public String getWorkflowS3ArchivalDefaultBucketName() { + return workflowS3ArchivalDefaultBucketName; + } + + public void setWorkflowS3ArchivalDefaultBucketName(String workflowS3ArchivalDefaultBucketName) { + this.workflowS3ArchivalDefaultBucketName = workflowS3ArchivalDefaultBucketName; + } + + public String getWorkflowS3ArchivalBucketRegion() { + return workflowS3ArchivalBucketRegion; + } + + public void setWorkflowS3ArchivalBucketRegion(String workflowS3ArchivalBucketRegion) { + this.workflowS3ArchivalBucketRegion = workflowS3ArchivalBucketRegion; + } + + public Boolean getWorkflowArchiveUnsuccessfulOnly() { + return workflowArchiveUnsuccessfulOnly; + } + + public void setWorkflowArchiveUnsuccessfulOnly(Boolean workflowArchiveUnsuccessfulOnly) { + this.workflowArchiveUnsuccessfulOnly = workflowArchiveUnsuccessfulOnly; + } + /** The time to delay the archival of workflow */ public int getWorkflowArchivalDelay() { return environment.getProperty( diff --git a/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowToS3.java b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowToS3.java new file mode 100644 index 000000000..bccd1688f --- /dev/null +++ b/workflow-event-listener/src/main/java/com/netflix/conductor/contribs/listener/archive/ArchivingWorkflowToS3.java @@ -0,0 +1,181 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.contribs.listener.archive; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.dal.ExecutionDAOFacade; +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.model.WorkflowModel; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.*; + +public class ArchivingWorkflowToS3 implements WorkflowStatusListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(ArchivingWorkflowToS3.class); + private final ExecutionDAOFacade executionDAOFacade; + + private final ArchivingWorkflowListenerProperties properties; + private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private final AmazonS3 s3Client; + + private final String bucketName; + private final String bucketRegion; + private final ObjectMapper objectMapper; + private final int delayArchiveSeconds; + + public ArchivingWorkflowToS3( + ExecutionDAOFacade executionDAOFacade, ArchivingWorkflowListenerProperties properties) { + this.executionDAOFacade = executionDAOFacade; + this.properties = properties; + bucketName = properties.getWorkflowS3ArchivalDefaultBucketName(); + bucketRegion = properties.getWorkflowS3ArchivalBucketRegion(); + s3Client = AmazonS3ClientBuilder.standard().withRegion(bucketRegion).build(); + this.delayArchiveSeconds = properties.getWorkflowArchivalDelay(); + objectMapper = new ObjectMapper(); + this.scheduledThreadPoolExecutor = + new ScheduledThreadPoolExecutor( + properties.getDelayQueueWorkerThreadCount(), + (runnable, executor) -> { + LOGGER.warn( + "Request {} to delay S3 archiving index dropped in executor {}", + runnable, + executor); + Monitors.recordDiscardedArchivalCount(); + }); + this.scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); + LOGGER.warn( + "Workflow removal archiving in S3 with TTL is no longer supported, " + + "when using this class, workflows will be removed immediately"); + } + + @PreDestroy + public void shutdownExecutorService() { + try { + LOGGER.info("Gracefully shutdown executor service in S3 Archival Listener"); + scheduledThreadPoolExecutor.shutdown(); + if (scheduledThreadPoolExecutor.awaitTermination( + delayArchiveSeconds, TimeUnit.SECONDS)) { + LOGGER.debug("tasks completed, shutting down"); + } else { + LOGGER.warn("Forcing shutdown after waiting for {} seconds", delayArchiveSeconds); + scheduledThreadPoolExecutor.shutdownNow(); + } + } catch (InterruptedException ie) { + LOGGER.warn( + "Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue S3 Archival Listener"); + scheduledThreadPoolExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + archiveWorkflow(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + archiveWorkflow(workflow); + } + + private void archiveWorkflow(final WorkflowModel workflow) { + // Only archive unsuccessful workflows if enabled + if (!properties.getWorkflowArchiveUnsuccessfulOnly() + || !workflow.getStatus().isSuccessful()) { + final String fileName = workflow.getWorkflowId() + ".json"; + final String filePathPrefix = workflow.getWorkflowName(); + final String fullFilePath = filePathPrefix + '/' + fileName; + + try { + // Upload workflow as a json file to s3 + s3Client.putObject( + bucketName, fullFilePath, objectMapper.writeValueAsString(workflow)); + LOGGER.debug( + "Archived workflow. Workflow Name :{} Workflow Id :{} Workflow Status :{} to S3 bucket:{}", + workflow.getWorkflowName(), + workflow.getWorkflowId(), + workflow.getStatus(), + bucketName); + } catch (final Exception e) { + LOGGER.error( + "Exception occurred when archiving workflow to S3. Workflow Name : {} Workflow Id : {} Workflow Status : {} :", + workflow.getWorkflowName(), + workflow.getWorkflowId(), + workflow.getStatus(), + e); + throw new RuntimeException(e); + } + } + if (delayArchiveSeconds > 0) { + scheduledThreadPoolExecutor.schedule( + new DelayS3ArchiveWorkflow(workflow, executionDAOFacade), + delayArchiveSeconds, + TimeUnit.SECONDS); + } else { + LOGGER.info( + "Archived workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}", + workflow.getWorkflowName(), + workflow.getWorkflowId(), + workflow.getStatus()); + this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true); + Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus()); + } + } + + private class DelayS3ArchiveWorkflow implements Runnable { + + private final String workflowId; + private final String workflowName; + private final WorkflowModel.Status status; + private final ExecutionDAOFacade executionDAOFacade; + + DelayS3ArchiveWorkflow(WorkflowModel workflow, ExecutionDAOFacade executionDAOFacade) { + this.workflowId = workflow.getWorkflowId(); + this.workflowName = workflow.getWorkflowName(); + this.status = workflow.getStatus(); + this.executionDAOFacade = executionDAOFacade; + } + + @Override + public void run() { + try { + this.executionDAOFacade.removeWorkflow(workflowId, true); + LOGGER.info( + "Archived workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}", + workflowName, + workflowId, + status); + Monitors.recordWorkflowArchived(workflowName, status); + Monitors.recordArchivalDelayQueueSize( + scheduledThreadPoolExecutor.getQueue().size()); + } catch (Exception e) { + LOGGER.error( + "Unable to archive workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}", + workflowName, + workflowId, + status, + e); + } + } + } +}