Skip to content

Commit

Permalink
add leader election mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Basit Hasan <[email protected]>
  • Loading branch information
basit9958 committed Aug 6, 2023
1 parent ab79b03 commit 840ce99
Showing 1 changed file with 70 additions and 8 deletions.
78 changes: 70 additions & 8 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,72 @@
package main

import (
"context"
"flag"
"os"
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"os"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"time"
)

// Version of kapp-controller is set via ldflags at build-time from the most recent git tag; see hack/build.sh
var Version = "develop"

var (
client *clientset.Clientset
)

func getNewLock(lockname, podname, namespace string) *resourcelock.LeaseLock {
return &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: lockname,
Namespace: namespace,
},
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: podname,
},
}
}

func runLeaderElection(ctx context.Context, lock *resourcelock.LeaseLock, podname string, ctrlOpts Options, log logr.Logger) {
// Start the leader election for running kapp-controller
log.Info("Waiting for leader election")
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(c context.Context) {
err := Run(ctrlOpts, log.WithName("controller"))
if err != nil {
klog.Errorf("Error while running as leader: %v", err)
}
},
OnStoppedLeading: func() {
klog.Fatalf("no longer the leader, staying inactive.")
os.Exit(0)
},
OnNewLeader: func(identity string) {
//Notify when a new leader is elected
if identity == podname {
return
}
klog.InfoS("new leader elected", "id", identity)
},
},
})
}

func main() {
ctrlOpts := Options{}
var sidecarexec bool
Expand All @@ -36,19 +90,27 @@ func main() {
sidecarexecMain()
return
}

log := zap.New(zap.UseDevMode(false)).WithName("kc")
logf.SetLogger(log)
klog.SetLogger(log)

mainLog := log.WithName("main")
mainLog.Info("kapp-controller", "version", Version)
var (
leaseLockName string
leaseLockNamespace string
podName = os.Getenv("POD_NAME")
)
config, err := rest.InClusterConfig()
client = clientset.NewForConfigOrDie(config)

err := Run(ctrlOpts, log.WithName("controller"))
if err != nil {
mainLog.Error(err, "Exited run with error")
os.Exit(1)
klog.Fatalf("failed to get kubeconfig")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lock := getNewLock(leaseLockName, podName, leaseLockNamespace)
runLeaderElection(lock, ctx, podName, ctrlOpts, log)

os.Exit(0)
}

0 comments on commit 840ce99

Please sign in to comment.