Skip to content

Commit

Permalink
fix: Allow dropping cleaned-up keys (#911)
Browse files Browse the repository at this point in the history
  • Loading branch information
acocuzzo authored May 5, 2023
1 parent 9ffb8d6 commit 4b3157c
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 12 deletions.
36 changes: 24 additions & 12 deletions google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
# limitations under the License.

import collections
import logging
import typing
from typing import Any, Callable, Iterable, Optional

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)


class MessagesOnHold(object):
"""Tracks messages on hold by ordering key. Not thread-safe."""

Expand Down Expand Up @@ -113,14 +117,17 @@ def activate_ordering_keys(
Args:
ordering_keys:
The ordering keys to activate. May be empty.
The ordering keys to activate. May be empty, or contain duplicates.
schedule_message_callback:
The callback to call to schedule a message to be sent to the user.
"""
for key in ordering_keys:
assert (
self._pending_ordered_messages.get(key) is not None
), "A message queue should exist for every ordered message in flight."
pending_ordered_messages = self._pending_ordered_messages.get(key)
if pending_ordered_messages is None:
_LOGGER.warning(
"No message queue exists for message ordering key: %s.", key
)
continue
next_msg = self._get_next_for_ordering_key(key)
if next_msg:
# Schedule the next message because the previous was dropped.
Expand Down Expand Up @@ -154,15 +161,20 @@ def _get_next_for_ordering_key(
def _clean_up_ordering_key(self, ordering_key: str) -> None:
"""Clean up state for an ordering key with no pending messages.
Args:
Args
ordering_key: The ordering key to clean up.
"""
message_queue = self._pending_ordered_messages.get(ordering_key)
assert (
message_queue is not None
), "Cleaning up ordering key that does not exist."
assert not len(message_queue), (
"Ordering key must only be removed if there are no messages "
"left for that key."
)
if message_queue is None:
_LOGGER.warning(
"Tried to clean up ordering key that does not exist: %s", ordering_key
)
return
if len(message_queue) > 0:
_LOGGER.warning(
"Tried to clean up ordering key: %s with %d messages remaining.",
ordering_key,
len(message_queue),
)
return
del self._pending_ordered_messages[ordering_key]
102 changes: 102 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,72 @@ def test_ordered_messages_one_key():
assert moh.size == 0


def test_ordered_messages_drop_duplicate_keys(caplog):
moh = messages_on_hold.MessagesOnHold()

msg1 = make_message(ack_id="ack1", ordering_key="key1")
moh.put(msg1)
assert moh.size == 1

msg2 = make_message(ack_id="ack2", ordering_key="key1")
moh.put(msg2)
assert moh.size == 2

# Get first message for "key1"
assert moh.get() == msg1
assert moh.size == 1

# Still waiting on the previously-sent message for "key1", and there are no
# other messages, so return None.
assert moh.get() is None
assert moh.size == 1

# Activate "key1".
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1", "key1"], callback_tracker)
assert callback_tracker.called
assert callback_tracker.message == msg2
assert moh.size == 0
assert len(moh._pending_ordered_messages) == 0

# Activate "key1" again
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

msg3 = make_message(ack_id="ack3", ordering_key="key1")
moh.put(msg3)
assert moh.size == 1

# Get next message for "key1"
assert moh.get() == msg3
assert moh.size == 0

# Activate "key1".
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again after being cleaned up. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called
assert "No message queue exists for message ordering key: key1" in caplog.text


def test_ordered_messages_two_keys():
moh = messages_on_hold.MessagesOnHold()

Expand Down Expand Up @@ -278,3 +344,39 @@ def test_ordered_and_unordered_messages_interleaved():
# No messages left.
assert moh.get() is None
assert moh.size == 0


def test_cleanup_nonexistent_key(caplog):
moh = messages_on_hold.MessagesOnHold()
moh._clean_up_ordering_key("non-existent-key")
assert (
"Tried to clean up ordering key that does not exist: non-existent-key"
in caplog.text
)


def test_cleanup_key_with_messages(caplog):
moh = messages_on_hold.MessagesOnHold()

# Put message with "key1".
msg1 = make_message(ack_id="ack1", ordering_key="key1")
moh.put(msg1)
assert moh.size == 1

# Put another message "key1"
msg2 = make_message(ack_id="ack2", ordering_key="key1")
moh.put(msg2)
assert moh.size == 2

# Get first message for "key1"
assert moh.get() == msg1
assert moh.size == 1

# Get first message for "key1"
assert moh.get() is None
assert moh.size == 1

moh._clean_up_ordering_key("key1")
assert (
"Tried to clean up ordering key: key1 with 1 messages remaining." in caplog.text
)

0 comments on commit 4b3157c

Please sign in to comment.