Skip to content

Commit

Permalink
add personal streams for targeted message delivery to improve scalabi…
Browse files Browse the repository at this point in the history
…lity

- this allows user messages to be deliverd to the recipient and the
  recipient only which prevents participants from having to do
  needless processing on messages not intended for them
  • Loading branch information
k-rister committed Dec 7, 2023
1 parent 02bb0b8 commit 8fe1c30
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions roadblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ def define_msg_schema(self):
"global-bus-created",
"leader-bus-created",
"followers-bus-created",
"personal-bus-created",
"timeout-ts",
"initialized",
"switch-buses",
Expand Down Expand Up @@ -701,12 +702,16 @@ def send_user_messages(self):
self.logger.info("Sending user requested messages")
user_msg_counter = 1
for user_msg in self.user_messages:
bus_name = "global"
if user_msg["recipient"]["id"] != "all":
bus_name = user_msg["recipient"]["id"]

if "user-string" in user_msg:
self.logger.info("Sending user message %d: 'user-string'", user_msg_counter)
self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"]))
self.message_publish(bus_name, self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-string", user_msg["user-string"]))
elif "user-object" in user_msg:
self.logger.info("Sending user message %d: 'user-object'", user_msg_counter)
self.message_publish("global", self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"]))
self.message_publish(bus_name, self.message_build(user_msg["recipient"]["type"], user_msg["recipient"]["id"], "user-object", user_msg["user-object"]))

user_msg_counter += 1

Expand All @@ -730,7 +735,7 @@ def message_handle (self, message):

msg_command = self.message_get_command(message)

if msg_command in ("global-bus-created", "leader-bus-created", "followers-bus-created"):
if msg_command in ("global-bus-created", "leader-bus-created", "followers-bus-created", "personal-bus-created"):
self.logger.info("Received '%s' message", msg_command)
elif msg_command == "timeout-ts":
self.logger.info("Received 'timeout-ts' message")
Expand Down Expand Up @@ -971,6 +976,9 @@ def message_publish(self, message_bus, message):

ret_val = 0
counter = 0

self.logger.debug("Attempting to publish message '%s' on bus '%s'", message, message_bus)

while ret_val == 0:
if self.rc != 0:
self.logger.debug("self.rc != 0 --> breaking")
Expand Down Expand Up @@ -1126,7 +1134,14 @@ def cleanup(self):
self.key_delete(self.roadblock_uuid)
self.key_delete(self.roadblock_uuid + "__initialized")

buses_to_clean = []
for bus_name in ( "global", "leader", "followers" ):
buses_to_clean.append(bus_name)
buses_to_clean.append(self.roadblock_leader_id)
for bus_name in self.roadblock_followers:
buses_to_clean.append(bus_name)

for bus_name in buses_to_clean:
msg_count = self.redcon.xlen(self.roadblock_uuid + "__bus__" + bus_name)
self.logger.debug("total messages on bus '%s': %d", bus_name, msg_count)

Expand Down Expand Up @@ -1584,6 +1599,11 @@ def run_it(self):
self.logger.info(".")

self.logger.info("Roadblock is initialized")

# create the personal stream/bus
self.logger.info("Creating personal bus")
self.message_publish(self.my_id, self.message_build_custom(self.roadblock_role, "personal-bus-created", self.roadblock_role, self.my_id, "personal-bus-created"))

if self.roadblock_role == "follower":
# tell the leader that I am online
self.logger.info("Sending 'follower-online' message")
Expand All @@ -1596,6 +1616,7 @@ def run_it(self):
followers_last_msg_id = 0
leader_last_msg_id = 0
global_last_msg_id = 0
personal_last_msg_id = 0
while self.watch_bus:
if self.rc != 0:
self.logger.debug("self.rc != 0 --> breaking")
Expand All @@ -1605,12 +1626,14 @@ def run_it(self):
if self.roadblock_role == "follower":
msgs = self.redcon.xread(streams = {
self.roadblock_uuid + "__bus__global": global_last_msg_id,
self.roadblock_uuid + "__bus__followers": followers_last_msg_id
self.roadblock_uuid + "__bus__followers": followers_last_msg_id,
self.roadblock_uuid + "__bus__" + self.my_id: personal_last_msg_id
}, block = 0)
elif self.roadblock_role == "leader":
msgs = self.redcon.xread(streams = {
self.roadblock_uuid + "__bus__global": global_last_msg_id,
self.roadblock_uuid + "__bus__leader": leader_last_msg_id
self.roadblock_uuid + "__bus__leader": leader_last_msg_id,
self.roadblock_uuid + "__bus__" + self.my_id: personal_last_msg_id
}, block = 0)
except redis.exceptions.ConnectionError as con_error:
self.logger.error("%s", con_error)
Expand All @@ -1634,6 +1657,8 @@ def run_it(self):
leader_last_msg_id = msg_id
elif bus_name == self.roadblock_uuid + "__bus__followers":
followers_last_msg_id = msg_id
elif bus_name == self.roadblock_uuid + "__bus__" + self.my_id:
personal_last_msg_id = msg_id

self.logger.debug("received msg=[%s] with msg_id=[%s] from bus '%s'", msg, msg_id, bus_name)

Expand Down

0 comments on commit 8fe1c30

Please sign in to comment.