CHG use distributed locks to distribute theia-poller job #401

Closed · wabscale opened this issue Sep 21, 2022 · 2 comments
Labels: backend, feature (New feature or request), k8s

Comments

wabscale commented Sep 21, 2022

The poller is a job whose sole purpose is to make sure that the database entries for IDEs match the resources actually allocated. This means looking at the allocated kubernetes resources and updating the IDE database entries to match. Right now only one instance runs at a time, which makes it pretty much the only single point of failure in Anubis. If for whatever reason the poller goes away, IDE states will not be updated, making it impossible to launch or stop IDEs (which would be a huge problem).

We'll be able to run more than a single instance of the poller if we use redis distributed locks to ensure consistency and synchronization.

redis distributed locks: https://redis.io/docs/reference/patterns/distributed-locks/
python lib: https://github.com/brainix/pottery#redlock
poller job right now: https://github.com/AnubisLMS/Anubis/blob/a5e3f004076c108f6ec8819f9ee7c099eb639fc1/api/anubis/jobs/ide_poller.py
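
A minimal sketch of how each poller replica could guard per-IDE work with pottery's Redlock (the get_theia_sessions and update_theia_session helpers here are hypothetical stand-ins for the real poller logic):

from pottery import Redlock
from redis import Redis

redis = Redis()  # assumes the shared redis instance the api already uses

def poll_ide_resources():
    # Every replica walks the same set of sessions; the lock decides who works
    for session in get_theia_sessions():  # hypothetical: all active IDE sessions
        lock = Redlock(
            key=f'theia-session-{session.id}',
            masters={redis},
            auto_release_time=3.0,
        )
        # Another replica is already handling this session; move on
        if not lock.acquire(blocking=False):
            continue
        try:
            # hypothetical: reconcile the db entry with the k8s resources
            update_theia_session(session)
        finally:
            lock.release()

With something like this, losing one poller replica just means another replica picks up the same sessions on its next pass.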

wabscale added the feature, backend, k8s labels on Sep 21, 2022
wabscale moved this to Todo in AnubisLMS on Sep 21, 2022
wabscale (author) commented:

Basically, as it is done here:

def reap_pipeline_jobs() -> int:
    """
    Runs through all jobs in the namespace. If the job is finished, it will
    send a request to the kube api to delete it. Number of active jobs is
    returned.

    :return: number of active jobs
    """

    # Get the batch v1 object so we can query for active k8s jobs
    batch_v1 = client.BatchV1Api()

    # Get all pipeline jobs in the anubis namespace
    jobs = get_active_pipeline_jobs()

    # Get the autograde pipeline timeout from config
    autograde_pipeline_timeout_minutes = get_config_int("AUTOGRADE_PIPELINE_TIMEOUT_MINUTES", default=5)

    # Iterate through all pipeline jobs
    for job in jobs:
        job: client.V1Job

        # If submission id not in labels, just skip. Job ttl will delete it.
        if 'submission-id' not in job.metadata.labels:
            logger.error(f'skipping job based off old label format: {job.metadata.name}')
            continue

        # Read submission id from labels
        submission_id = job.metadata.labels['submission-id']

        # Create a distributed lock for the submission job
        lock = Redlock(
            key=f'submission-job-{submission_id}',
            masters={redis},
            auto_release_time=3.0,
        )

        # Skip the job if another poller instance already holds the lock
        if not lock.acquire(blocking=False):
            continue

        # Log that we are inspecting the pipeline
        logger.debug(f'inspecting pipeline: {job.metadata.name}')

        # Get database record of the submission
        submission: Submission = Submission.query.filter(
            Submission.id == submission_id,
        ).first()
        if submission is None:
            logger.error(f"submission from db not found {submission_id}")
            # Release the lock before skipping so it does not linger
            lock.release()
            continue

        # Calculate job created time
        job_created = job.metadata.creation_timestamp.replace(tzinfo=None)

        # Delete the job if it is older than the configured timeout
        if datetime.utcnow() - job_created > timedelta(minutes=autograde_pipeline_timeout_minutes):
            # Attempt to delete the k8s job
            reap_pipeline_job(job, submission)

        # If the job has finished, and was marked as successful, then
        # we can clean it up
        elif job.status.succeeded is not None and job.status.succeeded >= 1:
            # Attempt to delete the k8s job
            reap_pipeline_job(job, submission)

        lock.release()

Just need to lock the IDE resource the same way the submissions are locked here. We may want to formalize the way the locks are done a bit, maybe with a single function for creating and releasing the lock. There is also a synchronize function in pottery. A decorator like that, but with argument-based locking (e.g. hashing the argument values to build the key), could work; see the sketch after the example below.

>>> from pottery import synchronize
>>> @synchronize(key='synchronized-func', masters={redis}, auto_release_time=1.5, blocking=True, timeout=-1)
... def func():
...   # Only one thread can execute this function at a time.
...   return True
...
>>> func()
True
>>>
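
A sketch of what that argument-hashed decorator could look like, assuming pottery's Redlock (the distributed_lock name and the update_ide_state helper it wraps are hypothetical):

import functools
import hashlib

from pottery import Redlock
from redis import Redis

redis = Redis()  # assumes the shared redis instance

def distributed_lock(auto_release_time: float = 3.0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Hash the argument values into a stable lock key so that
            # identical calls are serialized across poller instances
            digest = hashlib.sha256(repr((args, sorted(kwargs.items()))).encode()).hexdigest()
            lock = Redlock(
                key=f'lock-{func.__name__}-{digest}',
                masters={redis},
                auto_release_time=auto_release_time,
            )
            # Redlock doubles as a context manager, so acquire/release
            # is handled even if func raises
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return decorator

@distributed_lock()
def update_ide_state(session_id: str):
    ...  # hypothetical poller work for a single IDE

Calls with different arguments hash to different keys, so they can run concurrently; only identical calls contend for the same lock.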

wabscale (author) commented:

53eca0b

Repository owner moved this from Todo to Done in AnubisLMS Sep 26, 2022