Skip to content

Commit 00fb43a

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related #607
1 parent 351818d commit 00fb43a

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

pkg/manager/internal.go

+46
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const (
5353
defaultRenewDeadline = 10 * time.Second
5454
defaultRetryPeriod = 2 * time.Second
5555
defaultGracefulShutdownPeriod = 30 * time.Second
56+
defaultHookPeriod = 15 * time.Second
5657

5758
defaultReadinessEndpoint = "/readyz"
5859
defaultLivenessEndpoint = "/healthz"
@@ -161,6 +162,13 @@ type controllerManager struct {
161162
// internalProceduresStop channel is used internally to the manager when coordinating
162163
// the proper shutdown of servers. This channel is also used for dependency injection.
163164
internalProceduresStop chan struct{}
165+
166+
// prestartHooks are functions that are run immediately before calling the Start functions
167+
// of the leader election runnables.
168+
prestartHooks []Runnable
169+
170+
// hookTimeout is the duration given to each hook to return successfully.
171+
hookTimeout time.Duration
164172
}
165173

166174
type hasCache interface {
@@ -217,6 +225,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
217225
return cm.cluster.GetHTTPClient()
218226
}
219227

228+
// Hook allows you to add hooks.
229+
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
230+
cm.Lock()
231+
defer cm.Unlock()
232+
233+
if cm.started {
234+
return fmt.Errorf("unable to add new hook because the manager has already been started")
235+
}
236+
237+
switch hook {
238+
case HookPrestartType:
239+
cm.prestartHooks = append(cm.prestartHooks, runnable)
240+
}
241+
242+
return nil
243+
}
244+
220245
func (cm *controllerManager) GetConfig() *rest.Config {
221246
return cm.cluster.GetConfig()
222247
}
@@ -554,6 +579,27 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
554579
}
555580

556581
func (cm *controllerManager) startLeaderElectionRunnables() error {
582+
cm.logger.Info("Running prestart hooks")
583+
for _, hook := range cm.prestartHooks {
584+
var ctx context.Context
585+
var cancel context.CancelFunc
586+
587+
if cm.hookTimeout < 0 {
588+
ctx, cancel = context.WithCancel(cm.internalCtx)
589+
} else {
590+
ctx, cancel = context.WithTimeout(cm.internalCtx, cm.hookTimeout)
591+
}
592+
593+
if err := hook.Start(ctx); err != nil {
594+
cancel()
595+
return err
596+
}
597+
cancel()
598+
}
599+
600+
// All the prestart hooks have been run, clear the slice to free the underlying resources.
601+
cm.prestartHooks = nil
602+
557603
return cm.runnables.LeaderElection.Start(cm.internalCtx)
558604
}
559605

pkg/manager/manager.go

+22
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ type Manager interface {
7373
// AddReadyzCheck allows you to add Readyz checker
7474
AddReadyzCheck(name string, check healthz.Checker) error
7575

76+
// Hook allows to add Runnables as hooks to modify the behavior.
77+
Hook(hook HookType, runnable Runnable) error
78+
7679
// Start starts all registered Controllers and blocks until the context is cancelled.
7780
// Returns an error if there is an error starting any controller.
7881
//
@@ -263,6 +266,10 @@ type Options struct {
263266
// +optional
264267
Controller config.Controller
265268

269+
// HookTimeout is the duration given to each hook to return successfully.
270+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
271+
HookTimeout *time.Duration
272+
266273
// makeBroadcaster allows deferring the creation of the broadcaster to
267274
// avoid leaking goroutines if we never call Start on this manager. It also
268275
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -277,6 +284,15 @@ type Options struct {
277284
newPprofListener func(addr string) (net.Listener, error)
278285
}
279286

287+
// HookType defines hooks for use with AddHook.
288+
type HookType int
289+
290+
const (
291+
// HookPrestartType defines a hook that is run after leader election and immediately before
292+
// calling Start on the runnables that needed leader election.
293+
HookPrestartType HookType = iota
294+
)
295+
280296
// BaseContextFunc is a function used to provide a base Context to Runnables
281297
// managed by a Manager.
282298
type BaseContextFunc func() context.Context
@@ -432,6 +448,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
432448
livenessEndpointName: options.LivenessEndpointName,
433449
pprofListener: pprofListener,
434450
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
451+
hookTimeout: *options.HookTimeout,
435452
internalProceduresStop: make(chan struct{}),
436453
leaderElectionStopped: make(chan struct{}),
437454
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -653,6 +670,11 @@ func setOptionsDefaults(options Options) Options {
653670
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
654671
}
655672

673+
if options.HookTimeout == nil {
674+
hookTimeout := defaultHookPeriod
675+
options.HookTimeout = &hookTimeout
676+
}
677+
656678
if options.Logger.GetSink() == nil {
657679
options.Logger = log.Log
658680
}

pkg/manager/manager_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,121 @@ var _ = Describe("manger.Manager", func() {
12071207
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
12081208
})
12091209

1210+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1211+
m, err := New(cfg, options)
1212+
Expect(err).NotTo(HaveOccurred())
1213+
for _, cb := range callbacks {
1214+
cb(m)
1215+
}
1216+
1217+
runnableRan := make(chan struct{})
1218+
1219+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1220+
close(runnableRan)
1221+
return nil
1222+
}))).ToNot(HaveOccurred())
1223+
1224+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1225+
Expect(m.Elected()).ShouldNot(BeClosed())
1226+
Consistently(runnableRan).ShouldNot(BeClosed())
1227+
return nil
1228+
}))).ToNot(HaveOccurred())
1229+
1230+
ctx, cancel := context.WithCancel(context.Background())
1231+
defer cancel()
1232+
go func() {
1233+
defer GinkgoRecover()
1234+
Expect(m.Elected()).ShouldNot(BeClosed())
1235+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1236+
}()
1237+
1238+
<-m.Elected()
1239+
})
1240+
1241+
It("should run prestart hooks with timeout", func() {
1242+
m, err := New(cfg, options)
1243+
Expect(err).NotTo(HaveOccurred())
1244+
for _, cb := range callbacks {
1245+
cb(m)
1246+
}
1247+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1248+
1249+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1250+
select {
1251+
case <-ctx.Done():
1252+
return ctx.Err()
1253+
case <-time.After(1 * time.Second):
1254+
return errors.New("prestart hook timeout exceeded expected")
1255+
}
1256+
}))).ToNot(HaveOccurred())
1257+
1258+
ctx, cancel := context.WithCancel(context.Background())
1259+
defer cancel()
1260+
1261+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1262+
})
1263+
1264+
It("should run prestart hooks without timeout", func() {
1265+
m, err := New(cfg, options)
1266+
Expect(err).NotTo(HaveOccurred())
1267+
for _, cb := range callbacks {
1268+
cb(m)
1269+
}
1270+
m.(*controllerManager).hookTimeout = -1 * time.Second
1271+
1272+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1273+
fmt.Println("runnable returning")
1274+
return nil
1275+
}))).ToNot(HaveOccurred())
1276+
1277+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1278+
select {
1279+
case <-ctx.Done():
1280+
return ctx.Err()
1281+
case <-time.After(1 * time.Second):
1282+
fmt.Println("prestart hook returning")
1283+
return nil
1284+
}
1285+
}))).ToNot(HaveOccurred())
1286+
1287+
ctx, cancel := context.WithCancel(context.Background())
1288+
defer cancel()
1289+
1290+
go func() {
1291+
defer GinkgoRecover()
1292+
Expect(m.Elected()).ShouldNot(BeClosed())
1293+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1294+
}()
1295+
1296+
<-m.Elected()
1297+
})
1298+
1299+
It("should not run leader election runnables if prestart hooks fail", func() {
1300+
m, err := New(cfg, options)
1301+
Expect(err).NotTo(HaveOccurred())
1302+
for _, cb := range callbacks {
1303+
cb(m)
1304+
}
1305+
1306+
runnableRan := make(chan struct{})
1307+
1308+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1309+
close(runnableRan)
1310+
return nil
1311+
}))).ToNot(HaveOccurred())
1312+
1313+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1314+
Expect(m.Elected()).ShouldNot(BeClosed())
1315+
Consistently(runnableRan).ShouldNot(BeClosed())
1316+
return errors.New("prestart hook failed")
1317+
}))).ToNot(HaveOccurred())
1318+
1319+
ctx, cancel := context.WithCancel(context.Background())
1320+
defer cancel()
1321+
1322+
Expect(m.Elected()).ShouldNot(BeClosed())
1323+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1324+
})
12101325
}
12111326

12121327
Context("with defaults", func() {

0 commit comments

Comments
 (0)