From afa9499e201dc1130809585909978788e858acbc Mon Sep 17 00:00:00 2001 From: Ben Pirt Date: Tue, 5 Mar 2024 17:14:08 +0000 Subject: [PATCH] Initial implementation of an experimental queue optimisation using Postgres LISTEN/NOTIFY (#87) * Initial implementation of an experimental queue optimisation using Postgres LISTEN/NOTIFY * Use Optional instead of returning null for getSize() * Update connection method for queue listener to make it more thread safe * Only apply the queue notify migrations if configured --- .../config/PostgresConfiguration.java | 19 +- .../postgres/config/PostgresProperties.java | 20 ++ .../postgres/dao/PostgresQueueDAO.java | 26 +- .../postgres/util/PostgresQueueListener.java | 226 ++++++++++++++++++ .../conductor/postgres/util/QueueStats.java | 39 +++ .../V10.1__notify.sql | 59 +++++ .../util/PostgresQueueListenerTest.java | 193 +++++++++++++++ .../src/test/resources/application.properties | 1 - 8 files changed, 576 insertions(+), 7 deletions(-) create mode 100644 postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresQueueListener.java create mode 100644 postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/QueueStats.java create mode 100644 postgres-persistence/src/main/resources/db/migration_postgres_notify/V10.1__notify.sql create mode 100644 postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresQueueListenerTest.java diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java index 25eb5ad74..715beefb0 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresConfiguration.java @@ -19,6 +19,7 @@ import javax.sql.DataSource; import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.configuration.FluentConfiguration; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; @@ -55,9 +56,16 @@ public PostgresConfiguration(DataSource dataSource, PostgresProperties propertie @Bean(initMethod = "migrate") @PostConstruct public Flyway flywayForPrimaryDb() { - return Flyway.configure() - .locations("classpath:db/migration_postgres") - .configuration(Map.of("flyway.postgresql.transactional.lock", "false")) + FluentConfiguration config = Flyway.configure(); + + if (properties.getExperimentalQueueNotify()) { + config.locations( + "classpath:db/migration_postgres", "classpath:db/migration_postgres_notify"); + } else { + config.locations("classpath:db/migration_postgres"); + } + + return config.configuration(Map.of("flyway.postgresql.transactional.lock", "false")) .schemas(properties.getSchema()) .dataSource(dataSource) .outOfOrder(true) @@ -95,8 +103,9 @@ public PostgresPollDataDAO postgresPollDataDAO( @DependsOn({"flywayForPrimaryDb"}) public PostgresQueueDAO postgresQueueDAO( @Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate, - ObjectMapper objectMapper) { - return new PostgresQueueDAO(retryTemplate, objectMapper, dataSource); + ObjectMapper objectMapper, + PostgresProperties properties) { + return new PostgresQueueDAO(retryTemplate, objectMapper, dataSource, properties); } @Bean diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java index 4f4338d37..3b42d46c9 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java @@ -33,6 +33,10 @@ public class PostgresProperties { @DurationUnit(ChronoUnit.MILLIS) private Duration pollDataCacheValidityPeriod = Duration.ofMillis(0); + private boolean experimentalQueueNotify = false; + + private Integer experimentalQueueNotifyStalePeriod = 5000; + public String schema = "public"; public boolean allowFullTextQueries = true; @@ -45,6 +49,22 @@ public class PostgresProperties { /** The size of the queue used for holding async indexing tasks */ private int asyncWorkerQueueSize = 100; + public boolean getExperimentalQueueNotify() { + return experimentalQueueNotify; + } + + public void setExperimentalQueueNotify(boolean experimentalQueueNotify) { + this.experimentalQueueNotify = experimentalQueueNotify; + } + + public Integer getExperimentalQueueNotifyStalePeriod() { + return experimentalQueueNotifyStalePeriod; + } + + public void setExperimentalQueueNotifyStalePeriod(Integer experimentalQueueNotifyStalePeriod) { + this.experimentalQueueNotifyStalePeriod = experimentalQueueNotifyStalePeriod; + } + public Duration getTaskDefCacheRefreshInterval() { return taskDefCacheRefreshInterval; } diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java index 71af60838..ab38281aa 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresQueueDAO.java @@ -25,7 +25,9 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.dao.QueueDAO; +import com.netflix.conductor.postgres.config.PostgresProperties; import com.netflix.conductor.postgres.util.ExecutorsUtil; +import com.netflix.conductor.postgres.util.PostgresQueueListener; import com.netflix.conductor.postgres.util.Query; import com.fasterxml.jackson.databind.ObjectMapper; @@ -40,8 +42,13 @@ public class PostgresQueueDAO extends PostgresBaseDAO implements QueueDAO { private final ScheduledExecutorService scheduledExecutorService; + private PostgresQueueListener queueListener; + public PostgresQueueDAO( - RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) { + RetryTemplate retryTemplate, + ObjectMapper objectMapper, + DataSource dataSource, + PostgresProperties properties) { super(retryTemplate, objectMapper, dataSource); this.scheduledExecutorService = @@ -53,6 +60,10 @@ public PostgresQueueDAO( UNACK_SCHEDULE_MS, TimeUnit.MILLISECONDS); logger.debug("{} is ready to serve", PostgresQueueDAO.class.getName()); + + if (properties.getExperimentalQueueNotify()) { + this.queueListener = new PostgresQueueListener(dataSource, properties); + } } @PreDestroy @@ -169,6 +180,13 @@ public void remove(String queueName, String messageId) { @Override public int getSize(String queueName) { + if (queueListener != null) { + Optional size = queueListener.getSize(queueName); + if (size.isPresent()) { + return size.get(); + } + } + final String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?"; return queryWithTransaction( GET_QUEUE_SIZE, q -> ((Long) q.addParameter(queueName).executeCount()).intValue()); @@ -425,6 +443,12 @@ private boolean removeMessage(Connection connection, String queueName, String me private List popMessages( Connection connection, String queueName, int count, int timeout) { + if (this.queueListener != null) { + if (!this.queueListener.hasMessagesReady(queueName)) { + return new ArrayList<>(); + } + } + String POP_QUERY = "UPDATE queue_message SET popped = true WHERE message_id IN (" + "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND " diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresQueueListener.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresQueueListener.java new file mode 100644 index 000000000..4fcbe5326 --- /dev/null +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/PostgresQueueListener.java @@ -0,0 +1,226 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.util; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.sql.DataSource; + +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.postgres.config.PostgresProperties; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class PostgresQueueListener { + + private PGConnection pgconn; + + private volatile Connection conn; + + private final Lock connectionLock = new ReentrantLock(); + + private DataSource dataSource; + + private HashMap queues; + + private volatile boolean connected = false; + + private long lastNotificationTime = 0; + + private Integer stalePeriod; + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + public PostgresQueueListener(DataSource dataSource, PostgresProperties properties) { + logger.info("Using experimental PostgresQueueListener"); + this.dataSource = dataSource; + this.stalePeriod = properties.getExperimentalQueueNotifyStalePeriod(); + connect(); + } + + public boolean hasMessagesReady(String queueName) { + checkUpToDate(); + handleNotifications(); + if (notificationIsStale() || !connected) { + connect(); + return true; + } + + QueueStats queueStats = queues.get(queueName); + if (queueStats == null) { + return false; + } + + if (queueStats.getNextDelivery() > System.currentTimeMillis()) { + return false; + } + + return true; + } + + public Optional getSize(String queueName) { + checkUpToDate(); + handleNotifications(); + if (notificationIsStale() || !connected) { + connect(); + return Optional.empty(); + } + + QueueStats queueStats = queues.get(queueName); + if (queueStats == null) { + return Optional.of(0); + } + + return Optional.of(queueStats.getDepth()); + } + + private boolean notificationIsStale() { + return System.currentTimeMillis() - lastNotificationTime > this.stalePeriod; + } + + private void connect() { + // Attempt to acquire the lock without waiting. + if (!connectionLock.tryLock()) { + // If the lock is not available, return early. + return; + } + + boolean newConnectedState = false; + + try { + // Check if the connection is null or not valid. + if (conn == null || !conn.isValid(1)) { + // Close the old connection if it exists and is not valid. + if (conn != null) { + try { + conn.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + // Establish a new connection. + try { + this.conn = dataSource.getConnection(); + this.pgconn = conn.unwrap(PGConnection.class); + + boolean previousAutoCommitMode = conn.getAutoCommit(); + conn.setAutoCommit(true); + try { + conn.prepareStatement("LISTEN conductor_queue_state").execute(); + newConnectedState = true; + } catch (Throwable th) { + conn.rollback(); + logger.error(th.getMessage()); + } finally { + conn.setAutoCommit(previousAutoCommitMode); + } + requestStats(); + } catch (SQLException e) { + throw new NonTransientException(e.getMessage(), e); + } + } + } catch (Exception e) { + throw new NonTransientException(e.getMessage(), e); + } finally { + connected = newConnectedState; + // Ensure the lock is always released. + connectionLock.unlock(); + } + } + + private void requestStats() { + try { + boolean previousAutoCommitMode = conn.getAutoCommit(); + conn.setAutoCommit(true); + try { + conn.prepareStatement("SELECT queue_notify()").execute(); + connected = true; + } catch (Throwable th) { + conn.rollback(); + logger.error(th.getMessage()); + } finally { + conn.setAutoCommit(previousAutoCommitMode); + } + } catch (SQLException e) { + if (!e.getSQLState().equals("08003")) { + logger.error("Error fetching notifications {}", e.getSQLState()); + } + connect(); + } + } + + private void checkUpToDate() { + if (System.currentTimeMillis() - lastNotificationTime > this.stalePeriod * 0.75) { + requestStats(); + } + } + + private void handleNotifications() { + try { + PGNotification[] notifications = pgconn.getNotifications(); + if (notifications == null || notifications.length == 0) { + return; + } + processPayload(notifications[notifications.length - 1].getParameter()); + } catch (SQLException e) { + if (e.getSQLState() != "08003") { + logger.error("Error fetching notifications {}", e.getSQLState()); + } + connect(); + } + } + + private void processPayload(String payload) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + JsonNode notification = objectMapper.readTree(payload); + JsonNode lastNotificationTime = notification.get("__now__"); + if (lastNotificationTime != null) { + this.lastNotificationTime = lastNotificationTime.asLong(); + } + Iterator iterator = notification.fieldNames(); + + HashMap queueStats = new HashMap<>(); + iterator.forEachRemaining( + key -> { + if (!key.equals("__now__")) { + try { + QueueStats stats = + objectMapper.treeToValue( + notification.get(key), QueueStats.class); + queueStats.put(key, stats); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + }); + this.queues = queueStats; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/QueueStats.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/QueueStats.java new file mode 100644 index 000000000..6cbb9cecd --- /dev/null +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/util/QueueStats.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.util; + +public class QueueStats { + private Integer depth; + + private long nextDelivery; + + public void setDepth(Integer depth) { + this.depth = depth; + } + + public Integer getDepth() { + return depth; + } + + public void setNextDelivery(long nextDelivery) { + this.nextDelivery = nextDelivery; + } + + public long getNextDelivery() { + return nextDelivery; + } + + public String toString() { + return "{nextDelivery: " + nextDelivery + " depth: " + depth + "}"; + } +} diff --git a/postgres-persistence/src/main/resources/db/migration_postgres_notify/V10.1__notify.sql b/postgres-persistence/src/main/resources/db/migration_postgres_notify/V10.1__notify.sql new file mode 100644 index 000000000..7d40d6eda --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres_notify/V10.1__notify.sql @@ -0,0 +1,59 @@ +-- This function notifies on 'conductor_queue_state' with a JSON string containing +-- queue metadata that looks like: +-- { +-- "queue_name_1": { +-- "nextDelivery": 1234567890123, +-- "depth": 10 +-- }, +-- "queue_name_2": { +-- "nextDelivery": 1234567890456, +-- "depth": 5 +-- }, +-- "__now__": 1234567890999 +-- } +-- +CREATE OR REPLACE FUNCTION queue_notify() RETURNS void +LANGUAGE SQL +AS $$ + SELECT pg_notify('conductor_queue_state', ( + SELECT + COALESCE(jsonb_object_agg(KEY, val), '{}'::jsonb) || + jsonb_build_object('__now__', (extract('epoch' from CURRENT_TIMESTAMP)*1000)::bigint) + FROM ( + SELECT + queue_name AS KEY, + jsonb_build_object( + 'nextDelivery', + (extract('epoch' from min(deliver_on))*1000)::bigint, + 'depth', + count(*) + ) AS val + FROM + queue_message + WHERE + popped = FALSE + GROUP BY + queue_name) AS sq)::text); +$$; + + +CREATE FUNCTION queue_notify_trigger() + RETURNS TRIGGER + LANGUAGE PLPGSQL +AS $$ +BEGIN + PERFORM queue_notify(); + RETURN NULL; +END; +$$; + +CREATE TRIGGER queue_update + AFTER UPDATE ON queue_message + FOR EACH ROW + WHEN (OLD.popped IS DISTINCT FROM NEW.popped) + EXECUTE FUNCTION queue_notify_trigger(); + +CREATE TRIGGER queue_insert_delete + AFTER INSERT OR DELETE ON queue_message + FOR EACH ROW + EXECUTE FUNCTION queue_notify_trigger(); diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresQueueListenerTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresQueueListenerTest.java new file mode 100644 index 000000000..700813ba5 --- /dev/null +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresQueueListenerTest.java @@ -0,0 +1,193 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.postgres.util; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.*; + +import javax.sql.DataSource; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ObjectNode; + +import com.netflix.conductor.common.config.TestObjectMapperConfiguration; +import com.netflix.conductor.postgres.config.PostgresConfiguration; +import com.netflix.conductor.postgres.config.PostgresProperties; + +import static org.junit.Assert.*; + +@ContextConfiguration( + classes = { + TestObjectMapperConfiguration.class, + PostgresConfiguration.class, + FlywayAutoConfiguration.class + }) +@RunWith(SpringRunner.class) +@TestPropertySource( + properties = { + "conductor.elasticsearch.version=0", + "spring.flyway.clean-disabled=false", + "conductor.database.type=postgres", + "conductor.postgres.experimentalQueueNotify=true", + "conductor.postgres.experimentalQueueNotifyStalePeriod=5000" + }) +@SpringBootTest +public class PostgresQueueListenerTest { + + private PostgresQueueListener listener; + + @Qualifier("dataSource") + @Autowired + private DataSource dataSource; + + @Autowired private PostgresProperties properties; + + private void clearDb() { + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + conn.prepareStatement("truncate table queue_message").executeUpdate(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private void sendNotification(String queueName, int queueDepth, long nextDelivery) { + JsonNodeFactory factory = JsonNodeFactory.instance; + ObjectNode payload = factory.objectNode(); + ObjectNode queueNode = factory.objectNode(); + queueNode.put("depth", queueDepth); + queueNode.put("nextDelivery", nextDelivery); + payload.put("__now__", System.currentTimeMillis()); + payload.put(queueName, queueNode); + + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + PreparedStatement stmt = + conn.prepareStatement("SELECT pg_notify('conductor_queue_state', ?)"); + stmt.setString(1, payload.toString()); + stmt.execute(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private void createQueueMessage(String queue_name, String message_id) { + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + PreparedStatement stmt = + conn.prepareStatement( + "INSERT INTO queue_message (deliver_on, queue_name, message_id, priority, offset_time_seconds, payload) VALUES (current_timestamp, ?,?,?,?,?)"); + stmt.setString(1, queue_name); + stmt.setString(2, message_id); + stmt.setInt(3, 0); + stmt.setInt(4, 0); + stmt.setString(5, "dummy-payload"); + stmt.execute(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private void popQueueMessage(String message_id) { + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + PreparedStatement stmt = + conn.prepareStatement( + "UPDATE queue_message SET popped = TRUE where message_id = ?"); + stmt.setString(1, message_id); + stmt.execute(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private void deleteQueueMessage(String message_id) { + try (Connection conn = dataSource.getConnection()) { + conn.setAutoCommit(true); + PreparedStatement stmt = + conn.prepareStatement("DELETE FROM queue_message where message_id = ?"); + stmt.setString(1, message_id); + stmt.execute(); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + @Before + public void before() { + listener = new PostgresQueueListener(dataSource, properties); + clearDb(); + } + + @Test + public void testHasReadyMessages() { + assertFalse(listener.hasMessagesReady("dummy-task")); + sendNotification("dummy-task", 3, System.currentTimeMillis() - 1); + assertTrue(listener.hasMessagesReady("dummy-task")); + } + + @Test + public void testHasReadyMessagesInFuture() throws InterruptedException { + assertFalse(listener.hasMessagesReady("dummy-task")); + sendNotification("dummy-task", 3, System.currentTimeMillis() + 100); + assertFalse(listener.hasMessagesReady("dummy-task")); + Thread.sleep(101); + assertTrue(listener.hasMessagesReady("dummy-task")); + } + + @Test + public void testGetSize() { + assertEquals(0, listener.getSize("dummy-task").get().intValue()); + sendNotification("dummy-task", 3, System.currentTimeMillis() + 100); + assertEquals(3, listener.getSize("dummy-task").get().intValue()); + } + + @Test + public void testTrigger() throws InterruptedException { + assertEquals(0, listener.getSize("dummy-task").get().intValue()); + assertFalse(listener.hasMessagesReady("dummy-task")); + + createQueueMessage("dummy-task", "dummy-id1"); + createQueueMessage("dummy-task", "dummy-id2"); + assertEquals(2, listener.getSize("dummy-task").get().intValue()); + assertTrue(listener.hasMessagesReady("dummy-task")); + + popQueueMessage("dummy-id2"); + assertEquals(1, listener.getSize("dummy-task").get().intValue()); + assertTrue(listener.hasMessagesReady("dummy-task")); + + deleteQueueMessage("dummy-id2"); + assertEquals(1, listener.getSize("dummy-task").get().intValue()); + assertTrue(listener.hasMessagesReady("dummy-task")); + + deleteQueueMessage("dummy-id1"); + assertEquals(0, listener.getSize("dummy-task").get().intValue()); + assertFalse(listener.hasMessagesReady("test-task")); + } +} diff --git a/postgres-persistence/src/test/resources/application.properties b/postgres-persistence/src/test/resources/application.properties index 06ea54aa4..d4ac858f7 100644 --- a/postgres-persistence/src/test/resources/application.properties +++ b/postgres-persistence/src/test/resources/application.properties @@ -5,4 +5,3 @@ spring.datasource.username=postgres spring.datasource.password=postgres spring.datasource.hikari.maximum-pool-size=8 spring.datasource.hikari.auto-commit=false -spring.flyway.locations=classpath:db/migration_postgres