Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow_index.update_time bugfix #288

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.postgres.config;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;

Expand All @@ -25,7 +26,6 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.*;
import org.springframework.core.env.*;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand Down Expand Up @@ -58,13 +58,19 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie
public Flyway flywayForPrimaryDb() {
FluentConfiguration config = Flyway.configure();

var locations = new ArrayList<String>();
locations.add("classpath:db/migration_postgres");

if (properties.getExperimentalQueueNotify()) {
config.locations(
"classpath:db/migration_postgres", "classpath:db/migration_postgres_notify");
} else {
config.locations("classpath:db/migration_postgres");
locations.add("classpath:db/migration_postgres_notify");
}

if (properties.isApplyDataMigrations()) {
locations.add("classpath:db/migration_postgres_data");
}

config.locations(locations.toArray(new String[0]));

return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class PostgresProperties {

private boolean onlyIndexOnStatusChange = false;

/** The boolean indicating whether data migrations should be executed */
private boolean applyDataMigrations = true;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -83,6 +86,14 @@ public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public boolean isApplyDataMigrations() {
return applyDataMigrations;
}

public void setApplyDataMigrations(boolean applyDataMigrations) {
this.applyDataMigrations = applyDataMigrations;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void indexWorkflow(WorkflowSummary workflow) {
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data, "
+ "update_time = EXCLUDED.update_time "
+ "WHERE EXCLUDED.update_time >= workflow_index.update_time";

if (onlyIndexOnStatusChange) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE workflow_index
ADD COLUMN IF NOT EXISTS update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

-- SET DEFAULT AGAIN IN CASE COLUMN ALREADY EXISTED from deleted V13 migration
ALTER TABLE workflow_index
ALTER COLUMN update_time SET DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Optional back-fill script to populate updateTime historically.
UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SS.MSZ')::timestamp WITH time zone
WHERE json_data->>'updateTime' IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.netflix.conductor.postgres.config;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.Objects;

import static org.junit.Assert.assertTrue;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.applyDataMigrations=false",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresConfigurationDataMigrationTest {

@Autowired
Flyway flyway;

@Autowired
ResourcePatternResolver resourcePatternResolver;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

@Test
public void dataMigrationIsNotAppliedWhenDisabled() throws Exception {
var files = resourcePatternResolver.getResources("classpath:db/migration_postgres_data/*");
Arrays.stream(flyway.info().applied()).forEach(migrationInfo ->
assertTrue("Data migration wrongly applied: " + migrationInfo.getScript(),
Arrays.stream(files)
.map(Resource::getFilename)
.filter(Objects::nonNull)
.noneMatch(fileName -> fileName.contains(migrationInfo.getScript()))));
}
}
Loading