Skip to content

Commit

Permalink
Add storage-provisioner check for gce during cleanup (#2431)
Browse files Browse the repository at this point in the history
* Additional checks for dynamic storage using sc provisioners
* Update metadata label for PV zone extraction
* Fetch zones from node selector terms
* Unit test coverage for fetching gce disks
  • Loading branch information
jeffbanks authored Aug 4, 2023
1 parent b00bcf7 commit 7287e32
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,6 +35,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -603,36 +606,37 @@ private boolean deleteLeakedDisks(HashSet<GCEDisk> gceDisksInCluster)
return Utils.waitForSuccess(getNodeGroup().logger(), deleteDiskCommands) && gCloudListDisksJsonSucceeded;
}

/**
* Obtain a list of provisioners from storage class to identify if dynamic storage
* is being used for PVs. Otherwise, static storage will be left for these PVs.
*
* @param pvJson persistent volume(s) in json format from k8s
* @param gceDisks a set of disks managed by the caller.
* @return status of obtaining gce disks for target deletion.
*/
private boolean getGCEDisksFromPVJson(String pvJson, Set<GCEDisk> gceDisks)
{
try
{
final var rootNode = JsonUtils.getJsonNode(pvJson);

final List<String> provisioners = lookupDynamicProvisioners();
for (final var item : rootNode.at("/items").require())
{
// Ignore PVs that are not dynamic (https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/),
// operating under the assumption that static PVs will be backed by disks containing
// data that should not be deleted between testruns (for example, test data).
final var createdBy = item.at("/metadata/annotations").get("kubernetes.io/createdby");
if (createdBy != null && createdBy.asText().equals("gce-pd-dynamic-provisioner"))

final var k8sCreatedBy = item.at("/metadata/annotations").get("kubernetes.io/createdby");
final var gkeCreatedBy = item.at("/metadata/annotations").get("storage.gke.io/createdby");
final var k8sProvisionedBy = item.at("/metadata/annotations").get("pv.kubernetes.io/provisioned-by");

if (k8sCreatedBy != null && provisioners.contains(k8sCreatedBy.asText()) ||
gkeCreatedBy != null && provisioners.contains(gkeCreatedBy.asText()) ||
k8sProvisionedBy != null && provisioners.contains(k8sProvisionedBy.asText()))
{
final var labels = item.at("/metadata/labels");
var zone = labels.get("topology.kubernetes.io/zone");
if (zone == null)
{
// Try the deprecated zone label
zone = labels.get("failure-domain.beta.kubernetes.io/zone");
}
if (zone == null)
{
throw new RuntimeException("Could not extract zone from PV list");
}
gceDisks.add(new GCEDisk(zone.asText(),
item.at("/spec/gcePersistentDisk/pdName").require().asText()));
applyZoneAndDiskNameFromItem(item, gceDisks);
}
}
}

catch (Throwable ex)
{
getNodeGroup().logger().error("Could not read all PVs from cluster", ex);
Expand Down Expand Up @@ -709,4 +713,109 @@ public Map<String, String> getKubernetesEnv()
env.put("CLOUDSDK_CORE_DISABLE_PROMPTS", "1");
return env;
}

/**
 * Collects the names of the storage-class provisioners that mark a PV as dynamically provisioned.
 *
 * <p>Runs {@code kubectl get sc} with a jsonpath template that prints each storage class's
 * {@code provisioner} followed by a {@code ;} separator, then splits the output on that separator.
 * If the command does not succeed, only the legacy provisioner is returned.
 *
 * @return mutable list holding every non-blank provisioner reported by kubectl, followed by the
 *         legacy {@code gce-pd-dynamic-provisioner} entry.
 * @throws RuntimeException if executing or waiting on the kubectl command raises an exception.
 */
private List<String> lookupDynamicProvisioners()
{
    final List<String> provisioners = new ArrayList<>();
    try
    {
        final var lookupProvisioners =
            executeInKubernetesEnv(
                "kubectl get sc -o=jsonpath='{range .items[*]}{.provisioner}{\";\"}'").buffered();

        if (lookupProvisioners.doWait().forSuccess())
        {
            // Strip each entry and drop blanks: stray whitespace or a trailing newline in the
            // command output would otherwise produce entries that can never equal an annotation value.
            provisioners.addAll(Arrays.stream(lookupProvisioners.getStdout().split(";"))
                .map(String::strip)
                .filter(provisioner -> !provisioner.isEmpty())
                .toList());
        }
    }
    catch (Exception ex)
    {
        throw new RuntimeException("Failed to retrieve list of storage class provisioners due to exception.", ex);
    }

    // Add legacy dynamic provisioner for allowing older versions.
    provisioners.add("gce-pd-dynamic-provisioner");

    return provisioners;
}

/**
 * Adds the GCE disk(s) backing a single PV item to {@code gceDisks}.
 *
 * <p>The zone is taken from the deprecated zone label when present; otherwise one disk entry is
 * added for every zone found in the PV's node-affinity selector terms. If neither source yields
 * a zone, nothing is added.
 *
 * @param item     a single persistent volume as a JSON node.
 * @param gceDisks set that receives the resulting disks.
 * @throws RuntimeException if no name can be obtained for the PV.
 */
protected void applyZoneAndDiskNameFromItem(JsonNode item, Set<GCEDisk> gceDisks)
{
    final String name = fetchNameFrom(item);

    if (name == null)
    {
        throw new RuntimeException("Unable to obtain name for PV");
    }

    final String legacyZone = fetchZonesFromUsingLegacy(item);

    if (legacyZone != null)
    {
        // Previously a PV carrying the deprecated zone label was silently skipped,
        // so its disk was never added to the set.
        gceDisks.add(new GCEDisk(legacyZone, name));
        return;
    }

    for (final var zone : fetchZonesFrom(item))
    {
        gceDisks.add(new GCEDisk(zone, name));
    }
}

/**
 * Extracts the name used to identify the PV.
 *
 * <p>Reads {@code /metadata/name} first and falls back to the deprecated
 * {@code /spec/gcePersistentDisk/pdName} field.
 *
 * @param item a single persistent volume as a JSON node.
 * @return the text of the first of the two fields that is present and not JSON null.
 * @throws RuntimeException if neither field is present.
 */
protected String fetchNameFrom(JsonNode item)
{
    // JsonNode.at() never returns null: an absent path yields a "missing" node, so the
    // presence checks must use isMissingNode()/isNull() rather than a null comparison.
    // NOTE(review): /metadata/name takes precedence as in the original code; confirm whether
    // pdName should win when both are present.
    var pdName = item.at("/metadata/name");
    if (pdName.isMissingNode() || pdName.isNull())
    {
        // This gcePersistentDisk was deprecated, here for backward compat.
        pdName = item.at("/spec/gcePersistentDisk/pdName");
    }
    if (pdName.isMissingNode() || pdName.isNull())
    {
        throw new RuntimeException("Could not extract name for PV");
    }
    return pdName.asText();
}

/**
 * Reads the zone from the deprecated {@code failure-domain.beta.kubernetes.io/zone} label.
 *
 * @param item a single persistent volume as a JSON node.
 * @return the label's text, or {@code null} when the label is not set on the item.
 */
protected String fetchZonesFromUsingLegacy(JsonNode item)
{
    final var legacyZoneLabel = item.at("/metadata/labels").get("failure-domain.beta.kubernetes.io/zone");
    return legacyZoneLabel == null ? null : legacyZoneLabel.asText();
}

/**
 * Collects the zones named by the PV's required node-affinity selector terms.
 *
 * <p>Only match expressions whose key is {@code topology.gke.io/zone} or
 * {@code topology.kubernetes.io/zone} contribute values. Expressions without a key or
 * without a values list are skipped.
 *
 * @param item a single persistent volume as a JSON node.
 * @return the distinct zone names found; empty when the item declares none.
 */
protected Set<String> fetchZonesFrom(JsonNode item)
{
    final Set<String> zones = new HashSet<>();
    final var terms = item.at("/spec/nodeAffinity/required/nodeSelectorTerms");

    for (final var term : terms)
    {
        for (final var expression : term.at("/matchExpressions"))
        {
            // findValue() returns null when the field is absent; guard against it so one
            // incomplete expression does not abort the scan with a NullPointerException.
            final var key = expression.findValue("key");
            if (key == null)
            {
                continue;
            }

            final var keyValue = key.asText();
            if (!keyValue.equals("topology.gke.io/zone") && !keyValue.equals("topology.kubernetes.io/zone"))
            {
                continue;
            }

            // Expressions that carry no value list contribute no zones.
            final var listOfZones = expression.findValue("values");
            if (listOfZones == null)
            {
                continue;
            }

            for (final var zone : listOfZones)
            {
                zones.add(zone.asText());
            }
        }
    }
    return zones;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2023 DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.fallout.components.kubernetes;

import java.util.Set;

import com.fasterxml.jackson.databind.JsonNode;
import org.junit.jupiter.api.Test;

import com.datastax.fallout.util.JsonUtils;

import static com.datastax.fallout.assertj.Assertions.assertThat;

/**
 * Checks that the name and zones of a CSI-provisioned GCE persistent volume are extracted
 * from its JSON representation.
 */
public class JsonNodesTest
{
    /** A single PV whose zones appear only in the node-affinity selector terms (no zone labels). */
    private static final String PV_JSON_PAYLOAD =
        """
        {
            "apiVersion": "v1",
            "kind": "PersistentVolume",
            "metadata": {
                "annotations": {
                    "pv.kubernetes.io/provisioned-by": "pd.csi.storage.gke.io",
                    "volume.kubernetes.io/provisioner-deletion-secret-name": "",
                    "volume.kubernetes.io/provisioner-deletion-secret-namespace": ""
                },
                "creationTimestamp": "2023-08-03T16:50:53Z",
                "finalizers": [
                    "kubernetes.io/pv-protection",
                    "external-attacher/pd-csi-storage-gke-io"
                ],
                "name": "pvc-10f49bd8-d0bb-4b68-a0f6-6153b89c6fdb",
                "resourceVersion": "6210",
                "uid": "ab5f3de0-c9f4-4608-ab67-c7088dfc94e4"
            },
            "spec": {
                "accessModes": [
                    "ReadWriteOnce"
                ],
                "capacity": {
                    "storage": "20Gi"
                },
                "claimRef": {
                    "apiVersion": "v1",
                    "kind": "PersistentVolumeClaim",
                    "name": "pulsar-bookkeeper-journal-pulsar-bookkeeper-0",
                    "namespace": "mypulsar",
                    "resourceVersion": "6160",
                    "uid": "10f49bd8-d0bb-4b68-a0f6-6153b89c6fdb"
                },
                "csi": {
                    "driver": "pd.csi.storage.gke.io",
                    "fsType": "ext4",
                    "volumeAttributes": {
                        "storage.kubernetes.io/csiProvisionerIdentity": "1691080879534-8081-pd.csi.storage.gke.io"
                    },
                    "volumeHandle": "projects/datastax-gcp-pulsar/zones/us-central1-a/disks/pvc-10f49bd8-d0bb-4b68-a0f6-6153b89c6fdb"
                },
                "nodeAffinity": {
                    "required": {
                        "nodeSelectorTerms": [
                            {
                                "matchExpressions": [
                                    {
                                        "key": "topology.gke.io/zone",
                                        "operator": "In",
                                        "values": [
                                            "us-central1-a", "us-central1-c"
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                },
                "persistentVolumeReclaimPolicy": "Delete",
                "storageClassName": "standard-rwo",
                "volumeMode": "Filesystem"
            },
            "status": {
                "phase": "Bound"
            }
        }
        """;

    @Test
    void gcePersistentVolumes()
    {
        final GoogleKubernetesEngineProvisioner gkep = new GoogleKubernetesEngineProvisioner();
        final JsonNode item = JsonUtils.getJsonNode(PV_JSON_PAYLOAD);

        // Pin the extracted values, not just their presence, so a wrong field is caught.
        final String name = gkep.fetchNameFrom(item);
        assertThat(name).isEqualTo("pvc-10f49bd8-d0bb-4b68-a0f6-6153b89c6fdb");

        // The payload carries no labels, so the deprecated zone label lookup yields nothing.
        final String zonesLegacy = gkep.fetchZonesFromUsingLegacy(item);
        assertThat(zonesLegacy).isNull();

        final Set<String> multipleZones = gkep.fetchZonesFrom(item);
        assertThat(multipleZones).containsExactlyInAnyOrder("us-central1-a", "us-central1-c");
    }
}

0 comments on commit 7287e32

Please sign in to comment.