Skip to content

Commit

Permalink
Use the same postfix for all volumes
Browse files Browse the repository at this point in the history
for one data object
  • Loading branch information
gregorjerse committed Nov 7, 2023
1 parent 6914292 commit d76fc39
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Fixed
-----
- Add default ordering on ``AnnotationValue`` model
- Create history object for ``Collection`` if none exists
- Use the same postfix for all kubernetes volumes for the given data object


===================
Expand Down
34 changes: 25 additions & 9 deletions resolwe/flow/managers/workload_connectors/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ def get_upload_dir() -> str:
raise RuntimeError("No mountable upload connector is defined.")


def unique_volume_name(base_name: str, data_id: int) -> str:
def unique_volume_name(base_name: str, data_id: int, postfix: str) -> str:
"""Get unique persistent volume claim name."""
# Append random string to make it safe for restart.
postfix = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
return f"{base_name}-{data_id}-{postfix}"


Expand Down Expand Up @@ -124,6 +122,7 @@ def _initialize_variables(self):
"namespace", "default"
)
self.tools_path_prefix = Path("/usr/local/bin/resolwe")
self._random_postfixes = dict()

def _prepare_environment(
self,
Expand Down Expand Up @@ -243,14 +242,15 @@ def _volumes(
location_subpath: Path,
core_api: Any,
tools_configmaps: Dict[str, str],
random_postfix: str,
) -> list:
"""Prepare all volumes."""

def volume_from_config(volume_name: str, volume_config: dict):
"""Get configuration for kubernetes for given volume."""
claim_name = volume_config["config"]["name"]
if self._should_create_pvc(volume_config):
claim_name = unique_volume_name(claim_name, data_id)
claim_name = unique_volume_name(claim_name, data_id, random_postfix)
if volume_config["type"] == "persistent_volume":
return {
"name": volume_name,
Expand Down Expand Up @@ -600,13 +600,23 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
)
location_subpath = Path(data.location.subpath)

# Set random postfix string for volume and pod names.
random_postfix = "".join(
random.choices(string.ascii_lowercase + string.digits, k=5)
)

container_name_prefix = (
getattr(settings, "FLOW_EXECUTOR", {})
.get("CONTAINER_NAME_PREFIX", "resolwe")
.replace("_", "-")
.lower()
)
container_name = self._generate_container_name(container_name_prefix, data.pk)

container_name = self._generate_container_name(
container_name_prefix, data.pk, random_postfix
)

self._random_postfixes[data.pk] = random_postfix

# Set resource limits.
requests = dict()
Expand Down Expand Up @@ -729,7 +739,11 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
"affinity": {},
"hostNetwork": use_host_network,
"volumes": self._volumes(
data.id, location_subpath, core_api, tools_configmaps
data.id,
location_subpath,
core_api,
tools_configmaps,
random_postfix,
),
"initContainers": [
{
Expand Down Expand Up @@ -795,6 +809,7 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
claim_name = unique_volume_name(
storage_settings.FLOW_VOLUMES[processing_name]["config"]["name"],
data.id,
random_postfix,
)
claim_size = limits.pop("storage", 200) * (2**30) # Default 200 gibibytes
core_api.create_namespaced_persistent_volume_claim(
Expand All @@ -812,6 +827,7 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
claim_name = unique_volume_name(
storage_settings.FLOW_VOLUMES[input_name]["config"]["name"],
data.id,
random_postfix,
)
core_api.create_namespaced_persistent_volume_claim(
body=self._persistent_volume_claim(
Expand All @@ -834,14 +850,13 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
"It took {:.2f}s to send config to kubernetes".format(end_time - start_time)
)

def _generate_container_name(self, prefix: str, data_id: int):
def _generate_container_name(self, prefix: str, data_id: int, postfix: str):
"""Generate unique container name.
Name of the kubernetes container should contain only lower case
alpfanumeric characters and dashes. Underscores are not allowed.
"""
# Append random string to make it safe for restart.
postfix = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
return f"{prefix}-{data_id}-{postfix}"

def submit(self, data: Data, argv):
Expand Down Expand Up @@ -871,9 +886,10 @@ def cleanup(self, data_id: int):
logger.exception("Could not load the kubernetes configuration.")
raise exception

random_postfix = self._random_postfixes.pop(data_id)
core_api = kubernetes.client.CoreV1Api()
claim_names = [
unique_volume_name(type_, data_id)
unique_volume_name(type_, data_id, random_postfix)
for type_ in [
constants.PROCESSING_VOLUME_NAME,
constants.INPUTS_VOLUME_NAME,
Expand Down

0 comments on commit d76fc39

Please sign in to comment.