diff --git a/tests/test_management_commands.py b/tests/test_management_commands.py index 5c443f8b..fb561be0 100644 --- a/tests/test_management_commands.py +++ b/tests/test_management_commands.py @@ -13,3 +13,7 @@ def test_update_permissions(): # TODO: Add cleanup for both Permission and User models # TODO: Add assertation ManagementCommands(args=['update_permissions']) + +def test_load_load_diagrams(): + ManagementCommands(args=['load_diagrams']) + #ManagementCommands(args=['load_diagrams', '--wf_path', './diagrams/multi_user2.bpmn']) diff --git a/zengine/current.py b/zengine/current.py index 8d6dc6e6..a9640c87 100644 --- a/zengine/current.py +++ b/zengine/current.py @@ -23,8 +23,8 @@ from zengine import signals from zengine.client_queue import ClientQueue from zengine.config import settings -from zengine.lib.cache import WFCache from zengine.log import log +from zengine.models import WFCache DEFAULT_LANE_CHANGE_MSG = { 'title': settings.MESSAGES['lane_change_message_title'], @@ -32,6 +32,8 @@ } + + class Current(object): """ This object holds the whole state of the app for passing to view methods (views/tasks) @@ -186,7 +188,7 @@ def __init__(self, **kwargs): self.new_token = True # log.info("TOKEN NEW: %s " % self.token) - self.wfcache = WFCache(self.token) + self.wfcache = WFCache(self.token, self.session.sess_id) # log.debug("\n\nWF_CACHE: %s" % self.wfcache.get()) self.set_client_cmds() diff --git a/zengine/engine.py b/zengine/engine.py index fa2a5487..78dd180d 100644 --- a/zengine/engine.py +++ b/zengine/engine.py @@ -29,12 +29,13 @@ from zengine.auth.permissions import PERM_REQ_TASK_TYPES from zengine.config import settings from zengine.current import WFCurrent -from zengine.lib.camunda_parser import InMemoryPackager +from zengine.lib.camunda_parser import InMemoryPackager, ZopsSerializer from zengine.lib.exceptions import HTTPError from zengine.log import log # crud_view = CrudView() +from zengine.models import BPMNWorkflow class ZEngine(object): @@ -175,7 +176,7 @@ def load_or_create_workflow(self): Tries to load the previously serialized (and saved) workflow Creates a new one if it can't """ - self.workflow_spec = self.get_worfklow_spec() + self.workflow_spec = self.get_worfklow_spec return self._load_workflow() or self.create_workflow() # self.current.update(workflow=self.workflow) @@ -195,6 +196,7 @@ def find_workflow_path(self): log.error(err_msg) raise RuntimeError(err_msg) + @property def get_worfklow_spec(self): """ Generates and caches the workflow spec package from @@ -205,9 +207,14 @@ def get_worfklow_spec(self): """ # TODO: convert from in-process to redis based caching if self.current.workflow_name not in self.workflow_spec_cache: - path = self.find_workflow_path() - spec_package = InMemoryPackager.package_in_memory(self.current.workflow_name, path) - spec = BpmnSerializer().deserialize_workflow_spec(spec_package) + # path = self.find_workflow_path() + # spec_package = InMemoryPackager.package_in_memory(self.current.workflow_name, path) + # spec = BpmnSerializer().deserialize_workflow_spec(spec_package) + + self.current.wf_object = BPMNWorkflow.objects.get(name=self.current.workflow_name) + xml_content = self.current.wf_object.xml.body + spec = ZopsSerializer().deserialize_workflow_spec(xml_content, self.current.workflow_name) + self.workflow_spec_cache[self.current.workflow_name] = spec return self.workflow_spec_cache[self.current.workflow_name] @@ -281,7 +288,7 @@ def switch_from_external_to_main_wf(self): if self.wf_cache['in_external'] and self.current.task_type == 'Simple' and self.current.task_name == 'End': main_wf = self.wf_cache['main_wf'] self.current.workflow_name = main_wf['wf_name'] - self.workflow_spec = self.get_worfklow_spec() + self.workflow_spec = self.get_worfklow_spec self.workflow = self.deserialize_workflow(main_wf['wf_state']) self.current.workflow = self.workflow self.wf_cache['in_external'] = False @@ -299,7 +306,7 @@ def switch_to_external_wf(self): main_wf = self.wf_cache.copy() external_wf_name = self.current.task.task_spec.topic self.current.workflow_name = external_wf_name - self.workflow_spec = self.get_worfklow_spec() + self.workflow_spec = self.get_worfklow_spec self.workflow = self.create_workflow() self.current.workflow = self.workflow self.check_for_authentication() diff --git a/zengine/lib/cache.py b/zengine/lib/cache.py index fd16b3dc..04256f22 100644 --- a/zengine/lib/cache.py +++ b/zengine/lib/cache.py @@ -267,17 +267,6 @@ def is_alive(self): return time.time() - float(self.get(0.0)) < self.SESSION_EXPIRE_TIME -class WFCache(Cache): - """ - Cache object for workflow instances. - - Args: - wf_token: Token of the workflow instance. - """ - PREFIX = 'WF' - - def __init__(self, wf_token): - super(WFCache, self).__init__(wf_token) class ClearCache(Cache): diff --git a/zengine/lib/camunda_parser.py b/zengine/lib/camunda_parser.py index 1317aae2..86be5984 100644 --- a/zengine/lib/camunda_parser.py +++ b/zengine/lib/camunda_parser.py @@ -8,10 +8,14 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +from StringIO import StringIO + from SpiffWorkflow.bpmn.parser.util import full_attr, BPMN_MODEL_NS, ATTRIBUTE_NS +from SpiffWorkflow.bpmn.storage.BpmnSerializer import BpmnSerializer from SpiffWorkflow.bpmn.storage.Packager import Packager +from SpiffWorkflow.storage.Serializer import Serializer from six import BytesIO - +import xml.etree.ElementTree as ET __author__ = "Evren Esat Ozkan" from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnParser @@ -185,6 +189,19 @@ def _parse_list(cls, elm): def _parse_script(cls, elm): return elm.get('scriptFormat'), elm.text +class ZopsSerializer(Serializer): + """ + Deserialize direct XML -> Spec + """ + + def deserialize_workflow_spec(self, xml_content, filename): + + parser = CamundaBMPNParser() + bpmn = ET.parse(BytesIO(xml_content)) + parser.add_bpmn_xml(bpmn, svg=None, filename='%s' % filename) + return parser.get_spec(filename) + + class InMemoryPackager(Packager): """ Creates spiff's wf packages on the fly. diff --git a/zengine/management_commands.py b/zengine/management_commands.py index df0aec0c..b7ed312d 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -6,9 +6,11 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +import glob + import six -from pyoko.db.adapter.db_riak import BlockSave +from pyoko.db.adapter.db_riak import BlockSave, BlockDelete from pyoko.exceptions import ObjectDoesNotExist from pyoko.lib.utils import get_object_from_path from pyoko.manage import * @@ -194,8 +196,6 @@ def run(self): worker.run() - - class PrepareMQ(Command): """ Creates necessary exchanges, queues and bindings @@ -236,12 +236,88 @@ class LoadDiagrams(Command): """ Loads wf diagrams from disk to DB """ + CMD_NAME = 'load_diagrams' HELP = 'Loads workflow diagrams from diagrams folder to DB' + PARAMS = [ + {'name': 'wf_path', 'default': None, 'help': 'Only update given BPMN diagram'}, + {'name': 'clear', 'action': 'store_true', 'help': 'Clear all task related models'}, + {'name': 'force', 'action': 'store_true', 'help': 'Load BPMN file even if there are active WFInstances exists.'}, + ] def run(self): - self.get_workflows() + """ + read workflows, checks if it's updated, + tries to update if there aren't any running instances of that wf + """ + from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, RunningInstancesExist + if self.manager.args.clear: + self._clear_models() + return + + if self.manager.args.wf_path: + paths = self.get_wf_from_path(self.manager.args.wf_path) + else: + paths = self.get_workflows() + count = 0 + for wf_name, content in paths: + wf, wf_is_new = BPMNWorkflow.objects.get_or_create(name=wf_name) + content = self._tmp_fix_diagram(content) + diagram, diagram_is_updated = DiagramXML.get_or_create_by_content(wf_name, content) + if wf_is_new or diagram_is_updated: + count += 1 + print("%s created or updated" % wf_name.upper()) + try: + wf.set_xml(diagram, self.manager.args.force) + except RunningInstancesExist as e: + print(e.message) + print("Give \"--force\" parameter to enforce") + print("%s BPMN file loaded" % count) + + + def _clear_models(self): + from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, WFInstance + print("Workflow related models will be cleared") + c = len(DiagramXML.objects.delete()) + print("%s DiagramXML object deleted" % c) + c = len(BPMNWorkflow.objects.delete()) + print("%s BPMNWorkflow object deleted" % c) + c = len(WFInstance.objects.delete()) + print("%s WFInstance object deleted" % c) + + def _tmp_fix_diagram(self, content): + # Temporary solution for easier transition from old to new xml format + # TODO: Will be removed after all diagrams converted. + return content.replace( + 'targetNamespace="http://activiti.org/bpmn"', + 'targetNamespace="http://bpmn.io/schema/bpmn"' + ).replace( + 'xmlns:camunda="http://activiti.org/bpmn"', + 'xmlns:camunda="http://camunda.org/schema/1.0/bpmn"' + ) + + def get_wf_from_path(self, path): + """ + load xml from given path + Args: + path: diagram path + + Returns: + + """ + with open(path) as fp: + content = fp.read() + return [(os.path.basename(os.path.splitext(path)[0]), content), ] def get_workflows(self): - pass + """ + Scans and loads all wf found under WORKFLOW_PACKAGES_PATHS + + Yields: XML content of diagram file + + """ + for pth in settings.WORKFLOW_PACKAGES_PATHS: + for f in glob.glob("%s/*.bpmn" % pth): + with open(f) as fp: + yield os.path.basename(os.path.splitext(f)[0]), fp.read() diff --git a/zengine/models/__init__.py b/zengine/models/__init__.py index 6ac86ac8..0e755f15 100644 --- a/zengine/models/__init__.py +++ b/zengine/models/__init__.py @@ -7,4 +7,5 @@ # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. from .auth import * +from .workflow_manager import * from ..messaging.model import * diff --git a/zengine/models/auth.py b/zengine/models/auth.py index 61f2c995..19aca589 100644 --- a/zengine/models/auth.py +++ b/zengine/models/auth.py @@ -20,7 +20,7 @@ class Unit(Model): """ name = field.String("İsim", index=True) - parent = LinkProxy('Unit', verbose_name='Parent Unit', reverse_name='sub_units') + # parent = LinkProxy('Unit', verbose_name='Parent Unit', reverse_name='sub_units') class Meta: verbose_name = "Unit" diff --git a/zengine/models/workflow_manager.py b/zengine/models/workflow_manager.py index fbdee606..b71ea931 100644 --- a/zengine/models/workflow_manager.py +++ b/zengine/models/workflow_manager.py @@ -6,15 +6,23 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +import json +from pika.exceptions import ChannelClosed +from pika.exceptions import ConnectionClosed from pyoko import Model, field, ListNode, LinkProxy from pyoko.conf import settings from pyoko.lib.utils import get_object_from_path +from SpiffWorkflow.bpmn.parser.util import full_attr, BPMN_MODEL_NS, ATTRIBUTE_NS +from zengine.client_queue import get_mq_connection +from zengine.lib.cache import Cache +from zengine.lib.translation import gettext_lazy as _ UnitModel = get_object_from_path(settings.UNIT_MODEL) RoleModel = get_object_from_path(settings.ROLE_MODEL) AbstractRoleModel = get_object_from_path(settings.ABSTRACT_ROLE_MODEL) + class DiagramXML(Model): """ Diagram XML versions @@ -22,6 +30,67 @@ class DiagramXML(Model): body = field.String("XML content", index=False) name = field.String("Name") + @classmethod + def get_or_create_by_content(cls, name, content): + """ + if xml content updated, create a new entry for given wf name + Args: + name: name of wf + content: xml content + + Returns (DiagramXML(), bool): A tuple with two members. + (DiagramXML instance and True if it's new or False it's already exists + """ + new = False + diagrams = cls.objects.filter(name=name) + if diagrams: + diagram = diagrams[0] + if diagram.body != content: + new = True + else: + new = True + if new: + diagram = cls(name=name, body=content).save() + return diagram, new + + +class RunningInstancesExist(Exception): + pass + + +NS = {'bpmn': BPMN_MODEL_NS, + 'camunda': ATTRIBUTE_NS} + + +class BPMNParser(object): + """ + Custom BPMN diagram parser + """ + + def __init__(self, xml_content=None, xml_file=None): + if xml_content: + import StringIO + self.root = ET.parse(StringIO(xml_content)) + else: + self.root = ET.parse(xml_file) + + def _get_wf_description(self): + """ + Tries to get WF description from 'collabration' or 'process' or 'pariticipant' + + Returns str: WF description + + """ + + desc = ( + self.root.find('bpmn:collaboration/bpmn:documentation', NS) or + self.root.find('bpmn:process/bpmn:documentation', NS) or + self.root.find('bpmn:collaboration/bpmn:participant/bpmn:documentation', NS) + ) + + return desc.text if desc else '' + + class BPMNWorkflow(Model): """ @@ -53,6 +122,95 @@ class Meta: def __unicode__(self): return '%s' % self.name + def set_xml(self, diagram, force=False): + """ + updates xml link if there aren't any running instances of this wf + Args: + diagram: XMLDiagram object + """ + no_of_running = WFInstance.objects.filter(wf=self, finished=False, started=True).count() + if no_of_running and not force: + raise RunningInstancesExist( + "Can't update WF diagram! Running %s WF instances exists for %s" % ( + no_of_running, self.name + )) + else: + self.xml = diagram + self.save() + + +JOB_REPEATING_PERIODS = ( + (0, 'No repeat'), + (5, 'Hourly'), + (10, 'Daily'), + (15, 'Weekly'), + (20, 'Monthly'), + (25, 'Yearly'), +) + +JOB_NOTIFICATION_DENSITY = ( + (0, _('None')), + (5, _('15 day before, once in 3 days')), + (10, _('1 week before, daily')), + (15, _('Day before start time')), +) + +ROLE_SEARCH_DEPTH = ( + (1, _('Selected unit')), + (2, _('Selected unit and all sub-units of it')) +) + + +class Task(Model): + """ + + Task definition for workflows + + """ + run = field.Boolean("Create tasks", default=False) + wf = BPMNWorkflow() + name = field.String(_("Name of task")) + abstract_role = AbstractRoleModel(null=True) + role = RoleModel(null=True) + unit = UnitModel(null=True) + search_depth = field.Integer(_("Get roles from"), choices=ROLE_SEARCH_DEPTH) + role_query_code = field.String(_("Role query method"), null=True) + object_query_code = field.String(_("Role query method"), null=True) + object_key = field.String(_("Subject ID"), null=True) + start_date = field.DateTime(_("Start time")) + finish_date = field.DateTime(_("Finish time")) + repeat = field.Integer(_("Repeating period"), default=0, choices=JOB_REPEATING_PERIODS) + notification_density = field.Integer(_("Notification density"), + choices=JOB_NOTIFICATION_DENSITY) + + class Meta: + verbose_name = "Workflow Task" + verbose_name_plural = "Workflows Tasks" + search_fields = ['name'] + list_fields = ['name', ] + + def create_tasks(self): + """ + creates all the task that defined by this wf task instance + """ + WFInstance(wf=self.wf, ) + roles = self.get_roles() + + def get_roles(self): + pass + + def create_periodic_tasks(self): + pass + + def post_save(self): + if self.run: + self.run = False + self.create_tasks() + self.save() + + def __unicode__(self): + return '%s' % self.name + class WFInstance(Model): """ @@ -61,13 +219,15 @@ class WFInstance(Model): """ wf = BPMNWorkflow() - role = RoleModel() - diagram_version = field.DateTime() + task = Task() + current_actor = RoleModel() wf_object = field.String("Subject ID") - start_date = field.DateTime("Start time") last_activation = field.DateTime("Last activation") + finished = field.Boolean(default=False) + started = field.Boolean(default=False) + start_date = field.DateTime("Start time") finish_date = field.DateTime("Finish time") - state = field.String("WF State") + state = field.String("Serialized state of WF") class Meta: verbose_name = "Workflow Instance" @@ -83,55 +243,71 @@ def __unicode__(self): return '%s instance (%s)' % (self.wf.name, self.key) -JOB_REPEATING_PERIODS = ( - (0, 'No repeat'), - (5, 'Hourly'), - (10, 'Daily'), - (15, 'Weekly'), - (20, 'Monthly'), - (25, 'Yearly'), -) +class TaskInvitation(Model): + instance = WFInstance() + role = RoleModel() + name = field.String() + start_date = field.DateTime("Start time") + finish_date = field.DateTime("Finish time") -JOB_NOTIFICATION_DENSITY = ( - (0, 'None'), - (5, '15 day before, once in 3 days'), - (10, '1 week before, daily'), - (15, 'Day before start time'), -) + def __unicode__(self): + return "%s invitation for %s" % (self.name, self.role) -JOB_TYPES = ( - (0, 'Model'), - (5, 'Abstract Role'), - (10, 'Role'), - (15, 'Unit'), -) + def delete_other_invitations(self): + """ + When one person use an invitation, we should delete other invitations + """ + # TODO: Signal logged-in users to remove the task from task list + self.objects.filter(instance=self.instance).exclude(key=self.key).delete() -class WFTask(Model): +class WFCache(Cache): """ + Cache object for workflow instances. - Task definition for workflows - + Args: + wf_token: Token of the workflow instance. """ - wf = BPMNWorkflow() - name = field.String("Name of task") - abstract_role = AbstractRoleModel(null=True) - role = RoleModel(null=True) - root_unit = UnitModel(null=True) - role_query_code = field.String("Role query method", null=True) - object_query_code = field.String("Role query method", null=True) - object_key = field.String("Subject ID", null=True) - start_date = field.DateTime("Start time") - finish_date = field.DateTime("Finish time") - repeat = field.Integer("Repeating period", default=0, choices=JOB_REPEATING_PERIODS) - task_type = field.Integer("Task Type", choices=JOB_TYPES) - notification_density = field.Integer("Notification density", choices=JOB_NOTIFICATION_DENSITY) + PREFIX = 'WF' + mq_channel = None + mq_connection = None - class Meta: - verbose_name = "Workflow Task" - verbose_name_plural = "Workflows Tasks" - search_fields = ['name'] - list_fields = ['name', ] + def __init__(self, wf_token, sess_id): + self.db_key = wf_token + self.sess_id = sess_id + super(WFCache, self).__init__(wf_token) - def __unicode__(self): - return '%s' % self.name + @classmethod + def _connect_mq(cls): + if cls.mq_connection is None or cls.mq_connection.is_closed: + cls.mq_connection, cls.mq_channel = get_mq_connection() + return cls.mq_channel + + def write_to_db_through_mq(self, sess_id, val): + """ + write wf state to DB through MQ >> Worker >> _zops_sync_wf_cache + Args: + sess_id: users session id + """ + data = dict(exchange='input_exc', + routing_key=sess_id, + body=json.dumps({'data': { + 'view': '_zops_sync_wf_cache', + 'wf_state': val, + 'token': self.db_key}, + '_zops_remote_ip': ''})) + try: + self.mq_channel.basic_publish(**data) + except (ConnectionClosed, ChannelClosed): + self._connect_mq().basic_publish(**data) + + def get_from_db(self): + return WFInstance.objects.get(self.db_key) + + def get(self): + return super(WFCache, self).get() or self.get_from_db() + + def set(self, val, lifetime=None): + super(WFCache, self).set(val, lifetime) + self.write_to_db_through_mq(self.sess_id, val) + return val diff --git a/zengine/settings.py b/zengine/settings.py index baae5987..ebb163e5 100644 --- a/zengine/settings.py +++ b/zengine/settings.py @@ -123,6 +123,7 @@ 'sessid_to_userid': 'zengine.views.system.sessid_to_userid', 'mark_offline_user': 'zengine.views.system.mark_offline_user', 'ping': 'zengine.views.dev_utils.Ping', + '_zops_sync_wf_cache': 'zengine.views.system.sync_wf_cache', '_zops_create_message': 'zengine.messaging.views.create_message', '_zops_show_channel': 'zengine.messaging.views.show_channel', '_zops_list_channels': 'zengine.messaging.views.list_channels',