Skip to content

Commit

Permalink
Address items raised in issue 237
Browse files Browse the repository at this point in the history
- Remove the v13 migration
- Replace with migration script with one that adds update_time with a default of 'epoch' if the column doesn't exist, and sets the default of 'epoch' if the column does exist
- Fix the PostgresIndexDao.indexWorkflow to also set the update_time during an UPDATE
- Make the backfill script for update_time separate an optional and fix the timestamp to include milliseconds
  • Loading branch information
lbestatlas committed Oct 15, 2024
1 parent a28e0e9 commit 8eec176
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.postgres.config;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;

Expand All @@ -25,7 +26,6 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.*;
import org.springframework.core.env.*;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand Down Expand Up @@ -58,13 +58,19 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie
public Flyway flywayForPrimaryDb() {
FluentConfiguration config = Flyway.configure();

var locations = new ArrayList<String>();
locations.add("classpath:db/migration_postgres");

if (properties.getExperimentalQueueNotify()) {
config.locations(
"classpath:db/migration_postgres", "classpath:db/migration_postgres_notify");
} else {
config.locations("classpath:db/migration_postgres");
locations.add("classpath:db/migration_postgres_notify");
}

if (properties.isApplyDataMigrations()) {
locations.add("classpath:db/migration_postgres_data");
}

config.locations(locations.toArray(new String[0]));

return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class PostgresProperties {

private boolean onlyIndexOnStatusChange = false;

/** The boolean indicating whether data migrations should be executed */
private boolean applyDataMigrations = true;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -83,6 +86,14 @@ public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public boolean isApplyDataMigrations() {
return applyDataMigrations;
}

public void setApplyDataMigrations(boolean applyDataMigrations) {
this.applyDataMigrations = applyDataMigrations;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void indexWorkflow(WorkflowSummary workflow) {
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data, "
+ "update_time = EXCLUDED.update_time "
+ "WHERE EXCLUDED.update_time >= workflow_index.update_time";

if (onlyIndexOnStatusChange) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE workflow_index
ADD COLUMN IF NOT EXISTS update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

-- SET DEFAULT AGAIN IN CASE COLUMN ALREADY EXISTED from deleted V13 migration
ALTER TABLE workflow_index
ALTER COLUMN update_time SET DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Optional back-fill script to populate updateTime historically.
UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SS.MSZ')::timestamp WITH time zone
WHERE json_data->>'updateTime' IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.netflix.conductor.postgres.config;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.Objects;

import static org.junit.Assert.assertTrue;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.applyDataMigrations=false",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresConfigurationDataMigrationTest {

@Autowired
Flyway flyway;

@Autowired
ResourcePatternResolver resourcePatternResolver;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

@Test
public void dataMigrationIsNotAppliedWhenDisabled() throws Exception {
var files = resourcePatternResolver.getResources("classpath:db/migration_postgres_data/*");
Arrays.stream(flyway.info().applied()).forEach(migrationInfo ->
assertTrue("Data migration wrongly applied: " + migrationInfo.getScript(),
Arrays.stream(files)
.map(Resource::getFilename)
.filter(Objects::nonNull)
.noneMatch(fileName -> fileName.contains(migrationInfo.getScript()))));
}
}

0 comments on commit 8eec176

Please sign in to comment.