Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to only index workflows on Postgres when their status changes #83

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class PostgresProperties {

private Integer experimentalQueueNotifyStalePeriod = 5000;

private boolean onlyIndexOnStatusChange = false;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setTaskDefCacheRefreshInterval(Duration taskDefCacheRefreshInterval)
this.taskDefCacheRefreshInterval = taskDefCacheRefreshInterval;
}

public boolean getOnlyIndexOnStatusChange() {
return onlyIndexOnStatusChange;
}

public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {
private static final int CORE_POOL_SIZE = 6;
private static final long KEEP_ALIVE_TIME = 1L;

private boolean onlyIndexOnStatusChange;

public PostgresIndexDAO(
RetryTemplate retryTemplate,
ObjectMapper objectMapper,
DataSource dataSource,
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);
this.properties = properties;
this.onlyIndexOnStatusChange = properties.getOnlyIndexOnStatusChange();

int maximumPoolSize = properties.getAsyncMaxPoolSize();
int workerQueueSize = properties.getAsyncWorkerQueueSize();
Expand Down Expand Up @@ -84,19 +87,25 @@ public void indexWorkflow(WorkflowSummary workflow) {
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
}

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

queryWithTransaction(
INSERT_WORKFLOW_INDEX_SQL,
q ->
q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
int rowsUpdated =
queryWithTransaction(
INSERT_WORKFLOW_INDEX_SQL,
q ->
q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
logger.debug("Postgres index workflow rows updated: {}", rowsUpdated);
}

@Override
Expand Down Expand Up @@ -128,24 +137,30 @@ public void indexTask(TaskSummary task) {
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(task.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(startTa));

queryWithTransaction(
INSERT_TASK_INDEX_SQL,
q ->
q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
int rowsUpdated =
queryWithTransaction(
INSERT_TASK_INDEX_SQL,
q ->
q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
logger.debug("Postgres index task rows updated: {}", rowsUpdated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2023 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.dao;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.postgres.config.PostgresConfiguration;
import com.netflix.conductor.postgres.util.Query;

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.junit.Assert.assertEquals;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.onlyIndexOnStatusChange=true",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresIndexDAOStatusChangeOnlyTest {

@Autowired private PostgresIndexDAO indexDAO;

@Autowired private ObjectMapper objectMapper;

@Qualifier("dataSource")
@Autowired
private DataSource dataSource;

@Autowired Flyway flyway;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

private WorkflowSummary getMockWorkflowSummary(String id) {
WorkflowSummary wfs = new WorkflowSummary();
wfs.setWorkflowId(id);
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}

private TaskSummary getMockTaskSummary(String taskId) {
TaskSummary ts = new TaskSummary();
ts.setTaskId(taskId);
ts.setTaskType("task-type");
ts.setTaskDefName("task-def-name");
ts.setStatus(Task.Status.SCHEDULED);
ts.setStartTime("2023-02-07T09:41:45Z");
ts.setUpdateTime("2023-02-07T09:42:45Z");
ts.setWorkflowType("workflow-type");
return ts;
}

private List<Map<String, Object>> queryDb(String query) throws SQLException {
try (Connection c = dataSource.getConnection()) {
try (Query q = new Query(objectMapper, c, query)) {
return q.executeAndFetchMap();
}
}
}

public void checkWorkflow(String workflowId, String status, String correlationId)
throws SQLException {
List<Map<String, Object>> result =
queryDb(
String.format(
"SELECT * FROM workflow_index WHERE workflow_id = '%s'",
workflowId));
assertEquals("Wrong number of rows returned", 1, result.size());
assertEquals("Wrong status returned", status, result.get(0).get("status"));
assertEquals(
"Correlation id does not match",
correlationId,
result.get(0).get("correlation_id"));
}

public void checkTask(String taskId, String status, String updateTime) throws SQLException {
List<Map<String, Object>> result =
queryDb(String.format("SELECT * FROM task_index WHERE task_id = '%s'", taskId));
assertEquals("Wrong number of rows returned", 1, result.size());
assertEquals("Wrong status returned", status, result.get(0).get("status"));
assertEquals(
"Update time does not match",
updateTime,
result.get(0).get("update_time").toString());
}

@Test
public void testIndexWorkflowOnlyStatusChange() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it exists
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
checkWorkflow("workflow-id", "FAILED", "new-correlation-id");
}

@Test
public void testIndexTaskOnlyStatusChange() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");

indexDAO.indexTask(ts);

// retrieve the record, make sure it exists
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");

// Change the record, but not the status
ts.setUpdateTime("2023-02-07T10:42:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it hasn't changed
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
}
}
Loading