Skip to content

Commit

Permalink
fet: finished aggregated solution implementation for ollama
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiserRuben committed Jul 15, 2024
1 parent 730d3dd commit 1ce18e9
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 3 deletions.
14 changes: 13 additions & 1 deletion src/ai/LLM/BaseLLMService.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ def generate_aggregated_solution(self, findings: List[Finding]) -> List[Tuple[st

return results

def _get_findings_str_for_aggregation(self, findings, details=False) -> str:
findings_str = ''
for id, finding in enumerate(findings):
findings_str += f"{id + 1}. \n"
findings_str += finding.description.replace('\n', ' ') + '\n'
if details:
findings_str += "Locations: " ",".join(finding.location_list)
findings_str += str(finding.category) + '\n'
if finding.solution:
findings_str += finding.solution.short_description.replace('\n', ' ') + '\n\n'
return findings_str

def _subdivide_finding_group(self, findings: List[Finding]) -> List[Tuple[List[Finding], Dict]]:
prompt = self._get_subdivision_prompt(findings)
Expand All @@ -131,7 +142,8 @@ def _get_subdivision_prompt(self, findings: List[Finding]) -> str:
pass

@abstractmethod
def _process_subdivision_response(self, response: Dict, findings: List[Finding]) -> List[Tuple[List[Finding], Dict]]:
def _process_subdivision_response(self, response: Dict, findings: List[Finding]) -> List[
Tuple[List[Finding], Dict]]:
pass

@abstractmethod
Expand Down
49 changes: 47 additions & 2 deletions src/ai/LLM/Strategies/OLLAMAService.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Dict, Union, Optional
from typing import List, Dict, Union, Optional, Tuple
import logging
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
Expand All @@ -14,7 +14,10 @@
LONG_RECOMMENDATION_TEMPLATE,
META_PROMPT_GENERATOR_TEMPLATE,
GENERIC_LONG_RECOMMENDATION_TEMPLATE,
SEARCH_TERMS_TEMPLATE, COMBINE_DESCRIPTIONS_TEMPLATE,
SEARCH_TERMS_TEMPLATE,
COMBINE_DESCRIPTIONS_TEMPLATE,
AGGREGATED_SOLUTION_TEMPLATE,
SUBDIVISION_PROMPT_TEMPLATE
)
from config import config

Expand Down Expand Up @@ -137,6 +140,48 @@ def _process_search_terms_response(self, response: Dict[str, str], finding: Find
return ""
return clean(response["search_terms"], llm_service=self)

def _get_subdivision_prompt(self, findings: List[Finding]) -> str:
findings_str = self._get_findings_str_for_aggregation(findings)
return SUBDIVISION_PROMPT_TEMPLATE.format(data=findings_str)

def _process_subdivision_response(self, response: Dict, findings: List[Finding]) -> List[Tuple[List[Finding], Dict]]:
if "subdivisions" not in response:
logger.warning("Failed to subdivide findings")
return [(findings, {})] # Return all findings as a single group if subdivision fails

subdivisions = response["subdivisions"]
result = []
for subdivision in subdivisions:
try:
if "-" in subdivision["group"]: # Bro, the llm is trolling! I swear, I put in the prompt explicitly it should make it comma seperated!
left = int(subdivision["group"].split("-")[0])
right = int(subdivision["group"].split("-")[1])
group_indices = [i for i in range(left, right+1)]
else:
group_indices = [int(i) - 1 for i in subdivision["group"].split(',')]
except ValueError:
logger.error(f"Failed to parse group indices: {subdivision['group']}")
continue
group = [findings[i] for i in group_indices if i < len(findings)]
meta_info = {"reason": subdivision.get("reason", "")}
result.append((group, meta_info))

return result

def _get_aggregated_solution_prompt(self, findings: List[Finding], meta_info: Dict) -> str:
findings_str = self._get_findings_str_for_aggregation(findings, details=True)

return AGGREGATED_SOLUTION_TEMPLATE.format(
data=findings_str,
meta_info=meta_info.get("reason", "")
)

def _process_aggregated_solution_response(self, response: Dict[str, str]) -> str:
if "aggregated_solution" not in response:
logger.warning("Failed to generate an aggregated solution")
return ""
return clean(response["aggregated_solution"], llm_service=self)

def convert_dict_to_str(self, data: Dict) -> str:
"""
Convert a dictionary to a string representation.
Expand Down
35 changes: 35 additions & 0 deletions src/ai/LLM/Strategies/ollama_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,41 @@ def answer_in_json_prompt(key: str) -> str:
"[DATA]\n{data}\n[/DATA]"
)

SUBDIVISION_PROMPT_TEMPLATE = (
"You are a cybersecurity expert tasked with grouping related security findings. "
"Analyze the following list of findings and group them based on their relationships or common themes. "
"For each group, provide a brief reason for grouping them together.\n\n"
"Provide your answer in the following JSON format:\n"
'{{"subdivisions": [\n'
' {{"group": "<comma-separated list of finding numbers, e.g. 3,4,5,8>", "reason": "<brief reason for grouping>"}}\n'
']}}\n\n'
"Findings:\n{data}"
)

AGGREGATED_SOLUTION_TEMPLATE = (
"As a senior cybersecurity strategist, your task is to provide a high-level, strategic solution for a group of related security findings. "
"Your goal is to synthesize the information and create a broad, actionable recommendation that addresses the root causes of multiple issues.\n\n"
"Group meta information: {meta_info}\n\n"
"Instructions:\n"
"1. Review the group of findings provided at the end of this prompt.\n"
"2. Identify common themes or root causes among the findings.\n"
"3. Generate a strategic, overarching solution that addresses these core issues.\n"
"4. Your solution should be:\n"
" - High-level: Focus on broad strategies rather than specific technical fixes\n"
" - Widely applicable: Address multiple findings with each recommendation\n"
" - Proactive: Aim to prevent similar issues in the future\n"
" - Actionable: Provide clear, general steps for implementation\n"
" - Concise: Use clear and precise language\n\n"
"Your response should be structured as follows:\n"
"1. Summary: A brief overview of the core security challenges (1-2 sentences)\n"
"2. Strategic Solution: A high-level approach to address the underlying issues (3-5 key points)\n"
"3. Implementation Guidance: General steps for putting the strategy into action\n"
"4. Long-term Considerations: Suggestions for ongoing improvement and risk mitigation\n\n"
"You may use Markdown formatting in your response to improve readability.\n"
f"{answer_in_json_prompt('aggregated_solution')}"
"Findings:\n{data}"
)

CONVERT_DICT_TO_STR_TEMPLATE = (
"You are a data formatting expert. Convert the following dictionary into a human-readable string. "
f"{answer_in_json_prompt('converted_text')}"
Expand Down

0 comments on commit 1ce18e9

Please sign in to comment.