Skip to content

Commit

Permalink
tmp: trying to figure out context
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Dec 21, 2023
1 parent 52e4ada commit 37d46a3
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
18 changes: 13 additions & 5 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *Service) fileUploadHandler(
response("invalid query params", logger, w)
return
}
fmt.Println("fileUploadHandler", "rlevel", rLevel)

p := requestPipelineFn(putter, encrypt, rLevel)
ctx := r.Context()
Expand Down Expand Up @@ -252,6 +253,7 @@ func (s *Service) fileUploadHandler(
func (s *Service) bzzDownloadHandler(w http.ResponseWriter, r *http.Request) {
logger := tracing.NewLoggerWithTraceID(r.Context(), s.logger.WithName("get_bzz_by_path").Build())

fmt.Println("bzzDownloadHandler", "r.URL.Path", r.URL.Path)
paths := struct {
Address swarm.Address `map:"address,resolve" validate:"required"`
Path string `map:"path"`
Expand All @@ -272,21 +274,29 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
loggerV1 := logger.V(1).Build()

headers := struct {
Cache *bool `map:"Swarm-Cache"`
Cache *bool `map:"Swarm-Cache"`
Strategy getter.Strategy `map:"Swarm-Redundancy-Strategy"`
FallbackMode bool `map:"Swarm-Redundancy-Fallback-Mode"`
ChunkRetrievalTimeout string `map:"Swarm-Chunk-Retrieval-Timeout"`
}{}

if response := s.mapStructure(r.Header, &headers); response != nil {
fmt.Println("downloadHandler", "invalid header params")
response("invalid header params", logger, w)
return
}
cache := true
if headers.Cache != nil {
cache = *headers.Cache
}

fmt.Println("serveReference", "address", address, "pathVar", pathVar, "cache", cache)
ls := loadsave.NewReadonly(s.storer.Download(cache))
feedDereferenced := false

ctx := r.Context()
fmt.Println("serveReference", "Strategy", headers.Strategy, "FallbackMode", headers.FallbackMode, "ChunkRetrievalTimeout", headers.ChunkRetrievalTimeout)
getter.SetParamsInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout)

FETCH:
// read manifest entry
Expand Down Expand Up @@ -367,7 +377,7 @@ FETCH:
jsonhttp.NotFound(w, "address not found or incorrect")
return
}

fmt.Println("bzz download: path", pathVar)
me, err := m.Lookup(ctx, pathVar)
if err != nil {
loggerV1.Debug("bzz download: invalid path", "address", address, "path", pathVar, "error", err)
Expand Down Expand Up @@ -475,9 +485,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h

ctx := r.Context()
fmt.Println("downloadHandler", "Strategy", headers.Strategy, "FallbackMode", headers.FallbackMode, "ChunkRetrievalTimeout", headers.ChunkRetrievalTimeout)
ctx = getter.SetStrategy(ctx, headers.Strategy)
ctx = getter.SetStrict(ctx, !headers.FallbackMode)
ctx = getter.SetFetchTimeout(ctx, headers.ChunkRetrievalTimeout)
getter.SetParamsInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout)
reader, l, err := joiner.New(ctx, s.storer.Download(cache), s.storer.Cache(), reference)
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func TestBzzUploadWithRedundancy(t *testing.T) {
storerMock := mockstorer.NewWithChunkStore(mockstorer.NewForgettingStore(inmemchunkstore.New(), 2, fetchTimeout))
client, _, _, _ := newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: log.Noop,
Post: mockpost.New(mockpost.WithAcceptAll()),
Logger: log.NewLogger("stdout", log.WithVerbosity(log.VerbosityDebug), log.WithCallerFunc()),
// Logger: log.Noop ,
Post: mockpost.New(mockpost.WithAcceptAll()),
})

type testCase struct {
Expand All @@ -51,7 +52,7 @@ func TestBzzUploadWithRedundancy(t *testing.T) {
encrypt string
}
var tcs []testCase
for _, encrypt := range []string{"true", "false"} {
for _, encrypt := range []string{"false", "true"} {
for _, level := range []string{"0", "1", "2", "3", "4"} {
for _, size := range []int{1, 42, 420, 4095, 4096, 4200, 128*4096 + 4095} {
tcs = append(tcs, testCase{
Expand Down Expand Up @@ -85,6 +86,7 @@ func TestBzzUploadWithRedundancy(t *testing.T) {
)

t.Run("download without redundancy should fail", func(t *testing.T) {
t.Skip("no")
req, err := http.NewRequest("GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
Expand All @@ -104,7 +106,8 @@ func TestBzzUploadWithRedundancy(t *testing.T) {
}
})

t.Run("download with redundancy should fail", func(t *testing.T) {
t.Run("download with redundancy should succeed", func(t *testing.T) {
// t.Skip("no")
req, err := http.NewRequest("GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
Expand Down
19 changes: 12 additions & 7 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package joiner
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -52,6 +53,7 @@ type decoderCache struct {

// NewDecoderCache creates a new decoder cache
func NewDecoderCache(g storage.Getter, p storage.Putter, strategy getter.Strategy, strict bool, fetcherTimeout time.Duration) *decoderCache {
fmt.Println("NewDecoderCache", strategy, strict, fetcherTimeout)
return &decoderCache{
fetcher: g,
putter: p,
Expand Down Expand Up @@ -118,23 +120,26 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s
spanFn := func(data []byte) (redundancy.Level, int64) {
return 0, int64(bmt.LengthFromSpan(data[:swarm.SpanSize]))
}
var strategy getter.Strategy
var strict bool
var fetcherTimeout time.Duration
strategy, strict, fetcherTimeout, err := getter.GetParamsFromContext(ctx)
if err != nil {
fmt.Printf("get params from context: %v\n", err)
return nil, 0, err
}
// override stuff if root chunk has redundancy
if rLevel != redundancy.NONE {
_, parities := file.ReferenceCount(uint64(span), rLevel, encryption)
rootParity = parities
strategy, strict, fetcherTimeout, err = getter.GetParamsFromContext(ctx)
if err != nil {
return nil, 0, err
}

spanFn = chunkToSpan
if encryption {
maxBranching = rLevel.GetMaxEncShards()
} else {
maxBranching = rLevel.GetMaxShards()
}
} else {
// if root chunk has no redundancy, strategy is ignored and set to NONE and strict is set to true
strategy = getter.NONE
strict = true
}

j := &joiner{
Expand Down
25 changes: 21 additions & 4 deletions pkg/file/redundancy/getter/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package getter
import (
"context"
"fmt"
"strconv"
"time"
)

Expand All @@ -31,15 +32,25 @@ const (

// GetParamsFromContext extracts the strategy and strict mode from the context
func GetParamsFromContext(ctx context.Context) (s Strategy, strict bool, fetcherTimeout time.Duration, err error) {
var ok bool
s, ok = ctx.Value(strategyKey{}).(Strategy)
// var ok bool
sStr, ok := ctx.Value(strategyKey{}).(string)
if !ok {
return s, strict, fetcherTimeout, fmt.Errorf("error setting strategy from context")
}
strict, ok = ctx.Value(modeKey{}).(bool)
sNum, err := strconv.ParseUint(sStr, 10, 2)
if err != nil {
return s, strict, fetcherTimeout, fmt.Errorf("error parsing strategy from context")
}
s = Strategy(sNum)

strictStr, ok := ctx.Value(modeKey{}).(string)
if !ok {
return s, strict, fetcherTimeout, fmt.Errorf("error setting fallback mode from context")
}
strict, err = strconv.ParseBool(strictStr)
if err != nil {
return s, strict, fetcherTimeout, fmt.Errorf("error parsing fallback mode from context")
}
fetcherTimeoutVal, ok := ctx.Value(fetcherTimeoutKey{}).(string)
if !ok {
return s, strict, fetcherTimeout, fmt.Errorf("error setting fetcher timeout from context")
Expand All @@ -58,14 +69,20 @@ func SetFetchTimeout(ctx context.Context, timeout string) context.Context {

// SetStrategy sets the strategy for the retrieval
func SetStrategy(ctx context.Context, s Strategy) context.Context {
return context.WithValue(ctx, strategyKey{}, s)
return context.WithValue(ctx, strategyKey{}, int(s))
}

// SetStrict sets the strict mode for the retrieval
func SetStrict(ctx context.Context, strict bool) context.Context {
return context.WithValue(ctx, modeKey{}, strict)
}

func SetParamsInContext(ctx context.Context, s Strategy, fallbackmode bool, fetchTimeout string) context.Context {
ctx = SetStrategy(ctx, s)
ctx = SetStrict(ctx, !fallbackmode)
ctx = SetFetchTimeout(ctx, fetchTimeout)
return ctx
}
func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strategyTimeout, fetchTimeout time.Duration) {
if strict && strategy == NONE {
return
Expand Down

0 comments on commit 37d46a3

Please sign in to comment.