From 840ce992e5c54c5808364c9574168b1dfa679144 Mon Sep 17 00:00:00 2001 From: Basit Hasan Date: Sun, 6 Aug 2023 23:22:07 +0530 Subject: [PATCH] add leader election mechanism Signed-off-by: Basit Hasan --- cmd/controller/main.go | 78 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index f9f3aa0e12..4ee2775c33 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -4,18 +4,72 @@ package main import ( + "context" "flag" - "os" - "time" - + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" + "os" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "time" ) // Version of kapp-controller is set via ldflags at build-time from the most recent git tag; see hack/build.sh var Version = "develop" +var ( + client *clientset.Clientset +) + +func getNewLock(lockname, podname, namespace string) *resourcelock.LeaseLock { + return &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: lockname, + Namespace: namespace, + }, + Client: client.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: podname, + }, + } +} + +func runLeaderElection(ctx context.Context, lock *resourcelock.LeaseLock, podname string, ctrlOpts Options, log logr.Logger) { + // Start the leader election for running kapp-controller + log.Info("Waiting for leader election") + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(c context.Context) { + err := Run(ctrlOpts, log.WithName("controller")) + if err != nil { + klog.Errorf("Error while running as leader: %v", err) + } + }, + OnStoppedLeading: func() { + klog.Fatalf("no longer the leader, staying inactive.") + os.Exit(0) + }, + OnNewLeader: func(identity string) { + //Notify when a new leader is elected + if identity == podname { + return + } + klog.InfoS("new leader elected", "id", identity) + }, + }, + }) +} + func main() { ctrlOpts := Options{} var sidecarexec bool @@ -36,19 +90,27 @@ func main() { sidecarexecMain() return } - log := zap.New(zap.UseDevMode(false)).WithName("kc") logf.SetLogger(log) klog.SetLogger(log) - mainLog := log.WithName("main") mainLog.Info("kapp-controller", "version", Version) + var ( + leaseLockName string + leaseLockNamespace string + podName = os.Getenv("POD_NAME") + ) + config, err := rest.InClusterConfig() + client = clientset.NewForConfigOrDie(config) - err := Run(ctrlOpts, log.WithName("controller")) if err != nil { - mainLog.Error(err, "Exited run with error") - os.Exit(1) + klog.Fatalf("failed to get kubeconfig") } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lock := getNewLock(leaseLockName, podName, leaseLockNamespace) + runLeaderElection(lock, ctx, podName, ctrlOpts, log) os.Exit(0) }