-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapplication.go
176 lines (151 loc) · 5.76 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package cli
import (
"context"
"fmt"
"time"
"github.com/streamingfast/shutter"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// Application is a simple object that can be used to manage the lifecycle of an application. You
// create the application with `NewApplication` and then you can use it to supervise and start
// child processes by using `SuperviseAndStart`.
//
// You then wait for the application to terminate by calling `WaitForTermination`. This call is blocking
// and register a signal handler to be notified of SIGINT and SIGTERM.
type Application struct {
appCtx context.Context
shutter *shutter.Shutter
isSignaled *atomic.Bool
}
func NewApplication(ctx context.Context) *Application {
shutter := shutter.New()
appCtx, cancelApp := context.WithCancel(ctx)
shutter.OnTerminating(func(_ error) {
cancelApp()
})
return &Application{
appCtx: appCtx,
shutter: shutter,
isSignaled: atomic.NewBool(false),
}
}
// IsReady returns true if the application is ready to be used. When the Ctrl-C signal is received,
// the app is immediately marked as not ready and the `WaitForTermination` call will wait for the
// unreadyPeriodDelay to expire before terminating the application.
func (a *Application) IsReady() bool {
return !a.isSignaled.Load()
}
func (a *Application) Context() context.Context {
return a.appCtx
}
// Shutter interface over the `*shutter.Shutter` struct.
type Shutter interface {
OnTerminated(f func(error))
OnTerminating(f func(error))
Shutdown(error)
}
// Runnable contracts is to be blocking and to return only when the task is done.
type Runnable interface {
Run()
}
// RunnableError contracts is to be blocking and to return only when the task is done. We assume
// the error happens while bootstrapping the task.
type RunnableError interface {
Run() error
}
// RunnableContext contracts is to be blocking and to return only when the task is done. The context
// must be **used** only for the bootstrap period of task, long running task should be tied to a
// `*shutter.Shutter` instance.
type RunnableContext interface {
Run(ctx context.Context)
}
// RunnableContext contracts is to be blocking and to return only when the task is done. The context
// must be **used** only for the bootstrap period of task, long running task should be tied to a
// `*shutter.Shutter` instance. We assume the error happens while bootstrapping the task.
type RunnableContextError interface {
Run(ctx context.Context) error
}
// Supervise the received child shutter, mainly, this ensures that on child's termination,
// the application is also terminated with the error that caused the child to terminate.
//
// If the application shuts down before the child, the child is also terminated but
// gracefully (it does **not** receive the error that caused the application to terminate).
//
// The child termination is always performed before the application fully complete, unless
// the gracecul shutdown delay has expired.
func (a *Application) Supervise(child Shutter) {
child.OnTerminated(a.shutter.Shutdown)
a.shutter.OnTerminating(func(_ error) {
child.Shutdown(nil)
})
}
// SuperviseAndStart calls [Supervise] and then starts the child in a goroutine. The received
// child must implement one of [Runnable], [RunnableContext], [RunnableError] or [RunnableContextError] to
// be able to be started correctly.
//
// The child is started in a goroutine and tied to the application lifecycle because we also
// called [Supervise]. Later the call to `WaitForTermination` will wait for the application to
// terminate which will also terminates and wait for all child.
func (a *Application) SuperviseAndStart(child Shutter) {
a.Supervise(child)
switch v := child.(type) {
case Runnable:
go v.Run()
case RunnableContext:
go v.Run(a.appCtx)
case RunnableError:
go func() {
err := v.Run()
if err != nil {
child.Shutdown(err)
}
}()
case RunnableContextError:
go func() {
err := v.Run(a.appCtx)
if err != nil {
child.Shutdown(err)
}
}()
default:
panic(fmt.Errorf("unsupported child type %T, must implement one of cli.Runnable, cli.RunnableContext, cli.RunnableError or cli.RunnableContextError", child))
}
}
// WaitForTermination waits for the application to terminate. This first setup the signal handler and
// then wait for either the signal handler to be notified or the application to be terminating.
//
// On application terminating, all child registered with [Supervise] are also terminated. We then wait for
// all child to gracefully terminate. If the graceful shutdown delay is reached, we force the termination
// of the application right now.
//
// Doing Ctrl-C 4 times or more will lead to a force quit of the whole process by calling `os.Exit(1)`, this
// is performed by the signal handler code and is does **not** respect the graceful shutdown delay in this case.
func (a *Application) WaitForTermination(logger *zap.Logger, unreadyPeriodDelay, gracefulShutdownDelay time.Duration) error {
// On any exit path, we synchronize the logger one last time
defer func() {
logger.Sync()
}()
signalHandler, isSignaled, _ := SetupSignalHandler(unreadyPeriodDelay, logger)
// Wire the signal handler to the application
a.isSignaled = isSignaled
select {
case <-signalHandler:
go a.shutter.Shutdown(nil)
break
case <-a.shutter.Terminating():
logger.Info("run terminating", zap.Bool("from_signal", isSignaled.Load()), zap.Bool("with_error", a.shutter.Err() != nil))
break
}
logger.Info("waiting for run termination")
select {
case <-a.shutter.Terminated():
case <-time.After(gracefulShutdownDelay):
logger.Warn("application did not terminate within graceful period of " + gracefulShutdownDelay.String() + ", forcing termination")
}
if err := a.shutter.Err(); err != nil {
return err
}
logger.Info("run terminated gracefully")
return nil
}