Skip to content

Commit

Permalink
Merge branch 'devel' into fix/proxy_heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky authored Dec 13, 2024
2 parents 189956a + c23c288 commit ce6ef6c
Show file tree
Hide file tree
Showing 16 changed files with 248 additions and 371 deletions.
47 changes: 28 additions & 19 deletions .github/workflows/ci-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y openmpi-bin
python -m venv testenv
. testenv/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install .
- name: run examples
timeout-minutes: 15
timeout-minutes: 10
run: |
sudo apt update
sudo apt install -y openmpi-bin
export RADICAL_LOG_LVL=DEBUG_9
export RADICAL_PROFILE=True
export RADICAL_DEBUG=TRUE
Expand All @@ -42,25 +42,34 @@ jobs:
export RADICAL_UTILS_ZMQ_LOG_LVL=INFO
export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO
. testenv/bin/activate
mkdir example_artifacts/
cd example_artifacts/
../examples/00_getting_started.py
../examples/09_mpi_tasks.py
../examples/01_task_details.py
../examples/02_failing_tasks.py
../examples/03_multiple_pilots.py
../examples/04_scheduler_selection.py
../examples/05_task_input_data.py
../examples/06_task_output_data.py
../examples/07_shared_task_data.py
../examples/08_task_environment.py
../examples/10_pre_and_post_exec.py
../examples/11_task_input_data_tar.py
../examples/11_task_input_folder.py
export RP_ROOT=$(pwd)
export BASE=/home/runner/radical.pilot.sandbox/
mkdir -p $BASE/client_sessions
cd $BASE/client_sessions
radical-stack
$RP_ROOT/examples/00_getting_started.py
$RP_ROOT/examples/09_mpi_tasks.py
$RP_ROOT/examples/01_task_details.py
$RP_ROOT/examples/02_failing_tasks.py
$RP_ROOT/examples/03_multiple_pilots.py
$RP_ROOT/examples/04_scheduler_selection.py
$RP_ROOT/examples/05_task_input_data.py
$RP_ROOT/examples/06_task_output_data.py
$RP_ROOT/examples/07_shared_task_data.py
$RP_ROOT/examples/08_task_environment.py
$RP_ROOT/examples/10_pre_and_post_exec.py
$RP_ROOT/examples/11_task_input_data_tar.py
$RP_ROOT/examples/11_task_input_folder.py
- name: prepare example_artifacts
if: always()
run: |
tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/
- name: upload example_artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: example_artifacts
path: example_artifacts
path: /home/runner/example_artifacts.tgz

75 changes: 8 additions & 67 deletions bin/radical-pilot-bridge
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru

from radical.pilot.messages import HeartbeatMessage
from radical.pilot.messages import ComponentStartedMessage


# ------------------------------------------------------------------------------
Expand All @@ -28,27 +26,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the bridge
- kind: type of bridge (`pubsub` or `queue`)
If the config contains a `heartbeat` section, that section must be formatted
as follows:
{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}
If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.
If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.
The config file may contain other entries which are passed to the bridge
and are interpreted by the bridge implementation.
Expand Down Expand Up @@ -105,11 +84,8 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])

# create the instance and begin to work
bridge = ru.zmq.Bridge.create(uid, cfg=b_cfg)
Expand All @@ -119,48 +95,13 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):

reg.close()
bridge.start()

evt.set()

# re-enable the test below if timing issues crop up
# if 'pubsub' in uid:
# d = ru.zmq.test_pubsub(bridge.channel, bridge.addr_pub, bridge.addr_sub)

# bridge runs - send heartbeats so that cmgr knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
bridge.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url)
pipe.put(ComponentStartedMessage(uid=uid, pid=os.getpid()))

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
bridge.wait()


# ------------------------------------------------------------------------------
Expand Down
71 changes: 6 additions & 65 deletions bin/radical-pilot-component
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru
import radical.pilot as rp

from radical.pilot.messages import HeartbeatMessage

# ------------------------------------------------------------------------------
#
Expand All @@ -28,27 +25,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the component
- kind: type of component
If the config contains a `heartbeat` section, that section must be formatted
as follows:
{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}
If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.
If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.
The config file may contain other entries which are passed to the component
and are interpreted by the component implementation.
Expand Down Expand Up @@ -79,59 +57,22 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])

reg.close()

evt.set()

# start a non-primary session
session = rp.Session(uid=sid, cfg=c_cfg,
_role=rp.Session._DEFAULT, _reg_addr=reg_addr)

# create the instance and begin to work
comp = rp.utils.BaseComponent.create(c_cfg, session)
comp.start()

# component runs - send heartbeats so that session knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
comp.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
evt.set()

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
comp.wait()


# ------------------------------------------------------------------------------
Expand Down
18 changes: 15 additions & 3 deletions src/radical/pilot/agent/agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ def finalize(self):
try : log = ru.ru_open('./agent_0.log', 'r').read(1024)
except: pass

self._log.debug('final cause: %s', self._final_cause)

if self._final_cause == 'timeout' : state = rps.DONE
elif self._final_cause == 'cancel' : state = rps.CANCELED
elif self._final_cause == 'sys.exit' : state = rps.CANCELED
Expand Down Expand Up @@ -620,13 +622,23 @@ def control_cb(self, topic, msg):
return self._ctrl_cancel_pilots(msg)

elif cmd == 'service_info':
self._log.debug('=== PILOT COMMAND: %s: %s', cmd, arg)
self._log.debug('PILOT COMMAND: %s: %s', cmd, arg)
return self._ctrl_service_info(msg, arg)

else:
self._log.error('invalid command: [%s]', cmd)


# --------------------------------------------------------------------------
#
def stop(self):

self._log.info('stop agent')
self._final_cause = 'cancel'
super().stop()
self._session.close()


# --------------------------------------------------------------------------
#
def _ctrl_cancel_pilots(self, msg):
Expand Down Expand Up @@ -655,7 +667,7 @@ def _ctrl_service_info(self, msg, arg):
error = arg['error']
info = arg['info']

self._log.debug('=== service info: %s: %s', uid, info)
self._log.debug('service info: %s: %s', uid, info)

# This message signals that an agent service instance is up and running.
# We expect to find the service UID in args and can then unblock the
Expand Down Expand Up @@ -683,7 +695,7 @@ def _ctrl_service_info(self, msg, arg):
self._reg['services.%s' % uid] = info

# signal main thread when that the service is up
self._log.debug('=== set service start event for %s', uid)
self._log.debug('set service start event for %s', uid)
self._service_start_evt.set()

return True
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/agent/executing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def _to_watcher(self):
`self._cancel_task(task)`. That has to be implemented by al executors.
'''

while True:
while not self._term.is_set():

# check once per second at most
time.sleep(1)
Expand Down
Loading

0 comments on commit ce6ef6c

Please sign in to comment.