feat: Use apptainer for SingularityComputingElement and enhance debugging
chrisburr committed Jan 22, 2025
1 parent 0fb01b2 commit b16c65b
Showing 3 changed files with 40 additions and 75 deletions.
4 changes: 0 additions & 4 deletions dirac.cfg
@@ -639,10 +639,6 @@ Resources
# Default: /cvmfs/cernvm-prod.cern.ch/cvm4
ContainerRoot = /cvmfs/cernvm-prod.cern.ch/cvm4

# The binary to start the container
# default: singularity
ContainerBin = /opt/extras/bin/singularity

# List of directories to bind
ContainerBind = /etc/grid-security,someDir:::BoundHere
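For illustration only (not part of this commit), the computing element later expands each comma-separated ContainerBind entry into a --bind argument, with ":::" acting as a stand-in for ":" so a source/destination pair can live inside a single comma-separated option. A minimal sketch:

def bind_args(container_bind):
    """Illustrative translation of a ContainerBind value into --bind flags."""
    args = []
    for entry in container_bind.split(",") if container_bind else []:
        parts = [p.strip() for p in entry.split(":::")]
        if 1 <= len(parts) <= 3:
            args.extend(["--bind", ":".join(parts)])
    return args

# bind_args("/etc/grid-security,someDir:::BoundHere")
# -> ['--bind', '/etc/grid-security', '--bind', 'someDir:BoundHere']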

95 changes: 28 additions & 67 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
@@ -11,14 +11,13 @@
See the Configuration/Resources/Computing documentation for details on
where to set the option parameters.
"""

import io
import json
import os
import re
import shutil
import sys
import tempfile
from pathlib import Path

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
@@ -112,7 +111,6 @@ def __init__(self, ceUniqueID):
self.__root = self.ceParameters["ContainerRoot"]
self.__workdir = CONTAINER_WORKDIR
self.__innerdir = CONTAINER_INNERDIR
self.__singularityBin = "singularity"
self.__installDIRACInContainer = self.ceParameters.get("InstallDIRACInContainer", False)
if isinstance(self.__installDIRACInContainer, str) and self.__installDIRACInContainer.lower() in (
"false",
@@ -122,47 +120,6 @@

self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))

def __hasUserNS(self):
"""Detect if this node has user namespaces enabled.
Returns True if they are enabled, False otherwise.
"""
try:
with open("/proc/sys/user/max_user_namespaces") as proc_fd:
maxns = int(proc_fd.readline().strip())
# Any "reasonable number" of namespaces is sufficient
return maxns > 100
except Exception:
# Any failure, missing file, doesn't contain a number, etc. and we
# assume they are disabled.
return False

def __hasSingularity(self):
"""Search the current PATH for an exectuable named singularity.
Returns True if it is found, False otherwise.
"""
if self.ceParameters.get("ContainerBin"):
binPath = self.ceParameters["ContainerBin"]
if os.path.isfile(binPath) and os.access(binPath, os.X_OK):
self.__singularityBin = binPath
self.log.debug(f'Use singularity from "{self.__singularityBin}"')
return True
if "PATH" not in os.environ:
return False # Hmm, PATH not set? How unusual...
searchPaths = os.environ["PATH"].split(os.pathsep)
# We can use CVMFS as a last resort if userNS is enabled
if self.__hasUserNS():
searchPaths.append(FALLBACK_SINGULARITY)
for searchPath in searchPaths:
binPath = os.path.join(searchPath, "singularity")
if os.path.isfile(binPath):
# File found, check it's executable to be certain:
if os.access(binPath, os.X_OK):
self.log.debug(f'Found singularity at "{binPath}"')
self.__singularityBin = binPath
return True
# No suitable binaries found
return False

@staticmethod
def __findInstallBaseDir():
"""Find the path to root of the current DIRAC installation"""
@@ -326,11 +283,12 @@ def __getEnv(self):
We blank almost everything to prevent contamination from the host system.
"""

if not self.__installDIRACInContainer:
payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}
else:
if self.__installDIRACInContainer:
payloadEnv = {}
else:
payloadEnv = {k: v for k, v in os.environ.items() if ENV_VAR_WHITELIST.match(k)}

payloadEnv["PATH"] = Path(sys.executable).parent
payloadEnv["TMP"] = "/tmp"
payloadEnv["TMPDIR"] = "/tmp"
payloadEnv["X509_USER_PROXY"] = os.path.join(self.__innerdir, "proxy")
@@ -361,10 +319,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
"""
rootImage = self.__root
renewTask = None
# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
return S_ERROR("Failed to find singularity")

self.log.info("Creating singularity container")

@@ -396,19 +350,19 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
# Mount /cvmfs in if it exists on the host
withCVMFS = os.path.isdir("/cvmfs")
innerCmd = os.path.join(self.__innerdir, "dirac_container.sh")
cmd = [self.__singularityBin, "exec"]
cmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
cmd.extend(["--ipc"]) # run container in a new IPC namespace
cmd.extend(["--workdir", baseDir]) # working directory to be used for /tmp, /var/tmp and $HOME
cmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
if self.__hasUserNS():
cmd.append("--userns")
outerCmd = ["apptainer", "exec"]
outerCmd.extend(["--contain"]) # use minimal /dev and empty other directories (e.g. /tmp and $HOME)
outerCmd.extend(["--ipc"]) # run container in a new IPC namespace
outerCmd.extend(["--workdir", baseDir]) # working directory to be used for /tmp, /var/tmp and $HOME
outerCmd.extend(["--home", "/tmp"]) # Avoid using small tmpfs for default $HOME and use scratch /tmp instead
outerCmd.append("--userns")
if withCVMFS:
cmd.extend(["--bind", "/cvmfs"])
outerCmd.extend(["--bind", "/cvmfs"])
if not self.__installDIRACInContainer:
cmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])
outerCmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])

bindPaths = self.ceParameters.get("ContainerBind", "").split(",")
rawBindPaths = self.ceParameters.get("ContainerBind", "")
bindPaths = rawBindPaths.split(",") if rawBindPaths else []
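The extra guard matters because splitting an empty string still yields one element, which would otherwise turn into a bogus --bind entry further down:

>>> "".split(",")
['']
>>> "/etc/grid-security,someDir:::BoundHere".split(",")
['/etc/grid-security', 'someDir:::BoundHere']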
siteName = gConfig.getValue("/LocalSite/Site", "")
ceName = gConfig.getValue("/LocalSite/GridCE", "")
if siteName and ceName:
@@ -441,20 +395,20 @@ def submitJob(self, executableFile, proxy=None, **kwargs):

for bindPath in bindPaths:
if len(bindPath.split(":::")) == 1:
cmd.extend(["--bind", bindPath.strip()])
outerCmd.extend(["--bind", bindPath.strip()])
elif len(bindPath.split(":::")) in [2, 3]:
cmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])
outerCmd.extend(["--bind", ":".join([bp.strip() for bp in bindPath.split(":::")])])

if "ContainerOptions" in self.ceParameters:
containerOpts = self.ceParameters["ContainerOptions"].split(",")
for opt in containerOpts:
cmd.extend([opt.strip()])
if os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
outerCmd.extend([opt.strip()])
if not (os.path.isdir(rootImage) or os.path.isfile(rootImage)):
            # if we are here it's because there's no image, or it is not accessible (e.g. not on CVMFS)
self.log.error("Singularity image to exec not found: ", rootImage)
return S_ERROR("Failed to find singularity image to exec")
outerCmd.append(rootImage)
cmd = outerCmd + [innerCmd]
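Put together, the final command looks roughly like the list below (a sketch only; the paths are illustrative and not taken from this commit):

example_cmd = [
    "apptainer", "exec",
    "--contain", "--ipc",
    "--workdir", "/scratch/dirac-workdir",   # baseDir (illustrative)
    "--home", "/tmp",
    "--userns",
    "--bind", "/cvmfs",                      # only when /cvmfs exists on the host
    "--bind", "/etc/grid-security",          # from ContainerBind
    "/cvmfs/cernvm-prod.cern.ch/cvm4",       # rootImage (ContainerRoot)
    "/scratch/dirac-workdir/inner/dirac_container.sh",  # innerCmd (illustrative path)
]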

self.log.debug(f"Execute singularity command: {cmd}")
self.log.debug(f"Execute singularity env: {self.__getEnv()}")
@@ -464,6 +418,13 @@ def submitJob(self, executableFile, proxy=None, **kwargs):

if not result["OK"]:
self.log.error("Fail to run Singularity", result["Message"])
# If we fail to run the container try to run it again with verbose output
# to help with debugging.
self.log.error("Singularity command was: ", cmd)
self.log.error(f"Singularity env was: {self.__getEnv()}")
debugCmd = [outerCmd[0], "--debug"] + outerCmd[1:] + ["echo", "All okay"]
self.log.error("Running with debug output to facilitate debugging", debugCmd)
result = systemCall(0, debugCmd, callbackFunction=self.sendOutput, env=self.__getEnv())
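Since outerCmd already ends with the image path, the fallback amounts to running apptainer with its global --debug flag and a trivial payload, which helps separate container start-up failures from payload failures; schematically:

# debugCmd ~ ['apptainer', '--debug', 'exec', <same options and binds>, <image>, 'echo', 'All okay']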
if proxy and renewTask:
gThreadScheduler.removeTask(renewTask)
self.__deleteWorkArea(baseDir)
16 changes: 12 additions & 4 deletions (Job Agent test module; file path not shown in this view)
@@ -1,9 +1,11 @@
""" Test class for Job Agent
"""
import multiprocessing
import os
import pytest
import time
from unittest.mock import MagicMock
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
@@ -587,7 +589,7 @@ def test_submitJob(mocker, mockJWInput, expected):
("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
],
)
def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
def test_submitAndCheckJob(monkeypatch, mocker, localCE, job, expectedResult1, expectedResult2):
"""Test the submission and the management of the job status."""
jobName = "testJob.py"
with open(jobName, "w") as execFile:
@@ -606,8 +608,14 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent._sendFailoverRequest", return_value=S_OK())
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
mocker.patch(
"DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement._SingularityComputingElement__hasSingularity",
return_value=False,
"DIRAC.Resources.Computing.SingularityComputingElement.SingularityComputingElement.submitJob",
return_value=S_ERROR("Failed to find singularity"),
)
# We need to force ProcessPoolExecutor to use the fork context to enable the
# mocks to propagate to the subprocesses used by PoolComputingElement
mocker.patch(
"concurrent.futures.ProcessPoolExecutor",
side_effect=partial(ProcessPoolExecutor, mp_context=multiprocessing.get_context("fork")),
)
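The fork start method makes the worker processes inherit the already-patched module state; with the spawn start method each worker re-imports the modules and the mocks disappear. A standalone sketch of the same idea (names here are illustrative, not from the test):

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def double(x):
    return x * 2

# "fork" copies the parent process (including any unittest.mock patches) into
# the workers; "spawn" starts clean interpreters that re-import everything.
with ProcessPoolExecutor(mp_context=multiprocessing.get_context("fork")) as pool:
    print(list(pool.map(double, range(3))))   # [0, 2, 4]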

jobAgent = JobAgent("JobAgent", "Test")
Expand Down
