Skip to content

Commit

Permalink
Scheduler optimizations implementation (#1999)
Browse files Browse the repository at this point in the history
Co-authored-by: ammar92 <[email protected]>
  • Loading branch information
jpbruinsslot and ammar92 authored Nov 10, 2023
1 parent 39a8f96 commit 089f63e
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 29 deletions.
25 changes: 13 additions & 12 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,23 @@ def collect_metrics(self) -> None:
This method allows metrics to be collected throughout the application.
"""
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
).set(
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
with self.lock:
for s in self.schedulers.values():
self.ctx.metrics_qsize.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
s.queue.qsize(),
)

status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(
scheduler_id=s.scheduler_id,
status=status,
).set(
count,
)

def run(self) -> None:
"""Start the main scheduler application, and run in threads the
following processes:
Expand Down
36 changes: 31 additions & 5 deletions mula/scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ class Settings(BaseSettings):
)

# Application settings
katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

monitor_organisations_interval: int = Field(
60,
description="Interval in seconds of the execution of the "
Expand All @@ -90,11 +85,42 @@ class Settings(BaseSettings):
"their schedulers.",
)

# External services settings
octopoes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the octopoes api",
)

octopoes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the octopoes api",
)

katalogus_cache_ttl: int = Field(
30,
description="The lifetime of the katalogus cache in seconds",
)

katalogus_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the katalogus api",
)

katalogus_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the katalogus api",
)

bytes_request_timeout: int = Field(
10,
description="The timeout in seconds for the requests to the bytes api",
)

bytes_pool_connections: int = Field(
10,
description="The maximum number of connections to save in the pool for the bytes api",
)

rabbitmq_prefetch_count: int = Field(
100,
description="RabbitMQ prefetch_count for `channel.basic_qos()`, "
Expand Down
10 changes: 7 additions & 3 deletions mula/scheduler/connectors/services/bytes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import typing
from functools import wraps
from typing import Any, Callable, Dict, Optional
Expand Down Expand Up @@ -33,7 +34,7 @@ class Bytes(HTTPService):

name = "bytes"

def __init__(self, host: str, source: str, user: str, password: str, timeout: int = 5):
def __init__(self, host: str, source: str, user: str, password: str, timeout: int, pool_connections: int):
"""Initialize the Bytes service.
Args:
Expand All @@ -48,10 +49,13 @@ def __init__(self, host: str, source: str, user: str, password: str, timeout: in
"password": password,
}

super().__init__(host=host, source=source, timeout=timeout)
self.lock: threading.Lock = threading.Lock()

super().__init__(host, source, timeout, pool_connections)

def login(self) -> None:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})
with self.lock:
self.headers.update({"Authorization": f"bearer {self.get_token()}"})

@staticmethod
def _verify_response(response: requests.Response) -> None:
Expand Down
4 changes: 2 additions & 2 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class Katalogus(HTTPService):

name = "katalogus"

def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30):
super().__init__(host, source, timeout)
def __init__(self, host: str, source: str, timeout: int, pool_connections: int, cache_ttl: int = 30):
super().__init__(host, source, timeout, pool_connections)

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
Expand Down
3 changes: 2 additions & 1 deletion mula/scheduler/connectors/services/octopoes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ def __init__(
host: str,
source: str,
orgs: List[Organisation],
pool_connections: int,
timeout: int = 10,
):
self.orgs: List[Organisation] = orgs
super().__init__(host, source, timeout)
super().__init__(host, source, timeout, pool_connections)

@exception_handler
def get_objects_by_object_types(
Expand Down
21 changes: 17 additions & 4 deletions mula/scheduler/connectors/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ class HTTPService(Connector):
name: Optional[str] = None
health_endpoint: Optional[str] = "health"

def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
def __init__(
self,
host: str,
source: str,
timeout: int = 10,
pool_connections: int = 10,
retries: int = 5,
):
"""Initializer of the HTTPService class. During initialization the
host will be checked if it is available and healthy.
Expand All @@ -51,6 +58,8 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
from where the requests came from.
timeout:
An integer defining the timeout of requests.
pool_connections:
The number of connections kept alive in the pool.
retries:
An integer defining the number of retries to make before
giving up.
Expand All @@ -61,16 +70,20 @@ def __init__(self, host: str, source: str, timeout: int = 5, retries: int = 5):
self.session: requests.Session = requests.Session()
self.host: str = host
self.timeout: int = timeout
self.retries = retries
self.retries: int = retries
self.pool_connections: int = pool_connections
self.source: str = source

max_retries = Retry(
total=self.retries,
backoff_factor=0.1,
status_forcelist=[500, 502, 503, 504],
)
self.session.mount("http://", HTTPAdapter(max_retries=max_retries))
self.session.mount("https://", HTTPAdapter(max_retries=max_retries))

# Mount the HTTPAdapter to the session
http_adapter = HTTPAdapter(max_retries=max_retries, pool_connections=self.pool_connections)
self.session.mount("http://", http_adapter)
self.session.mount("https://", http_adapter)

if self.source:
self.headers["User-Agent"] = self.source
Expand Down
9 changes: 7 additions & 2 deletions mula/scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,26 @@ def __init__(self) -> None:
katalogus_service = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.katalogus_request_timeout,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=self.config.katalogus_cache_ttl,
)

bytes_service = services.Bytes(
host=remove_trailing_slash(str(self.config.host_bytes)),
source=f"scheduler/{scheduler.__version__}",
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.bytes_request_timeout,
pool_connections=self.config.bytes_pool_connections,
)

octopoes_service = services.Octopoes(
host=remove_trailing_slash(str(self.config.host_octopoes)),
source=f"scheduler/{scheduler.__version__}",
orgs=katalogus_service.get_organisations(),
timeout=self.config.octopoes_request_timeout,
pool_connections=self.config.octopoes_pool_connections,
orgs=katalogus_service.get_organisations(),
)

# Register external services, SimpleNamespace allows us to use dot
Expand Down
4 changes: 4 additions & 0 deletions mula/tests/integration/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def setUp(self) -> None:
host=remove_trailing_slash(str(self.config.host_bytes)),
user=self.config.host_bytes_user,
password=self.config.host_bytes_password,
timeout=self.config.bytes_request_timeout,
pool_connections=self.config.bytes_pool_connections,
source="scheduler_test",
)

Expand Down Expand Up @@ -54,6 +56,8 @@ def setUp(self) -> None:
self.service_katalogus = services.Katalogus(
host=remove_trailing_slash(str(self.config.host_katalogus)),
source="scheduler_test",
timeout=self.config.katalogus_request_timeout,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=12345,
)

Expand Down
Empty file added mula/tests/scripts/__init__.py
Empty file.
123 changes: 123 additions & 0 deletions mula/tests/scripts/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import csv
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

import requests

# Base URLs of the services this script talks to (all on localhost).
# NOTE(review): SCHEDULER_API is defined but not referenced in this script.
OCTOPOES_API = "http://localhost:8001"
KATALOGUS_API = "http://localhost:8003"
SCHEDULER_API = "http://localhost:8004"


def run():
    """Populate the katalogus and octopoes APIs with load-test data.

    The script performs three steps:

    1. Create organisations ``org-1`` .. ``org-9`` in the katalogus, request
       an octopoes node for each of them and enable the ``dns-records``,
       ``dns-sec`` and ``dns-zone`` boefjes for each organisation.
    2. Read ``data.csv`` from the current working directory and build one
       ``Hostname`` declaration per row, using the row's ``name`` column.
    3. For every organisation, post every declaration to octopoes and set
       the matching declared scan profile.

    Progress and errors are reported with ``print``.

    Raises:
        requests.exceptions.HTTPError: When the katalogus answers the
            organisation creation with an error status other than 404, or
            when enabling a boefje, creating a declaration or setting a scan
            profile returns an error status.
        KeyError: When a row in ``data.csv`` has no ``name`` column.
    """
    # Create organisations
    orgs: List[Dict[str, Any]] = []
    for n in range(1, 10):
        org = {
            "id": f"org-{n}",
            "name": f"Organisation {n}",
        }
        orgs.append(org)

        resp_katalogus = requests.post(
            url=f"{KATALOGUS_API}/v1/organisations/",
            json=org,
        )

        try:
            resp_katalogus.raise_for_status()
        except requests.exceptions.HTTPError:
            # A 404 is tolerated here; the script treats it as
            # "organisation already exists" (see the message below).
            if resp_katalogus.status_code != 404:
                print("Error creating organisation ", org)
                raise

        if resp_katalogus.status_code == 404:
            print("Organisation already exists in katalogus", org)

        # NOTE(review): the response of this call is discarded and
        # raise_for_status() is never called, so the HTTPError handler below
        # is not triggered by an error status from octopoes. Left as-is to
        # keep re-runs against already existing nodes working; confirm the
        # intended behaviour.
        try:
            requests.post(
                url=f"{OCTOPOES_API}/{org.get('id')}/node/",
            )
        except requests.exceptions.HTTPError:
            print("Error creating organisation ", org)
            raise

        print("Created organisation ", org)

        # Enable boefjes for organisation
        boefjes = ("dns-records", "dns-sec", "dns-zone")
        for boefje_id in boefjes:
            resp_enable_boefje = requests.patch(
                url=f"{KATALOGUS_API}/v1/organisations/{org.get('id')}/repositories/LOCAL/plugins/{boefje_id}",
                json={"enabled": True},
            )

            try:
                resp_enable_boefje.raise_for_status()
            except requests.exceptions.HTTPError:
                print("Error enabling boefje ", boefje_id)
                raise

            print("Enabled boefje ", boefje_id)

    # Build one Hostname declaration per row of the CSV file
    declarations: List[Dict[str, Any]] = []
    with Path("data.csv").open(newline="") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=",", quotechar='"')
        for row in csv_reader:
            name = row["name"]
            declaration = {
                "ooi": {
                    "object_type": "Hostname",
                    "primary_key": f"Hostname|internet|{name}",
                    "network": "Network|internet",
                    "name": f"{name}",
                    "dns_zone": None,
                    "scan_profile": {
                        "scan_profile_type": "declared",
                        "level": 1,
                        "reference": f"Hostname|internet|{name}",
                    },
                },
                "valid_time": datetime.now(timezone.utc).isoformat(),
                "method": None,
                "task_id": str(uuid.uuid4()),
            }
            declarations.append(declaration)

    # Submit every declaration, and its scan profile, for every organisation
    for org in orgs:
        for declaration in declarations:
            scan_profile = declaration["ooi"]["scan_profile"]

            resp_octopoes_decl = requests.post(f"{OCTOPOES_API}/{org.get('id')}/declarations", json=declaration)

            try:
                resp_octopoes_decl.raise_for_status()
            except requests.exceptions.HTTPError:
                print("Error creating declaration ", declaration)
                print(resp_octopoes_decl.text)
                raise

            print("Org", org.get("id"), "created declaration ", declaration)

            resp_octopoes_scan_profile = requests.put(
                url=f"{OCTOPOES_API}/{org.get('id')}/scan_profiles",
                params={"valid_time": datetime.now(timezone.utc)},
                json={
                    "scan_profile_type": "declared",
                    "reference": scan_profile.get("reference"),
                    "level": scan_profile.get("level"),
                },
            )

            try:
                resp_octopoes_scan_profile.raise_for_status()
            except requests.exceptions.HTTPError:
                print("Error creating scan profile", scan_profile)
                print(resp_octopoes_scan_profile.text)
                raise

            # The original string lacked the f-prefix and printed the
            # placeholder text literally instead of the organisation id.
            print(f"Org {org.get('id')} created scan profile", scan_profile)


# Run the load script only when executed directly, not when imported.
if __name__ == "__main__":
    run()

0 comments on commit 089f63e

Please sign in to comment.