Skip to content

Commit

Permalink
Fixed broken backup path when running train job
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Rodríguez Flores committed Jun 10, 2024
1 parent 1e86573 commit 2683675
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion resources/src/ai/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def train(self, raw_data, epochs=20, batch_size=32, backup_path=None):
backup_path (None or str): path to where the backups should be saved.
"""
if backup_path is None:
backup_path = "./backups/"
backup_path = "resources/src/ai/backups/"
date = datetime.now().strftime("%y-%m-%dT%H:%M")
self.save_model(f"{backup_path}{date}.keras",f"{backup_path}{date}.ini")
prep_data = self.prepare_data_for_training(raw_data)
Expand Down
2 changes: 1 addition & 1 deletion resources/src/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ metric=bytes
schedule_hour=* * * * *
epochs=20
batch_size=32
backup_path=./backups/
backup_path=resources/src/ai/backups/
#target_sensors=FlowSensor
model_names=traffic

Expand Down
34 changes: 17 additions & 17 deletions resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
# Because distributed systems are a zoo......

import os
import random
import time
from kazoo.recipe.election import Election
from kazoo.recipe.queue import LockingQueue
Expand All @@ -45,7 +44,7 @@ def __init__(self) -> None:
configurations, including the ZooKeeper client and S3 client.
"""
self.is_leader = False
self.running = False
self.is_running = False
self.s3_client = None
self.queue = None
self.election = None
Expand Down Expand Up @@ -89,7 +88,7 @@ def sync_nodes(self) -> None:
logger.info("Synchronizing nodes")
self.setup_s3()
self._ensure_paths()
self.running = True
self.is_running = True
self.queue = LockingQueue(self.zookeeper, self.paths["queue"])
self.election = Election(self.zookeeper, self.paths["election"], identifier=self.name)
self.leader_watcher = ChildrenWatch(self.zookeeper, self.paths["leader"], self._participate_in_election)
Expand All @@ -104,7 +103,7 @@ def cleanup(self, signum: int, frame) -> None:
frame: The current stack frame.
"""
logger.info(f"Cleanup called with signal {signum}")
self.running = False
self.is_running = False
self.election.cancel()
self.leader_watcher._stopped = True
if self.is_leader:
Expand All @@ -117,7 +116,7 @@ def _run_tasks(self) -> None:
"""
Runs tasks based on the leadership status.
"""
while self.running:
while self.is_running:
if self.is_leader:
self._leader_tasks()
else:
Expand All @@ -129,7 +128,7 @@ def _leader_tasks(self) -> None:
Runs the tasks for the leader node.
"""
logger.info("Running leader tasks")
while self.is_leader and self.running:
while self.is_leader and self.is_running:
self._get_models()
self._locks_models_on_zoo()
next_task_time = time.time() + self.sleep_time
Expand All @@ -149,13 +148,15 @@ def _follower_tasks(self) -> None:
Runs the tasks for the follower nodes.
"""
logger.info("Running follower tasks")
while self.running:
while not self.is_leader and self._leader_exists():
model = self._get_model_from_queue()
if model:
self._process_model_as_follower(model)
time.sleep(2)
time.sleep(2)
while self.is_running and not self.is_leader:
if not self._leader_exists():
logger.info("No leader found, waiting...")
time.sleep(5)
continue
model = self._get_model_from_queue()
if model:
self._process_model_as_follower(model)
time.sleep(5)

def _participate_in_election(self, leader_nodes: list[str]) -> None:
"""
Expand All @@ -164,7 +165,7 @@ def _participate_in_election(self, leader_nodes: list[str]) -> None:
Parameters:
leader_nodes (list[str]): A list of leader nodes.
"""
if not self._leader_exists() and self.running:
if not self._leader_exists() and self.is_running:
logger.info("Participating in election")
try:
if self.election.lock.acquire(timeout=10):
Expand Down Expand Up @@ -222,7 +223,7 @@ def _locks_models_on_zoo(self) -> None:
"""
Locks the models in ZooKeeper.
"""
if self.is_leader and self.running:
if self.is_leader and self.is_running:
b_models = [bytes(model, "utf-8") for model in self.models]
self.queue.put_all(b_models)
logger.info(f"Locked models {', '.join(self.models)}")
Expand Down Expand Up @@ -260,5 +261,4 @@ def _process_model_as_follower(self, model: str) -> None:
logger.info(f"Finished training of model {model}")
except Exception as e:
logger.error(f"Client {self.name} failed to process {model}: {e}")
self.queue.put(bytes(model, "utf-8"))
logger.error(f"Client {self.name} requeued {model}")
self._delete_node(self.paths["taken"], model)

0 comments on commit 2683675

Please sign in to comment.