Skip to content

Commit

Permalink
Changed webhook to POST an adaptive card
Browse files Browse the repository at this point in the history
  • Loading branch information
domwhewell-sage committed Sep 2, 2024
1 parent bac9442 commit b04b047
Showing 1 changed file with 105 additions and 2 deletions.
107 changes: 105 additions & 2 deletions bbot/modules/output/teams.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import yaml

from bbot.modules.templates.webhook import WebhookOutputModule


Expand All @@ -10,13 +12,114 @@ class Teams(WebhookOutputModule):
}
options = {"webhook_url": "", "event_types": ["VULNERABILITY", "FINDING"], "min_severity": "LOW"}
options_desc = {
"webhook_url": "Discord webhook URL",
"webhook_url": "Teams webhook URL",
"event_types": "Types of events to send",
"min_severity": "Only allow VULNERABILITY events of this severity or higher",
}
_module_threads = 5
good_status_code = 200
content_key = "text"
adaptive_card = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"contentUrl": None,
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.2",
"body": [],
},
}
],
}

async def handle_event(self, event):
while 1:
data = self.format_message(self.adaptive_card.copy(), event)

response = await self.helpers.request(
url=self.webhook_url,
method="POST",
json=data,
)
status_code = getattr(response, "status_code", 0)
if self.evaluate_response(response):
break
else:
response_data = getattr(response, "text", "")
try:
retry_after = response.json().get("retry_after", 1)
except Exception:
retry_after = 1
self.verbose(
f"Error sending {event}: status code {status_code}, response: {response_data}, retrying in {retry_after} seconds"
)
await self.helpers.sleep(retry_after)

def format_message_str(self, event):
items = []
msg = event.data
if len(msg) > self.message_size_limit:
msg = msg[: self.message_size_limit - 3] + "..."
items.append({"type": "TextBlock", "text": f"{msg}", "wrap": True})
items.append({"type": "FactSet", "facts": [{"title": "Tags:", "value": ", ".join(event.tags)}]})
return items

def format_message_other(self, event):
event_yaml = yaml.dump(event.data)
msg = event_yaml
if len(msg) > self.message_size_limit:
msg = msg[: self.message_size_limit - 3] + "..."
return [{"type": "TextBlock", "text": f"{msg}", "wrap": True}]

def get_severity_color(self, event):
color = "Accent"
if event.type == "VULNERABILITY":
severity = event.data.get("severity", "UNKNOWN")
if severity == "CRITICAL":
color = "Attention"
elif severity == "HIGH":
color = "Attention"
elif severity == "MEDIUM":
color = "Warning"
elif severity == "LOW":
color = "Good"
return color

def format_message(self, adaptive_card, event):
heading = {"type": "TextBlock", "text": f"{event.type}", "wrap": True, "style": "heading"}
body = adaptive_card["attachments"][0]["content"]["body"]
body.append(heading)
if event.type in ("VULNERABILITY", "FINDING"):
subheading = {
"type": "TextBlock",
"text": event.data.get("severity", "UNKNOWN"),
"spacing": "None",
"size": "Large",
"wrap": True,
}
subheading["color"] = self.get_severity_color(self, event)
main_text = {
"type": "ColumnSet",
"separator": True,
"spacing": "Medium",
"columns": [
{
"type": "Column",
"width": "stretch",
"items": [],
}
],
}
if isinstance(event.data, str):
items = self.format_message_str(event)
else:
items = self.format_message_other(event)
for item in items:
main_text["columns"][0]["items"].append(item)
body.append(main_text)
return adaptive_card

def evaluate_response(self, response):
text = getattr(response, "text", "")
Expand Down

0 comments on commit b04b047

Please sign in to comment.