Skip to content

Commit

Permalink
feat: more enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Sep 11, 2023
1 parent 61717bf commit 2c4c55d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
8 changes: 8 additions & 0 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -160,10 +162,15 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
}
}()

pSpan := tracing.FromContext(ctx)

s.metrics.TotalToPush.Inc()
ctx, logger := ctxLogger()
startTime := time.Now()

pushSpan, _, ctx := s.tracer.FollowSpanFromContext(tracing.WithContext(ctx, pSpan), "chunks-worker-push", s.logger, opentracing.Tag{Key: "tagID", Value: op.Chunk.TagID()}, opentracing.Tag{Key: "address", Value: op.Chunk.Address()})
defer pushSpan.Finish()

if op.Direct {
err = s.pushDirect(ctx, logger, op)
} else {
Expand All @@ -173,6 +180,7 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
if err != nil {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
ext.LogError(pushSpan, err)
}

s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
Expand Down
8 changes: 5 additions & 3 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
opentracinglog "github.com/opentracing/opentracing-go/log"
)

Expand Down Expand Up @@ -183,10 +184,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
chunkAddress := chunk.Address()

span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()}, opentracing.Tag{Key: "tagID", Value: chunk.TagID()})
span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()}, opentracing.Tag{Key: "tagID", Value: chunk.TagID()}, opentracing.Tag{Key: "sender_address", Value: p.Address.String()})
defer func() {
if err != nil {
span.LogFields(opentracinglog.Error(err))
ext.LogError(span, err)
}
span.Finish()
}()
Expand Down Expand Up @@ -411,6 +412,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
ps.metrics.TotalFailedSendAttempts.Inc()
ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err)
span.LogFields(opentracinglog.String("event", "push failed"), opentracinglog.String("address", ch.Address().String()), opentracinglog.String("peer_address", result.peer.String()), opentracinglog.Error(result.err))
ext.LogError(span, result.err)
sentErrorsLeft--

retry()
Expand Down Expand Up @@ -438,7 +440,7 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes

defer func() {
if err != nil {
spanInner.LogFields(opentracinglog.Error(err))
ext.LogError(spanInner, err)
}
spanInner.Finish()
select {
Expand Down
8 changes: 6 additions & 2 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethersphere/bee/pkg/topology"
"github.com/ethersphere/bee/pkg/tracing"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
opentracinglog "github.com/opentracing/opentracing-go/log"
"resenje.org/singleflight"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s

inflight := 0

var span opentracing.Span
for errorsLeft > 0 {

select {
Expand Down Expand Up @@ -241,8 +243,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s

go func() {
ctx := tracing.WithContext(context.Background(), tracing.FromContext(topCtx))
span, _, ctx := s.tracer.FollowSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: chunkAddr.String()})
defer span.Finish()
span, _, ctx = s.tracer.FollowSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: chunkAddr.String()}, opentracing.Tag{Key: "peer", Value: peer.String()})
s.retrieveChunk(ctx, chunkAddr, peer, resultC, action, origin, span)
}()

Expand All @@ -251,10 +252,13 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
inflight--

if res.err == nil {
span.Finish()
loggerV1.Debug("retrieved chunk", "chunk_address", chunkAddr, "peer_address", res.peer)
return res.chunk, nil
}

ext.LogError(span, res.err)
span.Finish()
loggerV1.Debug("failed to get chunk", "chunk_address", chunkAddr, "peer_address", res.peer, "error", res.err)

errorsLeft--
Expand Down

0 comments on commit 2c4c55d

Please sign in to comment.