diff --git a/cmd/daprd/main.go b/cmd/daprd/main.go index eb7b41bff5a..3f57b5fc54e 100644 --- a/cmd/daprd/main.go +++ b/cmd/daprd/main.go @@ -150,6 +150,7 @@ func main() { UnixDomainSocket: opts.UnixDomainSocket, DaprHTTPReadBufferSize: opts.DaprHTTPReadBufferSize, DaprGracefulShutdownSeconds: opts.DaprGracefulShutdownSeconds, + DaprBlockShutdownDuration: opts.DaprBlockShutdownDuration, DisableBuiltinK8sSecretStore: opts.DisableBuiltinK8sSecretStore, EnableAppHealthCheck: opts.EnableAppHealthCheck, AppHealthCheckPath: opts.AppHealthCheckPath, @@ -175,4 +176,5 @@ func main() { if err != nil { log.Fatalf("Fatal error from runtime: %s", err) } + log.Info("Daprd shutdown gracefully") } diff --git a/cmd/daprd/options/options.go b/cmd/daprd/options/options.go index 224c52b370a..98ee81edaf8 100644 --- a/cmd/daprd/options/options.go +++ b/cmd/daprd/options/options.go @@ -57,6 +57,7 @@ type Options struct { DaprPublicPort string AppPort string DaprGracefulShutdownSeconds int + DaprBlockShutdownDuration *time.Duration PlacementServiceHostAddr string DaprAPIListenAddresses string AppHealthProbeInterval int @@ -79,6 +80,8 @@ func New(args []string) *Options { EnableAPILogging: new(bool), } + var blockShutdownDuration time.Duration + flag.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr") flag.StringVar(&opts.DaprHTTPPort, "dapr-http-port", strconv.Itoa(runtime.DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on") flag.StringVar(&opts.DaprAPIListenAddresses, "dapr-listen-addresses", runtime.DefaultAPIListenAddress, "One or more addresses for the Dapr API to listen on, CSV limited") @@ -109,6 +112,7 @@ func New(args []string) *Options { flag.StringVar(&opts.UnixDomainSocket, "unix-domain-socket", "", "Path to a unix domain socket dir mount. If specified, Dapr API servers will use Unix Domain Sockets") flag.IntVar(&opts.DaprHTTPReadBufferSize, "dapr-http-read-buffer-size", runtime.DefaultReadBufferSize, "Increasing max size of read buffer in KB to handle sending multi-KB headers") flag.IntVar(&opts.DaprGracefulShutdownSeconds, "dapr-graceful-shutdown-seconds", int(runtime.DefaultGracefulShutdownDuration/time.Second), "Graceful shutdown time in seconds") + flag.DurationVar(&blockShutdownDuration, "dapr-block-shutdown-duration", 0, "If enabled, will block graceful shutdown after terminate signal is received until either the given duration has elapsed or the app reports unhealthy. Disabled by default") flag.BoolVar(opts.EnableAPILogging, "enable-api-logging", false, "Enable API logging for API calls") flag.BoolVar(&opts.DisableBuiltinK8sSecretStore, "disable-builtin-k8s-secret-store", false, "Disable the built-in Kubernetes Secret Store") flag.BoolVar(&opts.EnableAppHealthCheck, "enable-app-health-check", false, "Enable health checks for the application using the protocol defined with app-protocol") @@ -150,6 +154,10 @@ func New(args []string) *Options { } } + if isFlagPassed("dapr-block-shutdown-duration") { + opts.DaprBlockShutdownDuration = &blockShutdownDuration + } + return &opts } diff --git a/pkg/injector/annotations/annotations.go b/pkg/injector/annotations/annotations.go index 76509f386dc..579d4bee1f2 100644 --- a/pkg/injector/annotations/annotations.go +++ b/pkg/injector/annotations/annotations.go @@ -53,6 +53,7 @@ const ( KeyHTTPMaxRequestSize = "dapr.io/http-max-request-size" KeyHTTPReadBufferSize = "dapr.io/http-read-buffer-size" KeyGracefulShutdownSeconds = "dapr.io/graceful-shutdown-seconds" + KeyBlockShutdownDuration = "dapr.io/block-shutdown-duration" KeyEnableAPILogging = "dapr.io/enable-api-logging" KeyUnixDomainSocketPath = "dapr.io/unix-domain-socket-path" KeyVolumeMountsReadOnly = "dapr.io/volume-mounts" diff --git a/pkg/injector/patcher/sidecar.go b/pkg/injector/patcher/sidecar.go index 408aa035887..d2617278dc5 100644 --- a/pkg/injector/patcher/sidecar.go +++ b/pkg/injector/patcher/sidecar.go @@ -56,57 +56,58 @@ type SidecarConfig struct { SidecarInternalGRPCPort int32 `default:"50002"` SidecarPublicPort int32 `default:"3501"` - Enabled bool `annotation:"dapr.io/enabled"` - AppPort int32 `annotation:"dapr.io/app-port"` - Config string `annotation:"dapr.io/config"` - AppProtocol string `annotation:"dapr.io/app-protocol" default:"http"` - AppSSL bool `annotation:"dapr.io/app-ssl"` // TODO: Deprecated in Dapr 1.11; remove in a future Dapr version - AppID string `annotation:"dapr.io/app-id"` - EnableProfiling bool `annotation:"dapr.io/enable-profiling"` - LogLevel string `annotation:"dapr.io/log-level" default:"info"` - APITokenSecret string `annotation:"dapr.io/api-token-secret"` - AppTokenSecret string `annotation:"dapr.io/app-token-secret"` - LogAsJSON bool `annotation:"dapr.io/log-as-json"` - AppMaxConcurrency *int `annotation:"dapr.io/app-max-concurrency"` - EnableMetrics bool `annotation:"dapr.io/enable-metrics" default:"true"` - SidecarMetricsPort int32 `annotation:"dapr.io/metrics-port" default:"9090"` - EnableDebug bool `annotation:"dapr.io/enable-debug" default:"false"` - SidecarDebugPort int32 `annotation:"dapr.io/debug-port" default:"40000"` - Env string `annotation:"dapr.io/env"` - SidecarCPURequest string `annotation:"dapr.io/sidecar-cpu-request"` - SidecarCPULimit string `annotation:"dapr.io/sidecar-cpu-limit"` - SidecarMemoryRequest string `annotation:"dapr.io/sidecar-memory-request"` - SidecarMemoryLimit string `annotation:"dapr.io/sidecar-memory-limit"` - SidecarListenAddresses string `annotation:"dapr.io/sidecar-listen-addresses" default:"[::1],127.0.0.1"` - SidecarLivenessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-delay-seconds" default:"3"` - SidecarLivenessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-timeout-seconds" default:"3"` - SidecarLivenessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-period-seconds" default:"6"` - SidecarLivenessProbeThreshold int32 `annotation:"dapr.io/sidecar-liveness-probe-threshold" default:"3"` - SidecarReadinessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-delay-seconds" default:"3"` - SidecarReadinessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-timeout-seconds" default:"3"` - SidecarReadinessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-period-seconds" default:"6"` - SidecarReadinessProbeThreshold int32 `annotation:"dapr.io/sidecar-readiness-probe-threshold" default:"3"` - SidecarImage string `annotation:"dapr.io/sidecar-image"` - SidecarSeccompProfileType string `annotation:"dapr.io/sidecar-seccomp-profile-type"` - HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"` - HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"` - GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"` - EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"` - UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"` - VolumeMounts string `annotation:"dapr.io/volume-mounts"` - VolumeMountsRW string `annotation:"dapr.io/volume-mounts-rw"` - DisableBuiltinK8sSecretStore bool `annotation:"dapr.io/disable-builtin-k8s-secret-store"` - EnableAppHealthCheck bool `annotation:"dapr.io/enable-app-health-check"` - AppHealthCheckPath string `annotation:"dapr.io/app-health-check-path" default:"/healthz"` - AppHealthProbeInterval int32 `annotation:"dapr.io/app-health-probe-interval" default:"5"` // In seconds - AppHealthProbeTimeout int32 `annotation:"dapr.io/app-health-probe-timeout" default:"500"` // In milliseconds - AppHealthThreshold int32 `annotation:"dapr.io/app-health-threshold" default:"3"` - PlacementAddress string `annotation:"dapr.io/placement-host-address"` - PluggableComponents string `annotation:"dapr.io/pluggable-components"` - PluggableComponentsSocketsFolder string `annotation:"dapr.io/pluggable-components-sockets-folder"` - ComponentContainer string `annotation:"dapr.io/component-container"` - InjectPluggableComponents bool `annotation:"dapr.io/inject-pluggable-components"` - AppChannelAddress string `annotation:"dapr.io/app-channel-address"` + Enabled bool `annotation:"dapr.io/enabled"` + AppPort int32 `annotation:"dapr.io/app-port"` + Config string `annotation:"dapr.io/config"` + AppProtocol string `annotation:"dapr.io/app-protocol" default:"http"` + AppSSL bool `annotation:"dapr.io/app-ssl"` // TODO: Deprecated in Dapr 1.11; remove in a future Dapr version + AppID string `annotation:"dapr.io/app-id"` + EnableProfiling bool `annotation:"dapr.io/enable-profiling"` + LogLevel string `annotation:"dapr.io/log-level" default:"info"` + APITokenSecret string `annotation:"dapr.io/api-token-secret"` + AppTokenSecret string `annotation:"dapr.io/app-token-secret"` + LogAsJSON bool `annotation:"dapr.io/log-as-json"` + AppMaxConcurrency *int `annotation:"dapr.io/app-max-concurrency"` + EnableMetrics bool `annotation:"dapr.io/enable-metrics" default:"true"` + SidecarMetricsPort int32 `annotation:"dapr.io/metrics-port" default:"9090"` + EnableDebug bool `annotation:"dapr.io/enable-debug" default:"false"` + SidecarDebugPort int32 `annotation:"dapr.io/debug-port" default:"40000"` + Env string `annotation:"dapr.io/env"` + SidecarCPURequest string `annotation:"dapr.io/sidecar-cpu-request"` + SidecarCPULimit string `annotation:"dapr.io/sidecar-cpu-limit"` + SidecarMemoryRequest string `annotation:"dapr.io/sidecar-memory-request"` + SidecarMemoryLimit string `annotation:"dapr.io/sidecar-memory-limit"` + SidecarListenAddresses string `annotation:"dapr.io/sidecar-listen-addresses" default:"[::1],127.0.0.1"` + SidecarLivenessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-delay-seconds" default:"3"` + SidecarLivenessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-timeout-seconds" default:"3"` + SidecarLivenessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-liveness-probe-period-seconds" default:"6"` + SidecarLivenessProbeThreshold int32 `annotation:"dapr.io/sidecar-liveness-probe-threshold" default:"3"` + SidecarReadinessProbeDelaySeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-delay-seconds" default:"3"` + SidecarReadinessProbeTimeoutSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-timeout-seconds" default:"3"` + SidecarReadinessProbePeriodSeconds int32 `annotation:"dapr.io/sidecar-readiness-probe-period-seconds" default:"6"` + SidecarReadinessProbeThreshold int32 `annotation:"dapr.io/sidecar-readiness-probe-threshold" default:"3"` + SidecarImage string `annotation:"dapr.io/sidecar-image"` + SidecarSeccompProfileType string `annotation:"dapr.io/sidecar-seccomp-profile-type"` + HTTPMaxRequestSize *int `annotation:"dapr.io/http-max-request-size"` + HTTPReadBufferSize *int `annotation:"dapr.io/http-read-buffer-size"` + GracefulShutdownSeconds int `annotation:"dapr.io/graceful-shutdown-seconds" default:"-1"` + BlockShutdownDuration *string `annotation:"dapr.io/block-shutdown-duration"` + EnableAPILogging *bool `annotation:"dapr.io/enable-api-logging"` + UnixDomainSocketPath string `annotation:"dapr.io/unix-domain-socket-path"` + VolumeMounts string `annotation:"dapr.io/volume-mounts"` + VolumeMountsRW string `annotation:"dapr.io/volume-mounts-rw"` + DisableBuiltinK8sSecretStore bool `annotation:"dapr.io/disable-builtin-k8s-secret-store"` + EnableAppHealthCheck bool `annotation:"dapr.io/enable-app-health-check"` + AppHealthCheckPath string `annotation:"dapr.io/app-health-check-path" default:"/healthz"` + AppHealthProbeInterval int32 `annotation:"dapr.io/app-health-probe-interval" default:"5"` // In seconds + AppHealthProbeTimeout int32 `annotation:"dapr.io/app-health-probe-timeout" default:"500"` // In milliseconds + AppHealthThreshold int32 `annotation:"dapr.io/app-health-threshold" default:"3"` + PlacementAddress string `annotation:"dapr.io/placement-host-address"` + PluggableComponents string `annotation:"dapr.io/pluggable-components"` + PluggableComponentsSocketsFolder string `annotation:"dapr.io/pluggable-components-sockets-folder"` + ComponentContainer string `annotation:"dapr.io/component-container"` + InjectPluggableComponents bool `annotation:"dapr.io/inject-pluggable-components"` + AppChannelAddress string `annotation:"dapr.io/app-channel-address"` pod *corev1.Pod } diff --git a/pkg/injector/patcher/sidecar_container.go b/pkg/injector/patcher/sidecar_container.go index 25956ffa139..5ffd071e292 100644 --- a/pkg/injector/patcher/sidecar_container.go +++ b/pkg/injector/patcher/sidecar_container.go @@ -169,6 +169,10 @@ func (c *SidecarConfig) getSidecarContainer(opts getSidecarContainerOpts) (*core args = append(args, "--unix-domain-socket", injectorConsts.UnixDomainSocketDaprdPath) } + if c.BlockShutdownDuration != nil { + args = append(args, "--dapr-block-shutdown-duration", *c.BlockShutdownDuration) + } + // When debugging is enabled, we need to override the command and the flags if c.EnableDebug { ports = append(ports, corev1.ContainerPort{ diff --git a/pkg/injector/patcher/sidecar_container_test.go b/pkg/injector/patcher/sidecar_container_test.go index 486e0e65068..c195665d3e2 100644 --- a/pkg/injector/patcher/sidecar_container_test.go +++ b/pkg/injector/patcher/sidecar_container_test.go @@ -576,6 +576,27 @@ func TestGetSidecarContainer(t *testing.T) { }, })) + t.Run("block shutdown duration", testSuiteGenerator([]testCase{ + { + name: "default to empty", + annotations: map[string]string{}, + assertFn: func(t *testing.T, container *corev1.Container) { + args := strings.Join(container.Args, " ") + assert.NotContains(t, args, "--dapr-block-shutdown-duration") + }, + }, + { + name: "add a block shutdown duration", + annotations: map[string]string{ + "dapr.io/block-shutdown-duration": "3s", + }, + assertFn: func(t *testing.T, container *corev1.Container) { + args := strings.Join(container.Args, " ") + assert.Contains(t, args, "--dapr-block-shutdown-duration 3s") + }, + }, + })) + t.Run("sidecar image", testSuiteGenerator([]testCase{ { name: "no annotation", diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 98fcd811eef..b89bd94556b 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -90,6 +90,7 @@ type Config struct { DaprPublicPort string ApplicationPort string DaprGracefulShutdownSeconds int + DaprBlockShutdownDuration *time.Duration PlacementServiceHostAddr string DaprAPIListenAddresses string AppHealthProbeInterval int @@ -129,6 +130,7 @@ type internalConfig struct { unixDomainSocket string readBufferSize int gracefulShutdownDuration time.Duration + blockShutdownDuration *time.Duration enableAPILogging *bool disableBuiltinK8sSecretStore bool config []string @@ -284,8 +286,9 @@ func (c *Config) toInternal() (*internalConfig, error) { HealthCheckHTTPPath: c.AppHealthCheckPath, MaxConcurrency: c.AppMaxConcurrency, }, - registry: registry.New(c.Registry), - metricsExporter: metrics.NewExporterWithOptions(log, metrics.DefaultMetricNamespace, c.Metrics), + registry: registry.New(c.Registry), + metricsExporter: metrics.NewExporterWithOptions(log, metrics.DefaultMetricNamespace, c.Metrics), + blockShutdownDuration: c.DaprBlockShutdownDuration, } if len(intc.standalone.ResourcesPath) == 0 && c.ComponentsPath != "" { diff --git a/pkg/runtime/config_test.go b/pkg/runtime/config_test.go index af56a77252f..ddf02b3e37c 100644 --- a/pkg/runtime/config_test.go +++ b/pkg/runtime/config_test.go @@ -55,6 +55,8 @@ func TestParsePlacementAddr(t *testing.T) { func Test_toInternal(t *testing.T) { cfg := defaultTestConfig() + var nilDuration *time.Duration + intc, err := cfg.toInternal() require.NoError(t, err) @@ -81,6 +83,7 @@ func Test_toInternal(t *testing.T) { assert.Equal(t, "", intc.unixDomainSocket) assert.Equal(t, 4, intc.readBufferSize) assert.Equal(t, time.Second, intc.gracefulShutdownDuration) + assert.Equal(t, nilDuration, intc.blockShutdownDuration) assert.Equal(t, ptr.Of(true), intc.enableAPILogging) assert.True(t, intc.disableBuiltinK8sSecretStore) assert.Equal(t, "1.1.1.1", intc.appConnectionConfig.ChannelAddress) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index da80145f05d..257392ad584 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -36,6 +36,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/clock" nr "github.com/dapr/components-contrib/nameresolution" "github.com/dapr/components-contrib/state" @@ -98,6 +99,7 @@ type DaprRuntime struct { daprHTTPAPI http.API daprGRPCAPI grpc.API operatorClient operatorv1pb.OperatorClient + isAppHealthy chan struct{} appHealth *apphealth.AppHealth appHealthReady func(context.Context) error // Invoked the first time the app health becomes ready appHealthLock sync.Mutex @@ -107,6 +109,7 @@ type DaprRuntime struct { authz *authorizer.Authorizer sec security.Handler runnerCloser *concurrency.RunnerCloserManager + clock clock.Clock // Used for testing. initComplete chan struct{} @@ -206,7 +209,10 @@ func newDaprRuntime(ctx context.Context, namespace: namespace, podName: podName, initComplete: make(chan struct{}), + isAppHealthy: make(chan struct{}), + clock: new(clock.RealClock), } + close(rt.isAppHealthy) var gracePeriod *time.Duration if duration := runtimeConfig.gracefulShutdownDuration; duration > 0 { @@ -274,7 +280,31 @@ func newDaprRuntime(ctx context.Context, } // Run performs initialization of the runtime with the runtime and global configurations. -func (a *DaprRuntime) Run(ctx context.Context) error { +func (a *DaprRuntime) Run(parentCtx context.Context) error { + ctx := parentCtx + if a.runtimeConfig.blockShutdownDuration != nil { + // Override context with Background. Runner context will be cancelled when + // blocking graceful shutdown returns. + ctx = context.Background() + a.runnerCloser.Add(func(ctx context.Context) error { + select { + case <-parentCtx.Done(): + case <-ctx.Done(): + // Return nil as another routine has returned, not due to an interrupt. + return nil + } + + log.Infof("Blocking graceful shutdown for %s or until app reports unhealthy...", *a.runtimeConfig.blockShutdownDuration) + select { + case <-a.clock.After(*a.runtimeConfig.blockShutdownDuration): + log.Info("Block shutdown period expired, entering shutdown...") + case <-a.isAppHealthy: + log.Info("App reported unhealthy, entering shutdown...") + } + return nil + }) + } + return a.runnerCloser.Run(ctx) } @@ -586,6 +616,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) { switch status { case apphealth.AppStatusHealthy: + select { + case <-a.isAppHealthy: + a.isAppHealthy = make(chan struct{}) + default: + } + // First time the app becomes healthy, complete the init process if a.appHealthReady != nil { if err := a.appHealthReady(ctx); err != nil { @@ -608,6 +644,12 @@ func (a *DaprRuntime) appHealthChanged(ctx context.Context, status uint8) { log.Warnf("failed to subscribe to outbox topics: %s", err) } case apphealth.AppStatusUnhealthy: + select { + case <-a.isAppHealthy: + default: + close(a.isAppHealthy) + } + // Stop topic subscriptions and input bindings a.processor.PubSub().StopSubscriptions() a.processor.Binding().StopReadingFromBindings() @@ -1025,7 +1067,7 @@ func (a *DaprRuntime) blockUntilAppIsReady(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() // prevents overwhelming the OS with open connections - case <-time.After(time.Millisecond * 100): + case <-a.clock.After(time.Millisecond * 100): } } diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index d4d20263a57..bda03f7890c 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -49,6 +49,7 @@ import ( "google.golang.org/grpc/metadata" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clocktesting "k8s.io/utils/clock/testing" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/lock" @@ -1855,6 +1856,126 @@ func TestGracefulShutdownBindings(t *testing.T) { } } +func TestBlockShutdownBindings(t *testing.T) { + t.Run("block timeout", func(t *testing.T) { + rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) + require.NoError(t, err) + + fakeClock := clocktesting.NewFakeClock(time.Now()) + rt.clock = fakeClock + rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy) + + rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100) + rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second + rt.runtimeConfig.registry.Bindings().RegisterInputBinding( + func(_ logger.Logger) bindings.InputBinding { + return &daprt.MockBinding{} + }, + "testInputBinding", + ) + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + go func() { + errCh <- rt.Run(ctx) + }() + + cin := componentsV1alpha1.Component{} + cin.ObjectMeta.Name = "testInputBinding" + cin.Spec.Type = "bindings.testInputBinding" + + rt.runtimeConfig.registry.Bindings().RegisterOutputBinding( + func(_ logger.Logger) bindings.OutputBinding { + return &daprt.MockBinding{} + }, + "testOutputBinding", + ) + cout := componentsV1alpha1.Component{} + cout.ObjectMeta.Name = "testOutputBinding" + cout.Spec.Type = "bindings.testOutputBinding" + require.NoError(t, rt.processor.Init(context.Background(), cin)) + require.NoError(t, rt.processor.Init(context.Background(), cout)) + assert.Len(t, rt.compStore.ListInputBindings(), 1) + assert.Len(t, rt.compStore.ListOutputBindings(), 1) + + cancel() + + select { + case <-time.After(time.Second): + case <-errCh: + assert.Fail(t, "expected not to return until block timeout is reached") + } + + fakeClock.Step(time.Millisecond * 200) + + select { + case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second): + assert.Fail(t, "input bindings shutdown timed out") + case err := <-errCh: + require.NoError(t, err) + } + }) + + t.Run("block app unhealthy", func(t *testing.T) { + rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) + require.NoError(t, err) + + fakeClock := clocktesting.NewFakeClock(time.Now()) + rt.clock = fakeClock + rt.appHealthChanged(context.Background(), apphealth.AppStatusHealthy) + + rt.runtimeConfig.blockShutdownDuration = ptr.Of(time.Millisecond * 100) + rt.runtimeConfig.gracefulShutdownDuration = 3 * time.Second + rt.runtimeConfig.registry.Bindings().RegisterInputBinding( + func(_ logger.Logger) bindings.InputBinding { + return &daprt.MockBinding{} + }, + "testInputBinding", + ) + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + go func() { + errCh <- rt.Run(ctx) + }() + + cin := componentsV1alpha1.Component{} + cin.ObjectMeta.Name = "testInputBinding" + cin.Spec.Type = "bindings.testInputBinding" + + rt.runtimeConfig.registry.Bindings().RegisterOutputBinding( + func(_ logger.Logger) bindings.OutputBinding { + return &daprt.MockBinding{} + }, + "testOutputBinding", + ) + cout := componentsV1alpha1.Component{} + cout.ObjectMeta.Name = "testOutputBinding" + cout.Spec.Type = "bindings.testOutputBinding" + require.NoError(t, rt.processor.Init(context.Background(), cin)) + require.NoError(t, rt.processor.Init(context.Background(), cout)) + assert.Len(t, rt.compStore.ListInputBindings(), 1) + assert.Len(t, rt.compStore.ListOutputBindings(), 1) + + cancel() + + select { + case <-time.After(time.Second): + case <-errCh: + assert.Fail(t, "expected not to return until block timeout is reached") + } + + rt.appHealthChanged(context.Background(), apphealth.AppStatusUnhealthy) + + select { + case <-time.After(rt.runtimeConfig.gracefulShutdownDuration + 2*time.Second): + assert.Fail(t, "input bindings shutdown timed out") + case err := <-errCh: + require.NoError(t, err) + } + }) +} + func TestGracefulShutdownPubSub(t *testing.T) { rt, err := NewTestDaprRuntime(t, modes.StandaloneMode) require.NoError(t, err) diff --git a/tests/integration/framework/framework.go b/tests/integration/framework/framework.go index abb99e727c7..19730a25278 100644 --- a/tests/integration/framework/framework.go +++ b/tests/integration/framework/framework.go @@ -55,7 +55,8 @@ func (f *Framework) Cleanup(t *testing.T) { t.Logf("stopping %d processes", len(f.procs)) - for _, proc := range f.procs { - proc.Cleanup(t) + // Cleanup processes in reverse order in a stack fashion (same as t.Cleanup). + for i := len(f.procs) - 1; i >= 0; i-- { + f.procs[i].Cleanup(t) } } diff --git a/tests/integration/framework/process/daprd/daprd.go b/tests/integration/framework/process/daprd/daprd.go index 0f461d30c85..af1a2a95f98 100644 --- a/tests/integration/framework/process/daprd/daprd.go +++ b/tests/integration/framework/process/daprd/daprd.go @@ -128,6 +128,12 @@ func New(t *testing.T, fopts ...Option) *Daprd { if opts.disableK8sSecretStore != nil { args = append(args, "--disable-builtin-k8s-secret-store="+strconv.FormatBool(*opts.disableK8sSecretStore)) } + if opts.gracefulShutdownSeconds != nil { + args = append(args, "--dapr-graceful-shutdown-seconds="+strconv.Itoa(*opts.gracefulShutdownSeconds)) + } + if opts.blockShutdownDuration != nil { + args = append(args, "--dapr-block-shutdown-duration="+*opts.blockShutdownDuration) + } return &Daprd{ exec: exec.New(t, binary.EnvValue("daprd"), args, opts.execOpts...), diff --git a/tests/integration/framework/process/daprd/options.go b/tests/integration/framework/process/daprd/options.go index 2b78316f41e..86b1f1912c6 100644 --- a/tests/integration/framework/process/daprd/options.go +++ b/tests/integration/framework/process/daprd/options.go @@ -44,6 +44,8 @@ type options struct { sentryAddress string controlPlaneAddress string disableK8sSecretStore *bool + gracefulShutdownSeconds *int + blockShutdownDuration *string } func WithExecOptions(execOptions ...exec.Option) Option { @@ -198,3 +200,15 @@ func WithDisableK8sSecretStore(disable bool) Option { o.disableK8sSecretStore = &disable } } + +func WithDaprGracefulShutdownSeconds(seconds int) Option { + return func(o *options) { + o.gracefulShutdownSeconds = &seconds + } +} + +func WithDaprBlockShutdownDuration(duration string) Option { + return func(o *options) { + o.blockShutdownDuration = &duration + } +} diff --git a/tests/integration/framework/process/exec/exec.go b/tests/integration/framework/process/exec/exec.go index 30e90dc983a..556f9c2b033 100644 --- a/tests/integration/framework/process/exec/exec.go +++ b/tests/integration/framework/process/exec/exec.go @@ -97,6 +97,7 @@ func (e *exec) Run(t *testing.T, ctx context.Context) { e.cmd.Stdout = e.stdoutpipe e.cmd.Stderr = e.stderrpipe + // Wait for a few seconds before killing the process completely. e.cmd.WaitDelay = time.Second * 5 @@ -112,11 +113,12 @@ func (e *exec) Cleanup(t *testing.T) { e.lock.Lock() defer e.lock.Unlock() - require.NoError(t, e.stderrpipe.Close()) - require.NoError(t, e.stdoutpipe.Close()) - kill.Kill(t, e.cmd) + e.checkExit(t) + + require.NoError(t, e.stderrpipe.Close()) + require.NoError(t, e.stdoutpipe.Close()) } func (e *exec) checkExit(t *testing.T) { @@ -125,6 +127,6 @@ func (e *exec) checkExit(t *testing.T) { t.Logf("waiting for %q process to exit", filepath.Base(e.binPath)) e.runErrorFn(t, e.cmd.Wait()) - assert.NotNil(t, e.cmd.ProcessState, "process state should not be nil") + require.NotNil(t, e.cmd.ProcessState, "process state should not be nil") assert.Equalf(t, e.exitCode, e.cmd.ProcessState.ExitCode(), "expected exit code to be %d", e.exitCode) } diff --git a/tests/integration/framework/process/logline/logline.go b/tests/integration/framework/process/logline/logline.go index 1dd0fe6ff5c..73f65c34387 100644 --- a/tests/integration/framework/process/logline/logline.go +++ b/tests/integration/framework/process/logline/logline.go @@ -19,6 +19,7 @@ import ( "errors" "io" "strings" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -39,6 +40,10 @@ type LogLine struct { stderr io.Reader stderrExp io.WriteCloser stderrLinContains map[string]bool + + outCheck chan map[string]bool + closeCh chan struct{} + done atomic.Int32 } func New(t *testing.T, fopts ...Option) *LogLine { @@ -70,41 +75,46 @@ func New(t *testing.T, fopts ...Option) *LogLine { stderr: io.TeeReader(stderrReader, iowriter.New(t, "logline:stderr")), stderrExp: stderrWriter, stderrLinContains: stderrLineContains, + outCheck: make(chan map[string]bool), + closeCh: make(chan struct{}), } } func (l *LogLine) Run(t *testing.T, ctx context.Context) { - outCheck := make(chan map[string]bool) go func() { - outCheck <- l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout) + res := l.checkOut(t, ctx, l.stdoutLineContains, l.stdoutExp, l.stdout) + l.done.Add(1) + l.outCheck <- res }() go func() { - outCheck <- l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr) + res := l.checkOut(t, ctx, l.stderrLinContains, l.stderrExp, l.stderr) + l.done.Add(1) + l.outCheck <- res }() +} - for i := 0; i < 2; i++ { - for expLine := range <-outCheck { - assert.Fail(t, "expected to log line: %s", expLine) - } - } +func (l *LogLine) FoundAll() bool { + return l.done.Load() == 2 } func (l *LogLine) Cleanup(t *testing.T) { - l.stdoutExp.Close() - l.stderrExp.Close() + close(l.closeCh) + for i := 0; i < 2; i++ { + for expLine := range <-l.outCheck { + assert.Fail(t, "expected to log line: "+expLine) + } + } } func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[string]bool, closer io.WriteCloser, reader io.Reader) map[string]bool { t.Helper() - closeCh := make(chan struct{}) - go func() { select { case <-ctx.Done(): - case <-closeCh: + closer.Close() + case <-l.closeCh: } - closer.Close() }() breader := bufio.NewReader(reader) @@ -126,8 +136,6 @@ func (l *LogLine) checkOut(t *testing.T, ctx context.Context, expLines map[strin } } - close(closeCh) - return expLines } diff --git a/tests/integration/framework/process/logline/options.go b/tests/integration/framework/process/logline/options.go index 63ed906c1cf..7e6b9bc64f5 100644 --- a/tests/integration/framework/process/logline/options.go +++ b/tests/integration/framework/process/logline/options.go @@ -19,14 +19,14 @@ type options struct { stderrContains []string } -func WithStdoutLineContains(line string) func(*options) { +func WithStdoutLineContains(lines ...string) func(*options) { return func(o *options) { - o.stdoutContains = append(o.stdoutContains, line) + o.stdoutContains = append(o.stdoutContains, lines...) } } -func WithStderrLineContains(line string) func(*options) { +func WithStderrLineContains(lines ...string) func(*options) { return func(o *options) { - o.stderrContains = append(o.stderrContains, line) + o.stderrContains = append(o.stderrContains, lines...) } } diff --git a/tests/integration/suite/actors/http/ttl.go b/tests/integration/suite/actors/http/ttl.go index 3b377b79fc2..88e22907568 100644 --- a/tests/integration/suite/actors/http/ttl.go +++ b/tests/integration/suite/actors/http/ttl.go @@ -98,7 +98,7 @@ func (l *ttl) Run(t *testing.T, ctx context.Context) { require.NoError(c, rErr) require.NoError(c, resp.Body.Close()) assert.Equal(c, http.StatusOK, resp.StatusCode) - }, time.Second*10, time.Millisecond*100, "actor not ready") + }, time.Second*20, time.Millisecond*100, "actor not ready") now := time.Now() diff --git a/tests/integration/suite/daprd/daprd.go b/tests/integration/suite/daprd/daprd.go index 9b0742927ac..7a2877b0106 100644 --- a/tests/integration/suite/daprd/daprd.go +++ b/tests/integration/suite/daprd/daprd.go @@ -26,5 +26,6 @@ import ( _ "github.com/dapr/dapr/tests/integration/suite/daprd/resources" _ "github.com/dapr/dapr/tests/integration/suite/daprd/secret" _ "github.com/dapr/dapr/tests/integration/suite/daprd/serviceinvocation" + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown" _ "github.com/dapr/dapr/tests/integration/suite/daprd/state" ) diff --git a/tests/integration/suite/daprd/resources/uniquename.go b/tests/integration/suite/daprd/resources/uniquename.go index eeff7bc69e3..9e00e3523cc 100644 --- a/tests/integration/suite/daprd/resources/uniquename.go +++ b/tests/integration/suite/daprd/resources/uniquename.go @@ -19,7 +19,9 @@ import ( "os" "path/filepath" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/dapr/dapr/tests/integration/framework" @@ -80,10 +82,10 @@ spec: ), ) return []framework.Option{ - framework.WithProcesses(u.daprd, u.logline), + framework.WithProcesses(u.logline, u.daprd), } } func (u *uniquename) Run(t *testing.T, ctx context.Context) { - // Assertions done in logline process + assert.Eventually(t, u.logline.FoundAll, time.Second*5, time.Millisecond*100) } diff --git a/tests/integration/suite/daprd/shutdown/block/app.go b/tests/integration/suite/daprd/shutdown/block/app.go new file mode 100644 index 00000000000..1852c8e35b0 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app.go @@ -0,0 +1,18 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block/app" +) diff --git a/tests/integration/suite/daprd/shutdown/block/app/healthy.go b/tests/integration/suite/daprd/shutdown/block/app/healthy.go new file mode 100644 index 00000000000..61c9b25dfa3 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app/healthy.go @@ -0,0 +1,179 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(healthy)) +} + +// health tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown will +// occur when the app becomes unhealthy. +type healthy struct { + daprd *daprd.Daprd + logline *logline.LogLine + appHealth atomic.Bool + healthzCalled atomic.Int64 + routeCh chan struct{} +} + +func (h *healthy) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + h.appHealth.Store(true) + h.routeCh = make(chan struct{}, 1) + + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + defer h.healthzCalled.Add(1) + if h.appHealth.Load() { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusServiceUnavailable) + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + h.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + h.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 3m0s or until app reports unhealthy...", + "App reported unhealthy, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + h.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("180s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(h.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, h.logline), + } +} + +func (h *healthy) Run(t *testing.T, ctx context.Context) { + h.daprd.Run(t, ctx) + h.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, h.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-h.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + daprdStopped := make(chan struct{}) + go func() { + h.daprd.Cleanup(t) + close(daprdStopped) + }() + + healthzCalled := h.healthzCalled.Load() + assert.Eventually(t, func() bool { + return h.healthzCalled.Load() > healthzCalled + }, time.Second*5, time.Millisecond*100) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-h.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + healthzCalled = h.healthzCalled.Load() + h.appHealth.Store(false) + require.Equal(t, healthzCalled, h.healthzCalled.Load()) + + assert.Eventually(t, func() bool { + return h.healthzCalled.Load() > healthzCalled + }, time.Second*5, time.Millisecond*100) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + //nolint:testifylint + assert.Error(c, err) + }, time.Second*5, time.Millisecond*100) + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go b/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go new file mode 100644 index 00000000000..4f250c3b94e --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/app/unhealthy.go @@ -0,0 +1,159 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + commonv1 "github.com/dapr/dapr/pkg/proto/common/v1" + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(unhealthy)) +} + +// unhealth tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown will +// occur straight away if the app is already unhealthy. +type unhealthy struct { + daprd *daprd.Daprd + logline *logline.LogLine + appHealth atomic.Bool + routeCh chan struct{} +} + +func (u *unhealthy) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + u.appHealth.Store(true) + u.routeCh = make(chan struct{}, 1) + + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + if u.appHealth.Load() { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusServiceUnavailable) + }) + handler.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + u.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + u.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 3m0s or until app reports unhealthy...", + "App reported unhealthy, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + u.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("180s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(u.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, u.logline), + } +} + +func (u *unhealthy) Run(t *testing.T, ctx context.Context) { + u.daprd.Run(t, ctx) + u.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, u.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-u.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub did not send message to subscriber") + } + + u.appHealth.Store(false) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + _, err = client.InvokeService(ctx, &rtv1.InvokeServiceRequest{ + Id: u.daprd.AppID(), + Message: &commonv1.InvokeRequest{ + Method: "foo", + HttpExtension: &commonv1.HTTPExtension{Verb: commonv1.HTTPExtension_GET}, + }, + }) + //nolint:testifylint + assert.ErrorContains(c, err, "app is not in a healthy state") + }, time.Second*5, time.Millisecond*100) + + daprdStopped := make(chan struct{}) + go func() { + u.daprd.Cleanup(t) + close(daprdStopped) + }() + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/block/timeout.go b/tests/integration/suite/daprd/shutdown/block/timeout.go new file mode 100644 index 00000000000..bb9d1ac3d8b --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/block/timeout.go @@ -0,0 +1,159 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "io" + "net/http" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(timeout)) +} + +// timeout tests Daprd's --dapr-block-shutdown-seconds, ensuring shutdown +// procedure will begin when seconds is reached when app still reports healthy. +type timeout struct { + daprd *daprd.Daprd + logline *logline.LogLine + routeCh chan struct{} +} + +func (i *timeout) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + i.routeCh = make(chan struct{}, 1) + handler := http.NewServeMux() + handler.HandleFunc("/dapr/subscribe", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, `[{"pubsubname":"foo","topic":"topic","route":"route"}]`) + }) + handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + handler.HandleFunc("/route", func(w http.ResponseWriter, r *http.Request) { + i.routeCh <- struct{}{} + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + i.logline = logline.New(t, + logline.WithStdoutLineContains( + "Blocking graceful shutdown for 2s or until app reports unhealthy...", + "Block shutdown period expired, entering shutdown...", + "Daprd shutdown gracefully", + ), + ) + + i.daprd = daprd.New(t, + daprd.WithDaprBlockShutdownDuration("2s"), + daprd.WithAppPort(app.Port()), + daprd.WithAppHealthCheck(true), + daprd.WithAppHealthCheckPath("/healthz"), + daprd.WithAppHealthProbeInterval(1), + daprd.WithAppHealthProbeThreshold(1), + daprd.WithExecOptions(exec.WithStdout(i.logline.Stdout())), + daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo +spec: + type: pubsub.in-memory + version: v1 +`)) + + return []framework.Option{ + framework.WithProcesses(app, i.logline), + } +} + +func (i *timeout) Run(t *testing.T, ctx context.Context) { + i.daprd.Run(t, ctx) + i.daprd.WaitUntilRunning(t, ctx) + + conn, err := grpc.DialContext(ctx, i.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + client := rtv1.NewDaprClient(conn) + + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-i.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub message should have been sent to subscriber") + } + + daprdStopped := make(chan struct{}) + go func() { + i.daprd.Cleanup(t) + close(daprdStopped) + }() + + t.Run("daprd APIs should still be available during blocked shutdown", func(t *testing.T) { + time.Sleep(time.Second) + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.NoError(t, err) + select { + case <-i.routeCh: + case <-ctx.Done(): + assert.Fail(t, "pubsub message should have been sent to subscriber") + } + }) + + t.Run("daprd APIs are no longer available when past blocked shutdown", func(t *testing.T) { + time.Sleep(time.Second * 3 / 2) + _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{ + PubsubName: "foo", + Topic: "topic", + Data: []byte(`{"status":"completed"}`), + }) + require.Error(t, err) + }) + + select { + case <-daprdStopped: + case <-time.After(time.Second * 5): + assert.Fail(t, "daprd did not exit in time") + } +} diff --git a/tests/integration/suite/daprd/shutdown/graceful/graceful.go b/tests/integration/suite/daprd/shutdown/graceful/graceful.go new file mode 100644 index 00000000000..b70dcd23f24 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/graceful/graceful.go @@ -0,0 +1,65 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graceful + +import ( + "context" + "net/http" + "runtime" + "testing" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(graceful)) +} + +// graceful tests Daprd's --dapr-graceful-shutdown-seconds gracefully +// terminates on all resources closing. +type graceful struct { + daprd *daprd.Daprd +} + +func (g *graceful) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + app := prochttp.New(t, prochttp.WithHandler(http.NewServeMux())) + + logline := logline.New(t, + logline.WithStdoutLineContains( + "Daprd shutdown gracefully", + ), + ) + + g.daprd = daprd.New(t, + daprd.WithDaprGracefulShutdownSeconds(5), + daprd.WithAppPort(app.Port()), + daprd.WithExecOptions(exec.WithStdout(logline.Stdout())), + ) + return []framework.Option{ + framework.WithProcesses(app, logline, g.daprd), + } +} + +func (g *graceful) Run(t *testing.T, ctx context.Context) { + g.daprd.WaitUntilRunning(t, ctx) +} diff --git a/tests/integration/suite/daprd/shutdown/graceful/timeout.go b/tests/integration/suite/daprd/shutdown/graceful/timeout.go new file mode 100644 index 00000000000..bdd7e8f349c --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/graceful/timeout.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package graceful + +import ( + "context" + "fmt" + "io" + "net/http" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + "github.com/dapr/dapr/tests/integration/framework/process/exec" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/logline" + "github.com/dapr/dapr/tests/integration/framework/util" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(timeout)) +} + +// timeout tests Daprd's --dapr-graceful-shutdown-seconds where Daprd force exits +// because the graceful timeout expires. +type timeout struct { + daprd *daprd.Daprd + closeInvoke chan struct{} + inInvoke chan struct{} +} + +func (i *timeout) Setup(t *testing.T) []framework.Option { + if runtime.GOOS == "windows" { + t.Skip("Skipping test on windows which relies on unix process signals") + } + + i.closeInvoke = make(chan struct{}) + i.inInvoke = make(chan struct{}) + handler := http.NewServeMux() + handler.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { + close(i.inInvoke) + <-i.closeInvoke + }) + app := prochttp.New(t, + prochttp.WithHandler(handler), + ) + + logline := logline.New(t, + logline.WithStdoutLineContains( + "Graceful shutdown timeout exceeded, forcing shutdown", + ), + ) + + i.daprd = daprd.New(t, + daprd.WithAppPort(app.Port()), + daprd.WithDaprGracefulShutdownSeconds(1), + daprd.WithExecOptions( + exec.WithExitCode(1), + exec.WithRunError(func(t *testing.T, err error) { + require.ErrorContains(t, err, "exit status 1") + }), + exec.WithStdout(logline.Stdout()), + ), + ) + + return []framework.Option{ + framework.WithProcesses(app, logline), + } +} + +func (i *timeout) Run(t *testing.T, ctx context.Context) { + i.daprd.Run(t, ctx) + i.daprd.WaitUntilRunning(t, ctx) + client := util.HTTPClient(t) + + reqURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/foo", i.daprd.HTTPPort(), i.daprd.AppID()) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, nil) + require.NoError(t, err) + errCh := make(chan error) + go func() { + resp, cerr := client.Do(req) + if resp != nil { + resp.Body.Close() + } + errCh <- cerr + }() + <-i.inInvoke + i.daprd.Cleanup(t) + close(i.closeInvoke) + require.ErrorIs(t, <-errCh, io.EOF) +} diff --git a/tests/integration/suite/daprd/shutdown/shutdown.go b/tests/integration/suite/daprd/shutdown/shutdown.go new file mode 100644 index 00000000000..42bf5b6baa0 --- /dev/null +++ b/tests/integration/suite/daprd/shutdown/shutdown.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package shutdown + +import ( + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/block" + _ "github.com/dapr/dapr/tests/integration/suite/daprd/shutdown/graceful" +)