Skip to content

Commit

Permalink
improve parsing of bound & delay imports
Browse files Browse the repository at this point in the history
  • Loading branch information
huettenhain committed Nov 19, 2024
1 parent c6db338 commit d64b95f
Showing 1 changed file with 62 additions and 26 deletions.
88 changes: 62 additions & 26 deletions refinery/units/formats/pe/pemeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from contextlib import suppress
from importlib import resources
from datetime import datetime, timezone
from datetime import timedelta
from dataclasses import dataclass
from enum import Enum

Expand All @@ -23,6 +23,7 @@
from refinery.units import Arg, Unit
from refinery.units.sinks.ppjson import ppjson
from refinery.units.formats.pe import get_pe_size
from refinery.lib.tools import date_from_timestamp

from refinery import data

Expand Down Expand Up @@ -151,11 +152,11 @@ def __init__(
debug : Arg.Switch('-D', help='Parse the PDB path from the debug directory.') = False,
dotnet : Arg.Switch('-N', help='Parse the .NET header.') = False,
signatures : Arg.Switch('-S', help='Parse digital signatures.') = False,
timestamps : Arg.Switch('-T', help='Extract time stamps.') = False,
timestamps : Arg.Counts('-T', help='Extract time stamps. Specify twice for more detail.') = 0,
version : Arg.Switch('-V', help='Parse the VERSION resource.') = False,
header : Arg.Switch('-H', help='Parse base data from the PE header.') = False,
exports : Arg.Counts('-E', help='List all exported functions. Specify twice to include addresses.') = 0,
imports : Arg.Switch('-I', help='List all imported functions') = False,
imports : Arg.Counts('-I', help='List all imported functions. Specify twice to include addresses.') = 0,
tabular : Arg.Switch('-t', help='Print information in a table rather than as JSON') = False,
timeraw : Arg.Switch('-r', help='Extract time stamps as numbers instead of human-readable format.') = False,
):
Expand Down Expand Up @@ -358,25 +359,37 @@ def parse_exports(self, pe: PE, data=None, include_addresses=False) -> list:
info.append(item)
return info

def parse_imports(self, pe: PE, data=None) -> list:
def parse_imports(self, pe: PE, data=None, include_addresses=False) -> list:
info = {}
dirs = []
for name in [
'DIRECTORY_ENTRY_IMPORT',
'DIRECTORY_ENTRY_DELAY_IMPORT',
'DIRECTORY_ENTRY_BOUND_IMPORT',
]:
pe.parse_data_directories(directories=[DIRECTORY_ENTRY[F'IMAGE_{name}']])
with suppress(AttributeError):
dirs.append(getattr(pe, name))
self.log_warn(dirs)
for idd in itertools.chain(*dirs):
dll = idd.dll.decode('ascii')
dll: bytes = idd.dll
dll = dll.decode('ascii')
if dll.lower().endswith('.dll'):
dll = dll[:-4]
imports = info.setdefault(dll, [])
for imp in idd.imports:
name = imp.name and imp.name.decode('ascii') or F'@{imp.ordinal}'
imports.append(name)
dll = dll[:~3]
imports: list[str] = info.setdefault(dll, [])
with suppress(AttributeError):
symbols = idd.imports
with suppress(AttributeError):
symbols = idd.entries
try:
for imp in symbols:
name: bytes = imp.name
name = name and name.decode('ascii') or F'@{imp.ordinal}'
if not include_addresses:
imports.append(name)
else:
imports.append(dict(Name=name, Address=self._vint(pe, imp.address)))
except Exception as e:
self.log_warn(F'error parsing {name}: {e!s}')
return info

def parse_header(self, pe: PE, data=None) -> dict:
Expand Down Expand Up @@ -456,7 +469,7 @@ def format_macro_name(name: str, prefix, convert=True):
header_information['EntryPoint'] = self._vint(pe, pe.OPTIONAL_HEADER.AddressOfEntryPoint + base)
return header_information

def parse_time_stamps(self, pe: PE, raw_time_stamps: bool) -> dict:
def parse_time_stamps(self, pe: PE, raw_time_stamps: bool, more_detail: bool) -> dict:
"""
Extracts time stamps from the PE header (link time), as well as from the imports,
exports, debug, and resource directory. The resource time stamp is also parsed as
Expand All @@ -465,16 +478,13 @@ def parse_time_stamps(self, pe: PE, raw_time_stamps: bool) -> dict:
if raw_time_stamps:
def dt(ts): return ts
else:
def dt(ts):
# parse as UTC but then forget time zone information
return datetime.fromtimestamp(
ts,
tz=timezone.utc
).replace(tzinfo=None)
dt = date_from_timestamp

pe.parse_data_directories(directories=[
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BOUND_IMPORT'],
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT'],
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG'],
DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
])
Expand All @@ -484,13 +494,35 @@ def dt(ts):
with suppress(AttributeError):
info.update(Linker=dt(pe.FILE_HEADER.TimeDateStamp))

with suppress(AttributeError):
for entry in pe.DIRECTORY_ENTRY_IMPORT:
info.update(Import=dt(entry.TimeDateStamp()))

with suppress(AttributeError):
for entry in pe.DIRECTORY_ENTRY_DEBUG:
info.update(DbgDir=dt(entry.struct.TimeDateStamp))
for dir_name, _dll, info_key in [
('DIRECTORY_ENTRY_IMPORT', 'dll', 'Import'), # noqa
('DIRECTORY_ENTRY_DELAY_IMPORT', 'dll', 'Symbol'), # noqa
('DIRECTORY_ENTRY_BOUND_IMPORT', 'name', 'Module'), # noqa
]:
impts = {}
for entry in getattr(pe, dir_name, []):
ts = 0
with suppress(AttributeError):
ts = entry.struct.dwTimeDateStamp
with suppress(AttributeError):
ts = entry.struct.TimeDateStamp
if ts == 0 or ts == 0xFFFFFFFF:
continue
name = getattr(entry, _dll, B'').decode()
if name.lower().endswith('.dll'):
name = name[:-4]
impts[name] = dt(ts)
if not impts:
continue
if not more_detail:
dmin = min(impts.values())
dmax = max(impts.values())
small_delta = 2 * 60 * 60
if not raw_time_stamps:
small_delta = timedelta(seconds=small_delta)
if dmax - dmin < small_delta:
impts = dmin
info[info_key] = impts

with suppress(AttributeError):
Export = pe.DIRECTORY_ENTRY_EXPORT.struct.TimeDateStamp
Expand All @@ -506,6 +538,10 @@ def dt(ts):
info.update(RsrcTS=dt(res_timestamp))

def norm(value):
if isinstance(value, list):
return [norm(v) for v in value]
if isinstance(value, dict):
return {k: norm(v) for k, v in value.items()}
if isinstance(value, int):
return value
return str(value)
Expand Down Expand Up @@ -602,7 +638,7 @@ def process(self, data):
signature = self.parse_signature(next(data | pesig))

if self.args.timestamps:
ts = self.parse_time_stamps(pe, self.args.timeraw)
ts = self.parse_time_stamps(pe, self.args.timeraw, self.args.timestamps > 1)
with suppress(KeyError):
ts.update(Signed=signature['Timestamp'])
result.update(TimeStamp=ts)
Expand Down

0 comments on commit d64b95f

Please sign in to comment.