Skip to content

Commit

Permalink
Merge pull request #35 from ifm/develop
Browse files Browse the repository at this point in the history
Release 0.7.0
  • Loading branch information
cwiede authored Aug 19, 2021
2 parents 8ce96c6 + bc2edba commit 2db06a3
Show file tree
Hide file tree
Showing 27 changed files with 667 additions and 1,014 deletions.
47 changes: 37 additions & 10 deletions nexxT/core/ActiveApplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import logging
from PySide2.QtCore import QObject, Slot, Signal, Qt, QCoreApplication
from nexxT.interface import FilterState, OutputPortInterface, InputPortInterface
from nexxT.core.Exceptions import FilterStateMachineError, NexTInternalError
from nexxT.core.Exceptions import FilterStateMachineError, NexTInternalError, PossibleDeadlock
from nexxT.core.CompositeFilter import CompositeFilter
from nexxT.core.Utils import Barrier, assertMainThread
from nexxT.core.Utils import Barrier, assertMainThread, MethodInvoker
from nexxT.core.Thread import NexTThread

logger = logging.getLogger(__name__) # pylint: disable=invalid-name
Expand All @@ -40,7 +40,7 @@ def __init__(self, graph):
self._numThreadsSynced = 0
self._state = FilterState.CONSTRUCTING
self._graphConnected = False
self._portToPortConns = []
self._interThreadConns = []
self._operationInProgress = False
# connect signals and slots
for tname in self._threads:
Expand Down Expand Up @@ -103,7 +103,7 @@ def cleanup(self):
self._composite2graphs = {}
# initialize private variables
self._numThreadsSynced = 0
self._portToPortConns = []
self._interThreadConns = []

def getState(self):
"""
Expand Down Expand Up @@ -232,6 +232,7 @@ def _setupConnections(self):
:return: None
"""
assertMainThread()
graph = {}
if self._graphConnected:
return
for fromNode, fromPort, toNode, toPort in self._allConnections():
Expand All @@ -241,9 +242,28 @@ def _setupConnections(self):
p0 = t0.getFilter(fromNode).getPort(fromPort, OutputPortInterface)
t1 = self._threads[toThread]
p1 = t1.getFilter(toNode).getPort(toPort, InputPortInterface)
p2pc = OutputPortInterface.setupPortToPortConnection(t0.getExecutor(), t1.getExecutor(), p0, p1)
p2pc.moveToThread(t0.qthread())
self._portToPortConns.append(p2pc)
if toThread == fromThread:
OutputPortInterface.setupDirectConnection(p0, p1)
else:
itc = OutputPortInterface.setupInterThreadConnection(p0, p1, self._threads[fromThread].qthread())
self._interThreadConns.append(itc)
if not fromThread in graph:
graph[fromThread] = set()
if not toThread in graph:
graph[toThread] = set()
graph[fromThread].add(toThread)

def _checkCycle(thread, cycleInfo):
if thread in cycleInfo:
cycle = "->".join(cycleInfo[cycleInfo.index(thread):] + [thread])
raise PossibleDeadlock(cycle)
cycle_info = cycleInfo + [thread]
for nt in graph[thread]:
_checkCycle(nt, cycle_info)

for thread in graph:
_checkCycle(thread, [])

self._graphConnected = True

@Slot()
Expand Down Expand Up @@ -348,8 +368,15 @@ def start(self):
raise FilterStateMachineError(self._state, FilterState.STARTING)
self._operationInProgress = True
self._state = FilterState.STARTING
self._setupConnections()
for itc in self._portToPortConns:
try:
self._setupConnections()
except PossibleDeadlock as e:
self._state = FilterState.OPENED
MethodInvoker(self.close, Qt.QueuedConnection)
MethodInvoker(self.deinit, Qt.QueuedConnection)
logger.error(str(e))
return
for itc in self._interThreadConns:
# set connections in active mode.
itc.setStopped(False)
self.performOperation.emit("start", Barrier(len(self._threads)))
Expand All @@ -373,7 +400,7 @@ def stop(self):
raise FilterStateMachineError(self._state, FilterState.STOPPING)
self._operationInProgress = True
self._state = FilterState.STOPPING
for itc in self._portToPortConns:
for itc in self._interThreadConns:
# set connections in active mode.
itc.setStopped(True)
self.performOperation.emit("stop", Barrier(len(self._threads)))
Expand Down
7 changes: 7 additions & 0 deletions nexxT/core/Exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,10 @@ class CompositeRecursion(NexTRuntimeError):
"""
def __init__(self, name):
super().__init__("Composite filter '%s' depends on itself." % name)

class PossibleDeadlock(NexTRuntimeError):
"""
raised during application activation when a possible deadlock is detected (a cycle was found in the thread graph)
"""
def __init__(self, cycle):
super().__init__("This graph is not deadlock-safe. A cycle has been found in the thread graph: %s" % cycle)
170 changes: 0 additions & 170 deletions nexxT/core/Executor.py

This file was deleted.

Loading

0 comments on commit 2db06a3

Please sign in to comment.