Skip to content

Commit 49d9f3b

Browse files
Refactor: simplify code and rename files for clarity.
1 parent caf41ae commit 49d9f3b

File tree

7 files changed

+261
-170
lines changed

7 files changed

+261
-170
lines changed

src/oracledb/aq.py

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ def _verify_message(self, message: "MessageProperties") -> None:
6868
else:
6969
if not isinstance(message.payload, (str, bytes)):
7070
errors._raise_err(errors.ERR_PAYLOAD_CANNOT_BE_ENQUEUED)
71+
if self.connection.thin:
72+
if message.recipients:
73+
errors._raise_not_supported("specifying AQ message recipients")
7174

7275
@property
7376
def connection(self) -> "connection_module.Connection":

src/oracledb/impl/thin/constants.pxi

+1
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ cdef enum:
551551
TNS_JSON_MAX_LENGTH = 32 * 1024 * 1024
552552
TNS_VECTOR_MAX_LENGTH = 1 * 1024 * 1024
553553
TNS_AQ_MESSAGE_ID_LENGTH = 16
554+
TNS_AQ_MESSAGE_VERSION = 1
554555

555556
# base 64 encoding alphabet
556557
cdef bytes TNS_BASE64_ALPHABET = \
+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
#------------------------------------------------------------------------------
2+
# Copyright (c) 2025, Oracle and/or its affiliates.
3+
#
4+
# This software is dual-licensed to you under the Universal Permissive License
5+
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
6+
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
# either license.
8+
#
9+
# If you elect to accept the software under the Apache License, Version 2.0,
10+
# the following applies:
11+
#
12+
# Licensed under the Apache License, Version 2.0 (the "License");
13+
# you may not use this file except in compliance with the License.
14+
# You may obtain a copy of the License at
15+
#
16+
# https://www.apache.org/licenses/LICENSE-2.0
17+
#
18+
# Unless required by applicable law or agreed to in writing, software
19+
# distributed under the License is distributed on an "AS IS" BASIS,
20+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21+
# See the License for the specific language governing permissions and
22+
# limitations under the License.
23+
#------------------------------------------------------------------------------
24+
25+
#------------------------------------------------------------------------------
26+
# aq_base.pyx
27+
#
28+
# Cython file defining the base class for messages that are sent to the
29+
# database and the responses that are received by the client for enqueing and
30+
# dequeuing AQ messages (embedded in thin_impl.pyx).
31+
#------------------------------------------------------------------------------
32+
33+
cdef class AqBaseMessage(Message):
34+
cdef:
35+
BaseThinQueueImpl queue_impl
36+
ThinDeqOptionsImpl deq_options_impl
37+
ThinEnqOptionsImpl enq_options_impl
38+
bint no_msg_found
39+
40+
cdef object _process_date(self, ReadBuffer buf):
41+
"""
42+
Processes a date found in the buffer.
43+
"""
44+
cdef:
45+
const char_type *ptr
46+
uint32_t num_bytes
47+
OracleData data
48+
ssize_t length
49+
buf.read_ub4(&num_bytes)
50+
if num_bytes > 0:
51+
buf.read_raw_bytes_and_length(&ptr, &length)
52+
decode_date(ptr, length, &data.buffer)
53+
return convert_date_to_python(&data.buffer)
54+
55+
cdef int _process_error_info(self, ReadBuffer buf) except -1:
56+
"""
57+
Process error information from the buffer. If the error that indicates
58+
that no messages were received is detected, the error is cleared and
59+
the flag set so that the dequeue can handle that case.
60+
"""
61+
Message._process_error_info(self, buf)
62+
if self.error_info.num == TNS_ERR_NO_MESSAGES_FOUND:
63+
self.error_info.num = 0
64+
self.error_occurred = False
65+
self.no_msg_found = True
66+
67+
cdef int _process_extensions(self, ReadBuffer buf,
68+
ThinMsgPropsImpl props_impl) except -1:
69+
"""
70+
Processes extensions to the message property object returned by the
71+
database.
72+
"""
73+
cdef:
74+
bytes text_value, binary_value, value
75+
uint32_t i, num_extensions
76+
uint16_t keyword
77+
buf.read_ub4(&num_extensions)
78+
if num_extensions > 0:
79+
buf.skip_ub1()
80+
for i in range(num_extensions):
81+
text_value = buf.read_bytes_with_length()
82+
binary_value = buf.read_bytes_with_length()
83+
value = text_value or binary_value
84+
buf.read_ub2(&keyword)
85+
if value is not None:
86+
if keyword == TNS_AQ_EXT_KEYWORD_AGENT_NAME:
87+
props_impl.sender_agent_name = value
88+
elif keyword == TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS:
89+
props_impl.sender_agent_address = value
90+
elif keyword == TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL:
91+
props_impl.sender_agent_protocol = value
92+
elif keyword == TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID:
93+
props_impl.original_msg_id = value
94+
95+
cdef bytes _process_msg_id(self, ReadBuffer buf):
96+
"""
97+
Reads a message id from the buffer and returns it.
98+
"""
99+
cdef const char_type *ptr
100+
ptr = buf.read_raw_bytes(TNS_AQ_MESSAGE_ID_LENGTH)
101+
return ptr[:TNS_AQ_MESSAGE_ID_LENGTH]
102+
103+
cdef int _process_msg_props(self, ReadBuffer buf,
104+
ThinMsgPropsImpl props_impl) except -1:
105+
"""
106+
Processes a message property object returned by the database.
107+
"""
108+
cdef uint32_t temp32
109+
buf.read_sb4(&props_impl.priority)
110+
buf.read_sb4(&props_impl.delay)
111+
buf.read_sb4(&props_impl.expiration)
112+
props_impl.correlation = buf.read_str_with_length()
113+
buf.read_sb4(&props_impl.num_attempts)
114+
props_impl.exceptionq = buf.read_str_with_length()
115+
buf.read_sb4(&props_impl.state)
116+
props_impl.enq_time = self._process_date(buf)
117+
props_impl.enq_txn_id = buf.read_bytes_with_length()
118+
self._process_extensions(buf, props_impl)
119+
buf.read_ub4(&temp32) # user properties
120+
if temp32 > 0:
121+
errors._raise_err(errors.ERR_NOT_IMPLEMENTED)
122+
buf.skip_ub4() # csn
123+
buf.skip_ub4() # dsn
124+
buf.skip_ub4() # flags
125+
if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1:
126+
buf.skip_ub4() # shard number
127+
128+
cdef object _process_payload(self, ReadBuffer buf):
129+
"""
130+
Processes the payload for an enqueued message returned by the database.
131+
"""
132+
cdef:
133+
ThinDbObjectImpl obj_impl
134+
uint32_t image_length
135+
bytes payload
136+
if self.queue_impl.payload_type is not None:
137+
obj_impl = buf.read_dbobject(self.queue_impl.payload_type)
138+
if obj_impl is None:
139+
obj_impl = self.queue_impl.payload_type.create_new_object()
140+
return PY_TYPE_DB_OBJECT._from_impl(obj_impl)
141+
else:
142+
buf.read_bytes_with_length() # TOID
143+
buf.read_bytes_with_length() # OID
144+
buf.read_bytes_with_length() # snapshot
145+
buf.skip_ub2() # version no
146+
buf.read_ub4(&image_length) # image length
147+
buf.skip_ub2() # flags
148+
if image_length > 0:
149+
payload = buf.read_bytes()[4:image_length]
150+
if self.queue_impl.is_json:
151+
return self.conn_impl.decode_oson(payload)
152+
return payload
153+
elif not self.queue_impl.is_json:
154+
return b''
155+
156+
cdef object _process_recipients(self, ReadBuffer buf):
157+
"""
158+
Process recipients for a message. Currently this is unsupported.
159+
"""
160+
cdef uint32_t temp32
161+
buf.read_ub4(&temp32)
162+
if temp32 > 0:
163+
errors._raise_err(errors.ERR_NOT_IMPLEMENTED)
164+
return []
165+
166+
cdef int _write_msg_props(self, WriteBuffer buf,
167+
ThinMsgPropsImpl props_impl) except -1:
168+
"""
169+
Write a message property object to the buffer.
170+
"""
171+
buf.write_ub4(props_impl.priority)
172+
buf.write_ub4(props_impl.delay)
173+
buf.write_sb4(props_impl.expiration)
174+
self._write_value_with_length(buf, props_impl.correlation)
175+
buf.write_ub4(0) # number of attempts
176+
self._write_value_with_length(buf, props_impl.exceptionq)
177+
buf.write_ub4(props_impl.state)
178+
buf.write_ub4(0) # enqueue time length
179+
self._write_value_with_length(buf, props_impl.enq_txn_id)
180+
buf.write_ub4(4) # number of extensions
181+
buf.write_uint8(0x0e) # unknown extra byte
182+
buf.write_extension_values(None, None, TNS_AQ_EXT_KEYWORD_AGENT_NAME)
183+
buf.write_extension_values(None, None,
184+
TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS)
185+
buf.write_extension_values(None, b'\x00',
186+
TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL)
187+
buf.write_extension_values(None, None,
188+
TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID)
189+
buf.write_ub4(0) # user property
190+
buf.write_ub4(0) # cscn
191+
buf.write_ub4(0) # dscn
192+
buf.write_ub4(0) # flags
193+
if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1:
194+
buf.write_ub4(0xffffffffl) # shard id
195+
196+
cdef int _write_payload(self, WriteBuffer buf,
197+
ThinMsgPropsImpl props_impl) except -1:
198+
"""
199+
Writes the payload of the message property object to the buffer.
200+
"""
201+
if self.queue_impl.is_json:
202+
buf.write_oson(props_impl.payload_obj,
203+
self.conn_impl._oson_max_fname_size, False)
204+
elif self.queue_impl.payload_type is not None:
205+
buf.write_dbobject(props_impl.payload_obj)
206+
else:
207+
buf.write_bytes(props_impl.payload_obj)
208+
209+
cdef int _write_value_with_length(self, WriteBuffer buf,
210+
object value) except -1:
211+
"""
212+
Write a string to the buffer, prefixed by a length.
213+
"""
214+
cdef bytes value_bytes
215+
if value is None:
216+
buf.write_ub4(0)
217+
else:
218+
if isinstance(value, str):
219+
value_bytes = value.encode()
220+
else:
221+
value_bytes = value
222+
buf.write_ub4(len(value_bytes))
223+
buf.write_bytes_with_length(value_bytes)

src/oracledb/impl/thin/messages/deq.pyx renamed to src/oracledb/impl/thin/messages/aq_deq.pyx

+9-94
Original file line numberDiff line numberDiff line change
@@ -23,125 +23,40 @@
2323
#------------------------------------------------------------------------------
2424

2525
#------------------------------------------------------------------------------
26-
# deq.pyx
26+
# aq_deq.pyx
2727
#
2828
# Cython file defining the messages that are sent to the database and the
2929
# responses that are received by the client for dequeuing an AQ message
3030
# (embedded in thin_impl.pyx).
3131
#------------------------------------------------------------------------------
3232

3333
@cython.final
34-
cdef class DeqMessage(Message):
34+
cdef class AqDeqMessage(AqBaseMessage):
3535
cdef:
36-
BaseThinQueueImpl queue_impl
37-
ThinDeqOptionsImpl deq_options_impl
3836
ThinMsgPropsImpl props_impl
39-
bint no_msg_found
4037

4138
cdef int _initialize_hook(self) except -1:
4239
"""
4340
Perform initialization
4441
"""
4542
self.function_code = TNS_FUNC_AQ_DEQ
4643

47-
cdef int _process_error_info(self, ReadBuffer buf) except -1:
48-
"""
49-
Process error information from the buffer. If the error that indicates
50-
that no messages were received is detected, the error is cleared and
51-
the flag set so that the dequeue can handle that case.
52-
"""
53-
Message._process_error_info(self, buf)
54-
if self.error_info.num == TNS_ERR_NO_MESSAGES_FOUND:
55-
self.error_info.num = 0
56-
self.error_occurred = False
57-
self.no_msg_found = True
58-
5944
cdef int _process_return_parameters(self, ReadBuffer buf) except -1:
6045
"""
6146
Process the return parameters of the AQ Dequeue request.
6247
"""
6348
cdef:
64-
uint32_t num_bytes, num_extensions, i
65-
bytes text_value, binary_value, value
66-
ssize_t temp_num_bytes
67-
const char_type *ptr
49+
uint32_t num_bytes
6850
uint16_t keyword
69-
OracleData data
7051
uint32_t imageLength
7152
ThinDbObjectImpl obj_impl
7253
ThinDbObjectTypeImpl type_impl
7354
buf.read_ub4(&num_bytes)
7455
if num_bytes > 0:
75-
buf.read_sb4(&self.props_impl.priority) # priority
76-
buf.read_sb4(&self.props_impl.delay) # delay
77-
buf.read_sb4(&self.props_impl.expiration) # expiration
78-
self.props_impl.correlation = buf.read_str_with_length()
79-
buf.read_sb4(&self.props_impl.num_attempts)
80-
self.props_impl.exceptionq = buf.read_str_with_length()
81-
buf.read_sb4(&self.props_impl.state)
82-
buf.read_ub4(&num_bytes) # enqueue time
83-
if num_bytes > 0:
84-
buf.read_raw_bytes_and_length(&ptr, &temp_num_bytes)
85-
decode_date(ptr, temp_num_bytes, &data.buffer)
86-
self.props_impl.enq_time = convert_date_to_python(&data.buffer)
87-
self.props_impl.enq_txn_id = buf.read_bytes_with_length()
88-
buf.read_ub4(&num_extensions) # number of extensions
89-
if num_extensions > 0:
90-
buf.skip_ub1()
91-
for i in range(num_extensions):
92-
text_value = buf.read_bytes_with_length()
93-
binary_value = buf.read_bytes_with_length()
94-
value = text_value or binary_value
95-
buf.read_ub2(&keyword) # extension keyword
96-
if value is not None:
97-
if keyword == TNS_AQ_EXT_KEYWORD_AGENT_NAME:
98-
self.props_impl.sender_agent_name = value
99-
elif keyword == TNS_AQ_EXT_KEYWORD_AGENT_ADDRESS:
100-
self.props_impl.sender_agent_address = value
101-
elif keyword == TNS_AQ_EXT_KEYWORD_AGENT_PROTOCOL:
102-
self.props_impl.sender_agent_protocol = value
103-
elif keyword == TNS_AQ_EXT_KEYWORD_ORIGINAL_MSGID:
104-
self.props_impl.original_msg_id = value
105-
buf.read_ub4(&num_bytes) # user properties
106-
if num_bytes > 0:
107-
errors._raise_err(errors.ERR_NOT_IMPLEMENTED)
108-
buf.skip_ub4() # csn
109-
buf.skip_ub4() # dsn
110-
buf.skip_ub4() # flags
111-
if buf._caps.ttc_field_version >= TNS_CCAP_FIELD_VERSION_21_1:
112-
buf.skip_ub4() # shard number
113-
buf.read_ub4(&num_bytes) # num recipients
114-
if num_bytes > 0:
115-
errors._raise_err(errors.ERR_NOT_IMPLEMENTED)
116-
if self.queue_impl.payload_type is not None:
117-
type_impl = self.queue_impl.payload_type
118-
obj_impl = buf.read_dbobject(type_impl)
119-
if obj_impl is None:
120-
obj_impl = type_impl.create_new_object()
121-
self.props_impl.payload = PY_TYPE_DB_OBJECT._from_impl(obj_impl)
122-
else:
123-
buf.read_ub4(&num_bytes) # TOID len
124-
if num_bytes > 0:
125-
buf.skip_raw_bytes(num_bytes)
126-
buf.read_ub4(&num_bytes) # OID len
127-
if num_bytes > 0:
128-
buf.skip_raw_bytes(num_bytes)
129-
buf.read_ub4(&num_bytes) # snapshot
130-
if num_bytes > 0:
131-
buf.skip_raw_bytes(num_bytes)
132-
buf.skip_ub2() # version no
133-
buf.read_ub4(&imageLength) # image len
134-
buf.skip_ub2() # flags
135-
if imageLength > 0:
136-
self.props_impl.payload = buf.read_bytes()[4:imageLength]
137-
if self.queue_impl.is_json:
138-
self.props_impl.payload = \
139-
self.conn_impl.decode_oson(self.props_impl.payload)
140-
else:
141-
if not self.queue_impl.is_json:
142-
self.props_impl.payload = b''
143-
ptr = buf._get_raw(TNS_AQ_MESSAGE_ID_LENGTH)
144-
self.props_impl.msgid = ptr[:TNS_AQ_MESSAGE_ID_LENGTH]
56+
self._process_msg_props(buf, self.props_impl)
57+
self.props_impl.recipients = self._process_recipients(buf)
58+
self.props_impl.payload = self._process_payload(buf)
59+
self.props_impl.msgid = self._process_msg_id(buf)
14560

14661
cdef int _write_message(self, WriteBuffer buf) except -1:
14762
"""
@@ -190,10 +105,10 @@ cdef class DeqMessage(Message):
190105
buf.write_ub4(0) # correlation id len
191106
buf.write_uint8(1) # toid of payload
192107
buf.write_ub4(16) # toid length
193-
buf.write_ub2(self.props_impl.version) # version of type
108+
buf.write_ub2(TNS_AQ_MESSAGE_VERSION)
194109
buf.write_uint8(1) # payload
195110
buf.write_uint8(1) # return msg id
196-
buf.write_ub4(16) # mesg id length
111+
buf.write_ub4(TNS_AQ_MESSAGE_ID_LENGTH)
197112
deq_flags = 0
198113
delivery_mode = self.deq_options_impl.delivery_mode
199114
if (delivery_mode == TNS_AQ_MSG_BUFFERED):

0 commit comments

Comments
 (0)