diff --git a/cmd/controller/leader_election.go b/cmd/controller/leader_election.go new file mode 100644 index 000000000..3af170ca0 --- /dev/null +++ b/cmd/controller/leader_election.go @@ -0,0 +1,64 @@ +// Copyright 2020 VMware, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + "os" + "time" +) + +var ( + client *clientset.Clientset +) + +func getNewLock(lockname, podname, namespace string) *resourcelock.LeaseLock { + return &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: lockname, + Namespace: namespace, + }, + Client: client.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: podname, + }, + } +} + +func runLeaderElection(ctx context.Context, lock *resourcelock.LeaseLock, podname string, ctrlOpts Options, log logr.Logger) { + // Start the leader election for running kapp-controller + log.Info("Waiting for leader election") + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(c context.Context) { + err := Run(ctrlOpts, log.WithName("controller")) + if err != nil { + klog.Errorf("Error while running as leader: %v", err) + } + }, + OnStoppedLeading: func() { + klog.Fatalf("no longer the leader, staying inactive.") + os.Exit(0) + }, + OnNewLeader: func(identity string) { + //Notify when a new leader is elected + if identity == podname { + return + } + klog.InfoS("new leader elected", "id", identity) + }, + }, + }) +} diff --git a/cmd/controller/main.go b/cmd/controller/main.go index f9f3aa0e1..a11fb5357 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -4,13 +4,15 @@ package main import ( + "context" "flag" - "os" - "time" - + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/klog/v2" + "os" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "time" ) // Version of kapp-controller is set via ldflags at build-time from the most recent git tag; see hack/build.sh @@ -36,19 +38,27 @@ func main() { sidecarexecMain() return } - log := zap.New(zap.UseDevMode(false)).WithName("kc") logf.SetLogger(log) klog.SetLogger(log) - mainLog := log.WithName("main") mainLog.Info("kapp-controller", "version", Version) + var ( + leaseLockName string + leaseLockNamespace string + podName = os.Getenv("POD_NAME") + ) + config, err := rest.InClusterConfig() + client = clientset.NewForConfigOrDie(config) - err := Run(ctrlOpts, log.WithName("controller")) if err != nil { - mainLog.Error(err, "Exited run with error") - os.Exit(1) + klog.Fatalf("failed to get kubeconfig") } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lock := getNewLock(leaseLockName, podName, leaseLockNamespace) + runLeaderElection(ctx, lock, podName, ctrlOpts, log) os.Exit(0) }