Skip to content

Commit

Permalink
Updated Postgres indexer to only index if the update is newer than th…
Browse files Browse the repository at this point in the history
…e current index
  • Loading branch information
danmiller192 committed Jul 16, 2024
1 parent 3a43cc6 commit dc3a2e1
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ public PostgresIndexDAO(
@Override
public void indexWorkflow(WorkflowSummary workflow) {
String INSERT_WORKFLOW_INDEX_SQL =
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > workflow_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

Expand All @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) {
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
Expand Down Expand Up @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) {
"INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > task_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_index
ADD update_time TIMESTAMP WITH TIME ZONE NULL;

UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone;

ALTER TABLE workflow_index
ALTER COLUMN update_time SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}
Expand Down Expand Up @@ -142,13 +143,15 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException {

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
wfs.setUpdateTime("2023-02-07T08:44:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:45:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
Expand All @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException {

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
ts.setUpdateTime("2023-02-07T10:43:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.COMPLETED);
return wfs;
}
Expand Down Expand Up @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException {

@Test
public void testIndexNewWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new");

indexDAO.indexWorkflow(wfs);

Expand All @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException {

@Test
public void testIndexExistingWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:44:45Z");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);
}

@Test
public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed()
throws SQLException {

WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

// Set the update time to the past
wfs.setUpdateTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.FAILED);

indexDAO.indexWorkflow(wfs);

// Reset the workflow to check it's not been updated
wfs = getMockWorkflowSummary("workflow-id-existing-no-index");
compareWorkflowSummary(wfs);
}

@Test
public void testIndexNewTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-new");

indexDAO.indexTask(ts);

Expand All @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException {

@Test
public void testIndexExistingTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-existing");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

ts.setUpdateTime("2023-02-07T09:43:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

compareTaskSummary(ts);
}

@Test
public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

// Set the update time to the past
ts.setUpdateTime("2023-02-07T09:41:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

// Reset the task to check it's not been updated
ts = getMockTaskSummary("task-id-exiting-no-update");
compareTaskSummary(ts);
}

Expand Down Expand Up @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() {

@Test
public void testJsonSearchWorkflowSummary() {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary");
wfs.setVersion(3);

indexDAO.indexWorkflow(wfs);
Expand All @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() {
@Test
public void testSearchWorkflowSummaryPagination() {
for (int i = 0; i < 5; i++) {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i);
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i);
indexDAO.indexWorkflow(wfs);
}

List<String> orderBy = Arrays.asList(new String[] {"workflowId:DESC"});
SearchResult<WorkflowSummary> results =
indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy);
indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy);
assertEquals("Wrong totalHits returned", 3, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-4",
"workflow-id-pagination-4",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-3",
"workflow-id-pagination-3",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-2",
"workflow-id-pagination-2",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-1",
"workflow-id-pagination-1",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-0",
"workflow-id-pagination-0",
results.getResults().get(0).getWorkflowId());
}

Expand All @@ -351,7 +394,7 @@ public void testSearchTaskSummary() {
@Test
public void testSearchTaskSummaryPagination() {
for (int i = 0; i < 5; i++) {
TaskSummary ts = getMockTaskSummary("task-id-" + i);
TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i);
indexDAO.indexTask(ts);
}

Expand All @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() {
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-4",
"task-id-pagination-4",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-3",
"task-id-pagination-3",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-2",
"task-id-pagination-2",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-1",
"task-id-pagination-1",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-0",
"task-id-pagination-0",
results.getResults().get(0).getTaskId());
}

Expand Down

0 comments on commit dc3a2e1

Please sign in to comment.