Skip to content

Commit

Permalink
Revert "Optimise the postgres queries for popping messages"
Browse files Browse the repository at this point in the history
This reverts commit 0677305.
  • Loading branch information
matiasbur committed May 2, 2024
1 parent 0677305 commit 74667ff
Showing 1 changed file with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,19 +422,17 @@ private boolean removeMessage(Connection connection, String queueName, String me
q -> q.addParameter(queueName).addParameter(messageId).executeDelete());
}

private List<Message> popMessages(
Connection connection, String queueName, int count, int timeout) {
private List<Message> peekMessages(Connection connection, String queueName, int count) {
if (count < 1) {
return Collections.emptyList();
}

String POP_QUERY =
"UPDATE queue_message SET popped = true WHERE message_id IN ("
+ "SELECT message_id FROM queue_message WHERE queue_name = ? AND popped = false AND "
+ "deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) "
+ "ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED"
+ ") RETURNING message_id, priority, payload";
final String PEEK_MESSAGES =
"SELECT message_id, priority, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= (current_timestamp + (1000 ||' microseconds')::interval) ORDER BY priority DESC, deliver_on, created_on LIMIT ? FOR UPDATE SKIP LOCKED";

return query(
connection,
POP_QUERY,
PEEK_MESSAGES,
p ->
p.addParameter(queueName)
.addParameter(count)
Expand All @@ -452,6 +450,34 @@ private List<Message> popMessages(
}));
}

private List<Message> popMessages(
Connection connection, String queueName, int count, int timeout) {
List<Message> messages = peekMessages(connection, queueName, count);

if (messages.isEmpty()) {
return messages;
}

List<Message> poppedMessages = new ArrayList<>();
for (Message message : messages) {
final String POP_MESSAGE =
"UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false";
int result =
query(
connection,
POP_MESSAGE,
q ->
q.addParameter(queueName)
.addParameter(message.getId())
.executeUpdate());

if (result == 1) {
poppedMessages.add(message);
}
}
return poppedMessages;
}

@Override
public boolean containsMessage(String queueName, String messageId) {
return getWithRetriedTransactions(tx -> existsMessage(tx, queueName, messageId));
Expand All @@ -473,4 +499,4 @@ private class QueueMessage {
public String queueName;
public String messageId;
}
}
}

0 comments on commit 74667ff

Please sign in to comment.