Skip to content

Commit

Permalink
Enhance MoodleScanner with output processing and error handling
Browse files Browse the repository at this point in the history
- Added a new method `process_output` to the MoodleScanner class for detailed parsing of moodlescan output.
- Improved error handling by capturing connection errors and logging relevant information.
- Updated the logic for saving task results based on the processed output, including server info, version info, and vulnerabilities.
- Refactored the run method to utilize the new output processing method, enhancing clarity and maintainability.
  • Loading branch information
kshitijk4poor committed Jan 18, 2025
1 parent 9c62730 commit 98d6ac5
Showing 1 changed file with 76 additions and 54 deletions.
130 changes: 76 additions & 54 deletions artemis/modules/moodle_scanner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import dataclasses
import subprocess
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from karton.core import Task

Expand All @@ -10,6 +10,7 @@
from artemis.module_base import ArtemisBase
from artemis.task_utils import get_target_url


@dataclasses.dataclass
class MoodleMessage:
category: str
Expand All @@ -19,6 +20,7 @@ class MoodleMessage:
def message(self) -> str:
return f"{self.category}: {', '.join(self.problems)}"


def process_moodle_json(result: Dict[str, Any]) -> List[MoodleMessage]:
messages: Dict[str, MoodleMessage] = {}

Expand Down Expand Up @@ -51,24 +53,73 @@ def process_moodle_json(result: Dict[str, Any]) -> List[MoodleMessage]:
]:
if category not in messages:
messages[category] = MoodleMessage(category=category, problems=[])
messages[category].problems.append(item)
messages[category].problems.append(str(item))

return list(messages.values())


@load_risk_class.load_risk_class(load_risk_class.LoadRiskClass.MEDIUM)
class MoodleScanner(ArtemisBase):
"""
Runs Moodle-Scanner -> A Moodle Vulnerability Analyzer
"""

identity = "moodle_scanner"
filters = [
identity: str = "moodle_scanner"
filters: List[Dict[str, str]] = [
{"type": TaskType.SERVICE.value, "service": Service.HTTP.value},
]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

def process_output(self, output: str) -> Dict[str, Any]:
"""Process moodlescan output and extract relevant information."""
output_lines = output.splitlines()
server_info: Optional[str] = None
version_info: Optional[str] = None
vulnerabilities: List[str] = []
error_message: Optional[str] = None

for i, line in enumerate(output_lines):
if "Error: Can't connect" in line:
error_message = line
break

if "server" in line.lower() and ":" in line:
server_info = line.split(":", 1)[1].strip()
elif "version" in line.lower() and not line.startswith("."):
# Look at next line for version info if it's not dots or error
if i + 1 < len(output_lines):
next_line = output_lines[i + 1].strip()
if next_line and not next_line.startswith(".") and "error" not in next_line.lower():
version_info = next_line
elif "vulnerability" in line.lower() or "cve" in line.lower():
vulnerabilities.append(line.strip())

# Determine status and reason based on findings
if error_message:
status = TaskStatus.OK
status_reason = error_message
elif vulnerabilities:
status = TaskStatus.INTERESTING
status_reason = f"Found: {', '.join(vulnerabilities)}"
elif version_info and version_info != "Version not found":
status = TaskStatus.INTERESTING
status_reason = f"Found version: {version_info}"
else:
status = TaskStatus.OK
status_reason = "Version not found" if version_info == "Version not found" else None

return {
"server": server_info,
"version": version_info,
"vulnerabilities": vulnerabilities,
"error": error_message,
"raw_output": output,
"status": status,
"status_reason": status_reason,
}

def run(self, current_task: Task) -> None:
base_url = get_target_url(current_task)
self.log.info(f"Starting moodlescan for {base_url}")
Expand All @@ -82,56 +133,6 @@ def run(self, current_task: Task) -> None:
text=True,
check=True,
)

self.log.info(f"Moodlescan stdout: {process.stdout}")
if process.stderr:
self.log.warning(f"Moodlescan stderr: {process.stderr}")

# Parse the output for relevant information
output_lines = process.stdout.splitlines()
server_info = None
version_info = None
vulnerabilities = []

for i, line in enumerate(output_lines):
if "Error: Can't connect" in line:
self.log.info(f"Connection error: {line}")
self.db.save_task_result(
task=current_task, status=TaskStatus.OK, status_reason=line, data={"raw_output": process.stdout}
)
return

if "server" in line.lower() and ":" in line:
server_info = line.split(":", 1)[1].strip()
elif "version" in line.lower() and not line.startswith("."):
# Look at next line for version info if it's not dots or error
if i + 1 < len(output_lines):
next_line = output_lines[i + 1].strip()
if next_line and not next_line.startswith(".") and "error" not in next_line.lower():
version_info = next_line
elif "vulnerability" in line.lower() or "cve" in line.lower():
vulnerabilities.append(line.strip())

result = {
"server": server_info,
"version": version_info,
"vulnerabilities": vulnerabilities,
"raw_output": process.stdout,
}

# Determine if anything interesting was found
if vulnerabilities:
status = TaskStatus.INTERESTING
status_reason = f"Found: {', '.join(vulnerabilities)}"
elif version_info and version_info != "Version not found":
status = TaskStatus.INTERESTING
status_reason = f"Found version: {version_info}"
else:
status = TaskStatus.OK
status_reason = "Version not found" if version_info == "Version not found" else None

self.db.save_task_result(task=current_task, status=status, status_reason=status_reason, data=result)

except subprocess.CalledProcessError as e:
self.log.error(f"Failed to run moodlescan for {base_url}")
self.log.error(f"Exit code: {e.returncode}")
Expand All @@ -145,5 +146,26 @@ def run(self, current_task: Task) -> None:
)
return

self.log.info(f"Moodlescan stdout: {process.stdout}")
if process.stderr:
self.log.warning(f"Moodlescan stderr: {process.stderr}")

result = self.process_output(process.stdout)

if result["error"]:
self.log.info(f"Connection error: {result['error']}")
self.db.save_task_result(
task=current_task,
status=result["status"],
status_reason=result["status_reason"],
data={"raw_output": result["raw_output"]},
)
return

self.db.save_task_result(
task=current_task, status=result["status"], status_reason=result["status_reason"], data=result
)


if __name__ == "__main__":
MoodleScanner().loop()

0 comments on commit 98d6ac5

Please sign in to comment.