Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Module: RabbitMQ Output #2013

Merged
merged 22 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions bbot/modules/output/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import json
from aiokafka import AIOKafkaProducer

from bbot.modules.output.base import BaseOutputModule


class Kafka(BaseOutputModule):
    """Output module that publishes every scan event to a Kafka topic as JSON."""

    watched_events = ["*"]
    meta = {
        "description": "Output scan data to a Kafka topic",
        "created_date": "2024-11-22",
        "author": "@TheTechromancer",
    }
    options = {
        "bootstrap_servers": "localhost:9092",
        "topic": "bbot_events",
    }
    options_desc = {
        "bootstrap_servers": "A comma-separated list of Kafka server addresses",
        "topic": "The Kafka topic to publish events to",
    }
    deps_pip = ["aiokafka~=0.12.0"]

    async def setup(self):
        """Read config, create the Kafka producer, and start it.

        Returns True so the scanner knows setup succeeded.
        """
        self.bootstrap_servers = self.config.get("bootstrap_servers", "localhost:9092")
        self.topic = self.config.get("topic", "bbot_events")
        # The producer is stored on self before start() so cleanup() can reach it.
        self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
        await self.producer.start()
        self.verbose("Kafka producer started successfully")
        return True

    async def handle_event(self, event):
        """Serialize a single scan event to JSON bytes and publish it."""
        payload = json.dumps(event.json()).encode("utf-8")
        # send_and_wait blocks until the broker acknowledges the message
        await self.producer.send_and_wait(self.topic, payload)

    async def cleanup(self):
        """Flush and stop the producer at the end of the scan."""
        await self.producer.stop()
        self.verbose("Kafka producer stopped successfully")
56 changes: 56 additions & 0 deletions bbot/modules/output/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import aio_pika

from bbot.modules.output.base import BaseOutputModule


class RabbitMQ(BaseOutputModule):
    """Output module that publishes every scan event to a RabbitMQ queue as JSON."""

    watched_events = ["*"]
    meta = {
        "description": "Output scan data to a RabbitMQ queue",
        "created_date": "2024-11-22",
        "author": "@TheTechromancer",
    }
    options = {
        "url": "amqp://guest:guest@localhost/",
        "queue": "bbot_events",
    }
    options_desc = {
        "url": "The RabbitMQ connection URL",
        "queue": "The RabbitMQ queue to publish events to",
    }
    deps_pip = ["aio_pika~=9.5.0"]

    async def setup(self):
        """Connect to RabbitMQ, open a channel, and declare the target queue.

        Returns True so the scanner knows setup succeeded.
        """
        self.rabbitmq_url = self.config.get("url", "amqp://guest:guest@localhost/")
        self.queue_name = self.config.get("queue", "bbot_events")

        # connect_robust() transparently reconnects if the broker drops the connection
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self.channel = await self.connection.channel()

        # Declare the queue as durable so messages survive a broker restart
        self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
        self.verbose("RabbitMQ connection and queue setup successfully")
        return True

    async def handle_event(self, event):
        """Serialize a single scan event to JSON bytes and publish it.

        Retries forever on publish failure (with a 1-second backoff) so
        transient broker outages don't lose events.
        """
        event_json = event.json()
        event_data = json.dumps(event_json).encode("utf-8")

        # Publish the message to the queue, retrying until it succeeds
        while True:
            try:
                await self.channel.default_exchange.publish(
                    aio_pika.Message(body=event_data),
                    routing_key=self.queue_name,
                )
                break
            except Exception as e:
                # fixed typo in the original message ("rerying" -> "retrying")
                self.error(f"Error publishing message to RabbitMQ: {e}, retrying...")
                await self.helpers.sleep(1)

    async def cleanup(self):
        """Close the RabbitMQ connection at the end of the scan."""
        await self.connection.close()
        self.verbose("RabbitMQ connection closed successfully")
6 changes: 3 additions & 3 deletions bbot/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,15 @@ async def _cleanup(self):
if not self._cleanedup:
self._cleanedup = True
self.status = "CLEANING_UP"
# clean up modules
for mod in self.modules.values():
await mod._cleanup()
# clean up dns engine
if self.helpers._dns is not None:
await self.helpers.dns.shutdown()
# clean up web engine
if self.helpers._web is not None:
await self.helpers.web.shutdown()
# clean up modules
for mod in self.modules.values():
await mod._cleanup()
with contextlib.suppress(Exception):
self.home.rmdir()
self.helpers.clean_old_scans()
Expand Down
2 changes: 1 addition & 1 deletion bbot/test/test_step_1/test_python_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_python_api_validation():
# normal module as output module
with pytest.raises(ValidationError) as error:
Scanner(output_modules=["robots"])
assert str(error.value) == 'Could not find output module "robots". Did you mean "web_report"?'
assert str(error.value) == 'Could not find output module "robots". Did you mean "rabbitmq"?'
# invalid preset type
with pytest.raises(ValidationError) as error:
Scanner(preset="asdf")
Expand Down
108 changes: 108 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import json
import asyncio
from contextlib import suppress

from .base import ModuleTestBase


class TestKafka(ModuleTestBase):
    """Integration test for the kafka output module.

    Spins up real Zookeeper + Kafka containers via docker, runs a scan with
    the kafka module enabled, then consumes the published messages and
    compares them against the scan's own event list.
    """

    # Point the kafka module at the locally-published container ports
    config_overrides = {
        "modules": {
            "kafka": {
                "bootstrap_servers": "localhost:9092",
                "topic": "bbot_events",
            }
        }
    }
    # presumably tells the framework to skip this test in distro CI runs — TODO confirm against ModuleTestBase
    skip_distro_tests = True

    async def setup_before_prep(self, module_test):
        """Start Zookeeper and Kafka containers and attach a consumer before the scan runs."""
        # Start Zookeeper (Kafka's coordination service) in a disposable container
        await asyncio.create_subprocess_exec(
            "docker", "run", "-d", "--rm", "--name", "bbot-test-zookeeper", "-p", "2181:2181", "zookeeper:3.9"
        )

        # Wait for Zookeeper to be ready by polling its client port
        while True:
            try:
                # Attempt to connect to Zookeeper with a timeout
                reader, writer = await asyncio.wait_for(asyncio.open_connection("localhost", 2181), timeout=0.5)
                break  # Exit the loop if the connection is successful
            except Exception as e:
                self.log.verbose(f"Waiting for Zookeeper to be ready: {e}")
                await asyncio.sleep(0.5)  # Wait a bit before retrying
            finally:
                # NOTE(review): if open_connection raised, `writer` is unbound here;
                # the suppress(Exception) also swallows the resulting NameError,
                # which is what keeps this loop working — fragile but intentional-looking.
                with suppress(Exception):
                    writer.close()
                    await writer.wait_closed()

        # Start Kafka using wurstmeister/kafka, linked to the Zookeeper container
        await asyncio.create_subprocess_exec(
            "docker",
            "run",
            "-d",
            "--rm",
            "--name",
            "bbot-test-kafka",
            "--link",
            "bbot-test-zookeeper:zookeeper",
            "-e",
            "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
            "-e",
            "KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092",
            "-e",
            "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092",
            "-e",
            # single-broker setup needs replication factor 1 for the offsets topic
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1",
            "-p",
            "9092:9092",
            "wurstmeister/kafka",
        )

        # imported lazily so the test module loads even without aiokafka installed
        from aiokafka import AIOKafkaConsumer

        # Wait for Kafka to be ready: a successfully started consumer means the broker is up
        while True:
            try:
                self.consumer = AIOKafkaConsumer(
                    "bbot_events",
                    bootstrap_servers="localhost:9092",
                    group_id="test_group",
                )
                await self.consumer.start()
                break  # Exit the loop if the consumer starts successfully
            except Exception as e:
                self.log.verbose(f"Waiting for Kafka to be ready: {e}")
                # stop the half-started consumer before retrying so we don't leak connections
                if hasattr(self, "consumer") and not self.consumer._closed:
                    await self.consumer.stop()
                await asyncio.sleep(0.5)  # Wait a bit before retrying

    async def check(self, module_test, events):
        """Consume from Kafka and assert the messages match the scan's events exactly."""
        try:
            events_json = [e.json() for e in events]
            # sort both sides by timestamp so ordering differences don't cause false failures
            events_json.sort(key=lambda x: x["timestamp"])

            # Collect events from Kafka until we have as many as the scan produced
            kafka_events = []
            async for msg in self.consumer:
                event_data = json.loads(msg.value.decode("utf-8"))
                kafka_events.append(event_data)
                if len(kafka_events) >= len(events_json):
                    break

            kafka_events.sort(key=lambda x: x["timestamp"])

            # Verify the events match
            assert events_json == kafka_events, "Events do not match"

        finally:
            # Clean up: Stop the Kafka consumer
            if hasattr(self, "consumer") and not self.consumer._closed:
                await self.consumer.stop()
            # Stop Kafka and Zookeeper containers (--rm on `docker run` removes them)
            await asyncio.create_subprocess_exec(
                "docker", "stop", "bbot-test-kafka", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            await asyncio.create_subprocess_exec(
                "docker", "stop", "bbot-test-zookeeper", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
69 changes: 69 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import json
import asyncio
from contextlib import suppress

from .base import ModuleTestBase


class TestRabbitMQ(ModuleTestBase):
    """Integration test for the rabbitmq output module.

    Spins up a real RabbitMQ container via docker, runs a scan with the
    rabbitmq module enabled, then drains the queue and compares the
    received messages against the scan's own event list.
    """

    # Point the rabbitmq module at the locally-published container port
    config_overrides = {
        "modules": {
            "rabbitmq": {
                "url": "amqp://guest:guest@localhost/",
                "queue": "bbot_events",
            }
        }
    }
    # presumably tells the framework to skip this test in distro CI runs — TODO confirm against ModuleTestBase
    skip_distro_tests = True

    async def setup_before_prep(self, module_test):
        """Start a RabbitMQ container and declare the test queue before the scan runs."""
        # imported lazily so the test module loads even without aio_pika installed
        import aio_pika

        # Start RabbitMQ in a disposable container
        await asyncio.create_subprocess_exec(
            "docker", "run", "-d", "--rm", "--name", "bbot-test-rabbitmq", "-p", "5672:5672", "rabbitmq:3-management"
        )

        # Wait for RabbitMQ to be ready by retrying the AMQP connection
        while True:
            try:
                # Attempt to connect to RabbitMQ with a timeout
                connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
                break  # Exit the loop if the connection is successful
            except Exception as e:
                # NOTE(review): on the first failure `connection` is unbound here;
                # suppress(Exception) also swallows that NameError, so the retry
                # loop still works — fragile but intentional-looking.
                with suppress(Exception):
                    await connection.close()
                self.log.verbose(f"Waiting for RabbitMQ to be ready: {e}")
                await asyncio.sleep(0.5)  # Wait a bit before retrying

        # Keep the connection and declare the same durable queue the module uses
        self.connection = connection
        self.channel = await self.connection.channel()
        self.queue = await self.channel.declare_queue("bbot_events", durable=True)

    async def check(self, module_test, events):
        """Drain the RabbitMQ queue and assert the messages match the scan's events exactly."""
        try:
            events_json = [e.json() for e in events]
            # sort both sides by timestamp so ordering differences don't cause false failures
            events_json.sort(key=lambda x: x["timestamp"])

            # Collect events from RabbitMQ until we have as many as the scan produced
            rabbitmq_events = []
            async with self.queue.iterator() as queue_iter:
                async for message in queue_iter:
                    # message.process() acks the message when the block exits cleanly
                    async with message.process():
                        event_data = json.loads(message.body.decode("utf-8"))
                        rabbitmq_events.append(event_data)
                        if len(rabbitmq_events) >= len(events_json):
                            break

            rabbitmq_events.sort(key=lambda x: x["timestamp"])

            # Verify the events match
            assert events_json == rabbitmq_events, "Events do not match"

        finally:
            # Clean up: Close the RabbitMQ connection
            await self.connection.close()
            # Stop RabbitMQ container (--rm on `docker run` removes it)
            await asyncio.create_subprocess_exec(
                "docker", "stop", "bbot-test-rabbitmq", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
Loading