From 5d8950fc859188707ced178c34cec148b9a7ee08 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Wed, 6 Dec 2023 04:51:26 +0000 Subject: [PATCH] Adds Daprd option `--dapr-block-shutdown-duration` (#7268) * Adds Daprd option `--block-shutdown-seconds` Closes https://github.com/dapr/dapr/issues/4313 Docs: https://github.com/dapr/docs/pull/3893 PR adds the `--block-shutdown-seconds` CLI flag and corresponding `dapr.io/block-shutdown-seconds` Kubernetes annotation which configures Daprd to block the graceful shutdown procedure until _either_, the block shutdown seconds has elapsed _or_ the application has become unhealthy, as according to the normal app health status. By default, this option is unset, and therefore there is no effect to the current behaviour of graceful shutdown. When set, Daprd will block the interrupt signal cascading into runtime until the above requirements have been met. The framework process `Cleanup` order has been reversed to mimic `t.Cleanup` and allow the `logline` process to function correctly. Signed-off-by: joshvanl * Revert if check on killing process exec proc cleanup Signed-off-by: joshvanl * Revert error ignore of processes already killed in unix Signed-off-by: joshvanl * Skip shutdown/graceful/block/healthy on windows. * Skip shutdown/block/unhealthy test on windows. * Linting Signed-off-by: joshvanl * Updates `dapr-block-shutdown-seconds` to `dapr-block-shutdown-duration` Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Co-authored-by: Loong Dai --- cmd/daprd/main.go | 2 + cmd/daprd/options/options.go | 8 + pkg/injector/annotations/annotations.go | 1 + pkg/injector/patcher/sidecar.go | 103 +++++----- pkg/injector/patcher/sidecar_container.go | 4 + .../patcher/sidecar_container_test.go | 21 ++ pkg/runtime/config.go | 7 +- pkg/runtime/config_test.go | 3 + pkg/runtime/runtime.go | 46 ++++- pkg/runtime/runtime_test.go | 121 ++++++++++++ tests/integration/framework/framework.go | 5 +- .../framework/process/daprd/daprd.go | 6 + .../framework/process/daprd/options.go | 14 ++ .../framework/process/exec/exec.go | 10 +- .../framework/process/logline/logline.go | 40 ++-- .../framework/process/logline/options.go | 8 +- tests/integration/suite/actors/http/ttl.go | 2 +- tests/integration/suite/daprd/daprd.go | 1 + .../suite/daprd/resources/uniquename.go | 6 +- .../suite/daprd/shutdown/block/app.go | 18 ++ .../suite/daprd/shutdown/block/app/healthy.go | 179 ++++++++++++++++++ .../daprd/shutdown/block/app/unhealthy.go | 159 ++++++++++++++++ .../suite/daprd/shutdown/block/timeout.go | 159 ++++++++++++++++ .../suite/daprd/shutdown/graceful/graceful.go | 65 +++++++ .../suite/daprd/shutdown/graceful/timeout.go | 106 +++++++++++ .../suite/daprd/shutdown/shutdown.go | 19 ++ 26 files changed, 1029 insertions(+), 84 deletions(-) create mode 100644 tests/integration/suite/daprd/shutdown/block/app.go create mode 100644 tests/integration/suite/daprd/shutdown/block/app/healthy.go create mode 100644 tests/integration/suite/daprd/shutdown/block/app/unhealthy.go create mode 100644 tests/integration/suite/daprd/shutdown/block/timeout.go create mode 100644 tests/integration/suite/daprd/shutdown/graceful/graceful.go create mode 100644 tests/integration/suite/daprd/shutdown/graceful/timeout.go create mode 100644 tests/integration/suite/daprd/shutdown/shutdown.go diff --git a/cmd/daprd/main.go b/cmd/daprd/main.go index eb7b41bff5a..3f57b5fc54e 100644 --- a/cmd/daprd/main.go +++ b/cmd/daprd/main.go @@ -150,6 +150,7 @@ func main() { UnixDomainSocket: opts.UnixDomainSocket, DaprHTTPReadBufferSize: opts.DaprHTTPReadBufferSize, DaprGracefulShutdownSeconds: opts.DaprGracefulShutdownSeconds, + DaprBlockShutdownDuration: opts.DaprBlockShutdownDuration, DisableBuiltinK8sSecretStore: opts.DisableBuiltinK8sSecretStore, EnableAppHealthCheck: opts.EnableAppHealthCheck, AppHealthCheckPath: opts.AppHealthCheckPath, @@ -175,4 +176,5 @@ func main() { if err != nil { log.Fatalf("Fatal error from runtime: %s", err) } + log.Info("Daprd shutdown gracefully") } diff --git a/cmd/daprd/options/options.go b/cmd/daprd/options/options.go index 224c52b370a..98ee81edaf8 100644 --- a/cmd/daprd/options/options.go +++ b/cmd/daprd/options/options.go @@ -57,6 +57,7 @@ type Options struct { DaprPublicPort string AppPort string DaprGracefulShutdownSeconds int + DaprBlockShutdownDuration *time.Duration PlacementServiceHostAddr string DaprAPIListenAddresses string AppHealthProbeInterval int @@ -79,6 +80,8 @@ func New(args []string) *Options { EnableAPILogging: new(bool), } + var blockShutdownDuration time.Duration + flag.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr") flag.StringVar(&opts.DaprHTTPPort, "dapr-http-port", strconv.Itoa(runtime.DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on") flag.StringVar(&opts.DaprAPIListenAddresses, "dapr-listen-addresses", runtime.DefaultAPIListenAddress, "One or more addresses for the Dapr API to listen on, CSV limited") @@ -109,6 +112,7 @@ func New(args []string) *Options { flag.StringVar(&opts.UnixDomainSocket, "unix-domain-socket", "", "Path to a unix domain socket dir mount. If specified, Dapr API servers will use Unix Domain Sockets") flag.IntVar(&opts.DaprHTTPReadBufferSize, "dapr-http-read-buffer-size", runtime.DefaultReadBufferSize, "Increasing max size of read buffer in KB to handle sending multi-KB headers") flag.IntVar(&opts.DaprGracefulShutdownSeconds, "dapr-graceful-shutdown-seconds", int(runtime.DefaultGracefulShutdownDuration/time.Second), "Graceful shutdown time in seconds") + flag.DurationVar(&blockShutdownDuration, "dapr-block-shutdown-duration", 0, "If enabled, will block graceful shutdown after terminate signal is received until either the given duration has elapsed or the app reports unhealthy. Disabled by default") flag.BoolVar(opts.EnableAPILogging, "enable-api-logging", false, "Enable API logging for API calls") flag.BoolVar(&opts.DisableBuiltinK8sSecretStore, "disable-builtin-k8s-secret-store", false, "Disable the built-in Kubernetes Secret Store") flag.BoolVar(&opts.EnableAppHealthCheck, "enable-app-health-check", false, "Enable health checks for the application using the protocol defined with app-protocol") @@ -150,6 +154,10 @@ func New(args []string) *Options { } } + if isFlagPassed("dapr-block-shutdown-duration") { + opts.DaprBlockShutdownDuration = &blockShutdownDuration + } + return &opts } diff --git a/pkg/injector/annotations/annotations.go b/pkg/injector/annotations/annotations.go index 76509f386dc..579d4bee1f2 100644 --- a/pkg/injector/annotations/annotations.go +++ b/pkg/injector/annotations/annotations.go @@ -53,6 +53,7 @@ const ( KeyHTTPMaxRequestSize = "dapr.io/http-max-request-size" KeyHTTPReadBufferSize = "dapr.io/http-read-buffer-size" KeyGracefulShutdownSeconds = "dapr.io/graceful-shutdown-seconds" + KeyBlockShutdownDuration = "dapr.io/block-shutdown-duration" KeyEnableAPILogging = "dapr.io/enable-api-logging" KeyUnixDomainSocketPath = "dapr.io/unix-domain-socket-path" KeyVolumeMountsReadOnly = "dapr.io/volume-mounts" diff --git a/pkg/injector/patcher/sidecar.go b/pkg/injector/patcher/sidecar.go index 408aa035887..d2617278dc5 100644 --- a/pkg/injector/patcher/sidecar.go +++ b/pkg/injector/patcher/sidecar.go @@ -56,57 +56,58 @@ type SidecarConfig struct { SidecarInternalGRPCPort int32 `default:"50002"` SidecarPublicPort int32 `default:"3501"` - Enabled bool `annotation:"dapr.io/enabled"` - AppPort int32 `annotation:"dapr.io/app-port"` - Config string `annotation:"dapr.io/config"` - AppProtocol string `annotation:"dapr.io/app-protocol" default:"http"` - AppSSL bool `annotation:"dapr.io/app-ssl"` // TODO: Deprecated in Dapr 1.11; remove in a future Dapr version - AppID string `annotation:"dapr.io/app-id"` - EnableProfiling bool `annotation:"dapr.io/enable-profiling"` - LogLevel string `annotation:"dapr.io/log-level" default:"info"` - APITokenSecret string `annotation:"dapr.io/api-token-secret"` - AppTokenSecret string `annotation:"dapr.io/app-token-secret"` - LogAsJSON bool `annotation:"dapr.io/log-as-json"` - AppMaxConcurrency *int `annotation:"dapr.io/app-max-concurrency"` - EnableMetrics bool `annotation:"dapr.io/enable-metrics" default:"true"` - SidecarMetricsPort int32 `annotation:"dapr.io/metrics-port" default:"9090"` - EnableDebug bool `annotation:"dapr.io/enable-debug" default:"false"` - SidecarDebugPort int32 `annotation:"dapr.io/debug-port" default:"40000"` - Env string `annotation:"dapr.io/env"` - SidecarCPURequest string `annotation:"dapr.io/sidecar-cpu-request"` - SidecarCPULimit string `annotation:"dapr.io/sidecar-cpu-limit"` - SidecarMemoryRequest string `annotation:"dapr.io/sidecar-memory-request"` - SidecarMemoryLimit string `annotation:"dapr.io/sidecar-memory-limit"` - SidecarListenAddresses string `annotation:"dapr.io/sidecar-listen-addresses" default:"[::1],127.0.0.1"` - SidecarLivenessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-delay-seconds" default:"3"` - SidecarLivenessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-timeout-seconds" default:"3"` - SidecarLivenessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-period-seconds" default:"6"` - SidecarLivenessProbeThreshold int32 `annotation:"dapr.io/sidecar-liveness-probe-threshold" default:"3"` - SidecarReadinessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-delay-seconds" default:"3"` - SidecarReadinessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-timeout-seconds" default:"3"` - SidecarReadinessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-period-seconds" default:"6"` - SidecarReadinessProbeThreshold int32 `annotation:"dapr.io/sidecar-readiness-probe-threshold" default:"3"` - SidecarImage string `annotation:"dapr.io/sidecar-image"` - SidecarSeccompProfileType string `annotation:"dapr.io/sidecar-seccomp-profile-type"` - HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"` - HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"` - GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"` - EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"` - UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"` - VolumeMounts string `annotation:"dapr.io/volume-mounts"` - VolumeMountsRW string `annotation:"dapr.io/volume-mounts-rw"` - DisableBuiltinK8sSecretStore bool `annotation:"dapr.io/disable-builtin-k8s-secret-store"` - EnableAppHealthCheck bool `annotation:"dapr.io/enable-app-health-check"` - AppHealthCheckPath string `annotation:"dapr.io/app-health-check-path" default:"/healthz"` - AppHealthProbeInterval int32 `annotation:"dapr.io/app-health-probe-interval" default:"5"` // In seconds - AppHealthProbeTimeout int32 `annotation:"dapr.io/app-health-probe-timeout" default:"500"` // In milliseconds - AppHealthThreshold int32 `annotation:"dapr.io/app-health-threshold" default:"3"` - PlacementAddress string `annotation:"dapr.io/placement-host-address"` - PluggableComponents string `annotation:"dapr.io/pluggable-components"` - PluggableComponentsSocketsFolder string `annotation:"dapr.io/pluggable-components-sockets-folder"` - ComponentContainer string `annotation:"dapr.io/component-container"` - InjectPluggableComponents bool `annotation:"dapr.io/inject-pluggable-components"` - AppChannelAddress string `annotation:"dapr.io/app-channel-address"` + Enabled bool `annotation:"dapr.io/enabled"` + AppPort int32 `annotation:"dapr.io/app-port"` + Config string `annotation:"dapr.io/config"` + AppProtocol string `annotation:"dapr.io/app-protocol" default:"http"` + AppSSL bool `annotation:"dapr.io/app-ssl"` // TODO: Deprecated in Dapr 1.11; remove in a future Dapr version + AppID string `annotation:"dapr.io/app-id"` + EnableProfiling bool `annotation:"dapr.io/enable-profiling"` + LogLevel string `annotation:"dapr.io/log-level" default:"info"` + APITokenSecret string `annotation:"dapr.io/api-token-secret"` + AppTokenSecret string `annotation:"dapr.io/app-token-secret"` + LogAsJSON bool `annotation:"dapr.io/log-as-json"` + AppMaxConcurrency *int `annotation:"dapr.io/app-max-concurrency"` + EnableMetrics bool `annotation:"dapr.io/enable-metrics" default:"true"` + SidecarMetricsPort int32 `annotation:"dapr.io/metrics-port" default:"9090"` + EnableDebug bool `annotation:"dapr.io/enable-debug" default:"false"` + SidecarDebugPort int32 `annotation:"dapr.io/debug-port" default:"40000"` + Env string `annotation:"dapr.io/env"` + SidecarCPURequest string `annotation:"dapr.io/sidecar-cpu-request"` + SidecarCPULimit string `annotation:"dapr.io/sidecar-cpu-limit"` + SidecarMemoryRequest string `annotation:"dapr.io/sidecar-memory-request"` + SidecarMemoryLimit string `annotation:"dapr.io/sidecar-memory-limit"` + SidecarListenAddresses string `annotation:"dapr.io/sidecar-listen-addresses" default:"[::1],127.0.0.1"` + SidecarLivenessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-delay-seconds" default:"3"` + SidecarLivenessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-timeout-seconds" default:"3"` + SidecarLivenessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-period-seconds" default:"6"` + SidecarLivenessProbeThreshold int32 `annotation:"dapr.io/sidecar-liveness-probe-threshold" default:"3"` + SidecarReadinessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-delay-seconds" default:"3"` + SidecarReadinessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-timeout-seconds" default:"3"` + SidecarReadinessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-period-seconds" default:"6"` + SidecarReadinessProbeThreshold int32 `annotation:"dapr.io/sidecar-readiness-probe-threshold" default:"3"` + SidecarImage string `annotation:"dapr.io/sidecar-image"` + SidecarSeccompProfileType string `annotation:"dapr.io/sidecar-seccomp-profile-type"` + HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"` + HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"` + GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"` + BlockShutdownDuration *string `annotation:"dapr.io/block-shutdown-duration"` + EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"` + UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"` + VolumeMounts string `annotation:"dapr.io/volume-mounts"` + VolumeMountsRW string `annotation:"dapr.io/volume-mounts-rw"` + DisableBuiltinK8sSecretStore bool `annotation:"dapr.io/disable-builtin-k8s-secret-store"` + EnableAppHealthCheck bool `annotation:"dapr.io/enable-app-health-check"` + AppHealthCheckPath string `annotation:"dapr.io/app-health-check-path" default:"/healthz"` + AppHealthProbeInterval int32 `annotation:"dapr.io/app-health-probe-interval" default:"5"` // In seconds + AppHealthProbeTimeout int32 `annotation:"dapr.io/app-health-probe-timeout" default:"500"` // In milliseconds + AppHealthThreshold int32 `annotation:"dapr.io/app-health-threshold" default:"3"` + PlacementAddress string `annotation:"dapr.io/placement-host-address"` + PluggableComponents string `annotation:"dapr.io/pluggable-components"` + PluggableComponentsSocketsFolder string `annotation:"dapr.io/pluggable-components-sockets-folder"` + ComponentContainer string `annotation:"dapr.io/component-container"` + InjectPluggableComponents bool `annotation:"dapr.io/inject-pluggable-components"` + AppChannelAddress string `annotation:"dapr.io/app-channel-address"` pod *corev1.Pod } diff --git a/pkg/injector/patcher/sidecar_container.go b/pkg/injector/patcher/sidecar_container.go index 25956ffa139..5ffd071e292 100644 --- a/pkg/injector/patcher/sidecar_container.go +++ b/pkg/injector/patcher/sidecar_container.go @@ -169,6 +169,10 @@ func (c *SidecarConfig) getSidecarContainer(opts getSidecarContainerOpts) (*core args = append(args, "--unix-domain-socket", injectorConsts.UnixDomainSocketDaprdPath) } + if c.BlockShutdownDuration != nil { + args = append(args, "--dapr-block-shutdown-duration", *c.BlockShutdownDuration) + } + // When debugging is enabled, we need to override the command and the flags if c.EnableDebug { ports = append(ports, corev1.ContainerPort{ diff --git a/pkg/injector/patcher/sidecar_container_test.go b/pkg/injector/patcher/sidecar_container_test.go index 486e0e65068..c195665d3e2 100644 --- a/pkg/injector/patcher/sidecar_container_test.go +++ b/pkg/injector/patcher/sidecar_container_test.go @@ -576,6 +576,27 @@ func TestGetSidecarContainer(t *testing.T) { }, })) + t.Run("block shutdown duration", testSuiteGenerator([]testCase{ + { + name: "default to empty", + annotations: map[string]string{}, + assertFn: func(t *testing.T, container *corev1.Container) { + args := strings.Join(container.Args, " ") + assert.NotContains(t, args, "--dapr-block-shutdown-duration") + }, + }, + { + name: "add a block shutdown duration", + annotations: map[string]string{ + "dapr.io/block-shutdown-duration": "3s", + }, + assertFn: func(t *testing.T, container *corev1.Container) { + args := strings.Join(container.Args, " ") + assert.Contains(t, args, "--dapr-block-shutdown-duration 3s") + }, + }, + })) + t.Run("sidecar image", testSuiteGenerator([]testCase{ { name: "no annotation", diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 98fcd811eef..b89bd94556b 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -90,6 +90,7 @@ type Config struct { DaprPublicPort string ApplicationPort string DaprGracefulShutdownSeconds int + DaprBlockShutdownDuration *time.Duration PlacementServiceHostAddr string DaprAPIListenAddresses string AppHealthProbeInterval int @@ -129,6 +130,7 @@ type internalConfig struct { unixDomainSocket string readBufferSize int gracefulShutdownDuration time.Duration + blockShutdownDuration *time.Duration enableAPILogging *bool disableBuiltinK8sSecretStore bool config []string @@ -284,8 +286,9 @@ func (c *Config) toInternal() (*internalConfig, error) { HealthCheckHTTPPath: c.AppHealthCheckPath, MaxConcurrency: c.AppMaxConcurrency, }, - registry: registry.New(c.Registry), - metricsExporter: metrics.NewExporterWithOptions(log, metrics.DefaultMetricNamespace, c.Metrics), + registry: registry.New(c.Registry), + metricsExporter: metrics.NewExporterWithOptions(log, metrics.DefaultMetricNamespace, c.Metrics), + blockShutdownDuration: c.DaprBlockShutdownDuration, } if len(intc.standalone.ResourcesPath) == 0 && c.ComponentsPath != "" { diff --git a/pkg/runtime/config_test.go b/pkg/runtime/config_test.go index af56a77252f..ddf02b3e37c 100644 --- a/pkg/runtime/config_test.go +++ b/pkg/runtime/config_test.go @@ -55,6 +55,8 @@ func TestParsePlacementAddr(t *testing.T) { func Test_toInternal(t *testing.T) { cfg := defaultTestConfig() + var nilDuration *time.Duration + intc, err := cfg.toInternal() require.NoError(t, err) @@ -81,6 +83,7 @@ func Test_toInternal(t *testing.T) { assert.Equal(t, "", intc.unixDomainSocket) assert.Equal(t, 4, intc.readBufferSize) assert.Equal(t, time.Second, intc.gracefulShutdownDuration) + assert.Equal(t, nilDuration, intc.blockShutdownDuration) assert.Equal(t, ptr.Of(true), intc.enableAPILogging) assert.True(t, intc.disableBuiltinK8sSecretStore) assert.Equal(t, "1.1.1.1", intc.appConnectionConfig.ChannelAddress) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index da80145f05d..257392ad584 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -36,6 +36,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/clock" nr "github.com/dapr/components-contrib/nameresolution" "github.com/dapr/components-contrib/state" @@ -98,6 +99,7 @@ type DaprRuntime struct { daprHTTPAPI http.API daprGRPCAPI grpc.API operatorClient operatorv1pb.OperatorClient + isAppHealthy chan struct{} appHealth *apphealth.AppHealth appHealthReady func(context.Context) error // Invoked the first time the app health becomes ready appHealthLock sync.Mutex @@ -107,6 +109,7 @@ type DaprRuntime struct { authz *authorizer.Authorizer sec security.Handler runnerCloser *concurrency.RunnerCloserManager + clock clock.Clock // Used for testing. initComplete chan struct{} @@ -206,7 +209,10 @@ func newDaprRuntime(ctx context.Context, namespace: namespace, podName: podName, initComplete: make(chan struct{}), + isAppHealthy: make(chan struct{}), + clock: new(clock.RealClock), } + close(rt.isAppHealthy) var gracePeriod *time.Duration if duration := runtimeConfig.gracefulShutdownDuration; duration > 0 { @@ -274,7 +280,31 @@ func newDaprRuntime(ctx context.Context, } // Run performs initialization of the runtime with the runtime and global configurations. -func (a *DaprRuntime) Run(ctx context.Context) error { +func (a *DaprRuntime) Run(parentCtx context.Context) error { + ctx := parentCtx + if a.runtimeConfig.blockShutdownDuration != nil { + // Override context with Background. Runner context will be cancelled when + // blocking graceful shutdown returns. + ctx = context.Background() + a.runnerCloser.Add(func(ctx context.Context) error { + select { + case <-parentCtx.Done(): + case <-ctx.Done(): + // Return nil as another routine has returned, not due to an interrupt. + return nil + } + + log.Infof("Blocking graceful shutdown for %s or until app reports unhealthy...", *a.runtimeConfig.blockShutdownDuration) + select { + case <-a.clock.After(*a.runtimeConfig.blockShutdownDuration): + log.Info("Block shutdown period expired, entering shutdown...") + case <-a.isAppHealthy: + log.Info("App reported unhealthy, entering shutdown...") + } + return nil + }) + } + return a.runnerCloser.Run(ctx) } @@ -586,6 +616,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) { switch status { case apphealth.AppStatusHealthy: + select { + case <-a.isAppHealthy: + a.isAppHealthy = make(chan struct{}) + default: + } + // First time the app becomes healthy, complete the init process if a.appHealthReady != nil { if err := a.appHealthReady(ctx); err != nil { @@ -608,6 +644,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) { log.Warnf("failed to subscribe to outbox topics: %s", err) } case apphealth.AppStatusUnhealthy: + select { + case <-a.isAppHealthy: + default: + close(a.isAppHealthy) + } + // Stop topic subscriptions and input bindings a.processor.PubSub().StopSubscriptions() a.processor.Binding().StopReadingFromBindings() @@ -1025,7 +1067,7 @@ func (a *DaprRuntime) blockUntilAppIsReady(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() // prevents overwhelming the OS with open connections - case <-time.After(time.Millisecond * 100): + case <-a.clock.After(time.Millisecond * 100): } } diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index d4d20263a57..bda03f7890c 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -49,6 +49,7 @@ import ( "google.golang.org/grpc/metadata" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clocktesting "k8s.io/utils/clock/testing" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/lock" @@ -1855,6 +1856,126 @@ func TestGracefulShutdownBindings(t *testing.T) { } } +func TestBlockShutdownBindings(t *testing.T) { + t.Run("block timeout", func(t *testing.T) { + rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) + require.NoError(t, err) + + fakeClock := clocktesting.NewFakeClock(time.Now()) + rt.clock = fakeClock + rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy) + + rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100) + rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second + rt.runtimeConfig.registry.Bindings().RegisterInputBinding( + func(_ logger.Logger) bindings.InputBinding { + return &daprt.MockBinding{} + }, + "testInputBinding", + ) + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + go func() { + errCh <- rt.Run(ctx) + }() + + cin := componentsV1alpha1.Component{} + cin.ObjectMeta.Name = "testInputBinding" + cin.Spec.Type = "bindings.testInputBinding" + + rt.runtimeConfig.registry.Bindings().RegisterOutputBinding( + func(_ logger.Logger) bindings.OutputBinding { + return &daprt.MockBinding{} + }, + "testOutputBinding", + ) + cout := componentsV1alpha1.Component{} + cout.ObjectMeta.Name = "testOutputBinding" + cout.Spec.Type = "bindings.testOutputBinding" + require.NoError(t, rt.processor.Init(context.Background(), cin)) + require.NoError(t, rt.processor.Init(context.Background(), cout)) + assert.Len(t, rt.compStore.ListInputBindings(), 1) + assert.Len(t, rt.compStore.ListOutputBindings(), 1) + + cancel() + + select { + case <-time.After(time.Second): + case <-errCh: + assert.Fail(t, "expected not to return until block timeout is reached") + } + + fakeClock.Step(time.Millisecond * 200) + + select { + case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second): + assert.Fail(t, "input bindings shutdown timed out") + case err := <-errCh: + require.NoError(t, err) + } + }) + + t.Run("block app unhealthy", func(t *testing.T) { + rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) + require.NoError(t, err) + + fakeClock := clocktesting.NewFakeClock(time.Now()) + rt.clock = fakeClock + rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy) + + rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100) + rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second + rt.runtimeConfig.registry.Bindings().RegisterInputBinding( + func(_ logger.Logger) bindings.InputBinding { + return &daprt.MockBinding{} + }, + "testInputBinding", + ) + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + go func() { + errCh <- rt.Run(ctx) + }() + + cin := componentsV1alpha1.Component{} + cin.ObjectMeta.Name = "testInputBinding" + cin.Spec.Type = "bindings.testInputBinding" + + rt.runtimeConfig.registry.Bindings().RegisterOutputBinding( + func(_ logger.Logger) bindings.OutputBinding { + return &daprt.MockBinding{} + }, + "testOutputBinding", + ) + cout := componentsV1alpha1.Component{} + cout.ObjectMeta.Name = "testOutputBinding" + cout.Spec.Type = "bindings.testOutputBinding" + require.NoError(t, rt.processor.Init(context.Background(), cin)) + require.NoError(t, rt.processor.Init(context.Background(), cout)) + assert.Len(t, rt.compStore.ListInputBindings(), 1) + assert.Len(t, rt.compStore.ListOutputBindings(), 1) + + cancel() + + select { + case <-time.After(time.Second): + case <-errCh: + assert.Fail(t, "expected not to return until block timeout is reached") + } + + rt.appHealthChanged(context.Background(), apphealth.AppStatusUnhealthy) + + select { + case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second): + assert.Fail(t, "input bindings shutdown timed out") + case err := <-errCh: + require.NoError(t, err) + } + }) +} + func TestGracefulShutdownPubSub(t *testing.T) { rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) require.NoError(t, err) diff --git a/tests/integration/framework/framework.go b/tests/integration/framework/framework.go index abb99e727c7..19730a25278 100644 --- a/tests/integration/framework/framework.go +++ b/tests/integration/framework/framework.go @@ -55,7 +55,8 @@ func (f *Framework) Cleanup(t *testing.T) { t.Logf("stopping %d processes", len(f.procs)) - for _, proc := range f.procs { - proc.Cleanup(t) + // Cleanup processes in reverse order in a stack fashion (same as t.Cleanup). + for i := len(f.procs) - 1; i >= 0; i-- { + f.procs[i].Cleanup(t) } } diff --git a/tests/integration/framework/process/daprd/daprd.go b/tests/integration/framework/process/daprd/daprd.go index 0f461d30c85..af1a2a95f98 100644 --- a/tests/integration/framework/process/daprd/daprd.go +++ b/tests/integration/framework/process/daprd/daprd.go @@ -128,6 +128,12 @@ func New(t *testing.T, fopts ...Option) *Daprd { if opts.disableK8sSecretStore != nil { args = append(args, "--disable-builtin-k8s-secret-store="+strconv.FormatBool(*opts.disableK8sSecretStore)) } + if opts.gracefulShutdownSeconds != nil { + args = append(args, "--dapr-graceful-shutdown-seconds="+strconv.Itoa(*opts.gracefulShutdownSeconds)) + } + if opts.blockShutdownDuration != nil { + args = append(args, "--dapr-block-shutdown-duration="+*opts.blockShutdownDuration) + } return &Daprd{ exec: exec.New(t, binary.EnvValue("daprd"), args, opts.execOpts...), diff --git a/tests/integration/framework/process/daprd/options.go b/tests/integration/framework/process/daprd/options.go index 2b78316f41e..86b1f1912c6 100644 --- a/tests/integration/framework/process/daprd/options.go +++ b/tests/integration/framework/process/daprd/options.go @@ -44,6 +44,8 @@ type options struct { sentryAddress string controlPlaneAddress string disableK8sSecretStore *bool + gracefulShutdownSeconds *int + blockShutdownDuration *string } func WithExecOptions(execOptions ...exec.Option) Option { @@ -198,3 +200,15 @@ func WithDisableK8sSecretStore(disable bool) Option { o.disableK8sSecretStore = &disable } } + +func WithDaprGracefulShutdownSeconds(seconds int) Option { + return func(o *options) { + o.gracefulShutdownSeconds = &seconds + } +} + +func WithDaprBlockShutdownDuration(duration string) Option { + return func(o *options) { + o.blockShutdownDuration = &duration + } +} diff --git a/tests/integration/framework/process/exec/exec.go b/tests/integration/framework/process/exec/exec.go index 30e90dc983a..556f9c2b033 100644 --- a/tests/integration/framework/process/exec/exec.go +++ b/tests/integration/framework/process/exec/exec.go @@ -97,6 +97,7 @@ func (e *exec) Run(t *testing.T, ctx context.Context) { e.cmd.Stdout = e.stdoutpipe e.cmd.Stderr = e.stderrpipe + // Wait for a few seconds before killing the process completely. e.cmd.WaitDelay = time.Second * 5 @@ -112,11 +113,12 @@ func (e *exec) Cleanup(t *testing.T) { e.lock.Lock() defer e.lock.Unlock() - require.NoError(t, e.stderrpipe.Close()) - require.NoError(t, e.stdoutpipe.Close()) - kill.Kill(t, e.cmd) + e.checkExit(t) + + require.NoError(t, e.stderrpipe.Close()) + require.NoError(t, e.stdoutpipe.Close()) } func (e *exec) checkExit(t *testing.T) { @@ -125,6 +127,6 @@ func (e *exec) checkExit(t *testing.T) { t.Logf("waiting for %q process to exit", filepath.Base(e.binPath)) e.runErrorFn(t, e.cmd.Wait()) - assert.NotNil(t, e.cmd.ProcessState, "process state should not be nil") + require.NotNil(t, e.cmd.ProcessState, "process state should not be nil") assert.Equalf(t, e.exitCode, e.cmd.ProcessState.ExitCode(), "expected exit code to be %d", e.exitCode) } diff --git a/tests/integration/framework/process/logline/logline.go b/tests/integration/framework/process/logline/logline.go index 1dd0fe6ff5c..73f65c34387 100644 --- a/tests/integration/framework/process/logline/logline.go +++ b/tests/integration/framework/process/logline/logline.go @@ -19,6 +19,7 @@ import ( "errors" "io" "strings" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -39,6 +40,10 @@ type LogLine struct { stderr io.Reader stderrExp io.WriteCloser stderrLinContains map[string]bool + + outCheck chan map[string]bool + closeCh chan struct{} + done atomic.Int32 } func New(t *testing.T, fopts ...Option) *LogLine { @@ -70,41 +75,46 @@ func New(t *testing.T, fopts ...Option) *LogLine { stderr: io.TeeReader(stderrReader, iowriter.New(t, "logline:stderr")), stderrExp: stderrWriter, stderrLinContains: stderrLineContains, + outCheck: make(chan map[string]bool), + closeCh: make(chan struct{}), } } func (l *LogLine) Run(t *testing.T, ctx context.Context) { - outCheck := make(chan map[string]bool) go func() { - outCheck <- l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout) + res := l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout) + l.done.Add(1) + l.outCheck <- res }() go func() { - outCheck <- l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr) + res := l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr) + l.done.Add(1) + l.outCheck <- res }() +} - for i := 0; i < 2; i++ { - for expLine := range <-outCheck { - assert.Fail(t, "expected to log line: %s", expLine) - } - } +func (l *LogLine) FoundAll() bool { + return l.done.Load() == 2 } func (l *LogLine) Cleanup(t *testing.T) { - l.stdoutExp.Close() - l.stderrExp.Close() + close(l.closeCh) + for i := 0; i < 2; i++ { + for expLine := range <-l.outCheck { + assert.Fail(t, "expected to log line: "+expLine) + } + } } func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[string]bool, closer io.WriteCloser, reader io.Reader) map[string]bool { t.Helper() - closeCh := make(chan struct{}) - go func() { select { case <-ctx.Done(): - case <-closeCh: + closer.Close() + case <-l.closeCh: } - closer.Close() }() breader := bufio.NewReader(reader) @@ -126,8 +136,6 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin } } - close(closeCh) - return expLines } diff --git a/tests/integration/framework/process/logline/options.go b/tests/integration/framework/process/logline/options.go index 63ed906c1cf..7e6b9bc64f5 100644 --- a/tests/integration/framework/process/logline/options.go +++ b/tests/integration/framework/process/logline/options.go @@ -19,14 +19,14 @@ type options struct { stderrContains []string } -func WithStdoutLineContains(line string) func(*options) { +func WithStdoutLineContains(lines ...string) func(*options) { return func(o *options) { - o.stdoutContains = append(o.stdoutContains, line) + o.stdoutContains = append(o.stdoutContains, lines...) } } -func WithStderrLineContains(line string) func(*options) { +func WithStderrLineContains(lines ...string) func(*options) { return func(o *options) { - o.stderrContains = append(o.stderrContains, line) + o.stderrContains = append(o.stderrContains, lines...) } } diff --git a/tests/integration/suite/actors/http/ttl.go b/tests/integration/suite/actors/http/ttl.go index 3b377b79fc2..88e22907568 100644 --- a/tests/integration/suite/actors/http/ttl.go +++ b/tests/integration/suite/actors/http/ttl.go @@ -98,7 +98,7 @@ func (l *ttl) Run(t *testing.T, ctx context.Context) { require.NoError(c, rErr) require.NoError(c, resp.Body.Close()) assert.Equal(c, http.StatusOK, resp.StatusCode) - }, time.Second*10, time.Millisecond*100, "actor not ready") + }, time.Second*20, time.Millisecond*100, "actor not ready") now := time.Now() diff --git a/tests/integration/suite/daprd/daprd.go b/tests/integration/suite/daprd/daprd.go index 9b0742927ac..7a2877b0106 100644 --- a/tests/integration/suite/daprd/daprd.go +++ b/tests/integration/suite/daprd/daprd.go @@ -26,5 +26,6 @@ import ( _ "github.com/dapr/dapr/tests/integration/suite/daprd/resources" _ "github.com/dapr/dapr/tests/integration/suite/daprd/secret" _ "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation" + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown" _ "github.com/dapr/dapr/tests/integration/suite/daprd/state" ) diff --git a/tests/integration/suite/daprd/resources/uniquename.go b/tests/integration/suite/daprd/resources/uniquename.go index eeff7bc69e3..9e00e3523cc 100644 --- a/tests/integration/suite/daprd/resources/uniquename.go +++ b/tests/integration/suite/daprd/resources/uniquename.go @@ -19,7 +19,9 @@ import ( "os" "path/filepath" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/dapr/dapr/tests/integration/framework" @@ -80,10 +82,10 @@ spec: ), ) return []framework.Option{ - framework.WithProcesses(u.daprd, u.logline), + framework.WithProcesses(u.logline, u.daprd), } } func (u *uniquename) Run(t *testing.T, ctx context.Context) { - // Assertions done in logline process + assert.Eventually(t, u.logline.FoundAll, time.Second*5, time.Millisecond*100) } diff --git a/tests/integration/suite/daprd/shutdown/block/app.go b/tests/integration/suite/daprd/shutdown/block/app.go new file mode 100644 index 00000000000..1852c8e35b0 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block/app" +) diff --git a/tests/integration/suite/daprd/shutdown/block/app/healthy.go b/tests/integration/suite/daprd/shutdown/block/app/healthy.go new file mode 100644 index 00000000000..61c9b25dfa3 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app/healthy.go @@ -0,0 +1,179 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(healthy)) +} + +// health tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown will +// occur when the app becomes unhealthy. +type healthy struct { + daprd *daprd.Daprd + logline *logline.LogLine + appHealth atomic.Bool + healthzCalled atomic.Int64 + routeCh chan struct{} +} + +func (h *healthy) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + h.appHealth.Store(true) + h.routeCh = make(chan struct{}, 1) + + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + defer h.healthzCalled.Add(1) + if h.appHealth.Load() { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusServiceUnavailable) + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + h.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + h.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 3m0s or until app reports unhealthy...", + "App reported unhealthy, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + h.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("180s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(h.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, h.logline), + } +} + +func (h *healthy) Run(t *testing.T, ctx context.Context) { + h.daprd.Run(t, ctx) + h.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, h.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-h.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + daprdStopped := make(chan struct{}) + go func() { + h.daprd.Cleanup(t) + close(daprdStopped) + }() + + healthzCalled := h.healthzCalled.Load() + assert.Eventually(t, func() bool { + return h.healthzCalled.Load() > healthzCalled + }, time.Second*5, time.Millisecond*100) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-h.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + healthzCalled = h.healthzCalled.Load() + h.appHealth.Store(false) + require.Equal(t, healthzCalled, h.healthzCalled.Load()) + + assert.Eventually(t, func() bool { + return h.healthzCalled.Load() > healthzCalled + }, time.Second*5, time.Millisecond*100) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + //nolint:testifylint + assert.Error(c, err) + }, time.Second*5, time.Millisecond*100) + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go b/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go new file mode 100644 index 00000000000..4f250c3b94e --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go @@ -0,0 +1,159 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + commonv1 "github.com/dapr/dapr/pkg/proto/common/v1" + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(unhealthy)) +} + +// unhealth tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown will +// occur straight away if the app is already unhealthy. +type unhealthy struct { + daprd *daprd.Daprd + logline *logline.LogLine + appHealth atomic.Bool + routeCh chan struct{} +} + +func (u *unhealthy) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + u.appHealth.Store(true) + u.routeCh = make(chan struct{}, 1) + + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + if u.appHealth.Load() { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusServiceUnavailable) + }) + handler.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + u.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + u.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 3m0s or until app reports unhealthy...", + "App reported unhealthy, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + u.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("180s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(u.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, u.logline), + } +} + +func (u *unhealthy) Run(t *testing.T, ctx context.Context) { + u.daprd.Run(t, ctx) + u.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, u.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-u.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + u.appHealth.Store(false) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + _, err = client.InvokeService(ctx, &rtv1.InvokeServiceRequest{ + Id: u.daprd.AppID(), + Message: &commonv1.InvokeRequest{ + Method: "foo", + HttpExtension: &commonv1.HTTPExtension{Verb: commonv1.HTTPExtension_GET}, + }, + }) + //nolint:testifylint + assert.ErrorContains(c, err, "app is not in a healthy state") + }, time.Second*5, time.Millisecond*100) + + daprdStopped := make(chan struct{}) + go func() { + u.daprd.Cleanup(t) + close(daprdStopped) + }() + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/block/timeout.go b/tests/integration/suite/daprd/shutdown/block/timeout.go new file mode 100644 index 00000000000..bb9d1ac3d8b --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/timeout.go @@ -0,0 +1,159 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(timeout)) +} + +// timeout tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown +// procedure will begin when seconds is reached when app still reports healthy. +type timeout struct { + daprd *daprd.Daprd + logline *logline.LogLine + routeCh chan struct{} +} + +func (i *timeout) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + i.routeCh = make(chan struct{}, 1) + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + i.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + i.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 2s or until app reports unhealthy...", + "Block shutdown period expired, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + i.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("2s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(i.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, i.logline), + } +} + +func (i *timeout) Run(t *testing.T, ctx context.Context) { + i.daprd.Run(t, ctx) + i.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, i.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-i.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub message should have been sent to subscriber") + } + + daprdStopped := make(chan struct{}) + go func() { + i.daprd.Cleanup(t) + close(daprdStopped) + }() + + t.Run("daprd APIs should still be available during blocked shutdown", func(t *testing.T) { + time.Sleep(time.Second) + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-i.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub message should have been sent to subscriber") + } + }) + + t.Run("daprd APIs are no longer available when past blocked shutdown", func(t *testing.T) { + time.Sleep(time.Second * 3 / 2) + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.Error(t, err) + }) + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/graceful/graceful.go b/tests/integration/suite/daprd/shutdown/graceful/graceful.go new file mode 100644 index 00000000000..b70dcd23f24 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/graceful/graceful.go @@ -0,0 +1,65 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graceful + +import ( + "context" + "net/http" + "runtime" + "testing" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(graceful)) +} + +// graceful tests Daprd's --dapr-graceful-shutdown-seconds gracefully +// terminates on all resources closing. +type graceful struct { + daprd *daprd.Daprd +} + +func (g *graceful) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + app := prochttp.New(t, prochttp.WithHandler(http.NewServeMux())) + + logline := logline.New(t, + logline.WithStdoutLineContains( + "Daprd shutdown gracefully", + ), + ) + + g.daprd = daprd.New(t, + daprd.WithDaprGracefulShutdownSeconds(5), + daprd.WithAppPort(app.Port()), + daprd.WithExecOptions(exec.WithStdout(logline.Stdout())), + ) + return []framework.Option{ + framework.WithProcesses(app, logline, g.daprd), + } +} + +func (g *graceful) Run(t *testing.T, ctx context.Context) { + g.daprd.WaitUntilRunning(t, ctx) +} diff --git a/tests/integration/suite/daprd/shutdown/graceful/timeout.go b/tests/integration/suite/daprd/shutdown/graceful/timeout.go new file mode 100644 index 00000000000..bdd7e8f349c --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/graceful/timeout.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graceful + +import ( + "context" + "fmt" + "io" + "net/http" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/framework/util" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(timeout)) +} + +// timeout tests Daprd's --dapr-graceful-shutdown-seconds where Daprd force exits +// because the graceful timeout expires. +type timeout struct { + daprd *daprd.Daprd + closeInvoke chan struct{} + inInvoke chan struct{} +} + +func (i *timeout) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + i.closeInvoke = make(chan struct{}) + i.inInvoke = make(chan struct{}) + handler := http.NewServeMux() + handler.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { + close(i.inInvoke) + <-i.closeInvoke + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + logline := logline.New(t, + logline.WithStdoutLineContains( + "Graceful shutdown timeout exceeded, forcing shutdown", + ), + ) + + i.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithDaprGracefulShutdownSeconds(1), + daprd.WithExecOptions( + exec.WithExitCode(1), + exec.WithRunError(func(t *testing.T, err error) { + require.ErrorContains(t, err, "exit status 1") + }), + exec.WithStdout(logline.Stdout()), + ), + ) + + return []framework.Option{ + framework.WithProcesses(app, logline), + } +} + +func (i *timeout) Run(t *testing.T, ctx context.Context) { + i.daprd.Run(t, ctx) + i.daprd.WaitUntilRunning(t, ctx) + client := util.HTTPClient(t) + + reqURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/foo", i.daprd.HTTPPort(), i.daprd.AppID()) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, nil) + require.NoError(t, err) + errCh := make(chan error) + go func() { + resp, cerr := client.Do(req) + if resp != nil { + resp.Body.Close() + } + errCh <- cerr + }() + <-i.inInvoke + i.daprd.Cleanup(t) + close(i.closeInvoke) + require.ErrorIs(t, <-errCh, io.EOF) +} diff --git a/tests/integration/suite/daprd/shutdown/shutdown.go b/tests/integration/suite/daprd/shutdown/shutdown.go new file mode 100644 index 00000000000..42bf5b6baa0 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/shutdown.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package shutdown + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block" + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/graceful" +)