Skip to content

Commit

Permalink
Implement removeWorkflow for postgres index
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpirt authored and matiasbur committed May 9, 2024
1 parent 4936cf9 commit 62fab6e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class PostgresProperties {

public boolean allowJsonQueries = true;

/** The maximum number of threads allowed in the async pool */
private int asyncMaxPoolSize = 12;

/** The size of the queue used for holding async indexing tasks */
private int asyncWorkerQueueSize = 100;

public Duration getTaskDefCacheRefreshInterval() {
return taskDefCacheRefreshInterval;
}
Expand Down Expand Up @@ -72,4 +78,20 @@ public boolean getAllowJsonQueries() {
public void setAllowJsonQueries(boolean allowJsonQueries) {
this.allowJsonQueries = allowJsonQueries;
}

public int getAsyncWorkerQueueSize() {
return asyncWorkerQueueSize;
}

public void setAsyncWorkerQueueSize(int asyncWorkerQueueSize) {
this.asyncWorkerQueueSize = asyncWorkerQueueSize;
}

public int getAsyncMaxPoolSize() {
return asyncMaxPoolSize;
}

public void setAsyncMaxPoolSize(int asyncMaxPoolSize) {
this.asyncMaxPoolSize = asyncMaxPoolSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

Expand All @@ -30,6 +34,7 @@
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.postgres.config.PostgresProperties;
import com.netflix.conductor.postgres.util.PostgresIndexQueryBuilder;

Expand All @@ -38,6 +43,10 @@
public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {

private final PostgresProperties properties;
private final ExecutorService executorService;

private static final int CORE_POOL_SIZE = 6;
private static final long KEEP_ALIVE_TIME = 1L;

public PostgresIndexDAO(
RetryTemplate retryTemplate,
Expand All @@ -46,6 +55,25 @@ public PostgresIndexDAO(
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);
this.properties = properties;

int maximumPoolSize = properties.getAsyncMaxPoolSize();
int workerQueueSize = properties.getAsyncWorkerQueueSize();

// Set up a workerpool for performing async operations.
this.executorService =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
maximumPoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn(
"Request {} to async dao discarded in executor {}",
runnable,
executor);
Monitors.recordDiscardedIndexingCount("indexQueue");
});
}

@Override
Expand Down Expand Up @@ -208,13 +236,14 @@ public SearchResult<String> searchTasks(

@Override
public void removeWorkflow(String workflowId) {
logger.info("removeWorkflow is not supported for postgres indexing");
String REMOVE_WORKFLOW_SQL = "DELETE FROM workflow_index WHERE workflow_id = ?";

queryWithTransaction(REMOVE_WORKFLOW_SQL, q -> q.addParameter(workflowId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
logger.info("asyncRemoveWorkflow is not supported for postgres indexing");
return CompletableFuture.completedFuture(null);
return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService);
}

@Override
Expand All @@ -231,13 +260,17 @@ public CompletableFuture<Void> asyncUpdateWorkflow(

@Override
public void removeTask(String workflowId, String taskId) {
logger.info("removeTask is not supported for postgres indexing");
String REMOVE_TASK_SQL =
"WITH task_delete AS (DELETE FROM task_index WHERE task_id = ?)"
+ "DELETE FROM task_execution_logs WHERE task_id =?";

queryWithTransaction(
REMOVE_TASK_SQL, q -> q.addParameter(taskId).addParameter(taskId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
logger.info("asyncRemoveTask is not supported for postgres indexing");
return CompletableFuture.completedFuture(null);
return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,4 +403,51 @@ public void testGetTaskExecutionLogs() throws SQLException {
assertEquals(logs.get(1).getLog(), records.get(1).getLog());
assertEquals(logs.get(1).getCreatedTime(), 1675845987000L);
}

@Test
public void testRemoveWorkflow() throws SQLException {
String workflowId = UUID.randomUUID().toString();
WorkflowSummary wfs = getMockWorkflowSummary(workflowId);
indexDAO.indexWorkflow(wfs);

List<Map<String, Object>> workflow_records =
queryDb("SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'");
assertEquals("Workflow index record was not created", 1, workflow_records.size());

indexDAO.removeWorkflow(workflowId);

workflow_records =
queryDb("SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'");
assertEquals("Workflow index record was not deleted", 0, workflow_records.size());
}

@Test
public void testRemoveTask() throws SQLException {
String workflowId = UUID.randomUUID().toString();

String taskId = UUID.randomUUID().toString();
TaskSummary ts = getMockTaskSummary(taskId);
indexDAO.indexTask(ts);

List<TaskExecLog> logs = new ArrayList<>();
logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1"));
logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2"));
indexDAO.addTaskExecutionLogs(logs);

List<Map<String, Object>> task_records =
queryDb("SELECT * FROM task_index WHERE task_id = '" + taskId + "'");
assertEquals("Task index record was not created", 1, task_records.size());

List<Map<String, Object>> log_records =
queryDb("SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'");
assertEquals("Task execution logs were not created", 2, log_records.size());

indexDAO.removeTask(workflowId, taskId);

task_records = queryDb("SELECT * FROM task_index WHERE task_id = '" + taskId + "'");
assertEquals("Task index record was not deleted", 0, task_records.size());

log_records = queryDb("SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'");
assertEquals("Task execution logs were not deleted", 0, log_records.size());
}
}

0 comments on commit 62fab6e

Please sign in to comment.