diff --git a/.github/workflows/pylint.yaml b/.github/workflows/pylint.yaml index 21b71d9..780df7a 100644 --- a/.github/workflows/pylint.yaml +++ b/.github/workflows/pylint.yaml @@ -36,4 +36,4 @@ jobs: - name: Analysing the code with pylint run: | python_files=$(find . -name "*.py") - pylint --disable=line-too-long --disable=too-many-lines --disable=too-many-return-statements --disable=too-many-branches --disable=too-many-statements --disable=too-many-branches --disable=too-many-nested-blocks --disable=no-else-return --disable=too-many-arguments --disable=unused-argument --disable=invalid-name --disable=too-many-instance-attributes --disable=consider-using-with --disable=too-many-public-methods ${python_files} + pylint --disable=too-many-locals --disable=line-too-long --disable=too-many-lines --disable=too-many-return-statements --disable=too-many-branches --disable=too-many-statements --disable=too-many-branches --disable=too-many-nested-blocks --disable=no-else-return --disable=too-many-arguments --disable=unused-argument --disable=invalid-name --disable=too-many-instance-attributes --disable=consider-using-with --disable=too-many-public-methods ${python_files} diff --git a/roadblock.py b/roadblock.py index 60a5791..3223be8 100644 --- a/roadblock.py +++ b/roadblock.py @@ -571,6 +571,9 @@ def define_msg_schema(self): "command": { "type": "string", "enum": [ + "global-bus-created", + "leader-bus-created", + "followers-bus-created", "timeout-ts", "initialized", "switch-buses", @@ -700,10 +703,10 @@ def send_user_messages(self): for user_msg in self.user_messages: if "user-string" in user_msg: self.logger.info("Sending user message %d: 'user-string'", user_msg_counter) - self.message_publish(self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"])) + self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"])) elif "user-object" in user_msg: self.logger.info("Sending user message %d: 'user-object'", user_msg_counter) - self.message_publish(self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"])) + self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"])) user_msg_counter += 1 @@ -727,7 +730,9 @@ def message_handle (self, message): msg_command = self.message_get_command(message) - if msg_command == "timeout-ts": + if msg_command in ("global-bus-created", "leader-bus-created", "followers-bus-created"): + self.logger.info("Received '%s' message", msg_command) + elif msg_command == "timeout-ts": self.logger.info("Received 'timeout-ts' message") cluster_timeout = int(self.message_get_value(message)) @@ -760,7 +765,7 @@ def message_handle (self, message): if len(self.followers["online"]) == 0: self.logger.info("Sending 'all-online' message") - self.message_publish(self.message_build("all", "all", "all-online")) + self.message_publish("followers", self.message_build("all", "all", "all-online")) if self.initiator: self.send_user_messages() elif msg_command == "all-online": @@ -774,15 +779,15 @@ def message_handle (self, message): if self.roadblock_role == "follower": if self.abort: self.logger.info("Sending 'follower-ready-abort' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-ready-abort")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-ready-abort")) else: if self.wait_for is not None and self.wait_for_process is not None and self.wait_for_process.poll() is None: self.wait_for_waiting = True self.logger.info("Sending 'follower-ready-waiting' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-ready-waiting")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-ready-waiting")) else: self.logger.info("Sending 'follower-ready' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-ready")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-ready")) elif msg_command in ("follower-ready", "follower-ready-abort", "follower-ready-waiting"): if self.roadblock_role == "leader": self.logger.debug("leader got a '%s' message", msg_command) @@ -807,14 +812,14 @@ def message_handle (self, message): if len(self.followers["ready"]) == 0: self.logger.info("Sending 'all-ready' message") - self.message_publish(self.message_build("all", "all", "all-ready")) + self.message_publish("followers", self.message_build("all", "all", "all-ready")) if self.leader_abort: self.logger.info("Sending 'all-abort' command") - self.message_publish(self.message_build("all", "all", "all-abort")) + self.message_publish("followers", self.message_build("all", "all", "all-abort")) elif self.roadblock_waiting: self.logger.info("Sending 'all-wait' command") - self.message_publish(self.message_build("all", "all", "all-wait")) + self.message_publish("followers", self.message_build("all", "all", "all-wait")) self.logger.info("Disabling original timeout") self.disable_alarm() @@ -824,13 +829,13 @@ def message_handle (self, message): signal.signal(signal.SIGALRM, self.heartbeat_signal_handler) self.logger.info("Sending 'leader-heartbeat' message") - self.message_publish(self.message_build("all", "all", "leader-heartbeat")) + self.message_publish("followers", self.message_build("all", "all", "leader-heartbeat")) self.logger.info("Starting heartbeat monitoring period") self.set_alarm(self.heartbeat_timeout) else: self.logger.info("Sending 'all-go' command") - self.message_publish(self.message_build("all", "all", "all-go")) + self.message_publish("followers", self.message_build("all", "all", "all-go")) elif msg_command == "all-wait": if self.roadblock_role == "follower": self.logger.info("Received 'all-wait' message") @@ -845,7 +850,7 @@ def message_handle (self, message): self.logger.critical("Not sending 'follower-heartbeat' because of heartbeat timeout simulation request") else: self.logger.info("Sending 'follower-heartbeat' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-heartbeat")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-heartbeat")) elif msg_command == "follower-heartbeat": if self.roadblock_role == "leader": self.logger.info("Received '%s' message", msg_command) @@ -867,10 +872,10 @@ def message_handle (self, message): if self.waiting_failed: self.leader_abort = True self.logger.info("Sending 'all-abort' command") - self.message_publish(self.message_build("all", "all", "all-abort")) + self.message_publish("followers", self.message_build("all", "all", "all-abort")) else: self.logger.info("Sending 'all-go' command") - self.message_publish(self.message_build("all", "all", "all-go")) + self.message_publish("followers", self.message_build("all", "all", "all-go")) elif msg_command in ("follower-waiting-complete", "follower-waiting-complete-failed"): if self.roadblock_role == "leader": self.logger.info("Received '%s' message", msg_command) @@ -895,10 +900,10 @@ def message_handle (self, message): if self.waiting_failed: self.leader_abort_waiting = True self.logger.info("Sending 'all-abort' command") - self.message_publish(self.message_build("all", "all", "all-abort")) + self.message_publish("followers", self.message_build("all", "all", "all-abort")) else: self.logger.info("Sending 'all-go' command") - self.message_publish(self.message_build("all", "all", "all-go")) + self.message_publish("followers", self.message_build("all", "all", "all-go")) elif msg_command == "heartbeat-timeout": if self.roadblock_role == "follower": self.logger.info("Received '%s' message", msg_command) @@ -928,7 +933,7 @@ def message_handle (self, message): # tell the leader that I'm gone self.logger.info("Sending 'follower-gone' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-gone")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-gone")) # signal myself to exit self.watch_bus = False @@ -950,7 +955,7 @@ def message_handle (self, message): # send a message that will probably not be observed by # anyone...but just in case... self.logger.info("Sending 'all-gone' message") - self.message_publish(self.message_build("all", "all", "all-gone")) + self.message_publish("followers", self.message_build("all", "all", "all-gone")) # signal myself to exit self.watch_bus = False @@ -961,7 +966,7 @@ def message_handle (self, message): return self.RC_SUCCESS - def message_publish(self, message): + def message_publish(self, message_bus, message): '''Publish messages for subscribers to receive''' ret_val = 0 @@ -974,24 +979,25 @@ def message_publish(self, message): counter += 1 try: - ret_val = self.redcon.xadd(self.roadblock_uuid + "__bus", { 'msg': self.message_to_str(message) }) + ret_val = self.redcon.xadd(self.roadblock_uuid + "__bus__" + message_bus, { 'msg': self.message_to_str(message) }) except redis.exceptions.ConnectionError as con_error: self.logger.error("%s", con_error) - self.logger.error("Bus add failed due to connection error!") + self.logger.error("Bus add to '%s' failed due to connection error!", message_bus) except redis.exceptions.TimeoutError as con_error: self.logger.error("%s", con_error) - self.logger.error("Bus add failed due to a timeout error!") + self.logger.error("Bus add to '%s' failed due to a timeout error!", message_bus) if ret_val is None: - self.logger.warning("Failed attempt %d to publish message '%s'", counter, message) + self.logger.warning("Failed attempt %d to publish message '%s' to bus '%s'", counter, message, message_bus) self.backoff(counter) else: - self.logger.debug("Message '%s' was sent on the %d attempt with message ID '%s'", message, counter, ret_val) + self.logger.debug("Message '%s' was sent on the %d attempt with message ID '%s' on bus '%s'", message, counter, ret_val, message_bus) if self.message_log is not None: # if the message log is open then append messages to the queue # for later dumping + # do something bus specific here self.messages["sent"].append(message) return self.RC_SUCCESS @@ -1120,14 +1126,15 @@ def cleanup(self): self.key_delete(self.roadblock_uuid) self.key_delete(self.roadblock_uuid + "__initialized") - msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus") - self.logger.debug("total messages on bus: %d", msg_count) + for bus_name in ( "global", "leader", "followers" ): + msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus__" + bus_name) + self.logger.debug("total messages on bus '%s': %d", bus_name, msg_count) - msgs_trimmed = self.redcon.xtrim(self.roadblock_uuid + "__bus", maxlen = 0, approximate = False) - self.logger.debug("total messages deleted from bus: %d", msgs_trimmed) + msgs_trimmed = self.redcon.xtrim(self.roadblock_uuid + "__bus__" + bus_name, maxlen = 0, approximate = False) + self.logger.debug("total messages deleted from bus '%s': %d", bus_name, msgs_trimmed) - msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus") - self.logger.debug("total messages on bus: %d", msg_count) + msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus__" + bus_name) + self.logger.debug("total messages on bus '%s': %d", bus_name, msg_count) if self.connection_watchdog_state == "enabled": self.logger.info("Closing connection pool watchdog") @@ -1206,7 +1213,7 @@ def do_heartbeat_signal(self): self.logger.critical("Failed ending current heartbeat monitoring period -> heartbeat timeout") self.logger.info("Sending 'heartbeat-timeout' message") - self.message_publish(self.message_build("all", "all", "heartbeat-timeout")) + self.message_publish("followers", self.message_build("all", "all", "heartbeat-timeout")) self.timeout_internals() @@ -1220,7 +1227,7 @@ def do_heartbeat_signal(self): self.followers["waiting"] = copy.deepcopy(self.followers["waiting_backup"]) self.logger.info("Sending 'leader-heartbeat' message") - self.message_publish(self.message_build("all", "all", "leader-heartbeat")) + self.message_publish("followers", self.message_build("all", "all", "leader-heartbeat")) self.logger.info("Starting new heartbeat monitoring period") self.set_alarm(self.heartbeat_timeout) @@ -1355,10 +1362,10 @@ def wait_for_process_monitor(self): log_contents = str(base64.b64encode(lzma.compress(wait_for_log_fh.read().encode("ascii"))), "ascii") self.logger.critical("Sending 'follower-waiting-complete-failed' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-waiting-complete-failed", value = log_contents)) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-waiting-complete-failed", value = log_contents)) else: self.logger.info("Sending 'follower-waiting-complete' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-waiting-complete")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-waiting-complete")) self.logger.debug("The wait_for monitor is exiting") @@ -1543,14 +1550,19 @@ def run_it(self): self.initiator = True self.logger.info("Initiator: True") + # create the streams/buses + self.logger.info("Creating buses") + self.message_publish("global", self.message_build("all", "all", "global-bus-created")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "leader-bus-created")) + self.message_publish("followers", self.message_build("all", "all", "followers-bus-created")) + # publish the cluster timeout - # create the stream/bus by posting the timeout timestamp self.logger.info("Sending 'timeout-ts' message") - self.message_publish(self.message_build("all", "all", "timeout-ts", cluster_timeout)) + self.message_publish("global", self.message_build("all", "all", "timeout-ts", cluster_timeout)) # publish the initiator information self.logger.info("Sending 'initiator-info' message") - self.message_publish(self.message_build("all", "all", "initiator-info")) + self.message_publish("global", self.message_build("all", "all", "initiator-info")) self.initiator_type = self.roadblock_role self.initiator_id = self.my_id @@ -1575,20 +1587,31 @@ def run_it(self): if self.roadblock_role == "follower": # tell the leader that I am online self.logger.info("Sending 'follower-online' message") - self.message_publish(self.message_build("leader", self.roadblock_leader_id, "follower-online")) + self.message_publish("leader", self.message_build("leader", self.roadblock_leader_id, "follower-online")) elif self.roadblock_role == "leader": # tell everyone that the leader is online self.logger.info("Sending 'leader-online' message") - self.message_publish(self.message_build("all", "all", "leader-online")) + self.message_publish("followers", self.message_build("all", "all", "leader-online")) - last_msg_id = 0 + followers_last_msg_id = 0 + leader_last_msg_id = 0 + global_last_msg_id = 0 while self.watch_bus: if self.rc != 0: self.logger.debug("self.rc != 0 --> breaking") break try: - msgs = self.redcon.xread(streams = { self.roadblock_uuid + "__bus": last_msg_id }, block = 0) + if self.roadblock_role == "follower": + msgs = self.redcon.xread(streams = { + self.roadblock_uuid + "__bus__global": global_last_msg_id, + self.roadblock_uuid + "__bus__followers": followers_last_msg_id + }, block = 0) + elif self.roadblock_role == "leader": + msgs = self.redcon.xread(streams = { + self.roadblock_uuid + "__bus__global": global_last_msg_id, + self.roadblock_uuid + "__bus__leader": leader_last_msg_id + }, block = 0) except redis.exceptions.ConnectionError as con_error: self.logger.error("%s", con_error) self.logger.error("Bus read failed due to connection error!") @@ -1599,25 +1622,33 @@ def run_it(self): if len(msgs) == 0: time.sleep(0.001) else: - self.logger.debug("retrieved %d messages for processing", len(msgs[0][1])) + for bus in msgs: + bus_name = bus[0].decode() - for msg_id, msg in msgs[0][1]: - last_msg_id = msg_id + self.logger.debug("retrieved %d messages from bus '%s' for processing", len(bus[1]), bus_name) - self.logger.debug("received msg=[%s] with msg_id=[%s]", msg, msg_id) + for msg_id, msg in bus[1]: + if bus_name == self.roadblock_uuid + "__bus__global": + global_last_msg_id = msg_id + elif bus_name == self.roadblock_uuid + "__bus__leader": + leader_last_msg_id = msg_id + elif bus_name == self.roadblock_uuid + "__bus__followers": + followers_last_msg_id = msg_id - msg = self.message_from_str(msg[b"msg"].decode()) + self.logger.debug("received msg=[%s] with msg_id=[%s] from bus '%s'", msg, msg_id, bus_name) - if not self.message_for_me(msg): - self.logger.debug("received a message which is not for me!") - else: - if not self.message_validate(msg): - self.logger.error("received a message for me which did not validate! [%s]", msg) + msg = self.message_from_str(msg[b"msg"].decode()) + + if not self.message_for_me(msg): + self.logger.debug("received a message which is not for me!") else: - self.logger.debug("received a validated message for me!") - ret_val = self.message_handle(msg) - if ret_val: - return ret_val + if not self.message_validate(msg): + self.logger.error("received a message for me which did not validate! [%s]", msg) + else: + self.logger.debug("received a validated message for me!") + ret_val = self.message_handle(msg) + if ret_val: + return ret_val if self.rc == 0: self.cleanup()