Remove blocked only job types (#2958)
* remove WORKER_JOB_TYPES_BLOCKED

* remove WORKER_JOB_TYPES_ONLY
severo authored Jun 27, 2024
1 parent 6b57e68 commit cd832d5
Showing 12 changed files with 7 additions and 112 deletions.
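In short, `Queue.start_job` and the worker configuration lose the `job_types_blocked` / `job_types_only` knobs, and workers are scoped by job difficulty alone. A minimal sketch of the call before and after this commit, assuming a configured queue backend (the difficulty values and the blocked job type are illustrative):

```python
from libcommon.queue.jobs import Queue  # import path as in the diff below

queue = Queue()
# Before this commit, a worker could additionally filter by job type:
#   queue.start_job(difficulty_min=40, difficulty_max=70,
#                   job_types_blocked=["dataset-config-names"], job_types_only=None)
# After it, only the difficulty window remains (raises EmptyQueueError if
# no waiting job falls within the bounds):
job_info = queue.start_job(difficulty_min=40, difficulty_max=70)
```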
6 changes: 0 additions & 6 deletions chart/env/prod.yaml
@@ -428,8 +428,6 @@ workers:
uvicornPort: 8080
workerDifficultyMax: 100
workerDifficultyMin: 70
-workerJobTypesBlocked: ""
-workerJobTypesOnly: ""
nodeSelector:
role-datasets-server-worker: "true"
tolerations:
@@ -459,8 +457,6 @@ workers:
uvicornPort: 8080
workerDifficultyMax: 70
workerDifficultyMin: 40
-workerJobTypesBlocked: ""
-workerJobTypesOnly: ""
nodeSelector:
role-datasets-server-worker: "true"
tolerations:
@@ -490,8 +486,6 @@ workers:
uvicornPort: 8080
workerDifficultyMax: 40
workerDifficultyMin: 0
-workerJobTypesBlocked: ""
-workerJobTypesOnly: ""
nodeSelector:
role-datasets-server-worker-light: "true"
tolerations:
4 changes: 0 additions & 4 deletions chart/env/staging.yaml
@@ -256,8 +256,6 @@ workers:
uvicornPort: 8080
workerDifficultyMax: 100
workerDifficultyMin: 0
-workerJobTypesBlocked: ""
-workerJobTypesOnly: ""
replicas: 1
autoscaling:
enabled: true
@@ -284,8 +282,6 @@
uvicornPort: 8080
workerDifficultyMax: 40
workerDifficultyMin: 0
-workerJobTypesBlocked: ""
-workerJobTypesOnly: ""
replicas: 1
autoscaling:
enabled: true
4 changes: 0 additions & 4 deletions chart/templates/worker/_container.tpl
@@ -22,10 +22,6 @@
value: {{ .workerValues.workerDifficultyMax | quote }}
- name: WORKER_DIFFICULTY_MIN
value: {{ .workerValues.workerDifficultyMin | quote }}
-- name: WORKER_JOB_TYPES_BLOCKED
-  value: {{ .workerValues.workerJobTypesBlocked | quote }}
-- name: WORKER_JOB_TYPES_ONLY
-  value: {{ .workerValues.workerJobTypesOnly | quote }}
- name: ROWS_INDEX_MAX_ARROW_DATA_IN_MEMORY
value: {{ .Values.rowsIndex.maxArrowDataInMemory | quote }}
# prometheus
4 changes: 0 additions & 4 deletions chart/values.yaml
@@ -563,10 +563,6 @@ workers:
workerDifficultyMax: 100
# min difficulty of the jobs that this worker will process
workerDifficultyMin: 0
-# job types that this worker will not process
-workerJobTypesBlocked: ""
-# job types that this worker can process
-workerJobTypesOnly: ""
# Directory where the uvicorn workers share their prometheus metrics
# see https://github.com/prometheus/client_python#multiprocess-mode-eg-gunicorn
prometheusMultiprocDirectory: "/tmp"
37 changes: 2 additions & 35 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -110,8 +110,6 @@ class StartedJobError(Exception):


class JobQueryFilters(TypedDict, total=False):
-type__nin: list[str]
-type__in: list[str]
difficulty__gt: int
difficulty__lte: int
dataset__nin: list[str]
@@ -381,8 +379,6 @@ def _get_next_waiting_job_for_priority(
priority: Priority,
difficulty_min: Optional[int] = None,
difficulty_max: Optional[int] = None,
-job_types_blocked: Optional[list[str]] = None,
-job_types_only: Optional[list[str]] = None,
) -> JobDocument:
"""Get the next job in the queue for a given priority.
@@ -396,27 +392,18 @@
priority (`Priority`): The priority of the job.
difficulty_min (`int`, *optional*): if not None, only jobs with a difficulty greater or equal to this value are considered.
difficulty_max (`int`, *optional*): if not None, only jobs with a difficulty lower or equal to this value are considered.
-job_types_blocked (`list[str]`, *optional*): if not None, jobs of the given types are not considered.
-job_types_only (`list[str]`, *optional*): if not None, only jobs of the given types are considered.
Raises:
[`EmptyQueueError`]: if there is no waiting job in the queue that satisfies the restrictions above.
Returns:
`JobDocument`: the next waiting job for priority
"""
-logging.debug(
-    f"Getting next waiting job for priority {priority}, blocked types: {job_types_blocked}, only types:"
-    f" {job_types_only}"
-)
+logging.debug(f"Getting next waiting job for priority {priority}")
blocked_datasets = get_blocked_datasets()
logging.debug(f"Blocked datasets: {blocked_datasets}")

filters: JobQueryFilters = {}
-if job_types_blocked:
-    filters["type__nin"] = job_types_blocked
-if job_types_only:
-    filters["type__in"] = job_types_only
if difficulty_min is not None and difficulty_min > DEFAULT_DIFFICULTY_MIN:
filters["difficulty__gt"] = difficulty_min
if difficulty_max is not None and difficulty_max < DEFAULT_DIFFICULTY_MAX:
@@ -486,8 +473,6 @@ def get_next_waiting_job(
self,
difficulty_min: Optional[int] = None,
difficulty_max: Optional[int] = None,
-job_types_blocked: Optional[list[str]] = None,
-job_types_only: Optional[list[str]] = None,
) -> JobDocument:
"""Get the next job in the queue.
@@ -501,8 +486,6 @@
Args:
difficulty_min (`int`, *optional*): if not None, only jobs with a difficulty greater or equal to this value are considered.
difficulty_max (`int`, *optional*): if not None, only jobs with a difficulty lower or equal to this value are considered.
-job_types_blocked (`list[str]`, *optional*): if not None, jobs of the given types are not considered.
-job_types_only (`list[str]`, *optional*): if not None, only jobs of the given types are considered.
Raises:
[`EmptyQueueError`]: if there is no waiting job in the queue that satisfies the restrictions above.
@@ -514,8 +497,6 @@
with contextlib.suppress(EmptyQueueError):
return self._get_next_waiting_job_for_priority(
priority=priority,
-job_types_blocked=job_types_blocked,
-job_types_only=job_types_only,
difficulty_min=difficulty_min,
difficulty_max=difficulty_max,
)
@@ -589,8 +570,6 @@ def start_job(
self,
difficulty_min: Optional[int] = None,
difficulty_max: Optional[int] = None,
-job_types_blocked: Optional[list[str]] = None,
-job_types_only: Optional[list[str]] = None,
) -> JobInfo:
"""Start the next job in the queue.
@@ -600,8 +579,6 @@
Args:
difficulty_min: if not None, only jobs with a difficulty greater or equal to this value are considered.
difficulty_max: if not None, only jobs with a difficulty lower or equal to this value are considered.
-job_types_blocked: if not None, jobs of the given types are not considered.
-job_types_only: if not None, only jobs of the given types are considered.
Raises:
[`EmptyQueueError`]: if there is no job in the queue, within the limit of the maximum number of started jobs
@@ -613,23 +590,13 @@
`JobInfo`: the job id, the type, the input arguments: dataset, revision, config and split
"""

logging.debug(f"looking for a job to start, blocked types: {job_types_blocked}, only types: {job_types_only}")
logging.debug("looking for a job to start")
next_waiting_job = self.get_next_waiting_job(
-job_types_blocked=job_types_blocked,
-job_types_only=job_types_only,
difficulty_min=difficulty_min,
difficulty_max=difficulty_max,
)
logging.debug(f"job found: {next_waiting_job}")
# ^ can raise EmptyQueueError
-if job_types_blocked and next_waiting_job.type in job_types_blocked:
-    raise RuntimeError(
-        f"The job type {next_waiting_job.type} is in the list of blocked job types {job_types_blocked}"
-    )
-if job_types_only and next_waiting_job.type not in job_types_only:
-    raise RuntimeError(
-        f"The job type {next_waiting_job.type} is not in the list of allowed job types {job_types_only}"
-    )
started_job = self._start_newest_job_and_delete_others(job=next_waiting_job)
return started_job.info()

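A note on the difficulty window that survives the cleanup: per the `difficulty__gt` / `difficulty__lte` filters above (and the worker README below), the minimum bound is exclusive and the maximum inclusive. A self-contained sketch of that predicate, ignoring the `DEFAULT_DIFFICULTY_MIN`/`DEFAULT_DIFFICULTY_MAX` short-circuits; `difficulty_matches` is a hypothetical helper, not part of the codebase:

```python
from typing import Optional


def difficulty_matches(difficulty: int, difficulty_min: Optional[int], difficulty_max: Optional[int]) -> bool:
    """Mirror the Mongo filters: difficulty__gt (min excluded), difficulty__lte (max included)."""
    if difficulty_min is not None and difficulty <= difficulty_min:
        return False
    if difficulty_max is not None and difficulty > difficulty_max:
        return False
    return True


assert difficulty_matches(70, 40, 70)  # the max bound is included
assert not difficulty_matches(40, 40, 70)  # the min bound is excluded
```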
48 changes: 3 additions & 45 deletions libs/libcommon/tests/queue/test_jobs.py
@@ -346,48 +346,6 @@ def test_priority_logic_priority_order() -> None:
queue.start_job()


-@pytest.mark.parametrize(
-    "job_types_blocked,job_types_only,should_raise",
-    [
-        (None, None, False),
-        (None, ["test_type"], False),
-        (["other_type"], None, False),
-        (["other_type"], ["test_type"], False),
-        (None, ["other_type"], True),
-        (["test_type"], None, True),
-        (["test_type"], ["test_type"], True),
-        (["other_type", "test_type"], None, True),
-        (["other_type"], ["other_type"], True),
-        (["other_type", "test_type"], ["other_type", "test_type"], True),
-    ],
-)
-def test_job_types_only(
-    job_types_blocked: Optional[list[str]], job_types_only: Optional[list[str]], should_raise: bool
-) -> None:
-    job_type = "test_type"
-    test_dataset = "test_dataset"
-    test_revision = "test_revision"
-    test_difficulty = 50
-    queue = Queue()
-    queue.add_job(
-        job_type=job_type,
-        dataset=test_dataset,
-        revision=test_revision,
-        config=None,
-        split=None,
-        difficulty=test_difficulty,
-    )
-    assert queue.is_job_in_process(
-        job_type=job_type, dataset=test_dataset, revision=test_revision, config=None, split=None
-    )
-    if should_raise:
-        with pytest.raises(EmptyQueueError):
-            queue.start_job(job_types_blocked=job_types_blocked, job_types_only=job_types_only)
-    else:
-        job_info = queue.start_job(job_types_blocked=job_types_blocked, job_types_only=job_types_only)
-        assert job_info["params"]["dataset"] == test_dataset


@pytest.mark.parametrize(
"difficulty_min,difficulty_max,should_raise",
[
@@ -533,7 +491,7 @@ def test_queue_heartbeat() -> None:
split="split1",
difficulty=test_difficulty,
)
-queue.start_job(job_types_only=[job_type])
+queue.start_job()
assert job.last_heartbeat is None
queue.heartbeat(job.pk)
job.reload()
@@ -555,7 +513,7 @@ def test_queue_get_zombies() -> None:
split="split1",
difficulty=test_difficulty,
)
-queue.start_job(job_types_only=[job_type])
+queue.start_job()
queue.add_job(
job_type=job_type,
dataset="dataset1",
@@ -564,7 +522,7 @@
split="split2",
difficulty=test_difficulty,
)
-queue.start_job(job_types_only=[job_type])
+queue.start_job()
assert queue.get_zombies(max_seconds_without_heartbeat=10) == [zombie.info()]
assert queue.get_zombies(max_seconds_without_heartbeat=-1) == []
assert queue.get_zombies(max_seconds_without_heartbeat=0) == []
2 changes: 1 addition & 1 deletion libs/libcommon/tests/test_backfill_on_real_graph.py
@@ -130,7 +130,7 @@ def test_plan_job_creation_and_termination() -> None:
)

# we simulate the job for "dataset-config-names,dataset,revision" has finished
-job_info = Queue().start_job(job_types_only=["dataset-config-names"])
+job_info = Queue().start_job()
upsert_response(
kind=job_info["type"],
dataset=job_info["params"]["dataset"],
2 changes: 0 additions & 2 deletions services/worker/README.md
@@ -26,8 +26,6 @@ Set environment variables to configure the worker.
- `WORKER_DIFFICULTY_MAX`: the maximum difficulty (included) of the jobs to process. Difficulty will always be a strictly positive integer, and its max value is 100. Defaults to None.
- `WORKER_DIFFICULTY_MIN`: the minimum difficulty (excluded) of the jobs to process. Difficulty will always be a strictly positive integer, and its max value is 100. Defaults to None.
- `WORKER_HEARTBEAT_INTERVAL_SECONDS`: the time interval between two heartbeats. Each heartbeat updates the job "last_heartbeat" field in the queue. Defaults to `60` (1 minute).
-- `WORKER_JOB_TYPES_BLOCKED`: comma-separated list of job types that will not be processed, e.g. "dataset-config-names,dataset-split-names". If empty, no job type is blocked. Defaults to empty.
-- `WORKER_JOB_TYPES_ONLY`: comma-separated list of the non-blocked job types to process, e.g. "dataset-config-names,dataset-split-names". If empty, the worker processes all the non-blocked jobs. Defaults to empty.
- `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`: the time interval at which the worker looks for long jobs to kill them. Defaults to `60` (1 minute).
- `WORKER_KILL_ZOMBIES_INTERVAL_SECONDS`: the time interval at which the worker looks for zombie jobs to kill them. Defaults to `600` (10 minutes).
- `WORKER_MAX_JOB_DURATION_SECONDS`: the maximum duration allowed for a job to run. If the job runs longer, it is killed (see `WORKER_KILL_LONG_JOB_INTERVAL_SECONDS`). Defaults to `1200` (20 minutes).
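With the two variables gone, a worker's scope is set purely by the difficulty window documented above, e.g. `WORKER_DIFFICULTY_MIN=40 WORKER_DIFFICULTY_MAX=70` in shell form. A hedged Python equivalent (values illustrative):

```python
import os

# Illustrative values: accept jobs with 40 < difficulty <= 70.
os.environ["WORKER_DIFFICULTY_MIN"] = "40"  # lower bound, excluded
os.environ["WORKER_DIFFICULTY_MAX"] = "70"  # upper bound, included
```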
4 changes: 0 additions & 4 deletions services/worker/src/worker/config.py
@@ -62,8 +62,6 @@ class WorkerConfig:
difficulty_max: Optional[int] = WORKER_DIFFICULTY_MAX
difficulty_min: Optional[int] = WORKER_DIFFICULTY_MIN
heartbeat_interval_seconds: float = WORKER_HEARTBEAT_INTERVAL_SECONDS
-job_types_blocked: list[str] = field(default_factory=get_empty_str_list)
-job_types_only: list[str] = field(default_factory=get_empty_str_list)
kill_long_job_interval_seconds: float = WORKER_KILL_LONG_JOB_INTERVAL_SECONDS
kill_zombies_interval_seconds: float = WORKER_KILL_ZOMBIES_INTERVAL_SECONDS
max_job_duration_seconds: float = WORKER_MAX_JOB_DURATION_SECONDS
@@ -84,8 +82,6 @@ def from_env(cls) -> "WorkerConfig":
heartbeat_interval_seconds=env.float(
name="HEARTBEAT_INTERVAL_SECONDS", default=WORKER_HEARTBEAT_INTERVAL_SECONDS
),
-job_types_blocked=env.list(name="JOB_TYPES_BLOCKED", default=get_empty_str_list()),
-job_types_only=env.list(name="JOB_TYPES_ONLY", default=get_empty_str_list()),
kill_long_job_interval_seconds=env.float(
name="KILL_LONG_JOB_INTERVAL_SECONDS", default=WORKER_KILL_LONG_JOB_INTERVAL_SECONDS
),
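For reference, a minimal sketch of the environs-based parsing that `from_env` keeps for the difficulty knobs, assuming the same `WORKER_`-prefixed pattern as the surrounding fields (`read_difficulty_bounds` is a standalone illustration, not the real method):

```python
from typing import Optional

from environs import Env


def read_difficulty_bounds() -> tuple[Optional[int], Optional[int]]:
    # None means "no bound"; WORKER_DIFFICULTY_MIN/MAX are read as integers.
    env = Env()
    with env.prefixed("WORKER_"):
        difficulty_min: Optional[int] = env.int(name="DIFFICULTY_MIN", default=None)
        difficulty_max: Optional[int] = env.int(name="DIFFICULTY_MAX", default=None)
    return (difficulty_min, difficulty_max)
```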
2 changes: 0 additions & 2 deletions services/worker/src/worker/loop.py
@@ -113,8 +113,6 @@ def process_next_job(self) -> bool:
job_info = self.queue.start_job(
difficulty_min=self.app_config.worker.difficulty_min,
difficulty_max=self.app_config.worker.difficulty_max,
-job_types_blocked=self.app_config.worker.job_types_blocked,
-job_types_only=self.app_config.worker.job_types_only,
)
self.set_worker_state(current_job_info=job_info)
logging.debug(f"job assigned: {job_info}")
@@ -636,7 +636,7 @@ def test_concurrency(
)
queue = Queue()
queue.create_jobs([job_info])
-job_info = queue.start_job(job_types_only=["dataset-config-names"])
+job_info = queue.start_job()
job_manager = JobManager(
job_info=job_info,
app_config=app_config,
4 changes: 0 additions & 4 deletions services/worker/tests/test_loop.py
@@ -1,5 +1,3 @@
-from dataclasses import replace
-
from libcommon.dtos import JobInfo
from libcommon.resources import CacheMongoResource, QueueMongoResource

@@ -40,8 +38,6 @@ def test_process_next_job(
queue_mongo_resource: QueueMongoResource,
worker_state_file_path: str,
) -> None:
-app_config = replace(app_config, worker=replace(app_config.worker, job_types_only=[JOB_TYPE]))
-
factory = DummyJobRunnerFactory(app_config=app_config)

loop = Loop(
