Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve triaging process #250

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 221 additions & 3 deletions src/bot/exts/dragonfly/dragonfly.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import discord
import sentry_sdk
from discord.ext import commands, tasks
from discord.utils import format_dt

from bot.bot import Bot
from bot.constants import Channels, DragonflyConfig, Roles
Expand Down Expand Up @@ -261,6 +262,200 @@ async def report(self: Self, interaction: discord.Interaction, button: discord.u
await interaction.edit_original_response(view=self)


class NoteModal(discord.ui.Modal, title="Add a note"):
"""A modal that allows users to add a note to a package."""

_interaction: discord.Interaction | None = None
note_content = discord.ui.TextInput(
label="Content",
placeholder="Enter the note content here",
min_length=1,
max_length=1000, # Don't want to overfill the embed
)

def __init__(self, embed: discord.Embed, view: discord.ui.View) -> None:
super().__init__()

self.embed = embed
self.view = view

async def on_submit(self, interaction: discord.Interaction) -> None:
"""Modal submit callback."""
if not interaction.response.is_done():
await interaction.response.defer()
self._interaction = interaction

content = f"{self.note_content.value} • {interaction.user.mention}"

# We need to check what fields the embed has to determine where to add the note
# If the embed has no fields, we add the note and return
# Otherwise, we need to make sure the note is added after the event log
# This involves clearing the fields and re-adding them in the correct order
# Which is why we save the event log in a variable

match len(self.embed.fields):
case 0: # Package is awaiting triage, no notes or event log
notes = [content]
event_log = None
case 1: # Package either has notes or event log
if self.embed.fields[0].name == "Notes":
notes = [self.embed.fields[0].value, content]
else:
event_log = self.embed.fields[0].value
notes = [content]
self.embed.clear_fields()
case 2: # Package has both notes and event log
if self.embed.fields[0].name == "Notes":
notes = [self.embed.fields[0].value, content]
event_log = self.embed.fields[1].value
else:
notes = [self.embed.fields[1].value, content]
event_log = self.embed.fields[0].value
self.embed.clear_fields()

self.embed.add_field(name="Notes", value="\n".join(notes), inline=False)

if event_log:
self.embed.add_field(name="Event log", value=event_log, inline=False)

await interaction.message.edit(embed=self.embed, view=self.view)

async def on_error(
self,
interaction: discord.Interaction,
error: Exception,
) -> None:
"""Handle errors that occur in the modal."""
await interaction.response.send_message(
"An unexpected error occured.",
ephemeral=True,
)
raise error


class MalwareView(discord.ui.View):
"""View for the malware triage system."""

message: discord.Message | None = None

def __init__(
self: Self,
embed: discord.Embed,
bot: Bot,
payload: Package,
) -> None:
self.embed = embed
self.bot = bot
self.payload = payload
self.event_log = []

super().__init__(timeout=None)

async def add_event(self, message: str) -> None:
"""Add an event to the event log."""
# Much like earlier, we need to check the fields of the embed to determine where to add the event log
match len(self.embed.fields):
case 0:
pass
case 1:
if self.embed.fields[0].name == "Event log":
self.embed.clear_fields()
case 2:
if self.embed.fields[0].name == "Event log":
self.embed.clear_fields()
elif self.embed.fields[1].name == "Event log":
self.embed.remove_field(1)

self.event_log.append(
message,
) # For future reference, we save the event log in a variable
self.embed.add_field(
name="Event log",
value="\n".join(self.event_log),
inline=False,
)

async def update_status(self, status: str) -> None:
"""Update the status of the package in the embed."""
self.embed.set_footer(text=status)

def get_timestamp(self) -> str:
"""Return the current timestamp, formatted in Discord's relative style."""
return format_dt(datetime.now(UTC), style="R")

@discord.ui.button(
label="Report",
style=discord.ButtonStyle.red,
)
async def report(
self,
interaction: discord.Interaction,
button: discord.ui.Button,
) -> None:
"""Report package and update the embed."""
self.approve.disabled = False
await self.add_event(
f"Reported by {interaction.user.mention} • {self.get_timestamp()}",
)
await self.update_status("Flagged as malicious")

self.embed.color = discord.Color.red()

modal = ConfirmReportModal(package=self.payload, bot=self.bot)
await interaction.response.send_modal(modal)

timed_out = await modal.wait()
if not timed_out:
button.disabled = True
await interaction.edit_original_response(view=self, embed=self.embed)

@discord.ui.button(
label="Approve",
style=discord.ButtonStyle.green,
)
async def approve(
self,
interaction: discord.Interaction,
button: discord.ui.Button,
) -> None:
"""Approve package and update the embed."""
self.report.disabled = False
await self.add_event(
f"Approved by {interaction.user.mention} • {self.get_timestamp()}",
)
await self.update_status("Flagged as benign")

button.disabled = True

self.embed.color = discord.Color.green()
await interaction.response.edit_message(view=self, embed=self.embed)

@discord.ui.button(
label="Add note",
style=discord.ButtonStyle.grey,
)
async def add_note(
self,
interaction: discord.Interaction,
_button: discord.ui.Button,
) -> None:
"""Add note to the embed."""
await interaction.response.send_modal(NoteModal(embed=self.embed, view=self))

async def on_error(
self,
interaction: discord.Interaction[discord.Client],
error: Exception,
_item: discord.ui.Item,
) -> None:
"""Handle errors that occur in the view."""
await interaction.response.send_message(
"An unexpected error occured.",
ephemeral=True,
)
raise error


def _build_package_scan_result_embed(scan_result: Package) -> discord.Embed:
"""Build the embed that shows the results of a package scan."""
condition = (scan_result.score or 0) >= DragonflyConfig.threshold
Expand All @@ -287,6 +482,27 @@ def _build_package_scan_result_embed(scan_result: Package) -> discord.Embed:
return embed


def _build_package_scan_result_triage_embed(
scan_result: Package,
) -> discord.Embed:
"""Build the embed for the malware triage system."""
embed = discord.Embed(
title="View on Inspector",
description=f"```{', '.join(scan_result.rules)}```",
url=scan_result.inspector_url,
color=discord.Color.orange(),
timestamp=datetime.now(UTC),
)
embed.set_author(
name=f"{scan_result.name}@{scan_result.version}",
url=f"https://pypi.org/project/{scan_result.name}/{scan_result.version}",
icon_url="https://seeklogo.com/images/P/pypi-logo-5B953CE804-seeklogo.com.png",
)
embed.set_footer(text="Awaiting triage")

return embed


def _build_all_packages_scanned_embed(scan_results: list[Package]) -> discord.Embed:
"""Build the embed that shows a list of all packages scanned."""
if scan_results:
Expand All @@ -307,11 +523,13 @@ async def run(
scan_results = await bot.dragonfly_services.get_scanned_packages(since=since)
for result in scan_results:
if result.score >= score:
embed = _build_package_scan_result_embed(result)
await alerts_channel.send(
embed = _build_package_scan_result_triage_embed(result)
view = MalwareView(embed=embed, bot=bot, payload=result)

view.message = await alerts_channel.send(
f"<@&{DragonflyConfig.alerts_role_id}>",
embed=embed,
view=ReportView(bot, result),
view=view,
)

await logs_channel.send(embed=_build_all_packages_scanned_embed(scan_results))
Expand Down
Loading