Skip to content

Commit

Permalink
Implementation of Postgres PollDataDAO with read and write caching
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpirt committed Feb 23, 2024
1 parent 386dde5 commit 4a1230b
Show file tree
Hide file tree
Showing 9 changed files with 668 additions and 97 deletions.
48 changes: 48 additions & 0 deletions docs/documentation/advanced/postgresql.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# PostgreSQL

By default conductor runs with an in-memory Redis mock. However, you
can run Conductor against PostgreSQL which provides workflow management, queues and indexing.
There are a number of configuration options that enable you to use more or less of PostgreSQL functionality for your needs.
It has the benefit of requiring fewer moving parts for the infrastructure, but does not scale as well to handle high volumes of workflows.
You should benchmark Conductor with Postgres against your specific workload to be sure.


## Configuration

To enable the basic use of PostgreSQL to manage workflow metadata, set the following property:

```properties
conductor.db.type=postgres
spring.datasource.url=jdbc:postgresql://postgres:5432/conductor
spring.datasource.username=conductor
spring.datasource.password=password
```

To also use PostgreSQL for queues, you can set:

```properties
conductor.queue.type=postgres
```

You can also use PostgreSQL to index workflows, configure this as follows:

```properties
conductor.indexing.enabled=true
conductor.indexing.type=postgres
conductor.elasticsearch.version=0
```

By default, Conductor writes the latest poll for tasks to the database so that it can be used to determine which tasks and domains are active. This creates a lot of database traffic.
To avoid some of this traffic you can configure the PollDataDAO with a write buffer so that it only flushes every x seconds:

```properties
# Flush the data every 5 seconds
conductor.postgres.pollDataFlushInterval=5
```

You can also configure a duration when the cached poll data will be considered stale. This means that the PollDataDAO will try to use the cached data, but if it is older than the configured period, it will check against the database.

```properties
# Data older than 5 seconds is considered stale
conductor.postgres.pollDataCacheValidityPeriod=5
```
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ nav:
- documentation/advanced/azureblob-storage.md
- documentation/advanced/externalpayloadstorage.md
- documentation/advanced/redis.md
- documentation/advanced/postgresql.md
- Client SDKs:
- documentation/clientsdks/index.md
- documentation/clientsdks/java-sdk.md
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.postgres.dao.PostgresExecutionDAO;
import com.netflix.conductor.postgres.dao.PostgresIndexDAO;
import com.netflix.conductor.postgres.dao.PostgresMetadataDAO;
import com.netflix.conductor.postgres.dao.PostgresQueueDAO;
import com.netflix.conductor.postgres.dao.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.*;
Expand Down Expand Up @@ -83,6 +80,15 @@ public PostgresExecutionDAO postgresExecutionDAO(
return new PostgresExecutionDAO(retryTemplate, objectMapper, dataSource);
}

@Bean
@DependsOn({"flywayForPrimaryDb"})
public PostgresPollDataDAO postgresPollDataDAO(
@Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper,
PostgresProperties properties) {
return new PostgresPollDataDAO(retryTemplate, objectMapper, dataSource, properties);
}

@Bean
@DependsOn({"flywayForPrimaryDb"})
public PostgresQueueDAO postgresQueueDAO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class PostgresProperties {

private Integer deadlockRetryMax = 3;

@DurationUnit(ChronoUnit.MILLIS)
private Duration pollDataFlushInterval = Duration.ofMillis(0);

@DurationUnit(ChronoUnit.MILLIS)
private Duration pollDataCacheValidityPeriod = Duration.ofMillis(0);

public String schema = "public";

public Duration getTaskDefCacheRefreshInterval() {
Expand All @@ -52,4 +58,20 @@ public String getSchema() {
public void setSchema(String schema) {
this.schema = schema;
}

public Duration getPollDataFlushInterval() {
return pollDataFlushInterval;
}

public void setPollDataFlushInterval(Duration interval) {
this.pollDataFlushInterval = interval;
}

public Duration getPollDataCacheValidityPeriod() {
return pollDataCacheValidityPeriod;
}

public void setPollDataCacheValidityPeriod(Duration period) {
this.pollDataCacheValidityPeriod = period;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executors;
Expand All @@ -27,12 +26,10 @@
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.PollDataDAO;
import com.netflix.conductor.dao.RateLimitingDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
Expand All @@ -47,7 +44,7 @@
import jakarta.annotation.*;

public class PostgresExecutionDAO extends PostgresBaseDAO
implements ExecutionDAO, RateLimitingDAO, PollDataDAO, ConcurrentExecutionLimitDAO {
implements ExecutionDAO, RateLimitingDAO, ConcurrentExecutionLimitDAO {

private final ScheduledExecutorService scheduledExecutorService;

Expand Down Expand Up @@ -558,45 +555,6 @@ public List<EventExecution> getEventExecutions(
}
}

@Override
public void updateLastPollData(String taskDefName, String domain, String workerId) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
PollData pollData = new PollData(taskDefName, domain, workerId, System.currentTimeMillis());
String effectiveDomain = (domain == null) ? "DEFAULT" : domain;
withTransaction(tx -> insertOrUpdatePollData(tx, pollData, effectiveDomain));
}

@Override
public PollData getPollData(String taskDefName, String domain) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
String effectiveDomain = (domain == null) ? "DEFAULT" : domain;
return getWithRetriedTransactions(tx -> readPollData(tx, taskDefName, effectiveDomain));
}

@Override
public List<PollData> getPollData(String taskDefName) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
return readAllPollData(taskDefName);
}

@Override
public List<PollData> getAllPollData() {
try (Connection tx = dataSource.getConnection()) {
boolean previousAutoCommitMode = tx.getAutoCommit();
tx.setAutoCommit(true);
try {
String GET_ALL_POLL_DATA = "SELECT json_data FROM poll_data ORDER BY queue_name";
return query(tx, GET_ALL_POLL_DATA, q -> q.executeAndFetch(PollData.class));
} catch (Throwable th) {
throw new NonTransientException(th.getMessage(), th);
} finally {
tx.setAutoCommit(previousAutoCommitMode);
}
} catch (SQLException ex) {
throw new NonTransientException(ex.getMessage(), ex);
}
}

private List<TaskModel> getTasks(Connection connection, List<String> taskIds) {
if (taskIds.isEmpty()) {
return Lists.newArrayList();
Expand Down Expand Up @@ -1027,56 +985,6 @@ private EventExecution readEventExecution(
.executeAndFetchFirst(EventExecution.class));
}

private void insertOrUpdatePollData(Connection connection, PollData pollData, String domain) {
/*
* Most times the row will be updated so let's try the update first. This used to be an 'INSERT/ON CONFLICT do update' sql statement. The problem with that
* is that if we try the INSERT first, the sequence will be increased even if the ON CONFLICT happens. Since polling happens *a lot*, the sequence can increase
* dramatically even though it won't be used.
*/
String UPDATE_POLL_DATA =
"UPDATE poll_data SET json_data=?, modified_on=CURRENT_TIMESTAMP WHERE queue_name=? AND domain=?";
int rowsUpdated =
query(
connection,
UPDATE_POLL_DATA,
q ->
q.addJsonParameter(pollData)
.addParameter(pollData.getQueueName())
.addParameter(domain)
.executeUpdate());

if (rowsUpdated == 0) {
String INSERT_POLL_DATA =
"INSERT INTO poll_data (queue_name, domain, json_data, modified_on) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT (queue_name,domain) DO UPDATE SET json_data=excluded.json_data, modified_on=excluded.modified_on";
execute(
connection,
INSERT_POLL_DATA,
q ->
q.addParameter(pollData.getQueueName())
.addParameter(domain)
.addJsonParameter(pollData)
.executeUpdate());
}
}

private PollData readPollData(Connection connection, String queueName, String domain) {
String GET_POLL_DATA =
"SELECT json_data FROM poll_data WHERE queue_name = ? AND domain = ?";
return query(
connection,
GET_POLL_DATA,
q ->
q.addParameter(queueName)
.addParameter(domain)
.executeAndFetchFirst(PollData.class));
}

private List<PollData> readAllPollData(String queueName) {
String GET_ALL_POLL_DATA = "SELECT json_data FROM poll_data WHERE queue_name = ?";
return queryWithTransaction(
GET_ALL_POLL_DATA, q -> q.addParameter(queueName).executeAndFetch(PollData.class));
}

private List<String> findAllTasksInProgressInOrderOfArrival(TaskModel task, int limit) {
String GET_IN_PROGRESS_TASKS_WITH_LIMIT =
"SELECT task_id FROM task_in_progress WHERE task_def_name = ? ORDER BY created_on LIMIT ?";
Expand Down
Loading

0 comments on commit 4a1230b

Please sign in to comment.