Skip to content

Commit

Permalink
Implemented tests for clusterization
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Rodríguez Flores committed Jun 13, 2024
1 parent 4f50f4f commit 4f8699e
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 40 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ jobs:
if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.user.login == 'dependabot[bot]')
runs-on: ubuntu-latest

services:
minio:
image: minio/minio:latest
env:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- 9000:9000
options: --name minio -d minio/minio server /data

zookeeper:
image: bitnami/zookeeper:latest
ports:
- 2181:2181

steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down Expand Up @@ -66,6 +81,3 @@ jobs:
- name: Run outliers
run: |
ENVIRONMENT=test python3.9 resources/src/__main__.py
2 changes: 1 addition & 1 deletion resources/src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self) -> None:
self.server = None
self.app = None
self.query_builder = None
self.zoo_sync = RbOutliersZooSync()
self.zoo_sync = RbOutliersZooSync(config)
self.run()

def run(self):
Expand Down
5 changes: 3 additions & 2 deletions resources/src/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ s3_hostname=http://x.x.x.x:9000
s3_public_key=my_key
s3_private_key=my_key
s3_region=es
s3_bucket=my_bucket
s3_bucket=mybucket

[ZooKeeper]
zk_sleep_time=86400
zk_hosts=hosts
zk_tick_time=5
zk_hosts=x.x.x.x:2181
zk_sync_path=/rb-aioutliers
zk_name=name
52 changes: 30 additions & 22 deletions resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from kazoo.recipe.watchers import ChildrenWatch
from kazoo.exceptions import LockTimeout
from resources.src.redborder.s3 import S3
from resources.src.server.rest import config
from resources.src.logger.logger import logger
from resources.src.config.configmanager import ConfigManager
from resources.src.redborder.async_jobs.train_job import RbOutlierTrainJob
from resources.src.redborder.zookeeper.zookeeper_client import ZooKeeperClient

Expand All @@ -38,26 +38,34 @@ class RbOutliersZooSync(ZooKeeperClient):
synchronization, and model processing tasks.
"""

def __init__(self) -> None:
def __init__(self, config: ConfigManager) -> None:
"""
Initializes the RbOutliersZooSync instance, setting up the necessary attributes and
configurations, including the ZooKeeper client and S3 client.
Args:
config (ConfigManager): Configuration settings, including those for the ZooKeeper client.
"""
self.config = config
self.is_leader = False
self.is_running = False
self.s3_client = None
self.queue = None
self.election = None
self.leader_watcher = None
self.paths = {}
super().__init__()
super().__init__(config)
self.sleep_time = float(config.get("ZooKeeper", "zk_sleep_time"))
self.tick_time = float(config.get("ZooKeeper", "zk_tick_time"))

def _tick(self) -> None:
    """
    Pauses the calling thread for one tick, i.e. `self.tick_time` seconds.
    """
    pause_seconds = self.tick_time
    time.sleep(pause_seconds)

def _ensure_paths(self) -> None:
"""
Ensures the required ZooKeeper paths are created.
"""
zk_sync_path = config.get("ZooKeeper", "zk_sync_path")
zk_sync_path = self.config.get("ZooKeeper", "zk_sync_path")
self.paths = {
"leader": os.path.join(zk_sync_path, "leader"),
"queue": os.path.join(zk_sync_path, "models", "queue"),
Expand All @@ -73,11 +81,11 @@ def setup_s3(self) -> None:
Sets up the S3 client with the necessary configurations.
"""
self.s3_client = S3(
config.get("AWS", "s3_public_key"),
config.get("AWS", "s3_private_key"),
config.get("AWS", "s3_region"),
config.get("AWS", "s3_bucket"),
config.get("AWS", "s3_hostname")
self.config.get("AWS", "s3_public_key"),
self.config.get("AWS", "s3_private_key"),
self.config.get("AWS", "s3_region"),
self.config.get("AWS", "s3_bucket"),
self.config.get("AWS", "s3_hostname")
)

def sync_nodes(self) -> None:
Expand All @@ -101,7 +109,7 @@ def cleanup(self, signum: int, frame) -> None:
"""
Cleans up the resources and stops the synchronization process.
Parameters:
Args:
signum (int): The signal number.
frame: The current stack frame.
"""
Expand All @@ -112,7 +120,7 @@ def cleanup(self, signum: int, frame) -> None:
if self.is_leader:
self.is_leader = False
self.zookeeper.set(self.paths["leader"], b"")
time.sleep(2)
self._tick()
super().cleanup(signum, frame)

def _run_tasks(self) -> None:
Expand All @@ -124,11 +132,11 @@ def _run_tasks(self) -> None:
self._leader_tasks()
else:
self._follower_tasks()
time.sleep(2)
self._tick()

def _leader_tasks(self) -> None:
"""
Runs the tasks for the leader node. This involves queuing models to be trained by the
Runs the tasks for the leader node. This involves queuing models to be trained by the
followers and requeuing models whose clients or training have failed.
"""
logger.info("Running leader tasks")
Expand All @@ -145,7 +153,7 @@ def _leader_tasks(self) -> None:
self._delete_node(self.paths["taken"], model)
self.queue.put(bytes(model, "utf-8"))
logger.info(f"Model {model} requeued")
time.sleep(2)
self._tick()

def _follower_tasks(self) -> None:
"""
Expand All @@ -155,24 +163,24 @@ def _follower_tasks(self) -> None:
while self.is_running and not self.is_leader:
if not self._leader_exists():
logger.info("No leader found, waiting...")
time.sleep(5)
self._tick()
continue
model = self._get_model_from_queue()
if model:
self._process_model_as_follower(model)
time.sleep(5)
self._tick()

def _participate_in_election(self, leader_nodes: list[str]) -> None:
"""
Participates in the leader election process.
Parameters:
Args:
leader_nodes (list[str]): A list of leader nodes.
"""
if not self._leader_exists() and self.is_running:
logger.info(f"Participating in election {len(leader_nodes)} nodes")
try:
if self.election.lock.acquire(timeout=10):
if self.election.lock.acquire(timeout=5*self.tick_time):
try:
self._election_callback()
finally:
Expand Down Expand Up @@ -231,8 +239,8 @@ def _get_models(self) -> list[str]:
def _queue_models_on_zoo(self, models: list[str]) -> None:
"""
Queues the models in ZooKeeper.
Parameters:
Args:
models (list[str]): A list of models to be queued.
"""
if self.is_leader and self.is_running:
Expand All @@ -243,7 +251,7 @@ def _queue_models_on_zoo(self, models: list[str]) -> None:
def _get_model_from_queue(self) -> str:
"""
Attempts to get a model from the queue. Then, znodes with the model name are created both
in the train and in the taken nodes to indicate that the model is being trained. This is
in the train and in the taken nodes to indicate that the model is being trained. This is
done so that, if the follower fails, the ephemeral node in "train" will disappear while the node
in "taken" will remain. The leader will notice this abnormal state and requeue the model.
Expand All @@ -265,7 +273,7 @@ def _process_model_as_follower(self, model: str) -> None:
"""
Processes the model as a follower node.
Parameters:
Args:
model (str): The model to process.
"""
try:
Expand Down
28 changes: 17 additions & 11 deletions resources/src/redborder/zookeeper/zookeeper_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,36 @@
import signal
from kazoo.client import KazooClient, KazooRetry
from kazoo.protocol.states import KazooState
from resources.src.server.rest import config
from resources.src.logger.logger import logger
from resources.src.config.configmanager import ConfigManager

class ZooKeeperClient:
"""
A client to manage interactions with a ZooKeeper service.
This class provides methods to set up a ZooKeeper client, handle state changes,
This class provides methods to set up a ZooKeeper client, handle state changes,
clean up connections, and manage znodes.
"""

def __init__(self) -> None:
def __init__(self, config: ConfigManager) -> None:
"""
Initializes the ZooKeeperClient instance, setting up the ZooKeeper client and
Initializes the ZooKeeperClient instance, setting up the ZooKeeper client and
signal handlers for cleanup.
Args:
config (ConfigManager): Configuration settings, including those for the ZooKeeper client.
"""
self.zookeeper = None
self._setup_zookeeper()
self._setup_zookeeper(config)
signal.signal(signal.SIGINT, self.cleanup)
signal.signal(signal.SIGTERM, self.cleanup)

def _setup_zookeeper(self) -> None:
def _setup_zookeeper(self, config: ConfigManager) -> None:
"""
Sets up the ZooKeeper client with retry strategies and adds a state listener.
Args:
config (ConfigManager): Configuration settings, including those for the ZooKeeper client.
"""
retry = KazooRetry(
max_tries=15,
Expand All @@ -66,7 +72,7 @@ def _listener(self, state: KazooState) -> None:
"""
Listens for changes in the ZooKeeper connection state and logs the events.
Parameters:
Args:
state (KazooState): The current state of the ZooKeeper connection.
"""
if state == KazooState.LOST:
Expand All @@ -80,7 +86,7 @@ def cleanup(self, signum: int, frame) -> None:
"""
Cleans up the ZooKeeper client and exits the application.
Parameters:
Args:
signum (int): The signal number.
frame: The current stack frame.
"""
Expand All @@ -94,7 +100,7 @@ def _check_node(self, *paths: str) -> bool:
"""
Checks if a znode exists at the specified path.
Parameters:
Args:
*paths (str): The parts of the path to join and check for the znode.
Returns:
Expand All @@ -107,7 +113,7 @@ def _create_node(self, *paths: str, ephemeral: bool = False) -> None:
"""
Creates a single znode at the specified path.
Parameters:
Args:
*paths (str): The parts of the path to join and create the znode.
ephemeral (bool, optional): Set to true if the node should be ephemeral.
"""
Expand All @@ -118,7 +124,7 @@ def _delete_node(self, *paths: str) -> None:
"""
Deletes a single znode at the specified path.
Parameters:
Args:
*paths (str): The parts of the path to join and delete the znode.
"""
full_path = os.path.join(*paths)
Expand Down
2 changes: 1 addition & 1 deletion resources/src/server/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def sync_with_s3_periodically(self):
"""
while True:
logger.logger.info("Sync with S3 Started")
self.sync_models_with_s3()
self.sync_models_with_s3(config)
logger.logger.info("Sync with S3 Finished")
time.sleep(self.s3_sync_interval)

Expand Down
43 changes: 43 additions & 0 deletions resources/tests/config_test.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[OutliersServerProduction]
outliers_binding_address=0.0.0.0
outliers_server_port=39091
outliers_server_workers=4
outliers_server_threads=20

[OutliersServerTesting]
outliers_binding_address=0.0.0.0
outliers_server_port=39091

[Outliers]
metric=bytes
epochs=100
batch_size=32
backup_path=resources/src/ai/backups/

[ShallowOutliers]
sensitivity=0.95
contamination=0.01

[Druid]
druid_endpoint=http://x.x.x.x:8080/druid/v2/

[Logger]
log_file=./outliers.log

[Rails]
endpoint=https://x.x.x.x
auth_token=my_token

[AWS]
s3_hostname=http://localhost:9000
s3_public_key=minioadmin
s3_private_key=minioadmin
s3_region=es
s3_bucket=testbucket

[ZooKeeper]
zk_sleep_time=1
zk_tick_time=0.5
zk_hosts=localhost:2181
zk_sync_path=/rb-aioutliers
zk_name=test
Loading

0 comments on commit 4f8699e

Please sign in to comment.