# ------------------------------------------------------------------------------
#
# Post-patch state of the two `Agent_0` methods that this change set touches in
# src/radical/pilot/agent/agent_0.py (`_start_services` and the new
# `_services_startup_cb`), plus two module-level helpers extracted from the
# latter.  When placed into the class body, indent both methods by one level.
#
# NOTE: the same patch also drops a duplicated `from .. import TaskDescription`
#       import, removes a redundant debug log line in `_ctrl_service_up`, and
#       adapts / de-noises the unit tests in
#       tests/unit_tests/test_agent_0/test_agent_0.py - those partial hunks are
#       not reproduced here.
#
def _parse_service_urls(lines):
    """
    Collect service URLs from the lines of a service startup file.

    A line is considered only if it contains `://`.  Two layouts are accepted:

        <url>             - stored under the empty index ''
        <index> <url>     - stored under `<index>`

    Lines with several tokens whose second token holds no `://` are skipped.
    A repeated index overwrites the earlier entry.

    Returns a dict `{index: url}`.
    """

    service_urls = {}

    for line in lines:

        if '://' not in line:
            continue

        parts = line.split()

        if len(parts) == 1:
            idx, url = '', parts[0]
        elif '://' in parts[1]:
            idx, url = parts[0], parts[1]
        else:
            continue

        service_urls[idx] = url

    return service_urls


# ------------------------------------------------------------------------------
#
def _service_registry_entries(name, service_urls):
    """
    Map parsed service URLs to registry keys.

    The key is `<name>.url` for the empty index, `<name>.<index>.url`
    otherwise.  The key is built from `name` afresh for every URL, so that the
    suffix of one entry never leaks into the key of the next one.

    Returns a dict `{registry_key: url}`.
    """

    entries = {}

    for idx, url in service_urls.items():

        key = name
        if idx:
            key += '.%s' % idx
        key += '.url'

        entries[key] = url

    return entries


# ------------------------------------------------------------------------------
#
def _start_services(self):
    """
    Launch all services configured in `self.session.cfg.services` as agent
    service tasks and block until they are reported as started.

    For each service description a task dict is created and collected; all
    tasks are then advanced in one bulk.  A timed callback
    (`_services_startup_cb`) is registered to poll for service startup while
    this method waits on `self._services_setup` for at most 120 seconds.

    Raises:
        RuntimeError: the services did not come up within the timeout.
        Whatever `TaskDescription.verify()` raises for a non-viable service
        description (the unit tests expect `ValueError` / `TypeError`).
    """

    if not self.session.cfg.services:
        return

    self._log.info('starting agent services')

    services      = []
    services_data = {}

    for sd in self.session.cfg.services:

        td      = TaskDescription(sd)
        td.mode = AGENT_SERVICE
        # ensure that the description is viable
        td.verify()

        tid = ru.generate_id('service.%(item_counter)04d',
                             ru.ID_CUSTOM, ns=self.session.uid)

        task_sandbox = self.session.cfg.pilot_sandbox + tid + '/'

        task = dict()
        task['uid']               = tid
        task['type']              = 'service_task'
        task['origin']            = 'agent'
        task['pilot']             = self.session.cfg.pid
        task['description']       = td.as_dict()
        task['state']             = rps.AGENT_STAGING_INPUT_PENDING
        task['pilot_sandbox']     = self.session.cfg.pilot_sandbox
        task['session_sandbox']   = self.session.cfg.session_sandbox
        task['resource_sandbox']  = self.session.cfg.resource_sandbox
        task['resources']         = {'cpu': td.ranks * td.cores_per_rank,
                                     'gpu': td.ranks * td.gpus_per_rank}
        task['task_sandbox']      = task_sandbox
        task['task_sandbox_path'] = task_sandbox

        # TODO: use `type='service_task'` in RADICAL-Analytics

        # TaskDescription.metadata will contain service related data:
        # "name" (unique), "startup_file"

        self._service_uids_launched.append(tid)
        services.append(task)

        # a description without metadata is handled like empty metadata
        metadata = td.metadata or {}

        services_data[tid] = {}
        if metadata.get('startup_file'):
            n = metadata.get('name')
            services_data[tid]['name'] = 'service.%s' % n if n else tid
            services_data[tid]['startup_file'] = metadata['startup_file']

    self.advance(services, publish=False, push=True)

    self.register_timed_cb(cb=self._services_startup_cb,
                           cb_data=services_data,
                           timer=2)

    # waiting for all services to start (max waiting time 2 mins) - the
    # polling callback is unregistered on success *and* on timeout, so that
    # it does not stay registered after a failed startup
    try:
        if not self._services_setup.wait(timeout=120):
            raise RuntimeError('Unable to start services')
    finally:
        self.unregister_timed_cb(self._services_startup_cb)

    self._log.info('all agent services started')


# ------------------------------------------------------------------------------
#
def _services_startup_cb(self, cb_data):
    """
    Timed callback: check which of the pending services are up.

    `cb_data` maps service task IDs to a dict with the optional keys `name`
    and `startup_file` (as prepared by `_start_services`).  A service counts
    as started if no startup file was requested, or if the startup file
    exists.  URLs found in the startup file are stored in the session
    registry.  For each started service a `service_up` control command is
    published and the service is removed from `cb_data`, so that it is not
    reported again on the next invocation.

    Always returns `True`.
    """

    # iterate over a copy of the keys: entries are deleted while looping
    for tid in list(cb_data):

        service_up   = False
        startup_file = cb_data[tid].get('startup_file')

        if not startup_file:
            service_up = True
            # FIXME: at this point we assume that since "startup_file" is
            #        not provided, then we don't wait - this will be
            #        replaced with another callback (Component.advance will
            #        publish control command "service_up" for service tasks)

        elif os.path.isfile(startup_file):
            # if file exists then service is up (general approach)
            service_up = True

            # collect data from the startup file: at this point we look
            # for URLs only
            with ru.ru_open(startup_file, 'r') as fin:
                service_urls = _parse_service_urls(fin.readlines())

            if service_urls:
                entries = _service_registry_entries(cb_data[tid]['name'],
                                                    service_urls)
                for key, url in entries.items():
                    self.session._reg[key] = url

        if service_up:
            self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up',
                                              'arg': {'uid': tid}})
            del cb_data[tid]

    return True