Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to mitigate filling up the coordination directory #4749

Merged
merged 8 commits into from
Jan 22, 2024
10 changes: 7 additions & 3 deletions src/toil/batchSystems/cleanup_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ def __init__(self, workerCleanupInfo: WorkerCleanupInfo) -> None:

def __enter__(self) -> None:
# Set up an arena so we know who is the last worker to leave
self.arena = LastProcessStandingArena(Toil.get_toil_coordination_dir(self.workerCleanupInfo.work_dir, self.workerCleanupInfo.coordination_dir),
self.workerCleanupInfo.workflow_id + '-cleanup')
self.arena = LastProcessStandingArena(
Toil.get_toil_coordination_dir(
self.workerCleanupInfo.work_dir,
self.workerCleanupInfo.coordination_dir
),
Toil.get_workflow_path_component(self.workerCleanupInfo.workflow_id) + "-cleanup"
)
logger.debug('Entering cleanup arena')
self.arena.enter()
logger.debug('Cleanup arena entered')
Expand All @@ -90,4 +95,3 @@ def __exit__(self, type: Optional[Type[BaseException]], value: Optional[BaseExce
# Now the coordination_dir is allowed to no longer exist on the node.
logger.debug('Cleanup arena left')


8 changes: 4 additions & 4 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ def get_toil_coordination_dir(cls, config_work_dir: Optional[str], config_coordi
return coordination_dir

@staticmethod
def _get_workflow_path_component(workflow_id: str) -> str:
def get_workflow_path_component(workflow_id: str) -> str:
"""
Get a safe filesystem path component for a workflow.

Expand All @@ -1308,7 +1308,7 @@ def _get_workflow_path_component(workflow_id: str) -> str:

:param workflow_id: The ID of the current Toil workflow.
"""
return str(uuid.uuid5(uuid.UUID(getNodeID()), workflow_id)).replace('-', '')
return "toilwf-" + str(uuid.uuid5(uuid.UUID(getNodeID()), workflow_id)).replace('-', '')

@classmethod
def getLocalWorkflowDir(
Expand All @@ -1325,7 +1325,7 @@ def getLocalWorkflowDir(

# Create a directory unique to each host in case workDir is on a shared FS.
# This prevents workers on different nodes from erasing each other's directories.
workflowDir: str = os.path.join(base, cls._get_workflow_path_component(workflowID))
workflowDir: str = os.path.join(base, cls.get_workflow_path_component(workflowID))
try:
# Directory creation is atomic
os.mkdir(workflowDir)
Expand Down Expand Up @@ -1367,7 +1367,7 @@ def get_local_workflow_coordination_dir(
base = cls.get_toil_coordination_dir(config_work_dir, config_coordination_dir)

# Make a per-workflow and node subdirectory
subdir = os.path.join(base, cls._get_workflow_path_component(workflow_id))
subdir = os.path.join(base, cls.get_workflow_path_component(workflow_id))
# Make it exist
os.makedirs(subdir, exist_ok=True)
# TODO: May interfere with workflow directory creation logging if it's the same directory.
Expand Down
5 changes: 4 additions & 1 deletion src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,10 @@ def _createJobStateFile(self) -> str:
jobState = {'jobProcessName': get_process_name(self.coordination_dir),
'jobName': self.jobName,
'jobDir': self.localTempDir}
(fd, jobStateFile) = tempfile.mkstemp(suffix='.jobState.tmp', dir=self.coordination_dir)
try:
(fd, jobStateFile) = tempfile.mkstemp(suffix='.jobState.tmp', dir=self.coordination_dir)
except Exception as e:
raise RuntimeError("Could not make state file in " + self.coordination_dir) from e
with open(fd, 'wb') as fH:
# Write data
dill.dump(jobState, fH)
Expand Down
30 changes: 28 additions & 2 deletions src/toil/lib/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,43 @@ def make_public_dir(in_directory: Optional[str] = None) -> str:
os.chmod(this_should_never_happen, 0o777)
return this_should_never_happen

def try_path(path: str) -> Optional[str]:
def try_path(path: str, min_size: int = 100 * 1024 * 1024) -> Optional[str]:
"""
Try to use the given path. Return it if it exists or can be made,
and we can make things within it, or None otherwise.

:param min_size: Reject paths on filesystems smaller than this many bytes.
"""

try:
os.makedirs(path, exist_ok=True)
except OSError:
# Maybe we lack permissions
return None
return path if os.path.exists(path) and os.access(path, os.W_OK) else None

if not os.path.exists(path):
# We didn't manage to make it
return None

if not os.access(path, os.W_OK):
# It doesn't look writable
return None

try:
stats = os.statvfs(path)
except OSError:
# Maybe we lack permissions
return None

# Is the filesystem big enough?
# We need to look at the FS size and not the free space so we don't change
# over to a different filesystem when this one fills up.
fs_size = stats.f_frsize * stats.f_blocks
if fs_size < min_size:
# Too small
return None

return path


class WriteWatchingStream:
Expand Down
7 changes: 6 additions & 1 deletion src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,14 @@ def enter(self) -> None:
os.mkdir(self.lockfileDir)
except FileExistsError:
pass
except Exception as e:
raise RuntimeError("Could not make lock file directory " + self.lockfileDir) from e

# Make ourselves a file in it and lock it to prove we are alive.
self.lockfileFD, self.lockfileName = tempfile.mkstemp(dir=self.lockfileDir) # type: ignore
try:
self.lockfileFD, self.lockfileName = tempfile.mkstemp(dir=self.lockfileDir) # type: ignore
except Exception as e:
raise RuntimeError("Could not make lock file in " + self.lockfileDir) from e
# Nobody can see it yet, so lock it right away
fcntl.lockf(self.lockfileFD, fcntl.LOCK_EX) # type: ignore

Expand Down