Skip to content

Commit

Permalink
Merge pull request #2017 from blacklanternsecurity/nats
Browse files Browse the repository at this point in the history
New Module: NATS Output
  • Loading branch information
TheTechromancer authored Nov 25, 2024
2 parents d79f6f3 + 3755647 commit 3d4cfd9
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 64 deletions.
53 changes: 53 additions & 0 deletions bbot/modules/output/nats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
import nats
from bbot.modules.output.base import BaseOutputModule


class NATS(BaseOutputModule):
watched_events = ["*"]
meta = {
"description": "Output scan data to a NATS subject",
"created_date": "2024-11-22",
"author": "@TheTechromancer",
}
options = {
"servers": [],
"subject": "bbot_events",
}
options_desc = {
"servers": "A list of NATS server addresses",
"subject": "The NATS subject to publish events to",
}
deps_pip = ["nats-py"]

async def setup(self):
self.servers = list(self.config.get("servers", []))
if not self.servers:
return False, "NATS servers are required"
self.subject = self.config.get("subject", "bbot_events")

# Connect to the NATS server
try:
self.nc = await nats.connect(self.servers)
except Exception as e:
import traceback

return False, f"Error connecting to NATS: {e}\n{traceback.format_exc()}"
self.verbose("NATS client connected successfully")
return True

async def handle_event(self, event):
event_json = event.json()
event_data = json.dumps(event_json).encode("utf-8")
while 1:
try:
await self.nc.publish(self.subject, event_data)
break
except Exception as e:
self.warning(f"Error sending event to NATS: {e}, retrying...")
await self.helpers.sleep(1)

async def cleanup(self):
# Close the NATS connection
await self.nc.close()
self.verbose("NATS client disconnected successfully")
2 changes: 1 addition & 1 deletion bbot/test/test_step_1/test_python_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_python_api_validation():
# invalid output module
with pytest.raises(ValidationError) as error:
Scanner(output_modules=["asdf"])
assert str(error.value) == 'Could not find output module "asdf". Did you mean "teams"?'
assert str(error.value) == 'Could not find output module "asdf". Did you mean "nats"?'
# invalid excluded module
with pytest.raises(ValidationError) as error:
Scanner(exclude_modules=["asdf"])
Expand Down
16 changes: 16 additions & 0 deletions bbot/test/test_step_2/module_tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,19 @@ async def setup_before_prep(self, module_test):

async def setup_after_prep(self, module_test):
pass

async def wait_for_port_open(self, port):
while not await self.is_port_open("localhost", port):
self.log.verbose(f"Waiting for port {port} to be open...")
await asyncio.sleep(0.5)
# allow an extra second for things to settle
await asyncio.sleep(1)

async def is_port_open(self, host, port):
try:
reader, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
return True
except (ConnectionRefusedError, OSError):
return False
38 changes: 10 additions & 28 deletions bbot/test/test_step_2/module_tests/test_module_kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import asyncio
from contextlib import suppress

from .base import ModuleTestBase

Expand All @@ -23,18 +22,7 @@ async def setup_before_prep(self, module_test):
)

# Wait for Zookeeper to be ready
while True:
try:
# Attempt to connect to Zookeeper with a timeout
reader, writer = await asyncio.wait_for(asyncio.open_connection("localhost", 2181), timeout=0.5)
break # Exit the loop if the connection is successful
except Exception as e:
self.log.verbose(f"Waiting for Zookeeper to be ready: {e}")
await asyncio.sleep(0.5) # Wait a bit before retrying
finally:
with suppress(Exception):
writer.close()
await writer.wait_closed()
await self.wait_for_port_open(2181)

# Start Kafka using wurstmeister/kafka
await asyncio.create_subprocess_exec(
Expand All @@ -59,23 +47,17 @@ async def setup_before_prep(self, module_test):
"wurstmeister/kafka",
)

# Wait for Kafka to be ready
await self.wait_for_port_open(9092)

from aiokafka import AIOKafkaConsumer

# Wait for Kafka to be ready
while True:
try:
self.consumer = AIOKafkaConsumer(
"bbot_events",
bootstrap_servers="localhost:9092",
group_id="test_group",
)
await self.consumer.start()
break # Exit the loop if the consumer starts successfully
except Exception as e:
self.log.verbose(f"Waiting for Kafka to be ready: {e}")
if hasattr(self, "consumer") and not self.consumer._closed:
await self.consumer.stop()
await asyncio.sleep(0.5) # Wait a bit before retrying
self.consumer = AIOKafkaConsumer(
"bbot_events",
bootstrap_servers="localhost:9092",
group_id="test_group",
)
await self.consumer.start()

async def check(self, module_test, events):
try:
Expand Down
15 changes: 1 addition & 14 deletions bbot/test/test_step_2/module_tests/test_module_mysql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import time

from .base import ModuleTestBase

Expand Down Expand Up @@ -28,20 +27,8 @@ async def setup_before_prep(self, module_test):
)
stdout, stderr = await process.communicate()

import aiomysql

# wait for the container to start
start_time = time.time()
while True:
try:
conn = await aiomysql.connect(user="root", password="bbotislife", db="bbot", host="localhost")
conn.close()
break
except Exception as e:
if time.time() - start_time > 60: # timeout after 60 seconds
self.log.error("MySQL server did not start in time.")
raise e
await asyncio.sleep(1)
await self.wait_for_port_open(3306)

if process.returncode != 0:
self.log.error(f"Failed to start MySQL server: {stderr.decode()}")
Expand Down
65 changes: 65 additions & 0 deletions bbot/test/test_step_2/module_tests/test_module_nats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import json
import asyncio
from contextlib import suppress

from .base import ModuleTestBase


class TestNats(ModuleTestBase):
config_overrides = {
"modules": {
"nats": {
"servers": ["nats://localhost:4222"],
"subject": "bbot_events",
}
}
}
skip_distro_tests = True

async def setup_before_prep(self, module_test):
# Start NATS server
await asyncio.create_subprocess_exec(
"docker", "run", "-d", "--rm", "--name", "bbot-test-nats", "-p", "4222:4222", "nats:latest"
)

# Wait for NATS to be ready by checking the port
await self.wait_for_port_open(4222)

# Connect to NATS
import nats

try:
self.nc = await nats.connect(["nats://localhost:4222"])
except Exception as e:
self.log.error(f"Error connecting to NATS: {e}")
raise

# Collect events from NATS
self.nats_events = []

async def message_handler(msg):
event_data = json.loads(msg.data.decode("utf-8"))
self.nats_events.append(event_data)

await self.nc.subscribe("bbot_events", cb=message_handler)

async def check(self, module_test, events):
try:
events_json = [e.json() for e in events]
events_json.sort(key=lambda x: x["timestamp"])

self.nats_events.sort(key=lambda x: x["timestamp"])

# Verify the events match
assert events_json == self.nats_events, "Events do not match"

finally:
with suppress(Exception):
# Clean up: Stop the NATS client
if self.nc.is_connected:
await self.nc.drain()
await self.nc.close()
# Stop NATS server container
await asyncio.create_subprocess_exec(
"docker", "stop", "bbot-test-nats", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
22 changes: 1 addition & 21 deletions bbot/test/test_step_2/module_tests/test_module_postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
import asyncio

from .base import ModuleTestBase
Expand All @@ -25,27 +24,8 @@ async def setup_before_prep(self, module_test):
"postgres",
)

import asyncpg

# wait for the container to start
start_time = time.time()
while True:
try:
# Connect to the default 'postgres' database to create 'bbot'
conn = await asyncpg.connect(
user="postgres", password="bbotislife", database="postgres", host="127.0.0.1"
)
await conn.execute("CREATE DATABASE bbot")
await conn.close()
break
except asyncpg.exceptions.DuplicateDatabaseError:
# If the database already exists, break the loop
break
except Exception as e:
if time.time() - start_time > 60: # timeout after 60 seconds
self.log.error("PostgreSQL server did not start in time.")
raise e
await asyncio.sleep(1)
await self.wait_for_port_open(5432)

if process.returncode != 0:
self.log.error("Failed to start PostgreSQL server")
Expand Down

0 comments on commit 3d4cfd9

Please sign in to comment.