diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index bc5ed4776..da843222d 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -379,7 +379,6 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow) throws JsonProcessingException { - LOGGER.info("Removing workflow"); if (archiveWorkflow) { if (workflow.getStatus().isTerminal()) { // Only allow archival if workflow is in terminal state diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index 86935a2cc..8dc0dfaa5 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -58,59 +58,54 @@ public PostgresIndexDAO( int workerQueueSize = properties.getAsyncWorkerQueueSize(); // Set up a workerpool for performing async operations. - this.executorService = - new ThreadPoolExecutor( - CORE_POOL_SIZE, - maximumPoolSize, - KEEP_ALIVE_TIME, - TimeUnit.MINUTES, - new LinkedBlockingQueue<>(workerQueueSize), - (runnable, executor) -> { - logger.warn( - "Request {} to async dao discarded in executor {}", - runnable, - executor); - Monitors.recordDiscardedIndexingCount("indexQueue"); - }); + this.executorService = new ThreadPoolExecutor( + CORE_POOL_SIZE, + maximumPoolSize, + KEEP_ALIVE_TIME, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>(workerQueueSize), + (runnable, executor) -> { + logger.warn( + "Request {} to async dao discarded in executor {}", + runnable, + executor); + Monitors.recordDiscardedIndexingCount("indexQueue"); + }); } @Override public void indexWorkflow(WorkflowSummary workflow) { - String INSERT_WORKFLOW_INDEX_SQL = - "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)" - + "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" - + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, " - + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data"; + String INSERT_WORKFLOW_INDEX_SQL = "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)" + + "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, " + + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data"; TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime()); Timestamp startTime = Timestamp.from(Instant.from(ta)); queryWithTransaction( INSERT_WORKFLOW_INDEX_SQL, - q -> - q.addParameter(workflow.getWorkflowId()) - .addParameter(workflow.getCorrelationId()) - .addParameter(workflow.getWorkflowType()) - .addParameter(startTime) - .addParameter(workflow.getStatus().toString()) - .addJsonParameter(workflow) - .executeUpdate()); + q -> q.addParameter(workflow.getWorkflowId()) + .addParameter(workflow.getCorrelationId()) + .addParameter(workflow.getWorkflowType()) + .addParameter(startTime) + .addParameter(workflow.getStatus().toString()) + .addJsonParameter(workflow) + .executeUpdate()); } @Override public SearchResult searchWorkflowSummary( String query, String freeText, int start, int count, List sort) { - PostgresIndexQueryBuilder queryBuilder = - new PostgresIndexQueryBuilder( - "workflow_index", query, freeText, start, count, sort); - - List results = - queryWithTransaction( - queryBuilder.getQuery(), - q -> { - queryBuilder.addParameters(q); - return q.executeAndFetch(WorkflowSummary.class); - }); + PostgresIndexQueryBuilder queryBuilder = new PostgresIndexQueryBuilder( + "workflow_index", query, freeText, start, count, sort); + + List results = queryWithTransaction( + queryBuilder.getQuery(), + q -> { + queryBuilder.addParameters(q); + return q.executeAndFetch(WorkflowSummary.class); + }); // To avoid making a second potentially expensive query to postgres say we've // got enough results for another page so the pagination works @@ -120,11 +115,10 @@ public SearchResult searchWorkflowSummary( @Override public void indexTask(TaskSummary task) { - String INSERT_TASK_INDEX_SQL = - "INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)" - + "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) " - + "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, " - + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data"; + String INSERT_TASK_INDEX_SQL = "INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)" + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) " + + "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, " + + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data"; TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime()); Timestamp updateTime = Timestamp.from(Instant.from(updateTa)); @@ -134,31 +128,29 @@ public void indexTask(TaskSummary task) { queryWithTransaction( INSERT_TASK_INDEX_SQL, - q -> - q.addParameter(task.getTaskId()) - .addParameter(task.getTaskType()) - .addParameter(task.getTaskDefName()) - .addParameter(task.getStatus().toString()) - .addParameter(startTime) - .addParameter(updateTime) - .addParameter(task.getWorkflowType()) - .addJsonParameter(task) - .executeUpdate()); + q -> q.addParameter(task.getTaskId()) + .addParameter(task.getTaskType()) + .addParameter(task.getTaskDefName()) + .addParameter(task.getStatus().toString()) + .addParameter(startTime) + .addParameter(updateTime) + .addParameter(task.getWorkflowType()) + .addJsonParameter(task) + .executeUpdate()); } @Override public SearchResult searchTaskSummary( String query, String freeText, int start, int count, List sort) { - PostgresIndexQueryBuilder queryBuilder = - new PostgresIndexQueryBuilder("task_index", query, freeText, start, count, sort); + PostgresIndexQueryBuilder queryBuilder = new PostgresIndexQueryBuilder("task_index", query, freeText, start, + count, sort); - List results = - queryWithTransaction( - queryBuilder.getQuery(), - q -> { - queryBuilder.addParameters(q); - return q.executeAndFetch(TaskSummary.class); - }); + List results = queryWithTransaction( + queryBuilder.getQuery(), + q -> { + queryBuilder.addParameters(q); + return q.executeAndFetch(TaskSummary.class); + }); // To avoid making a second potentially expensive query to postgres say we've // got enough results for another page so the pagination works @@ -168,16 +160,14 @@ public SearchResult searchTaskSummary( @Override public void addTaskExecutionLogs(List logs) { - String INSERT_LOG = - "INSERT INTO task_execution_logs (task_id, created_time, log) VALUES (?, ?, ?)"; + String INSERT_LOG = "INSERT INTO task_execution_logs (task_id, created_time, log) VALUES (?, ?, ?)"; for (TaskExecLog log : logs) { queryWithTransaction( INSERT_LOG, - q -> - q.addParameter(log.getTaskId()) - .addParameter(new Timestamp(log.getCreatedTime())) - .addParameter(log.getLog()) - .executeUpdate()); + q -> q.addParameter(log.getTaskId()) + .addParameter(new Timestamp(log.getCreatedTime())) + .addParameter(log.getLog()) + .executeUpdate()); } } @@ -185,25 +175,25 @@ public void addTaskExecutionLogs(List logs) { public List getTaskExecutionLogs(String taskId) { return queryWithTransaction( "SELECT log, task_id, created_time FROM task_execution_logs WHERE task_id = ? ORDER BY created_time ASC", - q -> - q.addParameter(taskId) - .executeAndFetch( - rs -> { - List result = new ArrayList<>(); - while (rs.next()) { - TaskExecLog log = new TaskExecLog(); - log.setLog(rs.getString("log")); - log.setTaskId(rs.getString("task_id")); - log.setCreatedTime( - rs.getDate("created_time").getTime()); - result.add(log); - } - return result; - })); + q -> q.addParameter(taskId) + .executeAndFetch( + rs -> { + List result = new ArrayList<>(); + while (rs.next()) { + TaskExecLog log = new TaskExecLog(); + log.setLog(rs.getString("log")); + log.setTaskId(rs.getString("task_id")); + log.setCreatedTime( + rs.getDate("created_time").getTime()); + result.add(log); + } + return result; + })); } @Override - public void setup() {} + public void setup() { + } @Override public CompletableFuture asyncIndexWorkflow(WorkflowSummary workflow) { @@ -233,7 +223,6 @@ public SearchResult searchTasks( @Override public void removeWorkflow(String workflowId) { - logger.info("Removing workflow (postgres) {}", workflowId); String REMOVE_WORKFLOW_SQL = "DELETE FROM workflow_index WHERE workflow_id = ?"; queryWithTransaction(REMOVE_WORKFLOW_SQL, q -> q.addParameter(workflowId).executeUpdate()); @@ -241,7 +230,6 @@ public void removeWorkflow(String workflowId) { @Override public CompletableFuture asyncRemoveWorkflow(String workflowId) { - logger.info("Async removing workflow (postgres) {}", workflowId); return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService); } @@ -259,19 +247,16 @@ public CompletableFuture asyncUpdateWorkflow( @Override public void removeTask(String workflowId, String taskId) { - logger.info("Removing task (postgres) {}", taskId); - String REMOVE_TASK_SQL = - "WITH task_delete AS (DELETE FROM task_index WHERE workflow_id = ?)" - + "DELETE FROM task_execution_logs WHERE task_id =?"; + String REMOVE_TASK_SQL = "WITH task_delete AS (DELETE FROM task_index WHERE task_id = ?)" + + "DELETE FROM task_execution_logs WHERE task_id =?"; queryWithTransaction( REMOVE_TASK_SQL, - q -> q.addParameter(workflowId).addParameter(workflowId).executeUpdate()); + q -> q.addParameter(taskId).addParameter(taskId).executeUpdate()); } @Override public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { - logger.info("Async removing task (postgres) {}", workflowId); return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService); } diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V9__workflow_id_index.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V9__workflow_id_index.sql deleted file mode 100644 index e5ae9e7bb..000000000 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V9__workflow_id_index.sql +++ /dev/null @@ -1,6 +0,0 @@ -ALTER TABLE - task_index -ADD - workflow_id TEXT GENERATED ALWAYS AS (json_data ->> 'workflowId') STORED; - -CREATE INDEX task_index_workflow_id_index ON task_index(workflow_id); diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java index b3e3595f3..74d3bb81c 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java @@ -48,387 +48,402 @@ import static org.junit.Assert.*; -@ContextConfiguration( - classes = { - TestObjectMapperConfiguration.class, - PostgresConfiguration.class, - FlywayAutoConfiguration.class - }) +@ContextConfiguration(classes = { + TestObjectMapperConfiguration.class, + PostgresConfiguration.class, + FlywayAutoConfiguration.class +}) @RunWith(SpringRunner.class) -@TestPropertySource( - properties = { - "conductor.app.asyncIndexingEnabled=false", - "conductor.elasticsearch.version=0", - "conductor.indexing.type=postgres", - "spring.flyway.clean-disabled=false" - }) +@TestPropertySource(properties = { + "conductor.app.asyncIndexingEnabled=false", + "conductor.elasticsearch.version=0", + "conductor.indexing.type=postgres", + "spring.flyway.clean-disabled=false" +}) @SpringBootTest public class PostgresIndexDAOTest { - @Autowired private PostgresIndexDAO indexDAO; - - @Autowired private ObjectMapper objectMapper; - - @Qualifier("dataSource") - @Autowired - private DataSource dataSource; - - @Autowired Flyway flyway; - - // clean the database between tests. - @Before - public void before() { - flyway.migrate(); - } - - private WorkflowSummary getMockWorkflowSummary(String id) { - WorkflowSummary wfs = new WorkflowSummary(); - wfs.setWorkflowId(id); - wfs.setCorrelationId("correlation-id"); - wfs.setWorkflowType("workflow-type"); - wfs.setStartTime("2023-02-07T08:42:45Z"); - wfs.setStatus(Workflow.WorkflowStatus.COMPLETED); - return wfs; - } - - private TaskSummary getMockTaskSummary(String taskId) { - TaskSummary ts = new TaskSummary(); - ts.setTaskId(taskId); - ts.setTaskType("task-type"); - ts.setTaskDefName("task-def-name"); - ts.setStatus(Task.Status.COMPLETED); - ts.setStartTime("2023-02-07T09:41:45Z"); - ts.setUpdateTime("2023-02-07T09:42:45Z"); - ts.setWorkflowType("workflow-type"); - return ts; - } - - private TaskExecLog getMockTaskExecutionLog(String taskId, long createdTime, String log) { - TaskExecLog tse = new TaskExecLog(); - tse.setTaskId(taskId); - tse.setLog(log); - tse.setCreatedTime(createdTime); - return tse; - } - - private void compareWorkflowSummary(WorkflowSummary wfs) throws SQLException { - List> result = - queryDb( - String.format( - "SELECT * FROM workflow_index WHERE workflow_id = '%s'", - wfs.getWorkflowId())); - assertEquals("Wrong number of rows returned", 1, result.size()); - assertEquals( - "Workflow id does not match", - wfs.getWorkflowId(), - result.get(0).get("workflow_id")); - assertEquals( - "Correlation id does not match", - wfs.getCorrelationId(), - result.get(0).get("correlation_id")); - assertEquals( - "Workflow type does not match", - wfs.getWorkflowType(), - result.get(0).get("workflow_type")); - TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(wfs.getStartTime()); - Timestamp startTime = Timestamp.from(Instant.from(ta)); - assertEquals("Start time does not match", startTime, result.get(0).get("start_time")); - assertEquals( - "Status does not match", wfs.getStatus().toString(), result.get(0).get("status")); - } - - private List> queryDb(String query) throws SQLException { - try (Connection c = dataSource.getConnection()) { - try (Query q = new Query(objectMapper, c, query)) { - return q.executeAndFetchMap(); - } + @Autowired + private PostgresIndexDAO indexDAO; + + @Autowired + private ObjectMapper objectMapper; + + @Qualifier("dataSource") + @Autowired + private DataSource dataSource; + + @Autowired + Flyway flyway; + + // clean the database between tests. + @Before + public void before() { + flyway.migrate(); + } + + private WorkflowSummary getMockWorkflowSummary(String id) { + WorkflowSummary wfs = new WorkflowSummary(); + wfs.setWorkflowId(id); + wfs.setCorrelationId("correlation-id"); + wfs.setWorkflowType("workflow-type"); + wfs.setStartTime("2023-02-07T08:42:45Z"); + wfs.setStatus(Workflow.WorkflowStatus.COMPLETED); + return wfs; + } + + private TaskSummary getMockTaskSummary(String taskId) { + TaskSummary ts = new TaskSummary(); + ts.setTaskId(taskId); + ts.setTaskType("task-type"); + ts.setTaskDefName("task-def-name"); + ts.setStatus(Task.Status.COMPLETED); + ts.setStartTime("2023-02-07T09:41:45Z"); + ts.setUpdateTime("2023-02-07T09:42:45Z"); + ts.setWorkflowType("workflow-type"); + return ts; + } + + private TaskExecLog getMockTaskExecutionLog(String taskId, long createdTime, String log) { + TaskExecLog tse = new TaskExecLog(); + tse.setTaskId(taskId); + tse.setLog(log); + tse.setCreatedTime(createdTime); + return tse; + } + + private void compareWorkflowSummary(WorkflowSummary wfs) throws SQLException { + List> result = queryDb( + String.format( + "SELECT * FROM workflow_index WHERE workflow_id = '%s'", + wfs.getWorkflowId())); + assertEquals("Wrong number of rows returned", 1, result.size()); + assertEquals( + "Workflow id does not match", + wfs.getWorkflowId(), + result.get(0).get("workflow_id")); + assertEquals( + "Correlation id does not match", + wfs.getCorrelationId(), + result.get(0).get("correlation_id")); + assertEquals( + "Workflow type does not match", + wfs.getWorkflowType(), + result.get(0).get("workflow_type")); + TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(wfs.getStartTime()); + Timestamp startTime = Timestamp.from(Instant.from(ta)); + assertEquals("Start time does not match", startTime, result.get(0).get("start_time")); + assertEquals( + "Status does not match", wfs.getStatus().toString(), result.get(0).get("status")); + } + + private List> queryDb(String query) throws SQLException { + try (Connection c = dataSource.getConnection()) { + try (Query q = new Query(objectMapper, c, query)) { + return q.executeAndFetchMap(); + } + } + } + + private void compareTaskSummary(TaskSummary ts) throws SQLException { + List> result = queryDb( + String.format( + "SELECT * FROM task_index WHERE task_id = '%s'", ts.getTaskId())); + assertEquals("Wrong number of rows returned", 1, result.size()); + assertEquals("Task id does not match", ts.getTaskId(), result.get(0).get("task_id")); + assertEquals("Task type does not match", ts.getTaskType(), result.get(0).get("task_type")); + assertEquals( + "Task def name does not match", + ts.getTaskDefName(), + result.get(0).get("task_def_name")); + TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(ts.getStartTime()); + Timestamp startTime = Timestamp.from(Instant.from(startTa)); + assertEquals("Start time does not match", startTime, result.get(0).get("start_time")); + TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(ts.getUpdateTime()); + Timestamp updateTime = Timestamp.from(Instant.from(updateTa)); + assertEquals("Update time does not match", updateTime, result.get(0).get("update_time")); + assertEquals( + "Status does not match", ts.getStatus().toString(), result.get(0).get("status")); + assertEquals( + "Workflow type does not match", + ts.getWorkflowType().toString(), + result.get(0).get("workflow_type")); + } + + @Test + public void testIndexNewWorkflow() throws SQLException { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + } + + @Test + public void testIndexExistingWorkflow() throws SQLException { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + + wfs.setStatus(Workflow.WorkflowStatus.FAILED); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + } + + @Test + public void testIndexNewTask() throws SQLException { + TaskSummary ts = getMockTaskSummary("task-id"); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + } + + @Test + public void testIndexExistingTask() throws SQLException { + TaskSummary ts = getMockTaskSummary("task-id"); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + + ts.setStatus(Task.Status.FAILED); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); } - } - private void compareTaskSummary(TaskSummary ts) throws SQLException { - List> result = - queryDb( - String.format( - "SELECT * FROM task_index WHERE task_id = '%s'", ts.getTaskId())); - assertEquals("Wrong number of rows returned", 1, result.size()); - assertEquals("Task id does not match", ts.getTaskId(), result.get(0).get("task_id")); - assertEquals("Task type does not match", ts.getTaskType(), result.get(0).get("task_type")); - assertEquals( - "Task def name does not match", - ts.getTaskDefName(), - result.get(0).get("task_def_name")); - TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(ts.getStartTime()); - Timestamp startTime = Timestamp.from(Instant.from(startTa)); - assertEquals("Start time does not match", startTime, result.get(0).get("start_time")); - TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(ts.getUpdateTime()); - Timestamp updateTime = Timestamp.from(Instant.from(updateTa)); - assertEquals("Update time does not match", updateTime, result.get(0).get("update_time")); - assertEquals( - "Status does not match", ts.getStatus().toString(), result.get(0).get("status")); - assertEquals( - "Workflow type does not match", - ts.getWorkflowType().toString(), - result.get(0).get("workflow_type")); - } - - @Test - public void testIndexNewWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - - indexDAO.indexWorkflow(wfs); - - compareWorkflowSummary(wfs); - } - - @Test - public void testIndexExistingWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - - indexDAO.indexWorkflow(wfs); - - compareWorkflowSummary(wfs); - - wfs.setStatus(Workflow.WorkflowStatus.FAILED); - - indexDAO.indexWorkflow(wfs); - - compareWorkflowSummary(wfs); - } - - @Test - public void testIndexNewTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); - - indexDAO.indexTask(ts); - - compareTaskSummary(ts); - } - - @Test - public void testIndexExistingTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); - - indexDAO.indexTask(ts); - - compareTaskSummary(ts); - - ts.setStatus(Task.Status.FAILED); - - indexDAO.indexTask(ts); - - compareTaskSummary(ts); - } - - @Test - public void testAddTaskExecutionLogs() throws SQLException { - List logs = new ArrayList<>(); - String taskId = UUID.randomUUID().toString(); - logs.add(getMockTaskExecutionLog(taskId, 1675845986000L, "Log 1")); - logs.add(getMockTaskExecutionLog(taskId, 1675845987000L, "Log 2")); - - indexDAO.addTaskExecutionLogs(logs); - - List> records = - queryDb("SELECT * FROM task_execution_logs ORDER BY created_time ASC"); - assertEquals("Wrong number of logs returned", 2, records.size()); - assertEquals(logs.get(0).getLog(), records.get(0).get("log")); - assertEquals(new Date(1675845986000L), records.get(0).get("created_time")); - assertEquals(logs.get(1).getLog(), records.get(1).get("log")); - assertEquals(new Date(1675845987000L), records.get(1).get("created_time")); - } - - @Test - public void testSearchWorkflowSummary() { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - - indexDAO.indexWorkflow(wfs); - - String query = String.format("workflowId=\"%s\"", wfs.getWorkflowId()); - SearchResult results = - indexDAO.searchWorkflowSummary(query, "*", 0, 15, new ArrayList()); - assertEquals("No results returned", 1, results.getResults().size()); - assertEquals( - "Wrong workflow returned", - wfs.getWorkflowId(), - results.getResults().get(0).getWorkflowId()); - } - - @Test - public void testFullTextSearchWorkflowSummary() { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - - indexDAO.indexWorkflow(wfs); - - String freeText = "notworkflow-id"; - SearchResult results = - indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); - assertEquals("Wrong number of results returned", 0, results.getResults().size()); - - freeText = "workflow-id"; - results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); - assertEquals("No results returned", 1, results.getResults().size()); - assertEquals( - "Wrong workflow returned", - wfs.getWorkflowId(), - results.getResults().get(0).getWorkflowId()); - } + @Test + public void testAddTaskExecutionLogs() throws SQLException { + List logs = new ArrayList<>(); + String taskId = UUID.randomUUID().toString(); + logs.add(getMockTaskExecutionLog(taskId, 1675845986000L, "Log 1")); + logs.add(getMockTaskExecutionLog(taskId, 1675845987000L, "Log 2")); + + indexDAO.addTaskExecutionLogs(logs); + + List> records = queryDb( + "SELECT * FROM task_execution_logs ORDER BY created_time ASC"); + assertEquals("Wrong number of logs returned", 2, records.size()); + assertEquals(logs.get(0).getLog(), records.get(0).get("log")); + assertEquals(new Date(1675845986000L), records.get(0).get("created_time")); + assertEquals(logs.get(1).getLog(), records.get(1).get("log")); + assertEquals(new Date(1675845987000L), records.get(1).get("created_time")); + } + + @Test + public void testSearchWorkflowSummary() { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - @Test - public void testJsonSearchWorkflowSummary() { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - wfs.setVersion(3); + indexDAO.indexWorkflow(wfs); + + String query = String.format("workflowId=\"%s\"", wfs.getWorkflowId()); + SearchResult results = indexDAO.searchWorkflowSummary(query, "*", 0, 15, + new ArrayList()); + assertEquals("No results returned", 1, results.getResults().size()); + assertEquals( + "Wrong workflow returned", + wfs.getWorkflowId(), + results.getResults().get(0).getWorkflowId()); + } - indexDAO.indexWorkflow(wfs); + @Test + public void testFullTextSearchWorkflowSummary() { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); - String freeText = "{\"correlationId\":\"not-the-id\"}"; - SearchResult results = - indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); - assertEquals("Wrong number of results returned", 0, results.getResults().size()); + indexDAO.indexWorkflow(wfs); - freeText = "{\"correlationId\":\"correlation-id\", \"version\":3}"; - results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); - assertEquals("No results returned", 1, results.getResults().size()); - assertEquals( - "Wrong workflow returned", - wfs.getWorkflowId(), - results.getResults().get(0).getWorkflowId()); - } - - @Test - public void testSearchWorkflowSummaryPagination() { - for (int i = 0; i < 5; i++) { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i); - indexDAO.indexWorkflow(wfs); + String freeText = "notworkflow-id"; + SearchResult results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, + new ArrayList()); + assertEquals("Wrong number of results returned", 0, results.getResults().size()); + + freeText = "workflow-id"; + results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); + assertEquals("No results returned", 1, results.getResults().size()); + assertEquals( + "Wrong workflow returned", + wfs.getWorkflowId(), + results.getResults().get(0).getWorkflowId()); + } + + @Test + public void testJsonSearchWorkflowSummary() { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + wfs.setVersion(3); + + indexDAO.indexWorkflow(wfs); + + String freeText = "{\"correlationId\":\"not-the-id\"}"; + SearchResult results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, + new ArrayList()); + assertEquals("Wrong number of results returned", 0, results.getResults().size()); + + freeText = "{\"correlationId\":\"correlation-id\", \"version\":3}"; + results = indexDAO.searchWorkflowSummary("", freeText, 0, 15, new ArrayList()); + assertEquals("No results returned", 1, results.getResults().size()); + assertEquals( + "Wrong workflow returned", + wfs.getWorkflowId(), + results.getResults().get(0).getWorkflowId()); + } + + @Test + public void testSearchWorkflowSummaryPagination() { + for (int i = 0; i < 5; i++) { + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i); + indexDAO.indexWorkflow(wfs); + } + + List orderBy = Arrays.asList(new String[] { "workflowId:DESC" }); + SearchResult results = indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy); + assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "workflow-id-4", + results.getResults().get(0).getWorkflowId()); + assertEquals( + "Results returned in wrong order", + "workflow-id-3", + results.getResults().get(1).getWorkflowId()); + results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy); + assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "workflow-id-2", + results.getResults().get(0).getWorkflowId()); + assertEquals( + "Results returned in wrong order", + "workflow-id-1", + results.getResults().get(1).getWorkflowId()); + results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy); + assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "workflow-id-0", + results.getResults().get(0).getWorkflowId()); } - List orderBy = Arrays.asList(new String[] {"workflowId:DESC"}); - SearchResult results = - indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy); - assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "workflow-id-4", - results.getResults().get(0).getWorkflowId()); - assertEquals( - "Results returned in wrong order", - "workflow-id-3", - results.getResults().get(1).getWorkflowId()); - results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy); - assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "workflow-id-2", - results.getResults().get(0).getWorkflowId()); - assertEquals( - "Results returned in wrong order", - "workflow-id-1", - results.getResults().get(1).getWorkflowId()); - results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy); - assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "workflow-id-0", - results.getResults().get(0).getWorkflowId()); - } - - @Test - public void testSearchTaskSummary() { - TaskSummary ts = getMockTaskSummary("task-id"); - - indexDAO.indexTask(ts); - - String query = String.format("taskId=\"%s\"", ts.getTaskId()); - SearchResult results = - indexDAO.searchTaskSummary(query, "*", 0, 15, new ArrayList()); - assertEquals("No results returned", 1, results.getResults().size()); - assertEquals( - "Wrong task returned", ts.getTaskId(), results.getResults().get(0).getTaskId()); - } - - @Test - public void testSearchTaskSummaryPagination() { - for (int i = 0; i < 5; i++) { - TaskSummary ts = getMockTaskSummary("task-id-" + i); - indexDAO.indexTask(ts); + @Test + public void testSearchTaskSummary() { + TaskSummary ts = getMockTaskSummary("task-id"); + + indexDAO.indexTask(ts); + + String query = String.format("taskId=\"%s\"", ts.getTaskId()); + SearchResult results = indexDAO.searchTaskSummary(query, "*", 0, 15, new ArrayList()); + assertEquals("No results returned", 1, results.getResults().size()); + assertEquals( + "Wrong task returned", ts.getTaskId(), results.getResults().get(0).getTaskId()); } - List orderBy = Arrays.asList(new String[] {"taskId:DESC"}); - SearchResult results = indexDAO.searchTaskSummary("", "*", 0, 2, orderBy); - assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "task-id-4", - results.getResults().get(0).getTaskId()); - assertEquals( - "Results returned in wrong order", - "task-id-3", - results.getResults().get(1).getTaskId()); - results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy); - assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "task-id-2", - results.getResults().get(0).getTaskId()); - assertEquals( - "Results returned in wrong order", - "task-id-1", - results.getResults().get(1).getTaskId()); - results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy); - assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); - assertEquals("Wrong number of results returned", 2, results.getResults().size()); - assertEquals( - "Results returned in wrong order", - "task-id-0", - results.getResults().get(0).getTaskId()); - } - - @Test - public void testGetTaskExecutionLogs() throws SQLException { - List logs = new ArrayList<>(); - String taskId = UUID.randomUUID().toString(); - logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1")); - logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2")); - - indexDAO.addTaskExecutionLogs(logs); - - List records = indexDAO.getTaskExecutionLogs(logs.get(0).getTaskId()); - assertEquals("Wrong number of logs returned", 2, records.size()); - assertEquals(logs.get(0).getLog(), records.get(0).getLog()); - assertEquals(logs.get(0).getCreatedTime(), 1675845986000L); - assertEquals(logs.get(1).getLog(), records.get(1).getLog()); - assertEquals(logs.get(1).getCreatedTime(), 1675845987000L); - } - - @Test - public void testRemoveWorkflow() throws SQLException { - String workflowId = UUID.randomUUID().toString(); - WorkflowSummary wfs = getMockWorkflowSummary(workflowId); - indexDAO.indexWorkflow(wfs); - - String taskId = UUID.randomUUID().toString(); - TaskSummary ts = getMockTaskSummary(taskId, workflowId); - indexDAO.indexTask(ts); - - List logs = new ArrayList<>(); - logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1")); - logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2")); - indexDAO.addTaskExecutionLogs(logs); - - List> records = - queryDb("SELECT * FROM workflow_index WHERE workflow_id = " + workflowId); - assertEquals("Workflow index record was not deleted", 0, records.size()); - - List> records = - queryDb("SELECT * FROM task_index WHERE workflow_id = " + workflowId); - assertEquals("Task index record was not deleted", 0, records.size()); - - List> records = - queryDb("SELECT * FROM task_execution_logs there task_id = " + taskId); - assertEquals("Task execution logs were not deleted", 0, records.size()); - } + @Test + public void testSearchTaskSummaryPagination() { + for (int i = 0; i < 5; i++) { + TaskSummary ts = getMockTaskSummary("task-id-" + i); + indexDAO.indexTask(ts); + } + + List orderBy = Arrays.asList(new String[] { "taskId:DESC" }); + SearchResult results = indexDAO.searchTaskSummary("", "*", 0, 2, orderBy); + assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "task-id-4", + results.getResults().get(0).getTaskId()); + assertEquals( + "Results returned in wrong order", + "task-id-3", + results.getResults().get(1).getTaskId()); + results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy); + assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "task-id-2", + results.getResults().get(0).getTaskId()); + assertEquals( + "Results returned in wrong order", + "task-id-1", + results.getResults().get(1).getTaskId()); + results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy); + assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); + assertEquals("Wrong number of results returned", 2, results.getResults().size()); + assertEquals( + "Results returned in wrong order", + "task-id-0", + results.getResults().get(0).getTaskId()); + } + + @Test + public void testGetTaskExecutionLogs() throws SQLException { + List logs = new ArrayList<>(); + String taskId = UUID.randomUUID().toString(); + logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1")); + logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2")); + + indexDAO.addTaskExecutionLogs(logs); + + List records = indexDAO.getTaskExecutionLogs(logs.get(0).getTaskId()); + assertEquals("Wrong number of logs returned", 2, records.size()); + assertEquals(logs.get(0).getLog(), records.get(0).getLog()); + assertEquals(logs.get(0).getCreatedTime(), 1675845986000L); + assertEquals(logs.get(1).getLog(), records.get(1).getLog()); + assertEquals(logs.get(1).getCreatedTime(), 1675845987000L); + } + + @Test + public void testRemoveWorkflow() throws SQLException { + String workflowId = UUID.randomUUID().toString(); + WorkflowSummary wfs = getMockWorkflowSummary(workflowId); + indexDAO.indexWorkflow(wfs); + + List> workflow_records = queryDb( + "SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'"); + assertEquals("Workflow index record was not created", 1, workflow_records.size()); + + indexDAO.removeWorkflow(workflowId); + + workflow_records = queryDb("SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'"); + assertEquals("Workflow index record was not deleted", 0, workflow_records.size()); + } + + @Test + public void testRemoveTask() throws SQLException { + String workflowId = UUID.randomUUID().toString(); + + String taskId = UUID.randomUUID().toString(); + TaskSummary ts = getMockTaskSummary(taskId); + indexDAO.indexTask(ts); + + List logs = new ArrayList<>(); + logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1")); + logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2")); + indexDAO.addTaskExecutionLogs(logs); + + List> task_records = queryDb( + "SELECT * FROM task_index WHERE task_id = '" + taskId + "'"); + assertEquals("Task index record was not created", 1, task_records.size()); + + List> log_records = queryDb( + "SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'"); + assertEquals("Task execution logs were not created", 2, log_records.size()); + + indexDAO.removeTask(workflowId, taskId); + + task_records = queryDb("SELECT * FROM task_index WHERE task_id = '" + taskId + "'"); + assertEquals("Task index record was not deleted", 0, task_records.size()); + + log_records = queryDb("SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'"); + assertEquals("Task execution logs were not deleted", 0, log_records.size()); + } }