Skip to content

Commit

Permalink
Multiple refactor and code cleanup (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
pnu-s authored and Gitea committed Apr 3, 2019
1 parent 0c11823 commit 0a1fa3e
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 93 deletions.
201 changes: 120 additions & 81 deletions exodus_core/analysis/static_analysis.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from bs4 import BeautifulSoup
from collections import namedtuple
from cryptography.x509.name import _SENTINEL, ObjectIdentifier, _NAMEOID_DEFAULT_TYPE, _ASN1Type, NameAttribute
from hashlib import sha256
from PIL import Image
from tempfile import NamedTemporaryFile
import binascii
import dhash
import hashlib
import itertools
import json
import logging
import os
import re
import time
import six
import subprocess
import tempfile
import time
import urllib.request
import zipfile
from collections import namedtuple
from hashlib import sha256
from tempfile import NamedTemporaryFile
import itertools

from androguard.core.bytecodes import axml
from androguard.core.bytecodes.apk import APK
Expand All @@ -25,6 +30,7 @@

def which(program):
import os

def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

Expand All @@ -40,24 +46,59 @@ def is_exe(fpath):

return None


def get_details_from_gplaycli(handle):
"""
Get the details from gpc like creator, number of downloads, etc
:param handle: application handle
:return: application details dictionary
"""
TIME_BEFORE_RETRY = 2
API_SEARCH_LIMIT = 5
GPLAYCLI_TOKEN_URL = "https://matlink.fr/token/email/gsfid"

gpc = gplaycli.GPlaycli()
gpc.token_enable = True
gpc.token_url = GPLAYCLI_TOKEN_URL
try:
gpc.token, gpc.gsfid = gpc.retrieve_token(force_new=False)
except (ConnectionError, ValueError):
try:
time.sleep(TIME_BEFORE_RETRY)
gpc.token, gpc.gsfid = gpc.retrieve_token(force_new=True)
except (ConnectionError, ValueError) as e:
logging.error(e)
return None
gpc.connect()
objs = gpc.api.search(handle, API_SEARCH_LIMIT)
try:
for obj in objs:
if obj['docId'] == handle:
return obj
return None
except Exception as e:
logging.error('Unable to parse applications details')
logging.error(e)
return None


class Certificate:
def __init__(self, cert):
self.fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode("ascii")
self.issuer = Certificate.get_Name(cert.issuer, short = False)
self.subject = Certificate.get_Name(cert.subject, short = False)
self.fingerprint = binascii.hexlify(
cert.fingerprint(hashes.SHA1())
).decode("ascii")
self.issuer = Certificate.get_Name(cert.issuer, short=False)
self.subject = Certificate.get_Name(cert.subject, short=False)
self.serial = cert.serial_number

@staticmethod
def get_Name(name, short = False):
def get_Name(name, short=False):
"""
Return the distinguished name of an X509 Certificate
:param name: Name object to return the DN from
:param short: Use short form (Default: False)
:type name: :class:`cryptography.x509.Name`
:type short: Boolean
:rtype: str
"""

Expand Down Expand Up @@ -85,7 +126,7 @@ def __str__(self):


class StaticAnalysis:
def __init__(self, apk_path = None):
def __init__(self, apk_path=None):
self.apk = None
self.apk_path = apk_path
self.signatures = None
Expand All @@ -104,7 +145,7 @@ def _compile_signatures(self):
self.compiled_tracker_signature = []
try:
self.compiled_tracker_signature = [re.compile(track.code_signature)
for track in self.signatures]
for track in self.signatures]
except TypeError:
print("self.signatures is not iterable")

Expand Down Expand Up @@ -147,8 +188,12 @@ def get_embedded_classes(self):
cmd = '%s %s/classes*.dex | perl -n -e\'/[A-Z]+((?:\w+\/)+\w+)/ && print "$1\n"\'|sort|uniq' % (
dexdump, tmp_dir)
try:
self.classes = subprocess.check_output(cmd, stderr = subprocess.STDOUT, shell = True,
universal_newlines = True).splitlines()
self.classes = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT,
shell=True,
universal_newlines=True
).splitlines()
logging.debug('%s classes found in %s' % (len(self.classes), self.apk_path))
return self.classes
except subprocess.CalledProcessError:
Expand All @@ -163,7 +208,6 @@ def detect_trackers_in_list(self, class_list):
if self.signatures is None:
self.load_trackers_signatures()


def _detect_tracker(sig, tracker, class_list):
for clazz in class_list:
if sig.search(clazz):
Expand All @@ -173,7 +217,7 @@ def _detect_tracker(sig, tracker, class_list):
results = []
args = [(self.compiled_tracker_signature[index], tracker, class_list)
for (index, tracker) in enumerate(self.signatures) if
len(tracker.code_signature) > 3]
len(tracker.code_signature) > 3]

for res in itertools.starmap(_detect_tracker, args):
if res:
Expand All @@ -183,7 +227,7 @@ def _detect_tracker(sig, tracker, class_list):
logging.debug('%s trackers detected in %s' % (len(trackers), self.apk_path))
return trackers

def detect_trackers(self, class_list_file = None):
def detect_trackers(self, class_list_file=None):
"""
Detect embedded trackers.
:return: list of embedded trackers
Expand Down Expand Up @@ -264,49 +308,39 @@ def get_application_details(self):
if self.app_details is not None:
return self.app_details

gpc = gplaycli.GPlaycli()
gpc.token_enable = True
gpc.token_url = "https://matlink.fr/token/email/gsfid"
try:
gpc.token, gpc.gsfid = gpc.retrieve_token(force_new = False)
except (ConnectionError, ValueError):
try:
time.sleep(2)
gpc.token, gpc.gsfid = gpc.retrieve_token(force_new = True)
except (ConnectionError, ValueError) as e:
logging.error(e)
return None
gpc.connect()
objs = gpc.api.search(self.get_package(), 5)
try:
for obj in objs:
if self.get_package() == obj['docId']:
self.app_details = obj
return self.app_details
return None
except Exception as e:
logging.error('Unable to parse applications details')
logging.error(e)
return None
details = get_details_from_gplaycli(self.get_package())
if details is not None:
self.app_details = details

return details

def _get_icon_from_details(self, path):
"""
Get icon from applications details dictionary
:param path: path where to write the icon file
:return: icon path
:raises FileNotFoundError: if unable to find icon
"""
details = self.get_application_details()
for i in details['images']:
if i['imageType'] == 4:
f = urllib.request.urlopen(i['url'])
with open(path, mode = 'wb') as fp:
fp.write(f.read())
return path
return ''
if details is not None:
for i in details.get('images'):
if i.get('imageType') == 4:
f = urllib.request.urlopen(i.get('url'))
with open(path, mode='wb') as fp:
fp.write(f.read())
if os.path.isfile(path) and os.path.getsize(path) > 0:
return path

raise FileNotFoundError('Unable to download the icon from details')

def _get_icon_from_gplay(self, handle, path):
"""
Download the application icon from Google Play website
:param handle: application handle
:param path: file to be saved
:return: path of the saved icon
:raises FileNotFoundError: if unable to download icon
"""
from bs4 import BeautifulSoup
ICON_CLASS = 'T75of ujDFqe'

address = 'https://play.google.com/store/apps/details?id=%s' % handle
Expand All @@ -320,10 +354,10 @@ def _get_icon_from_gplay(self, handle, path):
f = urllib.request.urlopen(icon_url)
with open(path, mode='wb') as fp:
fp.write(f.read())
return path
if os.path.isfile(path) and os.path.getsize(path) > 0:
return path
else:
logging.error('Unable to download the icon from Google Play')
raise FileNotFoundError('Unable to download the icon')
raise FileNotFoundError('Unable to download the icon from GPlay')

@staticmethod
def _render_drawable_to_png(self, bxml, path):
Expand All @@ -336,7 +370,6 @@ def save_icon(self, path):
:param path: destination path of the icon
:return: destination path of the icon, None in case of error
"""
from PIL import Image
icon = self.get_icon_path()
if icon is None:
return None
Expand All @@ -346,46 +379,54 @@ def save_icon(self, path):
with open(path, 'wb') as f:
f.write(z.read(icon))
_ = Image.open(path)
logging.info('Get icon from APK: success')
return path
except:
logging.warning('Unable to get the icon from the APK - downloading from details')
except Exception:
logging.warning('Unable to get the icon from the APK')
logging.warning('Downloading icon from details')
try:
saved_path = self._get_icon_from_details(path)
if os.path.isfile(path) and os.path.getsize(path) > 0:
logging.debug('Icon downloaded from Google Play')
return saved_path
logging.debug('Icon downloaded from application details')
return saved_path
except Exception as e:
logging.error(e)
logging.warning('Unable to get the icon from details - downloading from GPlay')
logging.warning(e)
logging.warning('Downloading icon from Google Play')
try:
saved_path = self._get_icon_from_gplay(self.get_package(), path)
if os.path.isfile(path) and os.path.getsize(path) > 0:
logging.debug('Icon downloaded from Google Play')
return saved_path
logging.debug('Icon downloaded from Google Play')
return saved_path
except Exception as e:
logging.error(e)
return None

def get_icon_phash(self):
"""
Get the perceptual hash of the icon
:return: the perceptual hash, None in case of error
Get the perceptual hash of the application icon
:return: the perceptual hash, empty string in case of error
"""
import dhash
from PIL import Image
dhash.force_pil() # Force PIL
with NamedTemporaryFile() as ic:
path = self.save_icon(ic.name)
if path is None:
logging.error('Unable to save the icon')
return ''
try:
image = Image.open(ic.name).convert("RGBA")
row, col = dhash.dhash_row_col(image, size = PHASH_SIZE)
return row << (PHASH_SIZE * PHASH_SIZE) | col
except IOError as e:
logging.error(e)
return ''
return self.get_phash(ic.name)

@staticmethod
def get_phash(image_name):
"""
Get the perceptual hash of the given image
:param image_name: name of the image file
:return: the perceptual hash, empty string in case of error
"""
dhash.force_pil() # Force PIL

try:
image = Image.open(image_name).convert("RGBA")
row, col = dhash.dhash_row_col(image, size=PHASH_SIZE)
return row << (PHASH_SIZE * PHASH_SIZE) | col
except IOError as e:
logging.error(e)
return ''

@staticmethod
def get_icon_similarity(phash_origin, phash_candidate):
Expand All @@ -395,7 +436,6 @@ def get_icon_similarity(phash_origin, phash_candidate):
:param phash_candidate: icon to be compared
:return: similarity score [0,1.0]
"""
import dhash
diff = dhash.get_num_bits_different(phash_origin, phash_candidate)
return 1 - 1. * diff / (PHASH_SIZE * PHASH_SIZE * 2)

Expand All @@ -407,9 +447,8 @@ def get_application_universal_id(self):

def get_certificates(self):
certificates = []
import six
from cryptography.x509.name import _SENTINEL, ObjectIdentifier, _NAMEOID_DEFAULT_TYPE, _ASN1Type, NameAttribute
def _my_name_init(self, oid, value, _type = _SENTINEL):

def _my_name_init(self, oid, value, _type=_SENTINEL):
if not isinstance(oid, ObjectIdentifier):
raise TypeError("oid argument must be an ObjectIdentifier instance.")
if not isinstance(value, six.text_type):
Expand Down
24 changes: 12 additions & 12 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@


def which(program):
import os
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

Expand Down Expand Up @@ -49,14 +48,15 @@ def is_exe(fpath):
'androguard==3.1.0'
]

setup(name = 'exodus_core',
version = '1.0.15',
description = 'Core functionality of εxodus',
author = 'Exodus Privacy',
author_email = '[email protected]',
url = 'https://github.com/Exodus-Privacy/exodus-core',
packages = find_packages(exclude = ["*.tests", "*.tests.*", "test*", "tests"]),
install_requires = install_requires,
include_package_data = True,
zip_safe = False,
)
setup(
name='exodus_core',
version='1.0.16',
description='Core functionality of εxodus',
author='Exodus Privacy',
author_email='[email protected]',
url='https://github.com/Exodus-Privacy/exodus-core',
packages=find_packages(exclude=["*.tests", "*.tests.*", "test*", "tests"]),
install_requires=install_requires,
include_package_data=True,
zip_safe=False,
)

0 comments on commit 0a1fa3e

Please sign in to comment.