Skip to content

Commit

Permalink
Merge pull request #3026 from radical-cybertools/feature/service_star…
Browse files Browse the repository at this point in the history
…tup_flag

Feature/service startup flag
  • Loading branch information
mturilli authored Sep 29, 2023
2 parents 7b1d9d9 + 57bfad7 commit 26bcdff
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 36 deletions.
104 changes: 82 additions & 22 deletions src/radical/pilot/agent/agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from .. import utils as rpu
from .. import states as rps
from .. import constants as rpc
from .. import TaskDescription
from .. import Session
from .. import TaskDescription, AGENT_SERVICE

Expand Down Expand Up @@ -339,53 +338,116 @@ def _write_sa_configs(self):
#
def _start_services(self):

sds = self._cfg.services
if not sds:
if not self.session.cfg.services:
return

self._log.info('starting agent services')

services = list()
for sd in sds:
services = []
services_data = {}

for sd in self.session.cfg.services:

td = TaskDescription(sd)
td.mode = AGENT_SERVICE
# ensure that the description is viable
td.verify()

cfg = self._cfg
tid = ru.generate_id('service.%(item_counter)04d',
ru.ID_CUSTOM, ns=self._cfg.sid)
ru.ID_CUSTOM, ns=self.session.uid)
task = dict()
task['uid'] = tid
task['type'] = 'service_task'
task['origin'] = 'agent'
task['pilot'] = self.session.cfg.pid
task['description'] = td.as_dict()
task['state'] = rps.AGENT_STAGING_INPUT_PENDING
task['status'] = 'NEW'
task['type'] = 'service_task'
task['uid'] = tid
task['pilot_sandbox'] = cfg.pilot_sandbox
task['task_sandbox'] = cfg.pilot_sandbox + task['uid'] + '/'
task['task_sandbox_path'] = cfg.pilot_sandbox + task['uid'] + '/'
task['session_sandbox'] = cfg.session_sandbox
task['resource_sandbox'] = cfg.resource_sandbox
task['pilot'] = cfg.pid
task['pilot_sandbox'] = self.session.cfg.pilot_sandbox
task['session_sandbox'] = self.session.cfg.session_sandbox
task['resource_sandbox'] = self.session.cfg.resource_sandbox
task['resources'] = {'cpu': td.ranks * td.cores_per_rank,
'gpu': td.ranks * td.gpus_per_rank}

task_sandbox = self.session.cfg.pilot_sandbox + tid + '/'
task['task_sandbox'] = task_sandbox
task['task_sandbox_path'] = task_sandbox

# TODO: use `type='service_task'` in RADICAL-Analytics

# TaskDescription.metadata will contain service related data:
# "name" (unique), "startup_file"

self._service_uids_launched.append(tid)
services.append(task)

self._log.debug('start service %s: %s', tid, sd)

services_data[tid] = {}
if td.metadata.get('startup_file'):
n = td.metadata.get('name')
services_data[tid]['name'] = 'service.%s' % n if n else tid
services_data[tid]['startup_file'] = td.metadata['startup_file']

self.advance(services, publish=False, push=True)

# Waiting 2mins for all services to launch
if not self._services_setup.wait(timeout=60 * 2):
self.register_timed_cb(cb=self._services_startup_cb,
cb_data=services_data,
timer=2)

# waiting for all services to start (max waiting time 2 mins)
if not self._services_setup.wait(timeout=120):
raise RuntimeError('Unable to start services')

self.unregister_timed_cb(self._services_startup_cb)

self._log.info('all agent services started')

def _services_startup_cb(self, cb_data):

for tid in list(cb_data):

service_up = False
startup_file = cb_data[tid].get('startup_file')

if not startup_file:
service_up = True
# FIXME: at this point we assume that since "startup_file" is
# not provided, then we don't wait - this will be
# replaced with another callback (Component.advance will
# publish control command "service_up" for service tasks)

elif os.path.isfile(startup_file):
# if file exists then service is up (general approach)
service_up = True

# collect data from the startup file: at this point we look
# for URLs only
service_urls = {}
with ru.ru_open(startup_file, 'r') as fin:
for line in fin.readlines():
if '://' not in line:
continue
parts = line.split()
if len(parts) == 1:
idx, url = '', parts[0]
elif '://' in parts[1]:
idx, url = parts[0], parts[1]
else:
continue
service_urls[idx] = url

if service_urls:
key = cb_data[tid]['name']
for idx, url in service_urls.items():
if idx:
key += '.%s' % idx
key += '.url'
self.session._reg[key] = url

if service_up:
self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up',
'arg': {'uid': tid}})
del cb_data[tid]

return True

# --------------------------------------------------------------------------
#
Expand Down Expand Up @@ -597,8 +659,6 @@ def _ctrl_service_up(self, msg):
self._log.warn('duplicated service startup signal for %s', uid)
return True

self._log.debug('service startup message for %s', uid)

self._service_uids_running.append(uid)
self._log.debug('service %s started (%s / %s)', uid,
len(self._service_uids_running),
Expand Down
34 changes: 20 additions & 14 deletions tests/unit_tests/test_agent_0/test_agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ def test_check_control_cb(self, mocked_init):

def _publish_effect(publish_type, cmd):
nonlocal global_control
import pprint
print('=============== pub', pprint.pformat(cmd))
global_control.append((publish_type, cmd))

def _prepenv_effect(env_name, env_spec):
Expand Down Expand Up @@ -102,7 +100,6 @@ def _prepenv_effect(env_name, env_spec):
'kwargs': {'env_name': 'radical',
'env_spec': 'spec'}})
self.assertIsNone(agent_cmp._control_cb(None, msg))
print('====', global_control, '====')
self.assertEqual(global_control[0],
('control_pubsub',
RPCResultMessage({'uid': 'rpc.0004',
Expand Down Expand Up @@ -190,29 +187,36 @@ def local_advance(things, publish, push):
nonlocal advanced_services
advanced_services = things

agent_0 = Agent_0(ru.Config(), self._session)
agent_0.advance = local_advance
agent_0 = Agent_0()
agent_0._uid = 'agent_0'
agent_0._cb_lock = mt.RLock()
agent_0._threads = dict()
agent_0._log = mock.Mock()
agent_0._service_uids_launched = list()
agent_0._services_setup = mock.Mock()

agent_0._cfg = ru.Config(from_dict={'pid' : 12,
'pilot_sandbox': '/',
'services' : []})
agent_0.advance = local_advance

agent_0._session = self._session
agent_0._session._cfg = ru.Config(from_dict={'pid' : 12,
'pilot_sandbox': '/',
'services' : []})

agent_0._cfg.services = [{}]
agent_0._session._cfg.services = [{}]
with self.assertRaises(ValueError):
# no executable provided
agent_0._start_services()

agent_0._cfg.services = [{'executable': 'test', 'ranks': 'zero'}]
agent_0._session._cfg.services = [{'executable': 'test',
'ranks' : 'zero'}]
with self.assertRaises(TypeError):
# type mismatch
agent_0._start_services()

services = [{'executable': '/bin/ls',
'cores_per_rank': '3'}]
agent_0._cfg.services = services
services = [{'executable' : '/bin/ls',
'cores_per_rank': '3',
'metadata' : {}}]
agent_0._session._cfg.services = services

agent_0._services_setup.wait = mock.Mock(return_value=True)
agent_0._start_services()
Expand All @@ -235,7 +239,9 @@ def local_advance(things, publish, push):
@mock.patch.object(Agent_0, '__init__', return_value=None)
def test_ctrl_service_up(self, mocked_init):

agent_0 = Agent_0(ru.Config(), self._session)
agent_0 = Agent_0()
agent_0._cfg = ru.Config()
agent_0._session = self._session
agent_0._service_uids_launched = ['101', '102']
agent_0._service_uids_running = []

Expand Down

0 comments on commit 26bcdff

Please sign in to comment.