Skip to content

Commit

Permalink
fix error code passing tier1/tier2, rampup parallel jobs over 4secs, fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Jan 15, 2024
1 parent 3d5f799 commit 5747381
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 19 deletions.
8 changes: 8 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v1.3.1

### Server

* Fixed error-passing between tier2 and tier1 (tier1 will not retry sending requests that fail deterministically to tier2)
* Tier1 will now schedule a single job on tier2, quickly ramping up to the requested number of workers after 4 seconds of delay, to catch early exceptions
* "store became too big" is now considered a deterministic error and returns code "InvalidArgument"

## v1.3.0

### Highlights
Expand Down
9 changes: 5 additions & 4 deletions orchestrator/work/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"
"time"

"github.com/bufbuild/connect-go"
"github.com/streamingfast/dauth"
"github.com/streamingfast/derr"
"github.com/streamingfast/dgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
otelCodes "go.opentelemetry.io/otel/codes"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/client"
Expand Down Expand Up @@ -244,7 +245,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
upstream.RPCFailedProgressResponse(r.Failed.Reason, r.Failed.Logs, r.Failed.LogsTruncated)

err := fmt.Errorf("work failed on remote host: %s", r.Failed.Reason)
span.SetStatus(codes.Error, err.Error())
span.SetStatus(otelCodes.Error, err.Error())
return &Result{Error: err}

case *pbssinternal.ProcessRangeResponse_Completed:
Expand All @@ -262,7 +263,7 @@ func (w *RemoteWorker) work(ctx context.Context, request *pbssinternal.ProcessRa
if ctx.Err() != nil {
return &Result{Error: ctx.Err()}
}
if connect.CodeOf(err) == connect.CodeInvalidArgument {
if grpcErr := dgrpc.AsGRPCError(err); grpcErr.Code() == codes.InvalidArgument {
return &Result{Error: err}
}
return &Result{
Expand Down
24 changes: 23 additions & 1 deletion orchestrator/work/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package work

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -10,13 +11,15 @@ import (

// WorkerPool holds the set of workers created by NewWorkerPool together with
// the bookkeeping needed to ramp them up after the pool is created.
type WorkerPool struct {
// workers holds one status entry per worker in the pool.
workers []*WorkerStatus
// started is the time the pool was created; rampupWorkers resets it to nil
// once the workers held in WorkerInitialWait have been released.
started *time.Time
}

// WorkerState describes the scheduling state of a worker inside a WorkerPool.
type WorkerState int

const (
// WorkerFree marks a worker that WorkerAvailable reports as available.
WorkerFree WorkerState = iota
// WorkerWorking presumably marks a worker that is currently running a job;
// the code assigning it is not visible here — TODO confirm.
WorkerWorking
// WorkerInitialWait marks a worker that is held back at pool creation (every
// worker except the first) until rampupWorkers switches it to WorkerFree.
WorkerInitialWait
)

type WorkerStatus struct {
Expand All @@ -31,18 +34,37 @@ func NewWorkerPool(ctx context.Context, workerCount int, workerFactory WorkerFac

workers := make([]*WorkerStatus, workerCount)
for i := 0; i < workerCount; i++ {
state := WorkerFree
if i > 0 {
state = WorkerInitialWait
}
workers[i] = &WorkerStatus{
Worker: workerFactory(logger),
State: WorkerFree,
State: state,
}
}

now := time.Now()
return &WorkerPool{
workers: workers,
started: &now,
}
}

// rampupWorkers releases the workers that were parked in WorkerInitialWait
// once the pool has existed for at least four seconds. It is a one-shot
// operation: after the release, `started` is cleared so later calls return
// immediately.
func (p *WorkerPool) rampupWorkers() {
	const rampupDelay = 4 * time.Second

	// A nil start time means the ramp-up has already been performed.
	if p.started == nil {
		return
	}
	// Still inside the initial delay: keep the parked workers unavailable.
	if time.Since(*p.started) < rampupDelay {
		return
	}

	for i := range p.workers {
		if p.workers[i].State == WorkerInitialWait {
			p.workers[i].State = WorkerFree
		}
	}
	p.started = nil
}

func (p *WorkerPool) WorkerAvailable() bool {
p.rampupWorkers()
for _, w := range p.workers {
if w.State == WorkerFree {
return true
Expand Down
42 changes: 31 additions & 11 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"go.opentelemetry.io/otel/attribute"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -266,19 +267,19 @@ func (s *Tier1Service) Blocks(

err = s.blocks(runningContext, request, outputGraph, respFunc)

if grpcError := toGRPCError(runningContext, err); grpcError != nil {
switch connect.CodeOf(grpcError) {
if connectError := toConnectError(runningContext, err); connectError != nil {
switch connect.CodeOf(connectError) {
case connect.CodeInternal:
logger.Info("unexpected termination of stream of blocks", zap.String("stream_processor", "tier1"), zap.Error(err))
case connect.CodeInvalidArgument:
logger.Debug("recording failure on request", zap.String("request_id", requestID))
s.recordFailure(requestID, grpcError)
s.recordFailure(requestID, connectError)
case connect.CodeCanceled:
logger.Info("Blocks request canceled by user", zap.Error(grpcError))
logger.Info("Blocks request canceled by user", zap.Error(connectError))
default:
logger.Info("Blocks request completed with error", zap.Error(grpcError))
logger.Info("Blocks request completed with error", zap.Error(connectError))
}
return grpcError
return connectError
}

logger.Info("Blocks request completed witout error")
Expand Down Expand Up @@ -555,27 +556,41 @@ func setupRequestStats(ctx context.Context, requestDetails *reqctx.RequestDetail
return reqctx.WithReqStats(ctx, stats), stats
}

// toGRPCError turns an `err` into a gRPC error if it's non-nil, in the `nil` case,
// toConnectError turns an `err` into a connect error if it's non-nil, in the `nil` case,
// `nil` is returned right away.
//
// If the `err` has in its chain of error either `context.Canceled`, `context.DeadlineExceeded`
// or `stream.ErrInvalidArg`, error is turned into a proper gRPC error respectively of code
// or `stream.ErrInvalidArg`, error is turned into a proper connect error respectively of code
// `Canceled`, `DeadlineExceeded` or `InvalidArgument`.
//
// If the `err` has its in chain any error constructed through `connect.NewError` (and its variants), then
// we return the first found error of such type directly, because it's already a gRPC error.
// If the `err` has in its chain any error constructed through `connect.NewError` (and its variants), then
// we return the first found error of such type directly, because it's already a connect error.
//
// If the `err` has in its chain any error constructed through `grpc` or `status`, it will be converted to connect equivalent.
//
// Otherwise, the error is assumed to be an internal error and turned back into a proper
// `connect.NewError(connect.CodeInternal, err)`.
func toGRPCError(ctx context.Context, err error) error {
func toConnectError(ctx context.Context, err error) error {
if err == nil {
return nil
}

// GRPC to connect error
if grpcError := dgrpc.AsGRPCError(err); grpcError != nil {
switch grpcError.Code() {
case codes.Canceled:
return connect.NewError(connect.CodeCanceled, grpcError.Err())
case codes.Unavailable:
return connect.NewError(connect.CodeUnavailable, grpcError.Err())
case codes.InvalidArgument:
return connect.NewError(connect.CodeInvalidArgument, grpcError.Err())
case codes.Unknown:
return connect.NewError(connect.CodeUnknown, grpcError.Err())
}
return grpcError.Err()
}

// special case for context canceled when shutting down
if errors.Is(err, context.Canceled) {
if context.Cause(ctx) != nil {
err = context.Cause(ctx)
Expand All @@ -586,10 +601,15 @@ func toGRPCError(ctx context.Context, err error) error {
return connect.NewError(connect.CodeCanceled, err)
}

// context deadline exceeded
if errors.Is(err, context.DeadlineExceeded) {
return connect.NewError(connect.CodeDeadlineExceeded, err)
}

if store.StoreAboveMaxSizeRegexp.MatchString(err.Error()) {
return connect.NewError(connect.CodeInvalidArgument, err)
}

if errors.Is(err, exec.ErrWasmDeterministicExec) {
return connect.NewError(connect.CodeInvalidArgument, err)
}
Expand Down
72 changes: 71 additions & 1 deletion service/tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dgrpc"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/logging"
Expand All @@ -22,6 +24,7 @@ import (
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/pipeline"
"github.com/streamingfast/substreams/pipeline/cache"
"github.com/streamingfast/substreams/pipeline/exec"
"github.com/streamingfast/substreams/pipeline/outputmodules"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/service/config"
Expand All @@ -31,7 +34,9 @@ import (
"go.opentelemetry.io/otel/attribute"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type Tier2Service struct {
Expand Down Expand Up @@ -173,7 +178,8 @@ func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, s
err = s.processRange(ctx, request, respFunc, tracing.GetTraceID(ctx).String())
grpcError = toGRPCError(ctx, err)

if grpcError != nil && connect.CodeOf(grpcError) == connect.CodeInternal {
switch status.Code(grpcError) {
case codes.Unknown, codes.Internal, codes.Unavailable:
logger.Info("unexpected termination of stream of blocks", zap.Error(err))
}

Expand Down Expand Up @@ -347,3 +353,67 @@ func updateStreamHeadersHostname(setHeader func(metadata.MD) error, logger *zap.
}
return hostname
}

// toGRPCError turns an `err` into a gRPC status error if it's non-nil, in the `nil` case,
// `nil` is returned right away.
//
// The checks are applied in this order:
//
//   - If `dgrpc.AsGRPCError` finds a gRPC status in `err`, that status' error is returned as-is.
//   - If the `err` has in its chain a `*connect.Error` of code `Canceled`, `Unavailable`,
//     `InvalidArgument` or `Unknown`, it is converted to the gRPC status of the same name.
//     Connect errors carrying any other code fall through to the checks below.
//   - If the `err` has in its chain `context.Canceled`, the result is `Unavailable` when the
//     context's cause is `errShuttingDown`, and `Canceled` otherwise (using the cause's message
//     when the context has a cause).
//   - If the `err` has in its chain `context.DeadlineExceeded`, the result is `DeadlineExceeded`.
//   - If the error message matches `store.StoreAboveMaxSizeRegexp`, or the `err` has in its chain
//     `exec.ErrWasmDeterministicExec` or a `*stream.ErrInvalidArg`, the result is `InvalidArgument`.
//
// Otherwise, the error is assumed to be an internal error and turned into a
// `status.Error(codes.Internal, ...)`.
func toGRPCError(ctx context.Context, err error) error {
	if err == nil {
		return nil
	}

	// already GRPC error
	if grpcError := dgrpc.AsGRPCError(err); grpcError != nil {
		return grpcError.Err()
	}

	// connect to GRPC error
	var connectError *connect.Error
	if errors.As(err, &connectError) {
		switch connectError.Code() {
		case connect.CodeCanceled:
			return status.Error(codes.Canceled, err.Error())
		case connect.CodeUnavailable:
			// Unavailable must stay Unavailable: reporting it as Canceled would hide
			// a transient failure from the caller.
			return status.Error(codes.Unavailable, err.Error())
		case connect.CodeInvalidArgument:
			return status.Error(codes.InvalidArgument, err.Error())
		case connect.CodeUnknown:
			return status.Error(codes.Unknown, err.Error())
		}
		// Any other connect code is classified by the generic checks below.
	}

	// special case for context canceled when shutting down
	if errors.Is(err, context.Canceled) {
		if context.Cause(ctx) != nil {
			err = context.Cause(ctx)
			if err == errShuttingDown {
				return status.Error(codes.Unavailable, err.Error())
			}
		}
		return status.Error(codes.Canceled, err.Error())
	}

	// context deadline exceeded
	if errors.Is(err, context.DeadlineExceeded) {
		return status.Error(codes.DeadlineExceeded, err.Error())
	}

	// The "store became too big" failure is matched on the error message and
	// reported as InvalidArgument.
	if store.StoreAboveMaxSizeRegexp.MatchString(err.Error()) {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	if errors.Is(err, exec.ErrWasmDeterministicExec) {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	var errInvalidArg *stream.ErrInvalidArg
	if errors.As(err, &errInvalidArg) {
		return status.Error(codes.InvalidArgument, err.Error())
	}

	return status.Error(codes.Internal, err.Error())
}
9 changes: 8 additions & 1 deletion storage/store/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"fmt"
"regexp"

pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
)
Expand Down Expand Up @@ -41,10 +42,16 @@ func (b *baseStore) ApplyDelta(delta *pbssinternal.StoreDelta) {
}

if b.totalSizeBytes > b.totalSizeLimit {
panic(fmt.Sprintf("store %q became too big at %d, maximum size: %d", b.Name(), b.totalSizeBytes, b.totalSizeLimit))
panic(storeTooBigError(b.Name(), b.totalSizeBytes, b.totalSizeLimit))
}
}

// StoreAboveMaxSizeRegexp matches the message produced by storeTooBigError, so
// that callers holding only an error string can still recognize a store that
// exceeded its size limit.
var StoreAboveMaxSizeRegexp = regexp.MustCompile("store .* became too big at [0-9]*, maximum size: [0-9]*")

// storeTooBigError builds the error reported when the store named storeName
// has grown to size bytes while only limit bytes are allowed.
func storeTooBigError(storeName string, size, limit uint64) error {
	const format = "store %q became too big at %d, maximum size: %d"
	return fmt.Errorf(format, storeName, size, limit)
}

func (b *baseStore) ApplyDeltasReverse(deltas []*pbssinternal.StoreDelta) {
for i := len(deltas) - 1; i >= 0; i-- {
delta := deltas[i]
Expand Down
2 changes: 1 addition & 1 deletion tools/tier2call.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func tier2CallE(cmd *cobra.Command, args []string) error {
for {
msg, err := req.Recv()
if err != nil {
fmt.Println("Error: %w", err)
fmt.Println("Error:", err)
break
}
cnt, _ := json.Marshal(msg)
Expand Down

0 comments on commit 5747381

Please sign in to comment.