Skip to content

Commit

Permalink
feature: adding aioamqp instrumentation
Browse files Browse the repository at this point in the history
Signed-off-by: Cagri Yonca <[email protected]>
  • Loading branch information
CagriYonca committed Feb 13, 2025
1 parent 202f144 commit 12dc6c8
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def boot_agent() -> None:

# Import & initialize instrumentation
from instana.instrumentation import (
aioamqp, # noqa: F401
asyncio, # noqa: F401
boto3_inst, # noqa: F401
cassandra_inst, # noqa: F401
Expand Down Expand Up @@ -200,8 +201,8 @@ def boot_agent() -> None:
storage, # noqa: F401
)
from instana.instrumentation.tornado import (
client, # noqa: F401
server, # noqa: F401
client, # noqa: F401, F811
server, # noqa: F401, F811
)

# Hooks
Expand Down
80 changes: 80 additions & 0 deletions src/instana/instrumentation/aioamqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# (c) Copyright IBM Corp. 2025
# (c) Copyright Instana Inc. 2019

from typing import Any, Callable, Dict, Tuple

import wrapt
from opentelemetry.trace.status import StatusCode

from instana.log import logger
from instana.util.traceutils import get_tracer_tuple, tracing_is_off

try:
import aioamqp

@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_publish")
async def basic_publish_with_instana(
wrapped: Callable[..., aioamqp.connect],
instance: object,
argv: Tuple[object, Tuple[object, ...]],
kwargs: Dict[str, Any],
) -> object:
if tracing_is_off():
return await wrapped(*argv, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None
with tracer.start_as_current_span(
"aioamqp-publisher", span_context=parent_context
) as span:
try:
span.set_attribute("aioamqp.exchange", argv[0])
return await wrapped(*argv, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"basic_publish_with_instana error: {exc}")

@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
def basic_consume_with_instana(
wrapped: Callable[..., aioamqp.connect],
instance: object,
argv: Tuple[object, Tuple[object, ...]],
kwargs: Dict[str, Any],
) -> object:
if tracing_is_off():
return wrapped(*argv, **kwargs)

callback = argv[0]
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

@wrapt.decorator
async def callback_wrapper(
wrapped_callback: Callable[..., aioamqp.connect],
instance: Any,
args: Tuple,
kwargs: Dict,
) -> object:
with tracer.start_as_current_span(
"aioamqp-consumer", span_context=parent_context
) as span:
try:
span.set_status(StatusCode.OK)
span.set_attribute("aioamqp.callback", callback)
span.set_attribute("aioamqp.message", args[1])
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
return await wrapped_callback(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
logger.debug(f"basic_consume_with_instana error: {exc}")

wrapped_callback = callback_wrapper(callback)
argv = (wrapped_callback,) + argv[1:]

return wrapped(*argv, **kwargs)

logger.debug("Instrumenting aioamqp")

except ImportError:
pass
2 changes: 2 additions & 0 deletions src/instana/span/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

ENTRY_SPANS = (
"aioamqp-consumer",
"aiohttp-server",
"aws.lambda.entry",
"celery-worker",
Expand All @@ -34,6 +35,7 @@
)

EXIT_SPANS = (
"aioamqp-publisher",
"aiohttp-client",
"boto3",
"cassandra",
Expand Down
128 changes: 128 additions & 0 deletions tests/frameworks/test_aioamqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import asyncio
from typing import Any, Generator

import aioamqp
import pytest

from instana.singletons import tracer
from tests.helpers import testenv
from aioamqp.properties import Properties
from aioamqp.envelope import Envelope

testenv["rabbitmq_host"] = "127.0.0.1"
testenv["rabbitmq_port"] = 5672


class TestAioamqp:
@pytest.fixture(autouse=True)
def _resource(self) -> Generator[None, None, None]:
self.recorder = tracer.span_processor
self.recorder.clear_spans()

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
yield
self.loop.run_until_complete(self.delete_queue())
if self.loop.is_running():
self.loop.close()

async def delete_queue(self) -> None:
transport, protocol = await aioamqp.connect(
testenv["rabbitmq_host"],
testenv["rabbitmq_port"],
)
channel = await protocol.channel()
await channel.queue_delete("message_queue")
await asyncio.sleep(1)

async def publish_message(self) -> None:
transport, protocol = await aioamqp.connect(
testenv["rabbitmq_host"],
testenv["rabbitmq_port"],
)
channel = await protocol.channel()

await channel.queue_declare(queue_name="message_queue")

message = "Instana test message"
await channel.basic_publish(
message.encode(), exchange_name="", routing_key="message_queue"
)

await protocol.close()
transport.close()

async def consume_message(self) -> None:
async def callback(
channel: Any,
body: bytes,
envelope: Envelope,
properties: Properties,
) -> None:
with tracer.start_as_current_span("callback-span"):
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

_, protocol = await aioamqp.connect(
testenv["rabbitmq_host"], testenv["rabbitmq_port"]
)
channel = await protocol.channel()
await channel.queue_declare(queue_name="message_queue")
await channel.basic_consume(callback, queue_name="message_queue", no_ack=False)

def test_basic_publish(self) -> None:
with tracer.start_as_current_span("test-span"):
self.loop.run_until_complete(self.publish_message())

spans = self.recorder.queued_spans()

assert len(spans) == 2
aioamqp_span = spans[0]
test_span = spans[1]

assert aioamqp_span.n == "aioamqp-publisher"
assert aioamqp_span.p == test_span.s

assert test_span.n == "sdk"
assert not test_span.p

def test_basic_consumer(self) -> None:
with tracer.start_as_current_span("test-span"):
self.loop.run_until_complete(self.publish_message())
self.loop.run_until_complete(self.consume_message())

spans = self.recorder.queued_spans()

assert len(spans) == 4

publisher_span = spans[0]
callback_span = spans[1]
consumer_span = spans[2]
test_span = spans[3]

assert publisher_span.n == "aioamqp-publisher"
assert publisher_span.p == test_span.s
assert (
publisher_span.data["custom"]["attributes"]["aioamqp.exchange"]
== "b'Instana test message'"
)

assert callback_span.n == "sdk"
assert callback_span.data["sdk"]["name"] == "callback-span"
assert callback_span.data["sdk"]["type"] == "intermediate"
assert callback_span.p == consumer_span.s

assert consumer_span.n == "aioamqp-consumer"
assert consumer_span.data["custom"]["attributes"]["aioamqp.callback"]
assert (
consumer_span.data["custom"]["attributes"]["aioamqp.message"]
== "b'Instana test message'"
)
assert (
consumer_span.data["custom"]["attributes"]["aioamqp.routing_key"]
== "message_queue"
)
assert not consumer_span.data["custom"]["attributes"]["exchange_name"]
assert consumer_span.p == test_span.s

assert test_span.n == "sdk"
assert test_span.data["sdk"]["name"] == "test-span"
9 changes: 6 additions & 3 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
testenv["mongodb_user"] = os.environ.get("MONGO_USER", None)
testenv["mongodb_pw"] = os.environ.get("MONGO_PW", None)

"""
RabbitMQ Environment
"""
testenv["rabbitmq_host"] = os.environ.get("RABBITMQ_HOST", "127.0.0.1")
testenv["rabbitmq_port"] = os.environ.get("RABBITMQ_PORT", 5672)


def drop_log_spans_from_list(spans):
"""
Expand Down Expand Up @@ -153,6 +159,3 @@ def launch_traced_request(url):
response = requests.get(url)

return response



1 change: 1 addition & 0 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aioamqp>=0.15.0
aiofiles>=0.5.0
aiohttp>=3.8.3
boto3>=1.17.74
Expand Down

0 comments on commit 12dc6c8

Please sign in to comment.