Skip to content

Commit

Permalink
feat: logging
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 15, 2024
1 parent 8efb379 commit d0c3e41
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down
5 changes: 3 additions & 2 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/file/splitter"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/log"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
Expand Down Expand Up @@ -1120,7 +1121,7 @@ func TestJoinerRedundancy(t *testing.T) {
fallback := true
s := getter.RACE

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1261,7 +1262,7 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
strategyTimeoutStr := strategyTimeout.String()
decodingTimeoutStr := (2 * strategyTimeout).String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"sync/atomic"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/klauspost/reedsolomon"
Expand Down Expand Up @@ -40,6 +41,7 @@ type decoder struct {
cancel func() // cancel function for RS decoding
remove func() // callback to remove decoder from decoders cache
config Config // configuration
logger log.Logger
}

type Getter interface {
Expand Down Expand Up @@ -67,6 +69,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
shardCnt: shardCnt,
parityCnt: size - shardCnt,
config: conf,
logger: conf.Logger.WithName("redundancy").Build(),
}

// after init, cache and wait channels are immutable, need no locking
Expand Down Expand Up @@ -122,6 +125,7 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
case <-g.badRecovery:
return storage.ErrNotFound
case <-g.goodRecovery:
g.logger.Debug("recovered chunk", "address", g.addrs[i])
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -234,7 +238,17 @@ func (g *decoder) prefetch(ctx context.Context) error {

var err error
for s := g.config.Strategy; s < strategyCnt; s++ {
if err = run(s); err == nil {

err = run(s)
if err != nil {
if s == DATA || s == RACE {
g.logger.Debug("failed recovery", "strategy", s)
}
}
if err == nil {
if s > DATA {
g.logger.Debug("successful recovery", "strategy", s)
}
close(g.goodRecovery)
break
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
Expand Down Expand Up @@ -120,6 +121,7 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
Strategy: getter.RACE,
FetchTimeout: 2 * strategyTimeout,
StrategyTimeout: strategyTimeout,
Logger: log.Noop,
}
g := getter.New(addrs, shardCnt, store, store, func() {}, conf)
defer g.Close()
Expand Down
21 changes: 20 additions & 1 deletion pkg/file/redundancy/getter/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"time"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/retrieval"
)

Expand All @@ -25,6 +26,7 @@ type (
modeKey struct{}
fetchTimeoutKey struct{}
strategyTimeoutKey struct{}
loggerKey struct{}
Strategy = int
)

Expand All @@ -34,6 +36,7 @@ type Config struct {
Strict bool
FetchTimeout time.Duration
StrategyTimeout time.Duration
Logger log.Logger
}

const (
Expand All @@ -50,6 +53,7 @@ var DefaultConfig = Config{
Strict: DefaultStrict,
FetchTimeout: DefaultFetchTimeout,
StrategyTimeout: DefaultStrategyTimeout,
Logger: log.Noop,
}

// NewConfigFromContext returns a new Config based on the context
Expand Down Expand Up @@ -86,6 +90,12 @@ func NewConfigFromContext(ctx context.Context, def Config) (conf Config, err err
return conf, e("strategy timeout")
}
}
if val := ctx.Value(loggerKey{}); val != nil {
conf.Logger, ok = val.(log.Logger)
if !ok {
return conf, e("strategy timeout")
}
}

return conf, nil
}
Expand All @@ -110,8 +120,13 @@ func SetStrategyTimeout(ctx context.Context, timeout time.Duration) context.Cont
return context.WithValue(ctx, strategyTimeoutKey{}, timeout)
}

// SetStrategyTimeout sets the timeout for each strategy
func SetLogger(ctx context.Context, l log.Logger) context.Context {
return context.WithValue(ctx, loggerKey{}, l)
}

// SetConfigInContext sets the config params in the context
func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout, strategyTimeout *string) (context.Context, error) {
func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fetchTimeout, strategyTimeout *string, logger log.Logger) (context.Context, error) {
if s != nil {
ctx = SetStrategy(ctx, *s)
}
Expand All @@ -136,5 +151,9 @@ func SetConfigInContext(ctx context.Context, s *Strategy, fallbackmode *bool, fe
ctx = SetStrategyTimeout(ctx, dur)
}

if logger != nil {
ctx = SetLogger(ctx, logger)
}

return ctx, nil
}

0 comments on commit d0c3e41

Please sign in to comment.