Skip to content

Commit

Permalink
Optimise the postgres queries for popping messages (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpirt authored Feb 21, 2024
1 parent e04c9bf commit 06bc5fd
Showing 1 changed file with 9 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,19 @@ private boolean removeMessage(Connection connection, String queueName, String me
q -> q.addParameter(queueName).addParameter(messageId).executeDelete());
}

private List<Message> peekMessages(Connection connection, String queueName, int count) {
if (count < 1) {
return Collections.emptyList();
}
private List<Message> popMessages(
Connection connection, String queueName, int count, int timeout) {

final String PEEK_MESSAGES =
"SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED";
String POP_QUERY =
"UPDATE queue_message SET popped = true WHERE message_id IN ("
+ "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND "
+ "deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) "
+ "ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED"
+ ") RETURNING message_id, priority, payload";

return query(
connection,
PEEK_MESSAGES,
POP_QUERY,
p ->
p.addParameter(queueName)
.addParameter(count)
Expand All @@ -450,34 +452,6 @@ private List<Message> peekMessages(Connection connection, String queueName, int
}));
}

private List<Message> popMessages(
Connection connection, String queueName, int count, int timeout) {
List<Message> messages = peekMessages(connection, queueName, count);

if (messages.isEmpty()) {
return messages;
}

List<Message> poppedMessages = new ArrayList<>();
for (Message message : messages) {
final String POP_MESSAGE =
"UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false";
int result =
query(
connection,
POP_MESSAGE,
q ->
q.addParameter(queueName)
.addParameter(message.getId())
.executeUpdate());

if (result == 1) {
poppedMessages.add(message);
}
}
return poppedMessages;
}

@Override
public boolean containsMessage(String queueName, String messageId) {
return getWithRetriedTransactions(tx -> existsMessage(tx, queueName, messageId));
Expand Down

0 comments on commit 06bc5fd

Please sign in to comment.