Skip to content

Commit

Permalink
Tidying up
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpirt committed Feb 7, 2024
1 parent 3dca8b5 commit 7e7a865
Show file tree
Hide file tree
Showing 4 changed files with 462 additions and 469 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) {

private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow)
throws JsonProcessingException {
LOGGER.info("Removing workflow");
if (archiveWorkflow) {
if (workflow.getStatus().isTerminal()) {
// Only allow archival if workflow is in terminal state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,59 +58,54 @@ public PostgresIndexDAO(
int workerQueueSize = properties.getAsyncWorkerQueueSize();

// Set up a workerpool for performing async operations.
this.executorService =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
maximumPoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn(
"Request {} to async dao discarded in executor {}",
runnable,
executor);
Monitors.recordDiscardedIndexingCount("indexQueue");
});
this.executorService = new ThreadPoolExecutor(
CORE_POOL_SIZE,
maximumPoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn(
"Request {} to async dao discarded in executor {}",
runnable,
executor);
Monitors.recordDiscardedIndexingCount("indexQueue");
});
}

@Override
public void indexWorkflow(WorkflowSummary workflow) {
String INSERT_WORKFLOW_INDEX_SQL =
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
String INSERT_WORKFLOW_INDEX_SQL = "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

queryWithTransaction(
INSERT_WORKFLOW_INDEX_SQL,
q ->
q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
q -> q.addParameter(workflow.getWorkflowId())
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
}

@Override
public SearchResult<WorkflowSummary> searchWorkflowSummary(
String query, String freeText, int start, int count, List<String> sort) {
PostgresIndexQueryBuilder queryBuilder =
new PostgresIndexQueryBuilder(
"workflow_index", query, freeText, start, count, sort);

List<WorkflowSummary> results =
queryWithTransaction(
queryBuilder.getQuery(),
q -> {
queryBuilder.addParameters(q);
return q.executeAndFetch(WorkflowSummary.class);
});
PostgresIndexQueryBuilder queryBuilder = new PostgresIndexQueryBuilder(
"workflow_index", query, freeText, start, count, sort);

List<WorkflowSummary> results = queryWithTransaction(
queryBuilder.getQuery(),
q -> {
queryBuilder.addParameters(q);
return q.executeAndFetch(WorkflowSummary.class);
});

// To avoid making a second potentially expensive query to postgres say we've
// got enough results for another page so the pagination works
Expand All @@ -120,11 +115,10 @@ public SearchResult<WorkflowSummary> searchWorkflowSummary(

@Override
public void indexTask(TaskSummary task) {
String INSERT_TASK_INDEX_SQL =
"INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
String INSERT_TASK_INDEX_SQL = "INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));
Expand All @@ -134,31 +128,29 @@ public void indexTask(TaskSummary task) {

queryWithTransaction(
INSERT_TASK_INDEX_SQL,
q ->
q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
q -> q.addParameter(task.getTaskId())
.addParameter(task.getTaskType())
.addParameter(task.getTaskDefName())
.addParameter(task.getStatus().toString())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(task.getWorkflowType())
.addJsonParameter(task)
.executeUpdate());
}

@Override
public SearchResult<TaskSummary> searchTaskSummary(
String query, String freeText, int start, int count, List<String> sort) {
PostgresIndexQueryBuilder queryBuilder =
new PostgresIndexQueryBuilder("task_index", query, freeText, start, count, sort);
PostgresIndexQueryBuilder queryBuilder = new PostgresIndexQueryBuilder("task_index", query, freeText, start,
count, sort);

List<TaskSummary> results =
queryWithTransaction(
queryBuilder.getQuery(),
q -> {
queryBuilder.addParameters(q);
return q.executeAndFetch(TaskSummary.class);
});
List<TaskSummary> results = queryWithTransaction(
queryBuilder.getQuery(),
q -> {
queryBuilder.addParameters(q);
return q.executeAndFetch(TaskSummary.class);
});

// To avoid making a second potentially expensive query to postgres say we've
// got enough results for another page so the pagination works
Expand All @@ -168,42 +160,40 @@ public SearchResult<TaskSummary> searchTaskSummary(

@Override
public void addTaskExecutionLogs(List<TaskExecLog> logs) {
String INSERT_LOG =
"INSERT INTO task_execution_logs (task_id, created_time, log) VALUES (?, ?, ?)";
String INSERT_LOG = "INSERT INTO task_execution_logs (task_id, created_time, log) VALUES (?, ?, ?)";
for (TaskExecLog log : logs) {
queryWithTransaction(
INSERT_LOG,
q ->
q.addParameter(log.getTaskId())
.addParameter(new Timestamp(log.getCreatedTime()))
.addParameter(log.getLog())
.executeUpdate());
q -> q.addParameter(log.getTaskId())
.addParameter(new Timestamp(log.getCreatedTime()))
.addParameter(log.getLog())
.executeUpdate());
}
}

@Override
public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
return queryWithTransaction(
"SELECT log, task_id, created_time FROM task_execution_logs WHERE task_id = ? ORDER BY created_time ASC",
q ->
q.addParameter(taskId)
.executeAndFetch(
rs -> {
List<TaskExecLog> result = new ArrayList<>();
while (rs.next()) {
TaskExecLog log = new TaskExecLog();
log.setLog(rs.getString("log"));
log.setTaskId(rs.getString("task_id"));
log.setCreatedTime(
rs.getDate("created_time").getTime());
result.add(log);
}
return result;
}));
q -> q.addParameter(taskId)
.executeAndFetch(
rs -> {
List<TaskExecLog> result = new ArrayList<>();
while (rs.next()) {
TaskExecLog log = new TaskExecLog();
log.setLog(rs.getString("log"));
log.setTaskId(rs.getString("task_id"));
log.setCreatedTime(
rs.getDate("created_time").getTime());
result.add(log);
}
return result;
}));
}

@Override
public void setup() {
    // Intentionally a no-op: this DAO performs no runtime initialization.
    // NOTE(review): presumably the index tables are created by external
    // schema migrations — confirm against the module's migration setup.
}

@Override
public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflow) {
Expand Down Expand Up @@ -233,15 +223,13 @@ public SearchResult<String> searchTasks(

@Override
public void removeWorkflow(String workflowId) {
    // Delete the workflow's row from the search index. Deleting a
    // non-existent id is a harmless no-op at the SQL level.
    String REMOVE_WORKFLOW_SQL = "DELETE FROM workflow_index WHERE workflow_id = ?";

    queryWithTransaction(REMOVE_WORKFLOW_SQL, q -> q.addParameter(workflowId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
    // Run the synchronous removal on the DAO's async worker pool; the pool's
    // rejection handler logs and counts discarded requests when the queue is full.
    return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService);
}

Expand All @@ -259,19 +247,16 @@ public CompletableFuture<Void> asyncUpdateWorkflow(

@Override
public void removeTask(String workflowId, String taskId) {
logger.info("Removing task (postgres) {}", taskId);
String REMOVE_TASK_SQL =
"WITH task_delete AS (DELETE FROM task_index WHERE workflow_id = ?)"
+ "DELETE FROM task_execution_logs WHERE task_id =?";
String REMOVE_TASK_SQL = "WITH task_delete AS (DELETE FROM task_index WHERE task_id = ?)"
+ "DELETE FROM task_execution_logs WHERE task_id =?";

queryWithTransaction(
REMOVE_TASK_SQL,
q -> q.addParameter(workflowId).addParameter(workflowId).executeUpdate());
q -> q.addParameter(taskId).addParameter(taskId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
    // Dispatch the synchronous removeTask onto the async worker pool.
    return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService);
}

Expand Down

This file was deleted.

Loading

0 comments on commit 7e7a865

Please sign in to comment.