diff --git a/README.md b/README.md index bd42476..b8f9ba4 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ The client supports two codecs to store the messages to the server: By default you should use `AMQP 1.0` codec: ```python amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) ``` @@ -287,7 +287,7 @@ This one: ```python for i in range(1_000_000): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) # send is asynchronous await producer.send(stream=STREAM, message=amqp_message) diff --git a/docs/examples/basic_producers/producer_send.py b/docs/examples/basic_producers/producer_send.py index f2b8bdb..50c2140 100644 --- a/docs/examples/basic_producers/producer_send.py +++ b/docs/examples/basic_producers/producer_send.py @@ -17,7 +17,7 @@ async def publish(): for i in range(MESSAGES): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) # send is asynchronous await producer.send(stream=STREAM, message=amqp_message) diff --git a/docs/examples/basic_producers/producer_send_batch.py b/docs/examples/basic_producers/producer_send_batch.py index d59a6e8..e16b693 100644 --- a/docs/examples/basic_producers/producer_send_batch.py +++ b/docs/examples/basic_producers/producer_send_batch.py @@ -20,7 +20,7 @@ async def publish(): messages = [] for i in range(BATCH): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) messages.append(amqp_message) # send_batch is synchronous. will wait till termination diff --git a/docs/examples/basic_producers/producer_send_wait.py b/docs/examples/basic_producers/producer_send_wait.py index 04c9f7c..efa016d 100644 --- a/docs/examples/basic_producers/producer_send_wait.py +++ b/docs/examples/basic_producers/producer_send_wait.py @@ -14,7 +14,7 @@ async def publish(): # sending a thousand of messages in AMQP format for i in range(MESSAGES): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) # send is synchronous. It will also wait synchronously for the confirmation to arrive from the server # it is really very slow and send() + callback for asynchronous confirmation should be used instead. diff --git a/docs/examples/filtering/producer_filtering.py b/docs/examples/filtering/producer_filtering.py index 1e21991..8900a45 100644 --- a/docs/examples/filtering/producer_filtering.py +++ b/docs/examples/filtering/producer_filtering.py @@ -26,7 +26,7 @@ async def publish(): "region": "New York", } amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties=application_properties, ) # send is asynchronous @@ -43,7 +43,7 @@ async def publish(): "region": "California", } amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties=application_properties, ) # send is asynchronous diff --git a/docs/examples/filtering/super_stream_producer_filtering.py b/docs/examples/filtering/super_stream_producer_filtering.py index d87564e..dff1f22 100644 --- a/docs/examples/filtering/super_stream_producer_filtering.py +++ b/docs/examples/filtering/super_stream_producer_filtering.py @@ -36,7 +36,7 @@ async def publish(): for i in range(MESSAGES): application_properties = {"region": "New York", "id": "{}".format(i)} amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties=application_properties, ) # send is asynchronous @@ -51,7 +51,7 @@ async def publish(): for i in range(MESSAGES): application_properties = {"region": "California", "id": "{}".format(i)} amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties=application_properties, ) # send is asynchronous diff --git a/docs/examples/producers_with_confirmations/send_batch_with_confirmation.py b/docs/examples/producers_with_confirmations/send_batch_with_confirmation.py index 9c8d5a2..86c4d17 100644 --- a/docs/examples/producers_with_confirmations/send_batch_with_confirmation.py +++ b/docs/examples/producers_with_confirmations/send_batch_with_confirmation.py @@ -34,7 +34,7 @@ async def publish(): messages = [] for i in range(BATCH): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) messages.append(amqp_message) # send_batch is synchronous. will wait till termination diff --git a/docs/examples/producers_with_confirmations/send_with_confirmation.py b/docs/examples/producers_with_confirmations/send_with_confirmation.py index 526ce5d..a393947 100644 --- a/docs/examples/producers_with_confirmations/send_with_confirmation.py +++ b/docs/examples/producers_with_confirmations/send_with_confirmation.py @@ -32,7 +32,7 @@ async def publish(): # sending a million of messages in AMQP format for i in range(MESSAGES): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) # send is asynchronous - also confirmation is taken asynchronously by _on_publish_confirm_client callback # you can specify different callbacks for different messages. diff --git a/docs/examples/reliable_client/BestPracticesClient.py b/docs/examples/reliable_client/BestPracticesClient.py index b82d7a4..405b8f3 100644 --- a/docs/examples/reliable_client/BestPracticesClient.py +++ b/docs/examples/reliable_client/BestPracticesClient.py @@ -217,7 +217,7 @@ async def publish(rabbitmq_configuration: dict): return amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties={"id": "{}".format(i)}, ) # send is asynchronous diff --git a/docs/examples/single_active_consumer/sac_super_stream_producer.py b/docs/examples/single_active_consumer/sac_super_stream_producer.py index d21e872..d10deea 100644 --- a/docs/examples/single_active_consumer/sac_super_stream_producer.py +++ b/docs/examples/single_active_consumer/sac_super_stream_producer.py @@ -38,7 +38,7 @@ async def publish(): # run slowly several messages in order to test with sac for i in range(1000000): amqp_message = AMQPMessage( - body="message_:{}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), properties=uamqp.message.MessageProperties(message_id=i), ) await producer.send(message=amqp_message) diff --git a/docs/examples/sub_entry_batch/producer_sub_entry_batch.py b/docs/examples/sub_entry_batch/producer_sub_entry_batch.py index 5da5d86..b33723c 100644 --- a/docs/examples/sub_entry_batch/producer_sub_entry_batch.py +++ b/docs/examples/sub_entry_batch/producer_sub_entry_batch.py @@ -19,7 +19,7 @@ async def publish(): messages = [] for i in range(BATCH): amqp_message = AMQPMessage( - body="a:{}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) messages.append(amqp_message) diff --git a/docs/examples/super_stream/super_stream_producer.py b/docs/examples/super_stream/super_stream_producer.py index da0393f..5932748 100644 --- a/docs/examples/super_stream/super_stream_producer.py +++ b/docs/examples/super_stream/super_stream_producer.py @@ -36,7 +36,7 @@ async def publish(): start_time = time.perf_counter() for i in range(MESSAGES): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties={"id": "{}".format(i)}, ) await super_stream_producer.send(amqp_message) diff --git a/docs/examples/super_stream/super_stream_producer_key.py b/docs/examples/super_stream/super_stream_producer_key.py index 27123a7..c67f929 100644 --- a/docs/examples/super_stream/super_stream_producer_key.py +++ b/docs/examples/super_stream/super_stream_producer_key.py @@ -36,7 +36,7 @@ async def publish(): # Sending a million messages for i in range(MESSAGES): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), application_properties={"id": "{}".format(i)}, ) await super_stream_producer.send(amqp_message) diff --git a/docs/examples/tls/producer.py b/docs/examples/tls/producer.py index 5e4b42a..fc65ae1 100644 --- a/docs/examples/tls/producer.py +++ b/docs/examples/tls/producer.py @@ -26,7 +26,7 @@ async def publish(): messages = [] for i in range(BATCH): amqp_message = AMQPMessage( - body="hello: {}".format(i), + body=bytes("hello: {}".format(i), "utf-8"), ) messages.append(amqp_message) # send_batch is synchronous. will wait till termination diff --git a/poetry.lock b/poetry.lock index 19cdbba..40d7806 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "appnope" @@ -774,39 +774,6 @@ files = [ {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"}, ] -[[package]] -name = "uamqp" -version = "1.6.4" -description = "AMQP 1.0 Client Library for Python" -optional = false -python-versions = ">=3.7" -files = [ - {file = "uamqp-1.6.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a03f90b280ad4f2fad2f6d4c74f483da3727888e35ede52004d13b75f7f4078b"}, - {file = "uamqp-1.6.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bac44c5bbda91ca11d4ee2ef36685cd4e97a6279a74bfa9efe163938192abbb1"}, - {file = "uamqp-1.6.4-cp310-cp310-win32.whl", hash = "sha256:ca01bfb103a020e789663a1a4b75a19577c5632f25dd3c469c9e97def61f90a3"}, - {file = "uamqp-1.6.4-cp310-cp310-win_amd64.whl", hash = "sha256:ce5dfc751ccc8956a22a269848709203e69b20aa61ed38cd32db4abc4970d2db"}, - {file = "uamqp-1.6.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:e00c79c7d4c21cc4a4d6cf41d279496053f276530f858a747a2343070e96c98c"}, - {file = "uamqp-1.6.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff847e87f68e0801433f0561f12b8b2884cfdae99cee35122f9b58b26d921961"}, - {file = "uamqp-1.6.4-cp311-cp311-win32.whl", hash = "sha256:a4213146256da9e9c9c15de149359da6fddf8939aa40d7aab4bab1ba009b9809"}, - {file = "uamqp-1.6.4-cp311-cp311-win_amd64.whl", hash = "sha256:d29ae7ec33b4729a9c68e67717da26d355d6e2f82dd0748ba20579caf0e7e9a1"}, - {file = "uamqp-1.6.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:61c4f591adbd8c8214c02386ebd9207a180c48ff30bd64fd2df0ab12ed47f831"}, - {file = "uamqp-1.6.4-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:313f8e6e40cd9b0f0c6fa1c381361fc355686da567222b6d47665694795ec968"}, - {file = "uamqp-1.6.4-cp37-cp37m-win32.whl", hash = "sha256:cadf0c43cca07be5580115720c56434fe966d42d8b26635b55f6a9fedb93d5ea"}, - {file = "uamqp-1.6.4-cp37-cp37m-win_amd64.whl", hash = "sha256:5d58b7352ebee51160d5add21cafda7fccfbfcc1d7084adaaca4864b296525d1"}, - {file = "uamqp-1.6.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8a8e17acafc4d9331b176fca6a46098c411ebe6024041e8c91233394d89f2bca"}, - {file = "uamqp-1.6.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:41e10eb1e008671c5f8d1970be6977769f7710238530e6c7f017ee7ceb102487"}, - {file = "uamqp-1.6.4-cp38-cp38-win32.whl", hash = "sha256:78ae6e6ac954786fd70471dd7da404dfa530302dcfdd70c6da6dcdef64c2013d"}, - {file = "uamqp-1.6.4-cp38-cp38-win_amd64.whl", hash = "sha256:ea0d51e35eeeada4f22325ea12adb3f2569b63d63c0f1381c843b5ef34e71a67"}, - {file = "uamqp-1.6.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:376e5636c0b1d6e2c69390d30d95cabfb35a4fca11562a6324d30e248214707a"}, - {file = "uamqp-1.6.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:916f8fc33b9cf1c4d2020aa94e4b91fbff54bd5d679dffe1f111601c2d441376"}, - {file = "uamqp-1.6.4-cp39-cp39-win32.whl", hash = "sha256:a3c2d5e97ff5159acdf88dbbb0f8effabd493be07e5b8d1d785162c9080c40a9"}, - {file = "uamqp-1.6.4-cp39-cp39-win_amd64.whl", hash = "sha256:4de3b9011f4c3f8a69868cec0a7c3f40912215fe2a44bd8563bcb2dbb89db9a7"}, - {file = "uamqp-1.6.4.tar.gz", hash = "sha256:2183332435ef7882fad22724e3f2f63d3fd5a51c7f0c635d63487948b02e3749"}, -] - -[package.dependencies] -certifi = ">=2017.4.17" - [[package]] name = "urllib3" version = "2.0.2" @@ -838,4 +805,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "6b32b6be930a4d0562955c30def757d7a6412deae6be3b49fcf3ee7814960df8" +content-hash = "057ddac490c0ea7b41d2a9eb42763c64831f4a87043bb2431a5f2856815a75a0" diff --git a/pyproject.toml b/pyproject.toml index 4742939..c94b4e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rstream" -version = "0.19.1" +version = "0.20.0" description = "A python client for RabbitMQ Streams" authors = ["George Fortunatov ", "Daniele Palaia "] readme = "README.md" @@ -10,7 +10,6 @@ license = "MIT" [tool.poetry.dependencies] python = "^3.9" -uamqp = "^1.6.3" requests = "^2.31.0" mmh3 = "^4.0.0" @@ -24,11 +23,13 @@ pytest-asyncio = "^0.15.1" black = "^23.12.1" requests = "^2.31.0" mmh3 = "^4.0.0" +typing_extensions ="^4.11.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" requests = "^2.31.0" types-requests = "^2.31.0.20240406" +typing_extensions ="^4.11.0" [tool.black] line-length = 110 diff --git a/rstream/__init__.py b/rstream/__init__.py index bf601be..e9fbc5b 100644 --- a/rstream/__init__.py +++ b/rstream/__init__.py @@ -19,6 +19,8 @@ del metadata from .amqp import AMQPMessage, amqp_decoder # noqa: E402 +from ._pyamqp.message import Properties # noqa: E402 +from ._pyamqp.message import Header # noqa: E402 from .compression import CompressionType # noqa: E402 from .constants import ( # noqa: E402 ConsumerOffsetSpecification, @@ -56,6 +58,8 @@ "Consumer", "RawMessage", "Producer", + "Properties", + "Header", "OffsetType", "ConsumerOffsetSpecification", "ConfirmationStatus", diff --git a/rstream/_pyamqp/_decode.py b/rstream/_pyamqp/_decode.py new file mode 100644 index 0000000..6f0c8d0 --- /dev/null +++ b/rstream/_pyamqp/_decode.py @@ -0,0 +1,329 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=redefined-builtin, import-error +# type: ignore + +import logging +import struct +import uuid +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Tuple, + Union, + cast, +) + +from typing_extensions import Literal + +from .message import Header, Message, Properties + +if TYPE_CHECKING: + from .message import MessageDict + +_LOGGER = logging.getLogger(__name__) +_HEADER_PREFIX = memoryview(b"AMQP") +_COMPOSITES = { + 35: "received", + 36: "accepted", + 37: "rejected", + 38: "released", + 39: "modified", +} + +c_unsigned_char = struct.Struct(">B") +c_signed_char = struct.Struct(">b") +c_unsigned_short = struct.Struct(">H") +c_signed_short = struct.Struct(">h") +c_unsigned_int = struct.Struct(">I") +c_signed_int = struct.Struct(">i") +c_unsigned_long = struct.Struct(">L") +c_unsigned_long_long = struct.Struct(">Q") +c_signed_long_long = struct.Struct(">q") +c_float = struct.Struct(">f") +c_double = struct.Struct(">d") + + +def _decode_null(buffer: memoryview) -> Tuple[memoryview, None]: + return buffer, None + + +def _decode_true(buffer: memoryview) -> Tuple[memoryview, Literal[True]]: + return buffer, True + + +def _decode_false(buffer: memoryview) -> Tuple[memoryview, Literal[False]]: + return buffer, False + + +def _decode_zero(buffer: memoryview) -> Tuple[memoryview, Literal[0]]: + return buffer, 0 + + +def _decode_empty(buffer: memoryview) -> Tuple[memoryview, List[Any]]: + return buffer, [] + + +def _decode_boolean(buffer: memoryview) -> Tuple[memoryview, bool]: + return buffer[1:], buffer[:1] == b"\x01" + + +def _decode_ubyte(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], buffer[0] + + +def _decode_ushort(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[2:], c_unsigned_short.unpack(buffer[:2])[0] + + +def _decode_uint_small(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], buffer[0] + + +def _decode_uint_large(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[4:], c_unsigned_int.unpack(buffer[:4])[0] + + +def _decode_ulong_small(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], buffer[0] + + +def _decode_ulong_large(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[8:], c_unsigned_long_long.unpack(buffer[:8])[0] + + +def _decode_byte(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_short(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[2:], c_signed_short.unpack(buffer[:2])[0] + + +def _decode_int_small(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_int_large(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[4:], c_signed_int.unpack(buffer[:4])[0] + + +def _decode_long_small(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[1:], c_signed_char.unpack(buffer[:1])[0] + + +def _decode_long_large(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[8:], c_signed_long_long.unpack(buffer[:8])[0] + + +def _decode_float(buffer: memoryview) -> Tuple[memoryview, float]: + return buffer[4:], c_float.unpack(buffer[:4])[0] + + +def _decode_double(buffer: memoryview) -> Tuple[memoryview, float]: + return buffer[8:], c_double.unpack(buffer[:8])[0] + + +def _decode_timestamp(buffer: memoryview) -> Tuple[memoryview, int]: + return buffer[8:], c_signed_long_long.unpack(buffer[:8])[0] + + +def _decode_uuid(buffer: memoryview) -> Tuple[memoryview, uuid.UUID]: + return buffer[16:], uuid.UUID(bytes=buffer[:16].tobytes()) + + +def _decode_binary_small(buffer: memoryview) -> Tuple[memoryview, bytes]: + length_index = buffer[0] + 1 + return buffer[length_index:], buffer[1:length_index].tobytes() + + +def _decode_binary_large(buffer: memoryview) -> Tuple[memoryview, bytes]: + length_index = c_unsigned_long.unpack(buffer[:4])[0] + 4 + return buffer[length_index:], buffer[4:length_index].tobytes() + + +def _decode_list_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: + count = buffer[1] + buffer = buffer[2:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + return buffer, values + + +def _decode_list_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: + count = c_unsigned_long.unpack(buffer[4:8])[0] + buffer = buffer[8:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + return buffer, values + + +def _decode_map_small(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: + count = int(buffer[1] / 2) + buffer = buffer[2:] + values = {} + for _ in range(count): + buffer, key = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + values[key] = value + return buffer, values + + +def _decode_map_large(buffer: memoryview) -> Tuple[memoryview, Dict[Any, Any]]: + count = int(c_unsigned_long.unpack(buffer[4:8])[0] / 2) + buffer = buffer[8:] + values = {} + for _ in range(count): + buffer, key = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + values[key] = value + return buffer, values + + +def _decode_array_small(buffer: memoryview) -> Tuple[memoryview, List[Any]]: + count = buffer[1] # Ignore first byte (size) and just rely on count + if count: + subconstructor = buffer[2] + buffer = buffer[3:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer) + return buffer, values + return buffer[2:], [] + + +def _decode_array_large(buffer: memoryview) -> Tuple[memoryview, List[Any]]: + count = c_unsigned_long.unpack(buffer[4:8])[0] + if count: + subconstructor = buffer[8] + buffer = buffer[9:] + values = [None] * count + for i in range(count): + buffer, values[i] = _DECODE_BY_CONSTRUCTOR[subconstructor](buffer) + return buffer, values + return buffer[8:], [] + + +def _decode_described(buffer: memoryview) -> Tuple[memoryview, object]: + # TODO: to move the cursor of the buffer to the described value based on size of the + # descriptor without decoding descriptor value + composite_type = buffer[0] + buffer, descriptor = _DECODE_BY_CONSTRUCTOR[composite_type](buffer[1:]) + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + try: + composite_type = cast(int, _COMPOSITES[descriptor]) + return buffer, {composite_type: value} + except KeyError: + return buffer, value + + +def decode_payload(buffer: memoryview) -> Message: + message: Dict[str, Union[Properties, Header, Dict, bytes, List]] = {} + while buffer: + # Ignore the first two bytes, they will always be the constructors for + # described type then ulong. + descriptor = buffer[2] + buffer, value = _DECODE_BY_CONSTRUCTOR[buffer[3]](buffer[4:]) + if descriptor == 112: + message["header"] = Header(*value) + elif descriptor == 113: + message["delivery_annotations"] = value + elif descriptor == 114: + message["message_annotations"] = value + elif descriptor == 115: + message["properties"] = Properties(*value) + elif descriptor == 116: + message["application_properties"] = value + elif descriptor == 117: + message["body"] = value + elif descriptor == 118: + try: + cast(List, message["sequence"]).append(value) + except KeyError: + message["sequence"] = [value] + elif descriptor == 119: + message["value"] = value + elif descriptor == 120: + message["footer"] = value + # TODO: we can possibly swap out the Message construct with a TypedDict + # for both input and output so we get the best of both. + # casting to TypedDict with named fields to allow for unpacking with ** + message_properties = cast("MessageDict", message) + return Message(**message_properties) + + +def decode_frame(data: memoryview) -> Tuple[int, List[Any]]: + # Ignore the first two bytes, they will always be the constructors for + # described type then ulong. + frame_type = data[2] + compound_list_type = data[3] + if compound_list_type == 0xD0: + # list32 0xd0: data[4:8] is size, data[8:12] is count + count = c_signed_int.unpack(data[8:12])[0] + buffer = data[12:] + else: + # list8 0xc0: data[4] is size, data[5] is count + count = data[5] + buffer = data[6:] + fields: List[Optional[memoryview]] = [None] * count + for i in range(count): + buffer, fields[i] = _DECODE_BY_CONSTRUCTOR[buffer[0]](buffer[1:]) + if frame_type == 20: + fields.append(buffer) + return frame_type, fields + + +def decode_empty_frame(header: memoryview) -> Tuple[int, bytes]: + if header[0:4] == _HEADER_PREFIX: + return 0, header.tobytes() + if header[5] == 0: + return 1, b"EMPTY" + raise ValueError("Received unrecognized empty frame") + + +_DECODE_BY_CONSTRUCTOR: List[Callable] = cast(List[Callable], [None] * 256) +_DECODE_BY_CONSTRUCTOR[0] = _decode_described +_DECODE_BY_CONSTRUCTOR[64] = _decode_null +_DECODE_BY_CONSTRUCTOR[65] = _decode_true +_DECODE_BY_CONSTRUCTOR[66] = _decode_false +_DECODE_BY_CONSTRUCTOR[67] = _decode_zero +_DECODE_BY_CONSTRUCTOR[68] = _decode_zero +_DECODE_BY_CONSTRUCTOR[69] = _decode_empty +_DECODE_BY_CONSTRUCTOR[80] = _decode_ubyte +_DECODE_BY_CONSTRUCTOR[81] = _decode_byte +_DECODE_BY_CONSTRUCTOR[82] = _decode_uint_small +_DECODE_BY_CONSTRUCTOR[83] = _decode_ulong_small +_DECODE_BY_CONSTRUCTOR[84] = _decode_int_small +_DECODE_BY_CONSTRUCTOR[85] = _decode_long_small +_DECODE_BY_CONSTRUCTOR[86] = _decode_boolean +_DECODE_BY_CONSTRUCTOR[96] = _decode_ushort +_DECODE_BY_CONSTRUCTOR[97] = _decode_short +_DECODE_BY_CONSTRUCTOR[112] = _decode_uint_large +_DECODE_BY_CONSTRUCTOR[113] = _decode_int_large +_DECODE_BY_CONSTRUCTOR[114] = _decode_float +_DECODE_BY_CONSTRUCTOR[128] = _decode_ulong_large +_DECODE_BY_CONSTRUCTOR[129] = _decode_long_large +_DECODE_BY_CONSTRUCTOR[130] = _decode_double +_DECODE_BY_CONSTRUCTOR[131] = _decode_timestamp +_DECODE_BY_CONSTRUCTOR[152] = _decode_uuid +_DECODE_BY_CONSTRUCTOR[160] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[161] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[163] = _decode_binary_small +_DECODE_BY_CONSTRUCTOR[176] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[177] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[179] = _decode_binary_large +_DECODE_BY_CONSTRUCTOR[192] = _decode_list_small +_DECODE_BY_CONSTRUCTOR[193] = _decode_map_small +_DECODE_BY_CONSTRUCTOR[208] = _decode_list_large +_DECODE_BY_CONSTRUCTOR[209] = _decode_map_large +_DECODE_BY_CONSTRUCTOR[224] = _decode_array_small +_DECODE_BY_CONSTRUCTOR[240] = _decode_array_large diff --git a/rstream/_pyamqp/_encode.py b/rstream/_pyamqp/_encode.py new file mode 100644 index 0000000..a0f44cc --- /dev/null +++ b/rstream/_pyamqp/_encode.py @@ -0,0 +1,1023 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# pylint: disable=too-many-lines +# pylint: disable=unused-argument +# type: ignore + +# TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500) +import calendar +import struct +import uuid +from datetime import datetime +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Collection, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Sequence, + Sized, + Tuple, + Union, + cast, +) + +from typing_extensions import Buffer + +from . import performatives +from .message import Message +from .types import ( + TYPE, + VALUE, + AMQPTypes, + ConstructorBytes, + FieldDefinition, + ObjDefinition, +) + +try: + from typing import TypeAlias # type: ignore +except ImportError: + from typing_extensions import TypeAlias + + +if TYPE_CHECKING: + from .message import Header, Properties + + Performative: TypeAlias = Union[ + performatives.OpenFrame, + performatives.BeginFrame, + performatives.AttachFrame, + performatives.FlowFrame, + performatives.TransferFrame, + performatives.DispositionFrame, + performatives.DetachFrame, + performatives.EndFrame, + performatives.CloseFrame, + performatives.SASLMechanism, + performatives.SASLInit, + performatives.SASLChallenge, + performatives.SASLResponse, + performatives.SASLOutcome, + Message, + Header, + Properties, + ] + +_FRAME_OFFSET = b"\x02" +_FRAME_TYPE = b"\x00" +AQMPSimpleType = Union[bool, float, str, bytes, uuid.UUID, None] + + +def _construct(byte: bytes, construct: bool) -> bytes: + return byte if construct else b"" + + +def encode_null(output: bytearray, *args: Any, **kwargs: Any) -> None: + """ + encoding code="0x40" category="fixed" width="0" label="the null value" + + :param bytearray output: The output buffer to write to. + :param any args: Ignored. + """ + output.extend(ConstructorBytes.null) + + +def encode_boolean(output: bytearray, value: bool, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + + + :param bytearray output: The output buffer to write to. + :param bool value: The boolean to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = bool(value) + if with_constructor: + output.extend(_construct(ConstructorBytes.bool, with_constructor)) + output.extend(b"\x01" if value else b"\x00") + return + + output.extend(ConstructorBytes.bool_true if value else ConstructorBytes.bool_false) + + +def encode_ubyte( + output: bytearray, value: Union[int, bytes], with_constructor: bool = True, **kwargs: Any +) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param int or bytes value: The ubyte to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + try: + value = int(value) + except ValueError: + value = cast(bytes, value) + value = ord(value) + try: + output.extend(_construct(ConstructorBytes.ubyte, with_constructor)) + output.extend(struct.pack(">B", abs(value))) + except struct.error as exc: + raise ValueError("Unsigned byte value must be 0-255") from exc + + +def encode_ushort(output: bytearray, value: int, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param int value: The ushort to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = int(value) + try: + output.extend(_construct(ConstructorBytes.ushort, with_constructor)) + output.extend(struct.pack(">H", abs(value))) + except struct.error as exc: + raise ValueError("Unsigned byte value must be 0-65535") from exc + + +def encode_uint( + output: bytearray, value: int, with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + + :param bytearray output: The output buffer to write to. + :param int value: The uint to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + value = int(value) + if value == 0: + output.extend(ConstructorBytes.uint_0) + return + try: + if use_smallest and value <= 255: + output.extend(_construct(ConstructorBytes.uint_small, with_constructor)) + output.extend(struct.pack(">B", abs(value))) + return + output.extend(_construct(ConstructorBytes.uint_large, with_constructor)) + output.extend(struct.pack(">I", abs(value))) + except struct.error as exc: + raise ValueError("Value supplied for unsigned int invalid: {}".format(value)) from exc + + +def encode_ulong( + output: bytearray, value: int, with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + + :param bytearray output: The output buffer to write to. + :param int value: The ulong to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + value = int(value) + if value == 0: + output.extend(ConstructorBytes.ulong_0) + return + try: + if use_smallest and value <= 255: + output.extend(_construct(ConstructorBytes.ulong_small, with_constructor)) + output.extend(struct.pack(">B", abs(value))) + return + output.extend(_construct(ConstructorBytes.ulong_large, with_constructor)) + output.extend(struct.pack(">Q", abs(value))) + except struct.error as exc: + raise ValueError("Value supplied for unsigned long invalid: {}".format(value)) from exc + + +def encode_byte(output: bytearray, value: int, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param byte value: The byte to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = int(value) + try: + output.extend(_construct(ConstructorBytes.byte, with_constructor)) + output.extend(struct.pack(">b", value)) + except struct.error as exc: + raise ValueError("Byte value must be -128-127") from exc + + +def encode_short(output: bytearray, value: int, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param int value: The short to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = int(value) + try: + output.extend(_construct(ConstructorBytes.short, with_constructor)) + output.extend(struct.pack(">h", value)) + except struct.error as exc: + raise ValueError("Short value must be -32768-32767") from exc + + +def encode_int( + output: bytearray, value: int, with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param int value: The int to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + value = int(value) + try: + if use_smallest and (-128 <= value <= 127): + output.extend(_construct(ConstructorBytes.int_small, with_constructor)) + output.extend(struct.pack(">b", value)) + return + output.extend(_construct(ConstructorBytes.int_large, with_constructor)) + output.extend(struct.pack(">i", value)) + except struct.error as exc: + raise ValueError("Value supplied for int invalid: {}".format(value)) from exc + + +def encode_long( + output: bytearray, value: Union[int, datetime], with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param int or datetime value: The UUID to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + if isinstance(value, datetime): + value = int((calendar.timegm(value.utctimetuple()) * 1000) + (value.microsecond / 1000)) + try: + if use_smallest and (-128 <= value <= 127): + output.extend(_construct(ConstructorBytes.long_small, with_constructor)) + output.extend(struct.pack(">b", value)) + return + output.extend(_construct(ConstructorBytes.long_large, with_constructor)) + output.extend(struct.pack(">q", value)) + except struct.error as exc: + raise ValueError("Value supplied for long invalid: {}".format(value)) from exc + + +def encode_float(output: bytearray, value: float, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param float value: The value to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = float(value) + output.extend(_construct(ConstructorBytes.float, with_constructor)) + output.extend(struct.pack(">f", value)) + + +def encode_double(output: bytearray, value: float, with_constructor: bool = True, **kwargs: Any) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param float value: The double to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + value = float(value) + output.extend(_construct(ConstructorBytes.double, with_constructor)) + output.extend(struct.pack(">d", value)) + + +def encode_timestamp( + output: bytearray, value: Union[int, datetime], with_constructor: bool = True, **kwargs: Any +) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param int or datetime value: The timestamp to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + if isinstance(value, datetime): + value = int((calendar.timegm(value.utctimetuple()) * 1000) + (value.microsecond / 1000)) + + value = int(value) + output.extend(_construct(ConstructorBytes.timestamp, with_constructor)) + output.extend(struct.pack(">q", value)) + + +def encode_uuid( + output: bytearray, value: Union[uuid.UUID, str, bytes], with_constructor: bool = True, **kwargs: Any +) -> None: + """ + + + :param bytearray output: The output buffer to write to. + :param uuid.UUID or str or bytes value: The UUID to encode. + :param bool with_constructor: Whether to include the constructor byte. + """ + if isinstance(value, str): + value = uuid.UUID(value).bytes + elif isinstance(value, uuid.UUID): + value = value.bytes + elif isinstance(value, bytes): + value = uuid.UUID(bytes=value).bytes + else: + raise TypeError("Invalid UUID type: {}".format(type(value))) + output.extend(_construct(ConstructorBytes.uuid, with_constructor)) + output.extend(value) + + +def encode_binary( + output: bytearray, + value: Union[bytes, bytearray], + with_constructor: bool = True, + use_smallest: bool = True, +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param bytes or bytearray value: The value to encode. + :param bool with_constructor: Whether to include the constructor in the output. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + length = len(value) + if use_smallest and length <= 255: + output.extend(_construct(ConstructorBytes.binary_small, with_constructor)) + output.extend(struct.pack(">B", length)) + output.extend(value) + return + try: + output.extend(_construct(ConstructorBytes.binary_large, with_constructor)) + output.extend(struct.pack(">L", length)) + output.extend(value) + except struct.error as exc: + raise ValueError("Binary data to long to encode") from exc + + +def encode_string( + output: bytearray, value: Union[bytes, str], with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param str value: The string to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + if isinstance(value, str): + value = value.encode("utf-8") + length = len(value) + if use_smallest and length <= 255: + output.extend(_construct(ConstructorBytes.string_small, with_constructor)) + output.extend(struct.pack(">B", length)) + output.extend(value) + return + try: + output.extend(_construct(ConstructorBytes.string_large, with_constructor)) + output.extend(struct.pack(">L", length)) + output.extend(value) + except struct.error as exc: + raise ValueError("String value too long to encode.") from exc + + +def encode_symbol( + output: bytearray, value: Union[bytes, str], with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param bytes or str value: The value to encode. + :param bool with_constructor: Whether to include the constructor byte. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + if isinstance(value, str): + value = value.encode("utf-8") + length = len(value) + if use_smallest and length <= 255: + output.extend(_construct(ConstructorBytes.symbol_small, with_constructor)) + output.extend(struct.pack(">B", length)) + output.extend(value) + return + try: + output.extend(_construct(ConstructorBytes.symbol_large, with_constructor)) + output.extend(struct.pack(">L", length)) + output.extend(value) + except struct.error as exc: + raise ValueError("Symbol value too long to encode.") from exc + + +def encode_list( + output: bytearray, value: Sequence[Any], with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + + :param bytearray output: The output buffer to write to. + :param sequence value: The list to encode. + :param bool with_constructor: Whether to include the constructor in the output. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + count = len(cast(Sized, value)) + if use_smallest and count == 0: + output.extend(ConstructorBytes.list_0) + return + encoded_size = 0 + encoded_values = bytearray() + for item in value: + encode_value(encoded_values, item, with_constructor=True) + encoded_size += len(encoded_values) + if use_smallest and count <= 255 and encoded_size < 255: + output.extend(_construct(ConstructorBytes.list_small, with_constructor)) + output.extend(struct.pack(">B", encoded_size + 1)) + output.extend(struct.pack(">B", count)) + else: + try: + output.extend(_construct(ConstructorBytes.list_large, with_constructor)) + output.extend(struct.pack(">L", encoded_size + 4)) + output.extend(struct.pack(">L", count)) + except struct.error as exc: + raise ValueError("List is too large or too long to be encoded.") from exc + output.extend(encoded_values) + + +def encode_map( + output: bytearray, + value: Union[Dict[Any, Any], Iterable[Tuple[Any, Any]]], + with_constructor: bool = True, + use_smallest: bool = True, +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param dict value: The value to encode. + :param bool with_constructor: Whether to include the constructor in the output. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + count = len(cast(Sized, value)) * 2 + encoded_size = 0 + encoded_values = bytearray() + if isinstance(value, dict): + items: Iterable[Any] = value.items() + elif isinstance(value, Iterable): + items = value + + for key, data in items: + encode_value(encoded_values, key, with_constructor=True) + encode_value(encoded_values, data, with_constructor=True) + encoded_size = len(encoded_values) + if use_smallest and count <= 255 and encoded_size < 255: + output.extend(_construct(ConstructorBytes.map_small, with_constructor)) + output.extend(struct.pack(">B", encoded_size + 1)) + output.extend(struct.pack(">B", count)) + else: + try: + output.extend(_construct(ConstructorBytes.map_large, with_constructor)) + output.extend(struct.pack(">L", encoded_size + 4)) + output.extend(struct.pack(">L", count)) + except struct.error as exc: + raise ValueError("Map is too large or too long to be encoded.") from exc + output.extend(encoded_values) + + +def _check_element_type(item: Dict[str, Any], element_type: Any) -> Any: + if not element_type: + try: + return item["TYPE"] + except (KeyError, TypeError): + return type(item) + try: + if item["TYPE"] != element_type: + raise TypeError("All elements in an array must be the same type.") + except (KeyError, TypeError) as exc: + if not isinstance(item, element_type): + raise TypeError("All elements in an array must be the same type.") from exc + return element_type + + +def encode_array( + output: bytearray, value: Sequence[Any], with_constructor: bool = True, use_smallest: bool = True +) -> None: + """ + + + + :param bytearray output: The output buffer to write to. + :param sequence value: The array to encode. + :param bool with_constructor: Whether to include the constructor in the output. + :param bool use_smallest: Whether to use the smallest possible encoding. + """ + count = len(cast(Sized, value)) + encoded_size = 0 + encoded_values = bytearray() + first_item = True + element_type = None + for item in value: + element_type = _check_element_type(item, element_type) + encode_value(encoded_values, item, with_constructor=first_item, use_smallest=False) + first_item = False + if item is None: + encoded_size -= 1 + break + encoded_size += len(encoded_values) + if use_smallest and count <= 255 and encoded_size < 255: + output.extend(_construct(ConstructorBytes.array_small, with_constructor)) + output.extend(struct.pack(">B", encoded_size + 1)) + output.extend(struct.pack(">B", count)) + else: + try: + output.extend(_construct(ConstructorBytes.array_large, with_constructor)) + output.extend(struct.pack(">L", encoded_size + 4)) + output.extend(struct.pack(">L", count)) + except struct.error as exc: + raise ValueError("Array is too large or too long to be encoded.") from exc + output.extend(encoded_values) + + +def encode_described(output: bytearray, value: Tuple[Any, Any], _: bool = None, **kwargs: Any) -> None: # type: ignore + output.extend(ConstructorBytes.descriptor) + encode_value(output, value[0], **kwargs) + encode_value(output, value[1], **kwargs) + + +def encode_fields(value: Optional[Dict[bytes, Any]]) -> Dict[str, Any]: + """A mapping from field name to value. + + The fields type is a map where the keys are restricted to be of type symbol (this excludes the possibility + of a null key). There is no further restriction implied by the fields type on the allowed values for the + entries or the set of allowed keys. + + + + :param dict or None value: The field values to encode. + :return: The encoded field values. + :rtype: dict + """ + if not value: + return {TYPE: AMQPTypes.null, VALUE: None} + fields = {TYPE: AMQPTypes.map, VALUE: []} + for key, data in value.items(): + if isinstance(key, str): + key = key.encode("utf-8") # type: ignore + cast(List, fields[VALUE]).append(({TYPE: AMQPTypes.symbol, VALUE: key}, data)) + return fields + + +def encode_annotations(value: Optional[Dict[Union[str, bytes], Any]]) -> Dict[str, Any]: + """The annotations type is a map where the keys are restricted to be of type symbol or of type ulong. + + All ulong keys, and all symbolic keys except those beginning with "x-" are reserved. + On receiving an annotations map containing keys or values which it does not recognize, and for which the + key does not begin with the string 'x-opt-' an AMQP container MUST detach the link with the not-implemented + amqp-error. + + + + :param dict or None value: The annotations to encode. + :return: The encoded annotations. + :rtype: dict + """ + if not value: + return {TYPE: AMQPTypes.null, VALUE: None} + fields = {TYPE: AMQPTypes.map, VALUE: []} + for key, data in value.items(): + if isinstance(key, int): + field_key = {TYPE: AMQPTypes.ulong, VALUE: key} + else: + field_key = {TYPE: AMQPTypes.symbol, VALUE: key} + try: + cast(List, fields[VALUE]).append((field_key, {TYPE: data[TYPE], VALUE: data[VALUE]})) + except (KeyError, TypeError): + cast(List, fields[VALUE]).append((field_key, {TYPE: None, VALUE: data})) + return fields + + +def encode_application_properties( + value: Optional[Dict[Union[str, bytes], AQMPSimpleType]] +) -> Dict[Union[str, bytes], Any]: + """The application-properties section is a part of the bare message used for structured application data. + + + + + + Intermediaries may use the data within this structure for the purposes of filtering or routing. + The keys of this map are restricted to be of type string (which excludes the possibility of a null key) + and the values are restricted to be of simple types only, that is (excluding map, list, and array types). + + :param dict value: The application properties to encode. + :return: The encoded application properties. + :rtype: dict + """ + if not value: + return {TYPE: AMQPTypes.null, VALUE: None} + fields: Dict[Union[str, bytes], Any] = {TYPE: AMQPTypes.map, VALUE: cast(List, [])} + for key, data in value.items(): + cast(List, fields[VALUE]).append(({TYPE: AMQPTypes.string, VALUE: key}, data)) + return fields + + +def encode_message_id( + value: Union[int, uuid.UUID, bytes, str] +) -> Dict[str, Union[int, uuid.UUID, bytes, str]]: + """ + + + + + + :param any value: The message ID to encode. + :return: The encoded message ID. + :rtype: dict + """ + if isinstance(value, int): + return {TYPE: AMQPTypes.ulong, VALUE: value} + if isinstance(value, uuid.UUID): + return {TYPE: AMQPTypes.uuid, VALUE: value} + if isinstance(value, bytes): + return {TYPE: AMQPTypes.binary, VALUE: value} + if isinstance(value, str): + return {TYPE: AMQPTypes.string, VALUE: value} + raise TypeError("Unsupported Message ID type.") + + +def encode_node_properties(value: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Properties of a node. + + + + A symbol-keyed map containing properties of a node used when requesting creation or reporting + the creation of a dynamic node. The following common properties are defined:: + + - `lifetime-policy`: The lifetime of a dynamically generated node. Definitionally, the lifetime will + never be less than the lifetime of the link which caused its creation, however it is possible to extend + the lifetime of dynamically created node using a lifetime policy. The value of this entry MUST be of a type + which provides the lifetime-policy archetype. The following standard lifetime-policies are defined below: + delete-on-close, delete-on-no-links, delete-on-no-messages or delete-on-no-links-or-messages. + + - `supported-dist-modes`: The distribution modes that the node supports. The value of this entry MUST be one or + more symbols which are valid distribution-modes. That is, the value MUST be of the same type as would be valid + in a field defined with the following attributes: + type="symbol" multiple="true" requires="distribution-mode" + + :param dict value: The node properties. + :return: The encoded node properties. + :rtype: dict + """ + if not value: + return {TYPE: AMQPTypes.null, VALUE: None} + # TODO + fields = {TYPE: AMQPTypes.map, VALUE: []} + # fields[{TYPE: AMQPTypes.symbol, VALUE: b'lifetime-policy'}] = { + # TYPE: AMQPTypes.described, + # VALUE: ( + # {TYPE: AMQPTypes.ulong, VALUE: value['lifetime_policy']}, + # {TYPE: AMQPTypes.list, VALUE: []} + # ) + # } + # fields[{TYPE: AMQPTypes.symbol, VALUE: b'supported-dist-modes'}] = {} + return fields + + +def encode_filter_set(value: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """A set of predicates to filter the Messages admitted onto the Link. + + + + A set of named filters. Every key in the map MUST be of type symbol, every value MUST be either null or of a + described type which provides the archetype filter. A filter acts as a function on a message which returns a + boolean result indicating whether the message can pass through that filter or not. A message will pass + through a filter-set if and only if it passes through each of the named filters. If the value for a given key is + null, this acts as if there were no such key present (i.e., all messages pass through the null filter). + + Filter types are a defined extension point. The filter types that a given source supports will be indicated + by the capabilities of the source. + + :param dict value: A set of named filters. + :return: A set of encoded named filters. + :rtype: dict + """ + if not value: + return {TYPE: AMQPTypes.null, VALUE: None} + fields = {TYPE: AMQPTypes.map, VALUE: cast(List, [])} + for name, data in value.items(): + described_filter: Dict[str, Union[Tuple[Dict[str, Any], Any], Optional[str]]] + if data is None: + described_filter = {TYPE: AMQPTypes.null, VALUE: None} + else: + if isinstance(name, str): + name = name.encode("utf-8") # type: ignore + if isinstance(data, (str, bytes)): + described_filter = data # type: ignore + # handle the situation when data is a tuple or list of length 2 + else: + try: + descriptor, filter_value = data + described_filter = { + TYPE: AMQPTypes.described, + VALUE: ({TYPE: AMQPTypes.symbol, VALUE: descriptor}, filter_value), + } + # if its not a type that is known, raise the error from the server + except (ValueError, TypeError): + described_filter = data + + cast(List, fields[VALUE]).append(({TYPE: AMQPTypes.symbol, VALUE: name}, described_filter)) + return fields + + +def encode_unknown(output: bytearray, value: Optional[object], **kwargs: Any) -> None: + """ + Dynamic encoding according to the type of `value`. + :param bytearray output: The output buffer. + :param any value: The value to encode. + """ + if value is None: + encode_null(output, **kwargs) + elif isinstance(value, bool): + encode_boolean(output, value, **kwargs) + elif isinstance(value, str): + encode_string(output, value, **kwargs) + elif isinstance(value, uuid.UUID): + encode_uuid(output, value, **kwargs) + elif isinstance(value, (bytearray, bytes)): + encode_binary(output, value, **kwargs) + elif isinstance(value, float): + encode_double(output, value, **kwargs) + elif isinstance(value, int): + encode_int(output, value, **kwargs) + elif isinstance(value, datetime): + encode_timestamp(output, value, **kwargs) + elif isinstance(value, list): + encode_list(output, value, **kwargs) + elif isinstance(value, tuple): + encode_described(output, cast(Tuple[Any, Any], value), **kwargs) + elif isinstance(value, dict): + encode_map(output, value, **kwargs) + else: + raise TypeError("Unable to encode unknown value: {}".format(value)) + + +_FIELD_DEFINITIONS: Dict[FieldDefinition, Callable[[Any], Any]] = { + FieldDefinition.fields: encode_fields, + FieldDefinition.annotations: encode_annotations, + FieldDefinition.message_id: encode_message_id, + FieldDefinition.app_properties: encode_application_properties, + FieldDefinition.node_properties: encode_node_properties, + FieldDefinition.filter_set: encode_filter_set, +} + +_ENCODE_MAP = { + None: encode_unknown, + AMQPTypes.null: encode_null, + AMQPTypes.boolean: encode_boolean, + AMQPTypes.ubyte: encode_ubyte, + AMQPTypes.byte: encode_byte, + AMQPTypes.ushort: encode_ushort, + AMQPTypes.short: encode_short, + AMQPTypes.uint: encode_uint, + AMQPTypes.int: encode_int, + AMQPTypes.ulong: encode_ulong, + AMQPTypes.long: encode_long, + AMQPTypes.float: encode_float, + AMQPTypes.double: encode_double, + AMQPTypes.timestamp: encode_timestamp, + AMQPTypes.uuid: encode_uuid, + AMQPTypes.binary: encode_binary, + AMQPTypes.string: encode_string, + AMQPTypes.symbol: encode_symbol, + AMQPTypes.list: encode_list, + AMQPTypes.map: encode_map, + AMQPTypes.array: encode_array, + AMQPTypes.described: encode_described, +} + + +def encode_value(output: bytearray, value: Any, **kwargs: Any) -> None: + try: + cast(Callable, _ENCODE_MAP[value[TYPE]])(output, value[VALUE], **kwargs) + except (KeyError, TypeError): + encode_unknown(output, value, **kwargs) + + +def describe_performative(performative: NamedTuple) -> Dict[str, Sequence[Collection[str]]]: + body: List[Dict[str, Any]] = [] + for index, value in enumerate(performative): + # TODO: fix mypy + field = performative._definition[index] # type: ignore # pylint: disable=protected-access + if value is None: + body.append({TYPE: AMQPTypes.null, VALUE: None}) + elif field is None: + continue + elif isinstance(field.type, FieldDefinition): + if field.multiple: + body.append( + { + TYPE: AMQPTypes.array, + VALUE: [_FIELD_DEFINITIONS[field.type](v) for v in value], # type: ignore + } + ) + else: + body.append(_FIELD_DEFINITIONS[field.type](value)) # type: ignore + elif isinstance(field.type, ObjDefinition): + body.append(describe_performative(value)) + else: + if field.multiple: + body.append( + { + TYPE: AMQPTypes.array, + VALUE: [{TYPE: field.type, VALUE: v} for v in value], + } + ) + else: + body.append({TYPE: field.type, VALUE: value}) + + return { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: performative._code}, # type: ignore # pylint: disable=protected-access + {TYPE: AMQPTypes.list, VALUE: body}, + ), + } + + +def encode_payload(output: bytearray, payload: Message) -> bytes: + if payload[0]: # header + # TODO: Header and Properties encoding can be optimized to + # 1. not encoding trailing None fields + # Possible fix 1: + # header = payload[0] + # header = header[0:max(i for i, v in enumerate(header) if v is not None) + 1] + # Possible fix 2: + # itertools.dropwhile(lambda x: x is None, header[::-1]))[::-1] + # 2. encoding bool without constructor + # Possible fix 3: + # header = list(payload[0]) + # while header[-1] is None: + # del header[-1] + encode_value(output, describe_performative(payload[0])) + + if payload[2]: # message annotations + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000072}, + encode_annotations(payload[2]), + ), + }, + ) + + if payload[3]: # properties + # TODO: Header and Properties encoding can be optimized to + # 1. not encoding trailing None fields + # 2. encoding bool without constructor + encode_value(output, describe_performative(payload[3])) + + if payload[4]: # application properties + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000074}, + encode_application_properties(payload[4]), + ), + }, + ) + + if payload[5]: # data + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000075}, + {TYPE: AMQPTypes.binary, VALUE: payload[5]}, + ), + }, + ) + + if payload[6]: # sequence + for item_value in payload[6]: + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000076}, + {TYPE: None, VALUE: item_value}, + ), + }, + ) + + if payload[7]: # value + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000077}, + {TYPE: None, VALUE: payload[7]}, + ), + }, + ) + + if payload[8]: # footer + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000078}, + encode_annotations(payload[8]), + ), + }, + ) + + # TODO: + # currently the delivery annotations must be finally encoded instead of being encoded at the 2nd position + # otherwise the event hubs service would ignore the delivery annotations + # -- received message doesn't have it populated + # check with service team? + if payload[1]: # delivery annotations + encode_value( + output, + { + TYPE: AMQPTypes.described, + VALUE: ( + {TYPE: AMQPTypes.ulong, VALUE: 0x00000071}, + encode_annotations(payload[1]), + ), + }, + ) + + return output + + +def encode_frame( + frame: Optional[NamedTuple], frame_type: bytes = _FRAME_TYPE +) -> Tuple[bytes, Optional[bytes]]: + # TODO: allow passing type specific bytes manually, e.g. Empty Frame needs padding + if frame is None: + size = 8 + header = size.to_bytes(4, "big") + _FRAME_OFFSET + frame_type + return header, None + + frame_description = describe_performative(frame) + frame_data = bytearray() + encode_value(frame_data, frame_description) + if isinstance(frame, performatives.TransferFrame): + # casting from Optional[Buffer] since payload will not be None at this point + frame_data += cast(Buffer, frame.payload) + + size = len(frame_data) + 8 + header = size.to_bytes(4, "big") + _FRAME_OFFSET + frame_type + return header, frame_data diff --git a/rstream/_pyamqp/constants.py b/rstream/_pyamqp/constants.py new file mode 100644 index 0000000..c0eff6c --- /dev/null +++ b/rstream/_pyamqp/constants.py @@ -0,0 +1,346 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# type: ignore + +import struct +from collections import namedtuple +from enum import Enum +from typing import cast + +_AS_BYTES = struct.Struct(">B") + +#: The IANA assigned port number for AMQP.The standard AMQP port number that has been assigned by IANA +#: for TCP, UDP, and SCTP.There are currently no UDP or SCTP mappings defined for AMQP. +#: The port number is reserved for future transport mappings to these protocols. +PORT = 5672 + +# default port for AMQP over Websocket +WEBSOCKET_PORT = 443 + +# subprotocol for AMQP over Websocket +AMQP_WS_SUBPROTOCOL = "AMQPWSB10" + +#: The IANA assigned port number for secure AMQP (amqps).The standard AMQP port number that has been assigned +#: by IANA for secure TCP using TLS. Implementations listening on this port should NOT expect a protocol +#: handshake before TLS is negotiated. +SECURE_PORT = 5671 + + +# default port for AMQP over Websocket +WEBSOCKET_PORT = 443 + + +# subprotocol for AMQP over Websocket +AMQP_WS_SUBPROTOCOL = "AMQPWSB10" + + +MAJOR = 1 #: Major protocol version. +MINOR = 0 #: Minor protocol version. +REV = 0 #: Protocol revision. +HEADER_FRAME = b"AMQP\x00" + _AS_BYTES.pack(MAJOR) + _AS_BYTES.pack(MINOR) + _AS_BYTES.pack(REV) + + +TLS_MAJOR = 1 #: Major protocol version. +TLS_MINOR = 0 #: Minor protocol version. +TLS_REV = 0 #: Protocol revision. +TLS_HEADER_FRAME = ( + b"AMQP\x02" + _AS_BYTES.pack(TLS_MAJOR) + _AS_BYTES.pack(TLS_MINOR) + _AS_BYTES.pack(TLS_REV) +) + +SASL_MAJOR = 1 #: Major protocol version. +SASL_MINOR = 0 #: Minor protocol version. +SASL_REV = 0 #: Protocol revision. +SASL_HEADER_FRAME = ( + b"AMQP\x03" + _AS_BYTES.pack(SASL_MAJOR) + _AS_BYTES.pack(SASL_MINOR) + _AS_BYTES.pack(SASL_REV) +) + +EMPTY_FRAME = b"\x00\x00\x00\x08\x02\x00\x00\x00" + +#: The lower bound for the agreed maximum frame size (in bytes). During the initial Connection negotiation, the +#: two peers must agree upon a maximum frame size. This constant defines the minimum value to which the maximum +#: frame size can be set. By defining this value, the peers can guarantee that they can send frames of up to this +#: size until they have agreed a definitive maximum frame size for that Connection. +MIN_MAX_FRAME_SIZE = 512 +MAX_FRAME_SIZE_BYTES = 1024 * 1024 +MAX_CHANNELS = 65535 +INCOMING_WINDOW = 64 * 1024 +OUTGOING_WINDOW = 64 * 1024 + +DEFAULT_LINK_CREDIT = 10000 + +FIELD = namedtuple("FIELD", "name, type, mandatory, default, multiple") + +STRING_FILTER = b"apache.org:selector-filter:string" + +DEFAULT_AUTH_TIMEOUT = 60 +AUTH_DEFAULT_EXPIRATION_SECONDS = 3600 +TOKEN_TYPE_JWT = "jwt" +TOKEN_TYPE_SASTOKEN = "servicebus.windows.net:sastoken" +CBS_PUT_TOKEN = "put-token" +CBS_NAME = "name" +CBS_OPERATION = "operation" +CBS_TYPE = "type" +CBS_EXPIRATION = "expiration" + +SEND_DISPOSITION_ACCEPT = "accepted" +SEND_DISPOSITION_REJECT = "rejected" + +AUTH_TYPE_SASL_PLAIN = "AUTH_SASL_PLAIN" +AUTH_TYPE_CBS = "AUTH_CBS" + +DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS = 10 +CONNECT_TIMEOUT = 1 +SOCKET_TIMEOUT = 0.2 +WS_TIMEOUT_INTERVAL = 1 + + +class ConnectionState(Enum): + #: In this state a Connection exists, but nothing has been sent or received. This is the state an + #: implementation would be in immediately after performing a socket connect or socket accept. + START = 0 + #: In this state the Connection header has been received from our peer, but we have not yet sent anything. + HDR_RCVD = 1 + #: In this state the Connection header has been sent to our peer, but we have not yet received anything. + HDR_SENT = 2 + #: In this state we have sent and received the Connection header, but we have not yet sent or + #: received an open frame. + HDR_EXCH = 3 + #: In this state we have sent both the Connection header and the open frame, but + #: we have not yet received anything. + OPEN_PIPE = 4 + #: In this state we have sent the Connection header, the open frame, any pipelined Connection traffic, + #: and the close frame, but we have not yet received anything. + OC_PIPE = 5 + #: In this state we have sent and received the Connection header, and received an open frame from + #: our peer, but have not yet sent an open frame. + OPEN_RCVD = 6 + #: In this state we have sent and received the Connection header, and sent an open frame to our peer, + #: but have not yet received an open frame. + OPEN_SENT = 7 + #: In this state we have send and received the Connection header, sent an open frame, any pipelined + #: Connection traffic, and the close frame, but we have not yet received an open frame. + CLOSE_PIPE = 8 + #: In this state the Connection header and the open frame have both been sent and received. + OPENED = 9 + #: In this state we have received a close frame indicating that our partner has initiated a close. + #: This means we will never have to read anything more from this Connection, however we can + #: continue to write frames onto the Connection. If desired, an implementation could do a TCP half-close + #: at this point to shutdown the read side of the Connection. + CLOSE_RCVD = 10 + #: In this state we have sent a close frame to our partner. It is illegal to write anything more onto + #: the Connection, however there may still be incoming frames. If desired, an implementation could do + #: a TCP half-close at this point to shutdown the write side of the Connection. + CLOSE_SENT = 11 + #: The DISCARDING state is a variant of the CLOSE_SENT state where the close is triggered by an error. + #: In this case any incoming frames on the connection MUST be silently discarded until the peer's close + #: frame is received. + DISCARDING = 12 + #: In this state it is illegal for either endpoint to write anything more onto the Connection. The + #: Connection may be safely closed and discarded. + END = 13 + + +class SessionState(Enum): + #: In the UNMAPPED state, the Session endpoint is not mapped to any incoming or outgoing channels on the + #: Connection endpoint. In this state an endpoint cannot send or receive frames. + UNMAPPED = 0 + #: In the BEGIN_SENT state, the Session endpoint is assigned an outgoing channel number, but there is no entry + #: in the incoming channel map. In this state the endpoint may send frames but cannot receive them. + BEGIN_SENT = 1 + #: In the BEGIN_RCVD state, the Session endpoint has an entry in the incoming channel map, but has not yet + #: been assigned an outgoing channel number. The endpoint may receive frames, but cannot send them. + BEGIN_RCVD = 2 + #: In the MAPPED state, the Session endpoint has both an outgoing channel number and an entry in the incoming + #: channel map. The endpoint may both send and receive frames. + MAPPED = 3 + #: In the END_SENT state, the Session endpoint has an entry in the incoming channel map, but is no longer + #: assigned an outgoing channel number. The endpoint may receive frames, but cannot send them. + END_SENT = 4 + #: In the END_RCVD state, the Session endpoint is assigned an outgoing channel number, but there is no entry in + #: the incoming channel map. The endpoint may send frames, but cannot receive them. + END_RCVD = 5 + #: The DISCARDING state is a variant of the END_SENT state where the end is triggered by an error. In this + #: case any incoming frames on the session MUST be silently discarded until the peer's end frame is received. + DISCARDING = 6 + + +class SessionTransferState(Enum): + OKAY = 0 + ERROR = 1 + BUSY = 2 + + +class LinkDeliverySettleReason(Enum): + DISPOSITION_RECEIVED = 0 + SETTLED = 1 + NOT_DELIVERED = 2 + TIMEOUT = 3 + CANCELLED = 4 + + +class LinkState(Enum): + DETACHED = 0 + ATTACH_SENT = 1 + ATTACH_RCVD = 2 + ATTACHED = 3 + DETACH_SENT = 4 + DETACH_RCVD = 5 + ERROR = 6 + + +class ManagementLinkState(Enum): + IDLE = 0 + OPENING = 1 + CLOSING = 2 + OPEN = 3 + ERROR = 4 + + +class ManagementOpenResult(Enum): + OPENING = 0 + OK = 1 + ERROR = 2 + CANCELLED = 3 + + +class ManagementExecuteOperationResult(Enum): + OK = 0 + ERROR = 1 + FAILED_BAD_STATUS = 2 + LINK_CLOSED = 3 + + +class CbsState(Enum): + CLOSED = 0 + OPENING = 1 + OPEN = 2 + ERROR = 3 + + +class CbsAuthState(Enum): + OK = 0 + IDLE = 1 + IN_PROGRESS = 2 + TIMEOUT = 3 + REFRESH_REQUIRED = 4 + EXPIRED = 5 + ERROR = 6 # Put token rejected or complete but fail authentication + FAILURE = 7 # Fail to open cbs links + + +class Role(object): + """Link endpoint role. + + Valid Values: + - False: Sender + - True: Receiver + + + + + + """ + + Sender = False + Receiver = True + + +class SenderSettleMode(object): + """Settlement policy for a Sender. + + Valid Values: + - 0: The Sender will send all deliveries initially unsettled to the Receiver. + - 1: The Sender will send all deliveries settled to the Receiver. + - 2: The Sender may send a mixture of settled and unsettled deliveries to the Receiver. + + + + + + + """ + + Unsettled = 0 + Settled = 1 + Mixed = 2 + + +class ReceiverSettleMode(object): + """Settlement policy for a Receiver. + + Valid Values: + - 0: The Receiver will spontaneously settle all incoming transfers. + - 1: The Receiver will only settle after sending the disposition to the Sender and + receiving a disposition indicating settlement of the delivery from the sender. + + + + + + """ + + First = 0 + Second = 1 + + +class SASLCode(object): + """Codes to indicate the outcome of the sasl dialog. + + + + + + + + + """ + + #: Connection authentication succeeded. + Ok = 0 + #: Connection authentication failed due to an unspecified problem with the supplied credentials. + Auth = 1 + #: Connection authentication failed due to a system error. + Sys = 2 + #: Connection authentication failed due to a system error that is unlikely to be corrected without intervention. + SysPerm = 3 + #: Connection authentication failed due to a transient system error. + SysTemp = 4 + + +class MessageDeliveryState(object): + WaitingToBeSent = 0 + WaitingForSendAck = 1 + Ok = 2 + Error = 3 + Timeout = 4 + Cancelled = 5 + + +MESSAGE_DELIVERY_DONE_STATES = ( + MessageDeliveryState.Ok, + MessageDeliveryState.Error, + MessageDeliveryState.Timeout, + MessageDeliveryState.Cancelled, +) + + +class TransportType(Enum): + """Transport type + The underlying transport protocol type: + Amqp: AMQP over the default TCP transport protocol, it uses port 5671. + AmqpOverWebsocket: Amqp over the Web Sockets transport protocol, it uses + port 443. + """ + + Amqp = 1 + AmqpOverWebsocket = 2 + + def __eq__(self, __o: object) -> bool: + try: + __o = cast(Enum, __o) + return self.value == __o.value + except AttributeError: + return super().__eq__(__o) diff --git a/rstream/_pyamqp/message.py b/rstream/_pyamqp/message.py new file mode 100644 index 0000000..a409819 --- /dev/null +++ b/rstream/_pyamqp/message.py @@ -0,0 +1,301 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# type: ignore + +""" +isort:skip_file +""" + +# TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500) +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + NamedTuple, + Optional, + Union, +) + +from typing_extensions import TypedDict + +from .constants import FIELD, MessageDeliveryState +from .performatives import _CAN_ADD_DOCSTRING +from .types import AMQPTypes, FieldDefinition + +if TYPE_CHECKING: + from uuid import UUID + + class MessageDict(TypedDict): # needed for use with spread operator + """ + Typing for Message, used with the spread operator. + """ + + header: Optional["Header"] + delivery_annotations: Optional[Dict[Union[str, bytes], Any]] + message_annotations: Optional[Dict[Union[str, bytes], Any]] + properties: Optional["Properties"] + application_properties: Optional[Dict[Union[str, bytes], Any]] + body: Optional[bytes] + sequence: Optional[List[Any]] + value: Optional[Any] + footer: Optional[Dict[Any, Any]] + + +class Header(NamedTuple): + durable: Optional[bool] = None + priority: Optional[int] = None + ttl: Optional[int] = None + first_acquirer: Optional[bool] = None + delivery_count: Optional[int] = None + + +Header._code = 0x00000070 # type: ignore # pylint:disable=protected-access +Header._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("durable", AMQPTypes.boolean, False, None, False), + FIELD("priority", AMQPTypes.ubyte, False, None, False), + FIELD("ttl", AMQPTypes.uint, False, None, False), + FIELD("first_acquirer", AMQPTypes.boolean, False, None, False), + FIELD("delivery_count", AMQPTypes.uint, False, None, False), +) + +if _CAN_ADD_DOCSTRING: + Header.__doc__ = """ + Transport headers for a Message. + + The header section carries standard delivery details about the transfer of a Message through the AMQP + network. If the header section is omitted the receiver MUST assume the appropriate default values for + the fields within the header unless other target or node specific defaults have otherwise been set. + + :param bool durable: Specify durability requirements. + Durable Messages MUST NOT be lost even if an intermediary is unexpectedly terminated and restarted. + A target which is not capable of fulfilling this guarantee MUST NOT accept messages where the durable + header is set to true: if the source allows the rejected outcome then the message should be rejected + with the precondition-failed error, otherwise the link must be detached by the receiver with the same error. + :param int priority: Relative Message priority. + This field contains the relative Message priority. Higher numbers indicate higher priority Messages. + Messages with higher priorities MAY be delivered before those with lower priorities. An AMQP intermediary + implementing distinct priority levels MUST do so in the following manner: + + - If n distince priorities are implemented and n is less than 10 - priorities 0 to (5 - ceiling(n/2)) + MUST be treated equivalently and MUST be the lowest effective priority. The priorities (4 + fioor(n/2)) + and above MUST be treated equivalently and MUST be the highest effective priority. The priorities + (5 ceiling(n/2)) to (4 + fioor(n/2)) inclusive MUST be treated as distinct priorities. + - If n distinct priorities are implemented and n is 10 or greater - priorities 0 to (n - 1) MUST be + distinct, and priorities n and above MUST be equivalent to priority (n - 1). Thus, for example, if 2 + distinct priorities are implemented, then levels 0 to 4 are equivalent, and levels 5 to 9 are equivalent + and levels 4 and 5 are distinct. If 3 distinct priorities are implements the 0 to 3 are equivalent, + 5 to 9 are equivalent and 3, 4 and 5 are distinct. This scheme ensures that if two priorities are distinct + for a server which implements m separate priority levels they are also distinct for a server which + implements n different priority levels where n > m. + + :param int ttl: Time to live in ms. + Duration in milliseconds for which the Message should be considered 'live'. If this is set then a message + expiration time will be computed based on the time of arrival at an intermediary. Messages that live longer + than their expiration time will be discarded (or dead lettered). When a message is transmitted by an + intermediary that was received with a ttl, the transmitted message's header should contain a ttl that is + computed as the difference between the current time and the formerly computed message expiration + time, i.e. the reduced ttl, so that messages will eventually die if they end up in a delivery loop. + :param bool first_acquirer: If this value is true, then this message has not been acquired by any other Link. + If this value is false, then this message may have previously been acquired by another Link or Links. + :param int delivery_count: The number of prior unsuccessful delivery attempts. + The number of unsuccessful previous attempts to deliver this message. If this value is non-zero it may + be taken as an indication that the delivery may be a duplicate. On first delivery, the value is zero. + It is incremented upon an outcome being settled at the sender, according to rules defined for each outcome. + """ + + +class Properties(NamedTuple): + message_id: Optional[Union[str, bytes, "UUID"]] = None + user_id: Optional[Union[str, bytes]] = None + to: Optional[Union[str, bytes]] = None + subject: Optional[Union[str, bytes]] = None + reply_to: Optional[Union[str, bytes]] = None + correlation_id: Optional[Union[str, bytes]] = None + content_type: Optional[Union[str, bytes]] = None + content_encoding: Optional[Union[str, bytes]] = None + absolute_expiry_time: Optional[int] = None + creation_time: Optional[int] = None + group_id: Optional[Union[str, bytes]] = None + group_sequence: Optional[int] = None + reply_to_group_id: Optional[Union[str, bytes]] = None + + +Properties._code = 0x00000073 # type: ignore # pylint:disable=protected-access +Properties._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("message_id", FieldDefinition.message_id, False, None, False), + FIELD("user_id", AMQPTypes.binary, False, None, False), + FIELD("to", AMQPTypes.string, False, None, False), + FIELD("subject", AMQPTypes.string, False, None, False), + FIELD("reply_to", AMQPTypes.string, False, None, False), + FIELD("correlation_id", FieldDefinition.message_id, False, None, False), + FIELD("content_type", AMQPTypes.symbol, False, None, False), + FIELD("content_encoding", AMQPTypes.symbol, False, None, False), + FIELD("absolute_expiry_time", AMQPTypes.timestamp, False, None, False), + FIELD("creation_time", AMQPTypes.timestamp, False, None, False), + FIELD("group_id", AMQPTypes.string, False, None, False), + FIELD("group_sequence", AMQPTypes.uint, False, None, False), + FIELD("reply_to_group_id", AMQPTypes.string, False, None, False), +) + +if _CAN_ADD_DOCSTRING: + Properties.__doc__ = """ + Immutable properties of the Message. + + The properties section is used for a defined set of standard properties of the message. The properties + section is part of the bare message and thus must, if retransmitted by an intermediary, remain completely + unaltered. + + :param message_id: Application Message identifier. + Message-id is an optional property which uniquely identifies a Message within the Message system. + The Message producer is usually responsible for setting the message-id in such a way that it is assured + to be globally unique. A broker MAY discard a Message as a duplicate if the value of the message-id + matches that of a previously received Message sent to the same Node. + :param bytes user_id: Creating user id. + The identity of the user responsible for producing the Message. The client sets this value, and it MAY + be authenticated by intermediaries. + :param to: The address of the Node the Message is destined for. + The to field identifies the Node that is the intended destination of the Message. On any given transfer + this may not be the Node at the receiving end of the Link. + :param str subject: The subject of the message. + A common field for summary information about the Message content and purpose. + :param reply_to: The Node to send replies to. + The address of the Node to send replies to. + :param correlation_id: Application correlation identifier. + This is a client-specific id that may be used to mark or identify Messages between clients. + :param bytes content_type: MIME content type. + The RFC-2046 MIME type for the Message's application-data section (body). As per RFC-2046 this may contain + a charset parameter defining the character encoding used: e.g. 'text/plain; charset="utf-8"'. + For clarity, the correct MIME type for a truly opaque binary section is application/octet-stream. + When using an application-data section with a section code other than data, contenttype, if set, SHOULD + be set to a MIME type of message/x-amqp+?, where '?' is either data, map or list. + :param bytes content_encoding: MIME content type. + The Content-Encoding property is used as a modifier to the content-type. When present, its value indicates + what additional content encodings have been applied to the application-data, and thus what decoding + mechanisms must be applied in order to obtain the media-type referenced by the content-type header field. + Content-Encoding is primarily used to allow a document to be compressed without losing the identity of + its underlying content type. Content Encodings are to be interpreted as per Section 3.5 of RFC 2616. + Valid Content Encodings are registered at IANA as "Hypertext Transfer Protocol (HTTP) Parameters" + (http://www.iana.org/assignments/http-parameters/httpparameters.xml). Content-Encoding MUST not be set when + the application-data section is other than data. Implementations MUST NOT use the identity encoding. + Instead, implementations should not set this property. Implementations SHOULD NOT use the compress + encoding, except as to remain compatible with messages originally sent with other protocols, + e.g. HTTP or SMTP. Implementations SHOULD NOT specify multiple content encoding values except as to be + compatible with messages originally sent with other protocols, e.g. HTTP or SMTP. + :param datetime absolute_expiry_time: The time when this message is considered expired. + An absolute time when this message is considered to be expired. + :param datetime creation_time: The time when this message was created. + An absolute time when this message was created. + :param str group_id: The group this message belongs to. + Identifies the group the message belongs to. + :param int group_sequence: The sequence-no of this message within its group. + The relative position of this message within its group. + :param str reply_to_group_id: The group the reply message belongs to. + This is a client-specific id that is used so that client can send replies to this message to a specific group. + """ + + +# TODO: should be a class, namedtuple or dataclass, immutability vs performance, need to collect performance data +class Message(NamedTuple): + header: Optional[Header] = None + delivery_annotations: Optional[Dict[Union[str, bytes], Any]] = None + message_annotations: Optional[Dict[Union[str, bytes], Any]] = None + properties: Optional[Properties] = None + application_properties: Optional[Dict[Union[str, bytes], Any]] = None + body: Optional[bytes] = None + sequence: Optional[List[Any]] = None + value: Optional[Any] = None + footer: Optional[Dict[Any, Any]] = None + + +Message._code = 0 # type: ignore # pylint:disable=protected-access +Message._definition = ( # type: ignore # pylint:disable=protected-access + (0x00000070, FIELD("header", Header, False, None, False)), + (0x00000071, FIELD("delivery_annotations", FieldDefinition.annotations, False, None, False)), + (0x00000072, FIELD("message_annotations", FieldDefinition.annotations, False, None, False)), + (0x00000073, FIELD("properties", Properties, False, None, False)), + (0x00000074, FIELD("application_properties", AMQPTypes.map, False, None, False)), + (0x00000075, FIELD("body", AMQPTypes.binary, False, None, True)), + (0x00000076, FIELD("sequence", AMQPTypes.list, False, None, False)), + (0x00000077, FIELD("value", None, False, None, False)), + (0x00000078, FIELD("footer", FieldDefinition.annotations, False, None, False)), +) +if _CAN_ADD_DOCSTRING: + Message.__doc__ = """ + An annotated message consists of the bare message plus sections for annotation at the head and tail + of the bare message. + + There are two classes of annotations: annotations that travel with the message indefinitely, and + annotations that are consumed by the next node. + The exact structure of a message, together with its encoding, is defined by the message format. This document + defines the structure and semantics of message format 0 (MESSAGE-FORMAT). Altogether a message consists of the + following sections: + + - Zero or one header. + - Zero or one delivery-annotations. + - Zero or one message-annotations. + - Zero or one properties. + - Zero or one application-properties. + - The body consists of either: one or more data sections, one or more amqp-sequence sections, + or a single amqp-value section. + - Zero or one footer. + + :param ~uamqp.message.Header header: Transport headers for a Message. + The header section carries standard delivery details about the transfer of a Message through the AMQP + network. If the header section is omitted the receiver MUST assume the appropriate default values for + the fields within the header unless other target or node specific defaults have otherwise been set. + :param dict delivery_annotations: The delivery-annotations section is used for delivery-specific non-standard + properties at the head of the message. Delivery annotations convey information from the sending peer to + the receiving peer. If the recipient does not understand the annotation it cannot be acted upon and its + effects (such as any implied propagation) cannot be acted upon. Annotations may be specific to one + implementation, or common to multiple implementations. The capabilities negotiated on link attach and on + the source and target should be used to establish which annotations a peer supports. A registry of defined + annotations and their meanings can be found here: http://www.amqp.org/specification/1.0/delivery-annotations. + If the delivery-annotations section is omitted, it is equivalent to a delivery-annotations section + containing an empty map of annotations. + :param dict message_annotations: The message-annotations section is used for properties of the message which + are aimed at the infrastructure and should be propagated across every delivery step. Message annotations + convey information about the message. Intermediaries MUST propagate the annotations unless the annotations + are explicitly augmented or modified (e.g. by the use of the modified outcome). + The capabilities negotiated on link attach and on the source and target may be used to establish which + annotations a peer understands, however it a network of AMQP intermediaries it may not be possible to know + if every intermediary will understand the annotation. Note that for some annotation it may not be necessary + for the intermediary to understand their purpose - they may be being used purely as an attribute which can be + filtered on. A registry of defined annotations and their meanings can be found here: + http://www.amqp.org/specification/1.0/message-annotations. If the message-annotations section is omitted, + it is equivalent to a message-annotations section containing an empty map of annotations. + :param ~uamqp.message.Properties: Immutable properties of the Message. + The properties section is used for a defined set of standard properties of the message. The properties + section is part of the bare message and thus must, if retransmitted by an intermediary, remain completely + unaltered. + :param dict application_properties: The application-properties section is a part of the bare message used + for structured application data. Intermediaries may use the data within this structure for the purposes + of filtering or routing. The keys of this map are restricted to be of type string (which excludes the + possibility of a null key) and the values are restricted to be of simple types only (that is excluding + map, list, and array types). + :param list(bytes) data_body: A data section contains opaque binary data. + :param list sequence_body: A sequence section contains an arbitrary number of structured data elements. + :param value_body: An amqp-value section contains a single AMQP value. + :param dict footer: Transport footers for a Message. + The footer section is used for details about the message or delivery which can only be calculated or + evaluated once the whole bare message has been constructed or seen (for example message hashes, HMACs, + signatures and encryption details). A registry of defined footers and their meanings can be found + here: http://www.amqp.org/specification/1.0/footer. + """ + + +class BatchMessage(Message): + _code = 0x80013700 + + +class _MessageDelivery: + def __init__(self, message, state=MessageDeliveryState.WaitingToBeSent, expiry=None): + self.message = message + self.state = state + self.expiry = expiry + self.reason = None + self.delivery = None + self.error = None diff --git a/rstream/_pyamqp/performatives.py b/rstream/_pyamqp/performatives.py new file mode 100644 index 0000000..f2dafaa --- /dev/null +++ b/rstream/_pyamqp/performatives.py @@ -0,0 +1,647 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# type: ignore + +""" +isort:skip_file +""" + +import sys + +# TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500) +from collections import namedtuple +from typing import NamedTuple, Optional + +from typing_extensions import Buffer + +from .constants import FIELD +from .types import AMQPTypes, FieldDefinition, ObjDefinition + +_CAN_ADD_DOCSTRING = sys.version_info.major >= 3 + + +OpenFrame = namedtuple( + "OpenFrame", + [ + "container_id", + "hostname", + "max_frame_size", + "channel_max", + "idle_timeout", + "outgoing_locales", + "incoming_locales", + "offered_capabilities", + "desired_capabilities", + "properties", + ], +) +OpenFrame._code = 0x00000010 # type: ignore # pylint:disable=protected-access +OpenFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("container_id", AMQPTypes.string, True, None, False), + FIELD("hostname", AMQPTypes.string, False, None, False), + FIELD("max_frame_size", AMQPTypes.uint, False, 4294967295, False), + FIELD("channel_max", AMQPTypes.ushort, False, 65535, False), + FIELD("idle_timeout", AMQPTypes.uint, False, None, False), + FIELD("outgoing_locales", AMQPTypes.symbol, False, None, True), + FIELD("incoming_locales", AMQPTypes.symbol, False, None, True), + FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("properties", FieldDefinition.fields, False, None, False), +) +if _CAN_ADD_DOCSTRING: + OpenFrame.__doc__ = """ + OPEN performative. Negotiate Connection parameters. + + The first frame sent on a connection in either direction MUST contain an Open body. + (Note that theConnection header which is sent first on the Connection is *not* a frame.) + The fields indicate thecapabilities and limitations of the sending peer. + + :param str container_id: The ID of the source container. + :param str hostname: The name of the target host. + The dns name of the host (either fully qualified or relative) to which the sendingpeer is connecting. + It is not mandatory to provide the hostname. If no hostname isprovided the receiving peer should select + a default based on its own configuration.This field can be used by AMQP proxies to determine the correct + back-end service toconnect the client to.This field may already have been specified by the sasl-init frame, + if a SASL layer is used, or, the server name indication extension as described in RFC-4366, if a TLSlayer + is used, in which case this field SHOULD be null or contain the same value. It is undefined what a different + value to those already specific means. + :param int max_frame_size: Proposed maximum frame size in bytes. + The largest frame size that the sending peer is able to accept on this Connection. + If this field is not set it means that the peer does not impose any specific limit. A peer MUST NOT send + frames larger than its partner can handle. A peer that receives an oversized frame MUST close the Connection + with the framing-error error-code. Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) + octets large. + :param int channel_max: The maximum channel number that may be used on the Connection. + The channel-max value is the highest channel number that may be used on the Connection. This value plus one + is the maximum number of Sessions that can be simultaneously active on the Connection. A peer MUST not use + channel numbers outside the range that its partner can handle. A peer that receives a channel number + outside the supported range MUST close the Connection with the framing-error error-code. + :param int idle_timeout: Idle time-out in milliseconds. + The idle time-out required by the sender. A value of zero is the same as if it was not set (null). If the + receiver is unable or unwilling to support the idle time-out then it should close the connection with + an error explaining why (eg, because it is too small). If the value is not set, then the sender does not + have an idle time-out. However, senders doing this should be aware that implementations MAY choose to use + an internal default to efficiently manage a peer's resources. + :param list(str) outgoing_locales: Locales available for outgoing text. + A list of the locales that the peer supports for sending informational text. This includes Connection, + Session and Link error descriptions. A peer MUST support at least the en-US locale. Since this value + is always supported, it need not be supplied in the outgoing-locales. A null value or an empty list implies + that only en-US is supported. + :param list(str) incoming_locales: Desired locales for incoming text in decreasing level of preference. + A list of locales that the sending peer permits for incoming informational text. This list is ordered in + decreasing level of preference. The receiving partner will chose the first (most preferred) incoming locale + from those which it supports. If none of the requested locales are supported, en-US will be chosen. Note + that en-US need not be supplied in this list as it is always the fallback. A peer may determine which of the + permitted incoming locales is chosen by examining the partner's supported locales asspecified in the + outgoing_locales field. A null value or an empty list implies that only en-US is supported. + :param list(str) offered_capabilities: The extension capabilities the sender supports. + If the receiver of the offered-capabilities requires an extension capability which is not present in the + offered-capability list then it MUST close the connection. A list of commonly defined connection capabilities + and their meanings can be found here: http://www.amqp.org/specification/1.0/connection-capabilities. + :param list(str) required_capabilities: The extension capabilities the sender may use if the receiver supports + them. The desired-capability list defines which extension capabilities the sender MAY use if the receiver + offers them (i.e. they are in the offered-capabilities list received by the sender of the + desired-capabilities). If the receiver of the desired-capabilities offers extension capabilities which are + not present in the desired-capability list it received, then it can be sure those (undesired) capabilities + will not be used on the Connection. + :param dict properties: Connection properties. + The properties map contains a set of fields intended to indicate information about the connection and its + container. A list of commonly defined connection properties and their meanings can be found + here: http://www.amqp.org/specification/1.0/connection-properties. + """ + + +BeginFrame = namedtuple( + "BeginFrame", + [ + "remote_channel", + "next_outgoing_id", + "incoming_window", + "outgoing_window", + "handle_max", + "offered_capabilities", + "desired_capabilities", + "properties", + ], +) +BeginFrame._code = 0x00000011 # type: ignore # pylint:disable=protected-access +BeginFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("remote_channel", AMQPTypes.ushort, False, None, False), + FIELD("next_outgoing_id", AMQPTypes.uint, True, None, False), + FIELD("incoming_window", AMQPTypes.uint, True, None, False), + FIELD("outgoing_window", AMQPTypes.uint, True, None, False), + FIELD("handle_max", AMQPTypes.uint, False, 4294967295, False), + FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("properties", FieldDefinition.fields, False, None, False), +) +if _CAN_ADD_DOCSTRING: + BeginFrame.__doc__ = """ + BEGIN performative. Begin a Session on a channel. + + Indicate that a Session has begun on the channel. + + :param int remote_channel: The remote channel for this Session. + If a Session is locally initiated, the remote-channel MUST NOT be set. When an endpoint responds to a + remotely initiated Session, the remote-channel MUST be set to the channel on which the remote Session + sent the begin. + :param int next_outgoing_id: The transfer-id of the first transfer id the sender will send. + The next-outgoing-id is used to assign a unique transfer-id to all outgoing transfer frames on a given + session. The next-outgoing-id may be initialized to an arbitrary value and is incremented after each + successive transfer according to RFC-1982 serial number arithmetic. + :param int incoming_window: The initial incoming-window of the sender. + The incoming-window defines the maximum number of incoming transfer frames that the endpoint can currently + receive. This identifies a current maximum incoming transfer-id that can be computed by subtracting one + from the sum of incoming-window and next-incoming-id. + :param int outgoing_window: The initial outgoing-window of the sender. + The outgoing-window defines the maximum number of outgoing transfer frames that the endpoint can currently + send. This identifies a current maximum outgoing transfer-id that can be computed by subtracting one from + the sum of outgoing-window and next-outgoing-id. + :param int handle_max: The maximum handle value that may be used on the Session. + The handle-max value is the highest handle value that may be used on the Session. A peer MUST NOT attempt + to attach a Link using a handle value outside the range that its partner can handle. A peer that receives + a handle outside the supported range MUST close the Connection with the framing-error error-code. + :param list(str) offered_capabilities: The extension capabilities the sender supports. + A list of commonly defined session capabilities and their meanings can be found + here: http://www.amqp.org/specification/1.0/session-capabilities. + :param list(str) desired_capabilities: The extension capabilities the sender may use if the receiver + supports them. + :param dict properties: Session properties. + The properties map contains a set of fields intended to indicate information about the session and its + container. A list of commonly defined session properties and their meanings can be found + here: http://www.amqp.org/specification/1.0/session-properties. + """ + + +AttachFrame = namedtuple( + "AttachFrame", + [ + "name", + "handle", + "role", + "send_settle_mode", + "rcv_settle_mode", + "source", + "target", + "unsettled", + "incomplete_unsettled", + "initial_delivery_count", + "max_message_size", + "offered_capabilities", + "desired_capabilities", + "properties", + ], +) +AttachFrame._code = 0x00000012 # type: ignore # pylint:disable=protected-access +AttachFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("name", AMQPTypes.string, True, None, False), + FIELD("handle", AMQPTypes.uint, True, None, False), + FIELD("role", AMQPTypes.boolean, True, None, False), + FIELD("send_settle_mode", AMQPTypes.ubyte, False, 2, False), + FIELD("rcv_settle_mode", AMQPTypes.ubyte, False, 0, False), + FIELD("source", ObjDefinition.source, False, None, False), + FIELD("target", ObjDefinition.target, False, None, False), + FIELD("unsettled", AMQPTypes.map, False, None, False), + FIELD("incomplete_unsettled", AMQPTypes.boolean, False, False, False), + FIELD("initial_delivery_count", AMQPTypes.uint, False, None, False), + FIELD("max_message_size", AMQPTypes.ulong, False, None, False), + FIELD("offered_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("desired_capabilities", AMQPTypes.symbol, False, None, True), + FIELD("properties", FieldDefinition.fields, False, None, False), +) +if _CAN_ADD_DOCSTRING: + AttachFrame.__doc__ = """ + ATTACH performative. Attach a Link to a Session. + + The attach frame indicates that a Link Endpoint has been attached to the Session. The opening flag + is used to indicate that the Link Endpoint is newly created. + + :param str name: The name of the link. + This name uniquely identifies the link from the container of the source to the container of the target + node, e.g. if the container of the source node is A, and the container of the target node is B, the link + may be globally identified by the (ordered) tuple(A,B,). + :param int handle: The handle of the link. + The handle MUST NOT be used for other open Links. An attempt to attach using a handle which is already + associated with a Link MUST be responded to with an immediate close carrying a Handle-in-usesession-error. + To make it easier to monitor AMQP link attach frames, it is recommended that implementations always assign + the lowest available handle to this field. + :param bool role: The role of the link endpoint. Either Role.Sender (False) or Role.Receiver (True). + :param str send_settle_mode: The settlement mode for the Sender. + Determines the settlement policy for deliveries sent at the Sender. When set at the Receiver this indicates + the desired value for the settlement mode at the Sender. When set at the Sender this indicates the actual + settlement mode in use. + :param str rcv_settle_mode: The settlement mode of the Receiver. + Determines the settlement policy for unsettled deliveries received at the Receiver. When set at the Sender + this indicates the desired value for the settlement mode at the Receiver. When set at the Receiver this + indicates the actual settlement mode in use. + :param ~uamqp.messaging.Source source: The source for Messages. + If no source is specified on an outgoing Link, then there is no source currently attached to the Link. + A Link with no source will never produce outgoing Messages. + :param ~uamqp.messaging.Target target: The target for Messages. + If no target is specified on an incoming Link, then there is no target currently attached to the Link. + A Link with no target will never permit incoming Messages. + :param dict unsettled: Unsettled delivery state. + This is used to indicate any unsettled delivery states when a suspended link is resumed. The map is keyed + by delivery-tag with values indicating the delivery state. The local and remote delivery states for a given + delivery-tag MUST be compared to resolve any in-doubt deliveries. If necessary, deliveries MAY be resent, + or resumed based on the outcome of this comparison. If the local unsettled map is too large to be encoded + within a frame of the agreed maximum frame size then the session may be ended with the + frame-size-too-smallerror. The endpoint SHOULD make use of the ability to send an incomplete unsettled map + to avoid sending an error. The unsettled map MUST NOT contain null valued keys. When reattaching + (as opposed to resuming), the unsettled map MUST be null. + :param bool incomplete_unsettled: + If set to true this field indicates that the unsettled map provided is not complete. When the map is + incomplete the recipient of the map cannot take the absence of a delivery tag from the map as evidence of + settlement. On receipt of an incomplete unsettled map a sending endpoint MUST NOT send any new deliveries + (i.e. deliveries where resume is not set to true) to its partner (and a receiving endpoint which sent an + incomplete unsettled map MUST detach with an error on receiving a transfer which does not have the resume + flag set to true). + :param int initial_delivery_count: This MUST NOT be null if role is sender, + and it is ignored if the role is receiver. + :param int max_message_size: The maximum message size supported by the link endpoint. + This field indicates the maximum message size supported by the link endpoint. Any attempt to deliver a + message larger than this results in a message-size-exceeded link-error. If this field is zero or unset, + there is no maximum size imposed by the link endpoint. + :param list(str) offered_capabilities: The extension capabilities the sender supports. + A list of commonly defined session capabilities and their meanings can be found + here: http://www.amqp.org/specification/1.0/link-capabilities. + :param list(str) desired_capabilities: The extension capabilities the sender may use if the receiver + supports them. + :param dict properties: Link properties. + The properties map contains a set of fields intended to indicate information about the link and its + container. A list of commonly defined link properties and their meanings can be found + here: http://www.amqp.org/specification/1.0/link-properties. + """ + + +FlowFrame = namedtuple( + "FlowFrame", + [ + "next_incoming_id", + "incoming_window", + "next_outgoing_id", + "outgoing_window", + "handle", + "delivery_count", + "link_credit", + "available", + "drain", + "echo", + "properties", + ], +) +FlowFrame.__new__.__defaults__ = (None, None, None, None, None, None, None) # type: ignore +FlowFrame._code = 0x00000013 # type: ignore # pylint:disable=protected-access +FlowFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("next_incoming_id", AMQPTypes.uint, False, None, False), + FIELD("incoming_window", AMQPTypes.uint, True, None, False), + FIELD("next_outgoing_id", AMQPTypes.uint, True, None, False), + FIELD("outgoing_window", AMQPTypes.uint, True, None, False), + FIELD("handle", AMQPTypes.uint, False, None, False), + FIELD("delivery_count", AMQPTypes.uint, False, None, False), + FIELD("link_credit", AMQPTypes.uint, False, None, False), + FIELD("available", AMQPTypes.uint, False, None, False), + FIELD("drain", AMQPTypes.boolean, False, False, False), + FIELD("echo", AMQPTypes.boolean, False, False, False), + FIELD("properties", FieldDefinition.fields, False, None, False), +) +if _CAN_ADD_DOCSTRING: + FlowFrame.__doc__ = """ + FLOW performative. Update link state. + + Updates the flow state for the specified Link. + + :param int next_incoming_id: Identifies the expected transfer-id of the next incoming transfer frame. + This value is not set if and only if the sender has not yet received the begin frame for the session. + :param int incoming_window: Defines the maximum number of incoming transfer frames that the endpoint + concurrently receive. + :param int next_outgoing_id: The transfer-id that will be assigned to the next outgoing transfer frame. + :param int outgoing_window: Defines the maximum number of outgoing transfer frames that the endpoint could + potentially currently send, if it was not constrained by restrictions imposed by its peer's incoming-window. + :param int handle: If set, indicates that the flow frame carries flow state information for the local Link + Endpoint associated with the given handle. If not set, the flow frame is carrying only information + pertaining to the Session Endpoint. If set to a handle that is not currently associated with an attached + Link, the recipient MUST respond by ending the session with an unattached-handle session error. + :param int delivery_count: The endpoint's delivery-count. + When the handle field is not set, this field MUST NOT be set. When the handle identifies that the flow + state is being sent from the Sender Link Endpoint to Receiver Link Endpoint this field MUST be set to the + current delivery-count of the Link Endpoint. When the flow state is being sent from the Receiver Endpoint + to the Sender Endpoint this field MUST be set to the last known value of the corresponding Sending Endpoint. + In the event that the Receiving Link Endpoint has not yet seen the initial attach frame from the Sender + this field MUST NOT be set. + :param int link_credit: The current maximum number of Messages that can be received. + The current maximum number of Messages that can be handled at the Receiver Endpoint of the Link. Only the + receiver endpoint can independently set this value. The sender endpoint sets this to the last known + value seen from the receiver. When the handle field is not set, this field MUST NOT be set. + :param int available: The number of available Messages. + The number of Messages awaiting credit at the link sender endpoint. Only the sender can independently set + this value. The receiver sets this to the last known value seen from the sender. When the handle field is + not set, this field MUST NOT be set. + :param bool drain: Indicates drain mode. + When flow state is sent from the sender to the receiver, this field contains the actual drain mode of the + sender. When flow state is sent from the receiver to the sender, this field contains the desired drain + mode of the receiver. When the handle field is not set, this field MUST NOT be set. + :param bool echo: Request link state from other endpoint. + :param dict properties: Link state properties. + A list of commonly defined link state properties and their meanings can be found + here: http://www.amqp.org/specification/1.0/link-state-properties. + """ + + +class TransferFrame(NamedTuple): + handle: Optional[int] = None + delivery_id: Optional[int] = None + delivery_tag: Optional[bytes] = None + message_format: Optional[int] = None + settled: Optional[bool] = None + more: Optional[bool] = None + rcv_settle_mode: Optional[str] = None + state: Optional[bytes] = None + resume: Optional[bool] = None + aborted: Optional[bool] = None + batchable: Optional[bool] = None + payload: Optional[Buffer] = None + + +TransferFrame._code = 0x00000014 # type: ignore # pylint:disable=protected-access +TransferFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("handle", AMQPTypes.uint, True, None, False), + FIELD("delivery_id", AMQPTypes.uint, False, None, False), + FIELD("delivery_tag", AMQPTypes.binary, False, None, False), + FIELD("message_format", AMQPTypes.uint, False, 0, False), + FIELD("settled", AMQPTypes.boolean, False, None, False), + FIELD("more", AMQPTypes.boolean, False, False, False), + FIELD("rcv_settle_mode", AMQPTypes.ubyte, False, None, False), + FIELD("state", ObjDefinition.delivery_state, False, None, False), + FIELD("resume", AMQPTypes.boolean, False, False, False), + FIELD("aborted", AMQPTypes.boolean, False, False, False), + FIELD("batchable", AMQPTypes.boolean, False, False, False), + None, +) + +if _CAN_ADD_DOCSTRING: + TransferFrame.__doc__ = """ + TRANSFER performative. Transfer a Message. + + The transfer frame is used to send Messages across a Link. Messages may be carried by a single transfer up + to the maximum negotiated frame size for the Connection. Larger Messages may be split across several + transfer frames. + + :param int handle: Specifies the Link on which the Message is transferred. + :param int delivery_id: Alias for delivery-tag. + The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery. On continuation + transfers the delivery-id MAY be omitted. It is an error if the delivery-id on a continuation transfer + differs from the delivery-id on the first transfer of a delivery. + :param bytes delivery_tag: Uniquely identifies the delivery attempt for a given Message on this Link. + This field MUST be specified for the first transfer of a multi transfer message and may only be + omitted for continuation transfers. + :param int message_format: Indicates the message format. + This field MUST be specified for the first transfer of a multi transfer message and may only be omitted + for continuation transfers. + :param bool settled: If not set on the first (or only) transfer for a delivery, then the settled flag MUST + be interpreted as being false. For subsequent transfers if the settled flag is left unset then it MUST be + interpreted as true if and only if the value of the settled flag on any of the preceding transfers was + true; if no preceding transfer was sent with settled being true then the value when unset MUST be taken + as false. If the negotiated value for snd-settle-mode at attachment is settled, then this field MUST be + true on at least one transfer frame for a delivery (i.e. the delivery must be settled at the Sender at + the point the delivery has been completely transferred). If the negotiated value for snd-settle-mode at + attachment is unsettled, then this field MUST be false (or unset) on every transfer frame for a delivery + (unless the delivery is aborted). + :param bool more: Indicates that the Message has more content. + Note that if both the more and aborted fields are set to true, the aborted flag takes precedence. That is + a receiver should ignore the value of the more field if the transfer is marked as aborted. A sender + SHOULD NOT set the more flag to true if it also sets the aborted flag to true. + :param str rcv_settle_mode: If first, this indicates that the Receiver MUST settle the delivery once it has + arrived without waiting for the Sender to settle first. If second, this indicates that the Receiver MUST + NOT settle until sending its disposition to the Sender and receiving a settled disposition from the sender. + If not set, this value is defaulted to the value negotiated on link attach. If the negotiated link value is + first, then it is illegal to set this field to second. If the message is being sent settled by the Sender, + the value of this field is ignored. The (implicit or explicit) value of this field does not form part of the + transfer state, and is not retained if a link is suspended and subsequently resumed. + :param bytes state: The state of the delivery at the sender. + When set this informs the receiver of the state of the delivery at the sender. This is particularly useful + when transfers of unsettled deliveries are resumed after a resuming a link. Setting the state on the + transfer can be thought of as being equivalent to sending a disposition immediately before the transfer + performative, i.e. it is the state of the delivery (not the transfer) that existed at the point the frame + was sent. Note that if the transfer performative (or an earlier disposition performative referring to the + delivery) indicates that the delivery has attained a terminal state, then no future transfer or disposition + sent by the sender can alter that terminal state. + :param bool resume: Indicates a resumed delivery. + If true, the resume flag indicates that the transfer is being used to reassociate an unsettled delivery + from a dissociated link endpoint. The receiver MUST ignore resumed deliveries that are not in its local + unsettled map. The sender MUST NOT send resumed transfers for deliveries not in its local unsettledmap. + If a resumed delivery spans more than one transfer performative, then the resume flag MUST be set to true + on the first transfer of the resumed delivery. For subsequent transfers for the same delivery the resume + flag may be set to true, or may be omitted. In the case where the exchange of unsettled maps makes clear + that all message data has been successfully transferred to the receiver, and that only the final state + (and potentially settlement) at the sender needs to be conveyed, then a resumed delivery may carry no + payload and instead act solely as a vehicle for carrying the terminal state of the delivery at the sender. + :param bool aborted: Indicates that the Message is aborted. + Aborted Messages should be discarded by the recipient (any payload within the frame carrying the performative + MUST be ignored). An aborted Message is implicitly settled. + :param bool batchable: Batchable hint. + If true, then the issuer is hinting that there is no need for the peer to urgently communicate updated + delivery state. This hint may be used to artificially increase the amount of batching an implementation + uses when communicating delivery states, and thereby save bandwidth. If the message being delivered is too + large to fit within a single frame, then the setting of batchable to true on any of the transfer + performatives for the delivery is equivalent to setting batchable to true for all the transfer performatives + for the delivery. The batchable value does not form part of the transfer state, and is not retained if a + link is suspended and subsequently resumed. + """ + + +DispositionFrame = namedtuple("DispositionFrame", ["role", "first", "last", "settled", "state", "batchable"]) +DispositionFrame._code = 0x00000015 # type: ignore # pylint:disable=protected-access +DispositionFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("role", AMQPTypes.boolean, True, None, False), + FIELD("first", AMQPTypes.uint, True, None, False), + FIELD("last", AMQPTypes.uint, False, None, False), + FIELD("settled", AMQPTypes.boolean, False, False, False), + FIELD("state", ObjDefinition.delivery_state, False, None, False), + FIELD("batchable", AMQPTypes.boolean, False, False, False), +) +if _CAN_ADD_DOCSTRING: + DispositionFrame.__doc__ = """ + DISPOSITION performative. Inform remote peer of delivery state changes. + + The disposition frame is used to inform the remote peer of local changes in the state of deliveries. + The disposition frame may reference deliveries from many different links associated with a session, + although all links MUST have the directionality indicated by the specified role. Note that it is possible + for a disposition sent from sender to receiver to refer to a delivery which has not yet completed + (i.e. a delivery which is spread over multiple frames and not all frames have yet been sent). The use of such + interleaving is discouraged in favor of carrying the modified state on the next transfer performative for + the delivery. The disposition performative may refer to deliveries on links that are no longer attached. + As long as the links have not been closed or detached with an error then the deliveries are still "live" and + the updated state MUST be applied. + + :param str role: Directionality of disposition. + The role identifies whether the disposition frame contains information about sending link endpoints + or receiving link endpoints. + :param int first: Lower bound of deliveries. + Identifies the lower bound of delivery-ids for the deliveries in this set. + :param int last: Upper bound of deliveries. + Identifies the upper bound of delivery-ids for the deliveries in this set. If not set, + this is taken to be the same as first. + :param bool settled: Indicates deliveries are settled. + If true, indicates that the referenced deliveries are considered settled by the issuing endpoint. + :param bytes state: Indicates state of deliveries. + Communicates the state of all the deliveries referenced by this disposition. + :param bool batchable: Batchable hint. + If true, then the issuer is hinting that there is no need for the peer to urgently communicate the impact + of the updated delivery states. This hint may be used to artificially increase the amount of batching an + implementation uses when communicating delivery states, and thereby save bandwidth. + """ + +DetachFrame = namedtuple("DetachFrame", ["handle", "closed", "error"]) +DetachFrame._code = 0x00000016 # type: ignore # pylint:disable=protected-access +DetachFrame._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("handle", AMQPTypes.uint, True, None, False), + FIELD("closed", AMQPTypes.boolean, False, False, False), + FIELD("error", ObjDefinition.error, False, None, False), +) +if _CAN_ADD_DOCSTRING: + DetachFrame.__doc__ = """ + DETACH performative. Detach the Link Endpoint from the Session. + + Detach the Link Endpoint from the Session. This un-maps the handle and makes it available for + use by other Links + + :param int handle: The local handle of the link to be detached. + :param bool handle: If true then the sender has closed the link. + :param ~uamqp.error.AMQPError error: Error causing the detach. + If set, this field indicates that the Link is being detached due to an error condition. + The value of the field should contain details on the cause of the error. + """ + + +EndFrame = namedtuple("EndFrame", ["error"]) +EndFrame._code = 0x00000017 # type: ignore # pylint:disable=protected-access +EndFrame._definition = (FIELD("error", ObjDefinition.error, False, None, False),) # type: ignore # pylint:disable=protected-access # noqa: E501 +if _CAN_ADD_DOCSTRING: + EndFrame.__doc__ = """ + END performative. End the Session. + + Indicates that the Session has ended. + + :param ~uamqp.error.AMQPError error: Error causing the end. + If set, this field indicates that the Session is being ended due to an error condition. + The value of the field should contain details on the cause of the error. + """ + + +CloseFrame = namedtuple("CloseFrame", ["error"]) +CloseFrame._code = 0x00000018 # type: ignore # pylint:disable=protected-access +CloseFrame._definition = (FIELD("error", ObjDefinition.error, False, None, False),) # type: ignore # pylint:disable=protected-access # noqa: E501 +if _CAN_ADD_DOCSTRING: + CloseFrame.__doc__ = """ + CLOSE performative. Signal a Connection close. + + Sending a close signals that the sender will not be sending any more frames (or bytes of any other kind) on + the Connection. Orderly shutdown requires that this frame MUST be written by the sender. It is illegal to + send any more frames (or bytes of any other kind) after sending a close frame. + + :param ~uamqp.error.AMQPError error: Error causing the close. + If set, this field indicates that the Connection is being closed due to an error condition. + The value of the field should contain details on the cause of the error. + """ + + +SASLMechanism = namedtuple("SASLMechanism", ["sasl_server_mechanisms"]) +SASLMechanism._code = 0x00000040 # type: ignore # pylint:disable=protected-access +SASLMechanism._definition = (FIELD("sasl_server_mechanisms", AMQPTypes.symbol, True, None, True),) # type: ignore # pylint:disable=protected-access # noqa: E501 +if _CAN_ADD_DOCSTRING: + SASLMechanism.__doc__ = """ + Advertise available sasl mechanisms. + + dvertises the available SASL mechanisms that may be used for authentication. + + :param list(bytes) sasl_server_mechanisms: Supported sasl mechanisms. + A list of the sasl security mechanisms supported by the sending peer. + It is invalid for this list to be null or empty. If the sending peer does not require its partner to + authenticate with it, then it should send a list of one element with its value as the SASL mechanism + ANONYMOUS. The server mechanisms are ordered in decreasing level of preference. + """ + + +SASLInit = namedtuple("SASLInit", ["mechanism", "initial_response", "hostname"]) +SASLInit._code = 0x00000041 # type: ignore # pylint:disable=protected-access +SASLInit._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("mechanism", AMQPTypes.symbol, True, None, False), + FIELD("initial_response", AMQPTypes.binary, False, None, False), + FIELD("hostname", AMQPTypes.string, False, None, False), +) +if _CAN_ADD_DOCSTRING: + SASLInit.__doc__ = """ + Initiate sasl exchange. + + Selects the sasl mechanism and provides the initial response if needed. + + :param bytes mechanism: Selected security mechanism. + The name of the SASL mechanism used for the SASL exchange. If the selected mechanism is not supported by + the receiving peer, it MUST close the Connection with the authentication-failure close-code. Each peer + MUST authenticate using the highest-level security profile it can handle from the list provided by the + partner. + :param bytes initial_response: Security response data. + A block of opaque data passed to the security mechanism. The contents of this data are defined by the + SASL security mechanism. + :param str hostname: The name of the target host. + The DNS name of the host (either fully qualified or relative) to which the sending peer is connecting. It + is not mandatory to provide the hostname. If no hostname is provided the receiving peer should select a + default based on its own configuration. This field can be used by AMQP proxies to determine the correct + back-end service to connect the client to, and to determine the domain to validate the client's credentials + against. This field may already have been specified by the server name indication extension as described + in RFC-4366, if a TLS layer is used, in which case this field SHOULD benull or contain the same value. + It is undefined what a different value to those already specific means. + """ + + +SASLChallenge = namedtuple("SASLChallenge", ["challenge"]) +SASLChallenge._code = 0x00000042 # type: ignore # pylint:disable=protected-access +SASLChallenge._definition = (FIELD("challenge", AMQPTypes.binary, True, None, False),) # type: ignore # pylint:disable=protected-access # noqa: E501 +if _CAN_ADD_DOCSTRING: + SASLChallenge.__doc__ = """ + Security mechanism challenge. + + Send the SASL challenge data as defined by the SASL specification. + + :param bytes challenge: Security challenge data. + Challenge information, a block of opaque binary data passed to the security mechanism. + """ + + +SASLResponse = namedtuple("SASLResponse", ["response"]) +SASLResponse._code = 0x00000043 # type: ignore # pylint:disable=protected-access +SASLResponse._definition = (FIELD("response", AMQPTypes.binary, True, None, False),) # type: ignore # pylint:disable=protected-access # noqa: E501 +if _CAN_ADD_DOCSTRING: + SASLResponse.__doc__ = """ + Security mechanism response. + + Send the SASL response data as defined by the SASL specification. + + :param bytes response: Security response data. + """ + + +SASLOutcome = namedtuple("SASLOutcome", ["code", "additional_data"]) +SASLOutcome._code = 0x00000044 # type: ignore # pylint:disable=protected-access +SASLOutcome._definition = ( # type: ignore # pylint:disable=protected-access + FIELD("code", AMQPTypes.ubyte, True, None, False), + FIELD("additional_data", AMQPTypes.binary, False, None, False), +) +if _CAN_ADD_DOCSTRING: + SASLOutcome.__doc__ = """ + Indicates the outcome of the sasl dialog. + + This frame indicates the outcome of the SASL dialog. Upon successful completion of the SASL dialog the + Security Layer has been established, and the peers must exchange protocol headers to either starta nested + Security Layer, or to establish the AMQP Connection. + + :param int code: Indicates the outcome of the sasl dialog. + A reply-code indicating the outcome of the SASL dialog. + :param bytes additional_data: Additional data as specified in RFC-4422. + The additional-data field carries additional data on successful authentication outcomeas specified by + the SASL specification (RFC-4422). If the authentication is unsuccessful, this field is not set. + """ diff --git a/rstream/_pyamqp/types.py b/rstream/_pyamqp/types.py new file mode 100644 index 0000000..a0796fd --- /dev/null +++ b/rstream/_pyamqp/types.py @@ -0,0 +1,90 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +# type: ignore + +from enum import Enum + +TYPE = "TYPE" +VALUE = "VALUE" + + +class AMQPTypes(object): + null = "NULL" + boolean = "BOOL" + ubyte = "UBYTE" + byte = "BYTE" + ushort = "USHORT" + short = "SHORT" + uint = "UINT" + int = "INT" + ulong = "ULONG" + long = "LONG" + float = "FLOAT" + double = "DOUBLE" + timestamp = "TIMESTAMP" + uuid = "UUID" + binary = "BINARY" + string = "STRING" + symbol = "SYMBOL" + list = "LIST" + map = "MAP" + array = "ARRAY" + described = "DESCRIBED" + + +class FieldDefinition(Enum): + fields = "fields" + annotations = "annotations" + message_id = "message-id" + app_properties = "application-properties" + node_properties = "node-properties" + filter_set = "filter-set" + + +class ObjDefinition(Enum): + source = "source" + target = "target" + delivery_state = "delivery-state" + error = "error" + + +class ConstructorBytes(object): + null = b"\x40" + bool = b"\x56" + bool_true = b"\x41" + bool_false = b"\x42" + ubyte = b"\x50" + byte = b"\x51" + ushort = b"\x60" + short = b"\x61" + uint_0 = b"\x43" + uint_small = b"\x52" + int_small = b"\x54" + uint_large = b"\x70" + int_large = b"\x71" + ulong_0 = b"\x44" + ulong_small = b"\x53" + long_small = b"\x55" + ulong_large = b"\x80" + long_large = b"\x81" + float = b"\x72" + double = b"\x82" + timestamp = b"\x83" + uuid = b"\x98" + binary_small = b"\xA0" + binary_large = b"\xB0" + string_small = b"\xA1" + string_large = b"\xB1" + symbol_small = b"\xA3" + symbol_large = b"\xB3" + list_0 = b"\x45" + list_small = b"\xC0" + list_large = b"\xD0" + map_small = b"\xC1" + map_large = b"\xD1" + array_small = b"\xE0" + array_large = b"\xF0" + descriptor = b"\x00" diff --git a/rstream/amqp.py b/rstream/amqp.py index 3d95350..4ef5ca6 100644 --- a/rstream/amqp.py +++ b/rstream/amqp.py @@ -1,8 +1,12 @@ from __future__ import annotations -from typing import Any, Optional, Protocol, cast +from typing import Any, Optional, Protocol -import uamqp +from ._pyamqp._decode import decode_payload +from ._pyamqp._encode import encode_payload +from ._pyamqp.message import Message + +# import uamqp class _MessageProtocol(Protocol): @@ -12,15 +16,32 @@ def __bytes__(self) -> bytes: ... -class AMQPMessage(uamqp.Message, _MessageProtocol): +class AMQPMessage(Message, _MessageProtocol): def __init__(self, *args: Any, publishing_id: Optional[int] = None, **kwargs: Any): self.publishing_id = publishing_id super().__init__(*args, **kwargs) def __bytes__(self) -> bytes: - return cast(bytes, self.encode_message()) + returned_value = bytearray() + ret = encode_payload(output=returned_value, payload=self) + return bytes(ret) + + def __str__(self) -> str: + return str(self.body) def amqp_decoder(data: bytes) -> AMQPMessage: - message = AMQPMessage.decode_from_bytes(data) - return message + message = decode_payload(buffer=memoryview(data)) + returned_amqp_message = AMQPMessage( + value=message.value, + application_properties=message.application_properties, + properties=message.properties, + message_annotations=message.message_annotations, + footer=message.footer, + header=message.header, + delivery_annotations=message.delivery_annotations, + sequence=message.sequence, + body=message.body, + ) + + return returned_amqp_message diff --git a/tests/test_amqp.py b/tests/test_amqp.py index 314c6c8..7bddc53 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -2,13 +2,13 @@ # SPDX-License-Identifier: MIT import pytest -import uamqp.message from rstream import ( AMQPMessage, Consumer, MessageContext, Producer, + Properties, amqp_decoder, ) @@ -17,9 +17,9 @@ async def test_amqp_message(stream: str, consumer: Consumer, producer: Producer) -> None: amqp_message = AMQPMessage( - properties=uamqp.message.MessageProperties(subject=b"test-subject"), - annotations={b"test": 42}, - body="test-body", + properties=Properties(subject=b"test-subject"), + message_annotations={b"test": 42}, + body=b"test-body", ) await producer.send_wait(stream, amqp_message) @@ -34,6 +34,6 @@ def callback(msg: AMQPMessage, message_context: MessageContext): await consumer.run() assert isinstance(incoming_amqp_message, AMQPMessage) - assert list(incoming_amqp_message.get_data()) == list(amqp_message.get_data()) + assert list(incoming_amqp_message.body) == list(amqp_message.body) assert incoming_amqp_message.properties.subject == amqp_message.properties.subject - assert incoming_amqp_message.annotations == amqp_message.annotations + assert incoming_amqp_message.message_annotations == amqp_message.message_annotations diff --git a/tests/test_consumer.py b/tests/test_consumer.py index e7e3793..d31fe3d 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -7,7 +7,6 @@ from functools import partial import pytest -import uamqp from rstream import ( AMQPMessage, @@ -18,6 +17,7 @@ OffsetType, OnClosedErrorInfo, Producer, + Properties, RouteType, SuperStreamConsumer, SuperStreamProducer, @@ -345,8 +345,8 @@ async def test_consume_superstream_with_sac_all_active( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -390,8 +390,8 @@ async def test_consume_superstream_with_sac_one_non_active( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -437,8 +437,8 @@ async def test_consume_superstream_with_callback_next( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -479,8 +479,8 @@ async def test_consume_superstream_with_callback_first( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -521,8 +521,8 @@ async def test_consume_superstream_with_callback_offset( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) diff --git a/tests/test_producer.py b/tests/test_producer.py index aa6c8d0..17caebd 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -426,7 +426,7 @@ async def test_publishing_sequence_superstream( async def publish_with_ids(*ids): for publishing_id in ids: amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), + body=bytes("a:{}".format(publishing_id), "utf-8"), ) await super_stream_producer.send(amqp_message) @@ -449,9 +449,7 @@ async def test_publishing_sequence_superstream_key_routing( async def publish_with_ids(*ids): for publishing_id in ids: - amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), - ) + amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8")) # will send to super_stream with routing key of 'key1' await super_stream_key_routing_producer.send(amqp_message) @@ -486,9 +484,7 @@ async def test_publishing_sequence_superstream_with_callback( async def publish_with_ids(*ids): for publishing_id in ids: - amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), - ) + amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8")) await super_stream_producer.send( amqp_message, on_publish_confirm=partial( @@ -569,7 +565,7 @@ async def test_super_stream_producer_connection_broke(super_stream: str, consume count = 0 while True: amqp_message = AMQPMessage( - body="hello: {}".format(count), + body=bytes("hello: {}".format(count), "utf-8"), application_properties={"id": "{}".format(count)}, )