Skip to content

Commit

Permalink
Merge pull request conductor-oss#252 from junaidHussain-clari/S3Archive
Browse files Browse the repository at this point in the history
Adding an extension to archive workflows in S3 bucket
  • Loading branch information
v1r3n authored Oct 5, 2024
2 parents ba18cb6 + 084324c commit 3b6a9c0
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 0 deletions.
1 change: 1 addition & 0 deletions workflow-event-listener/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation group: 'javax.inject', name: 'javax.inject', version: '1'
implementation "org.apache.commons:commons-lang3:"
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14'
implementation "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}"

compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.boot:spring-boot-starter-web'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public WorkflowStatusListener getWorkflowStatusListener(
ExecutionDAOFacade executionDAOFacade, ArchivingWorkflowListenerProperties properties) {
if (properties.getTtlDuration().getSeconds() > 0) {
return new ArchivingWithTTLWorkflowStatusListener(executionDAOFacade, properties);
} else if (properties.getWorkflowArchivalType()
== ArchivingWorkflowListenerProperties.ArchivalType.S3) {
return new ArchivingWorkflowToS3(executionDAOFacade, properties);
} else {
return new ArchivingWorkflowStatusListener(executionDAOFacade);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,29 @@ public ArchivingWorkflowListenerProperties(Environment environment) {
this.environment = environment;
}

/** Type of archival */
public enum ArchivalType {
DEFAULT,
S3
}

/** Archival type that we need in place. By Default the value is default */
private ArchivalType workflowArchivalType = ArchivalType.DEFAULT;

/** name of the S3 bucket where we want to archive the workflow */
private String workflowS3ArchivalDefaultBucketName = "";

/** region of the S3 bucket where we want to archive the workflow */
private String workflowS3ArchivalBucketRegion = "us-east-1";

/**
* Set this variable to true if you want to archive only the workflows that didn't succeed. When
* true, only unsuccessful workflows will be archived, while both successful and unsuccessful
* workflows will be deleted from the datastore. This helps manage storage costs on S3 and keeps
* only the failed workflows for debugging.
*/
private Boolean workflowArchiveUnsuccessfulOnly = false;

/**
* The time to live in seconds for workflow archiving module. Currently, only RedisExecutionDAO
* supports this
Expand All @@ -56,6 +79,38 @@ public void setDelayQueueWorkerThreadCount(int delayQueueWorkerThreadCount) {
this.delayQueueWorkerThreadCount = delayQueueWorkerThreadCount;
}

public ArchivalType getWorkflowArchivalType() {
return workflowArchivalType;
}

public void setWorkflowArchivalType(ArchivalType workflowArchivalType) {
this.workflowArchivalType = workflowArchivalType;
}

public String getWorkflowS3ArchivalDefaultBucketName() {
return workflowS3ArchivalDefaultBucketName;
}

public void setWorkflowS3ArchivalDefaultBucketName(String workflowS3ArchivalDefaultBucketName) {
this.workflowS3ArchivalDefaultBucketName = workflowS3ArchivalDefaultBucketName;
}

public String getWorkflowS3ArchivalBucketRegion() {
return workflowS3ArchivalBucketRegion;
}

public void setWorkflowS3ArchivalBucketRegion(String workflowS3ArchivalBucketRegion) {
this.workflowS3ArchivalBucketRegion = workflowS3ArchivalBucketRegion;
}

public Boolean getWorkflowArchiveUnsuccessfulOnly() {
return workflowArchiveUnsuccessfulOnly;
}

public void setWorkflowArchiveUnsuccessfulOnly(Boolean workflowArchiveUnsuccessfulOnly) {
this.workflowArchiveUnsuccessfulOnly = workflowArchiveUnsuccessfulOnly;
}

/** The time to delay the archival of workflow */
public int getWorkflowArchivalDelay() {
return environment.getProperty(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.listener.archive;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.WorkflowModel;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.*;

public class ArchivingWorkflowToS3 implements WorkflowStatusListener {

private static final Logger LOGGER = LoggerFactory.getLogger(ArchivingWorkflowToS3.class);
private final ExecutionDAOFacade executionDAOFacade;

private final ArchivingWorkflowListenerProperties properties;
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

private final AmazonS3 s3Client;

private final String bucketName;
private final String bucketRegion;
private final ObjectMapper objectMapper;
private final int delayArchiveSeconds;

public ArchivingWorkflowToS3(
ExecutionDAOFacade executionDAOFacade, ArchivingWorkflowListenerProperties properties) {
this.executionDAOFacade = executionDAOFacade;
this.properties = properties;
bucketName = properties.getWorkflowS3ArchivalDefaultBucketName();
bucketRegion = properties.getWorkflowS3ArchivalBucketRegion();
s3Client = AmazonS3ClientBuilder.standard().withRegion(bucketRegion).build();
this.delayArchiveSeconds = properties.getWorkflowArchivalDelay();
objectMapper = new ObjectMapper();
this.scheduledThreadPoolExecutor =
new ScheduledThreadPoolExecutor(
properties.getDelayQueueWorkerThreadCount(),
(runnable, executor) -> {
LOGGER.warn(
"Request {} to delay S3 archiving index dropped in executor {}",
runnable,
executor);
Monitors.recordDiscardedArchivalCount();
});
this.scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
LOGGER.warn(
"Workflow removal archiving in S3 with TTL is no longer supported, "
+ "when using this class, workflows will be removed immediately");
}

@PreDestroy
public void shutdownExecutorService() {
try {
LOGGER.info("Gracefully shutdown executor service in S3 Archival Listener");
scheduledThreadPoolExecutor.shutdown();
if (scheduledThreadPoolExecutor.awaitTermination(
delayArchiveSeconds, TimeUnit.SECONDS)) {
LOGGER.debug("tasks completed, shutting down");
} else {
LOGGER.warn("Forcing shutdown after waiting for {} seconds", delayArchiveSeconds);
scheduledThreadPoolExecutor.shutdownNow();
}
} catch (InterruptedException ie) {
LOGGER.warn(
"Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue S3 Archival Listener");
scheduledThreadPoolExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

@Override
public void onWorkflowCompleted(WorkflowModel workflow) {
archiveWorkflow(workflow);
}

@Override
public void onWorkflowTerminated(WorkflowModel workflow) {
archiveWorkflow(workflow);
}

private void archiveWorkflow(final WorkflowModel workflow) {
// Only archive unsuccessful workflows if enabled
if (!properties.getWorkflowArchiveUnsuccessfulOnly()
|| !workflow.getStatus().isSuccessful()) {
final String fileName = workflow.getWorkflowId() + ".json";
final String filePathPrefix = workflow.getWorkflowName();
final String fullFilePath = filePathPrefix + '/' + fileName;

try {
// Upload workflow as a json file to s3
s3Client.putObject(
bucketName, fullFilePath, objectMapper.writeValueAsString(workflow));
LOGGER.debug(
"Archived workflow. Workflow Name :{} Workflow Id :{} Workflow Status :{} to S3 bucket:{}",
workflow.getWorkflowName(),
workflow.getWorkflowId(),
workflow.getStatus(),
bucketName);
} catch (final Exception e) {
LOGGER.error(
"Exception occurred when archiving workflow to S3. Workflow Name : {} Workflow Id : {} Workflow Status : {} :",
workflow.getWorkflowName(),
workflow.getWorkflowId(),
workflow.getStatus(),
e);
throw new RuntimeException(e);
}
}
if (delayArchiveSeconds > 0) {
scheduledThreadPoolExecutor.schedule(
new DelayS3ArchiveWorkflow(workflow, executionDAOFacade),
delayArchiveSeconds,
TimeUnit.SECONDS);
} else {
LOGGER.info(
"Archived workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}",
workflow.getWorkflowName(),
workflow.getWorkflowId(),
workflow.getStatus());
this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true);
Monitors.recordWorkflowArchived(workflow.getWorkflowName(), workflow.getStatus());
}
}

private class DelayS3ArchiveWorkflow implements Runnable {

private final String workflowId;
private final String workflowName;
private final WorkflowModel.Status status;
private final ExecutionDAOFacade executionDAOFacade;

DelayS3ArchiveWorkflow(WorkflowModel workflow, ExecutionDAOFacade executionDAOFacade) {
this.workflowId = workflow.getWorkflowId();
this.workflowName = workflow.getWorkflowName();
this.status = workflow.getStatus();
this.executionDAOFacade = executionDAOFacade;
}

@Override
public void run() {
try {
this.executionDAOFacade.removeWorkflow(workflowId, true);
LOGGER.info(
"Archived workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}",
workflowName,
workflowId,
status);
Monitors.recordWorkflowArchived(workflowName, status);
Monitors.recordArchivalDelayQueueSize(
scheduledThreadPoolExecutor.getQueue().size());
} catch (Exception e) {
LOGGER.error(
"Unable to archive workflow. Workflow Name : {} Workflow Id : {} Workflow Status : {}",
workflowName,
workflowId,
status,
e);
}
}
}
}

0 comments on commit 3b6a9c0

Please sign in to comment.