From 29b314d5b50eb3a05dec8f6218da2014193837fd Mon Sep 17 00:00:00 2001 From: Knight1001 <30550632+knight1001d@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:45:34 +0530 Subject: [PATCH 1/4] Update JDK Version in Prerequisites Conductor requires JDK 17 for building from source code. Updated the required JDK version in the docs. --- docs/devguide/running/source.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/devguide/running/source.md b/docs/devguide/running/source.md index 0c4c4dd82..3bb25560e 100644 --- a/docs/devguide/running/source.md +++ b/docs/devguide/running/source.md @@ -5,7 +5,7 @@ In this article we will explore how you can set up Conductor on your local machi features. ### Prerequisites -1. JDK 11 or greater +1. JDK 17 or greater 2. (Optional) Docker if you want to run tests. You can install docker from [here](https://www.docker.com/get-started/). 3. Node for building and running UI. Instructions at [https://nodejs.org](https://nodejs.org). 4. Yarn for building and running UI. Instructions at [https://classic.yarnpkg.com/en/docs/install](https://classic.yarnpkg.com/en/docs/install). From 02c7499c7fa3c85bebc1d58fd1d6a81f1629cc16 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sat, 13 Jul 2024 01:54:25 -0700 Subject: [PATCH 2/4] upgrade flyway --- build.gradle | 2 +- common/build.gradle | 3 ++- dependencies.gradle | 2 +- mysql-persistence/build.gradle | 2 +- postgres-persistence/build.gradle | 1 + .../conductor/postgres/config/PostgresConfiguration.java | 2 +- 6 files changed, 7 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index d866866e2..c6ffda75d 100644 --- a/build.gradle +++ b/build.gradle @@ -83,7 +83,7 @@ allprojects { implementation('org.apache.logging.log4j:log4j-slf4j-impl') implementation('org.apache.logging.log4j:log4j-jul') implementation('org.apache.logging.log4j:log4j-web') - + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" compileOnly 'org.projectlombok:lombok:1.18.34' annotationProcessor 'org.projectlombok:lombok:1.18.34' diff --git a/common/build.gradle b/common/build.gradle index 6c75a821e..8332cb312 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -9,7 +9,7 @@ dependencies { compileOnly 'org.springframework.boot:spring-boot-starter' compileOnly 'org.springframework.boot:spring-boot-starter-validation' - implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" + compileOnly "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" implementation "org.apache.commons:commons-lang3" @@ -23,6 +23,7 @@ dependencies { implementation "com.fasterxml.jackson.module:jackson-module-afterburner:${revFasterXml}" testImplementation 'org.springframework.boot:spring-boot-starter-validation' + testImplementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" } /* diff --git a/dependencies.gradle b/dependencies.gradle index 93637ab1b..4d1955898 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -68,6 +68,6 @@ ext { revNatsStreaming = '2.6.5' revNats = '2.16.14' revStan = '2.2.3' - revFlyway = '9.0.4' + revFlyway = '10.15.2' } diff --git a/mysql-persistence/build.gradle b/mysql-persistence/build.gradle index 267dbee4f..9c1515bf4 100644 --- a/mysql-persistence/build.gradle +++ b/mysql-persistence/build.gradle @@ -16,7 +16,7 @@ dependencies { implementation "mysql:mysql-connector-java:8.0.33" implementation "org.springframework.boot:spring-boot-starter-jdbc" - implementation "org.flywaydb:flyway-mysql" + implementation "org.flywaydb:flyway-mysql:${revFlyway}" testImplementation "org.apache.groovy:groovy-all:${revGroovy}" diff --git a/postgres-persistence/build.gradle b/postgres-persistence/build.gradle index 125d603d9..57c875ad9 100644 --- a/postgres-persistence/build.gradle +++ b/postgres-persistence/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation "org.postgresql:postgresql:${revPostgres}" implementation "org.springframework.boot:spring-boot-starter-jdbc" implementation "org.flywaydb:flyway-core:${revFlyway}" + implementation "org.flywaydb:flyway-database-postgresql:${revFlyway}" testImplementation "org.apache.groovy:groovy-all:${revGroovy}" testImplementation project(':conductor-server') diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java index 8debc582c..1e00cb067 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java @@ -65,7 +65,7 @@ public Flyway flywayForPrimaryDb() { config.locations("classpath:db/migration_postgres"); } - return config.configuration(Map.of()) + return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false")) .schemas(properties.getSchema()) .dataSource(dataSource) .outOfOrder(true) From 29ec813dacc53108f536cf6c42402f864a04cdca Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sat, 13 Jul 2024 07:57:59 -0700 Subject: [PATCH 3/4] add flyway to external storage module --- postgres-external-storage/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/postgres-external-storage/build.gradle b/postgres-external-storage/build.gradle index 5322185bc..780fe5c6b 100644 --- a/postgres-external-storage/build.gradle +++ b/postgres-external-storage/build.gradle @@ -8,6 +8,8 @@ dependencies { implementation "org.postgresql:postgresql:${revPostgres}" implementation 'org.springframework.boot:spring-boot-starter-jdbc' implementation "org.flywaydb:flyway-core:${revFlyway}" + implementation "org.flywaydb:flyway-database-postgresql:${revFlyway}" + implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" implementation "commons-codec:commons-codec:${revCodec}" From dc3a2e10c23710bcaf90a331facfaca63ac5ea91 Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Sun, 14 Jul 2024 21:50:47 +0100 Subject: [PATCH 4/4] Updated Postgres indexer to only index if the update is newer than the current index --- .../postgres/dao/PostgresIndexDAO.java | 18 +++-- .../V13__workflow_index_columns.sql | 8 ++ .../PostgresIndexDAOStatusChangeOnlyTest.java | 6 +- .../postgres/dao/PostgresIndexDAOTest.java | 79 ++++++++++++++----- 4 files changed, 86 insertions(+), 25 deletions(-) create mode 100644 postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index 6d80818d5..cbe6e3821 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -82,15 +82,19 @@ public PostgresIndexDAO( @Override public void indexWorkflow(WorkflowSummary workflow) { String INSERT_WORKFLOW_INDEX_SQL = - "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)" - + "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)" + + "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, " - + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data"; + + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data " + + "WHERE EXCLUDED.update_time > workflow_index.update_time"; if (onlyIndexOnStatusChange) { - INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status"; + INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status"; } + TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime()); + Timestamp updateTime = Timestamp.from(Instant.from(updateTa)); + TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime()); Timestamp startTime = Timestamp.from(Instant.from(ta)); @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) { .addParameter(workflow.getCorrelationId()) .addParameter(workflow.getWorkflowType()) .addParameter(startTime) + .addParameter(updateTime) .addParameter(workflow.getStatus().toString()) .addJsonParameter(workflow) .executeUpdate()); @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) { "INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)" + "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) " + "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, " - + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data"; + + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data " + + "WHERE EXCLUDED.update_time > task_index.update_time"; if (onlyIndexOnStatusChange) { - INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status"; + INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status"; } TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime()); diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql new file mode 100644 index 000000000..c57c03f4e --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql @@ -0,0 +1,8 @@ +ALTER TABLE workflow_index +ADD update_time TIMESTAMP WITH TIME ZONE NULL; + +UPDATE workflow_index +SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone; + +ALTER TABLE workflow_index +ALTER COLUMN update_time SET NOT NULL; \ No newline at end of file diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java index 80811e80e..e3c819362 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) { wfs.setCorrelationId("correlation-id"); wfs.setWorkflowType("workflow-type"); wfs.setStartTime("2023-02-07T08:42:45Z"); + wfs.setUpdateTime("2023-02-07T08:43:45Z"); wfs.setStatus(Workflow.WorkflowStatus.RUNNING); return wfs; } @@ -142,6 +143,7 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException { // Change the record, but not the status, and re-index wfs.setCorrelationId("new-correlation-id"); + wfs.setUpdateTime("2023-02-07T08:44:45Z"); indexDAO.indexWorkflow(wfs); // retrieve the record, make sure it hasn't changed @@ -149,6 +151,7 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException { // Change the status and re-index wfs.setStatus(Workflow.WorkflowStatus.FAILED); + wfs.setUpdateTime("2023-02-07T08:45:45Z"); indexDAO.indexWorkflow(wfs); // retrieve the record, make sure it has changed @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException { // Change the status and re-index ts.setStatus(Task.Status.FAILED); + ts.setUpdateTime("2023-02-07T10:43:45Z"); indexDAO.indexTask(ts); // retrieve the record, make sure it has changed - checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0"); + checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0"); } } diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java index 3d7c80d99..5db5ba046 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) { wfs.setCorrelationId("correlation-id"); wfs.setWorkflowType("workflow-type"); wfs.setStartTime("2023-02-07T08:42:45Z"); + wfs.setUpdateTime("2023-02-07T08:43:45Z"); wfs.setStatus(Workflow.WorkflowStatus.COMPLETED); return wfs; } @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException { @Test public void testIndexNewWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new"); indexDAO.indexWorkflow(wfs); @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException { @Test public void testIndexExistingWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + + wfs.setStatus(Workflow.WorkflowStatus.FAILED); + wfs.setUpdateTime("2023-02-07T08:44:45Z"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + } + + @Test + public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed() + throws SQLException { + + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index"); indexDAO.indexWorkflow(wfs); compareWorkflowSummary(wfs); + // Set the update time to the past + wfs.setUpdateTime("2023-02-07T08:42:45Z"); wfs.setStatus(Workflow.WorkflowStatus.FAILED); indexDAO.indexWorkflow(wfs); + // Reset the workflow to check it's not been updated + wfs = getMockWorkflowSummary("workflow-id-existing-no-index"); compareWorkflowSummary(wfs); } @Test public void testIndexNewTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); + TaskSummary ts = getMockTaskSummary("task-id-new"); indexDAO.indexTask(ts); @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException { @Test public void testIndexExistingTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); + TaskSummary ts = getMockTaskSummary("task-id-existing"); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + + ts.setUpdateTime("2023-02-07T09:43:45Z"); + ts.setStatus(Task.Status.FAILED); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + } + + @Test + public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException { + TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update"); indexDAO.indexTask(ts); compareTaskSummary(ts); + // Set the update time to the past + ts.setUpdateTime("2023-02-07T09:41:45Z"); ts.setStatus(Task.Status.FAILED); indexDAO.indexTask(ts); + // Reset the task to check it's not been updated + ts = getMockTaskSummary("task-id-exiting-no-update"); compareTaskSummary(ts); } @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() { @Test public void testJsonSearchWorkflowSummary() { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary"); wfs.setVersion(3); indexDAO.indexWorkflow(wfs); @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() { @Test public void testSearchWorkflowSummaryPagination() { for (int i = 0; i < 5; i++) { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i); indexDAO.indexWorkflow(wfs); } List orderBy = Arrays.asList(new String[] {"workflowId:DESC"}); SearchResult results = - indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy); + indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy); assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-4", + "workflow-id-pagination-4", results.getResults().get(0).getWorkflowId()); assertEquals( "Results returned in wrong order", - "workflow-id-3", + "workflow-id-pagination-3", results.getResults().get(1).getWorkflowId()); results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy); assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-2", + "workflow-id-pagination-2", results.getResults().get(0).getWorkflowId()); assertEquals( "Results returned in wrong order", - "workflow-id-1", + "workflow-id-pagination-1", results.getResults().get(1).getWorkflowId()); results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy); assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-0", + "workflow-id-pagination-0", results.getResults().get(0).getWorkflowId()); } @@ -351,7 +394,7 @@ public void testSearchTaskSummary() { @Test public void testSearchTaskSummaryPagination() { for (int i = 0; i < 5; i++) { - TaskSummary ts = getMockTaskSummary("task-id-" + i); + TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i); indexDAO.indexTask(ts); } @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() { assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-4", + "task-id-pagination-4", results.getResults().get(0).getTaskId()); assertEquals( "Results returned in wrong order", - "task-id-3", + "task-id-pagination-3", results.getResults().get(1).getTaskId()); results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy); assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-2", + "task-id-pagination-2", results.getResults().get(0).getTaskId()); assertEquals( "Results returned in wrong order", - "task-id-1", + "task-id-pagination-1", results.getResults().get(1).getTaskId()); results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy); assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-0", + "task-id-pagination-0", results.getResults().get(0).getTaskId()); }