Skip to content

Commit

Permalink
Remove the internal Pod map and use the controller runtime cache
Browse files Browse the repository at this point in the history
  • Loading branch information
johscheuer committed May 22, 2023
1 parent 2db2c18 commit 053761c
Show file tree
Hide file tree
Showing 21 changed files with 217 additions and 186 deletions.
17 changes: 17 additions & 0 deletions api/v1beta2/foundationdbcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,23 @@ func (processGroupStatus *ProcessGroupStatus) MarkForRemoval() {
processGroupStatus.RemovalTimestamp = &metav1.Time{Time: time.Now()}
}

// GetPodName returns the Pod name for the associated Process Group.
func (processGroupStatus *ProcessGroupStatus) GetPodName(cluster *FoundationDBCluster) string {
var sb strings.Builder
sb.WriteString(cluster.Name)
sb.WriteString("-")
// The Pod name will always be in the format ${cluster}-${process-class}-${id}. The ID is currently not available
// in the processGroupStatus without doing any parsing, so we have to use the Process Group ID, which might contain
// a prefix, so we take the part after the prefix, which will be ${process-class}-${id}.
sanitizedProcessGroup := strings.ReplaceAll(string(processGroupStatus.ProcessGroupID), "_", "-")
sanitizedProcessClass := strings.ReplaceAll(string(processGroupStatus.ProcessClass), "_", "-")

idx := strings.Index(sanitizedProcessGroup, sanitizedProcessClass)
sb.WriteString(sanitizedProcessGroup[idx:])

return sb.String()
}

// NeedsReplacement checks if the ProcessGroupStatus has conditions so that it should be removed
func (processGroupStatus *ProcessGroupStatus) NeedsReplacement(failureTime int, taintReplacementTime int) (bool, int64) {
var earliestFailureTime int64 = math.MaxInt64
Expand Down
48 changes: 48 additions & 0 deletions api/v1beta2/foundationdbcluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5018,4 +5018,52 @@ var _ = Describe("[api] FoundationDBCluster", func() {
},
}, true, false),
)

DescribeTable("when getting the Pod name for a Process group", func(cluster *FoundationDBCluster, processGroup *ProcessGroupStatus, expected string) {
Expect(processGroup.GetPodName(cluster)).To(Equal(expected))
}, Entry("when the process group has no prefix",
&FoundationDBCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cluster",
},
},
&ProcessGroupStatus{
ProcessGroupID: "storage-1",
ProcessClass: ProcessClassStorage,
},
"testing-cluster-storage-1"),
Entry("when the process group has a prefix",
&FoundationDBCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cluster",
},
},
&ProcessGroupStatus{
ProcessGroupID: "this-is-my-fancy-prefix-storage-1",
ProcessClass: ProcessClassStorage,
},
"testing-cluster-storage-1"),
Entry("when the process group has no prefix and the process class has an underscore",
&FoundationDBCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cluster",
},
},
&ProcessGroupStatus{
ProcessGroupID: "cluster-controller-1",
ProcessClass: ProcessClassClusterController,
},
"testing-cluster-cluster-controller-1"),
Entry("when the process group has a prefix and the process class has an underscore",
&FoundationDBCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "testing-cluster",
},
},
&ProcessGroupStatus{
ProcessGroupID: "this-is-my-fancy-prefix-cluster-controller-1",
ProcessClass: ProcessClassClusterController,
},
"testing-cluster-cluster-controller-1"),
)
})
18 changes: 10 additions & 8 deletions controllers/add_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,20 @@ func (a addPods) reconcile(ctx context.Context, r *FoundationDBClusterReconciler
return &requeue{curError: err}
}

pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}

podMap := internal.CreatePodMap(cluster, pods)

for _, processGroup := range cluster.Status.ProcessGroups {
if _, podExists := podMap[processGroup.ProcessGroupID]; podExists {
_, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
// If no error is returned the Pod exists
if err == nil {
continue
}

// Ignore the is not found error, as we are checking here if we should create Pods.
if err != nil {
if !k8serrors.IsNotFound(err) {
return &requeue{curError: err}
}
}

// If this process group is marked for removal, we normally don't want to spin it back up
// again. However, in a downscaling scenario, it could be that this is a storage node that
// is still draining its data onto another one. Therefore, we only want to leave it off
Expand Down
14 changes: 4 additions & 10 deletions controllers/delete_pods_for_buggification.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

corev1 "k8s.io/api/core/v1"

"github.com/FoundationDB/fdb-kubernetes-operator/internal"
"github.com/go-logr/logr"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
Expand All @@ -38,12 +37,6 @@ type deletePodsForBuggification struct{}
// reconcile runs the reconciler's work.
func (d deletePodsForBuggification) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) *requeue {
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "deletePodsForBuggification")
pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}

podMap := internal.CreatePodMap(cluster, pods)
crashLoopContainerProcessGroups := cluster.GetCrashLoopContainerProcessGroups()

noSchedulePods := make(map[fdbv1beta2.ProcessGroupID]fdbv1beta2.None, len(cluster.Spec.Buggify.NoSchedule))
Expand All @@ -59,8 +52,9 @@ func (d deletePodsForBuggification) reconcile(ctx context.Context, r *Foundation
continue
}

pod, ok := podMap[processGroup.ProcessGroupID]
if !ok || pod == nil {
pod, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
// If a Pod is not found ignore it for now.
if err != nil {
logger.V(1).Info("Could not find Pod for process group ID",
"processGroupID", processGroup.ProcessGroupID)
continue
Expand Down Expand Up @@ -135,7 +129,7 @@ func (d deletePodsForBuggification) reconcile(ctx context.Context, r *Foundation
if len(updates) > 0 {
logger.Info("Deleting pods", "count", len(updates))
r.Recorder.Event(cluster, "Normal", "UpdatingPods", "Recreating pods for buggification")
err = r.PodLifecycleManager.UpdatePods(logr.NewContext(ctx, logger), r, cluster, updates, true)
err := r.PodLifecycleManager.UpdatePods(logr.NewContext(ctx, logger), r, cluster, updates, true)
if err != nil {
return &requeue{curError: err}
}
Expand Down
26 changes: 13 additions & 13 deletions controllers/generate_initial_cluster_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,28 @@ func (g generateInitialClusterFile) reconcile(ctx context.Context, r *Foundation
logger.Info("Generating initial cluster file")
r.Recorder.Event(cluster, corev1.EventTypeNormal, "ChangingCoordinators", "Choosing initial coordinators")

initialPods := []*corev1.Pod{}
candidateClasses := cluster.GetEligibleCandidateClasses()
for _, candidateClass := range candidateClasses {
pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, candidateClass, "")...)
if err != nil {
return &requeue{curError: err}
}
initialPods = append(initialPods, pods...)
processCounts, err := cluster.GetProcessCountsWithDefaults()
if err != nil {
return &requeue{curError: err}
}

podMap := internal.CreatePodMap(cluster, initialPods)
var pods = make([]*corev1.Pod, 0, len(initialPods))
var pods = make([]*corev1.Pod, 0, processCounts.Total())
for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.IsMarkedForRemoval() {
logger.V(1).Info("Ignore process group marked for removal",
"processGroupID", processGroup.ProcessGroupID)
continue
}

pod, ok := podMap[processGroup.ProcessGroupID]
if !ok {
logger.V(1).Info("Ignore process group with missing Pod",
// Ignore all process groups that are not eligible as a coordinator.
if !cluster.IsEligibleAsCandidate(processGroup.ProcessClass) {
continue
}

pod, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
// If a Pod is not found ignore it for now.
if err != nil {
logger.V(1).Info("Could not find Pod for process group ID",
"processGroupID", processGroup.ProcessGroupID)
continue
}
Expand Down
13 changes: 3 additions & 10 deletions controllers/remove_incompatible_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/go-logr/logr"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/internal"
)

// removeIncompatibleProcesses is a reconciler that will restart incompatible fdbserver processes, this can happen
Expand Down Expand Up @@ -81,18 +80,12 @@ func processIncompatibleProcesses(ctx context.Context, r *FoundationDBClusterRec

logger.Info("incompatible connections", "incompatibleConnections", status.Cluster.IncompatibleConnections)

pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return err
}

podMap := internal.CreatePodMap(cluster, pods)

incompatibleConnections := parseIncompatibleConnections(logger, status)
incompatiblePods := make([]*corev1.Pod, 0, len(incompatibleConnections))
for _, processGroup := range cluster.Status.ProcessGroups {
pod, ok := podMap[processGroup.ProcessGroupID]
if !ok || pod == nil {
pod, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
// If a Pod is not found ignore it for now.
if err != nil {
logger.V(1).Info("Could not find Pod for process group ID",
"processGroupID", processGroup.ProcessGroupID)
continue
Expand Down
24 changes: 12 additions & 12 deletions controllers/replace_failed_process_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var _ = Describe("replace_failed_process_groups", func() {
taintKeyStarDuration := int64(20)
taintKeyMaintenance := "foundationdb.org/maintenance"
taintKeyMaintenanceDuration := int64(10)
var allPods []*corev1.Pod

var allPvcs *corev1.PersistentVolumeClaimList
var podOnTaintedNode *corev1.Pod
var targetPodProcessGroupID fdbv1beta2.ProcessGroupID
Expand Down Expand Up @@ -117,9 +117,6 @@ var _ = Describe("replace_failed_process_groups", func() {
processMap[fdbv1beta2.ProcessGroupID(processID)] = append(processMap[fdbv1beta2.ProcessGroupID(processID)], process)
}

allPods, err = clusterReconciler.PodLifecycleManager.GetPods(ctx.TODO(), clusterReconciler, cluster, internal.GetPodListOptions(cluster, "", "")...)
Expect(err).NotTo(HaveOccurred())

allPvcs = &corev1.PersistentVolumeClaimList{}
err = clusterReconciler.List(ctx.TODO(), allPvcs, internal.GetPodListOptions(cluster, "", "")...)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -139,7 +136,7 @@ var _ = Describe("replace_failed_process_groups", func() {
targetPodProcessGroupID = internal.GetProcessGroupIDFromMeta(cluster, podOnTaintedNode.ObjectMeta)

// Call validateProcessGroups to set processGroupStatus to tainted condition
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
processGroup := processGroupsStatus[len(processGroupsStatus)-4]
Expand All @@ -159,7 +156,7 @@ var _ = Describe("replace_failed_process_groups", func() {
Expect(k8sClient.Update(ctx.TODO(), node)).NotTo(HaveOccurred())
log.Info("Taint node", "Node name", podOnTaintedNode.Name, "Node taints", node.Spec.Taints)

processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus := fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand All @@ -183,7 +180,7 @@ var _ = Describe("replace_failed_process_groups", func() {
Expect(k8sClient.Update(ctx.TODO(), node)).NotTo(HaveOccurred())
log.Info("Taint node", "Node name", podOnTaintedNode.Name, "Node taints", node.Spec.Taints)

processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus := fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand All @@ -194,7 +191,7 @@ var _ = Describe("replace_failed_process_groups", func() {
node.Spec.Taints = []corev1.Taint{}
err = k8sClient.Update(ctx.TODO(), node)
Expect(err).NotTo(HaveOccurred())
processGroupsStatus, err = validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err = validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus = fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand All @@ -217,7 +214,7 @@ var _ = Describe("replace_failed_process_groups", func() {
log.Info("Taint node", "Node name", podOnTaintedNode.Name, "Node taints", node.Spec.Taints, "TaintTime", node.Spec.Taints[0].TimeAdded.Time, "Now", time.Now())
Expect(k8sClient.Update(ctx.TODO(), node)).NotTo(HaveOccurred())

processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus := fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand Down Expand Up @@ -249,7 +246,7 @@ var _ = Describe("replace_failed_process_groups", func() {
log.Info("Taint node", "Node name", podOnTaintedNode.Name, "Node taints", node.Spec.Taints, "TaintTime", node.Spec.Taints[0].TimeAdded.Time, "Now", time.Now())
Expect(k8sClient.Update(ctx.TODO(), node)).NotTo(HaveOccurred())

processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus := fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand Down Expand Up @@ -287,7 +284,7 @@ var _ = Describe("replace_failed_process_groups", func() {
}
Expect(tainted).To(Equal(int64(0)))

processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPods, allPvcs, logger)
processGroupsStatus, err := validateProcessGroups(ctx.TODO(), clusterReconciler, cluster, &cluster.Status, processMap, configMap, allPvcs, logger)
Expect(err).NotTo(HaveOccurred())
Expect(len(processGroupsStatus)).To(BeNumerically(">", 4))
targetProcessGroupStatus := fdbv1beta2.FindProcessGroupByID(cluster.Status.ProcessGroups, targetPodProcessGroupID)
Expand Down Expand Up @@ -565,7 +562,11 @@ var _ = Describe("replace_failed_process_groups", func() {
When("multiple nodes are tainted", func() {
var taintedNodes []*corev1.Node
var setValidTaint bool

BeforeEach(func() {
allPods, err := clusterReconciler.PodLifecycleManager.GetPods(ctx.TODO(), clusterReconciler, cluster, internal.GetPodListOptions(cluster, "", "")...)
Expect(err).NotTo(HaveOccurred())

concurrentTaints := 2
Expect(len(allPods)).To(BeNumerically(">", concurrentTaints))
taintedNodesIndex := map[int]struct{}{}
Expand Down Expand Up @@ -628,7 +629,6 @@ var _ = Describe("replace_failed_process_groups", func() {
Expect(getRemovedProcessGroupIDs(cluster)).To(Equal([]fdbv1beta2.ProcessGroupID{}))
})
})

})

Context("with no missing processes", func() {
Expand Down
8 changes: 2 additions & 6 deletions controllers/replace_misconfigured_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,14 @@ type replaceMisconfiguredProcessGroups struct{}
func (c replaceMisconfiguredProcessGroups) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) *requeue {
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "replaceMisconfiguredProcessGroups")

// TODO(johscheuer): Remove the pvc map an make direct calls.
pvcs := &corev1.PersistentVolumeClaimList{}
err := r.List(ctx, pvcs, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}

pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}

hasReplacements, err := replacements.ReplaceMisconfiguredProcessGroups(logger, cluster, internal.CreatePVCMap(cluster, pvcs), internal.CreatePodMap(cluster, pods))
hasReplacements, err := replacements.ReplaceMisconfiguredProcessGroups(ctx, r.PodLifecycleManager, r, logger, cluster, internal.CreatePVCMap(cluster, pvcs))
if err != nil {
return &requeue{curError: err}
}
Expand Down
6 changes: 2 additions & 4 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ func TestAPIs(t *testing.T) {
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter)))

err := scheme.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = fdbv1beta2.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
Expect(scheme.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
Expect(fdbv1beta2.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())

// +kubebuilder:scaffold:scheme
k8sClient = mockclient.NewMockClient(scheme.Scheme)
Expand Down
15 changes: 5 additions & 10 deletions controllers/update_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@ type updateLabels struct{}
// reconcile runs the reconciler's work.
func (updateLabels) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbtypes.FoundationDBCluster) *requeue {
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "updateLabels")
pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}
podMap := internal.CreatePodMap(cluster, pods)

// TODO(johscheuer): Remove the use of the pvc map and directly make a get request.
pvcs := &corev1.PersistentVolumeClaimList{}
err = r.List(ctx, pvcs, internal.GetPodListOptions(cluster, "", "")...)
err := r.List(ctx, pvcs, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}
Expand All @@ -56,15 +51,15 @@ func (updateLabels) reconcile(ctx context.Context, r *FoundationDBClusterReconci
continue
}

pod, ok := podMap[processGroup.ProcessGroupID]
if ok {
pod, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
if err == nil {
metadata := internal.GetPodMetadata(cluster, processGroup.ProcessClass, processGroup.ProcessGroupID, "")
if metadata.Annotations == nil {
metadata.Annotations = make(map[string]string, 1)
}

if !metadataCorrect(metadata, &pod.ObjectMeta) {
err = r.PodLifecycleManager.UpdateMetadata(ctx, r, cluster, pod)
err := r.PodLifecycleManager.UpdateMetadata(ctx, r, cluster, pod)
if err != nil {
return &requeue{curError: err}
}
Expand Down
Loading

0 comments on commit 053761c

Please sign in to comment.