diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go
index dd2861ff285..46bd0e491f0 100644
--- a/pkg/api/bytes.go
+++ b/pkg/api/bytes.go
@@ -18,6 +18,9 @@ import (
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/tracing"
     "github.com/gorilla/mux"
+    "github.com/opentracing/opentracing-go/ext"
+    "github.com/opentracing/opentracing-go/log"
+    olog "github.com/opentracing/opentracing-go/log"
 )
 
 type bytesPostResponse struct {
@@ -26,7 +29,8 @@ type bytesPostResponse struct {
 
 // bytesUploadHandler handles upload of raw binary data of arbitrary length.
 func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
-    logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bytes").Build())
+    span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bytes", s.logger.WithName("post_bytes").Build())
+    defer span.Finish()
 
     headers := struct {
         BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
@@ -57,11 +61,13 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
             default:
                 jsonhttp.InternalServerError(w, "cannot get or create tag")
             }
+            ext.LogError(span, err, log.String("action", "tag.create"))
             return
         }
+        span.SetTag("tagID", tag)
     }
 
-    putter, err := s.newStamperPutter(r.Context(), putterOptions{
+    putter, err := s.newStamperPutter(ctx, putterOptions{
         BatchID: headers.BatchID,
         TagID:   tag,
         Pin:     headers.Pin,
@@ -82,6 +88,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
         default:
             jsonhttp.BadRequest(w, nil)
         }
+        ext.LogError(span, err, log.String("action", "new.StamperPutter"))
         return
     }
 
@@ -92,7 +99,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
     }
 
     p := requestPipelineFn(putter, headers.Encrypt)
-    address, err := p(r.Context(), r.Body)
+    address, err := p(ctx, r.Body)
     if err != nil {
         logger.Debug("split write all failed", "error", err)
         logger.Error(nil, "split write all failed")
@@ -102,20 +109,27 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
         default:
             jsonhttp.InternalServerError(ow, "split write all failed")
         }
+        ext.LogError(span, err, log.String("action", "split.WriteAll"))
         return
     }
 
+    span.SetTag("root_address", address)
+
     err = putter.Done(address)
     if err != nil {
         logger.Debug("done split failed", "error", err)
         logger.Error(nil, "done split failed")
         jsonhttp.InternalServerError(ow, "done split failed")
+        ext.LogError(span, err, log.String("action", "putter.Done"))
         return
     }
 
     if tag != 0 {
         w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
     }
+
+    span.LogFields(olog.Bool("success", true))
+
     w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
     jsonhttp.Created(w, bytesPostResponse{
         Reference: address,
diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go
index 1dba2add517..3089d3fa6c0 100644
--- a/pkg/api/bzz.go
+++ b/pkg/api/bzz.go
@@ -9,7 +9,6 @@ import (
     "encoding/hex"
     "errors"
     "fmt"
-    "github.com/ethersphere/bee/pkg/topology"
     "net/http"
     "path"
     "path/filepath"
@@ -17,6 +16,11 @@ import (
     "strings"
     "time"
 
+    "github.com/ethersphere/bee/pkg/topology"
+    "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
+
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethersphere/bee/pkg/feeds"
     "github.com/ethersphere/bee/pkg/file/joiner"
@@ -34,7 +38,8 @@ import (
 )
 
 func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
-    logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("post_bzz").Build())
+    span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build())
+    defer span.Finish()
 
     headers := struct {
         ContentType string `map:"Content-Type,mimeMediaType" validate:"required"`
@@ -67,11 +72,13 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
             default:
                 jsonhttp.InternalServerError(w, "cannot get or create tag")
             }
+            ext.LogError(span, err, olog.String("action", "tag.create"))
             return
         }
+        span.SetTag("tagID", tag)
     }
 
-    putter, err := s.newStamperPutter(r.Context(), putterOptions{
+    putter, err := s.newStamperPutter(ctx, putterOptions{
         BatchID: headers.BatchID,
         TagID:   tag,
         Pin:     headers.Pin,
@@ -92,6 +99,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
         default:
             jsonhttp.BadRequest(w, nil)
         }
+        ext.LogError(span, err, olog.String("action", "new.StamperPutter"))
         return
     }
 
@@ -102,10 +110,10 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
     }
 
     if headers.IsDir || headers.ContentType == multiPartFormData {
-        s.dirUploadHandler(logger, ow, r, putter, r.Header.Get(ContentTypeHeader), headers.Encrypt, tag)
+        s.dirUploadHandler(ctx, logger, span, ow, r, putter, r.Header.Get(ContentTypeHeader), headers.Encrypt, tag)
         return
     }
-    s.fileUploadHandler(logger, ow, r, putter, headers.Encrypt, tag)
+    s.fileUploadHandler(ctx, logger, span, ow, r, putter, headers.Encrypt, tag)
 }
 
 // fileUploadResponse is returned when an HTTP request to upload a file is successful
@@ -116,7 +124,9 @@ type bzzUploadResponse struct {
 // fileUploadHandler uploads the file and its metadata supplied in the file body and
 // the headers
 func (s *Service) fileUploadHandler(
+    ctx context.Context,
     logger log.Logger,
+    span opentracing.Span,
     w http.ResponseWriter,
     r *http.Request,
     putter storer.PutterSession,
@@ -132,7 +142,6 @@ func (s *Service) fileUploadHandler(
     }
 
     p := requestPipelineFn(putter, encrypt)
-    ctx := r.Context()
 
     // first store the file and get its reference
     fr, err := p(ctx, r.Body)
@@ -145,6 +154,7 @@ func (s *Service) fileUploadHandler(
         default:
             jsonhttp.InternalServerError(w, errFileStore)
         }
+        ext.LogError(span, err, olog.String("action", "file.store"))
         return
     }
 
@@ -232,11 +242,16 @@ func (s *Service) fileUploadHandler(
         logger.Debug("done split failed", "error", err)
         logger.Error(nil, "done split failed")
         jsonhttp.InternalServerError(w, "done split failed")
+        ext.LogError(span, err, olog.String("action", "putter.Done"))
         return
     }
 
+    span.LogFields(olog.Bool("success", true))
+    span.SetTag("root_address", manifestReference)
+
     if tagID != 0 {
         w.Header().Set(SwarmTagHeader, fmt.Sprint(tagID))
+        span.SetTag("tagID", tagID)
     }
     w.Header().Set(ETagHeader, fmt.Sprintf("%q", manifestReference.String()))
     w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
diff --git a/pkg/api/dirs.go b/pkg/api/dirs.go
index edd4f741120..e22c9cacf21 100644
--- a/pkg/api/dirs.go
+++ b/pkg/api/dirs.go
@@ -27,13 +27,18 @@ import (
     storer "github.com/ethersphere/bee/pkg/storer"
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/tracing"
+    "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
 )
 
 var errEmptyDir = errors.New("no files in root directory")
 
 // dirUploadHandler uploads a directory supplied as a tar in an HTTP request
 func (s *Service) dirUploadHandler(
+    ctx context.Context,
     logger log.Logger,
+    span opentracing.Span,
     w http.ResponseWriter,
     r *http.Request,
     putter storer.PutterSession,
@@ -64,7 +69,7 @@ func (s *Service) dirUploadHandler(
     defer r.Body.Close()
 
     reference, err := storeDir(
-        r.Context(),
+        ctx,
         encrypt,
         dReader,
         logger,
@@ -86,6 +91,7 @@ func (s *Service) dirUploadHandler(
         default:
             jsonhttp.InternalServerError(w, errDirectoryStore)
         }
+        ext.LogError(span, err, olog.String("action", "dir.store"))
         return
     }
 
@@ -94,11 +100,13 @@ func (s *Service) dirUploadHandler(
         logger.Debug("store dir failed", "error", err)
         logger.Error(nil, "store dir failed")
         jsonhttp.InternalServerError(w, errDirectoryStore)
+        ext.LogError(span, err, olog.String("action", "putter.Done"))
         return
     }
 
     if tag != 0 {
         w.Header().Set(SwarmTagHeader, fmt.Sprint(tag))
+        span.LogFields(olog.Bool("success", true))
     }
     w.Header().Set("Access-Control-Expose-Headers", SwarmTagHeader)
     jsonhttp.Created(w, bzzUploadResponse{
diff --git a/pkg/api/router.go b/pkg/api/router.go
index 1ce1bb94ae0..7c854543b88 100644
--- a/pkg/api/router.go
+++ b/pkg/api/router.go
@@ -342,14 +342,12 @@ func (s *Service) mountAPI() {
     if s.Restricted {
         handle("/auth", jsonhttp.MethodHandler{
             "POST": web.ChainHandlers(
-                s.newTracingHandler("auth"),
                 jsonhttp.NewMaxBodyBytesHandler(512),
                 web.FinalHandlerFunc(s.authHandler),
             ),
         })
         handle("/refresh", jsonhttp.MethodHandler{
             "POST": web.ChainHandlers(
-                s.newTracingHandler("auth"),
                 jsonhttp.NewMaxBodyBytesHandler(512),
                 web.FinalHandlerFunc(s.refreshHandler),
             ),
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 197c048a263..36cd9361f73 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -754,6 +754,7 @@ func NewBee(
         RadiusSetter:    kad,
         WarmupDuration:  o.WarmupTime,
         Logger:          logger,
+        Tracer:          tracer,
     }
 
     if o.FullNodeMode && !o.BootnodeMode {
diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go
index 5e38137979f..457af2f58a3 100644
--- a/pkg/pusher/pusher.go
+++ b/pkg/pusher/pusher.go
@@ -25,6 +25,9 @@ import (
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/topology"
     "github.com/ethersphere/bee/pkg/tracing"
+    "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
 )
 
 // loggerName is the tree path name of the logger for this package.
@@ -34,6 +37,7 @@ type Op struct {
     Chunk  swarm.Chunk
     Err    chan error
     Direct bool
+    Span   opentracing.Span
 }
 
 type OpChan <-chan *Op
@@ -57,6 +61,7 @@ type Service struct {
     inflight *inflight
     attempts *attempts
     smuggler chan OpChan
+    tracer   *tracing.Tracer
 }
 
 const (
@@ -94,6 +99,7 @@ func New(
         inflight: newInflight(),
         attempts: &attempts{retryCount: retryCount, attempts: make(map[string]int)},
         smuggler: make(chan OpChan),
+        tracer:   tracer,
     }
     go p.chunksWorker(warmupTime, tracer)
     return p
@@ -118,28 +124,20 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
     }
 
     var (
-        cctx, cancel      = context.WithCancel(context.Background())
-        mtx               sync.Mutex
-        wg                sync.WaitGroup
-        span, logger, ctx = tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
-        timer             = time.NewTimer(traceDuration)
-        sem               = make(chan struct{}, ConcurrentPushes)
-        cc                = make(chan *Op)
+        ctx, cancel = context.WithCancel(context.Background())
+        sem         = make(chan struct{}, ConcurrentPushes)
+        cc          = make(chan *Op)
     )
 
     // inflight.set handles the backpressure for the maximum amount of inflight chunks
     // and duplicate handling.
-    chunks, unsubscribe := s.storer.SubscribePush(cctx)
+    chunks, unsubscribe := s.storer.SubscribePush(ctx)
     defer func() {
         unsubscribe()
         cancel()
     }()
 
-    ctxLogger := func() (context.Context, log.Logger) {
-        mtx.Lock()
-        defer mtx.Unlock()
-        return ctx, logger
-    }
+    var wg sync.WaitGroup
 
     push := func(op *Op) {
         var (
@@ -168,39 +166,33 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
         }()
 
         s.metrics.TotalToPush.Inc()
-        ctx, logger := ctxLogger()
         startTime := time.Now()
 
+        spanCtx := ctx
+        if op.Span != nil {
+            spanCtx = tracing.WithContext(spanCtx, op.Span.Context())
+        } else {
+            op.Span = opentracing.NoopTracer{}.StartSpan("noOp")
+        }
+
         if op.Direct {
-            err = s.pushDirect(ctx, logger, op)
+            err = s.pushDirect(spanCtx, s.logger, op)
         } else {
-            doRepeat, err = s.pushDeferred(ctx, logger, op)
+            doRepeat, err = s.pushDeferred(spanCtx, s.logger, op)
         }
 
         if err != nil {
             s.metrics.TotalErrors.Inc()
             s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
+            ext.LogError(op.Span, err)
+        } else {
+            op.Span.LogFields(olog.Bool("success", true))
         }
 
         s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
         s.metrics.TotalSynced.Inc()
     }
 
-    go func() {
-        for {
-            select {
-            case <-s.quit:
-                return
-            case <-timer.C:
-                // reset the span
-                mtx.Lock()
-                span.Finish()
-                span, logger, ctx = tracer.StartSpanFromContext(cctx, "pusher-sync-batch", s.logger)
-                mtx.Unlock()
-            }
-        }
-    }()
-
     go func() {
         for {
             select {
@@ -393,6 +385,7 @@ func (s *Service) AddFeed(c <-chan *Op) {
     go func() {
         select {
         case s.smuggler <- c:
+            s.logger.Info("got a chunk being smuggled")
         case <-s.quit:
         }
     }()
diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 08832c3f110..8f1597384a6 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -28,6 +28,8 @@ import (
     "github.com/ethersphere/bee/pkg/topology"
     "github.com/ethersphere/bee/pkg/tracing"
     opentracing "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
 )
 
 // loggerName is the tree path name of the logger for this package.
@@ -186,8 +188,27 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
     chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
     chunkAddress := chunk.Address()
 
-    span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
-    defer span.Finish()
+    span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()}, opentracing.Tag{Key: "tagID", Value: chunk.TagID()}, opentracing.Tag{Key: "sender_address", Value: p.Address.String()})
+
+    var (
+        stored bool
+        reason string
+    )
+
+    defer func() {
+        if err != nil {
+            ext.LogError(span, err)
+        } else {
+            var logs []olog.Field
+            logs = append(logs, olog.Bool("success", true))
+            if stored {
+                logs = append(logs, olog.Bool("stored", true))
+                logs = append(logs, olog.String("reason", reason))
+            }
+            span.LogFields(logs...)
+        }
+        span.Finish()
+    }()
 
     stamp := new(postage.Stamp)
     err = stamp.UnmarshalBinary(ch.Stamp)
@@ -240,12 +261,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
     }
 
     if ps.topologyDriver.IsReachable() && ps.store.IsWithinStorageRadius(chunkAddress) {
+        stored, reason = true, "is within AOR"
         return store(ctx)
     }
 
     receipt, err := ps.pushToClosest(ctx, chunk, false)
     if err != nil {
         if errors.Is(err, topology.ErrWantSelf) {
+            stored, reason = true, "want self"
             return store(ctx)
         }
 
@@ -437,22 +460,25 @@ func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool) (swarm.
 }
 
 func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {
-
-    span := tracing.FromContext(parentCtx)
-
     ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)
     defer cancel()
 
-    spanInner, _, ctx := ps.tracer.StartSpanFromContext(tracing.WithContext(ctx, span), "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
-    defer spanInner.Finish()
-
     var (
         err     error
         receipt *pb.Receipt
-        now     = time.Now()
     )
 
+    now := time.Now()
+
+    spanInner, _, _ := ps.tracer.FollowSpanFromContext(context.WithoutCancel(parentCtx), "push-chunk-async", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
+    defer func() {
+        if err != nil {
+            ext.LogError(spanInner, err)
+        } else {
+            spanInner.LogFields(olog.Bool("success", true))
+        }
+        spanInner.Finish()
     select {
     case resultChan <- receiptResult{pushTime: now, peer: peer, err: err, receipt: receipt}:
     case <-parentCtx.Done():
@@ -461,7 +487,9 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes
 
     defer action.Cleanup()
 
-    receipt, err = ps.pushChunkToPeer(ctx, peer, ch)
+    spanInner.LogFields(olog.String("peer_address", peer.String()))
+
+    receipt, err = ps.pushChunkToPeer(tracing.WithContext(ctx, spanInner.Context()), peer, ch)
     if err != nil {
         return
     }
diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go
index 6b5a037e248..cae78558df9 100644
--- a/pkg/retrieval/retrieval.go
+++ b/pkg/retrieval/retrieval.go
@@ -28,6 +28,8 @@ import (
     "github.com/ethersphere/bee/pkg/topology"
     "github.com/ethersphere/bee/pkg/tracing"
     "github.com/opentracing/opentracing-go"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
     "resenje.org/singleflight"
 )
 
@@ -152,6 +154,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
         s.metrics.RequestAttempts.Observe(float64(totalRetrieveAttempts))
     }()
 
+    spanCtx := context.WithoutCancel(ctx)
+
     v, _, err := s.singleflight.Do(ctx, flightRoute, func(ctx context.Context) (swarm.Chunk, error) {
         skip := skippeers.NewList()
 
@@ -257,10 +261,9 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
             inflight++
 
             go func() {
-                ctx := tracing.WithContext(context.Background(), tracing.FromContext(ctx)) // todo: replace with `ctx := context.WithoutCancel(ctx)` when go 1.21 is supported to pass all context values
-                span, _, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: chunkAddr.String()})
+                span, _, ctx := s.tracer.FollowSpanFromContext(spanCtx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: chunkAddr.String()})
                 defer span.Finish()
-                s.retrieveChunk(ctx, quit, chunkAddr, peer, resultC, action, origin)
+                s.retrieveChunk(ctx, quit, chunkAddr, peer, resultC, action, origin, span)
             }()
 
         case res := <-resultC:
@@ -294,7 +297,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
     return v, nil
 }
 
-func (s *Service) retrieveChunk(ctx context.Context, quit chan struct{}, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool) {
+func (s *Service) retrieveChunk(ctx context.Context, quit chan struct{}, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool, span opentracing.Span) {
 
     var (
         startTime = time.Now()
@@ -305,7 +308,10 @@ func (s *Service) retrieveChunk(ctx context.Context, quit chan struct{}, chunkAd
     defer func() {
         action.Cleanup()
         if err != nil {
+            ext.LogError(span, err)
             s.metrics.TotalErrors.Inc()
+        } else {
+            span.LogFields(olog.Bool("success", true))
         }
         select {
         case result <- retrievalResult{err: err, chunk: chunk, peer: peer}:
@@ -446,10 +452,19 @@ func (s *Service) handler(p2pctx context.Context, p p2p.Peer, stream p2p.Stream)
         return fmt.Errorf("invalid address queried by peer %s", p.Address.String())
     }
 
+    var forwarded bool
+
     span, _, ctx := s.tracer.StartSpanFromContext(ctx, "handle-retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: addr.String()})
-    defer span.Finish()
+    defer func() {
+        if err != nil {
+            ext.LogError(span, err)
+        } else {
+            span.LogFields(olog.Bool("success", true))
+        }
+        span.LogFields(olog.Bool("forwarded", forwarded))
+        span.Finish()
+    }()
 
-    forwarded := false
     chunk, err := s.storer.Lookup().Get(ctx, addr)
     if err != nil {
         if errors.Is(err, storage.ErrNotFound) {
diff --git a/pkg/storer/netstore.go b/pkg/storer/netstore.go
index 21ed95753f3..3d5a0c01057 100644
--- a/pkg/storer/netstore.go
+++ b/pkg/storer/netstore.go
@@ -12,6 +12,8 @@ import (
     "github.com/ethersphere/bee/pkg/storage"
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/topology"
+    "github.com/opentracing/opentracing-go/ext"
+    olog "github.com/opentracing/opentracing-go/log"
     "golang.org/x/sync/errgroup"
 )
 
@@ -25,11 +27,19 @@ func (db *DB) DirectUpload() PutterSession {
         Putter: putterWithMetrics{
             storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error {
                 db.directUploadLimiter <- struct{}{}
-                eg.Go(func() error {
+                eg.Go(func() (err error) {
                     defer func() { <-db.directUploadLimiter }()
 
+                    span, logger, ctx := db.tracer.FollowSpanFromContext(ctx, "put-direct-upload", db.logger)
+                    defer func() {
+                        if err != nil {
+                            ext.LogError(span, err)
+                        }
+                        span.Finish()
+                    }()
+
                     for {
-                        op := &pusher.Op{Chunk: ch, Err: make(chan error, 1), Direct: true}
+                        op := &pusher.Op{Chunk: ch, Err: make(chan error, 1), Direct: true, Span: span}
                         select {
                         case <-ctx.Done():
                             return ctx.Err()
@@ -47,9 +57,9 @@ func (db *DB) DirectUpload() PutterSession {
                            return ErrDBQuit
                        case err := <-op.Err:
                            if errors.Is(err, pusher.ErrShallowReceipt) {
-                               db.logger.Debug("direct upload: shallow receipt received, retrying", "chunk", ch.Address())
+                               logger.Debug("direct upload: shallow receipt received, retrying", "chunk", ch.Address())
                            } else if errors.Is(err, topology.ErrNotFound) {
-                               db.logger.Debug("direct upload: no peers available, retrying", "chunk", ch.Address())
+                               logger.Debug("direct upload: no peers available, retrying", "chunk", ch.Address())
                            } else {
                                return err
                            }
@@ -62,7 +72,7 @@ func (db *DB) DirectUpload() PutterSession {
             db.metrics,
             "netstore",
         },
-        done:    func(_ swarm.Address) error { return eg.Wait() },
+        done:    func(swarm.Address) error { return eg.Wait() },
         cleanup: func() error { _ = eg.Wait(); return nil },
     }
 }
@@ -70,12 +80,25 @@ func (db *DB) DirectUpload() PutterSession {
 // Download is the implementation of the NetStore.Download method.
 func (db *DB) Download(cache bool) storage.Getter {
     return getterWithMetrics{
-        storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) {
-            ch, err := db.Lookup().Get(ctx, address)
+        storage.GetterFunc(func(ctx context.Context, address swarm.Address) (ch swarm.Chunk, err error) {
+
+            span, logger, ctx := db.tracer.StartSpanFromContext(ctx, "get-chunk", db.logger)
+            defer func() {
+                if err != nil {
+                    ext.LogError(span, err)
+                } else {
+                    span.LogFields(olog.Bool("success", true))
+                }
+                span.Finish()
+            }()
+
+            ch, err = db.Lookup().Get(ctx, address)
             switch {
             case err == nil:
+                span.LogFields(olog.String("step", "chunk found locally"))
                 return ch, nil
             case errors.Is(err, storage.ErrNotFound):
+                span.LogFields(olog.String("step", "retrieve chunk from network"))
                 if db.retrieval != nil {
                     // if chunk is not found locally, retrieve it from the network
                     ch, err = db.retrieval.RetrieveChunk(ctx, address, swarm.ZeroAddress)
@@ -93,7 +116,7 @@ func (db *DB) Download(cache bool) storage.Getter {
 
                     err := db.Cache().Put(db.cacheLimiter.ctx, ch)
                     if err != nil {
-                        db.logger.Debug("putting chunk to cache failed", "error", err, "chunk_address", ch.Address())
+                        logger.Debug("putting chunk to cache failed", "error", err, "chunk_address", ch.Address())
                     }
                 }()
             }
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index 49ec3740738..e7d973ea0b7 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -37,6 +37,7 @@ import (
     localmigration "github.com/ethersphere/bee/pkg/storer/migration"
     "github.com/ethersphere/bee/pkg/swarm"
     "github.com/ethersphere/bee/pkg/topology"
+    "github.com/ethersphere/bee/pkg/tracing"
     "github.com/ethersphere/bee/pkg/util/syncutil"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/spf13/afero"
@@ -444,6 +445,7 @@ type Options struct {
     LdbDisableSeeksCompaction bool
     CacheCapacity             uint64
     Logger                    log.Logger
+    Tracer                    *tracing.Tracer
 
     Address        swarm.Address
     WarmupDuration time.Duration
@@ -480,7 +482,9 @@ type cacheLimiter struct {
 
 // DB implements all the component stores described above.
 type DB struct {
-    logger  log.Logger
+    logger log.Logger
+    tracer *tracing.Tracer
+
     metrics metrics
 
     repo storage.Repository
@@ -582,6 +586,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
     db := &DB{
         metrics:  metrics,
         logger:   logger,
+        tracer:   opts.Tracer,
         baseAddr: opts.Address,
         repo:     repo,
         lock:     lock,
diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go
index 6739e1a03d1..14ccdb9ab43 100644
--- a/pkg/tracing/tracing.go
+++ b/pkg/tracing/tracing.go
@@ -107,6 +107,25 @@ func (t *Tracer) StartSpanFromContext(ctx context.Context, operationName string,
     return span, loggerWithTraceID(sc, l), WithContext(ctx, sc)
 }
 
+// FollowSpanFromContext starts a new tracing span that is either a root one or
+// follows an existing one from the provided Context. If logger is provided, a new
+// log Entry will be returned with "traceID" log field.
+func (t *Tracer) FollowSpanFromContext(ctx context.Context, operationName string, l log.Logger, opts ...opentracing.StartSpanOption) (opentracing.Span, log.Logger, context.Context) {
+    if t == nil {
+        t = noopTracer
+    }
+
+    var span opentracing.Span
+    if parentContext := FromContext(ctx); parentContext != nil {
+        opts = append(opts, opentracing.FollowsFrom(parentContext))
+        span = t.tracer.StartSpan(operationName, opts...)
+    } else {
+        span = t.tracer.StartSpan(operationName, opts...)
+    }
+    sc := span.Context()
+    return span, loggerWithTraceID(sc, l), WithContext(ctx, sc)
+}
+
 // AddContextHeader adds a tracing span context to provided p2p Headers from
 // the go context. If the tracing span context is not present in go context,
 // ErrContextNotFound is returned.
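Usage note (illustrative, not part of the patch): FollowSpanFromContext is what lets the asynchronous paths touched above (pusher, retrieval, direct upload) link their spans back to the originating request without inheriting its cancellation. Below is a minimal sketch of that pattern, assuming a wired *tracing.Tracer and a bee log.Logger are available; processAsync and the work callback are hypothetical names used only for illustration.

package example

import (
    "context"

    "github.com/ethersphere/bee/pkg/log"
    "github.com/ethersphere/bee/pkg/tracing"
    "github.com/opentracing/opentracing-go/ext"
    olog "github.com/opentracing/opentracing-go/log"
)

// processAsync mirrors the pattern the patch applies in pkg/pusher, pkg/retrieval
// and pkg/storer: drop the request's cancellation but keep its span context, then
// follow that span from the background goroutine.
func processAsync(ctx context.Context, tracer *tracing.Tracer, logger log.Logger, work func(context.Context) error) {
    // Keep trace linkage while detaching from the request's lifetime.
    spanCtx := context.WithoutCancel(ctx)

    go func() {
        // FollowsFrom (rather than ChildOf) because the originating request may
        // already be finished by the time this work runs.
        span, logger, ctx := tracer.FollowSpanFromContext(spanCtx, "process-async", logger)
        defer span.Finish()

        if err := work(ctx); err != nil {
            ext.LogError(span, err)
            logger.Error(err, "async work failed")
            return
        }
        span.LogFields(olog.Bool("success", true))
    }()
}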