Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Message Brokers #321

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rabbitMQ, SQS and Redis streams as message brokers
  • Loading branch information
phenobarbital committed Dec 2, 2024
commit 3b668e9e151748f464386cb4a34a1224331d7867
22 changes: 22 additions & 0 deletions examples/brokers/nav_rabbitmq_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.rabbitmq import RMQConsumer


async def rabbit_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

rmq = RMQConsumer(
callback=rabbit_callback
)
rmq.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
22 changes: 22 additions & 0 deletions examples/brokers/nav_redis_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.redis import RedisConsumer


async def redis_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

rmq = RedisConsumer(
callback=redis_callback
)
rmq.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
22 changes: 22 additions & 0 deletions examples/brokers/nav_sqs_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from navigator import Application
from navigator.brokers.sqs import SQSConsumer


async def sqs_callback(*args, **kwargs):
# Handle your SQS callback here
print('Received Message:', args, kwargs)

app = Application(
port=5001
)

sqs = SQSConsumer(
callback=sqs_callback
)
sqs.setup(app)

if __name__ == '__main__':
try:
app.run()
except KeyboardInterrupt:
print('EXIT FROM APP =========')
12 changes: 6 additions & 6 deletions examples/test_sqs_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ async def main():
)
async with connection as sqs:
# Create an SQS Queue
queue_name = "MainEvent"
queue_name = "navigator"
print(f"Creating queue: {queue_name}")
queue = await sqs.create_queue(queue_name)
queue_url = queue.url
print(f"Queue URL: {queue_url}")
# # Publish a JSON Message
# await sqs.publish_message("MyTestQueue", {"key": "value"})
# await sqs.publish_message({"key": "value"}, "MyTestQueue")
# # Publish JSONPickle
# model = ExampleModel(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", model)
# await sqs.publish_message(model, "MyTestQueue")
# # Dataclasses:
# mdl = Example(name="John Doe", age=30)
# await sqs.publish_message("MyTestQueue", mdl)
# await sqs.publish_message(mdl, "MyTestQueue")

# # Publish CloudPickle
# class CustomWrapper:
# def __init__(self, data):
# self.data = data

# wrapper = CustomWrapper(data={"nested_key": "nested_value"})
# await sqs.publish_message("MyTestQueue", wrapper)
# await sqs.publish_message(wrapper, "MyTestQueue")

form = {
"metadata": {
Expand All @@ -70,7 +70,7 @@ async def main():
}
}
# Publish plain text
await sqs.publish_message("MainEvent", form)
await sqs.publish_message(form, "navigator")

if __name__ == "__main__":
try:
Expand Down
4 changes: 2 additions & 2 deletions navigator/applications/base.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ cdef class BaseApplication:
### Application handler:
self.handler = None
self.description: str = description
self.host = APP_HOST
self.port = APP_PORT
self.host = kwargs.pop('host', APP_HOST)
self.port = kwargs.pop('port', APP_PORT)
self.path = None
self.title = title if title else APP_NAME
self.contact = contact
Expand Down
33 changes: 30 additions & 3 deletions navigator/brokers/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from collections.abc import Awaitable, Callable
from abc import ABC, abstractmethod
import asyncio
from aiohttp import web
from navconfig.logging import logging
from navigator.applications.base import BaseApplication
from .pickle import DataSerializer


Expand All @@ -16,7 +18,8 @@ class BaseConnection(ABC):

def __init__(
self,
credentials: Union[str, dict],
*args,
credentials: Union[str, dict] = None,
timeout: Optional[int] = 5,
**kwargs
):
Expand All @@ -33,6 +36,7 @@ def __init__(
self.reconnect_delay = 1 # Initial delay in seconds
self._lock = asyncio.Lock()
self._serializer = DataSerializer()
super().__init__(*args, **kwargs)

def get_connection(self) -> Optional[Union[Callable, Awaitable]]:
if not self._connection:
Expand Down Expand Up @@ -73,9 +77,8 @@ async def ensure_connection(self) -> None:
@abstractmethod
async def publish_message(
self,
exchange_name: str,
routing_key: str,
body: Union[str, list, dict, Any],
queue_name: Optional[str] = None,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -105,3 +108,27 @@ async def process_message(
Process a message from the Broker Service.
"""
raise NotImplementedError

async def start(self, app: web.Application) -> None:
await self.connect()

async def stop(self, app: web.Application) -> None:
# close the RabbitMQ connection
await self.disconnect()

def setup(self, app: web.Application = None) -> None:
"""
Setup Broker Connection.
"""
if isinstance(app, BaseApplication):
self.app = app.get_app()
else:
self.app = app
if self.app is None:
raise ValueError(
'App is not defined.'
)
# Initialize the Producer instance.
self.app.on_startup.append(self.start)
self.app.on_shutdown.append(self.stop)
self.app[self._name_] = self
46 changes: 1 addition & 45 deletions navigator/brokers/consumer.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,23 @@
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Union, Optional, Any
from aiohttp import web
from navconfig.logging import logging
from navigator.applications.base import BaseApplication
from .connection import BaseConnection


class BrokerConsumer(BaseConnection, ABC):
class BrokerConsumer(ABC):
"""
Broker Consumer Interface.
"""
_name_: str = "broker_consumer"

def __init__(
self,
credentials: Union[str, dict],
timeout: Optional[int] = 5,
callback: Optional[Union[Awaitable, Callable]] = None,
**kwargs
):
self._queue_name = kwargs.get('queue_name', 'navigator')
super(BrokerConsumer, self).__init__(credentials, timeout, **kwargs)
self.logger = logging.getLogger('Broker.Consumer')
self._callback_ = callback if callback else self.subscriber_callback

@abstractmethod
async def connect(self):
"""
Connect to Broker.
"""
pass

@abstractmethod
async def disconnect(self):
"""
Disconnect from Broker.
"""
pass

async def start(self, app: web.Application) -> None:
await self.connect()

async def stop(self, app: web.Application) -> None:
# close the RabbitMQ connection
await self.disconnect()

def setup(self, app: web.Application = None) -> None:
"""
Setup BrokerManager.
"""
if isinstance(app, BaseApplication):
self.app = app.get_app()
else:
self.app = app
if self.app is None:
raise ValueError(
'App is not defined.'
)
# Initialize the Producer instance.
self.app.on_startup.append(self.start)
self.app.on_shutdown.append(self.stop)
self.app[self._name_] = self

@abstractmethod
async def event_subscribe(
self,
Expand Down
39 changes: 19 additions & 20 deletions navigator/brokers/producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC, abstractmethod
from abc import ABC
from typing import Awaitable, Callable, Union, Optional, Any
import asyncio
from functools import wraps
Expand Down Expand Up @@ -107,9 +107,9 @@ async def stop(self, app: web.Application) -> None:

async def queue_event(
self,
exchange: str,
routing_key: str,
body: str,
queue_name: str,
routing_key: Optional[str] = None,
**kwargs
) -> None:
"""
Expand All @@ -118,9 +118,9 @@ async def queue_event(
try:
self.event_queue.put_nowait(
{
'exchange': exchange,
'routing_key': routing_key,
'body': body,
'queue_name': queue_name,
'routing_key': routing_key,
**kwargs
}
)
Expand All @@ -133,19 +133,17 @@ async def queue_event(

async def publish_event(
self,
exchange: str,
routing_key: str,
body: str,
queue_name: str,
**kwargs
) -> None:
"""
Publish Event on a Message Queue Exchange.
"""
# Ensure the exchange exists before publishing
await self.publish_message(
exchange=exchange,
routing_key=routing_key,
body=body,
queue_name=queue_name,
**kwargs
)

Expand Down Expand Up @@ -196,8 +194,8 @@ async def event_publisher(
Uses as an REST API to send events to RabbitMQ.
"""
data = await request.json()
exc = data.get('exchange', 'navigator')
routing_key = data.get('routing_key')
qs = data.pop('queue_name', 'navigator')
routing_key = data.pop('routing_key', None)
if not routing_key:
return web.json_response(
{
Expand All @@ -206,7 +204,7 @@ async def event_publisher(
},
status=422
)
body = data.get('body')
body = data.pop('body')
if not body:
return web.json_response(
{
Expand All @@ -216,10 +214,10 @@ async def event_publisher(
status=422
)
try:
await self.queue_event(exc, routing_key, body)
await self.queue_event(body, qs, routing_key, **data)
return web.json_response({
'status': 'success',
'message': f'Event {exc}.{routing_key} Published Successfully.'
'message': f'Event {qs}.{routing_key} Published Successfully.'
})
except asyncio.QueueFull:
return web.json_response(
Expand All @@ -244,19 +242,20 @@ async def _event_broker(self, worker_id: int):
event = await self.event_queue.get()
try:
# data:
routing = event.get('routing_key')
exchange = event.get('exchange')
body = event.get('body')
max_retries = event.get('max_retries', 5)
routing = event.pop('routing_key')
queue_name = event.pop('queue_name')
body = event.pop('body')
max_retries = event.pop('max_retries', 5)
retry_count = 0
retry_delay = 1
while True:
try:
# Publish the event to RabbitMQ
await self.publish_message(
exchange_name=exchange,
body=body,
queue_name=queue_name,
routing_key=routing,
body=body
**event
)
self.logger.info(
f"Worker {worker_id} published event: {routing}"
Expand Down
2 changes: 2 additions & 0 deletions navigator/brokers/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
Using RabbitMQ as Message Broker.
"""
from .connection import RabbitMQConnection
from .consumer import RMQConsumer
from .producer import RMQProducer
Loading