Skip to content

Commit

Permalink
more fixes in eventhandler
Browse files Browse the repository at this point in the history
  • Loading branch information
vvb committed Dec 18, 2015
1 parent 29bba45 commit 61bde4b
Showing 1 changed file with 58 additions and 28 deletions.
86 changes: 58 additions & 28 deletions ucsmsdk/ucseventhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,17 @@ def __enqueue_function(self):

try:
while self.__event_chan_resp and len(self.__wbs):
# log.debug('waiting to acquire lock on %s' %
# self.__enqueue_thread.name)
self.__condition.acquire()
# log.debug('condition acquired by %s' %
# self.__enqueue_thread.name)

if self.__handle.cookie is None:
break
self.__condition.acquire()
if self.__event_chan_resp is None:
break

resp = self.__event_chan_resp.readline()
resp = self.__event_chan_resp.read(int(resp))
for mo_elem in self.__get_mo_elem(resp):
Expand All @@ -154,10 +162,20 @@ def __enqueue_function(self):
for watch_block in self.__wbs:
if watch_block.fmce(mce):
watch_block.enqueue(mce)
# log.debug('condition notified by %s' %
# self.__enqueue_thread.name)
self.__condition.notify()

# log.debug('condition released by %s' %
# self.__enqueue_thread.name)
self.__condition.release()
finally:
self.__condition.release()

if len(self.__wbs) == 0:
self.__condition.acquire()
self.__condition.notify()
self.__condition.release()
except:
raise

def __thread_enqueue_start(self):
"""
Expand All @@ -167,24 +185,29 @@ def __thread_enqueue_start(self):

self.__enqueue_thread = Thread(name="enqueue_thread",
target=self.__enqueue_function)
self.__enqueue_thread.daemon = True
# self.__enqueue_thread.daemon = True
self.__enqueue_thread.start()

def __thread_enqueue_stop(self):
"""
Internal method to stop the enqueue thread.
"""

self.__enqueue_thread = None
self.__event_chan_resp = None
# def __thread_enqueue_stop(self):
# """
# Internal method to stop the enqueue thread.
# """
#
# #self.__enqueue_thread = None
# self.__event_chan_resp = None

def __dequeue_function(self):
"""
Internal method to dequeue to events.
"""

while len(self.__wbs):
# log.debug('waiting to acquire lock on %s' %
# self.__dequeue_thread.name)
self.__condition.acquire()
# log.debug('condition acquired by %s' %
# self.__dequeue_thread.name)

lowest_timeout = None
wb_to_remove = []

Expand All @@ -205,9 +228,10 @@ def __dequeue_function(self):
if poll_sec is not None and managed_object is not None:
pmo = self.__handle.query_dn(managed_object.dn)
if pmo is None:
UcsWarning('Mo ' + managed_object.dn + ' not found.')
UcsWarning('Mo ' +
managed_object.dn +
' not found.')
continue
# gmo = _GenericMo(mo=pmo, option=WriteXmlOption.ALL)
elem = pmo.to_xml()
xml_str = xc.to_xml_str(elem)
gmo = ucsmo.generic_mo_from_xml(xml_str)
Expand All @@ -234,7 +258,8 @@ def __dequeue_function(self):
if mce is None:
continue

if managed_object is None: # Means parameterset is not Mo
# Means parameter set is not Mo
if managed_object is None:
if watch_block.callback is not None:
watch_block.callback(mce)
continue
Expand All @@ -243,7 +268,6 @@ def __dequeue_function(self):
elem = mce.mo.to_xml()
xml_str = xc.to_xml_str(elem)
gmo = ucsmo.generic_mo_from_xml(xml_str)
# gmo = _GenericMo(mo=mce.mo, option=WriteXmlOption.ALL)

attributes = []
if mce is None:
Expand All @@ -263,9 +287,9 @@ def __dequeue_function(self):
mce = MoChangeEvent(event_id=0, mo=pmo,
change_list=prop)

# check if any of the conditional checks match
# if so, call the callback specified by the application
# and remove the watch_block (deferred to outside the loop)
# check if any of the conditional checks match
# if so, call the callback specified by the application
# and remove the watch_block (deferred to outside the loop)
if (
(len(success_value) > 0 and
prop_val in success_value) or
Expand Down Expand Up @@ -309,8 +333,14 @@ def __dequeue_function(self):
# were deleted in the above loop.
# In that case, no need to wait for more events
if len(self.__wbs):
# log.debug('condition wait by %s' %
# self.__dequeue_thread.name)
self.__condition.wait(lowest_timeout)
self.__condition.release()

# log.debug('condition released by %s' %
# self.__dequeue_thread.name)
self.__condition.release()

return

def __thread_dequeue_start(self):
Expand All @@ -320,14 +350,14 @@ def __thread_dequeue_start(self):

self.__dequeue_thread = Thread(name="dequeue_thread",
target=self.__dequeue_function)
self.__dequeue_thread.daemon = True
# self.__dequeue_thread.daemon = True
self.__dequeue_thread.start()

def __thread_dequeue_stop(self):
"""
Internal method to stop dequeue thread.
"""
self.__dequeue_thread = None
# def __thread_dequeue_stop(self):
# """
# Internal method to stop dequeue thread.
# """
# self.__dequeue_thread = None

def watch_block_add(self, params,
filter_callback,
Expand Down Expand Up @@ -361,9 +391,9 @@ def watch_block_remove(self, watch_block):
if watch_block in self.__wbs:
self.__wbs.remove(watch_block)

if len(self.__wbs) == 0:
self.__thread_enqueue_stop()
self.__thread_dequeue_stop()
# if len(self.__wbs) == 0:
# self.__thread_enqueue_stop()
# self.__thread_dequeue_stop()

def add(self,
class_id=None,
Expand Down

0 comments on commit 61bde4b

Please sign in to comment.