From 608f05ea23e2c22b939b4c51e7e4d7385d09b9dc Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 25 Aug 2023 01:34:42 -0400 Subject: [PATCH 1/6] added method to track startup file with service URL (special case - SOMA) --- src/radical/pilot/agent/agent_0.py | 52 +++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 99a3e7d18b..6e55a55a46 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -347,7 +347,9 @@ def _start_services(self): self._log.info('starting agent services') - services = list() + startup_files = {} + services = [] + for sd in sds: td = TaskDescription(sd) @@ -362,7 +364,6 @@ def _start_services(self): task['origin'] = 'agent' task['description'] = td.as_dict() task['state'] = rps.AGENT_STAGING_INPUT_PENDING - task['status'] = 'NEW' task['type'] = 'service_task' task['uid'] = tid task['pilot_sandbox'] = cfg.pilot_sandbox @@ -377,17 +378,58 @@ def _start_services(self): self._service_uids_launched.append(tid) services.append(task) - self._log.debug('start service %s: %s', tid, sd) - + if 'soma' in td.executable.lower(): + startup_files[tid] = 'soma.txt' self.advance(services, publish=False, push=True) - # Waiting 2mins for all services to launch + if startup_files: + self.register_timed_cb(cb=self._soma_service_state_cb, + cb_data={'name': 'soma', + 'startup_files': startup_files}, + timer=2) + + # waiting 2 mins for all services to launch if not self._services_setup.wait(timeout=60 * 2): raise RuntimeError('Unable to start services') + if startup_files: + self.unregister_timed_cb(self._soma_service_state_cb) + self._log.info('all agent services started') + def _soma_service_state_cb(self, cb_data): + + name = cb_data.get('name') + startup_files = cb_data.get('startup_files', {}) + + reg = None + for tid in list(startup_files): + file = startup_files[tid] + if file and os.path.isfile(file): + + service_urls = {} + with ru.ru_open(file, 'r') as fin: + for line in fin.readlines(): + parts = line.split() + if len(parts) > 1 and '://' in parts[1]: + service_urls[parts[0]] = parts[1] + + if service_urls: + if reg is None: + reg = ru.zmq.RegistryClient(url=self._cfg.reg_addr) + service_key = 'service.%s' % name + for idx, url in service_urls.items(): + reg['%s.%s' % (service_key, idx)] = {'url': url} + + self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up', + 'arg': {'uid': tid}}) + del startup_files[tid] + + if reg is not None: + reg.close() + + return True # -------------------------------------------------------------------------- # From 981fc7a822b52dd14d6be5faed18d93616212062 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Thu, 31 Aug 2023 17:23:41 -0400 Subject: [PATCH 2/6] updated handling of the service startup process --- src/radical/pilot/agent/agent_0.py | 92 ++++++++++++++++++------------ 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 6e55a55a46..5abb2a2ba4 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -15,7 +15,6 @@ from .. import utils as rpu from .. import states as rps from .. import constants as rpc -from .. import TaskDescription from .. import Session from .. import TaskDescription, AGENT_SERVICE @@ -347,8 +346,9 @@ def _start_services(self): self._log.info('starting agent services') - startup_files = {} + cfg = self._cfg services = [] + services_data = {} for sd in sds: @@ -357,74 +357,98 @@ def _start_services(self): # ensure that the description is viable td.verify() - cfg = self._cfg tid = ru.generate_id('service.%(item_counter)04d', - ru.ID_CUSTOM, ns=self._cfg.sid) + ru.ID_CUSTOM, ns=cfg.sid) task = dict() + task['uid'] = tid + task['type'] = 'service_task' task['origin'] = 'agent' + task['pilot'] = cfg.pid task['description'] = td.as_dict() task['state'] = rps.AGENT_STAGING_INPUT_PENDING - task['type'] = 'service_task' - task['uid'] = tid task['pilot_sandbox'] = cfg.pilot_sandbox task['task_sandbox'] = cfg.pilot_sandbox + task['uid'] + '/' task['task_sandbox_path'] = cfg.pilot_sandbox + task['uid'] + '/' task['session_sandbox'] = cfg.session_sandbox task['resource_sandbox'] = cfg.resource_sandbox - task['pilot'] = cfg.pid task['resources'] = {'cpu': td.ranks * td.cores_per_rank, 'gpu': td.ranks * td.gpus_per_rank} + # TODO: use `type='service_task'` in RADICAL-Analytics + + # TaskDescription.metadata will contain service related data: + # "name" (unique), "startup_file" + self._service_uids_launched.append(tid) services.append(task) - if 'soma' in td.executable.lower(): - startup_files[tid] = 'soma.txt' + services_data[tid] = {} + if td.metadata.get('startup_file'): + n = td.metadata.get('name') + services_data[tid]['name'] = 'service.%s' % n if n else tid + services_data[tid]['startup_file'] = td.metadata['startup_file'] self.advance(services, publish=False, push=True) - if startup_files: - self.register_timed_cb(cb=self._soma_service_state_cb, - cb_data={'name': 'soma', - 'startup_files': startup_files}, - timer=2) + self.register_timed_cb(cb=self._services_startup_cb, + cb_data=services_data, + timer=2) - # waiting 2 mins for all services to launch - if not self._services_setup.wait(timeout=60 * 2): + # waiting for all services to start (max waiting time 2 mins) + if not self._services_setup.wait(timeout=120): raise RuntimeError('Unable to start services') - if startup_files: - self.unregister_timed_cb(self._soma_service_state_cb) + self.unregister_timed_cb(self._services_startup_cb) self._log.info('all agent services started') - def _soma_service_state_cb(self, cb_data): - - name = cb_data.get('name') - startup_files = cb_data.get('startup_files', {}) + def _services_startup_cb(self, cb_data): reg = None - for tid in list(startup_files): - file = startup_files[tid] - if file and os.path.isfile(file): + for tid in list(cb_data): + + service_up = False + startup_file = cb_data[tid].get('startup_file') + if not startup_file: + service_up = True + # FIXME: at this point we assume that since "startup_file" is + # not provided, then we don't wait - this will be + # replaced with another callback (Component.advance will + # publish control command "service_up" for service tasks) + + elif os.path.isfile(startup_file): + # if file exists then service is up (general approach) + service_up = True + + # collect data from the startup file: at this point we look + # for URLs only service_urls = {} - with ru.ru_open(file, 'r') as fin: + with ru.ru_open(startup_file, 'r') as fin: for line in fin.readlines(): + if '://' not in line: + continue parts = line.split() - if len(parts) > 1 and '://' in parts[1]: - service_urls[parts[0]] = parts[1] + if len(parts) == 1: + idx, url = '', parts[0] + elif '://' in parts[1]: + idx, url = parts[0], parts[1] + else: + continue + service_urls[idx] = url if service_urls: if reg is None: reg = ru.zmq.RegistryClient(url=self._cfg.reg_addr) - service_key = 'service.%s' % name + service_key = cb_data[tid]['name'] for idx, url in service_urls.items(): - reg['%s.%s' % (service_key, idx)] = {'url': url} + idx = '.%s' % idx if idx else '' + reg['%s%s' % (service_key, idx)] = {'url': url} - self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up', - 'arg': {'uid': tid}}) - del startup_files[tid] + if service_up: + self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up', + 'arg': {'uid': tid}}) + del cb_data[tid] if reg is not None: reg.close() @@ -701,8 +725,6 @@ def _ctrl_service_up(self, msg): self._log.warn('=== duplicated service startup signal for %s', uid) return True - self._log.debug('=== service startup message for %s', uid) - self._service_uids_running.append(uid) self._log.debug('=== service %s started (%s / %s)', uid, len(self._service_uids_running), From 51ff51f1d49c63ce0853eb8e2d5ff6d0ae47828b Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Thu, 7 Sep 2023 18:17:31 -0400 Subject: [PATCH 3/6] updated configs --- src/radical/pilot/agent/agent_0.py | 32 ++++++++++++------------------ 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 31dea73f76..30eed5ad11 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -338,17 +338,15 @@ def _write_sa_configs(self): # def _start_services(self): - sds = self._cfg.services - if not sds: + if not self.session.cfg.services: return self._log.info('starting agent services') - cfg = self._cfg services = [] services_data = {} - for sd in sds: + for sd in self.session.cfg.services: td = TaskDescription(sd) td.mode = AGENT_SERVICE @@ -356,22 +354,24 @@ def _start_services(self): td.verify() tid = ru.generate_id('service.%(item_counter)04d', - ru.ID_CUSTOM, ns=cfg.sid) + ru.ID_CUSTOM, ns=self.session.uid) task = dict() task['uid'] = tid task['type'] = 'service_task' task['origin'] = 'agent' - task['pilot'] = cfg.pid + task['pilot'] = self.session.cfg.pid task['description'] = td.as_dict() task['state'] = rps.AGENT_STAGING_INPUT_PENDING - task['pilot_sandbox'] = cfg.pilot_sandbox - task['task_sandbox'] = cfg.pilot_sandbox + task['uid'] + '/' - task['task_sandbox_path'] = cfg.pilot_sandbox + task['uid'] + '/' - task['session_sandbox'] = cfg.session_sandbox - task['resource_sandbox'] = cfg.resource_sandbox + task['pilot_sandbox'] = self.session.cfg.pilot_sandbox + task['session_sandbox'] = self.session.cfg.session_sandbox + task['resource_sandbox'] = self.session.cfg.resource_sandbox task['resources'] = {'cpu': td.ranks * td.cores_per_rank, 'gpu': td.ranks * td.gpus_per_rank} + task_sandbox = self.session.cfg.pilot_sandbox + tid + '/' + task['task_sandbox'] = task_sandbox + task['task_sandbox_path'] = task_sandbox + # TODO: use `type='service_task'` in RADICAL-Analytics # TaskDescription.metadata will contain service related data: @@ -402,7 +402,6 @@ def _start_services(self): def _services_startup_cb(self, cb_data): - reg = None for tid in list(cb_data): service_up = False @@ -436,21 +435,16 @@ def _services_startup_cb(self, cb_data): service_urls[idx] = url if service_urls: - if reg is None: - reg = ru.zmq.RegistryClient(url=self._cfg.reg_addr) service_key = cb_data[tid]['name'] for idx, url in service_urls.items(): - idx = '.%s' % idx if idx else '' - reg['%s%s' % (service_key, idx)] = {'url': url} + key = '%s%s' % (service_key, '.%s' % idx if idx else '') + self.session._reg[key] = {'url': url} if service_up: self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up', 'arg': {'uid': tid}}) del cb_data[tid] - if reg is not None: - reg.close() - return True # -------------------------------------------------------------------------- From 394cd80ae7c19393fdafef95fd28967ed8142359 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Thu, 7 Sep 2023 18:24:19 -0400 Subject: [PATCH 4/6] fixed registry key for services --- src/radical/pilot/agent/agent_0.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 30eed5ad11..203a25a229 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -435,10 +435,12 @@ def _services_startup_cb(self, cb_data): service_urls[idx] = url if service_urls: - service_key = cb_data[tid]['name'] + key = cb_data[tid]['name'] for idx, url in service_urls.items(): - key = '%s%s' % (service_key, '.%s' % idx if idx else '') - self.session._reg[key] = {'url': url} + if idx: + key += '.%s' % idx + key += '.url' + self.session._reg[key] = url if service_up: self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'service_up', From 24d21b9f3c1f816bc9fadd0bc4d5cd75e9685cd5 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 29 Sep 2023 11:05:36 -0400 Subject: [PATCH 5/6] fixed agent instance creation in tests for service tasks --- tests/unit_tests/test_agent_0/test_agent_0.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/test_agent_0/test_agent_0.py b/tests/unit_tests/test_agent_0/test_agent_0.py index 8e35b1a7a3..50e0ac13e5 100755 --- a/tests/unit_tests/test_agent_0/test_agent_0.py +++ b/tests/unit_tests/test_agent_0/test_agent_0.py @@ -190,7 +190,8 @@ def local_advance(things, publish, push): nonlocal advanced_services advanced_services = things - agent_0 = Agent_0(ru.Config(), self._session) + agent_0 = Agent_0() + agent_0._session = self._session agent_0.advance = local_advance agent_0._log = mock.Mock() agent_0._service_uids_launched = list() @@ -235,7 +236,9 @@ def local_advance(things, publish, push): @mock.patch.object(Agent_0, '__init__', return_value=None) def test_ctrl_service_up(self, mocked_init): - agent_0 = Agent_0(ru.Config(), self._session) + agent_0 = Agent_0() + agent_0._cfg = ru.Config() + agent_0._session = self._session agent_0._service_uids_launched = ['101', '102'] agent_0._service_uids_running = [] From 57bfad7686fb5a8063932c019d0e5266ca053b4e Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 29 Sep 2023 12:42:21 -0400 Subject: [PATCH 6/6] fixed tests for `Agent_0` --- tests/unit_tests/test_agent_0/test_agent_0.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/tests/unit_tests/test_agent_0/test_agent_0.py b/tests/unit_tests/test_agent_0/test_agent_0.py index 50e0ac13e5..7e4c10581b 100755 --- a/tests/unit_tests/test_agent_0/test_agent_0.py +++ b/tests/unit_tests/test_agent_0/test_agent_0.py @@ -69,8 +69,6 @@ def test_check_control_cb(self, mocked_init): def _publish_effect(publish_type, cmd): nonlocal global_control - import pprint - print('=============== pub', pprint.pformat(cmd)) global_control.append((publish_type, cmd)) def _prepenv_effect(env_name, env_spec): @@ -102,7 +100,6 @@ def _prepenv_effect(env_name, env_spec): 'kwargs': {'env_name': 'radical', 'env_spec': 'spec'}}) self.assertIsNone(agent_cmp._control_cb(None, msg)) - print('====', global_control, '====') self.assertEqual(global_control[0], ('control_pubsub', RPCResultMessage({'uid': 'rpc.0004', @@ -191,29 +188,35 @@ def local_advance(things, publish, push): advanced_services = things agent_0 = Agent_0() - agent_0._session = self._session - agent_0.advance = local_advance + agent_0._uid = 'agent_0' + agent_0._cb_lock = mt.RLock() + agent_0._threads = dict() agent_0._log = mock.Mock() agent_0._service_uids_launched = list() agent_0._services_setup = mock.Mock() - agent_0._cfg = ru.Config(from_dict={'pid' : 12, - 'pilot_sandbox': '/', - 'services' : []}) + agent_0.advance = local_advance + + agent_0._session = self._session + agent_0._session._cfg = ru.Config(from_dict={'pid' : 12, + 'pilot_sandbox': '/', + 'services' : []}) - agent_0._cfg.services = [{}] + agent_0._session._cfg.services = [{}] with self.assertRaises(ValueError): # no executable provided agent_0._start_services() - agent_0._cfg.services = [{'executable': 'test', 'ranks': 'zero'}] + agent_0._session._cfg.services = [{'executable': 'test', + 'ranks' : 'zero'}] with self.assertRaises(TypeError): # type mismatch agent_0._start_services() - services = [{'executable': '/bin/ls', - 'cores_per_rank': '3'}] - agent_0._cfg.services = services + services = [{'executable' : '/bin/ls', + 'cores_per_rank': '3', + 'metadata' : {}}] + agent_0._session._cfg.services = services agent_0._services_setup.wait = mock.Mock(return_value=True) agent_0._start_services()