Skip to content

Commit

Permalink
Merge pull request #1 from conductor-oss/main
Browse files Browse the repository at this point in the history
sync upstream to v3.21.2 t
  • Loading branch information
bstout authored Jul 18, 2024
2 parents bf0ff7e + 55a31af commit df8ac66
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 31 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ allprojects {
implementation('org.apache.logging.log4j:log4j-slf4j-impl')
implementation('org.apache.logging.log4j:log4j-jul')
implementation('org.apache.logging.log4j:log4j-web')

implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
compileOnly 'org.projectlombok:lombok:1.18.34'

annotationProcessor 'org.projectlombok:lombok:1.18.34'
Expand Down
3 changes: 2 additions & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies {
compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.boot:spring-boot-starter-validation'

implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}"
compileOnly "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}"

implementation "org.apache.commons:commons-lang3"

Expand All @@ -23,6 +23,7 @@ dependencies {
implementation "com.fasterxml.jackson.module:jackson-module-afterburner:${revFasterXml}"

testImplementation 'org.springframework.boot:spring-boot-starter-validation'
testImplementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}"
}

/*
Expand Down
2 changes: 1 addition & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ ext {
revNatsStreaming = '2.6.5'
revNats = '2.16.14'
revStan = '2.2.3'
revFlyway = '9.0.4'
revFlyway = '10.15.2'

}
2 changes: 1 addition & 1 deletion docs/devguide/running/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ In this article we will explore how you can set up Conductor on your local machi
features.

### Prerequisites
1. JDK 11 or greater
1. JDK 17 or greater
2. (Optional) Docker if you want to run tests. You can install docker from [here](https://www.docker.com/get-started/).
3. Node for building and running UI. Instructions at [https://nodejs.org](https://nodejs.org).
4. Yarn for building and running UI. Instructions at [https://classic.yarnpkg.com/en/docs/install](https://classic.yarnpkg.com/en/docs/install).
Expand Down
2 changes: 1 addition & 1 deletion mysql-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {

implementation "mysql:mysql-connector-java:8.0.33"
implementation "org.springframework.boot:spring-boot-starter-jdbc"
implementation "org.flywaydb:flyway-mysql"
implementation "org.flywaydb:flyway-mysql:${revFlyway}"

testImplementation "org.apache.groovy:groovy-all:${revGroovy}"

Expand Down
2 changes: 2 additions & 0 deletions postgres-external-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ dependencies {
implementation "org.postgresql:postgresql:${revPostgres}"
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation "org.flywaydb:flyway-core:${revFlyway}"
implementation "org.flywaydb:flyway-database-postgresql:${revFlyway}"

implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}"
implementation "commons-codec:commons-codec:${revCodec}"

Expand Down
1 change: 1 addition & 0 deletions postgres-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation "org.postgresql:postgresql:${revPostgres}"
implementation "org.springframework.boot:spring-boot-starter-jdbc"
implementation "org.flywaydb:flyway-core:${revFlyway}"
implementation "org.flywaydb:flyway-database-postgresql:${revFlyway}"

testImplementation "org.apache.groovy:groovy-all:${revGroovy}"
testImplementation project(':conductor-server')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Flyway flywayForPrimaryDb() {
config.locations("classpath:db/migration_postgres");
}

return config.configuration(Map.of())
return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
.outOfOrder(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ public PostgresIndexDAO(
@Override
public void indexWorkflow(WorkflowSummary workflow) {
String INSERT_WORKFLOW_INDEX_SQL =
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > workflow_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

Expand All @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) {
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
Expand Down Expand Up @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) {
"INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > task_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_index
ADD update_time TIMESTAMP WITH TIME ZONE NULL;

UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone;

ALTER TABLE workflow_index
ALTER COLUMN update_time SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}
Expand Down Expand Up @@ -142,13 +143,15 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException {

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
wfs.setUpdateTime("2023-02-07T08:44:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:45:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
Expand All @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException {

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
ts.setUpdateTime("2023-02-07T10:43:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.COMPLETED);
return wfs;
}
Expand Down Expand Up @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException {

@Test
public void testIndexNewWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new");

indexDAO.indexWorkflow(wfs);

Expand All @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException {

@Test
public void testIndexExistingWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:44:45Z");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);
}

@Test
public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed()
throws SQLException {

WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

// Set the update time to the past
wfs.setUpdateTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.FAILED);

indexDAO.indexWorkflow(wfs);

// Reset the workflow to check it's not been updated
wfs = getMockWorkflowSummary("workflow-id-existing-no-index");
compareWorkflowSummary(wfs);
}

@Test
public void testIndexNewTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-new");

indexDAO.indexTask(ts);

Expand All @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException {

@Test
public void testIndexExistingTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-existing");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

ts.setUpdateTime("2023-02-07T09:43:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

compareTaskSummary(ts);
}

@Test
public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

// Set the update time to the past
ts.setUpdateTime("2023-02-07T09:41:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

// Reset the task to check it's not been updated
ts = getMockTaskSummary("task-id-exiting-no-update");
compareTaskSummary(ts);
}

Expand Down Expand Up @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() {

@Test
public void testJsonSearchWorkflowSummary() {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary");
wfs.setVersion(3);

indexDAO.indexWorkflow(wfs);
Expand All @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() {
@Test
public void testSearchWorkflowSummaryPagination() {
for (int i = 0; i < 5; i++) {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i);
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i);
indexDAO.indexWorkflow(wfs);
}

List<String> orderBy = Arrays.asList(new String[] {"workflowId:DESC"});
SearchResult<WorkflowSummary> results =
indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy);
indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy);
assertEquals("Wrong totalHits returned", 3, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-4",
"workflow-id-pagination-4",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-3",
"workflow-id-pagination-3",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-2",
"workflow-id-pagination-2",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-1",
"workflow-id-pagination-1",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-0",
"workflow-id-pagination-0",
results.getResults().get(0).getWorkflowId());
}

Expand All @@ -351,7 +394,7 @@ public void testSearchTaskSummary() {
@Test
public void testSearchTaskSummaryPagination() {
for (int i = 0; i < 5; i++) {
TaskSummary ts = getMockTaskSummary("task-id-" + i);
TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i);
indexDAO.indexTask(ts);
}

Expand All @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() {
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-4",
"task-id-pagination-4",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-3",
"task-id-pagination-3",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-2",
"task-id-pagination-2",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-1",
"task-id-pagination-1",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-0",
"task-id-pagination-0",
results.getResults().get(0).getTaskId());
}

Expand Down

0 comments on commit df8ac66

Please sign in to comment.