diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md
index 3d46dc484..d8a97ac20 100644
--- a/docs/release-notes/change-log.md
+++ b/docs/release-notes/change-log.md
@@ -9,6 +9,14 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## v1.3.1
+
+### Server
+
+* Fixed error-passing between tier2 and tier1 (tier1 will no longer retry requests that fail deterministically on tier2)
+* Tier1 now schedules a single job on tier2, then quickly ramps up to the requested number of workers after a 4-second delay, to catch early exceptions
+* "store became too big" is now considered a deterministic error and returns the code "InvalidArgument"
+
 ## v1.3.0
 
 ### Highlights
diff --git a/orchestrator/work/worker.go b/orchestrator/work/worker.go
index b22c42eda..3387f539e 100644
--- a/orchestrator/work/worker.go
+++ b/orchestrator/work/worker.go
@@ -8,14 +8,15 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/bufbuild/connect-go"
 	"github.com/streamingfast/dauth"
 	"github.com/streamingfast/derr"
+	"github.com/streamingfast/dgrpc"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
+	otelCodes "go.opentelemetry.io/otel/codes"
 	ttrace "go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
 
 	"github.com/streamingfast/substreams/block"
 	"github.com/streamingfast/substreams/client"
@@ -244,7 +245,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
 			upstream.RPCFailedProgressResponse(r.Failed.Reason, r.Failed.Logs, r.Failed.LogsTruncated)
 			err := fmt.Errorf("work failed on remote host: %s", r.Failed.Reason)
-			span.SetStatus(codes.Error, err.Error())
+			span.SetStatus(otelCodes.Error, err.Error())
 			return &Result{Error: err}
 
 		case *pbssinternal.ProcessRangeResponse_Completed:
@@ -262,7 +263,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
 	if ctx.Err() != nil {
 		return &Result{Error: ctx.Err()}
 	}
-	if connect.CodeOf(err) == connect.CodeInvalidArgument {
+	if grpcErr := dgrpc.AsGRPCError(err); grpcErr.Code() == codes.InvalidArgument {
 		return &Result{Error: err}
 	}
 	return &Result{
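The worker.go hunk above is the tier1 half of the retry fix: instead of inspecting connect codes, the remote worker now extracts the gRPC status from the error and gives up permanently when tier2 answered with `InvalidArgument`. Below is a minimal, runnable sketch of that classification using the stock `google.golang.org/grpc/status` package rather than the `dgrpc` helper; the `isDeterministic` name and the sample messages are illustrative, not from the codebase.

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isDeterministic reports whether err carries codes.InvalidArgument, the
// code tier2 now uses for failures that would repeat on every retry.
func isDeterministic(err error) bool {
	return status.Code(err) == codes.InvalidArgument
}

func main() {
	deterministic := status.Error(codes.InvalidArgument, `store "tokens" became too big`)
	transient := errors.New("connection reset by peer")

	fmt.Println(isDeterministic(deterministic)) // true  -> fail fast, do not retry
	fmt.Println(isDeterministic(transient))     // false -> worth retrying
}
```

`status.Code` returns `codes.OK` for a nil error and `codes.Unknown` for plain errors, so anything that is not an explicit `InvalidArgument` stays retryable.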
diff --git a/orchestrator/work/workerpool.go b/orchestrator/work/workerpool.go
index cc980b3c0..2575fc94d 100644
--- a/orchestrator/work/workerpool.go
+++ b/orchestrator/work/workerpool.go
@@ -2,6 +2,7 @@ package work
 
 import (
 	"context"
+	"time"
 
 	"go.uber.org/zap"
 
@@ -10,6 +11,7 @@ import (
 
 type WorkerPool struct {
 	workers []*WorkerStatus
+	started *time.Time
 }
 
 type WorkerState int
@@ -17,6 +19,7 @@ type WorkerState int
 const (
 	WorkerFree WorkerState = iota
 	WorkerWorking
+	WorkerInitialWait
 )
 
 type WorkerStatus struct {
@@ -31,18 +34,37 @@ func NewWorkerPool(ctx context.Context, workerCount int, workerFactory WorkerFac
 	workers := make([]*WorkerStatus, workerCount)
 	for i := 0; i < workerCount; i++ {
+		state := WorkerFree
+		if i > 0 {
+			state = WorkerInitialWait
+		}
 		workers[i] = &WorkerStatus{
 			Worker: workerFactory(logger),
-			State:  WorkerFree,
+			State:  state,
 		}
 	}
 
+	now := time.Now()
 	return &WorkerPool{
 		workers: workers,
+		started: &now,
+	}
+}
+
+func (p *WorkerPool) rampupWorkers() {
+	if p.started == nil || time.Since(*p.started) < time.Second*4 {
+		return
+	}
+	for _, w := range p.workers {
+		if w.State == WorkerInitialWait {
+			w.State = WorkerFree
+		}
 	}
+	p.started = nil
 }
 
 func (p *WorkerPool) WorkerAvailable() bool {
+	p.rampupWorkers()
 	for _, w := range p.workers {
 		if w.State == WorkerFree {
 			return true
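The workerpool.go change gives tier1 a probation window: only the first worker starts `WorkerFree`, every other worker sits in `WorkerInitialWait` until four seconds have elapsed, so a deterministic failure surfaces on a single job before the whole fleet is engaged. Here is a self-contained sketch of the same gate, with simplified stand-in types (`pool`, `state`) instead of the real `WorkerPool`/`WorkerStatus`:

```go
package main

import (
	"fmt"
	"time"
)

type state int

const (
	free state = iota // zero value: ready to take work
	working
	initialWait // held back during the probation window
)

type pool struct {
	states  []state
	started *time.Time
}

func newPool(n int) *pool {
	states := make([]state, n)
	for i := range states {
		if i > 0 {
			states[i] = initialWait // everyone but the probe worker is held back
		}
	}
	now := time.Now()
	return &pool{states: states, started: &now}
}

// rampUp releases held workers once the probation window has passed.
func (p *pool) rampUp() {
	if p.started == nil || time.Since(*p.started) < 4*time.Second {
		return
	}
	for i, s := range p.states {
		if s == initialWait {
			p.states[i] = free
		}
	}
	p.started = nil // gate opens once; later calls return immediately
}

func (p *pool) available() int {
	p.rampUp()
	n := 0
	for _, s := range p.states {
		if s == free {
			n++
		}
	}
	return n
}

func main() {
	p := newPool(10)
	fmt.Println(p.available()) // 1: only the probe worker
	time.Sleep(4 * time.Second)
	fmt.Println(p.available()) // 10: full ramp-up
}
```

Setting `started` back to nil once the gate has opened makes every subsequent scheduling call a cheap no-op.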
diff --git a/service/tier1.go b/service/tier1.go
index 3933134e4..352ba859f 100644
--- a/service/tier1.go
+++ b/service/tier1.go
@@ -47,6 +47,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	ttrace "go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/protobuf/proto"
 )
 
@@ -266,19 +267,19 @@ func (s *Tier1Service) Blocks(
 
 	err = s.blocks(runningContext, request, outputGraph, respFunc)
 
-	if grpcError := toGRPCError(runningContext, err); grpcError != nil {
-		switch connect.CodeOf(grpcError) {
+	if connectError := toConnectError(runningContext, err); connectError != nil {
+		switch connect.CodeOf(connectError) {
 		case connect.CodeInternal:
 			logger.Info("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
 		case connect.CodeInvalidArgument:
 			logger.Debug("recording failure on request", zap.String("request_id", requestID))
-			s.recordFailure(requestID, grpcError)
+			s.recordFailure(requestID, connectError)
 		case connect.CodeCanceled:
-			logger.Info("Blocks request canceled by user", zap.Error(grpcError))
+			logger.Info("Blocks request canceled by user", zap.Error(connectError))
 		default:
-			logger.Info("Blocks request completed with error", zap.Error(grpcError))
+			logger.Info("Blocks request completed with error", zap.Error(connectError))
 		}
-		return grpcError
+		return connectError
 	}
 
 	logger.Info("Blocks request completed witout error")
@@ -555,27 +556,41 @@ func setupRequestStats(ctx context.Context, requestDetails *reqctx.RequestDetail
 	return reqctx.WithReqStats(ctx, stats), stats
 }
 
-// toGRPCError turns an `err` into a gRPC error if it's non-nil, in the `nil` case,
+// toConnectError turns an `err` into a connect error if it's non-nil, in the `nil` case,
 // `nil` is returned right away.
 //
 // If the `err` has in its chain of error either `context.Canceled`, `context.DeadlineExceeded`
-// or `stream.ErrInvalidArg`, error is turned into a proper gRPC error respectively of code
+// or `stream.ErrInvalidArg`, error is turned into a proper connect error respectively of code
 // `Canceled`, `DeadlineExceeded` or `InvalidArgument`.
 //
-// If the `err` has its in chain any error constructed through `connect.NewError` (and its variants), then
-// we return the first found error of such type directly, because it's already a gRPC error.
+// If the `err` has in its chain any error constructed through `connect.NewError` (and its variants), then
+// we return the first found error of such type directly, because it's already a connect error.
+//
+// If the `err` has in its chain any error constructed through `grpc` or `status`, it will be converted to the connect equivalent.
 //
 // Otherwise, the error is assumed to be an internal error and turned backed into a proper
 // `connect.NewError(connect.CodeInternal, err)`.
-func toGRPCError(ctx context.Context, err error) error {
+func toConnectError(ctx context.Context, err error) error {
 	if err == nil {
 		return nil
 	}
 
+	// GRPC to connect error
 	if grpcError := dgrpc.AsGRPCError(err); grpcError != nil {
+		switch grpcError.Code() {
+		case codes.Canceled:
+			return connect.NewError(connect.CodeCanceled, grpcError.Err())
+		case codes.Unavailable:
+			return connect.NewError(connect.CodeUnavailable, grpcError.Err())
+		case codes.InvalidArgument:
+			return connect.NewError(connect.CodeInvalidArgument, grpcError.Err())
+		case codes.Unknown:
+			return connect.NewError(connect.CodeUnknown, grpcError.Err())
+		}
 		return grpcError.Err()
 	}
 
+	// special case for context canceled when shutting down
 	if errors.Is(err, context.Canceled) {
 		if context.Cause(ctx) != nil {
 			err = context.Cause(ctx)
@@ -586,10 +601,15 @@
 		return connect.NewError(connect.CodeCanceled, err)
 	}
 
+	// context deadline exceeded
 	if errors.Is(err, context.DeadlineExceeded) {
 		return connect.NewError(connect.CodeDeadlineExceeded, err)
 	}
 
+	if store.StoreAboveMaxSizeRegexp.MatchString(err.Error()) {
+		return connect.NewError(connect.CodeInvalidArgument, err)
+	}
+
 	if errors.Is(err, exec.ErrWasmDeterministicExec) {
 		return connect.NewError(connect.CodeInvalidArgument, err)
 	}
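tier1's side of the contract normalizes incoming gRPC statuses into connect errors so the existing `connect.CodeOf` switch in `Blocks` keeps working. Below is a hedged sketch of that mapping using the plain `status` package in place of `dgrpc.AsGRPCError`; unlike the real `toConnectError`, this toy version collapses every unmapped code to `CodeInternal` instead of passing the original error through.

```go
package main

import (
	"fmt"

	"github.com/bufbuild/connect-go"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// toConnect maps the subset of gRPC codes tier1 cares about onto their
// connect equivalents, defaulting anything else to CodeInternal.
func toConnect(err error) *connect.Error {
	var code connect.Code
	switch status.Code(err) {
	case codes.Canceled:
		code = connect.CodeCanceled
	case codes.Unavailable:
		code = connect.CodeUnavailable
	case codes.InvalidArgument:
		code = connect.CodeInvalidArgument
	case codes.Unknown:
		code = connect.CodeUnknown
	default:
		code = connect.CodeInternal
	}
	return connect.NewError(code, err)
}

func main() {
	err := status.Error(codes.InvalidArgument, "store became too big")
	cerr := toConnect(err)
	fmt.Println(cerr.Code()) // invalid_argument: tier1 records this as a failure
}
```

In the real `toConnectError`, codes outside these four fall through to `grpcError.Err()` rather than being forced to `CodeInternal`.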
diff --git a/service/tier2.go b/service/tier2.go
index f3954fd3c..82c6df637 100644
--- a/service/tier2.go
+++ b/service/tier2.go
@@ -2,6 +2,7 @@ package service
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -11,6 +12,7 @@ import (
 	"github.com/streamingfast/bstream"
 	"github.com/streamingfast/bstream/stream"
 	"github.com/streamingfast/dauth"
+	"github.com/streamingfast/dgrpc"
 	"github.com/streamingfast/dmetering"
 	"github.com/streamingfast/dstore"
 	"github.com/streamingfast/logging"
@@ -22,6 +24,7 @@ import (
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 	"github.com/streamingfast/substreams/pipeline"
 	"github.com/streamingfast/substreams/pipeline/cache"
+	"github.com/streamingfast/substreams/pipeline/exec"
 	"github.com/streamingfast/substreams/pipeline/outputmodules"
 	"github.com/streamingfast/substreams/reqctx"
 	"github.com/streamingfast/substreams/service/config"
@@ -31,7 +34,9 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	ttrace "go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 type Tier2Service struct {
@@ -173,7 +178,8 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
 	err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String())
 	grpcError = toGRPCError(ctx, err)
 
-	if grpcError != nil && connect.CodeOf(grpcError) == connect.CodeInternal {
+	switch status.Code(grpcError) {
+	case codes.Unknown, codes.Internal, codes.Unavailable:
 		logger.Info("unexpected termination of stream of blocks", zap.Error(err))
 	}
 
@@ -347,3 +353,67 @@ func updateStreamHeadersHostname(setHeader func(metadata.MD) error, logger *zap.
 	}
 	return hostname
 }
+
+// toGRPCError turns an `err` into a gRPC error if it's non-nil, in the `nil` case,
+// `nil` is returned right away.
+//
+// If the `err` has in its chain of error either `context.Canceled`, `context.DeadlineExceeded`
+// or `stream.ErrInvalidArg`, error is turned into a proper gRPC error respectively of code
+// `Canceled`, `DeadlineExceeded` or `InvalidArgument`.
+//
+// If the `err` has in its chain any error constructed through `connect.NewError` (and its variants), then
+// it is converted to the equivalent gRPC status error.
+//
+// Otherwise, the error is assumed to be an internal error and turned back into a proper
+// `status.Error(codes.Internal, err.Error())`.
+
+func toGRPCError(ctx context.Context, err error) error {
+	if err == nil {
+		return nil
+	}
+
+	// already a gRPC error
+	if grpcError := dgrpc.AsGRPCError(err); grpcError != nil {
+		return grpcError.Err()
+	}
+
+	// connect to gRPC error
+	connectError := &connect.Error{}
+	if errors.As(err, &connectError) {
+		switch connectError.Code() {
+		case connect.CodeCanceled:
+			return status.Error(codes.Canceled, err.Error())
+		case connect.CodeUnavailable:
+			return status.Error(codes.Unavailable, err.Error())
+		case connect.CodeInvalidArgument:
+			return status.Error(codes.InvalidArgument, err.Error())
+		case connect.CodeUnknown:
+			return status.Error(codes.Unknown, err.Error())
+		}
+	}
+
+	if errors.Is(err, context.Canceled) {
+		if context.Cause(ctx) != nil {
+			err = context.Cause(ctx)
+			if err == errShuttingDown {
+				return status.Error(codes.Unavailable, err.Error())
+			}
+		}
+		return status.Error(codes.Canceled, err.Error())
+	}
+	if errors.Is(err, context.DeadlineExceeded) {
+		return status.Error(codes.DeadlineExceeded, err.Error())
+	}
+	if store.StoreAboveMaxSizeRegexp.MatchString(err.Error()) {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+	if errors.Is(err, exec.ErrWasmDeterministicExec) {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	var errInvalidArg *stream.ErrInvalidArg
+	if errors.As(err, &errInvalidArg) {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+	return status.Error(codes.Internal, err.Error())
+}
diff --git a/storage/store/delta.go b/storage/store/delta.go
index cf96a6be4..c53c782a7 100644
--- a/storage/store/delta.go
+++ b/storage/store/delta.go
@@ -2,6 +2,7 @@ package store
 
 import (
 	"fmt"
+	"regexp"
 
 	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
 )
@@ -41,10 +42,16 @@ func (b *baseStore) ApplyDelta(delta *pbssinternal.StoreDelta) {
 	}
 
 	if b.totalSizeBytes > b.totalSizeLimit {
-		panic(fmt.Sprintf("store %q became too big at %d, maximum size: %d", b.Name(), b.totalSizeBytes, b.totalSizeLimit))
+		panic(storeTooBigError(b.Name(), b.totalSizeBytes, b.totalSizeLimit))
 	}
 }
 
+var StoreAboveMaxSizeRegexp = regexp.MustCompile("store .* became too big at [0-9]*, maximum size: [0-9]*")
+
+func storeTooBigError(storeName string, size, limit uint64) error {
+	return fmt.Errorf("store %q became too big at %d, maximum size: %d", storeName, size, limit)
+}
+
 func (b *baseStore) ApplyDeltasReverse(deltas []*pbssinternal.StoreDelta) {
 	for i := len(deltas) - 1; i >= 0; i-- {
 		delta := deltas[i]
diff --git a/tools/tier2call.go b/tools/tier2call.go
index a0105a4a3..72fde6833 100644
--- a/tools/tier2call.go
+++ b/tools/tier2call.go
@@ -98,7 +98,7 @@ func tier2CallE(cmd *cobra.Command, args []string) error {
 	for {
 		msg, err := req.Recv()
 		if err != nil {
-			fmt.Println("Error: %w", err)
+			fmt.Println("Error:", err)
 			break
 		}
 		cnt, _ := json.Marshal(msg)
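The delta.go change is what makes the "store became too big" panic classifiable: the message is now produced by a single helper and matched by `StoreAboveMaxSizeRegexp` in both tiers, since the error only survives the panic/recover and the tier2-to-tier1 hop as a string. Here is a small standalone check that the pattern really matches the helper's output, using the same format string and pattern with a hypothetical store name and sizes:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as StoreAboveMaxSizeRegexp in storage/store/delta.go.
var storeAboveMaxSize = regexp.MustCompile("store .* became too big at [0-9]*, maximum size: [0-9]*")

func main() {
	// Same format string as storeTooBigError; the store name and sizes are made up.
	msg := fmt.Sprintf("store %q became too big at %d, maximum size: %d", "pairs", 10485761, 10485760)
	fmt.Println(storeAboveMaxSize.MatchString(msg)) // true -> classified as InvalidArgument
}
```

Matching on the message rather than a typed sentinel appears deliberate here: `errors.Is` cannot follow an error across a gRPC boundary, but the text can.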