Skip to content

Commit

Permalink
Initial implementation of an experimental queue optimisation using Po…
Browse files Browse the repository at this point in the history
…stgres LISTEN/NOTIFY (#87)

* Initial implementation of an experimental queue optimisation using Postgres LISTEN/NOTIFY

* Use Optional instead of returning null for getSize()

* Update connection method for queue listener to make it more thread safe

* Only apply the queue notify migrations if configured
  • Loading branch information
bjpirt authored Mar 5, 2024
1 parent 71c9f95 commit afa9499
Show file tree
Hide file tree
Showing 8 changed files with 576 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
Expand Down Expand Up @@ -55,9 +56,16 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie
@Bean(initMethod = "migrate")
@PostConstruct
public Flyway flywayForPrimaryDb() {
return Flyway.configure()
.locations("classpath:db/migration_postgres")
.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
FluentConfiguration config = Flyway.configure();

if (properties.getExperimentalQueueNotify()) {
config.locations(
"classpath:db/migration_postgres", "classpath:db/migration_postgres_notify");
} else {
config.locations("classpath:db/migration_postgres");
}

return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false"))
.schemas(properties.getSchema())
.dataSource(dataSource)
.outOfOrder(true)
Expand Down Expand Up @@ -95,8 +103,9 @@ public PostgresPollDataDAO postgresPollDataDAO(
@DependsOn({"flywayForPrimaryDb"})
public PostgresQueueDAO postgresQueueDAO(
@Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper) {
return new PostgresQueueDAO(retryTemplate, objectMapper, dataSource);
ObjectMapper objectMapper,
PostgresProperties properties) {
return new PostgresQueueDAO(retryTemplate, objectMapper, dataSource, properties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class PostgresProperties {
@DurationUnit(ChronoUnit.MILLIS)
private Duration pollDataCacheValidityPeriod = Duration.ofMillis(0);

private boolean experimentalQueueNotify = false;

private Integer experimentalQueueNotifyStalePeriod = 5000;

public String schema = "public";

public boolean allowFullTextQueries = true;
Expand All @@ -45,6 +49,22 @@ public class PostgresProperties {
/** The size of the queue used for holding async indexing tasks */
private int asyncWorkerQueueSize = 100;

public boolean getExperimentalQueueNotify() {
return experimentalQueueNotify;
}

public void setExperimentalQueueNotify(boolean experimentalQueueNotify) {
this.experimentalQueueNotify = experimentalQueueNotify;
}

public Integer getExperimentalQueueNotifyStalePeriod() {
return experimentalQueueNotifyStalePeriod;
}

public void setExperimentalQueueNotifyStalePeriod(Integer experimentalQueueNotifyStalePeriod) {
this.experimentalQueueNotifyStalePeriod = experimentalQueueNotifyStalePeriod;
}

public Duration getTaskDefCacheRefreshInterval() {
return taskDefCacheRefreshInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.postgres.config.PostgresProperties;
import com.netflix.conductor.postgres.util.ExecutorsUtil;
import com.netflix.conductor.postgres.util.PostgresQueueListener;
import com.netflix.conductor.postgres.util.Query;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -40,8 +42,13 @@ public class PostgresQueueDAO extends PostgresBaseDAO implements QueueDAO {

private final ScheduledExecutorService scheduledExecutorService;

private PostgresQueueListener queueListener;

public PostgresQueueDAO(
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
RetryTemplate retryTemplate,
ObjectMapper objectMapper,
DataSource dataSource,
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);

this.scheduledExecutorService =
Expand All @@ -53,6 +60,10 @@ public PostgresQueueDAO(
UNACK_SCHEDULE_MS,
TimeUnit.MILLISECONDS);
logger.debug("{} is ready to serve", PostgresQueueDAO.class.getName());

if (properties.getExperimentalQueueNotify()) {
this.queueListener = new PostgresQueueListener(dataSource, properties);
}
}

@PreDestroy
Expand Down Expand Up @@ -169,6 +180,13 @@ public void remove(String queueName, String messageId) {

@Override
public int getSize(String queueName) {
if (queueListener != null) {
Optional<Integer> size = queueListener.getSize(queueName);
if (size.isPresent()) {
return size.get();
}
}

final String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?";
return queryWithTransaction(
GET_QUEUE_SIZE, q -> ((Long) q.addParameter(queueName).executeCount()).intValue());
Expand Down Expand Up @@ -425,6 +443,12 @@ private boolean removeMessage(Connection connection, String queueName, String me
private List<Message> popMessages(
Connection connection, String queueName, int count, int timeout) {

if (this.queueListener != null) {
if (!this.queueListener.hasMessagesReady(queueName)) {
return new ArrayList<>();
}
}

String POP_QUERY =
"UPDATE queue_message SET popped = true WHERE message_id IN ("
+ "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.postgres.util;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.sql.DataSource;

import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.postgres.config.PostgresProperties;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PostgresQueueListener {

private PGConnection pgconn;

private volatile Connection conn;

private final Lock connectionLock = new ReentrantLock();

private DataSource dataSource;

private HashMap<String, QueueStats> queues;

private volatile boolean connected = false;

private long lastNotificationTime = 0;

private Integer stalePeriod;

protected final Logger logger = LoggerFactory.getLogger(getClass());

public PostgresQueueListener(DataSource dataSource, PostgresProperties properties) {
logger.info("Using experimental PostgresQueueListener");
this.dataSource = dataSource;
this.stalePeriod = properties.getExperimentalQueueNotifyStalePeriod();
connect();
}

public boolean hasMessagesReady(String queueName) {
checkUpToDate();
handleNotifications();
if (notificationIsStale() || !connected) {
connect();
return true;
}

QueueStats queueStats = queues.get(queueName);
if (queueStats == null) {
return false;
}

if (queueStats.getNextDelivery() > System.currentTimeMillis()) {
return false;
}

return true;
}

public Optional<Integer> getSize(String queueName) {
checkUpToDate();
handleNotifications();
if (notificationIsStale() || !connected) {
connect();
return Optional.empty();
}

QueueStats queueStats = queues.get(queueName);
if (queueStats == null) {
return Optional.of(0);
}

return Optional.of(queueStats.getDepth());
}

private boolean notificationIsStale() {
return System.currentTimeMillis() - lastNotificationTime > this.stalePeriod;
}

private void connect() {
// Attempt to acquire the lock without waiting.
if (!connectionLock.tryLock()) {
// If the lock is not available, return early.
return;
}

boolean newConnectedState = false;

try {
// Check if the connection is null or not valid.
if (conn == null || !conn.isValid(1)) {
// Close the old connection if it exists and is not valid.
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

// Establish a new connection.
try {
this.conn = dataSource.getConnection();
this.pgconn = conn.unwrap(PGConnection.class);

boolean previousAutoCommitMode = conn.getAutoCommit();
conn.setAutoCommit(true);
try {
conn.prepareStatement("LISTEN conductor_queue_state").execute();
newConnectedState = true;
} catch (Throwable th) {
conn.rollback();
logger.error(th.getMessage());
} finally {
conn.setAutoCommit(previousAutoCommitMode);
}
requestStats();
} catch (SQLException e) {
throw new NonTransientException(e.getMessage(), e);
}
}
} catch (Exception e) {
throw new NonTransientException(e.getMessage(), e);
} finally {
connected = newConnectedState;
// Ensure the lock is always released.
connectionLock.unlock();
}
}

private void requestStats() {
try {
boolean previousAutoCommitMode = conn.getAutoCommit();
conn.setAutoCommit(true);
try {
conn.prepareStatement("SELECT queue_notify()").execute();
connected = true;
} catch (Throwable th) {
conn.rollback();
logger.error(th.getMessage());
} finally {
conn.setAutoCommit(previousAutoCommitMode);
}
} catch (SQLException e) {
if (!e.getSQLState().equals("08003")) {
logger.error("Error fetching notifications {}", e.getSQLState());
}
connect();
}
}

private void checkUpToDate() {
if (System.currentTimeMillis() - lastNotificationTime > this.stalePeriod * 0.75) {
requestStats();
}
}

private void handleNotifications() {
try {
PGNotification[] notifications = pgconn.getNotifications();
if (notifications == null || notifications.length == 0) {
return;
}
processPayload(notifications[notifications.length - 1].getParameter());
} catch (SQLException e) {
if (e.getSQLState() != "08003") {
logger.error("Error fetching notifications {}", e.getSQLState());
}
connect();
}
}

private void processPayload(String payload) {
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode notification = objectMapper.readTree(payload);
JsonNode lastNotificationTime = notification.get("__now__");
if (lastNotificationTime != null) {
this.lastNotificationTime = lastNotificationTime.asLong();
}
Iterator<String> iterator = notification.fieldNames();

HashMap<String, QueueStats> queueStats = new HashMap<>();
iterator.forEachRemaining(
key -> {
if (!key.equals("__now__")) {
try {
QueueStats stats =
objectMapper.treeToValue(
notification.get(key), QueueStats.class);
queueStats.put(key, stats);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
});
this.queues = queueStats;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit afa9499

Please sign in to comment.