Set celery task limits and fix worker timeout
aarontp committed May 30, 2024
1 parent b162614 commit dbc4d9d
Showing 3 changed files with 47 additions and 17 deletions.
6 changes: 3 additions & 3 deletions turbinia/config/turbinia_config_tmpl.py
@@ -155,7 +155,7 @@
# still be enabled with the --jobs_allowlist flag on the server, but the client
# will not be able to allowlist jobs that have been disabled or denylisted on
# the server.
DISABLED_JOBS = ['BinaryExtractorJob', 'BulkExtractorJob', 'DfdeweyJob', 'HindsightJob', 'PhotorecJob'] # yapf: disable
DISABLED_JOBS = ['VolatilityJob', 'BinaryExtractorJob', 'BulkExtractorJob', 'DfdeweyJob', 'HindsightJob', 'PhotorecJob'] # yapf: disable

# Configure additional job dependency checks below.
DEPENDENCIES = [{
@@ -232,7 +232,7 @@
'job': 'PlasoJob',
'programs': ['log2timeline', 'pinfo'],
'docker_image': None,
'timeout': 86400
'timeout': 600
}, {
'job': 'PhotorecJob',
'programs': ['photorec'],
@@ -351,7 +351,7 @@
# These options are required for customizing the prometheus configuration
###############################################################################
# This will enable the Prometheus service for the workers and server.
PROMETHEUS_ENABLED = True
PROMETHEUS_ENABLED = False

# Prometheus listen address and port
PROMETHEUS_ADDR = '0.0.0.0'
45 changes: 32 additions & 13 deletions turbinia/task_manager.py
@@ -46,8 +46,11 @@
# Buffer time added to the Job/Task timeout value before the Server times out a given Task. This is to make sure
# that the Server doesn't time out the Task before the Worker has a chance to
# and should account for the Task scheduling and setup time that happens before
# the Task starts.
SERVER_TASK_TIMEOUT_BUFFER = 86400
# the Task starts. This time is measured from when the Task is enqueued, not
# from when it actually starts on the worker, so if there is a long wait for
# Tasks to be executed they could potentially be timed out before even getting
# a chance to start. For that reason this limit is set conservatively high.
SERVER_TASK_TIMEOUT_BUFFER = 7200
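The server-side code that consumes this buffer is not part of this diff. As a rough sketch of the intent described in the comment above (the import path mirrors the `add_task()` change below; the helper name is hypothetical), the effective server-side timeout for a Task would be the per-job timeout plus this buffer:

```python
# Sketch only: deriving the server-side timeout for a Task. The buffer is
# added on top of the per-job timeout so that the Worker, which uses the
# bare job timeout, always times out first and can report partial results.
from turbinia.jobs import manager as jobs_manager  # import path assumed

SERVER_TASK_TIMEOUT_BUFFER = 7200  # seconds, as set in this commit

def server_timeout_for(job_name):
  """Returns the server-side timeout for a given job, in seconds."""
  job_timeout = jobs_manager.JobsManager.GetTimeoutValue(job_name)
  return job_timeout + SERVER_TASK_TIMEOUT_BUFFER
```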

# Define metrics
turbinia_server_tasks_total = Counter(
@@ -177,7 +180,7 @@ def abort_request(self, request_id, requester, evidence_name, message):
continue, an AbortTask will be created with the error message and is written
directly to the state database. This way the client will get a reasonable
error in response to the failure.
Args:
request_id(str): The request ID.
requester(str): The username of the requester.
@@ -418,7 +421,8 @@ def add_task(self, task, job, evidence_):
task.job_name = job.name
job.tasks.append(task)
self.state_manager.write_new_task(task)
self.enqueue_task(task, evidence_)
timeout_limit = jobs_manager.JobsManager.GetTimeoutValue(task.job_name)
self.enqueue_task(task, evidence_, timeout_limit)
turbinia_server_tasks_total.inc()
if task.id not in evidence_.tasks:
evidence_.tasks.append(task.id)
@@ -456,12 +460,13 @@ def remove_job(self, job_id):
turbinia_jobs_completed_total.inc()
return bool(remove_job)

def enqueue_task(self, task, evidence_):
def enqueue_task(self, task, evidence_, timeout_limit):
"""Enqueues a task and evidence in the implementation specific task queue.
Args:
task: An instantiated Turbinia Task
evidence_: An Evidence object to be processed.
timeout_limit(int): The timeout for the Task in seconds.
"""
raise NotImplementedError

@@ -483,10 +488,10 @@ def process_result(self, task_result):
"""
if task_result.successful is None:
log.error(
f'''Task {task_result.task_name} from {task_result.worker_name}
returned invalid success status "None". Setting this to False
so the client knows the Task is complete. Usually this means
that the Task returning the TurbiniaTaskResult did not call
the close() method on it.
''')
turbinia_result_success_invalid.inc()
@@ -737,9 +742,23 @@ def get_evidence(self):

return evidence_list

def enqueue_task(self, task, evidence_):
def enqueue_task(self, task, evidence_, timeout):
log.info(
f'Adding Celery task {task.name:s} with evidence {evidence_.name:s}'
f' to queue')
task.stub = self.celery_runner.delay(
task.serialize(), evidence_.serialize())
f' to queue with timeout {timeout}')
self.celery_runner.max_retries = 0
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# Hard limit in seconds, the worker processing the task will be killed and
# replaced with a new one when this is exceeded.
self.celery_runner.task_time_limit = 60
# TODO: Change timeouts to be between the client and server timeouts.
# TODO: Check for the timeout exception in the client and handle it.
# TODO: Add time to expire to account for scheduling wait
# Time limits described here:
# https://docs.celeryq.dev/en/stable/userguide/workers.html#time-limits
# task.stub = self.celery_runner.apply_async(
# (task.serialize(), evidence_.serialize()), retry=False,
# soft_time_limit=30, time_limit=60, expires=timeout)
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=False,
time_limit=60, expires=timeout)
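The commented-out `apply_async()` call above points at the fuller pattern from the Celery time-limits documentation: a soft limit raises `SoftTimeLimitExceeded` inside the task so it can clean up, while the hard limit kills the worker process outright. A minimal, self-contained sketch of that pattern (the broker URL, task body, and limit values are illustrative, not from the Turbinia codebase):

```python
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery('tasks', broker='redis://localhost:6379/0')  # broker assumed

@app.task
def run_task(task_dict, evidence_dict):
  try:
    # ... real Task execution would happen here ...
    return {'successful': True}
  except SoftTimeLimitExceeded:
    # Soft limit reached: save partial results before the hard limit
    # kills the worker process.
    return {'successful': False, 'error': 'Task timed out'}

# expires drops the message if it sits in the queue longer than the given
# number of seconds; the soft/hard limits apply once the task is running.
result = run_task.apply_async(
    ({}, {}), retry=False, soft_time_limit=30, time_limit=60, expires=3600)
```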
13 changes: 12 additions & 1 deletion turbinia/workers/__init__.py
@@ -27,6 +27,8 @@
import pickle
import platform
import pprint
import psutil
import signal
import subprocess
import sys
import tempfile
@@ -692,6 +694,15 @@ def execute(
env=env, text=True, encoding="utf-8")
stdout, stderr = proc.communicate(timeout=timeout_limit)
except subprocess.TimeoutExpired as exception:
result.log(
'Job {0:s} with Task {1:s} has reached its timeout limit of {2:d} '
'seconds, so killing child processes.'.format(
self.job_id, self.id, timeout_limit))
# Kill the child and parent processes so we can return; otherwise
# communicate() will hang waiting for the grandchildren to be reaped.
psutil_proc = psutil.Process(proc.pid)
for child in psutil_proc.children(recursive=True):
child.send_signal(signal.SIGKILL)
proc.kill()
# Get any potential partial output so we can save it later.
stdout, stderr = proc.communicate()
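The process-tree cleanup above can be demonstrated standalone: killing only the direct child leaves grandchildren holding the stdout/stderr pipes open, so `communicate()` blocks until they exit. A runnable sketch of the same pattern (POSIX-only, since `signal.SIGKILL` does not exist on Windows; psutil's portable `child.kill()` is the equivalent there):

```python
import signal
import subprocess

import psutil

# The shell is the direct child; each sleep is a grandchild that inherits
# the stdout/stderr pipes.
proc = subprocess.Popen(
    'sleep 120 & sleep 120', shell=True, stdout=subprocess.PIPE,
    stderr=subprocess.PIPE, text=True)
try:
  stdout, stderr = proc.communicate(timeout=2)
except subprocess.TimeoutExpired:
  # Kill the grandchildren first so their ends of the pipes are closed.
  for child in psutil.Process(proc.pid).children(recursive=True):
    child.send_signal(signal.SIGKILL)
  proc.kill()
  # communicate() now returns any partial output instead of hanging.
  stdout, stderr = proc.communicate()
```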
@@ -877,7 +888,7 @@ def create_result(

def check_serialization_errors(self, result):
"""Checks the TurbiniaTaskResult is valid for serialization.
This method checks the 'result'' object is the correct type and whether
it is pickle/JSON serializable or not.