-
Notifications
You must be signed in to change notification settings - Fork 45
/
init_analysis_flow.py
167 lines (140 loc) · 6.84 KB
/
init_analysis_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Initialize package-version level analysis."""
import os
import datetime
import shutil
import re
from selinon import FatalTaskError
from sqlalchemy.orm.exc import NoResultFound
from tempfile import mkdtemp
from f8a_worker.object_cache import ObjectCache
from f8a_worker.base import BaseTask
from f8a_worker.process import IndianaJones, MavenCoordinates
from f8a_worker.models import Analysis, EcosystemBackend, Ecosystem, Version, Package
from f8a_worker.utils import normalize_package_name
from f8a_utils.versions import is_pkg_public
from f8a_worker.errors import NotABugFatalTaskError
# Character class matching any single one of: * X x - > = < ~ ^ | / : +
# InitAnalysisFlow.execute rejects a requested version ("Incorrect version alert")
# when it contains at least one of these characters.
# NOTE(review): presumably meant to filter out range/wildcard specifiers such as
# ">=1.0" or "1.x" -- confirm; it also rejects versions containing '-' or '+'.
pattern = r'[\*Xx\-\>\=\<\~\^\|\/\:\+]'
# Pre-compiled form of ``pattern`` so the regex is built once at import time.
pattern_ignore = re.compile(pattern)
class InitAnalysisFlow(BaseTask):
    """Download source and start whole analysis.

    The task validates the requested ecosystem/package/version, makes sure the
    package artifacts are stored in the object cache and creates a new
    ``Analysis`` database record for the version.
    """

    def execute(self, arguments):
        """Task code.

        :param arguments: dictionary with task arguments; ``ecosystem``, ``name``
            and ``version`` have to be strings, ``force`` is optional. The
            dictionary is modified in place (``name`` is replaced by its
            normalized form and result keys are added).
        :return: the same ``arguments`` dictionary. When ``force`` is not set and
            an analysis for the version already exists, it carries
            ``analysis_already_exists = True`` and nothing is downloaded;
            otherwise it carries ``document_id`` (id of the new ``Analysis``
            record) and ``ecosystem_backend`` (name of the ecosystem backend).
        :raises FatalTaskError: if the ecosystem is not known
        :raises NotABugFatalTaskError: if the version contains a character matched
            by ``pattern_ignore`` or if the package is not public
        """
        self.log.debug("Input Arguments: {}".format(arguments))
        self._strict_assert(isinstance(arguments.get('ecosystem'), str))
        self._strict_assert(isinstance(arguments.get('name'), str))
        self._strict_assert(isinstance(arguments.get('version'), str))

        db = self.storage.session
        try:
            ecosystem = Ecosystem.by_name(db, arguments['ecosystem'])
        except NoResultFound as exc:
            # chain explicitly so the database lookup failure stays attached as the cause
            raise FatalTaskError('Unknown ecosystem: %r' % arguments['ecosystem']) from exc

        # make sure we store package name in its normalized form
        arguments['name'] = normalize_package_name(ecosystem.backend.name, arguments['name'])

        # a single match is enough to reject the version, no need to collect all of them
        if pattern_ignore.search(arguments['version']):
            self.log.info("Incorrect version alert {} {}".format(
                arguments['name'], arguments['version']))
            raise NotABugFatalTaskError("Incorrect version alert")

        # Dont try ingestion for private packages
        if not is_pkg_public(arguments['ecosystem'], arguments['name']):
            self.log.info("Private package ingestion ignored {} {}".format(
                arguments['ecosystem'], arguments['name']))
            raise NotABugFatalTaskError("Private package alert")
        self.log.info("Ingestion flow for {} {}".format(
            arguments['ecosystem'], arguments['name']))

        p = Package.get_or_create(db, ecosystem_id=ecosystem.id, name=arguments['name'])
        v = Version.get_or_create(db, package_id=p.id, identifier=arguments['version'])

        if not arguments.get('force'):
            # without force, an existing analysis of this version short-circuits the task
            if db.query(Analysis).filter(Analysis.version_id == v.id).count() > 0:
                arguments['analysis_already_exists'] = True
                self.log.debug("Arguments returned by initAnalysisFlow without force: {}"
                               .format(arguments))
                return arguments

        # scratch directory for the downloads, removed again in the ``finally`` below
        cache_path = mkdtemp(dir=self.configuration.WORKER_DATA_DIR)
        epv_cache = ObjectCache.get_from_dict(arguments)
        npm_dir = self.configuration.NPM_DATA_DIR

        try:
            if not epv_cache.has_source_tarball():
                _, source_tarball_path = IndianaJones.fetch_artifact(
                    ecosystem=ecosystem,
                    artifact=arguments['name'],
                    version=arguments['version'],
                    target_dir=cache_path
                )
                epv_cache.put_source_tarball(source_tarball_path)

            if ecosystem.is_backed_by(EcosystemBackend.maven):
                if not epv_cache.has_source_jar():
                    # the source jar is best effort: a failure is only logged
                    try:
                        source_jar_path = self._download_source_jar(cache_path, ecosystem,
                                                                    arguments)
                        epv_cache.put_source_jar(source_jar_path)
                    except Exception as e:
                        self.log.info(
                            'Failed to fetch source jar for maven artifact "{n}/{v}": {err}'.
                            format(n=arguments.get('name'),
                                   v=arguments.get('version'),
                                   err=str(e))
                        )

                if not epv_cache.has_pom_xml():
                    pom_xml_path = self._download_pom_xml(cache_path, ecosystem, arguments)
                    epv_cache.put_pom_xml(pom_xml_path)
        finally:
            # always clean up cache
            try:
                shutil.rmtree(cache_path)
            finally:
                # run the npm cleanup even if removing the scratch directory failed;
                # errors of this second removal are ignored (ignore_errors=True)
                if arguments['ecosystem'] == "npm":
                    shutil.rmtree(npm_dir, True)

        a = Analysis(version=v, access_count=1, started_at=datetime.datetime.utcnow())
        db.add(a)
        db.commit()

        arguments['document_id'] = a.id

        # export ecosystem backend so we can use it to easily control flow later
        arguments['ecosystem_backend'] = ecosystem.backend.name

        self.log.debug("Arguments returned by InitAnalysisFlow are: {}".format(arguments))
        return arguments

    @staticmethod
    def _download_source_jar(target, ecosystem, arguments):
        """Download sources jar.

        Tries the ``sources`` classifier first and ``src`` second; the first
        successful download wins.

        :param target: directory the artifact is downloaded to
        :param ecosystem: ecosystem passed through to ``IndianaJones.fetch_artifact``
        :param arguments: task arguments, ``name`` and ``version`` are read
        :return: path of the downloaded sources jar, or None when the coordinates
            in ``name`` already carry one of the sources classifiers (nothing is
            fetched in that case)
        :raises Exception: whatever the last download attempt raised when every
            classifier variant failed
        """
        artifact_coords = MavenCoordinates.from_str(arguments['name'])
        artifact_coords.packaging = 'jar'  # source is always jar even for war/aar etc.
        sources_classifiers = ['sources', 'src']

        if artifact_coords.classifier not in sources_classifiers:
            for sources_classifier in sources_classifiers:
                artifact_coords.classifier = sources_classifier
                try:
                    _, source_jar_path = IndianaJones.fetch_artifact(
                        ecosystem=ecosystem,
                        artifact=artifact_coords.to_str(omit_version=True),
                        version=arguments['version'],
                        target_dir=target
                    )
                except Exception:
                    if sources_classifier == sources_classifiers[-1]:
                        # fetching of all variants failed
                        raise
                else:
                    return source_jar_path

        # the requested artifact is already a sources artifact, nothing was downloaded
        return None

    @staticmethod
    def _download_pom_xml(target, ecosystem, arguments):
        """Download pom.xml.

        :param target: directory the pom is downloaded to
        :param ecosystem: ecosystem passed through to ``IndianaJones.fetch_artifact``
        :param arguments: task arguments, ``name`` and ``version`` are read
        :return: path of the downloaded file, renamed to ``<target>/pom.xml``
        """
        artifact_coords = MavenCoordinates.from_str(arguments['name'])
        artifact_coords.packaging = 'pom'
        artifact_coords.classifier = ''  # pom.xml files have no classifiers

        IndianaJones.fetch_artifact(
            ecosystem=ecosystem,
            artifact=artifact_coords.to_str(omit_version=True),
            version=arguments['version'],
            target_dir=target
        )

        # pom has to be named precisely pom.xml, otherwise mercator's Java handler
        # which uses maven as subprocess won't see it
        pom_xml_path = os.path.join(target, 'pom.xml')
        # the rename assumes the download is stored as <artifactId>-<version>.pom in target
        os.rename(
            os.path.join(target,
                         '{}-{}.pom'.format(artifact_coords.artifactId, arguments['version'])),
            pom_xml_path
        )
        return pom_xml_path