-
Notifications
You must be signed in to change notification settings - Fork 45
/
finalize.py
52 lines (41 loc) · 1.86 KB
/
finalize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Finalize tasks."""
import datetime
from sqlalchemy.exc import SQLAlchemyError
from f8a_worker.utils import json_serial
from f8a_worker.base import BaseTask
from f8a_worker.models import Analysis, PackageAnalysis
class FinalizeTask(BaseTask):
"""Finish EPV analysis flow and store audit."""
def run(self, arguments):
"""Run task."""
self._strict_assert(arguments.get('document_id'))
try:
record = self.storage.session.query(Analysis).\
filter(Analysis.id == arguments['document_id']).one()
record.finished_at = json_serial(datetime.datetime.utcnow())
record.release = '{}:{}:{}'.format(arguments.get('ecosystem'),
arguments.get('name'),
arguments.get('version'))
self.storage.session.commit()
except SQLAlchemyError:
self.storage.session.rollback()
raise
# Commented out for now since we want to sync to S3
# if self.task_name.endswith('Error'):
# raise RuntimeError("Flow %s failed" % self.flow_name)
class PackageFinalizeTask(BaseTask):
"""Finish Package-level flow and store audit."""
def run(self, arguments):
"""Run task."""
self._strict_assert(arguments.get('document_id'))
try:
record = self.storage.session.query(PackageAnalysis).\
filter(PackageAnalysis.id == arguments['document_id']).one()
record.finished_at = json_serial(datetime.datetime.utcnow())
self.storage.session.commit()
except SQLAlchemyError:
self.storage.session.rollback()
raise
# Commented out for now since we want to sync to S3
# if self.task_name.endswith('Error'):
# raise RuntimeError("Flow %s failed" % self.flow_name)