Skip to content

Commit

Permalink
fix: tracing for pushsync and retrieval (#4281)
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol authored Jan 16, 2024
1 parent 2a8c3f9 commit 2cdfadf
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 68 deletions.
20 changes: 17 additions & 3 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/gorilla/mux"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
olog "github.com/opentracing/opentracing-go/log"
)

type bytesPostResponse struct {
Expand All @@ -26,7 +29,8 @@ type bytesPostResponse struct {

// bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bytes").Build())
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bytes", s.logger.WithName("post_bytes").Build())
defer span.Finish()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
Expand Down Expand Up @@ -57,11 +61,13 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
ext.LogError(span, err, log.String("action", "tag.create"))
return
}
span.SetTag("tagID", tag)
}

putter, err := s.newStamperPutter(r.Context(), putterOptions{
putter, err := s.newStamperPutter(ctx, putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Expand All @@ -82,6 +88,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
default:
jsonhttp.BadRequest(w, nil)
}
ext.LogError(span, err, log.String("action", "new.StamperPutter"))
return
}

Expand All @@ -92,7 +99,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
}

p := requestPipelineFn(putter, headers.Encrypt)
address, err := p(r.Context(), r.Body)
address, err := p(ctx, r.Body)
if err != nil {
logger.Debug("split write all failed", "error", err)
logger.Error(nil, "split write all failed")
Expand All @@ -102,20 +109,27 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
default:
jsonhttp.InternalServerError(ow, "split write all failed")
}
ext.LogError(span, err, log.String("action", "split.WriteAll"))
return
}

span.SetTag("root_address", address)

err = putter.Done(address)
if err != nil {
logger.Debug("done split failed", "error", err)
logger.Error(nil, "done split failed")
jsonhttp.InternalServerError(ow, "done split failed")
ext.LogError(span, err, log.String("action", "putter.Done"))
return
}

if tag != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
}

span.LogFields(olog.Bool("success", true))

w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.Created(w, bytesPostResponse{
Reference: address,
Expand Down
27 changes: 21 additions & 6 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/topology"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/ethersphere/bee/pkg/topology"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
olog "github.com/opentracing/opentracing-go/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/joiner"
Expand All @@ -34,7 +38,8 @@ import (
)

func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bzz").Build())
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build())
defer span.Finish()

headers := struct {
ContentType string `map:"Content-Type,mimeMediaType" validate:"required"`
Expand Down Expand Up @@ -67,11 +72,13 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
ext.LogError(span, err, olog.String("action", "tag.create"))
return
}
span.SetTag("tagID", tag)
}

putter, err := s.newStamperPutter(r.Context(), putterOptions{
putter, err := s.newStamperPutter(ctx, putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Expand All @@ -92,6 +99,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
default:
jsonhttp.BadRequest(w, nil)
}
ext.LogError(span, err, olog.String("action", "new.StamperPutter"))
return
}

Expand All @@ -102,10 +110,10 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
}

if headers.IsDir || headers.ContentType == multiPartFormData {
s.dirUploadHandler(logger, ow, r, putter, r.Header.Get(ContentTypeHeader), headers.Encrypt, tag)
s.dirUploadHandler(ctx, logger, span, ow, r, putter, r.Header.Get(ContentTypeHeader), headers.Encrypt, tag)
return
}
s.fileUploadHandler(logger, ow, r, putter, headers.Encrypt, tag)
s.fileUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag)
}

// fileUploadResponse is returned when an HTTP request to upload a file is successful
Expand All @@ -116,7 +124,9 @@ type bzzUploadResponse struct {
// fileUploadHandler uploads the file and its metadata supplied in the file body and
// the headers
func (s *Service) fileUploadHandler(
ctx context.Context,
logger log.Logger,
span opentracing.Span,
w http.ResponseWriter,
r *http.Request,
putter storer.PutterSession,
Expand All @@ -132,7 +142,6 @@ func (s *Service) fileUploadHandler(
}

p := requestPipelineFn(putter, encrypt)
ctx := r.Context()

// first store the file and get its reference
fr, err := p(ctx, r.Body)
Expand All @@ -145,6 +154,7 @@ func (s *Service) fileUploadHandler(
default:
jsonhttp.InternalServerError(w, errFileStore)
}
ext.LogError(span, err, olog.String("action", "file.store"))
return
}

Expand Down Expand Up @@ -232,11 +242,16 @@ func (s *Service) fileUploadHandler(
logger.Debug("done split failed", "error", err)
logger.Error(nil, "done split failed")
jsonhttp.InternalServerError(w, "done split failed")
ext.LogError(span, err, olog.String("action", "putter.Done"))
return
}

span.LogFields(olog.Bool("success", true))
span.SetTag("root_address", manifestReference)

if tagID != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tagID))
span.SetTag("tagID", tagID)
}
w.Header().Set(ETagHeader, fmt.Sprintf("%q", manifestReference.String()))
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
Expand Down
10 changes: 9 additions & 1 deletion pkg/api/dirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ import (
storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
olog "github.com/opentracing/opentracing-go/log"
)

var errEmptyDir = errors.New("no files in root directory")

// dirUploadHandler uploads a directory supplied as a tar in an HTTP request
func (s *Service) dirUploadHandler(
ctx context.Context,
logger log.Logger,
span opentracing.Span,
w http.ResponseWriter,
r *http.Request,
putter storer.PutterSession,
Expand Down Expand Up @@ -64,7 +69,7 @@ func (s *Service) dirUploadHandler(
defer r.Body.Close()

reference, err := storeDir(
r.Context(),
ctx,
encrypt,
dReader,
logger,
Expand All @@ -86,6 +91,7 @@ func (s *Service) dirUploadHandler(
default:
jsonhttp.InternalServerError(w, errDirectoryStore)
}
ext.LogError(span, err, olog.String("action", "dir.store"))
return
}

Expand All @@ -94,11 +100,13 @@ func (s *Service) dirUploadHandler(
logger.Debug("store dir failed", "error", err)
logger.Error(nil, "store dir failed")
jsonhttp.InternalServerError(w, errDirectoryStore)
ext.LogError(span, err, olog.String("action", "putter.Done"))
return
}

if tag != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
span.LogFields(olog.Bool("success", true))
}
w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
jsonhttp.Created(w, bzzUploadResponse{
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,12 @@ func (s *Service) mountAPI() {
if s.Restricted {
handle("/auth", jsonhttp.MethodHandler{
"POST": web.ChainHandlers(
s.newTracingHandler("auth"),
jsonhttp.NewMaxBodyBytesHandler(512),
web.FinalHandlerFunc(s.authHandler),
),
})
handle("/refresh", jsonhttp.MethodHandler{
"POST": web.ChainHandlers(
s.newTracingHandler("auth"),
jsonhttp.NewMaxBodyBytesHandler(512),
web.FinalHandlerFunc(s.refreshHandler),
),
Expand Down
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func NewBee(
RadiusSetter: kad,
WarmupDuration: o.WarmupTime,
Logger: logger,
Tracer: tracer,
}

if o.FullNodeMode && !o.BootnodeMode {
Expand Down
55 changes: 24 additions & 31 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
olog "github.com/opentracing/opentracing-go/log"
)

// loggerName is the tree path name of the logger for this package.
Expand All @@ -34,6 +37,7 @@ type Op struct {
Chunk swarm.Chunk
Err chan error
Direct bool
Span opentracing.Span
}

type OpChan <-chan *Op
Expand All @@ -57,6 +61,7 @@ type Service struct {
inflight *inflight
attempts *attempts
smuggler chan OpChan
tracer *tracing.Tracer
}

const (
Expand Down Expand Up @@ -94,6 +99,7 @@ func New(
inflight: newInflight(),
attempts: &attempts{retryCount: retryCount, attempts: make(map[string]int)},
smuggler: make(chan OpChan),
tracer: tracer,
}
go p.chunksWorker(warmupTime, tracer)
return p
Expand All @@ -118,28 +124,20 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
}

var (
cctx, cancel = context.WithCancel(context.Background())
mtx sync.Mutex
wg sync.WaitGroup
span, logger, ctx = tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
timer = time.NewTimer(traceDuration)
sem = make(chan struct{}, ConcurrentPushes)
cc = make(chan *Op)
ctx, cancel = context.WithCancel(context.Background())
sem = make(chan struct{}, ConcurrentPushes)
cc = make(chan *Op)
)

// inflight.set handles the backpressure for the maximum amount of inflight chunks
// and duplicate handling.
chunks, unsubscribe := s.storer.SubscribePush(cctx)
chunks, unsubscribe := s.storer.SubscribePush(ctx)
defer func() {
unsubscribe()
cancel()
}()

ctxLogger := func() (context.Context, log.Logger) {
mtx.Lock()
defer mtx.Unlock()
return ctx, logger
}
var wg sync.WaitGroup

push := func(op *Op) {
var (
Expand Down Expand Up @@ -168,39 +166,33 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
}()

s.metrics.TotalToPush.Inc()
ctx, logger := ctxLogger()
startTime := time.Now()

spanCtx := ctx
if op.Span != nil {
spanCtx = tracing.WithContext(spanCtx, op.Span.Context())
} else {
op.Span = opentracing.NoopTracer{}.StartSpan("noOp")
}

if op.Direct {
err = s.pushDirect(ctx, logger, op)
err = s.pushDirect(spanCtx, s.logger, op)
} else {
doRepeat, err = s.pushDeferred(ctx, logger, op)
doRepeat, err = s.pushDeferred(spanCtx, s.logger, op)
}

if err != nil {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
ext.LogError(op.Span, err)
} else {
op.Span.LogFields(olog.Bool("success", true))
}

s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
s.metrics.TotalSynced.Inc()
}

go func() {
for {
select {
case <-s.quit:
return
case <-timer.C:
// reset the span
mtx.Lock()
span.Finish()
span, logger, ctx = tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
mtx.Unlock()
}
}
}()

go func() {
for {
select {
Expand Down Expand Up @@ -393,6 +385,7 @@ func (s *Service) AddFeed(c <-chan *Op) {
go func() {
select {
case s.smuggler <- c:
s.logger.Info("got a chunk being smuggled")
case <-s.quit:
}
}()
Expand Down
Loading

0 comments on commit 2cdfadf

Please sign in to comment.