Always delete associated jobs when deleting datasets
stijn-uva committed Nov 29, 2024
1 parent 8da18b3 commit 13b0668
Showing 4 changed files with 40 additions and 67 deletions.
1 change: 1 addition & 0 deletions backend/lib/manager.py
@@ -239,5 +239,6 @@ def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=N
                 time.sleep(0.25)

             # now all queries are interrupted, formally request an abort
+            self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
             worker.request_interrupt(interrupt_level)
             return
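
For reference, a sketch of the log line this change emits, assuming hypothetical job data with id 81, jobtype "twitter-search" and remote_id "abcd1234":

    Requesting interrupt of job 81 (twitter-search/abcd1234)

This makes interrupts requested through the internal API, such as those triggered by the dataset deletion change below, traceable in the backend log.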
28 changes: 25 additions & 3 deletions common/lib/dataset.py
Expand Up @@ -15,7 +15,7 @@
 from common.config_manager import config
 from common.lib.job import Job, JobNotFoundException
 from common.lib.module_loader import ModuleCollector
-from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version
+from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api
 from common.lib.item_mapping import MappedItem, MissingMappedField, DatasetItem
 from common.lib.fourcat_module import FourcatModule
 from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
@@ -523,14 +523,14 @@ def copy(self, shallow=True):

         return copy

-    def delete(self, commit=True):
+    def delete(self, commit=True, queue=None):
         """
         Delete the dataset, and all its children

         Deletes both database records and result files. Note that manipulating
         a dataset object after it has been deleted is undefined behaviour.

-        :param commit bool: Commit SQL DELETE query?
+        :param bool commit: Commit SQL DELETE query?
         """
         # first, recursively delete children
         children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
@@ -542,6 +542,27 @@ def delete(self, commit=True):
             # dataset already deleted - race condition?
             pass

+        # delete any queued jobs for this dataset
+        try:
+            job = Job.get_by_remote_ID(self.key, self.db, self.type)
+            if job.is_claimed:
+                # tell the API to stop any jobs running for this dataset
+                # level 2 = cancel job
+                # we're not interested in the result - if the API is available
+                # it will do its thing; if it's not, the backend is probably
+                # not running, so the job doesn't need to be interrupted anyway
+                call_api(
+                    "cancel-job",
+                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
+                    False
+                )
+
+            # this deletes the job from the database
+            job.finish(True)
+
+        except JobNotFoundException:
+            pass
+
         # delete from database
         self.db.delete("datasets", where={"key": self.key}, commit=commit)
         self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
@@ -554,6 +575,7 @@ def delete(self, commit=True):
             self.get_results_path().with_suffix(".log").unlink()
             if self.get_results_folder_path().exists():
                 shutil.rmtree(self.get_results_folder_path())
+
         except FileNotFoundError:
             # already deleted, apparently
             pass
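
In short, job cleanup now happens inside DataSet.delete() itself, so every caller gets it for free. A minimal sketch of the new flow, assuming a live database handle db and a hypothetical dataset key (the web tool constructs datasets as DataSet(key=..., db=db, modules=...)):

    from common.lib.dataset import DataSet

    dataset = DataSet(key="abcd1234", db=db)  # hypothetical key
    # cancels the dataset's queued or running job (if any), then removes
    # children recursively, database records and result files
    dataset.delete()

Because children are deleted first and each child's delete() cancels its own job, the recursive job cleanup previously done in the web tool (removed below) becomes redundant.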
55 changes: 14 additions & 41 deletions common/lib/helpers.py
@@ -522,14 +522,16 @@ def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", f
     return canvas


-def call_api(action, payload=None):
+def call_api(action, payload=None, wait_for_response=True):
     """
-    Send message to server
+    Calls the internal API and returns the interpreted response.

     :param str action: API action
     :param payload: API payload
+    :param bool wait_for_response: Wait for response? If not, close the
+    connection immediately after sending data.

     :return: API response, or timeout message in case of timeout
     """
@@ -540,16 +542,17 @@ def call_api(action, payload=None):
     msg = json.dumps({"request": action, "payload": payload})
     connection.sendall(msg.encode("ascii", "ignore"))

-    try:
-        response = ""
-        while True:
-            bytes = connection.recv(2048)
-            if not bytes:
-                break
+    if wait_for_response:
+        try:
+            response = ""
+            while True:
+                bytes = connection.recv(2048)
+                if not bytes:
+                    break

-            response += bytes.decode("ascii", "ignore")
-    except (socket.timeout, TimeoutError):
-        response = "(Connection timed out)"
+                response += bytes.decode("ascii", "ignore")
+        except (socket.timeout, TimeoutError):
+            response = "(Connection timed out)"

     try:
         connection.shutdown(socket.SHUT_RDWR)
@@ -559,7 +562,7 @@ def call_api(action, payload=None):
     connection.close()

     try:
-        return json.loads(response)
+        return json.loads(response) if wait_for_response else None
     except json.JSONDecodeError:
         return response

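A short sketch of the two calling modes; the "cancel-job" payload mirrors the one now used in DataSet.delete(), and key/jobtype are hypothetical variables:

    # blocking: wait for the backend's reply and parse it as JSON
    response = call_api("cancel-job", {"remote_id": key, "jobtype": jobtype, "level": 2})

    # fire-and-forget: close the connection right after sending, so an
    # unavailable backend cannot stall the caller; returns None
    call_api("cancel-job", {"remote_id": key, "jobtype": jobtype, "level": 2}, False)

Note that with wait_for_response=False the function returns None rather than reading `response`, which is never assigned in that branch.
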
@@ -999,36 +1002,6 @@ def _sets_to_lists_gen(d):
     return dict(_sets_to_lists_gen(d))


-def url_to_hash(url, remove_scheme=True, remove_www=True):
-    """
-    Convert a URL to a filename; some URLs are too long to be used as filenames, this keeps the domain and hashes the
-    rest of the URL.
-    """
-    parsed_url = urlparse(url.lower())
-    if parsed_url:
-        if remove_scheme:
-            parsed_url = parsed_url._replace(scheme="")
-        if remove_www:
-            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
-            parsed_url = parsed_url._replace(netloc=netloc)
-
-        url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/"))
-    else:
-        # Unable to parse URL; use regex
-        if remove_scheme:
-            url = re.sub(r"^https?://", "", url)
-        if remove_www:
-            if not remove_scheme:
-                scheme = re.match(r"^https?://", url).group()
-                temp_url = re.sub(r"^https?://", "", url)
-                url = scheme + re.sub(r"^www\.", "", temp_url)
-            else:
-                url = re.sub(r"^www\.", "", url)
-
-        url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/"))
-
-    return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()
-
 def folder_size(path='.'):
     """
     Get the size of a folder using os.scandir for efficiency
23 changes: 0 additions & 23 deletions webtool/views/api_tool.py
@@ -663,29 +663,6 @@ def delete_dataset(key=None):
     if not config.get("privileges.admin.can_manipulate_all_datasets") and not dataset.is_accessible_by(current_user, "owner"):
         return error(403, message="Not allowed")

-    # if there is an active or queued job for some child dataset, cancel and
-    # delete it
-    children = dataset.get_all_children()
-    for child in children:
-        try:
-            job = Job.get_by_remote_ID(child.key, database=db, jobtype=child.type)
-            call_api("cancel-job", {"remote_id": child.key, "jobtype": dataset.type, "level": BasicWorker.INTERRUPT_CANCEL})
-            job.finish()
-        except JobNotFoundException:
-            pass
-        except ConnectionRefusedError:
-            return error(500, message="The 4CAT backend is not available. Try again in a minute or contact the instance maintainer if the problem persists.")
-
-    # now cancel and delete the job for this one (if it exists)
-    try:
-        job = Job.get_by_remote_ID(dataset.key, database=db, jobtype=dataset.type)
-        call_api("cancel-job", {"remote_id": dataset.key, "jobtype": dataset.type, "level": BasicWorker.INTERRUPT_CANCEL})
-    except JobNotFoundException:
-        pass
-    except ConnectionRefusedError:
-        return error(500,
-                     message="The 4CAT backend is not available. Try again in a minute or contact the instance maintainer if the problem persists.")
-
     # do we have a parent?
     parent_dataset = DataSet(key=dataset.key_parent, db=db, modules=fourcat_modules) if dataset.key_parent else None

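With the job bookkeeping moved into DataSet.delete(), the endpoint no longer needs its own calls to the backend; presumably the remaining (unchanged, not shown) deletion logic reduces to something like this hedged sketch:

    # job cancellation, including for child datasets, now happens inside delete()
    dataset.delete()

One behavioural consequence: the removed code returned a 500 error when the backend was unreachable, while the fire-and-forget call_api in delete() simply skips the interrupt, so dataset deletion now succeeds even when the backend is down.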
