Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow_index.update_time bugfix #288

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.postgres.config;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;

Expand All @@ -25,7 +26,6 @@
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.*;
import org.springframework.core.env.*;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand Down Expand Up @@ -58,13 +58,19 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie
public Flyway flywayForPrimaryDb() {
FluentConfiguration config = Flyway.configure();

var locations = new ArrayList<String>();
locations.add("classpath:db/migration_postgres");

if (properties.getExperimentalQueueNotify()) {
config.locations(
"classpath:db/migration_postgres", "classpath:db/migration_postgres_notify");
} else {
config.locations("classpath:db/migration_postgres");
locations.add("classpath:db/migration_postgres_notify");
}

if (properties.isApplyDataMigrations()) {
locations.add("classpath:db/migration_postgres_data");
}

config.locations(locations.toArray(new String[0]));

return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class PostgresProperties {

private boolean onlyIndexOnStatusChange = false;

/** The boolean indicating whether data migrations should be executed */
private boolean applyDataMigrations = true;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand Down Expand Up @@ -83,6 +86,14 @@ public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
}

public boolean isApplyDataMigrations() {
return applyDataMigrations;
}

public void setApplyDataMigrations(boolean applyDataMigrations) {
this.applyDataMigrations = applyDataMigrations;
}

public Integer getDeadlockRetryMax() {
return deadlockRetryMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void indexWorkflow(WorkflowSummary workflow) {
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data, "
+ "update_time = EXCLUDED.update_time "
+ "WHERE EXCLUDED.update_time >= workflow_index.update_time";

if (onlyIndexOnStatusChange) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE workflow_index
ADD COLUMN IF NOT EXISTS update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

-- SET DEFAULT AGAIN IN CASE COLUMN ALREADY EXISTED from deleted V13 migration
ALTER TABLE workflow_index
ALTER COLUMN update_time SET DEFAULT TIMESTAMP WITH TIME ZONE 'epoch';

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Optional back-fill script to populate updateTime historically.
UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DD"T"HH24:MI:SS.MS')::timestamp AT TIME ZONE '00:00'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further testing found that the previous scripts formatting was not interpreting hours and time zones correctly.
Postgres does not interpret the timezone Z from the json->>'updateTime', it has to be set explicitly. Also, the T has to be explicitly skipped by enclosing it in double quotes.

WHERE json_data->>'updateTime' IS NOT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.config;

import java.util.Arrays;
import java.util.Objects;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;

import static org.junit.Assert.assertTrue;

@ContextConfiguration(
classes = {
TestObjectMapperConfiguration.class,
PostgresConfiguration.class,
FlywayAutoConfiguration.class
})
@RunWith(SpringRunner.class)
@TestPropertySource(
properties = {
"conductor.app.asyncIndexingEnabled=false",
"conductor.elasticsearch.version=0",
"conductor.indexing.type=postgres",
"conductor.postgres.applyDataMigrations=false",
"spring.flyway.clean-disabled=false"
})
@SpringBootTest
public class PostgresConfigurationDataMigrationTest {

@Autowired Flyway flyway;

@Autowired ResourcePatternResolver resourcePatternResolver;

// clean the database between tests.
@Before
public void before() {
flyway.migrate();
}

@Test
public void dataMigrationIsNotAppliedWhenDisabled() throws Exception {
var files = resourcePatternResolver.getResources("classpath:db/migration_postgres_data/*");
Arrays.stream(flyway.info().applied())
.forEach(
migrationInfo ->
assertTrue(
"Data migration wrongly applied: "
+ migrationInfo.getScript(),
Arrays.stream(files)
.map(Resource::getFilename)
.filter(Objects::nonNull)
.noneMatch(
fileName ->
fileName.contains(
migrationInfo
.getScript()))));
}
}
Loading