diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java index 1e00cb067..e1165ca55 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java @@ -13,6 +13,7 @@ package com.netflix.conductor.postgres.config; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; import java.util.Optional; @@ -25,7 +26,6 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.*; -import org.springframework.core.env.*; import org.springframework.retry.RetryContext; import org.springframework.retry.backoff.NoBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; @@ -58,13 +58,19 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie public Flyway flywayForPrimaryDb() { FluentConfiguration config = Flyway.configure(); + var locations = new ArrayList(); + locations.add("classpath:db/migration_postgres"); + if (properties.getExperimentalQueueNotify()) { - config.locations( - "classpath:db/migration_postgres", "classpath:db/migration_postgres_notify"); - } else { - config.locations("classpath:db/migration_postgres"); + locations.add("classpath:db/migration_postgres_notify"); + } + + if (properties.isApplyDataMigrations()) { + locations.add("classpath:db/migration_postgres_data"); } + config.locations(locations.toArray(new String[0])); + return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false")) .schemas(properties.getSchema()) .dataSource(dataSource) diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java index 0ddf80098..9c650e694 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java @@ -39,6 +39,9 @@ public class PostgresProperties { private boolean onlyIndexOnStatusChange = false; + /** The boolean indicating whether data migrations should be executed */ + private boolean applyDataMigrations = true; + public String schema = "public"; public boolean allowFullTextQueries = true; @@ -83,6 +86,14 @@ public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) { this.onlyIndexOnStatusChange = onlyIndexOnStatusChange; } + public boolean isApplyDataMigrations() { + return applyDataMigrations; + } + + public void setApplyDataMigrations(boolean applyDataMigrations) { + this.applyDataMigrations = applyDataMigrations; + } + public Integer getDeadlockRetryMax() { return deadlockRetryMax; } diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index b0481a293..b50756cca 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -85,7 +85,8 @@ public void indexWorkflow(WorkflowSummary workflow) { "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)" + "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, " - + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data " + + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data, " + + "update_time = EXCLUDED.update_time " + "WHERE EXCLUDED.update_time >= workflow_index.update_time"; if (onlyIndexOnStatusChange) { diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V13.1__workflow_index_columns.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V13.1__workflow_index_columns.sql new file mode 100644 index 000000000..f36334d37 --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V13.1__workflow_index_columns.sql @@ -0,0 +1,6 @@ +ALTER TABLE workflow_index + ADD COLUMN IF NOT EXISTS update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT TIMESTAMP WITH TIME ZONE 'epoch'; + +-- SET DEFAULT AGAIN IN CASE COLUMN ALREADY EXISTED from deleted V13 migration +ALTER TABLE workflow_index + ALTER COLUMN update_time SET DEFAULT TIMESTAMP WITH TIME ZONE 'epoch'; diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql deleted file mode 100644 index c57c03f4e..000000000 --- a/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql +++ /dev/null @@ -1,8 +0,0 @@ -ALTER TABLE workflow_index -ADD update_time TIMESTAMP WITH TIME ZONE NULL; - -UPDATE workflow_index -SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone; - -ALTER TABLE workflow_index -ALTER COLUMN update_time SET NOT NULL; \ No newline at end of file diff --git a/postgres-persistence/src/main/resources/db/migration_postgres_data/V13.2__workflow_index_backfill_update_time.sql b/postgres-persistence/src/main/resources/db/migration_postgres_data/V13.2__workflow_index_backfill_update_time.sql new file mode 100644 index 000000000..2ffbec396 --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres_data/V13.2__workflow_index_backfill_update_time.sql @@ -0,0 +1,4 @@ +-- Optional back-fill script to populate updateTime historically. +UPDATE workflow_index +SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DD"T"HH24:MI:SS.MS')::timestamp AT TIME ZONE '00:00' +WHERE json_data->>'updateTime' IS NOT NULL; \ No newline at end of file diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/config/PostgresConfigurationDataMigrationTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/config/PostgresConfigurationDataMigrationTest.java new file mode 100644 index 000000000..0f5167fd6 --- /dev/null +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/config/PostgresConfigurationDataMigrationTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.config; + +import java.util.Arrays; +import java.util.Objects; + +import org.flywaydb.core.Flyway; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; + +import com.netflix.conductor.common.config.TestObjectMapperConfiguration; + +import static org.junit.Assert.assertTrue; + +@ContextConfiguration( + classes = { + TestObjectMapperConfiguration.class, + PostgresConfiguration.class, + FlywayAutoConfiguration.class + }) +@RunWith(SpringRunner.class) +@TestPropertySource( + properties = { + "conductor.app.asyncIndexingEnabled=false", + "conductor.elasticsearch.version=0", + "conductor.indexing.type=postgres", + "conductor.postgres.applyDataMigrations=false", + "spring.flyway.clean-disabled=false" + }) +@SpringBootTest +public class PostgresConfigurationDataMigrationTest { + + @Autowired Flyway flyway; + + @Autowired ResourcePatternResolver resourcePatternResolver; + + // clean the database between tests. + @Before + public void before() { + flyway.migrate(); + } + + @Test + public void dataMigrationIsNotAppliedWhenDisabled() throws Exception { + var files = resourcePatternResolver.getResources("classpath:db/migration_postgres_data/*"); + Arrays.stream(flyway.info().applied()) + .forEach( + migrationInfo -> + assertTrue( + "Data migration wrongly applied: " + + migrationInfo.getScript(), + Arrays.stream(files) + .map(Resource::getFilename) + .filter(Objects::nonNull) + .noneMatch( + fileName -> + fileName.contains( + migrationInfo + .getScript())))); + } +}