diff --git a/src/connection.pyx b/src/connection.pyx index 0b166b21c..fcc343674 100644 --- a/src/connection.pyx +++ b/src/connection.pyx @@ -170,6 +170,21 @@ cdef class Connection(StructBase): if c_connection.connection_set_properties(self._c_value, value._c_value) != 0: self._value_error() + @property + def desired_capabilities(self): + cdef c_amqpvalue.AMQP_VALUE _value + if c_connection.connection_get_desired_capabilities(self._c_value, &_value) == 0: + if _value == NULL: + return None + return value_factory(_value) + else: + self._value_error() + + @desired_capabilities.setter + def desired_capabilities(self, AMQPValue value): + if c_connection.connection_set_desired_capabilities(self._c_value, value._c_value) != 0: + self._value_error() + @property def remote_max_frame_size(self): cdef stdint.uint32_t _value diff --git a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h index 42018983f..9b8be11bd 100644 --- a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h +++ b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h @@ -90,6 +90,8 @@ extern "C" { MOCKABLE_FUNCTION(, int, connection_get_idle_timeout, CONNECTION_HANDLE, connection, milliseconds*, idle_timeout); MOCKABLE_FUNCTION(, int, connection_set_properties, CONNECTION_HANDLE, connection, fields, properties); MOCKABLE_FUNCTION(, int, connection_get_properties, CONNECTION_HANDLE, connection, fields*, properties); + MOCKABLE_FUNCTION(, int, connection_set_desired_capabilities, CONNECTION_HANDLE, connection, AMQP_VALUE, desired_capabilities); + MOCKABLE_FUNCTION(, int, connection_get_desired_capabilities, CONNECTION_HANDLE, connection, AMQP_VALUE*, desired_capabilities); MOCKABLE_FUNCTION(, int, connection_get_remote_max_frame_size, CONNECTION_HANDLE, connection, uint32_t*, remote_max_frame_size); MOCKABLE_FUNCTION(, int, connection_set_remote_idle_timeout_empty_frame_send_ratio, CONNECTION_HANDLE, connection, double, idle_timeout_empty_frame_send_ratio); MOCKABLE_FUNCTION(, uint64_t, connection_handle_deadlines, CONNECTION_HANDLE, connection); diff --git a/src/vendor/azure-uamqp-c/src/connection.c b/src/vendor/azure-uamqp-c/src/connection.c index 196d0ec3f..3f6d3f424 100644 --- a/src/vendor/azure-uamqp-c/src/connection.c +++ b/src/vendor/azure-uamqp-c/src/connection.c @@ -83,6 +83,7 @@ typedef struct CONNECTION_INSTANCE_TAG tickcounter_ms_t last_frame_received_time; tickcounter_ms_t last_frame_sent_time; fields properties; + AMQP_VALUE desired_capabilities; unsigned int is_underlying_io_open : 1; unsigned int idle_timeout_specified : 1; @@ -387,6 +388,19 @@ static int send_open_frame(CONNECTION_HANDLE connection) connection_set_state(connection, CONNECTION_STATE_END); result = __FAILURE__; } + else if ((connection->desired_capabilities != NULL) && + (open_set_desired_capabilities(open_performative, connection->desired_capabilities) != 0)) + { + LogError("Cannot set desired capabilities"); + + if(xio_close(connection->io, NULL, NULL) != 0) + { + LogError("xio_close failed"); + } + + connection_set_state(connection, CONNECTION_STATE_END); + result = __FAILURE__; + } else { AMQP_VALUE open_performative_value = amqpvalue_create_open(open_performative); @@ -1247,6 +1261,7 @@ CONNECTION_HANDLE connection_create2(XIO_HANDLE xio, const char* hostname, const connection->header_bytes_received = 0; connection->is_remote_frame_received = 0; connection->properties = NULL; + connection->desired_capabilities = NULL; connection->is_underlying_io_open = 0; connection->remote_max_frame_size = 512; @@ -1318,6 +1333,11 @@ void connection_destroy(CONNECTION_HANDLE connection) amqpvalue_destroy(connection->properties); } + if (connection->desired_capabilities != NULL) + { + amqpvalue_destroy(connection->desired_capabilities); + } + free(connection->host_name); free(connection->container_id); @@ -1731,6 +1751,99 @@ int connection_get_properties(CONNECTION_HANDLE connection, fields* properties) return result; } +int connection_set_desired_capabilities(CONNECTION_HANDLE connection, AMQP_VALUE desired_capabilities) +{ + int result; + + if (connection == NULL) + { + LogError("NULL connection"); + result = __FAILURE__; + } + else + { + if (connection->connection_state != CONNECTION_STATE_START) + { + LogError("Connection already open"); + result = __FAILURE__; + } + else + { + if (desired_capabilities == NULL) + { + if (connection->desired_capabilities != NULL) + { + fields_destroy(connection->desired_capabilities); + connection->desired_capabilities = NULL; + } + + result = 0; + } + else + { + AMQP_VALUE new_desired_capabilities; + + new_desired_capabilities = fields_clone(desired_capabilities); + if (new_desired_capabilities == NULL) + { + LogError("Cannot clone connection desired capabilities"); + result = __FAILURE__; + } + else + { + if (connection->desired_capabilities != NULL) + { + amqpvalue_destroy(connection->desired_capabilities); + } + + connection->desired_capabilities = new_desired_capabilities; + + result = 0; + } + } + } + } + + return result; +} + +int connection_get_desired_capabilities(CONNECTION_HANDLE connection, AMQP_VALUE* desired_capabilities) +{ + int result; + + if ((connection == NULL) || + (desired_capabilities == NULL)) + { + LogError("Bad arguments: connection = %p, desired capabilities = %p", + connection, desired_capabilities); + result = __FAILURE__; + } + else + { + if (connection->desired_capabilities == NULL) + { + *desired_capabilities = NULL; + + result = 0; + } + else + { + *desired_capabilities = amqpvalue_clone(connection->desired_capabilities); + if (*desired_capabilities == NULL) + { + LogError("Cannot clone desired capabilities"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + } + + return result; +} + int connection_get_remote_max_frame_size(CONNECTION_HANDLE connection, uint32_t* remote_max_frame_size) { int result; diff --git a/src/vendor/inc/c_connection.pxd b/src/vendor/inc/c_connection.pxd index 7f03130f0..785fd3be3 100644 --- a/src/vendor/inc/c_connection.pxd +++ b/src/vendor/inc/c_connection.pxd @@ -56,6 +56,8 @@ cdef extern from "azure_uamqp_c/connection.h": int connection_get_idle_timeout(CONNECTION_HANDLE connection, c_amqp_definitions.milliseconds* idle_timeout) int connection_set_properties(CONNECTION_HANDLE connection, c_amqp_definitions.fields properties) int connection_get_properties(CONNECTION_HANDLE connection, c_amqp_definitions.fields* properties) + int connection_set_desired_capabilities(CONNECTION_HANDLE connection, c_amqpvalue.AMQP_VALUE desired_capabilities) + int connection_get_desired_capabilities(CONNECTION_HANDLE connection, c_amqpvalue.AMQP_VALUE* desired_capabilities) int connection_get_remote_max_frame_size(CONNECTION_HANDLE connection, stdint.uint32_t* remote_max_frame_size) int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE connection, double idle_timeout_empty_frame_send_ratio) stdint.uint64_t connection_handle_deadlines(CONNECTION_HANDLE connection) diff --git a/uamqp/async_ops/client_async.py b/uamqp/async_ops/client_async.py index 69d4939fd..be508cece 100644 --- a/uamqp/async_ops/client_async.py +++ b/uamqp/async_ops/client_async.py @@ -65,6 +65,18 @@ class AMQPClientAsync(client.AMQPClient): :type idle_timeout: int :param properties: Connection properties. :type properties: dict + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the link ATTACH frame. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue + :param connection_desired_capabilities: The extension capabilities desired from the + peer endpoint to be sent in the connection OPEN frame. + To create a desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type connection_desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to idle time for Connections with no activity. Value must be between 0.0 and 1.0 inclusive. Default is 0.5. @@ -718,11 +730,6 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync): will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode - :param desired_capabilities: The extension capabilities desired from the peer endpoint. - To create an desired_capabilities object, please do as follows: - - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` - - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` - :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param link_properties: Metadata to be sent in the Link ATTACH frame. diff --git a/uamqp/async_ops/connection_async.py b/uamqp/async_ops/connection_async.py index 1fd379f8c..8bf99547a 100644 --- a/uamqp/async_ops/connection_async.py +++ b/uamqp/async_ops/connection_async.py @@ -46,6 +46,12 @@ class ConnectionAsync(connection.Connection): :type idle_timeout: int :param properties: Connection properties. :type properties: dict + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the connection OPEN frame. + To create a desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to idle time for Connections with no activity. Value must be between 0.0 and 1.0 inclusive. Default is 0.5. @@ -66,6 +72,7 @@ def __init__(self, hostname, sasl, channel_max=None, idle_timeout=None, properties=None, + desired_capabilities=None, remote_idle_timeout_empty_frame_send_ratio=None, error_policy=None, debug=False, @@ -79,6 +86,7 @@ def __init__(self, hostname, sasl, channel_max=channel_max, idle_timeout=idle_timeout, properties=properties, + desired_capabilities=desired_capabilities, remote_idle_timeout_empty_frame_send_ratio=remote_idle_timeout_empty_frame_send_ratio, error_policy=error_policy, debug=debug, diff --git a/uamqp/async_ops/receiver_async.py b/uamqp/async_ops/receiver_async.py index aeee79978..4f8940ffc 100644 --- a/uamqp/async_ops/receiver_async.py +++ b/uamqp/async_ops/receiver_async.py @@ -50,8 +50,9 @@ class MessageReceiverAsync(receiver.MessageReceiver): from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :type send_settle_mode: ~uamqp.constants.SenderSettleMode - :param desired_capabilities: The extension capabilities desired from the peer endpoint. - To create an desired_capabilities object, please do as follows: + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the link ATTACH frame. + To create a desired_capabilities object, please do as follows: - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue diff --git a/uamqp/client.py b/uamqp/client.py index e2cd1af0e..14747a022 100644 --- a/uamqp/client.py +++ b/uamqp/client.py @@ -54,6 +54,18 @@ class AMQPClient(object): :type idle_timeout: int :param properties: Connection properties. :type properties: dict + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the link ATTACH frame. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue + :param connection_desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the connection OPEN frame. + To create a desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type connection_desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to idle time for Connections with no activity. Value must be between 0.0 and 1.0 inclusive. Default is 0.5. @@ -126,6 +138,7 @@ def __init__( self._properties = kwargs.pop('properties', None) self._remote_idle_timeout_empty_frame_send_ratio = kwargs.pop( 'remote_idle_timeout_empty_frame_send_ratio', None) + self._connection_desired_capabilities = kwargs.pop("connection_desired_capabilities", None) # Session settings self._outgoing_window = kwargs.pop('outgoing_window', None) or constants.MAX_FRAME_SIZE_BYTES @@ -255,7 +268,9 @@ def open(self, connection=None): remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio, error_policy=self._error_policy, debug=self._debug_trace, - encoding=self._encoding) + encoding=self._encoding, + desired_capabilities=self._connection_desired_capabilities + ) self._build_session() if self._keep_alive_interval: self._keep_alive_thread = threading.Thread(target=self._keep_alive) @@ -823,11 +838,6 @@ class ReceiveClient(AMQPClient): will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode - :param desired_capabilities: The extension capabilities desired from the peer endpoint. - To create an desired_capabilities object, please do as follows: - - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` - - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` - :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param link_properties: Metadata to be sent in the Link ATTACH frame. diff --git a/uamqp/connection.py b/uamqp/connection.py index a5703c5d1..903647289 100644 --- a/uamqp/connection.py +++ b/uamqp/connection.py @@ -48,6 +48,12 @@ class Connection(object): :type idle_timeout: int :param properties: Connection properties. :type properties: dict + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the connection OPEN frame. + To create a desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to idle time for Connections with no activity. Value must be between 0.0 and 1.0 inclusive. Default is 0.5. @@ -66,6 +72,7 @@ def __init__(self, hostname, sasl, channel_max=None, idle_timeout=None, properties=None, + desired_capabilities=None, remote_idle_timeout_empty_frame_send_ratio=None, error_policy=None, debug=False, @@ -102,6 +109,8 @@ def __init__(self, hostname, sasl, self.properties = properties if remote_idle_timeout_empty_frame_send_ratio: self._conn.remote_idle_timeout_empty_frame_send_ratio = remote_idle_timeout_empty_frame_send_ratio + if desired_capabilities: + self.desired_capabilities = desired_capabilities def __enter__(self): """Open the Connection in a context manager.""" @@ -310,6 +319,17 @@ def properties(self, value): raise TypeError("Connection properties must be a dictionary.") self._conn.properties = utils.data_factory(value, encoding=self._encoding) + @property + def desired_capabilities(self): + return self._conn.desired_capabilities + + @desired_capabilities.setter + def desired_capabilities(self, value): + if not isinstance(value, uamqp.c_uamqp.AMQPValue): + raise TypeError("Connection desired capabilities must be type of uamqp.c_uamqp.AMQPValue.\ + Please use uamqp.utils.data_factory method to encode desired capabilities first") + self._conn.desired_capabilities = value + @property def remote_max_frame_size(self): return self._conn.remote_max_frame_size diff --git a/uamqp/receiver.py b/uamqp/receiver.py index 27ae2ea74..5747f7616 100644 --- a/uamqp/receiver.py +++ b/uamqp/receiver.py @@ -52,8 +52,9 @@ class MessageReceiver(object): from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :type send_settle_mode: ~uamqp.constants.SenderSettleMode - :param desired_capabilities: The extension capabilities desired from the peer endpoint. - To create an desired_capabilities object, please do as follows: + :param desired_capabilities: The extension capabilities desired from the peer + endpoint to be sent in the link ATTACH frame. + To create a desired_capabilities object, please do as follows: - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue