Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for connection desired capabilities #142

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/connection.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,21 @@ cdef class Connection(StructBase):
if c_connection.connection_set_properties(self._c_value, <c_amqp_definitions.fields>value._c_value) != 0:
self._value_error()

@property
def desired_capabilities(self):
cdef c_amqpvalue.AMQP_VALUE _value
if c_connection.connection_get_desired_capabilities(self._c_value, &_value) == 0:
if <void*>_value == NULL:
return None
return value_factory(_value)
else:
self._value_error()

@desired_capabilities.setter
def desired_capabilities(self, AMQPValue value):
if c_connection.connection_set_desired_capabilities(self._c_value, <c_amqpvalue.AMQP_VALUE>value._c_value) != 0:
self._value_error()

@property
def remote_max_frame_size(self):
cdef stdint.uint32_t _value
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ extern "C" {
MOCKABLE_FUNCTION(, int, connection_get_idle_timeout, CONNECTION_HANDLE, connection, milliseconds*, idle_timeout);
MOCKABLE_FUNCTION(, int, connection_set_properties, CONNECTION_HANDLE, connection, fields, properties);
MOCKABLE_FUNCTION(, int, connection_get_properties, CONNECTION_HANDLE, connection, fields*, properties);
MOCKABLE_FUNCTION(, int, connection_set_desired_capabilities, CONNECTION_HANDLE, connection, AMQP_VALUE, desired_capabilities);
MOCKABLE_FUNCTION(, int, connection_get_desired_capabilities, CONNECTION_HANDLE, connection, AMQP_VALUE*, desired_capabilities);
MOCKABLE_FUNCTION(, int, connection_get_remote_max_frame_size, CONNECTION_HANDLE, connection, uint32_t*, remote_max_frame_size);
MOCKABLE_FUNCTION(, int, connection_set_remote_idle_timeout_empty_frame_send_ratio, CONNECTION_HANDLE, connection, double, idle_timeout_empty_frame_send_ratio);
MOCKABLE_FUNCTION(, uint64_t, connection_handle_deadlines, CONNECTION_HANDLE, connection);
Expand Down
113 changes: 113 additions & 0 deletions src/vendor/azure-uamqp-c/src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ typedef struct CONNECTION_INSTANCE_TAG
tickcounter_ms_t last_frame_received_time;
tickcounter_ms_t last_frame_sent_time;
fields properties;
AMQP_VALUE desired_capabilities;

unsigned int is_underlying_io_open : 1;
unsigned int idle_timeout_specified : 1;
Expand Down Expand Up @@ -387,6 +388,19 @@ static int send_open_frame(CONNECTION_HANDLE connection)
connection_set_state(connection, CONNECTION_STATE_END);
result = __FAILURE__;
}
else if ((connection->desired_capabilities != NULL) &&
(open_set_desired_capabilities(open_performative, connection->desired_capabilities) != 0))
{
LogError("Cannot set desired capabilities");

if(xio_close(connection->io, NULL, NULL) != 0)
{
LogError("xio_close failed");
}

connection_set_state(connection, CONNECTION_STATE_END);
result = __FAILURE__;
}
else
{
AMQP_VALUE open_performative_value = amqpvalue_create_open(open_performative);
Expand Down Expand Up @@ -1247,6 +1261,7 @@ CONNECTION_HANDLE connection_create2(XIO_HANDLE xio, const char* hostname, const
connection->header_bytes_received = 0;
connection->is_remote_frame_received = 0;
connection->properties = NULL;
connection->desired_capabilities = NULL;

connection->is_underlying_io_open = 0;
connection->remote_max_frame_size = 512;
Expand Down Expand Up @@ -1318,6 +1333,11 @@ void connection_destroy(CONNECTION_HANDLE connection)
amqpvalue_destroy(connection->properties);
}

if (connection->desired_capabilities != NULL)
{
amqpvalue_destroy(connection->desired_capabilities);
}

free(connection->host_name);
free(connection->container_id);

Expand Down Expand Up @@ -1731,6 +1751,99 @@ int connection_get_properties(CONNECTION_HANDLE connection, fields* properties)
return result;
}

int connection_set_desired_capabilities(CONNECTION_HANDLE connection, AMQP_VALUE desired_capabilities)
{
int result;

if (connection == NULL)
{
LogError("NULL connection");
result = __FAILURE__;
}
else
{
if (connection->connection_state != CONNECTION_STATE_START)
{
LogError("Connection already open");
result = __FAILURE__;
}
else
{
if (desired_capabilities == NULL)
{
if (connection->desired_capabilities != NULL)
{
fields_destroy(connection->desired_capabilities);
connection->desired_capabilities = NULL;
}

result = 0;
}
else
{
AMQP_VALUE new_desired_capabilities;

new_desired_capabilities = fields_clone(desired_capabilities);
if (new_desired_capabilities == NULL)
{
LogError("Cannot clone connection desired capabilities");
result = __FAILURE__;
}
else
{
if (connection->desired_capabilities != NULL)
{
amqpvalue_destroy(connection->desired_capabilities);
}

connection->desired_capabilities = new_desired_capabilities;

result = 0;
}
}
}
}

return result;
}

int connection_get_desired_capabilities(CONNECTION_HANDLE connection, AMQP_VALUE* desired_capabilities)
{
int result;

if ((connection == NULL) ||
(desired_capabilities == NULL))
{
LogError("Bad arguments: connection = %p, desired capabilities = %p",
connection, desired_capabilities);
result = __FAILURE__;
}
else
{
if (connection->desired_capabilities == NULL)
{
*desired_capabilities = NULL;

result = 0;
}
else
{
*desired_capabilities = amqpvalue_clone(connection->desired_capabilities);
if (*desired_capabilities == NULL)
{
LogError("Cannot clone desired capabilities");
result = __FAILURE__;
}
else
{
result = 0;
}
}
}

return result;
}

int connection_get_remote_max_frame_size(CONNECTION_HANDLE connection, uint32_t* remote_max_frame_size)
{
int result;
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/inc/c_connection.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ cdef extern from "azure_uamqp_c/connection.h":
int connection_get_idle_timeout(CONNECTION_HANDLE connection, c_amqp_definitions.milliseconds* idle_timeout)
int connection_set_properties(CONNECTION_HANDLE connection, c_amqp_definitions.fields properties)
int connection_get_properties(CONNECTION_HANDLE connection, c_amqp_definitions.fields* properties)
int connection_set_desired_capabilities(CONNECTION_HANDLE connection, c_amqpvalue.AMQP_VALUE desired_capabilities)
int connection_get_desired_capabilities(CONNECTION_HANDLE connection, c_amqpvalue.AMQP_VALUE* desired_capabilities)
int connection_get_remote_max_frame_size(CONNECTION_HANDLE connection, stdint.uint32_t* remote_max_frame_size)
int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE connection, double idle_timeout_empty_frame_send_ratio)
stdint.uint64_t connection_handle_deadlines(CONNECTION_HANDLE connection)
Expand Down
17 changes: 12 additions & 5 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class AMQPClientAsync(client.AMQPClient):
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the link ATTACH frame.
To create an desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param connection_desired_capabilities: The extension capabilities desired from the
peer endpoint to be sent in the connection OPEN frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type connection_desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
Expand Down Expand Up @@ -718,11 +730,6 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_properties: Metadata to be sent in the Link ATTACH frame.
Expand Down
8 changes: 8 additions & 0 deletions uamqp/async_ops/connection_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class ConnectionAsync(connection.Connection):
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the connection OPEN frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
Expand All @@ -66,6 +72,7 @@ def __init__(self, hostname, sasl,
channel_max=None,
idle_timeout=None,
properties=None,
desired_capabilities=None,
remote_idle_timeout_empty_frame_send_ratio=None,
error_policy=None,
debug=False,
Expand All @@ -79,6 +86,7 @@ def __init__(self, hostname, sasl,
channel_max=channel_max,
idle_timeout=idle_timeout,
properties=properties,
desired_capabilities=desired_capabilities,
remote_idle_timeout_empty_frame_send_ratio=remote_idle_timeout_empty_frame_send_ratio,
error_policy=error_policy,
debug=debug,
Expand Down
5 changes: 3 additions & 2 deletions uamqp/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class MessageReceiverAsync(receiver.MessageReceiver):
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the link ATTACH frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down
22 changes: 16 additions & 6 deletions uamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ class AMQPClient(object):
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the link ATTACH frame.
To create an desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param connection_desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the connection OPEN frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type connection_desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
Expand Down Expand Up @@ -126,6 +138,7 @@ def __init__(
self._properties = kwargs.pop('properties', None)
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.pop(
'remote_idle_timeout_empty_frame_send_ratio', None)
self._connection_desired_capabilities = kwargs.pop("connection_desired_capabilities", None)

# Session settings
self._outgoing_window = kwargs.pop('outgoing_window', None) or constants.MAX_FRAME_SIZE_BYTES
Expand Down Expand Up @@ -255,7 +268,9 @@ def open(self, connection=None):
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
encoding=self._encoding)
encoding=self._encoding,
desired_capabilities=self._connection_desired_capabilities
)
self._build_session()
if self._keep_alive_interval:
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
Expand Down Expand Up @@ -823,11 +838,6 @@ class ReceiveClient(AMQPClient):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_properties: Metadata to be sent in the Link ATTACH frame.
Expand Down
20 changes: 20 additions & 0 deletions uamqp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class Connection(object):
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the connection OPEN frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
Expand All @@ -66,6 +72,7 @@ def __init__(self, hostname, sasl,
channel_max=None,
idle_timeout=None,
properties=None,
desired_capabilities=None,
remote_idle_timeout_empty_frame_send_ratio=None,
error_policy=None,
debug=False,
Expand Down Expand Up @@ -102,6 +109,8 @@ def __init__(self, hostname, sasl,
self.properties = properties
if remote_idle_timeout_empty_frame_send_ratio:
self._conn.remote_idle_timeout_empty_frame_send_ratio = remote_idle_timeout_empty_frame_send_ratio
if desired_capabilities:
self.desired_capabilities = desired_capabilities

def __enter__(self):
"""Open the Connection in a context manager."""
Expand Down Expand Up @@ -310,6 +319,17 @@ def properties(self, value):
raise TypeError("Connection properties must be a dictionary.")
self._conn.properties = utils.data_factory(value, encoding=self._encoding)

@property
def desired_capabilities(self):
return self._conn.desired_capabilities

@desired_capabilities.setter
def desired_capabilities(self, value):
if not isinstance(value, uamqp.c_uamqp.AMQPValue):
raise TypeError("Connection desired capabilities must be type of uamqp.c_uamqp.AMQPValue.\
Please use uamqp.utils.data_factory method to encode desired capabilities first")
self._conn.desired_capabilities = value

@property
def remote_max_frame_size(self):
return self._conn.remote_max_frame_size
5 changes: 3 additions & 2 deletions uamqp/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class MessageReceiver(object):
from the service that the message was successfully sent. If set to 'Settled',
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
:param desired_capabilities: The extension capabilities desired from the peer
endpoint to be sent in the link ATTACH frame.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down