Skip to content

Commit

Permalink
Merge branch 'master' into chore/replace-with-native-methods
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin authored Feb 4, 2025
2 parents 40c3386 + 96f9e7c commit 2c12db2
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 162 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
## master (unreleased)

### New Features

- add warning for dynamic .NET samples #1864 @v1bh475u
- add lint for detecting duplicate features in capa-rules #2250 @v1bh475u
- add span-of-calls scope to match features across a sliding window of API calls within a thread @williballenthin #2532
- add lint to catch rules that depend on other rules with impossible scope @williballenthin #2124

### Breaking Changes

- remove `is_static_limitation` method from `capa.rules.Rule`
- add span-of-calls scope to rule format
- capabilities functions return dataclasses instead of tuples

### New Rules (3)

- data-manipulation/encryption/rsa/encrypt-data-using-rsa-via-embedded-library Ana06
- data-manipulation/encryption/use-bigint-function Ana06
- data-manipulation/encryption/rsa/encrypt-data-using-rsa-via-embedded-library @Ana06
- data-manipulation/encryption/use-bigint-function @Ana06
- nursery/dynamic-add-veh [email protected]
-

Expand All @@ -29,6 +32,7 @@
- elffile: handle symbols without a name @williballenthin #2553
- project: remove pytest-cov that wasn't used @williballenthin #2491
- replace binascii methods with native Python methods @v1bh475u #2582
- rules: scopes can now have subscope blocks with the same scope @williballenthin #2584

### capa Explorer Web

Expand Down
61 changes: 38 additions & 23 deletions capa/capabilities/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Optional
from dataclasses import dataclass

from capa.rules import Scope, RuleSet
from capa.rules import Rule, Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS
from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts
Expand Down Expand Up @@ -58,28 +58,6 @@ def find_file_capabilities(
return FileCapabilities(features, matches, len(file_features))


def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
    """
    Check whether any file limitation rule shows up in the given match results.

    The first file limitation rule whose name is found in `capabilities` has its
    description logged as a warning banner, and True is returned immediately.
    When `is_standalone` is truthy, the banner also carries a hint about -v/-vv.

    returns:
      True if a file limitation rule matched, False otherwise.
    """
    # collect the candidate rules up front, exactly once.
    limitation_rules = [rule for rule in rules.rules.values() if rule.is_file_limitation_rule()]

    # only the first matching limitation is reported.
    matched = next((rule for rule in limitation_rules if rule.name in capabilities), None)
    if matched is None:
        return False

    separator = "-" * 80
    logger.warning(separator)
    for description_line in matched.meta.get("description", "").split("\n"):
        logger.warning(" %s", description_line)
    logger.warning(" Identified via rule: %s", matched.name)
    if is_standalone:
        logger.warning(" ")
        logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.")
    logger.warning(separator)

    return True


@dataclass
class Capabilities:
matches: MatchResults
Expand All @@ -100,3 +78,40 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs)

raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")


def has_limitation(rules: list, capabilities: Capabilities | FileCapabilities, is_standalone: bool) -> bool:
    """
    Check whether any of the given limitation rules is among the matched capabilities.

    The first rule in `rules` whose name is a key of `capabilities.matches` has its
    description logged as a warning banner, and True is returned immediately.
    When `is_standalone` is truthy, the banner also carries a hint about -v/-vv.

    returns:
      True if one of the rules matched, False otherwise.
    """
    # only the first matching limitation is reported.
    matched = next((candidate for candidate in rules if candidate.name in capabilities.matches), None)
    if matched is None:
        return False

    separator = "-" * 80
    logger.warning(separator)
    for description_line in matched.meta.get("description", "").split("\n"):
        logger.warning(" %s", description_line)
    logger.warning(" Identified via rule: %s", matched.name)
    if is_standalone:
        logger.warning(" ")
        logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.")
    logger.warning(separator)

    return True


def is_static_limitation_rule(r: Rule) -> bool:
    """
    Tell whether the rule's `namespace` metadata is exactly "internal/limitation/static".

    A rule without a namespace is treated as having an empty one, so it is never a match.
    """
    namespace = r.meta.get("namespace", "")
    return namespace == "internal/limitation/static"


def has_static_limitation(rules: RuleSet, capabilities: Capabilities | FileCapabilities, is_standalone=True) -> bool:
    """
    Check whether any static limitation rule in the rule set is among the matched capabilities.

    Selects the rules accepted by `is_static_limitation_rule` and hands them to
    `has_limitation`, which does the reporting and returns the verdict.
    """
    static_limitation_rules = [rule for rule in rules.rules.values() if is_static_limitation_rule(rule)]
    return has_limitation(static_limitation_rules, capabilities, is_standalone)


def is_dynamic_limitation_rule(r: Rule) -> bool:
    """
    Tell whether the rule's `namespace` metadata is exactly "internal/limitation/dynamic".

    A rule without a namespace is treated as having an empty one, so it is never a match.
    """
    namespace = r.meta.get("namespace", "")
    return namespace == "internal/limitation/dynamic"


def has_dynamic_limitation(rules: RuleSet, capabilities: Capabilities | FileCapabilities, is_standalone=True) -> bool:
    """
    Check whether any dynamic limitation rule in the rule set is among the matched capabilities.

    Selects the rules accepted by `is_dynamic_limitation_rule` and hands them to
    `has_limitation`, which does the reporting and returns the verdict.
    """
    limitation_rules = [rule for rule in rules.rules.values() if is_dynamic_limitation_rule(rule)]
    return has_limitation(limitation_rules, capabilities, is_standalone)
10 changes: 3 additions & 7 deletions capa/features/extractors/cape/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.exceptions import EmptyReportError, UnsupportedFormatError
from capa.features.common import Feature, Characteristic
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.cape.models import Call, Static, Process, CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
Expand Down Expand Up @@ -77,11 +77,7 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.cape.process.get_threads(ph)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
    # there are no thread-scope features to produce here.
    # `yield from []` keeps this routine a generator without yielding any elements,
    # replacing the unreachable `if False: yield ...` placeholder.
    yield from []

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.cape.thread.get_calls(ph, th)
Expand Down
8 changes: 2 additions & 6 deletions capa/features/extractors/drakvuf/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import capa.features.extractors.drakvuf.thread
import capa.features.extractors.drakvuf.global_
import capa.features.extractors.drakvuf.process
from capa.features.common import Feature, Characteristic
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.base_extractor import (
CallHandle,
Expand Down Expand Up @@ -74,11 +74,7 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.drakvuf.process.get_threads(self.sorted_calls, ph)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
    # there are no thread-scope features to produce here.
    # `yield from []` keeps this routine a generator without yielding any elements,
    # replacing the unreachable `if False: yield ...` placeholder.
    yield from []

def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
yield from capa.features.extractors.drakvuf.thread.get_calls(self.sorted_calls, ph, th)
Expand Down
2 changes: 1 addition & 1 deletion capa/features/extractors/elf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ def guess_os_from_go_buildinfo(elf: ELF) -> Optional[OS]:
# and the 32-byte header is followed by varint-prefixed string data
# for the two string values we care about.
# https://github.com/mandiant/GoReSym/blob/0860a1b1b4f3495e9fb7e71eb4386bf3e0a7c500/buildinfo/buildinfo.go#L185-L193
BUILDINFO_MAGIC = b"\xFF Go buildinf:"
BUILDINFO_MAGIC = b"\xff Go buildinf:"

try:
index = buf.index(BUILDINFO_MAGIC)
Expand Down
5 changes: 1 addition & 4 deletions capa/features/extractors/pefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ def extract_file_function_names(**kwargs):
"""
extract the names of statically-linked library functions.
"""
if False:
# using a `yield` here to force this to be a generator, not function.
yield NotImplementedError("pefile doesn't have library matching")
return
yield from []


def extract_file_os(**kwargs):
Expand Down
2 changes: 1 addition & 1 deletion capa/ghidra/capa_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def get_capabilities():

capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True)

if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False):
if capa.capabilities.common.has_static_limitation(rules, capabilities, is_standalone=False):
popup("capa explorer encountered warnings during analysis. Please check the console output for more information.") # type: ignore [name-defined] # noqa: F821
logger.info("capa encountered warnings during analysis")

Expand Down
4 changes: 2 additions & 2 deletions capa/ghidra/capa_ghidra.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def run_headless():
meta.analysis.library_functions = capabilities.library_functions
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)

if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=True):
if capa.capabilities.common.has_static_limitation(rules, capabilities, is_standalone=True):
logger.info("capa encountered warnings during analysis")

if args.json:
Expand Down Expand Up @@ -137,7 +137,7 @@ def run_ui():
meta.analysis.library_functions = capabilities.library_functions
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)

if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False):
if capa.capabilities.common.has_static_limitation(rules, capabilities, is_standalone=False):
logger.info("capa encountered warnings during analysis")

if verbose == "vverbose":
Expand Down
2 changes: 1 addition & 1 deletion capa/ida/plugin/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ def slot_progress_feature_extraction(text):

capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")

if capa.capabilities.common.has_file_limitation(ruleset, capabilities.matches, is_standalone=False):
if capa.capabilities.common.has_static_limitation(ruleset, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
except Exception as e:
logger.exception("Failed to check for file limitations (error: %s)", e)
Expand Down
53 changes: 44 additions & 9 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@
FORMAT_BINJA_DB,
FORMAT_BINEXPORT2,
)
from capa.capabilities.common import Capabilities, find_capabilities, has_file_limitation, find_file_capabilities
from capa.capabilities.common import (
Capabilities,
find_capabilities,
has_static_limitation,
find_file_capabilities,
has_dynamic_limitation,
)
from capa.features.extractors.base_extractor import (
ProcessFilter,
FunctionFilter,
Expand Down Expand Up @@ -747,11 +753,12 @@ def get_file_extractors_from_cli(args, input_format: str) -> list[FeatureExtract
raise ShouldExitError(E_INVALID_FILE_TYPE) from e


def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[FeatureExtractor]) -> bool:
def find_static_limitations_from_cli(args, rules: RuleSet, file_extractors: list[FeatureExtractor]) -> bool:
"""
args:
args: The parsed command line arguments from `install_common_args`.
Only file-scoped feature extractors like pefile are used.
Dynamic feature extractors can handle packed samples and do not need to be considered here.
raises:
Expand All @@ -770,7 +777,7 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F

# file limitations that rely on non-file scope won't be detected here.
# nor on FunctionName features, because pefile doesn't support this.
found_file_limitation = has_file_limitation(rules, pure_file_capabilities.matches)
found_file_limitation = has_static_limitation(rules, pure_file_capabilities)
if found_file_limitation:
# bail if capa encountered file limitation e.g. a packed binary
# do show the output in verbose mode, though.
Expand All @@ -780,6 +787,31 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F
return found_file_limitation


def find_dynamic_limitations_from_cli(args, rules: RuleSet, file_extractors: list[FeatureExtractor]) -> bool:
    """
    Does the dynamic analysis describe some trace that we may not support well?
    For example, .NET samples detonated in a sandbox, which may rely on different API patterns than we currently describe in our rules.

    args:
      args: The parsed command line arguments from `install_common_args`.
      rules: The rule set; its dynamic limitation rules are checked against each extractor's file capabilities.
      file_extractors: The feature extractors to run file-scope matching against.

    returns:
      True if any of the file extractors triggered a dynamic limitation rule, False otherwise.

    raises:
      ShouldExitError: with E_FILE_LIMITATION, if a limitation was found and none of
        the verbose, vverbose, or json options was requested.
    """
    found_dynamic_limitation = False
    for file_extractor in file_extractors:
        pure_dynamic_capabilities = find_file_capabilities(rules, file_extractor, {})
        if has_dynamic_limitation(rules, pure_dynamic_capabilities):
            # stop at the first hit: previously the flag was reassigned on every
            # iteration, so a later extractor without a limitation would erase
            # a limitation found by an earlier one.
            found_dynamic_limitation = True
            break

    if found_dynamic_limitation:
        # bail if capa encountered file limitation e.g. a dotnet sample is detected
        # do show the output in verbose mode, though.
        if not (args.verbose or args.vverbose or args.json):
            logger.debug("file limitation short circuit, won't analyze fully.")
            raise ShouldExitError(E_FILE_LIMITATION)
    return found_dynamic_limitation


def get_signatures_from_cli(args, input_format: str, backend: str) -> list[Path]:
if backend != BACKEND_VIV:
logger.debug("skipping library code matching: only supported by the vivisect backend")
Expand Down Expand Up @@ -964,11 +996,13 @@ def main(argv: Optional[list[str]] = None):
ensure_input_exists_from_cli(args)
input_format = get_input_format_from_cli(args)
rules = get_rules_from_cli(args)
found_file_limitation = False
found_limitation = False
file_extractors = get_file_extractors_from_cli(args, input_format)
if input_format in STATIC_FORMATS:
# only static extractors have file limitations
file_extractors = get_file_extractors_from_cli(args, input_format)
found_file_limitation = find_file_limitations_from_cli(args, rules, file_extractors)
found_limitation = find_static_limitations_from_cli(args, rules, file_extractors)
if input_format in DYNAMIC_FORMATS:
found_limitation = find_dynamic_limitations_from_cli(args, rules, file_extractors)
except ShouldExitError as e:
return e.status_code

Expand Down Expand Up @@ -1002,8 +1036,9 @@ def main(argv: Optional[list[str]] = None):
)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches)

if isinstance(extractor, StaticFeatureExtractor) and found_file_limitation:
if found_limitation:
# bail if capa's static feature extractor encountered file limitation e.g. a packed binary
# or capa's dynamic feature extractor encountered some limitation e.g. a dotnet sample
# do show the output in verbose mode, though.
if not (args.verbose or args.vverbose or args.json):
return E_FILE_LIMITATION
Expand Down Expand Up @@ -1056,7 +1091,7 @@ def ida_main():
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions

if has_file_limitation(rules, capabilities.matches, is_standalone=False):
if has_static_limitation(rules, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis")

colorama.init(strip=True)
Expand Down Expand Up @@ -1094,7 +1129,7 @@ def ghidra_main():
meta.analysis.feature_counts = capabilities.feature_counts
meta.analysis.library_functions = capabilities.library_functions

if has_file_limitation(rules, capabilities.matches, is_standalone=False):
if has_static_limitation(rules, capabilities, is_standalone=False):
logger.info("capa encountered warnings during analysis")

print(capa.render.default.render(meta, rules, capabilities.matches))
Expand Down
Loading

0 comments on commit 2c12db2

Please sign in to comment.