From 089f63e9f7f7917f7a494b0c2dded9b50787cfe7 Mon Sep 17 00:00:00 2001 From: JP Bruins Slot Date: Fri, 10 Nov 2023 12:11:25 +0100 Subject: [PATCH] Scheduler optimizations implementation (#1999) Co-authored-by: ammar92 --- mula/scheduler/app.py | 25 ++-- mula/scheduler/config/settings.py | 36 ++++- mula/scheduler/connectors/services/bytes.py | 10 +- .../connectors/services/katalogus.py | 4 +- .../scheduler/connectors/services/octopoes.py | 3 +- .../scheduler/connectors/services/services.py | 21 ++- mula/scheduler/context/context.py | 9 +- mula/tests/integration/test_services.py | 4 + mula/tests/scripts/__init__.py | 0 mula/tests/scripts/load.py | 123 ++++++++++++++++++ 10 files changed, 206 insertions(+), 29 deletions(-) create mode 100644 mula/tests/scripts/__init__.py create mode 100644 mula/tests/scripts/load.py diff --git a/mula/scheduler/app.py b/mula/scheduler/app.py index f29331c2461..3bbe5c2149b 100644 --- a/mula/scheduler/app.py +++ b/mula/scheduler/app.py @@ -175,22 +175,23 @@ def collect_metrics(self) -> None: This method that allows to collect metrics throughout the application. """ - for s in self.schedulers.values(): - self.ctx.metrics_qsize.labels( - scheduler_id=s.scheduler_id, - ).set( - s.queue.qsize(), - ) - - status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id) - for status, count in status_counts.items(): - self.ctx.metrics_task_status_counts.labels( + with self.lock: + for s in self.schedulers.values(): + self.ctx.metrics_qsize.labels( scheduler_id=s.scheduler_id, - status=status, ).set( - count, + s.queue.qsize(), ) + status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id) + for status, count in status_counts.items(): + self.ctx.metrics_task_status_counts.labels( + scheduler_id=s.scheduler_id, + status=status, + ).set( + count, + ) + def run(self) -> None: """Start the main scheduler application, and run in threads the following processes: diff --git a/mula/scheduler/config/settings.py b/mula/scheduler/config/settings.py index e7b4b7e6bf3..34943f05622 100644 --- a/mula/scheduler/config/settings.py +++ b/mula/scheduler/config/settings.py @@ -76,11 +76,6 @@ class Settings(BaseSettings): ) # Application settings - katalogus_cache_ttl: int = Field( - 30, - description="The lifetime of the katalogus cache in seconds", - ) - monitor_organisations_interval: int = Field( 60, description="Interval in seconds of the execution of the " @@ -90,11 +85,42 @@ class Settings(BaseSettings): "their schedulers.", ) + # External services settings octopoes_request_timeout: int = Field( 10, description="The timeout in seconds for the requests to the octopoes api", ) + octopoes_pool_connections: int = Field( + 10, + description="The maximum number of connections to save in the pool for the octopoes api", + ) + + katalogus_cache_ttl: int = Field( + 30, + description="The lifetime of the katalogus cache in seconds", + ) + + katalogus_request_timeout: int = Field( + 10, + description="The timeout in seconds for the requests to the katalogus api", + ) + + katalogus_pool_connections: int = Field( + 10, + description="The maximum number of connections to save in the pool for the katalogus api", + ) + + bytes_request_timeout: int = Field( + 10, + description="The timeout in seconds for the requests to the bytes api", + ) + + bytes_pool_connections: int = Field( + 10, + description="The maximum number of connections to save in the pool for the bytes api", + ) + rabbitmq_prefetch_count: int = Field( 100, description="RabbitMQ prefetch_count for `channel.basic_qos()`, " diff --git a/mula/scheduler/connectors/services/bytes.py b/mula/scheduler/connectors/services/bytes.py index 9f2084285aa..2a72b6f37e8 100644 --- a/mula/scheduler/connectors/services/bytes.py +++ b/mula/scheduler/connectors/services/bytes.py @@ -1,3 +1,4 @@ +import threading import typing from functools import wraps from typing import Any, Callable, Dict, Optional @@ -33,7 +34,7 @@ class Bytes(HTTPService): name = "bytes" - def __init__(self, host: str, source: str, user: str, password: str, timeout: int = 5): + def __init__(self, host: str, source: str, user: str, password: str, timeout: int, pool_connections: int): """Initialize the Bytes service. Args: @@ -48,10 +49,13 @@ def __init__(self, host: str, source: str, user: str, password: str, timeout: in "password": password, } - super().__init__(host=host, source=source, timeout=timeout) + self.lock: threading.Lock = threading.Lock() + + super().__init__(host, source, timeout, pool_connections) def login(self) -> None: - self.headers.update({"Authorization": f"bearer {self.get_token()}"}) + with self.lock: + self.headers.update({"Authorization": f"bearer {self.get_token()}"}) @staticmethod def _verify_response(response: requests.Response) -> None: diff --git a/mula/scheduler/connectors/services/katalogus.py b/mula/scheduler/connectors/services/katalogus.py index d8eeb3bcb57..bdd109bbab2 100644 --- a/mula/scheduler/connectors/services/katalogus.py +++ b/mula/scheduler/connectors/services/katalogus.py @@ -12,8 +12,8 @@ class Katalogus(HTTPService): name = "katalogus" - def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30): - super().__init__(host, source, timeout) + def __init__(self, host: str, source: str, timeout: int, pool_connections: int, cache_ttl: int = 30): + super().__init__(host, source, timeout, pool_connections) # For every organisation we cache its plugins, it references the # plugin-id as key and the plugin as value. diff --git a/mula/scheduler/connectors/services/octopoes.py b/mula/scheduler/connectors/services/octopoes.py index 5d70d7c2141..08658c7a9c5 100644 --- a/mula/scheduler/connectors/services/octopoes.py +++ b/mula/scheduler/connectors/services/octopoes.py @@ -17,10 +17,11 @@ def __init__( host: str, source: str, orgs: List[Organisation], + pool_connections: int, timeout: int = 10, ): self.orgs: List[Organisation] = orgs - super().__init__(host, source, timeout) + super().__init__(host, source, timeout, pool_connections) @exception_handler def get_objects_by_object_types( diff --git a/mula/scheduler/connectors/services/services.py b/mula/scheduler/connectors/services/services.py index fc3b29753e6..8024e6f7846 100644 --- a/mula/scheduler/connectors/services/services.py +++ b/mula/scheduler/connectors/services/services.py @@ -38,7 +38,14 @@ class HTTPService(Connector): name: Optional[str] = None health_endpoint: Optional[str] = "health" - def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5): + def __init__( + self, + host: str, + source: str, + timeout: int = 10, + pool_connections: int = 10, + retries: int = 5, + ): """Initializer of the HTTPService class. During initialization the host will be checked if it is available and healthy. @@ -51,6 +58,8 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5): from where the requests came from. timeout: An integer defining the timeout of requests. + pool_connections: + The number of connections kept alive in the pool. retries: An integer defining the number of retries to make before giving up. @@ -61,7 +70,8 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5): self.session: requests.Session = requests.Session() self.host: str = host self.timeout: int = timeout - self.retries = retries + self.retries: int = retries + self.pool_connections: int = pool_connections self.source: str = source max_retries = Retry( @@ -69,8 +79,11 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5): backoff_factor=0.1, status_forcelist=[500, 502, 503, 504], ) - self.session.mount("http://", HTTPAdapter(max_retries=max_retries)) - self.session.mount("https://", HTTPAdapter(max_retries=max_retries)) + + # Mount the HTTPAdapter to the session + http_adapter = HTTPAdapter(max_retries=max_retries, pool_connections=self.pool_connections) + self.session.mount("http://", http_adapter) + self.session.mount("https://", http_adapter) if self.source: self.headers["User-Agent"] = self.source diff --git a/mula/scheduler/context/context.py b/mula/scheduler/context/context.py index a5090124ace..a59688551d0 100644 --- a/mula/scheduler/context/context.py +++ b/mula/scheduler/context/context.py @@ -45,21 +45,26 @@ def __init__(self) -> None: katalogus_service = services.Katalogus( host=remove_trailing_slash(str(self.config.host_katalogus)), source=f"scheduler/{scheduler.__version__}", + timeout=self.config.katalogus_request_timeout, + pool_connections=self.config.katalogus_pool_connections, cache_ttl=self.config.katalogus_cache_ttl, ) bytes_service = services.Bytes( host=remove_trailing_slash(str(self.config.host_bytes)), + source=f"scheduler/{scheduler.__version__}", user=self.config.host_bytes_user, password=self.config.host_bytes_password, - source=f"scheduler/{scheduler.__version__}", + timeout=self.config.bytes_request_timeout, + pool_connections=self.config.bytes_pool_connections, ) octopoes_service = services.Octopoes( host=remove_trailing_slash(str(self.config.host_octopoes)), source=f"scheduler/{scheduler.__version__}", - orgs=katalogus_service.get_organisations(), timeout=self.config.octopoes_request_timeout, + pool_connections=self.config.octopoes_pool_connections, + orgs=katalogus_service.get_organisations(), ) # Register external services, SimpleNamespace allows us to use dot diff --git a/mula/tests/integration/test_services.py b/mula/tests/integration/test_services.py index 29324d6aa95..ba92d5a30fa 100644 --- a/mula/tests/integration/test_services.py +++ b/mula/tests/integration/test_services.py @@ -16,6 +16,8 @@ def setUp(self) -> None: host=remove_trailing_slash(str(self.config.host_bytes)), user=self.config.host_bytes_user, password=self.config.host_bytes_password, + timeout=self.config.bytes_request_timeout, + pool_connections=self.config.bytes_pool_connections, source="scheduler_test", ) @@ -54,6 +56,8 @@ def setUp(self) -> None: self.service_katalogus = services.Katalogus( host=remove_trailing_slash(str(self.config.host_katalogus)), source="scheduler_test", + timeout=self.config.katalogus_request_timeout, + pool_connections=self.config.katalogus_pool_connections, cache_ttl=12345, ) diff --git a/mula/tests/scripts/__init__.py b/mula/tests/scripts/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/mula/tests/scripts/load.py b/mula/tests/scripts/load.py new file mode 100644 index 00000000000..ef08a91114c --- /dev/null +++ b/mula/tests/scripts/load.py @@ -0,0 +1,123 @@ +import csv +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List + +import requests + +OCTOPOES_API = "http://localhost:8001" +KATALOGUS_API = "http://localhost:8003" +SCHEDULER_API = "http://localhost:8004" + + +def run(): + # Create organisations + orgs: List[Dict[str, Any]] = [] + for n in range(1, 10): + org = { + "id": f"org-{n}", + "name": f"Organisation {n}", + } + orgs.append(org) + + resp_katalogus = requests.post( + url=f"{KATALOGUS_API}/v1/organisations/", + json=org, + ) + + try: + resp_katalogus.raise_for_status() + except requests.exceptions.HTTPError: + if resp_katalogus.status_code != 404: + print("Error creating organisation ", org) + raise + + if resp_katalogus.status_code == 404: + print("Organisation already exists in katalogus", org) + + try: + requests.post( + url=f"{OCTOPOES_API}/{org.get('id')}/node/", + ) + except requests.exceptions.HTTPError: + print("Error creating organisation ", org) + raise + + print("Created organisation ", org) + + # Enable boefjes for organisation + boefjes = ("dns-records", "dns-sec", "dns-zone") + for boefje_id in boefjes: + resp_enable_boefje = requests.patch( + url=f"{KATALOGUS_API}/v1/organisations/{org.get('id')}/repositories/LOCAL/plugins/{boefje_id}", + json={"enabled": True}, + ) + + try: + resp_enable_boefje.raise_for_status() + except requests.exceptions.HTTPError: + print("Error enabling boefje ", boefje_id) + raise + + print("Enabled boefje ", boefje_id) + + declarations: List[Dict[str, Any]] = [] + with Path("data.csv").open(newline="") as csv_file: + csv_reader = csv.DictReader(csv_file, delimiter=",", quotechar='"') + for row in csv_reader: + name = row["name"] + declaration = { + "ooi": { + "object_type": "Hostname", + "primary_key": f"Hostname|internet|{name}", + "network": "Network|internet", + "name": f"{name}", + "dns_zone": None, + "scan_profile": { + "scan_profile_type": "declared", + "level": 1, + "reference": f"Hostname|internet|{name}", + }, + }, + "valid_time": datetime.now(timezone.utc).isoformat(), + "method": None, + "task_id": str(uuid.uuid4()), + } + declarations.append(declaration) + + for org in orgs: + for declaration in declarations: + resp_octopoes_decl = requests.post(f"{OCTOPOES_API}/{org.get('id')}/declarations", json=declaration) + + try: + resp_octopoes_decl.raise_for_status() + except requests.exceptions.HTTPError: + print("Error creating declaration ", declaration) + print(resp_octopoes_decl.text) + raise + + print("Org", org.get("id"), "created declaration ", declaration) + + resp_octopoes_scan_profile = requests.put( + url=f"{OCTOPOES_API}/{org.get('id')}/scan_profiles", + params={"valid_time": datetime.now(timezone.utc)}, + json={ + "scan_profile_type": "declared", + "reference": declaration.get("ooi").get("scan_profile").get("reference"), + "level": declaration.get("ooi").get("scan_profile").get("level"), + }, + ) + + try: + resp_octopoes_scan_profile.raise_for_status() + except requests.exceptions.HTTPError: + print("Error creating scan profile", declaration.get("ooi").get("scan_profile")) + print(resp_octopoes_scan_profile.text) + raise + + print("Org {org.get('id')} created scan profile", declaration.get("ooi").get("scan_profile")) + + +if __name__ == "__main__": + run()