Skip to content

Commit

Permalink
Merge pull request conductor-oss#288 from lbestatlas/issue/237-workfl…
Browse files Browse the repository at this point in the history
…ow-index-update-time

workflow_index.update_time bugfix
  • Loading branch information
v1r3n authored Oct 24, 2024
2 parents 984806a + ca60fca commit 6e77cc5
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.postgres.config;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;

Expand All @@ -25,7 +26,6 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.*;
import org.springframework.core.env.*;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand Down Expand Up @@ -58,13 +58,19 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie
public Flyway flywayForPrimaryDb() {
FluentConfiguration config = Flyway.configure();

var locations = new ArrayList<String>();
locations.add("classpath:db/migration_postgres");

if (properties.getExperimentalQueueNotify()) {
config.locations(
"classpath:db/migration_postgres", "classpath:db/migration_postgres_notify");
} else {
config.locations("classpath:db/migration_postgres");
locations.add("classpath:db/migration_postgres_notify");
}

if (properties.isApplyDataMigrations()) {
locations.add("classpath:db/migration_postgres_data");
}

config.locations(locations.toArray(new String[0]));

return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class PostgresProperties {

private boolean onlyIndexOnStatusChange = false;

/** The boolean indicating whether data migrations should be executed */
private boolean applyDataMigrations = true;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -83,6 +86,14 @@ public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public boolean isApplyDataMigrations() {
return applyDataMigrations;
}

public void setApplyDataMigrations(boolean applyDataMigrations) {
this.applyDataMigrations = applyDataMigrations;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void indexWorkflow(WorkflowSummary workflow) {
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data, "
+ "update_time = EXCLUDED.update_time "
+ "WHERE EXCLUDED.update_time >= workflow_index.update_time";

if (onlyIndexOnStatusChange) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE workflow_index
ADD COLUMN IF NOT EXISTS update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

-- SET DEFAULT AGAIN IN CASE COLUMN ALREADY EXISTED from deleted V13 migration
ALTER TABLE workflow_index
ALTER COLUMN update_time SET DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Optional back-fill script to populate updateTime historically.
UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DD"T"HH24:MI:SS.MS')::timestamp AT TIME ZONE '00:00'
WHERE json_data->>'updateTime' IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.config;

import java.util.Arrays;
import java.util.Objects;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;

import static org.junit.Assert.assertTrue;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.applyDataMigrations=false",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresConfigurationDataMigrationTest {

@Autowired Flyway flyway;

@Autowired ResourcePatternResolver resourcePatternResolver;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

@Test
public void dataMigrationIsNotAppliedWhenDisabled() throws Exception {
var files = resourcePatternResolver.getResources("classpath:db/migration_postgres_data/*");
Arrays.stream(flyway.info().applied())
.forEach(
migrationInfo ->
assertTrue(
"Data migration wrongly applied: "
+ migrationInfo.getScript(),
Arrays.stream(files)
.map(Resource::getFilename)
.filter(Objects::nonNull)
.noneMatch(
fileName ->
fileName.contains(
migrationInfo
.getScript()))));
}
}

0 comments on commit 6e77cc5

Please sign in to comment.