From e99bebccfce907d305c958e9c5f429f4da191398 Mon Sep 17 00:00:00 2001
From: Ben Pirt
Date: Wed, 21 Feb 2024 11:34:30 +0000
Subject: [PATCH] Add option to only index workflows on Postgres when their status changes

---
 .../postgres/config/PostgresProperties.java | 10 +
 .../postgres/dao/PostgresIndexDAO.java | 59 +++---
 .../PostgresIndexDAOStatusChangeOnlyTest.java | 181 ++++++++++++++++++
 3 files changed, 228 insertions(+), 22 deletions(-)
 create mode 100644 postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java

diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java
index 3b42d46c9..0ddf80098 100644
--- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java
+++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java
@@ -37,6 +37,8 @@ public class PostgresProperties {
 
     private Integer experimentalQueueNotifyStalePeriod = 5000;
 
+    private boolean onlyIndexOnStatusChange = false; // off by default
+
     public String schema = "public";
 
     public boolean allowFullTextQueries = true;
@@ -73,6 +75,14 @@ public void setTaskDefCacheRefreshInterval(Duration taskDefCacheRefreshInterval)
         this.taskDefCacheRefreshInterval = taskDefCacheRefreshInterval;
     }
 
+    public boolean getOnlyIndexOnStatusChange() {
+        return onlyIndexOnStatusChange;
+    }
+
+    public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
+        this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
+    }
+
     public Integer getDeadlockRetryMax() {
         return deadlockRetryMax;
     }
diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java
index 5af102522..6d80818d5 100644
--- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java
+++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java
@@ -48,6 +48,8 @@ public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {
     private static final int CORE_POOL_SIZE = 6;
     private static final long KEEP_ALIVE_TIME = 1L;
 
+    private boolean onlyIndexOnStatusChange; // set in the constructor from PostgresProperties
+
     public PostgresIndexDAO(
             RetryTemplate retryTemplate,
             ObjectMapper objectMapper,
@@ -55,6 +57,7 @@ public PostgresIndexDAO(
             PostgresProperties properties) {
         super(retryTemplate, objectMapper, dataSource);
         this.properties = properties;
+        this.onlyIndexOnStatusChange = properties.getOnlyIndexOnStatusChange();
 
         int maximumPoolSize = properties.getAsyncMaxPoolSize();
         int workerQueueSize = properties.getAsyncWorkerQueueSize();
@@ -84,19 +87,25 @@ public void indexWorkflow(WorkflowSummary workflow) {
                         + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
                         + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
 
+        if (onlyIndexOnStatusChange) { // append a status-differs condition to the DO UPDATE
+            INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
+        }
+
         TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
         Timestamp startTime = Timestamp.from(Instant.from(ta));
 
-        queryWithTransaction(
-                INSERT_WORKFLOW_INDEX_SQL,
-                q ->
-                        q.addParameter(workflow.getWorkflowId())
-                                .addParameter(workflow.getCorrelationId())
-                                .addParameter(workflow.getWorkflowType())
-                                .addParameter(startTime)
-                                .addParameter(workflow.getStatus().toString())
-                                .addJsonParameter(workflow)
-                                .executeUpdate());
+        int rowsUpdated =
+                queryWithTransaction(
+                        INSERT_WORKFLOW_INDEX_SQL,
+                        q ->
+                                q.addParameter(workflow.getWorkflowId())
+                                        .addParameter(workflow.getCorrelationId())
+                                        .addParameter(workflow.getWorkflowType())
+                                        .addParameter(startTime)
+                                        .addParameter(workflow.getStatus().toString())
+                                        .addJsonParameter(workflow)
+                                        .executeUpdate());
+        logger.debug("Postgres index workflow rows updated: {}", rowsUpdated);
     }
 
     @Override
@@ -128,24 +137,30 @@ public void indexTask(TaskSummary task) {
                         + "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
                         + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
 
+        if (onlyIndexOnStatusChange) { // append a status-differs condition to the DO UPDATE
+            INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
+        }
+
         TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
         Timestamp updateTime = Timestamp.from(Instant.from(updateTa));
 
         TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(task.getStartTime());
         Timestamp startTime = Timestamp.from(Instant.from(startTa));
 
-        queryWithTransaction(
-                INSERT_TASK_INDEX_SQL,
-                q ->
-                        q.addParameter(task.getTaskId())
-                                .addParameter(task.getTaskType())
-                                .addParameter(task.getTaskDefName())
-                                .addParameter(task.getStatus().toString())
-                                .addParameter(startTime)
-                                .addParameter(updateTime)
-                                .addParameter(task.getWorkflowType())
-                                .addJsonParameter(task)
-                                .executeUpdate());
+        int rowsUpdated =
+                queryWithTransaction(
+                        INSERT_TASK_INDEX_SQL,
+                        q ->
+                                q.addParameter(task.getTaskId())
+                                        .addParameter(task.getTaskType())
+                                        .addParameter(task.getTaskDefName())
+                                        .addParameter(task.getStatus().toString())
+                                        .addParameter(startTime)
+                                        .addParameter(updateTime)
+                                        .addParameter(task.getWorkflowType())
+                                        .addJsonParameter(task)
+                                        .executeUpdate());
+        logger.debug("Postgres index task rows updated: {}", rowsUpdated);
     }
 
     @Override
diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java
new file mode 100644
index 000000000..fbbbad4dc
--- /dev/null
+++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2023 Conductor Authors.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.postgres.dao;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.*;
+
+import javax.sql.DataSource;
+
+import org.flywaydb.core.Flyway;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.postgres.config.PostgresConfiguration;
+import com.netflix.conductor.postgres.util.Query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import static org.junit.Assert.assertEquals;
+
+@ContextConfiguration(
+        classes = {
+            TestObjectMapperConfiguration.class,
+            PostgresConfiguration.class,
+            FlywayAutoConfiguration.class
+        })
+@RunWith(SpringRunner.class)
+@TestPropertySource(
+        properties = {
+            "conductor.app.asyncIndexingEnabled=false",
+            "conductor.elasticsearch.version=0",
+            "conductor.indexing.type=postgres",
+            "conductor.postgres.onlyIndexOnStatusChange=true",
+            "spring.flyway.clean-disabled=false"
+        })
+@SpringBootTest
+public class PostgresIndexDAOStatusChangeOnlyTest {
+
+    @Autowired private PostgresIndexDAO indexDAO;
+
+    @Autowired private ObjectMapper objectMapper;
+
+    @Qualifier("dataSource")
+    @Autowired
+    private DataSource dataSource;
+
+    @Autowired Flyway flyway;
+
+    // run the Flyway migrations before each test; only migrate() is called, not flyway.clean().
+    @Before
+    public void before() {
+        flyway.migrate();
+    }
+
+    private WorkflowSummary getMockWorkflowSummary(String id) {
+        WorkflowSummary wfs = new WorkflowSummary();
+        wfs.setWorkflowId(id);
+        wfs.setCorrelationId("correlation-id");
+        wfs.setWorkflowType("workflow-type");
+        wfs.setStartTime("2023-02-07T08:42:45Z");
+        wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
+        return wfs;
+    }
+
+    private TaskSummary getMockTaskSummary(String taskId) {
+        TaskSummary ts = new TaskSummary();
+        ts.setTaskId(taskId);
+        ts.setTaskType("task-type");
+        ts.setTaskDefName("task-def-name");
+        ts.setStatus(Task.Status.SCHEDULED);
+        ts.setStartTime("2023-02-07T09:41:45Z");
+        ts.setUpdateTime("2023-02-07T09:42:45Z");
+        ts.setWorkflowType("workflow-type");
+        return ts;
+    }
+
+    private List<Map<String, Object>> queryDb(String query) throws SQLException {
+        try (Connection c = dataSource.getConnection()) {
+            try (Query q = new Query(objectMapper, c, query)) {
+                return q.executeAndFetchMap();
+            }
+        }
+    }
+
+    public void checkWorkflow(String workflowId, String status, String correlationId)
+            throws SQLException {
+        List<Map<String, Object>> result =
+                queryDb(
+                        String.format(
+                                "SELECT * FROM workflow_index WHERE workflow_id = '%s'",
+                                workflowId));
+        assertEquals("Wrong number of rows returned", 1, result.size());
+        assertEquals("Wrong status returned", status, result.get(0).get("status"));
+        assertEquals(
+                "Correlation id does not match",
+                correlationId,
+                result.get(0).get("correlation_id"));
+    }
+
+    public void checkTask(String taskId, String status, String updateTime) throws SQLException {
+        List<Map<String, Object>> result =
+                queryDb(String.format("SELECT * FROM task_index WHERE task_id = '%s'", taskId));
+        assertEquals("Wrong number of rows returned", 1, result.size());
+        assertEquals("Wrong status returned", status, result.get(0).get("status"));
+        assertEquals(
+                "Update time does not match",
+                updateTime,
+                result.get(0).get("update_time").toString());
+    }
+
+    @Test
+    public void testIndexWorkflowOnlyStatusChange() throws SQLException {
+        WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
+        indexDAO.indexWorkflow(wfs);
+
+        // retrieve the record, make sure it exists
+        checkWorkflow("workflow-id", "RUNNING", "correlation-id");
+
+        // Change the record, but not the status, and re-index
+        wfs.setCorrelationId("new-correlation-id");
+        indexDAO.indexWorkflow(wfs);
+
+        // status is unchanged, so the stored row must still hold the old correlation id
+        checkWorkflow("workflow-id", "RUNNING", "correlation-id");
+
+        // Change the status and re-index
+        wfs.setStatus(Workflow.WorkflowStatus.FAILED);
+        indexDAO.indexWorkflow(wfs);
+
+        // status changed, so the whole row (including the new correlation id) must be stored
+        checkWorkflow("workflow-id", "FAILED", "new-correlation-id");
+    }
+
+    @Test
+    public void testIndexTaskOnlyStatusChange() throws SQLException {
+        TaskSummary ts = getMockTaskSummary("task-id");
+
+        indexDAO.indexTask(ts);
+
+        // retrieve the record, make sure it exists
+        checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");
+
+        // Change the record, but not the status
+        ts.setUpdateTime("2023-02-07T10:42:45Z");
+        indexDAO.indexTask(ts);
+
+        // status is unchanged, so the stored row must still hold the old update time
+        checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");
+
+        // Change the status and re-index
+        ts.setStatus(Task.Status.FAILED);
+        indexDAO.indexTask(ts);
+
+        // status changed, so the whole row (including the new update time) must be stored
+        checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
+    }
+}