Skip to content

Commit

Permalink
Fix worker IP update to use one single DB session
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Oct 30, 2023
1 parent aa387e3 commit bc3391d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 16 deletions.
12 changes: 11 additions & 1 deletion dispatcher/backend/src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

from common.enum import SchedulePeriodicity


def refreshable_constant(fn):
    """Mark ``fn`` as a refreshable constant and return it unchanged.

    A refreshable constant is a zero-argument callable that is invoked at each
    point of use instead of being evaluated once at import time, so its value
    can change while the process runs (for instance when a test modifies the
    environment).

    This helper adds no behaviour of its own: it only tags the definition and
    hands back the very same callable it received.
    """
    return fn


OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl")
MESSAGE_VALIDITY = 60 # number of seconds before a message expire

Expand Down Expand Up @@ -66,7 +72,11 @@
# using the following, it is possible to automate
# the update of a whitelist of workers IPs on Wasabi (S3 provider)
# enable this feature (default is off)
USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST", ""))
# Note: this is a refreshable constant so that it can be dynamically updated
# (including in tests)
USES_WORKERS_IPS_WHITELIST = refreshable_constant(
lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", ""))
)
MAX_WORKER_IP_CHANGES_PER_DAY = 4
# wasabi URL with credentials to update policy
WASABI_URL = os.getenv("WASABI_URL", "")
Expand Down
9 changes: 6 additions & 3 deletions dispatcher/backend/src/common/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
logger = logging.getLogger(__name__)


def update_workers_whitelist():
def update_workers_whitelist(session: so.Session):
"""update whitelist of workers on external services"""
update_wasabi_whitelist(build_workers_whitelist())
IpUpdater.update_fn(build_workers_whitelist(session=session))


@dbsession
def build_workers_whitelist(session: so.Session) -> typing.List[str]:
"""list of worker IP adresses and networks (text) to use as whitelist"""
wl_networks = []
Expand Down Expand Up @@ -150,6 +149,10 @@ def get_statement():
)


class IpUpdater:
    """Holder for the callable that pushes the workers IP whitelist.

    The callable is looked up on the class at call time
    (``IpUpdater.update_fn(...)``), so it can be swapped for another
    implementation, e.g. a stub in tests.
    """

    # called with the list of whitelisted IP addresses/networks
    update_fn = update_wasabi_whitelist


@dbsession
def advertise_books_to_cms(task_id: UUID, session: so.Session):
"""inform openZIM CMS of all created ZIMs in the farm for this task
Expand Down
19 changes: 10 additions & 9 deletions dispatcher/backend/src/routes/requested_tasks/requested_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
logger = logging.getLogger(__name__)


def record_ip_change(worker_name):
def record_ip_change(session: so.Session, worker_name: str):
"""record that this worker changed its IP and trigger whitelist changes"""
today = datetime.date.today()
# counts and limits are per-day so reset it if date changed
if today != WorkersIpChangesCounts.today:
WorkersIpChangesCounts.reset()
if WorkersIpChangesCounts.add(worker_name) <= MAX_WORKER_IP_CHANGES_PER_DAY:
update_workers_whitelist()
update_workers_whitelist(session)
else:
logger.error(
f"Worker {worker_name} IP changes for {today} "
Expand Down Expand Up @@ -229,15 +229,16 @@ def get(self, session: so.Session, token: AccessToken.Payload):
worker = dbm.Worker.get(session, worker_name, WorkerNotFound)
if worker.user.username == token.username:
worker.last_seen = getnow()
previous_ip = str(worker.last_ip)
worker.last_ip = worker_ip

# flush to DB so that record_ip_change has access to updated IP
session.flush()

# IP changed since last encounter
if USES_WORKERS_IPS_WHITELIST and previous_ip != worker_ip:
record_ip_change(worker_name)
if str(worker.last_ip) != worker_ip:
logger.info(
f"Worker IP changed detected for {worker_name}: "
f"IP changed from {worker.last_ip} to {worker_ip}"
)
worker.last_ip = worker_ip
if USES_WORKERS_IPS_WHITELIST():
record_ip_change(session=session, worker_name=worker_name)

request_args = WorkerRequestedTaskSchema().load(request_args)

Expand Down
11 changes: 11 additions & 0 deletions dispatcher/backend/src/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from db import Session
import pytest
from typing import Generator

from sqlalchemy.orm import Session as OrmSession


@pytest.fixture
def dbsession() -> Generator[OrmSession, None, None]:
    """Yield a DB session opened with ``Session.begin()`` to the test.

    The ``with`` block stays open while the test runs and is exited once the
    test is over.
    """
    with Session.begin() as session:
        yield session
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
from typing import List

import pytest

from common.external import build_workers_whitelist
from common.external import IpUpdater, build_workers_whitelist


class TestWorkersCommon:
def test_build_workers_whitelist(self, workers):
whitelist = build_workers_whitelist()
def test_build_workers_whitelist(self, workers, dbsession):
whitelist = build_workers_whitelist(session=dbsession)
# - 4 because:
# 2 workers have a duplicate IP
# 1 worker has an IP missing
Expand Down Expand Up @@ -206,3 +209,59 @@ def test_checkin_another_user(
# response.get_json()["error"]
# == "worker with same name already exists for another user"
# )


class TestWorkerRequestedTasks:
    """Tests of the ``/requested-tasks/worker`` endpoint."""

    # IP address the worker appears to connect from in the whitelist test
    new_ip_address = "88.88.88.88"

    def test_requested_task_worker_as_admin(self, client, access_token, worker):
        """The endpoint answers 200 when called with the default access token."""
        response = client.get(
            "/requested-tasks/worker",
            query_string={
                "worker": worker["name"],
                "avail_cpu": 4,
                "avail_memory": 2048,
                "avail_disk": 4096,
            },
            headers={"Authorization": access_token},
        )
        assert response.status_code == 200

    def test_requested_task_worker_as_worker(self, client, make_access_token, worker):
        """The endpoint answers 200 when called with a worker-scoped token."""
        response = client.get(
            "/requested-tasks/worker",
            query_string={
                "worker": worker["name"],
                "avail_cpu": 4,
                "avail_memory": 2048,
                "avail_disk": 4096,
            },
            headers={"Authorization": make_access_token(worker["username"], "worker")},
        )
        assert response.status_code == 200

    def custom_ip_update(self, ip_addresses: List):
        """Stand-in for ``IpUpdater.update_fn`` recording that it was called.

        Also checks that the new worker IP is part of the whitelist it is
        given.
        """
        self.ip_updated = True
        assert TestWorkerRequestedTasks.new_ip_address in ip_addresses

    def test_requested_task_worker_update_ip_whitelist(
        self, client, make_access_token, worker
    ):
        """A request from a new IP triggers the whitelist update function.

        The whitelist feature is enabled through the environment and the
        updater is replaced by ``custom_ip_update``. Both are restored once
        the request has been made so that this test does not leak global
        state into the tests that run after it.
        """
        env_name = "USES_WORKERS_IPS_WHITELIST"
        self.ip_updated = False

        # remember global state to put it back whatever the outcome
        previous_update_fn = IpUpdater.update_fn
        previous_env_value = os.environ.get(env_name)

        IpUpdater.update_fn = self.custom_ip_update
        os.environ[env_name] = "1"
        try:
            response = client.get(
                "/requested-tasks/worker",
                query_string={
                    "worker": worker["name"],
                    "avail_cpu": 4,
                    "avail_memory": 2048,
                    "avail_disk": 4096,
                },
                headers={
                    "Authorization": make_access_token(worker["username"], "worker"),
                    "X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address,
                },
            )
        finally:
            IpUpdater.update_fn = previous_update_fn
            if previous_env_value is None:
                os.environ.pop(env_name, None)
            else:
                os.environ[env_name] = previous_env_value

        assert response.status_code == 200
        assert self.ip_updated

0 comments on commit bc3391d

Please sign in to comment.