# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
import json
import logging
import time
from abc import ABCMeta
from pathlib import PosixPath
from typing import Dict, Tuple
import requests
from django.conf import settings
from django.core.files.base import ContentFile
from certego_saas.apps.user.models import User
from tests.mock_utils import MockUpResponse, if_mock_connections, patch
from ..choices import Classification, PythonModuleBasePaths
from ..classes import Plugin
from ..helpers import calculate_sha256
from ..models import PythonConfig
from .constants import HashChoices, TypeChoices
from .exceptions import AnalyzerConfigurationException, AnalyzerRunException
from .models import AnalyzerConfig, AnalyzerReport, AnalyzerSourceFile
logger = logging.getLogger(__name__)
class BaseAnalyzerMixin(Plugin, metaclass=ABCMeta):
"""
Abstract Base class for Analyzers.
Never inherit from this branch,
always use either one of ObservableAnalyzer or FileAnalyzer classes.
"""
HashChoices = HashChoices
TypeChoices = TypeChoices
MALICIOUS_EVALUATION = 75
SUSPICIOUS_EVALUATION = 35
FALSE_POSITIVE = -50
@classmethod
def update_support_model(cls, file_name):
pass
@classmethod
def update_source_file(cls, request_data: Dict, file_name) -> bool:
        # check whether the file has been updated
logger.info(
f"Source file update started with request data {request_data}, file name {file_name} and python module {cls.python_module}"
)
update = False
response = requests.get(**request_data)
response.raise_for_status()
cfile = ContentFile(response.content, name=file_name)
sha_res = calculate_sha256(response.content)
source_file = AnalyzerSourceFile.objects.filter(
file_name=file_name, python_module=cls.python_module
).first()
# check if source file exists
if source_file:
logger.info(f"Found source file {source_file}")
# check if source file needs to be updated
if source_file.sha256 != sha_res:
logger.info("About to update source file")
source_file.file.delete()
source_file.file = cfile
source_file.sha256 = sha_res
source_file.save()
update = True
else:
logger.info(
f"About to create new source file with file name {file_name} and python module {cls.python_module}"
)
AnalyzerSourceFile.objects.create(
file_name=file_name,
python_module=cls.python_module,
file=cfile,
sha256=sha_res,
)
update = True
return update
@classmethod
def update_internal_data(cls, request_data: Dict, file_name) -> bool:
update = cls.update_source_file(request_data, file_name)
if update:
cls.update_support_model(file_name)
return update
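    # Illustrative usage sketch (the analyzer name and URL are hypothetical,
    # not part of this module): a concrete analyzer would typically refresh its
    # internal data from a periodic task like this. `request_data` is unpacked
    # into requests.get(**request_data), so any keyword accepted by
    # requests.get (headers, params, ...) can be passed.
    #
    #   MyAnalyzer.update_internal_data(
    #       request_data={"url": "https://example.com/feed.txt"},
    #       file_name="feed.txt",
    #   )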
    def threat_to_evaluation(self, threat_level):
        # Magic numbers here! The natural thresholds would be 25-50-75-100;
        # we raised them a bit because too many false positives were being generated.
self.report: AnalyzerReport
if threat_level >= self.MALICIOUS_EVALUATION:
evaluation = self.report.data_model_class.EVALUATIONS.MALICIOUS.value
elif threat_level >= self.SUSPICIOUS_EVALUATION:
evaluation = self.report.data_model_class.EVALUATIONS.SUSPICIOUS.value
elif threat_level <= self.FALSE_POSITIVE:
evaluation = self.report.data_model_class.EVALUATIONS.TRUSTED.value
else:
evaluation = self.report.data_model_class.EVALUATIONS.CLEAN.value
return evaluation
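    # With the thresholds defined above, the mapping works out to:
    #   threat_level >= 75            -> MALICIOUS
    #   35 <= threat_level < 75       -> SUSPICIOUS
    #   threat_level <= -50           -> TRUSTED (treated as a false positive)
    #   anything else (-50 < x < 35)  -> CLEAN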
def _do_create_data_model(self) -> bool:
if self.report.job.analyzable.classification == Classification.GENERIC.value:
return False
if (
not self._config.mapping_data_model
and self.__class__._create_data_model_mtm
== BaseAnalyzerMixin._create_data_model_mtm
and self.__class__._update_data_model
== BaseAnalyzerMixin._update_data_model
):
return False
return True
def _create_data_model_mtm(self):
return {}
def _update_data_model(self, data_model) -> None:
mtm = self._create_data_model_mtm()
for field_name, value in mtm.items():
field = getattr(data_model, field_name)
field.add(*value)
def create_data_model(self):
self.report: AnalyzerReport
if self._do_create_data_model():
data_model = self.report.create_data_model()
if data_model:
self._update_data_model(data_model)
data_model.save()
return data_model
return None
@classmethod
@property
def config_exception(cls):
"""Returns the AnalyzerConfigurationException class."""
return AnalyzerConfigurationException
@property
def analyzer_name(self) -> str:
"""Returns the name of the analyzer."""
return self._config.name
@classmethod
@property
def report_model(cls):
"""Returns the AnalyzerReport model."""
return AnalyzerReport
@classmethod
@property
def config_model(cls):
"""Returns the AnalyzerConfig model."""
return AnalyzerConfig
def get_exceptions_to_catch(self):
"""
Returns additional exceptions to catch when running *start* fn
"""
return (
AnalyzerConfigurationException,
AnalyzerRunException,
)
def _validate_result(self, result, level=0, max_recursion=190):
"""
function to validate result, allowing to store inside postgres without errors.
If the character \u0000 is present in the string, postgres will throw an error
If an integer is bigger than max_int,
Mongodb is not capable to store and will throw an error.
If we have more than 200 recursion levels, every encoding
will throw a maximum_nested_object exception
"""
if level == max_recursion:
logger.info(
f"We have reached max_recursion {max_recursion} level. "
f"The following object will be pruned {result} "
)
return None
if isinstance(result, dict):
for key, values in result.items():
result[key] = self._validate_result(
values, level=level + 1, max_recursion=max_recursion
)
elif isinstance(result, list):
for i, _ in enumerate(result):
result[i] = self._validate_result(
result[i], level=level + 1, max_recursion=max_recursion
)
elif isinstance(result, str):
return result.replace("\u0000", "")
elif isinstance(result, int) and result > 9223372036854775807: # max int 8bytes
result = 9223372036854775807
return result
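    # Example of the sanitization performed above (values chosen for illustration):
    #   {"a": "bad\u0000str", "b": [2**70]}  ->  {"a": "badstr", "b": [9223372036854775807]}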
def after_run_success(self, content):
"""
Handles actions after a successful run.
Args:
content (any): The content to process after a successful run.
"""
super().after_run_success(self._validate_result(content, max_recursion=15))
try:
self.create_data_model()
except Exception as e:
logger.exception(e)
self._job.errors.append(
f"Data model creation failed for {self._config.name}"
)
class ObservableAnalyzer(BaseAnalyzerMixin, metaclass=ABCMeta):
"""
Abstract class for Observable Analyzers.
Inherit from this branch when defining a IP, URL or domain analyzer.
Need to overrwrite `set_params(self, params)`
and `run(self)` functions.
"""
observable_name: str
observable_classification: str
def __init__(
self,
config: PythonConfig,
**kwargs,
):
super().__init__(config, **kwargs)
def config(self, runtime_configuration: Dict):
super().config(runtime_configuration)
self._config: AnalyzerConfig
if self._job.is_sample and self._config.run_hash:
self.observable_classification = Classification.HASH
# check which kind of hash the analyzer needs
run_hash_type = self._config.run_hash_type
if run_hash_type == HashChoices.SHA256:
self.observable_name = self._job.analyzable.sha256
else:
self.observable_name = self._job.analyzable.md5
else:
self.observable_name = self._job.analyzable.name
self.observable_classification = self._job.analyzable.classification
@classmethod
@property
def python_base_path(cls):
return PythonModuleBasePaths.ObservableAnalyzer.value
def before_run(self):
super().before_run()
logger.info(
f"STARTED analyzer: {self.__repr__()} -> "
f"Observable: {self.observable_name}."
)
def after_run(self):
super().after_run()
logger.info(
f"FINISHED analyzer: {self.__repr__()} -> "
f"Observable: {self.observable_name}."
)
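# Minimal subclass sketch (illustrative only; the class name and returned fields
# are hypothetical). `observable_name` and `observable_classification` are
# populated by `config()` before `run()` is invoked:
#
#   class MyObservableAnalyzer(ObservableAnalyzer):
#       def run(self):
#           return {"observable": self.observable_name, "malicious": False}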
class FileAnalyzer(BaseAnalyzerMixin, metaclass=ABCMeta):
"""
Abstract class for File Analyzers.
Inherit from this branch when defining a file analyzer.
Need to overrwrite `set_params(self, params)`
and `run(self)` functions.
"""
md5: str
filename: str
file_mimetype: str
def __init__(
self,
config: PythonConfig,
**kwargs,
):
super().__init__(config, **kwargs)
def config(self, runtime_configuration: Dict):
super().config(runtime_configuration)
self.md5 = self._job.analyzable.md5
self.filename = self._job.analyzable.name
        # This is set lazily in the `filepath` property, like a cache decorator.
        # If the filepath is requested, it means that the analyzer downloads
        # the file from AWS because it requires a path; the file must then be deleted.
self.__filepath = None
self.file_mimetype = self._job.analyzable.mimetype
@classmethod
@property
def python_base_path(cls) -> PosixPath:
return PythonModuleBasePaths[FileAnalyzer.__name__].value
def read_file_bytes(self) -> bytes:
return self._job.analyzable.read()
@property
def filepath(self) -> str:
"""Returns the file path, retrieving the file from storage if necessary.
Returns:
str: The file path.
"""
if not self.__filepath:
self.__filepath = self._job.analyzable.file.storage.retrieve(
file=self._job.analyzable.file, analyzer=self.analyzer_name
)
return self.__filepath
def before_run(self):
super().before_run()
logger.info(
f"STARTED analyzer: {self.__repr__()} -> "
f"File: ({self.filename}, md5: {self.md5})"
)
def after_run(self):
super().after_run()
        # We delete the file only if we have a single copy per analyzer
        # and the file has been saved locally.
        # Otherwise we would remove the only copy that we have on the server.
if not settings.LOCAL_STORAGE and self.filepath is not None:
import os
try:
os.remove(self.filepath)
except OSError:
logger.warning(f"Filepath {self.filepath} does not exists")
logger.info(
f"FINISHED analyzer: {self.__repr__()} -> "
f"File: ({self.filename}, md5: {self.md5})"
)
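# Minimal subclass sketch (illustrative only; the class name and returned fields
# are hypothetical). Using `read_file_bytes()` avoids touching `self.filepath`
# when a local path is not actually needed:
#
#   class MyFileAnalyzer(FileAnalyzer):
#       def run(self):
#           data = self.read_file_bytes()
#           return {"size": len(data), "md5": self.md5}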
class DockerBasedAnalyzer(BaseAnalyzerMixin, metaclass=ABCMeta):
"""
Abstract class for a docker based analyzer (integration).
Inherit this branch along with either
one of ``ObservableAnalyzer`` or ``FileAnalyzer``
when defining a docker based analyzer.
See `peframe.py` for example.
:param name: str
The name of the analyzer service as defined in compose file
and log directory
:param max_tries: int
maximum no. of tries when HTTP polling for result.
:param poll_distance: int
interval between HTTP polling.
"""
name: str
url: str
max_tries: int
poll_distance: int
key_not_found_max_retries: int = 10
@staticmethod
def __raise_in_case_bad_request(name, resp, params_to_check=None) -> bool:
"""
Raises:
:class: `AnalyzerRunException`, if bad status code or no key in response
"""
if params_to_check is None:
params_to_check = ["key"]
# different error messages for different cases
if resp.status_code == 404:
raise AnalyzerConfigurationException(
f"{name} docker container is not running."
)
if resp.status_code == 400:
err = resp.json().get("error", "")
raise AnalyzerRunException(err)
if resp.status_code == 500:
raise AnalyzerRunException(
f"Internal Server Error in {name} docker container"
)
        # check to make sure there was a valid param in the response
for param in params_to_check:
param_value = resp.json().get(param, None)
if not param_value:
raise AnalyzerRunException(
"Unexpected Error. "
f"Please check log files under /var/log/intel_owl/{name.lower()}/"
)
        # just in case we couldn't catch the error manually
resp.raise_for_status()
return True
@staticmethod
def __query_for_result(url: str, key: str) -> Tuple[int, dict]:
headers = {"Accept": "application/json"}
resp = requests.get(f"{url}?key={key}", headers=headers)
return resp.status_code, resp.json()
def __polling(self, req_key: str, chance: int, re_poll_try: int = 0):
try:
status_code, json_data = self.__query_for_result(self.url, req_key)
except (requests.RequestException, json.JSONDecodeError) as e:
raise AnalyzerRunException(e)
if status_code == 404:
            # This happens when the key does not exist.
            # This is possible when IntelOwl is deployed as a Swarm:
            # the previous POST request that created the analysis
            # could have been sent to a different container on another machine,
            # so we need to try again and find the server that holds the key.
logger.info(
"Polling again because received a 404."
f" Try #{chance + 1}. Re-Poll try {re_poll_try}. Starting the query..."
f"<-- {self.__repr__()}"
)
if self.key_not_found_max_retries == re_poll_try:
raise AnalyzerRunException(
f"not found key {req_key} in any server after maximum retries"
)
return self.__polling(req_key, chance, re_poll_try=re_poll_try + 1)
else:
status = json_data.get("status", None)
if status and status == self._job.STATUSES.RUNNING.value:
logger.info(
f"Poll number #{chance + 1}, "
f"status: 'running' <-- {self.__repr__()}"
)
else:
return True, json_data
return False, json_data
def __poll_for_result(self, req_key: str) -> dict:
got_result = False
json_data = {}
for chance in range(self.max_tries):
time.sleep(self.poll_distance)
logger.info(
f"Result Polling. Try #{chance + 1}. Starting the query..."
f"<-- {self.__repr__()}"
)
got_result, json_data = self.__polling(req_key, chance)
if got_result:
break
if not got_result:
raise AnalyzerRunException("max polls tried without getting any result.")
return json_data
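    # Rough worst-case timing of the loop above: since it sleeps `poll_distance`
    # seconds before each of the `max_tries` polls, the total wait is about
    # max_tries * poll_distance seconds, plus HTTP latency and any 404 re-polls.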
def _raise_container_not_running(self) -> None:
raise AnalyzerConfigurationException(
f"{self.name} docker container is not running.\n"
"You have to enable it using the appropriate "
"parameter when executing ./start."
)
def _docker_run(
self, req_data: dict, req_files: dict = None, analyzer_name: str = None
) -> dict:
"""
Helper function that takes of care of requesting new analysis,
reading response, polling for result and exception handling for a
docker based analyzer.
Args:
req_data (Dict): Dict of request JSON.
req_files (Dict, optional): Dict of files to send. Defaults to None.
analyzer_name: optional, could be used for edge cases
Raises:
AnalyzerConfigurationException: In case docker service is not running
AnalyzerRunException: Any other error
Returns:
Dict: Final analysis results
"""
# step #1: request new analysis
req_data = {**req_data, "force_unique_key": True}
args = req_data.get("args", [])
logger.debug(f"Making request with arguments: {args} <- {self.__repr__()}")
try:
if req_files:
form_data = {"request_json": json.dumps(req_data)}
resp1 = requests.post(self.url, files=req_files, data=form_data)
else:
resp1 = requests.post(self.url, json=req_data)
except requests.exceptions.ConnectionError:
self._raise_container_not_running()
# step #2: raise AnalyzerRunException in case of error
if not self.__raise_in_case_bad_request(self.name, resp1):
raise AssertionError
# step #3: if no error, continue and try to fetch result
key = resp1.json().get("key")
final_resp = self.__poll_for_result(key)
err = final_resp.get("error", None)
report = final_resp.get("report", None)
        # APKiD provides an empty result when it does not support the binary type
if not report and (analyzer_name != "APKiD"):
raise AnalyzerRunException(f"Report is empty. Reason: {err}")
if isinstance(report, dict):
return report
try:
report = json.loads(report)
except json.JSONDecodeError:
            # because the report may also be just a str. Example: clamav.
pass
return report
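    # Illustrative call from a concrete docker-based file analyzer (the payload
    # shown is hypothetical; each integration defines its own `args` contract):
    #
    #   report = self._docker_run(
    #       req_data={"args": ["--json", "@sample"]},
    #       req_files={"sample": self.read_file_bytes()},
    #   )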
def _docker_get(self):
"""
Raises:
AnalyzerConfigurationException: In case docker service is not running
AnalyzerRunException: Any other error
Returns:
Response: Response object of request
"""
# step #1: request new analysis
try:
resp = requests.get(url=self.url)
except requests.exceptions.ConnectionError:
self._raise_container_not_running()
# step #2: raise AnalyzerRunException in case of error
if not self.__raise_in_case_bad_request(self.name, resp, params_to_check=[]):
raise AssertionError
return resp
@staticmethod
def mocked_docker_analyzer_get(*args, **kwargs):
return MockUpResponse(
{"key": "test", "returncode": 0, "report": {"test": "This is a test."}}, 200
)
@staticmethod
def mocked_docker_analyzer_post(*args, **kwargs):
return MockUpResponse({"key": "test", "status": "running"}, 202)
def _monkeypatch(self, patches: list = None):
"""
Here, `_monkeypatch` is an instance method and not a class method.
This is because when defined with `@classmethod`, we were getting the error
```
'_patch' object has no attribute 'is_local'
```
whenever multiple analyzers with same parent class were being called.
"""
if patches is None:
patches = []
# no need to sleep during tests
self.poll_distance = 0
patches.append(
if_mock_connections(
patch(
"requests.get",
side_effect=self.mocked_docker_analyzer_get,
),
patch(
"requests.post",
side_effect=self.mocked_docker_analyzer_post,
),
)
)
return super()._monkeypatch(patches)
def health_check(self, user: User = None) -> bool:
"""
basic health check: if instance is up or not (timeout - 10s)
"""
try:
requests.head(self.url, timeout=10)
except requests.exceptions.RequestException:
health_status = False
else:
health_status = True
return health_status