From 95070f6b706d71b79020b9b9da7a405de3231d56 Mon Sep 17 00:00:00 2001 From: Trygve Aspenes Date: Wed, 15 Feb 2023 21:16:38 +0100 Subject: [PATCH 1/6] add try/except and check if arec is running --- posttroll/address_receiver.py | 82 ++++++++++++++++++----------------- posttroll/ns.py | 2 + 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index 27106ef..bad26f8 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -173,46 +173,50 @@ def _run(self): nameservers = ["localhost"] self._is_running = True - with Publish("address_receiver", self._port, ["addresses"], - nameservers=nameservers) as pub: - try: - while self._do_run: - try: - data, fromaddr = recv() - if self._multicast_enabled: - ip_, port = fromaddr - if self._restrict_to_localhost and ip_ not in self._local_ips: - # discard external message - LOGGER.debug('Discard external message') + try: + with Publish("address_receiver", self._port, ["addresses"], + nameservers=nameservers) as pub: + try: + while self._do_run: + try: + data, fromaddr = recv() + if self._multicast_enabled: + ip_, port = fromaddr + if self._restrict_to_localhost and ip_ not in self._local_ips: + # discard external message + LOGGER.debug('Discard external message') + continue + LOGGER.debug("data %s", data) + except SocketTimeout: + if self._multicast_enabled: + LOGGER.debug("Multicast socket timed out on recv!") continue - LOGGER.debug("data %s", data) - except SocketTimeout: - if self._multicast_enabled: - LOGGER.debug("Multicast socket timed out on recv!") - continue - finally: - self._check_age(pub, min_interval=self._max_age / 20) - if self._do_heartbeat: - pub.heartbeat(min_interval=29) - msg = Message.decode(data) - name = msg.subject.split("/")[1] - if(msg.type == 'info' and - msg.subject.lower().startswith(self._subject)): - addr = msg.data["URI"] - msg.data['status'] = True - metadata = copy.copy(msg.data) - metadata["name"] = name - - LOGGER.debug('receiving address %s %s %s', str(addr), - str(name), str(metadata)) - if addr not in self._addresses: - LOGGER.info("nameserver: publish add '%s'", - str(msg)) - pub.send(msg.encode()) - self._add(addr, metadata) - finally: - self._is_running = False - recv.close() + finally: + self._check_age(pub, min_interval=self._max_age / 20) + if self._do_heartbeat: + pub.heartbeat(min_interval=29) + msg = Message.decode(data) + name = msg.subject.split("/")[1] + if(msg.type == 'info' and + msg.subject.lower().startswith(self._subject)): + addr = msg.data["URI"] + msg.data['status'] = True + metadata = copy.copy(msg.data) + metadata["name"] = name + + LOGGER.debug('receiving address %s %s %s', str(addr), + str(name), str(metadata)) + if addr not in self._addresses: + LOGGER.info("nameserver: publish add '%s'", + str(msg)) + pub.send(msg.encode()) + self._add(addr, metadata) + finally: + self._is_running = False + recv.close() + except OSError: + LOGGER.exception("Fails to start address receiver run loop.") + self._is_running = False def _add(self, adr, metadata): """Add an address.""" diff --git a/posttroll/ns.py b/posttroll/ns.py index 5baf54a..79623f6 100644 --- a/posttroll/ns.py +++ b/posttroll/ns.py @@ -132,6 +132,8 @@ def run(self, *args): multicast_enabled=self._multicast_enabled, restrict_to_localhost=self._restrict_to_localhost) arec.start() + if not arec.is_running(): + return port = PORT try: From a29d2b11ba2c417135a228c7ee552986236310c5 Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Wed, 15 Feb 2023 20:19:59 +0000 Subject: [PATCH 2/6] Fixing style errors. --- posttroll/address_receiver.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index bad26f8..c868aa7 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -175,7 +175,7 @@ def _run(self): self._is_running = True try: with Publish("address_receiver", self._port, ["addresses"], - nameservers=nameservers) as pub: + nameservers=nameservers) as pub: try: while self._do_run: try: @@ -197,15 +197,15 @@ def _run(self): pub.heartbeat(min_interval=29) msg = Message.decode(data) name = msg.subject.split("/")[1] - if(msg.type == 'info' and - msg.subject.lower().startswith(self._subject)): + if (msg.type == 'info' and + msg.subject.lower().startswith(self._subject)): addr = msg.data["URI"] msg.data['status'] = True metadata = copy.copy(msg.data) metadata["name"] = name LOGGER.debug('receiving address %s %s %s', str(addr), - str(name), str(metadata)) + str(name), str(metadata)) if addr not in self._addresses: LOGGER.info("nameserver: publish add '%s'", str(msg)) From 9013e431532f0a08e220dedac862de5a987d54e0 Mon Sep 17 00:00:00 2001 From: Trygve Aspenes Date: Thu, 16 Feb 2023 07:50:53 +0100 Subject: [PATCH 3/6] refactor_run and add test --- posttroll/address_receiver.py | 71 ++++++++++++++++++---------------- posttroll/tests/test_pubsub.py | 10 +++++ 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index c868aa7..60401d7 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -146,6 +146,42 @@ def _check_age(self, pub, min_interval=zero_seconds): for addr in to_del: del self._addresses[addr] + def _recv_loop_and_parse_data(self, recv, pub): + while self._do_run: + try: + data, fromaddr = recv() + if self._multicast_enabled: + ip_, port = fromaddr + if self._restrict_to_localhost and ip_ not in self._local_ips: + # discard external message + LOGGER.debug('Discard external message') + continue + LOGGER.debug("data %s", data) + except SocketTimeout: + if self._multicast_enabled: + LOGGER.debug("Multicast socket timed out on recv!") + continue + finally: + self._check_age(pub, min_interval=self._max_age / 20) + if self._do_heartbeat: + pub.heartbeat(min_interval=29) + msg = Message.decode(data) + name = msg.subject.split("/")[1] + if (msg.type == 'info' and + msg.subject.lower().startswith(self._subject)): + addr = msg.data["URI"] + msg.data['status'] = True + metadata = copy.copy(msg.data) + metadata["name"] = name + + LOGGER.debug('receiving address %s %s %s', str(addr), + str(name), str(metadata)) + if addr not in self._addresses: + LOGGER.info("nameserver: publish add '%s'", + str(msg)) + pub.send(msg.encode()) + self._add(addr, metadata) + def _run(self): """Run the receiver.""" port = broadcast_port @@ -177,40 +213,7 @@ def _run(self): with Publish("address_receiver", self._port, ["addresses"], nameservers=nameservers) as pub: try: - while self._do_run: - try: - data, fromaddr = recv() - if self._multicast_enabled: - ip_, port = fromaddr - if self._restrict_to_localhost and ip_ not in self._local_ips: - # discard external message - LOGGER.debug('Discard external message') - continue - LOGGER.debug("data %s", data) - except SocketTimeout: - if self._multicast_enabled: - LOGGER.debug("Multicast socket timed out on recv!") - continue - finally: - self._check_age(pub, min_interval=self._max_age / 20) - if self._do_heartbeat: - pub.heartbeat(min_interval=29) - msg = Message.decode(data) - name = msg.subject.split("/")[1] - if (msg.type == 'info' and - msg.subject.lower().startswith(self._subject)): - addr = msg.data["URI"] - msg.data['status'] = True - metadata = copy.copy(msg.data) - metadata["name"] = name - - LOGGER.debug('receiving address %s %s %s', str(addr), - str(name), str(metadata)) - if addr not in self._addresses: - LOGGER.info("nameserver: publish add '%s'", - str(msg)) - pub.send(msg.encode()) - self._add(addr, metadata) + self._recv_loop_and_parse_data(recv, pub) finally: self._is_running = False recv.close() diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 67bdd47..eba7cec 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -446,6 +446,16 @@ def test_localhost_restriction(self, mcrec, pub, msg): msg.decode.assert_not_called() adr.stop() + @mock.patch("posttroll.address_receiver.Publish") + def test_publish_oserror(self, pub): + """Test address receiver handle oserror in publish.""" + pub.side_effect = OSError + from posttroll.address_receiver import AddressReceiver + adr = AddressReceiver() + adr.start() + time.sleep(3) + self.assertFalse(adr.is_running()) + adr.stop() class TestPublisherDictConfig(unittest.TestCase): """Test configuring publishers with a dictionary.""" From fdaf9b009d509f6349f2425145d2b55733b25111 Mon Sep 17 00:00:00 2001 From: stickler-ci Date: Thu, 16 Feb 2023 06:51:07 +0000 Subject: [PATCH 4/6] Fixing style errors. --- posttroll/address_receiver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posttroll/address_receiver.py b/posttroll/address_receiver.py index 60401d7..32cf5bf 100644 --- a/posttroll/address_receiver.py +++ b/posttroll/address_receiver.py @@ -168,14 +168,14 @@ def _recv_loop_and_parse_data(self, recv, pub): msg = Message.decode(data) name = msg.subject.split("/")[1] if (msg.type == 'info' and - msg.subject.lower().startswith(self._subject)): + msg.subject.lower().startswith(self._subject)): addr = msg.data["URI"] msg.data['status'] = True metadata = copy.copy(msg.data) metadata["name"] = name LOGGER.debug('receiving address %s %s %s', str(addr), - str(name), str(metadata)) + str(name), str(metadata)) if addr not in self._addresses: LOGGER.info("nameserver: publish add '%s'", str(msg)) From eca60b355b7ba8b665110b5f28f371bc81e864b2 Mon Sep 17 00:00:00 2001 From: Trygve Aspenes Date: Thu, 16 Feb 2023 09:15:40 +0100 Subject: [PATCH 5/6] logger message --- posttroll/ns.py | 1 + 1 file changed, 1 insertion(+) diff --git a/posttroll/ns.py b/posttroll/ns.py index 79623f6..83ef232 100644 --- a/posttroll/ns.py +++ b/posttroll/ns.py @@ -133,6 +133,7 @@ def run(self, *args): restrict_to_localhost=self._restrict_to_localhost) arec.start() if not arec.is_running(): + logger.error("Address Receiver fails to start.") return port = PORT From 416537646db71e7439ffc451500e9bef6a461887 Mon Sep 17 00:00:00 2001 From: Trygve Aspenes Date: Thu, 16 Feb 2023 09:16:21 +0100 Subject: [PATCH 6/6] add test nameserver failes to start --- posttroll/tests/test_pubsub.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index eba7cec..47a33fc 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -603,6 +603,18 @@ def test_dict_config_subscriber(NSSubscriber, Subscriber): NSSubscriber.assert_not_called() +@mock.patch('posttroll.ns.AddressReceiver') +def test_nameserver_addressreceiver_fails_to_start(arec): + from posttroll.ns import NameServer + arec_instance = mock.Mock() + arec.return_value = arec_instance + arec_instance.is_running.return_value = False + ns = NameServer(max_age=timedelta(seconds=3), + multicast_enabled=False) + ns_run_ret = ns.run() + assert ns_run_ret is None + + @mock.patch('posttroll.subscriber.NSSubscriber.start') def test_dict_config_full_nssubscriber(NSSubscriber_start): """Test that all NSSubscriber options are passed."""