diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index 9e75aab9e..dd346898d 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -25,15 +25,15 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | + sudo apt update + sudo apt install -y openmpi-bin python -m venv testenv . testenv/bin/activate python -m pip install --upgrade pip setuptools wheel python -m pip install . - name: run examples - timeout-minutes: 15 + timeout-minutes: 10 run: | - sudo apt update - sudo apt install -y openmpi-bin export RADICAL_LOG_LVL=DEBUG_9 export RADICAL_PROFILE=True export RADICAL_DEBUG=TRUE @@ -42,26 +42,35 @@ jobs: export RADICAL_UTILS_ZMQ_LOG_LVL=INFO export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO . testenv/bin/activate - mkdir example_artifacts/ - cd example_artifacts/ - ../examples/00_getting_started.py - ../examples/09_mpi_tasks.py - ../examples/01_task_details.py - ../examples/02_failing_tasks.py - ../examples/03_multiple_pilots.py - ../examples/04_scheduler_selection.py - ../examples/05_task_input_data.py - ../examples/06_task_output_data.py - ../examples/07_shared_task_data.py - ../examples/08_task_environment.py - ../examples/10_pre_and_post_exec.py - ../examples/11_task_input_data_tar.py - ../examples/11_task_input_folder.py + export RP_ROOT=$(pwd) + export BASE=/home/runner/radical.pilot.sandbox/ + mkdir -p $BASE/client_sessions + cd $BASE/client_sessions + radical-stack + $RP_ROOT/examples/00_getting_started.py + $RP_ROOT/examples/09_mpi_tasks.py + $RP_ROOT/examples/01_task_details.py + $RP_ROOT/examples/02_failing_tasks.py + $RP_ROOT/examples/03_multiple_pilots.py + $RP_ROOT/examples/04_scheduler_selection.py + $RP_ROOT/examples/05_task_input_data.py + $RP_ROOT/examples/06_task_output_data.py + $RP_ROOT/examples/07_shared_task_data.py + $RP_ROOT/examples/08_task_environment.py + $RP_ROOT/examples/10_pre_and_post_exec.py + $RP_ROOT/examples/11_task_input_data_tar.py + $RP_ROOT/examples/11_task_input_folder.py + + - name: prepare example_artifacts + if: always() + run: | + tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/ + - name: upload example_artifacts if: always() uses: actions/upload-artifact@v4 with: name: example_artifacts_${{ matrix.python-version }} - path: example_artifacts + path: /home/runner/example_artifacts.tgz overwrite: true diff --git a/bin/radical-pilot-bridge b/bin/radical-pilot-bridge index 8a5ac5149..34c94b37b 100755 --- a/bin/radical-pilot-bridge +++ b/bin/radical-pilot-bridge @@ -6,15 +6,13 @@ __license__ = "MIT" import os import sys -import time -import threading as mt import setproctitle as spt import multiprocessing as mp import radical.utils as ru -from radical.pilot.messages import HeartbeatMessage +from radical.pilot.messages import ComponentStartedMessage # ------------------------------------------------------------------------------ @@ -28,27 +26,8 @@ def main(sid, reg_addr, uid, ppid, evt): - name: name of the bridge - kind: type of bridge (`pubsub` or `queue`) - If the config contains a `heartbeat` section, that section must be formatted - as follows: - - { - 'from' : 'uid', - 'addr_pub': 'addr_pub', - 'addr_sub': 'addr_sub', - 'interval': , - 'timeout' : - } - - If that section exists, the process will daemonize and heartbeats are used - to manage the bridge lifetime: the lifetime of this bridge is then dependent - on receiving heartbeats from the given `uid`: after `timeout` seconds of no - heartbeats arriving, the bridge will terminate. The bridge itself will - publish heartbeats every `interval` seconds on the heartbeat channel under - its own uid. - - If the heartbeat section is not present in the config file, the components - lifetime is expected to be explicitly managed, i.e., that this wrapper - process hosting the bridge is terminated externally. + The config will also contain a `cmgr_url` entry which points to the cmgr + this component should register with. The config file may contain other entries which are passed to the bridge and are interpreted by the bridge implementation. @@ -105,11 +84,8 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): pwatcher.watch(int(ppid)) pwatcher.watch(os.getpid()) - term = mt.Event() - reg = ru.zmq.RegistryClient(url=reg_addr) - - hb_cfg = ru.TypedDict(reg['heartbeat']) - b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid]) + reg = ru.zmq.RegistryClient(url=reg_addr) + b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid]) # create the instance and begin to work bridge = ru.zmq.Bridge.create(uid, cfg=b_cfg) @@ -119,48 +95,13 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): reg.close() bridge.start() - evt.set() - # re-enable the test below if timing issues crop up - # if 'pubsub' in uid: - # d = ru.zmq.test_pubsub(bridge.channel, bridge.addr_pub, bridge.addr_sub) - - # bridge runs - send heartbeats so that cmgr knows about it - hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof) - - def hb_beat_cb(): - hb_pub.put('heartbeat', HeartbeatMessage(uid=uid)) - - def hb_term_cb(hb_uid): - bridge.stop() - term.set() - return False - - hb = ru.Heartbeat(uid=uid, - timeout=hb_cfg.timeout, - interval=hb_cfg.interval, - beat_cb=hb_beat_cb, - term_cb=hb_term_cb, - log=log) - hb.start() - - # always watch out for session heartbeat - hb.watch(uid=sid) - - # react on session heartbeats - def hb_sub_cb(topic, msg): - hb_msg = HeartbeatMessage(from_dict=msg) - if hb_msg.uid == sid: - hb.beat(uid=sid) - - ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub, - topic='heartbeat', cb=hb_sub_cb, - log=log, prof=prof) + pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url) + pipe.put(ComponentStartedMessage(uid=uid, pid=os.getpid())) # all is set up - we can sit idle 'til end of time. - while not term.is_set(): - time.sleep(1) + bridge.wait() # ------------------------------------------------------------------------------ diff --git a/bin/radical-pilot-component b/bin/radical-pilot-component index 304ab6e85..0ddd00bc9 100755 --- a/bin/radical-pilot-component +++ b/bin/radical-pilot-component @@ -6,16 +6,13 @@ __license__ = "MIT" import os import sys -import time -import threading as mt import setproctitle as spt import multiprocessing as mp import radical.utils as ru import radical.pilot as rp -from radical.pilot.messages import HeartbeatMessage # ------------------------------------------------------------------------------ # @@ -28,27 +25,8 @@ def main(sid, reg_addr, uid, ppid, evt): - name: name of the component - kind: type of component - If the config contains a `heartbeat` section, that section must be formatted - as follows: - - { - 'from' : 'uid', - 'addr_pub': 'addr_pub', - 'addr_sub': 'addr_sub', - 'interval': , - 'timeout' : - } - - If that section exists, the process will daemonize and heartbeats are used - to manage the bridge lifetime: the lifetime of this bridge is then dependent - on receiving heartbeats from the given `uid`: after `timeout` seconds of no - heartbeats arriving, the bridge will terminate. The bridge itself will - publish heartbeats every `interval` seconds on the heartbeat channel under - its own uid. - - If the heartbeat section is not present in the config file, the components - lifetime is expected to be explicitly managed, i.e., that this wrapper - process hosting the bridge is terminated externally. + The config will also contain a `cmgr_url` entry which points to the cmgr + this component should register with. The config file may contain other entries which are passed to the component and are interpreted by the component implementation. @@ -79,16 +57,11 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): pwatcher.watch(int(ppid)) pwatcher.watch(os.getpid()) - term = mt.Event() - reg = ru.zmq.RegistryClient(url=reg_addr) - - hb_cfg = ru.TypedDict(reg['heartbeat']) - c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid]) + reg = ru.zmq.RegistryClient(url=reg_addr) + c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid]) reg.close() - evt.set() - # start a non-primary session session = rp.Session(uid=sid, cfg=c_cfg, _role=rp.Session._DEFAULT, _reg_addr=reg_addr) @@ -96,42 +69,10 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): # create the instance and begin to work comp = rp.utils.BaseComponent.create(c_cfg, session) comp.start() - - # component runs - send heartbeats so that session knows about it - hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof) - - def hb_beat_cb(): - hb_pub.put('heartbeat', HeartbeatMessage(uid=uid)) - - def hb_term_cb(hb_uid): - comp.stop() - term.set() - return False - - hb = ru.Heartbeat(uid=uid, - timeout=hb_cfg.timeout, - interval=hb_cfg.interval, - beat_cb=hb_beat_cb, - term_cb=hb_term_cb, - log=log) - hb.start() - - # always watch out for session heartbeat - hb.watch(uid=sid) - - # react on session heartbeats - def hb_sub_cb(topic, msg): - hb_msg = HeartbeatMessage(from_dict=msg) - if hb_msg.uid == sid: - hb.beat(uid=sid) - - ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub, - topic='heartbeat', cb=hb_sub_cb, - log=log, prof=prof) + evt.set() # all is set up - we can sit idle 'til end of time. - while not term.is_set(): - time.sleep(1) + comp.wait() # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 2694642f7..e0ad305f1 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -298,6 +298,8 @@ def finalize(self): try : log = ru.ru_open('./agent_0.log', 'r').read(1024) except: pass + self._log.debug('final cause: %s', self._final_cause) + if self._final_cause == 'timeout' : state = rps.DONE elif self._final_cause == 'cancel' : state = rps.CANCELED elif self._final_cause == 'sys.exit' : state = rps.CANCELED @@ -620,13 +622,23 @@ def control_cb(self, topic, msg): return self._ctrl_cancel_pilots(msg) elif cmd == 'service_info': - self._log.debug('=== PILOT COMMAND: %s: %s', cmd, arg) + self._log.debug('PILOT COMMAND: %s: %s', cmd, arg) return self._ctrl_service_info(msg, arg) else: self._log.error('invalid command: [%s]', cmd) + # -------------------------------------------------------------------------- + # + def stop(self): + + self._log.info('stop agent') + self._final_cause = 'cancel' + super().stop() + self._session.close() + + # -------------------------------------------------------------------------- # def _ctrl_cancel_pilots(self, msg): @@ -655,7 +667,7 @@ def _ctrl_service_info(self, msg, arg): error = arg['error'] info = arg['info'] - self._log.debug('=== service info: %s: %s', uid, info) + self._log.debug('service info: %s: %s', uid, info) # This message signals that an agent service instance is up and running. # We expect to find the service UID in args and can then unblock the @@ -683,7 +695,7 @@ def _ctrl_service_info(self, msg, arg): self._reg['services.%s' % uid] = info # signal main thread when that the service is up - self._log.debug('=== set service start event for %s', uid) + self._log.debug('set service start event for %s', uid) self._service_start_evt.set() return True diff --git a/src/radical/pilot/agent/executing/base.py b/src/radical/pilot/agent/executing/base.py index 4dd05b693..4d23dee25 100644 --- a/src/radical/pilot/agent/executing/base.py +++ b/src/radical/pilot/agent/executing/base.py @@ -156,7 +156,7 @@ def _to_watcher(self): `self._cancel_task(task)`. That has to be implemented by al executors. ''' - while True: + while not self._term.is_set(): # check once per second at most time.sleep(1) diff --git a/src/radical/pilot/messages.py b/src/radical/pilot/messages.py index 2cd141c55..23ac314cc 100644 --- a/src/radical/pilot/messages.py +++ b/src/radical/pilot/messages.py @@ -4,6 +4,7 @@ import radical.utils as ru + # ------------------------------------------------------------------------------ # class RPBaseMessage(ru.Message): @@ -11,9 +12,15 @@ class RPBaseMessage(ru.Message): # rpc distinguishes messages which are forwarded to the proxy bridge and # those which are not and thus remain local to the module they originate in. - _schema = {'fwd' : bool} - _defaults = {'_msg_type': 'rp_msg', - 'fwd' : False} + _msg_type = 'rp_msg' + _schema = {'fwd': bool} + _defaults = {'fwd': False} + + @classmethod + def register(cls): + # TODO: this should be moved to the RU base class + cls._defaults['_msg_type'] = cls._msg_type + ru.Message.register_msg_type(cls._msg_type, cls) # we do not register this message type - it is not supposed to be used @@ -22,31 +29,32 @@ class RPBaseMessage(ru.Message): # ------------------------------------------------------------------------------ # -class HeartbeatMessage(RPBaseMessage): +class ComponentStartedMessage(RPBaseMessage): - # heartbeat messages are never forwarded + # startup messages are never forwarded - _schema = {'uid' : str} - _defaults = {'_msg_type': 'heartbeat', - 'fwd' : False, - 'uid' : None} + _msg_type = 'component_start' + _schema = {'uid': str, + 'pid': int} + _defaults = {'fwd': False, + 'uid': None, + 'pid': None} -ru.Message.register_msg_type('heartbeat', HeartbeatMessage) +ComponentStartedMessage.register() # ------------------------------------------------------------------------------ # class RPCRequestMessage(RPBaseMessage): + _msg_type = 'rpc_req' _schema = {'uid' : str, # uid of message 'addr' : str, # who is expected to act on the request 'cmd' : str, # rpc command 'args' : list, # rpc command arguments 'kwargs' : dict} # rpc command named arguments - _defaults = { - '_msg_type': 'rpc_req', - 'fwd' : True, + _defaults = {'fwd' : True, 'uid' : None, 'addr' : None, 'cmd' : None, @@ -54,20 +62,20 @@ class RPCRequestMessage(RPBaseMessage): 'kwargs' : {}} - -ru.Message.register_msg_type('rpc_req', RPCRequestMessage) +RPCRequestMessage.register() # ------------------------------------------------------------------------------ # class RPCResultMessage(RPBaseMessage): + _msg_type = 'rpc_res' _schema = {'uid' : str, # uid of rpc call 'val' : Any, # return value (`None` by default) 'out' : str, # stdout 'err' : str, # stderr 'exc' : str} # raised exception representation - _defaults = {'_msg_type': 'rpc_res', + _defaults = {'_msg_type': _msg_type, 'fwd' : True, 'uid' : None, 'val' : None, @@ -90,7 +98,7 @@ def __init__(self, rpc_req=None, from_dict=None, **kwargs): super().__init__(from_dict, **kwargs) -ru.Message.register_msg_type('rpc_res', RPCResultMessage) +RPCResultMessage.register() # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/pilot.py b/src/radical/pilot/pilot.py index 62711b9ae..b68073e92 100644 --- a/src/radical/pilot/pilot.py +++ b/src/radical/pilot/pilot.py @@ -3,8 +3,11 @@ __copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu" __license__ = "MIT" +import os +import sys import copy import time +import signal import threading as mt @@ -77,6 +80,7 @@ def __init__(self, pmgr: PilotManager, descr): self._uid = self._descr.get('uid') self._state = rps.NEW self._log = pmgr._log + self._sub = None self._pilot_dict = dict() self._callbacks = dict() self._cb_lock = ru.RLock() @@ -167,9 +171,10 @@ def __init__(self, pmgr: PilotManager, descr): self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub, log=self._log, prof=self._prof) - ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, - log=self._log, prof=self._prof, cb=self._control_cb, - topic=rpc.CONTROL_PUBSUB) + self._sub = ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, + log=self._log, prof=self._prof, + cb=self._control_cb, + topic=rpc.CONTROL_PUBSUB) # -------------------------------------------------------------------------- @@ -189,13 +194,16 @@ def _default_state_cb(self, pilot, state=None): self._log.info("[Callback]: pilot %s state: %s.", uid, state) if state == rps.FAILED and self._exit_on_error: + self._log.error("[Callback]: pilot '%s' failed (exit)", uid) + self._sub.stop() # There are different ways to tell main... - ru.cancel_main_thread('int') + # ru.print_stacktrace() + sys.stderr.write('=== pilot failed, exit_on_error ===\n') + ru.cancel_main_thread('term') # raise RuntimeError('pilot %s failed - fatal!' % self.uid) - # os.kill(os.getpid()) - # sys.exit() + # os.kill(os.getpid(), signal.SIGTERM) # -------------------------------------------------------------------------- @@ -234,6 +242,9 @@ def _update(self, pilot_dict): self._state = target + if self._state in rps.FINAL: + self._sub.stop() + # FIXME: this is a hack to get the resource details into the pilot resources = pilot_dict.get('resources') or {} rm_info = resources.get('rm_info') @@ -610,6 +621,7 @@ def wait(self, state=None, timeout=None): if self.state in rps.FINAL: + # we will never see another state progression. Raise an error # (unless we waited for this) if self.state in states: diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index e5f354c27..1ab6655a8 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -242,6 +242,8 @@ def close(self, terminate=True): ru.write_json(json, tgt) + super().close() + # -------------------------------------------------------------------------- # diff --git a/src/radical/pilot/proxy.py b/src/radical/pilot/proxy.py index b48502233..157ee23a7 100644 --- a/src/radical/pilot/proxy.py +++ b/src/radical/pilot/proxy.py @@ -123,8 +123,7 @@ # To any request other than the above, the ZMQ bridge will respond: # 'err': 'invalid request' # -# ------------------------------------------------------------------------------ - +# # ------------------------------------------------------------------------------ # class Proxy(ru.zmq.Server): @@ -132,6 +131,7 @@ class Proxy(ru.zmq.Server): def __init__(self, path=None): self._lock = mt.Lock() + self._term = mt.Event() self._clients = dict() ru.zmq.Server.__init__(self, uid='radical.pilot.proxy', @@ -153,9 +153,9 @@ def __init__(self, path=None): def _monitor(self): # this is a daemon thread - it never exits until process termination - while True: + while not self._term.is_set(): - time.sleep(10) + time.sleep(0.5) now = time.time() # iterate w/o lock, and thus get a snapshot of the known sids @@ -192,6 +192,8 @@ def _monitor(self): # def stop(self): + self._term.set() + for sid in self._clients: self._log.info('stop client %s' % sid) self._clients[sid]['term'].set() diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index cd6c0246e..d0eec6b64 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -15,7 +15,6 @@ from . import constants as rpc from . import utils as rpu -from .messages import HeartbeatMessage from .proxy import Proxy from .resource_config import ResourceConfig, ENDPOINTS_DEFAULT @@ -65,16 +64,6 @@ class Session(object): entities. """ - # In that role, the session will create a special pubsub channel `heartbeat` - # which is used by all components in its hierarchy to exchange heartbeat - # messages. Those messages are used to watch component health - if - # a (parent or child) component fails to send heartbeats for a certain - # amount of time, it is considered dead and the process tree will terminate. - # That heartbeat management is implemented in the `ru.Heartbeat` class. - # Only primary sessions instantiate a heartbeat channel (i.e., only the root - # sessions of RP client or agent modules), but all components need to call - # the sessions `heartbeat()` method at regular intervals. - # the reporter is an application-level singleton _reporter = None @@ -168,16 +157,17 @@ def __init__(self, proxy_url: Optional[str ] = None, self._proxy_cfg = None self._closed = False self._created = time.time() + self._to_stop = list() self._close_options = _CloseOptions(close_options) self._close_options.verify() - self._proxy = None # proxy client instance - self._reg = None # registry client instance - self._pmgrs = dict() # map IDs to pmgr instances - self._tmgrs = dict() # map IDs to tmgr instances - self._cmgr = None # only primary sessions have a cmgr - self._rm = None # resource manager (agent_0 sessions) - self._hb = None # heartbeat monitor + self._proxy = None # proxy server instance + self._proxy_client = None # proxy client instance + self._reg = None # registry client instance + self._pmgrs = dict() # map IDs to pmgr instances + self._tmgrs = dict() # map IDs to tmgr instances + self._cmgr = None # only primary sessions have a cmgr + self._rm = None # resource manager (agent_0 sessions) if _reg_addr: @@ -252,17 +242,12 @@ def _init_primary(self): # only primary sessions start and initialize the proxy service self._start_proxy() - # start heartbeat channel - self._start_heartbeat() - # push the session config into the registry self._publish_cfg() # start bridges and components self._start_components() - time.sleep(1) - # primary session hooks into the control pubsub bcfg = self._reg['bridges.%s' % rpc.CONTROL_PUBSUB] self._ctrl_pub = ru.zmq.Publisher(channel=rpc.CONTROL_PUBSUB, @@ -299,7 +284,6 @@ def _init_agent_0(self): self._start_registry() self._connect_registry() self._connect_proxy() - self._start_heartbeat() self._publish_cfg() self._init_rm() self._start_components() @@ -548,93 +532,6 @@ def _init_cfg_from_registry(self): self._prof.prof('session_start', uid=self._uid) - # -------------------------------------------------------------------------- - # - def _start_heartbeat(self): - - # only primary and agent_0 sessions manage heartbeats - assert self._role in [self._PRIMARY, self._AGENT_0] - - # start the embedded heartbeat pubsub bridge - self._hb_pubsub = ru.zmq.PubSub('heartbeat_pubsub', - cfg={'uid' : 'heartbeat_pubsub', - 'type' : 'pubsub', - 'log_lvl': 'debug', - 'path' : self._cfg.path}) - self._hb_pubsub.start() - time.sleep(1) - - # re-enable the test below if timing issues crop up - # ru.zmq.test_pubsub(self._hb_pubsub.channel, - # self._hb_pubsub.addr_pub, - # self._hb_pubsub.addr_sub), - - # fill 'cfg.heartbeat' section - self._cfg.heartbeat.addr_pub = str(self._hb_pubsub.addr_pub) - self._cfg.heartbeat.addr_sub = str(self._hb_pubsub.addr_sub) - - # create a publisher for that channel to publish own heartbeat - self._hb_pub = ru.zmq.Publisher(channel='heartbeat_pubsub', - url=self._cfg.heartbeat.addr_pub, - log=self._log, - prof=self._prof) - - - # -------------------------------------- - # start the heartbeat monitor, but first - # define its callbacks - def _hb_beat_cb(): - # called on every heartbeat: cfg.heartbeat.interval` - # publish own heartbeat - self._hb_pub.put('heartbeat', HeartbeatMessage(uid=self._uid)) - - # also update proxy heartbeat - if self._proxy: - try: - self._proxy.request('heartbeat', {'sid': self._uid}) - except: - # ignore errors in case proxy went away already - pass - # -------------------------------------- - - # -------------------------------------- - # called when some entity misses - # heartbeats: `cfg.heartbeat.timeout` - def _hb_term_cb(hb_uid): - if self._cmgr: - self._cmgr.close() - return False - # -------------------------------------- - - # create heartbeat manager which monitors all components in this session - # self._log.debug('hb %s from session', self._uid) - self._hb = ru.Heartbeat(uid=self._uid, - timeout=self._cfg.heartbeat.timeout, - interval=self._cfg.heartbeat.interval, - beat_cb=_hb_beat_cb, - term_cb=_hb_term_cb, - log=self._log) - self._hb.start() - - # -------------------------------------- - # subscribe to heartbeat msgs and inform - # self._hb about every heartbeat - def _hb_msg_cb(topic, msg): - - hb_msg = HeartbeatMessage(from_dict=msg) - - if hb_msg.uid != self._uid: - self._hb.beat(uid=hb_msg.uid) - # -------------------------------------- - - ru.zmq.Subscriber(channel='heartbeat_pubsub', - topic='heartbeat', - url=self._cfg.heartbeat.addr_sub, - cb=_hb_msg_cb, - log=self._log, - prof=self._prof) - - # -------------------------------------------------------------------------- # def _publish_cfg(self): @@ -644,15 +541,13 @@ def _publish_cfg(self): assert self._role in [self._PRIMARY, self._AGENT_0] - # push proxy, bridges, components and heartbeat subsections separately + # push proxy, bridges, and components subsections separately flat_cfg = copy.deepcopy(self._cfg) - del flat_cfg['heartbeat'] del flat_cfg['bridges'] del flat_cfg['components'] self._reg['cfg'] = flat_cfg - self._reg['heartbeat'] = self._cfg.heartbeat self._reg['bridges'] = self._cfg.bridges # proxy bridges self._reg['components'] = {} @@ -713,8 +608,8 @@ def _start_proxy(self): # configure proxy channels try: - self._proxy = ru.zmq.Client(url=self._cfg.proxy_url, log=self._log) - self._proxy_cfg = self._proxy.request('register', {'sid':self._uid}) + self._proxy_client = ru.zmq.Client(url=self._cfg.proxy_url, log=self._log) + self._proxy_cfg = self._proxy_client.request('register', {'sid':self._uid}) except: self._log.exception('%s: failed to start proxy', self._role) @@ -731,8 +626,8 @@ def _connect_proxy(self): assert self._cfg.proxy_url # query the proxy service to fetch proxy cfg created by primary session - self._proxy = ru.zmq.Client(url=self._cfg.proxy_url) - self._proxy_cfg = self._proxy.request('lookup', {'sid': self._uid}) + self._proxy_client = ru.zmq.Client(url=self._cfg.proxy_url) + self._proxy_cfg = self._proxy_client.request('lookup', {'sid': self._uid}) self._log.debug('proxy response: %s', self._proxy_cfg) @@ -806,8 +701,11 @@ def pubsub_fwd(topic, msg): publisher.put(tgt, msg) - ru.zmq.Subscriber(channel=src, topic=src, path=path, cb=pubsub_fwd, - url=url_sub, log=self._log, prof=self._prof) + sub = ru.zmq.Subscriber(channel=src, topic=src, path=path, + cb=pubsub_fwd, url=url_sub, + log=self._log, prof=self._prof) + + self._to_stop.append(sub) # -------------------------------------------------------------------------- @@ -880,9 +778,8 @@ def _start_components(self): assert self._role in [self._PRIMARY, self._AGENT_0, self._AGENT_N] - # primary sessions and agents have a component manager which also - # manages heartbeat. 'self._cmgr.close()` should be called during - # termination + # primary sessions and agents have a component manager + # 'self._cmgr.close()` should be called during termination self._cmgr = rpu.ComponentManager(self.uid, self.reg_addr, self._uid) self._cmgr.start_bridges(self._cfg.bridges) self._cmgr.start_components(self._cfg.components) @@ -934,6 +831,7 @@ def close(self, **kwargs): options = self._close_options if options.terminate: + # terminate all components if self._role == self._PRIMARY: self._ctrl_pub.put(rpc.CONTROL_PUBSUB, {'cmd': 'terminate', @@ -953,22 +851,25 @@ def close(self, **kwargs): if self._cmgr: self._cmgr.close() - # stop heartbeats - if self._hb: - self._hb.stop() - self._hb_pubsub.stop() - - if self._proxy: + if self._proxy_client: if self._role == self._PRIMARY: try: self._log.debug('session %s closes service', self._uid) - self._proxy.request('unregister', {'sid': self._uid}) + self._proxy_client.request('unregister', {'sid': self._uid}) except: pass if self._role in [self._PRIMARY, self._AGENT_0]: - self._proxy.close() + self._proxy_client.close() + self._proxy_client = None + + if self._proxy: + + if self._role in [self._PRIMARY, self._AGENT_0]: + + self._proxy.stop() + self._proxy.wait() self._proxy = None self._log.debug("session %s closed", self._uid) @@ -1002,24 +903,27 @@ def close(self, **kwargs): self._rep.ok('>>ok\n') + for thing in self._to_stop: + thing.stop() + + # -------------------------------------------------------------------------- # def _run_proxy(self): - proxy = Proxy(path=self._cfg.path) + self._proxy = Proxy(path=self._cfg.path) try: - proxy.start() + self._proxy.start() - self._proxy_url = proxy.addr + self._proxy_url = self._proxy.addr self._proxy_event.set() # run forever until process is interrupted or killed - proxy.wait() + self._proxy.wait() finally: - proxy.stop() - proxy.wait() + self._proxy.stop() # -------------------------------------------------------------------------- diff --git a/src/radical/pilot/states.py b/src/radical/pilot/states.py index 784197c46..a59a939f3 100644 --- a/src/radical/pilot/states.py +++ b/src/radical/pilot/states.py @@ -68,10 +68,10 @@ def _pilot_state_progress(pid, current, target): if target in [DONE, FAILED, CANCELED]: return [target, []] - # allow to transition from FAILED to DONE (done gets picked up from DB, + # allow to transition from FAILED to DONE (done gets picked up from proxy, # sometimes after pilot watcher detects demise) if current == FAILED: - if target in [DONE, FAILED]: + if target in FINAL: return [target, []] if current in FINAL and target != current: diff --git a/src/radical/pilot/task_manager.py b/src/radical/pilot/task_manager.py index 30f9d0fc3..6307df3e5 100644 --- a/src/radical/pilot/task_manager.py +++ b/src/radical/pilot/task_manager.py @@ -175,10 +175,10 @@ def __init__(self, session, cfg='default', scheduler=None): self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub, log=self._log, prof=self._prof) - ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, - log=self._log, prof=self._prof, - cb=self._control_cb, - topic=rpc.CONTROL_PUBSUB) + self._ctrl_sub = ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, + log=self._log, prof=self._prof, + cb=self._control_cb, + topic=rpc.CONTROL_PUBSUB) self._prof.prof('setup_done', uid=self._uid) self._rep.ok('>>ok\n') @@ -248,6 +248,10 @@ def close(self): tgt = '%s/%s.json' % (self._session.path, self.uid) ru.write_json(json, tgt) + self._ctrl_sub.stop() + + super().close() + # -------------------------------------------------------------------------- # diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index 008271496..fdc5cb851 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -16,6 +16,7 @@ from .. import constants as rpc from .. import states as rps +from ..messages import ComponentStartedMessage from ..messages import RPCRequestMessage, RPCResultMessage @@ -103,9 +104,8 @@ def __init__(self, cfg, session): the session under which to run this component, and a uid for the component itself which MUST be unique within the scope of the given session. - All components and the component managers will continuously sent heartbeat - messages on the control pubsub - missing heartbeats will by default lead to - component termination. + Components will send a startup message to the component manager upon + successful initialization. Further, the class must implement the registered work methods, with a signature of:: @@ -230,13 +230,22 @@ def start(self): assert self._thread.is_alive() + # send startup message + if self._cfg.cmgr_url: + self._log.debug('send startup message to %s', self._cfg.cmgr_url) + pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=self._cfg.cmgr_url) + pipe.put(ComponentStartedMessage(uid=self.uid, pid=os.getpid())) + + # give the message some time to get out + time.sleep(0.1) + # -------------------------------------------------------------------------- # def wait(self): while not self._term.is_set(): - time.sleep(1) + time.sleep(0.1) # -------------------------------------------------------------------------- @@ -553,9 +562,6 @@ def _finalize(self): # call component level finalize, before we tear down channels self.finalize() - for thread in self._threads.values(): - thread.stop() - self._log.debug('%s close prof', self.uid) try: self._prof.prof('component_final') @@ -821,28 +827,28 @@ def register_timed_cb(self, cb, cb_data=None, timer=None): class Idler(mt.Thread): # -------------------------------------------------------------- - def __init__(self, name, log, timer, cb, cb_data, cb_lock): + def __init__(self, name, log, timer, cb, cb_data, cb_lock, term): self._name = name self._log = log self._timeout = timer self._cb = cb self._cb_data = cb_data self._cb_lock = cb_lock + self._term = term self._last = 0.0 - self._term = mt.Event() - super(Idler, self).__init__() + super().__init__() self.daemon = True self.start() - def stop(self): - self._term.set() - def run(self): try: self._log.debug('start idle thread: %s', self._cb) ret = True - while ret and not self._term.is_set(): + while ret: + if self._term.is_set(): + break + if self._timeout and \ self._timeout > (time.time() - self._last): # not yet @@ -861,7 +867,8 @@ def run(self): # ------------------------------------------------------------------ idler = Idler(name=name, timer=timer, log=self._log, - cb=cb, cb_data=cb_data, cb_lock=self._cb_lock) + cb=cb, cb_data=cb_data, cb_lock=self._cb_lock, + term=self._term) self._threads[name] = idler self._log.debug('%s registered idler %s', self.uid, name) @@ -1203,6 +1210,22 @@ def publish(self, pubsub, msg, topic=None): self._publishers[pubsub].put(topic, msg) + # -------------------------------------------------------------------------- + # + def close(self): + + self._term.set() + + for inp in self._inputs: + self._inputs[inp]['queue'].stop() + + for sub in self._subscribers: + self._subscribers[sub].stop() + + self._prof.close() + self._log.close() + + # ------------------------------------------------------------------------------ # class ClientComponent(BaseComponent): diff --git a/src/radical/pilot/utils/component_manager.py b/src/radical/pilot/utils/component_manager.py index d27da077d..27d6a83ac 100644 --- a/src/radical/pilot/utils/component_manager.py +++ b/src/radical/pilot/utils/component_manager.py @@ -6,10 +6,11 @@ import os import time +import signal import radical.utils as ru -from ..messages import HeartbeatMessage +from ..messages import ComponentStartedMessage # ------------------------------------------------------------------------------ @@ -22,8 +23,8 @@ class ComponentManager(object): etc. This ComponentManager centralises the code needed to spawn, manage and terminate such components. Any code which needs to create component should create a ComponentManager instance and pass the required component and - bridge layout and configuration. Callng `stop()` on the cmgr will terminate - the components and brisged. + bridge layout and configuration. Calling `stop()` on the cmgr will + terminate the components and bridges. ''' # -------------------------------------------------------------------------- @@ -36,10 +37,10 @@ def __init__(self, sid, reg_addr, owner): self._sid = sid self._reg_addr = reg_addr self._owner = owner + self._to_kill = list() self._reg = ru.zmq.RegistryClient(url=self._reg_addr) self._cfg = ru.Config(from_dict=self._reg['cfg']) - self._hb_cfg = ru.Config(from_dict=self._reg['heartbeat']) self._uid = ru.generate_id('cmgr.%(item_counter)04d', ru.ID_CUSTOM, ns=self._sid) @@ -55,30 +56,29 @@ def __init__(self, sid, reg_addr, owner): self._log.debug('cmgr %s (%s)', self._uid, self._owner) - # component managers listen on the heartbeat pubsub to see if spawned - # components come alive - self._heartbeats = dict() # heartbeats we have seen - ru.zmq.Subscriber(channel='heartbeat_pubsub', - topic='heartbeat', - url=self._hb_cfg.addr_sub, - cb=self._hb_msg_cb, - log=self._log, - prof=self._prof) + # component managers open a zmq pipe so that components and bridges can + # send registration messages. + self._startups = dict() # startup messages we have seen + def register_cb(msg): + self._log.debug('=== got message: %s', msg) + msg = ru.zmq.Message.deserialize(msg) + if isinstance(msg, ComponentStartedMessage): + self._startups[msg.uid] = msg + self._to_kill.append(msg.pid) + else: + self._log.error('unknown message type: %s', type(msg)) - # -------------------------------------------------------------------------- - # - def _hb_msg_cb(self, topic, msg): - - hb_msg = HeartbeatMessage(from_dict=msg) - self._heartbeats[hb_msg.uid] = time.time() + self._pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PULL) + self._pipe.register_cb(register_cb) + self._cfg.cmgr_url = str(self._pipe.url) # -------------------------------------------------------------------------- # def _wait_startup(self, uids, timeout): ''' - Wait for the first heartbeat of the given component UIDs to appear. If + Wait for the startup message of the given component UIDs to appear. If that does not happen before timeout, an exception is raised. ''' @@ -89,7 +89,7 @@ def _wait_startup(self, uids, timeout): self._log.debug('wait for : %s', nok) - ok = [uid for uid in uids if uid in self._heartbeats] + ok = [uid for uid in uids if uid in self._startups] nok = [uid for uid in uids if uid not in ok] if len(ok) == len(uids): @@ -131,13 +131,13 @@ def start_bridges(self, bridges): bcfg.uid = uid bcfg.channel = bname bcfg.cmgr = self.uid + bcfg.cmgr_url = self._cfg.cmgr_url bcfg.owner = self._owner bcfg.sid = self._cfg.sid bcfg.path = self._cfg.path bcfg.reg_addr = self._cfg.reg_addr bcfg.log_lvl = self._cfg.log_lvl bcfg.debug_lvl = self._cfg.debug_lvl - bcfg.heartbeat = self._hb_cfg self._reg['bridges.%s.cfg' % bname] = bcfg @@ -151,12 +151,11 @@ def start_bridges(self, bridges): self._log.error(msg) raise RuntimeError(msg) - self._heartbeats[bname] = None self._log.info('created bridge %s [%s]', bname, bname) - # all bridges are started, wait for their heartbeats + # all bridges are started, wait for their startup messages self._log.debug('wait for %s', buids) - self._wait_startup(buids, timeout=self._hb_cfg.timeout) + self._wait_startup(buids, timeout=10.0) self._prof.prof('start_bridges_stop', uid=self._uid) @@ -184,13 +183,13 @@ def start_components(self, components, cfg = None): ccfg.owner = self._owner ccfg.sid = self._cfg.sid ccfg.cmgr = self._cfg.uid + ccfg.cmgr_url = self._cfg.cmgr_url ccfg.base = self._cfg.base ccfg.path = self._cfg.path ccfg.reg_addr = self._cfg.reg_addr ccfg.proxy_url = self._cfg.proxy_url ccfg.log_lvl = self._cfg.log_lvl ccfg.debug_lvl = self._cfg.debug_lvl - ccfg.heartbeat = self._hb_cfg if cfg: ru.dict_merge(ccfg, cfg, ru.OVERWRITE) @@ -213,7 +212,7 @@ def start_components(self, components, cfg = None): # all components should start now, wait for heartbeats to appear. self._log.debug('wait for %s', cuids) - self._wait_startup(cuids, timeout=self._hb_cfg.timeout) + self._wait_startup(cuids, timeout=10.0) self._prof.prof('start_components_stop', uid=self._uid) @@ -224,6 +223,18 @@ def close(self): self._prof.prof('close', uid=self._uid) + for pid in self._to_kill: + + self._log.debug('kill %s', pid) + + try: + os.kill(pid, signal.SIGKILL) + + except ProcessLookupError: + pass + + self._pipe.stop() + # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/utils/staging_helper.py b/src/radical/pilot/utils/staging_helper.py index 5773c9962..18eaf62c3 100644 --- a/src/radical/pilot/utils/staging_helper.py +++ b/src/radical/pilot/utils/staging_helper.py @@ -149,6 +149,9 @@ def copy(self, src, tgt, flags): # # FIXME: why?? # flags = 0 + src = ru.Url(src) + tgt = ru.Url(tgt) + assert self._has_saga tmp = ru.Url(tgt) diff --git a/tests/unit_tests/test_executing/test_base.py b/tests/unit_tests/test_executing/test_base.py index 0f8df63e7..a9bf6556d 100755 --- a/tests/unit_tests/test_executing/test_base.py +++ b/tests/unit_tests/test_executing/test_base.py @@ -75,10 +75,15 @@ def test_initialize(self, mocked_rm, mocked_init): 'resource_manager': 'FORK', 'agent_spawner' : 'POPEN'}) - ec._log = ec._prof = mock.Mock() - ec.work = ec.control_cb = mock.Mock() - ec.register_input = ec.register_output = mock.Mock() - ec.register_publisher = ec.register_subscriber = mock.Mock() + ec._term = mock.Mock() + ec._log = mock.Mock() + ec._prof = mock.Mock() + ec.work = mock.Mock() + ec.control_cb = mock.Mock() + ec.register_input = mock.Mock() + ec.register_output = mock.Mock() + ec.register_publisher = mock.Mock() + ec.register_subscriber = mock.Mock() mocked_rm.create.return_value = mocked_rm ec.initialize()