feat(replicas): new replicas pkg for redundancy by dispersed replicas
zelig committed Nov 22, 2023
1 parent acc4750 commit 9d519c0
Showing 7 changed files with 759 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/replicas/export_test.go
@@ -0,0 +1,11 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package replicas

var (
	Counts = counts
	Signer = signer
)

Check failure on line 11 in pkg/replicas/export_test.go (GitHub Actions / Lint): File is not `gofmt`-ed with `-s` (gofmt)
151 changes: 151 additions & 0 deletions pkg/replicas/getter.go
@@ -0,0 +1,151 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The code below implements the integration of dispersed replicas in chunk
// fetching using the storage.Getter interface.
package replicas

import (
	"context"
	"errors"
	"time"

	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/soc"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// ErrSwarmageddon is returned in case of a vis maior called Swarmageddon.
// Swarmageddon is the situation when none of the replicas can be retrieved.
// If 2^{depth} replicas were uploaded and they all have valid postage stamps,
// then the probability of Swarmageddon is less than 0.000001,
// assuming the error rate of chunk retrievals stays below the level expressed
// as depth by the publisher.
type ErrSwarmageddon struct {
	error
}

func (err *ErrSwarmageddon) Unwrap() []error {
	if err == nil || err.error == nil {
		return nil
	}
	return err.error.(interface{ Unwrap() []error }).Unwrap()
}

Check failure on line 34 in pkg/replicas/getter.go (GitHub Actions / Lint): type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)

// getter is the private implementation of storage.Getter, an interface for
// retrieving chunks. This getter embeds the original simple chunk getter and
// extends it to a multiplexed variant that fetches chunks with replicas.
//
// The strategy to retrieve a chunk that has replicas can be configured with a few parameters:
//   - RetryInterval: the delay before a new batch of replicas is fetched.
//   - depth: 2^{depth} is the total number of additional replicas that have been uploaded
//     (by default, it is assumed to be 4, i.e. a total of 16)
//   - (not implemented) pivot: replicas with addresses in the proximity of pivot will be tried first
type getter struct {
	storage.Getter
	level redundancy.Level
}

// NewGetter is the getter constructor.
func NewGetter(g storage.Getter, level redundancy.Level) storage.Getter {
	return &getter{g, level}
}

// Get makes the getter satisfy the storage.Getter interface.
func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// channel into which concurrent workers, each fetching a replica,
	// deliver their results (retrieved chunks)
	resultC := make(chan swarm.Chunk)
	// errc collects the errors
	errc := make(chan error, 17)
	var errs []error
	errcnt := 0

	// concurrently call to retrieve chunk using original CAC address
	go func() {
		ch, err := g.Getter.Get(ctx, addr)
		if err != nil {
			errc <- err
			return
		}
		select {
		case resultC <- ch:
		case <-ctx.Done():
		}
	}()
	// counters
	n := 0                   // counts the replica addresses tried
	target := 2              // the number of replicas attempted to download in this batch
	total := counts[g.level] // total number of replicas allowed (and makes sense) to retrieve

	// generator of replica addresses for the chunk
	rr := newReplicator(addr, uint8(g.level))
	next := rr.c
	var wait <-chan time.Time // nil channel to disable the select case
	// the number of addresses tried doubles with each period of search
	// expansion (at intervals of RetryInterval)
	ticker := time.NewTicker(RetryInterval)
	defer ticker.Stop()
	for level := uint8(0); level <= uint8(g.level); {
		select {
		// at least one chunk is retrieved, cancel the rest and return early
		case chunk := <-resultC:
			return chunk, nil

		case err = <-errc:
			errs = append(errs, err)
			errcnt++
			if errcnt > total {
				return nil, &ErrSwarmageddon{errors.Join(errs...)}
			}

		// ticker switches the address channel back on
		case <-wait:
			wait = nil
			next = rr.c
			continue

		// getting the addresses in order
		case so := <-next:
			if so == nil {
				next = nil
				continue
			}

			go func() {
				ch, err := g.Getter.Get(ctx, swarm.NewAddress(so.addr))
				if err != nil {
					errc <- err
					return
				}

				soc, err := soc.FromChunk(ch)
				if err != nil {
					errc <- err
					return
				}

				select {
				case resultC <- soc.WrappedChunk():
				case <-ctx.Done():
				}
			}()
			n++
			if n < target {
				continue
			}
			next = nil
			wait = ticker.C
			target *= 2
			level++
			n = 0
		}
	}

	return nil, nil
}
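For orientation, a minimal caller sketch (not part of this commit) of how the new getter might be wired up. fetchWithReplicas, its store parameter, and the chosen redundancy level are illustrative assumptions; the error handling uses the errors.As pattern recommended by the linter above. As a rough sanity check on the doc comment's Swarmageddon bound: with depth 4 there are 16 replicas plus the original chunk, so if each retrieval fails independently with probability p, all 17 attempts fail with probability p^17, which stays below 0.000001 for any p up to roughly 0.44.

package example

import (
	"context"
	"errors"
	"log"

	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// fetchWithReplicas is a hypothetical helper; store stands in for any
// storage.Getter implementation (e.g. the node's chunk retrieval backend).
func fetchWithReplicas(ctx context.Context, store storage.Getter, addr swarm.Address) (swarm.Chunk, error) {
	// wrap the plain getter; with level 2, up to 2^2 = 4 replicas are tried
	g := replicas.NewGetter(store, redundancy.Level(2))
	ch, err := g.Get(ctx, addr)
	if err != nil {
		var swarmageddon *replicas.ErrSwarmageddon
		// the getter joins all per-replica errors into ErrSwarmageddon
		if errors.As(err, &swarmageddon) {
			for _, e := range swarmageddon.Unwrap() {
				log.Printf("replica fetch failed: %v", e)
			}
		}
		return nil, err
	}
	return ch, nil
}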
149 changes: 149 additions & 0 deletions pkg/replicas/getter_test.go
@@ -0,0 +1,149 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package replicas_test

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/ethersphere/bee/pkg/cac"
	"github.com/ethersphere/bee/pkg/file/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/soc"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

type testGetter struct {
	ch         swarm.Chunk
	c          chan swarm.Address
	firstFound int32
	origCalled bool
}

func (tg *testGetter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
	ch = tg.ch
	if addr.Equal(tg.ch.Address()) {
		tg.origCalled = true
	} else {
		ch, err = soc.New(addr.Bytes(), ch).Sign(replicas.Signer)
		if err != nil {
			return nil, err
		}
	}
	tg.c <- addr
	if i := atomic.AddInt32(&tg.firstFound, int32(-1)); i >= 0 {
		return nil, storage.ErrNotFound
	}
	return ch, nil
}

func newTestGetter(ch swarm.Chunk, firstFound int) *testGetter {
	return &testGetter{ch, make(chan swarm.Address), int32(firstFound), false}
}

func (tg *testGetter) Close() error {
	return nil
}

func TestGetter(t *testing.T) {
	t.Run("Levels and dispersion", func(t *testing.T) {
		chunkLen := 420
		buf := make([]byte, chunkLen)
		if _, err := io.ReadFull(rand.Reader, buf); err != nil {
			t.Fatal(err)
		}
		ch, err := cac.New(buf)
		if err != nil {
			t.Fatal(err)
		}
		retryInterval := replicas.RetryInterval
		defer func() { replicas.RetryInterval = retryInterval }()
		replicas.RetryInterval = 20 * time.Millisecond
		for level, c := range replicas.Counts {
			for j := 0; j <= 16; j++ {
				t.Run(fmt.Sprintf("redundancy:%d, first found:%d", level, j), func(t *testing.T) {
					ctx := context.Background()
					store := newTestGetter(ch, j)
					defer store.Close()

					now := time.Now()
					var addrs []swarm.Address
					var latencies []time.Duration
					wg := sync.WaitGroup{}
					wg.Add(1)
					go func() {
						wg.Done()
						for addr := range store.c {
							if ch.Address().Equal(addr) {
								continue
							}
							addrs = append(addrs, addr)
							latencies = append(latencies, time.Now().Sub(now))

Check failure on line 93 in pkg/replicas/getter_test.go (GitHub Actions / Lint): S1012: should use `time.Since` instead of `time.Now().Sub` (gosimple)

						}
					}()
					g := replicas.NewGetter(store, redundancy.Level(level))
					_, err := g.Get(ctx, ch.Address())
					wg.Wait()
					t.Run("correctly returned error", func(t *testing.T) {
						if j <= c {
							if err != nil {
								t.Fatalf("expected no error. got %v", err)
							}
							return
						}
						esg, ok := err.(*replicas.ErrSwarmageddon)

Check failure on line 107 in pkg/replicas/getter_test.go (GitHub Actions / Lint): type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)

						if !ok {
							t.Fatalf("incorrect error. want Swarmageddon. got %v", err)
						}
						errs := esg.Unwrap()
						var length int
						for length, err = range errs {
							if !errors.Is(err, storage.ErrNotFound) {
								t.Fatalf("incorrect error. want it to wrap %v. got %v", storage.ErrNotFound, err)
							}
						}
						expLen := replicas.Counts[level] + 1
						if length+1 != expLen {
							t.Fatalf("incorrect error. want %d. got %d", expLen, length)
						}
					})
					if j < c {
						t.Run("original address called", func(t *testing.T) {
							if !store.origCalled {
								t.Fatal("original chunk not attempted for retrieval")
							}
						})
					}
					strict := false
					t.Run("dispersion", func(t *testing.T) {
						if err := dispersed(redundancy.Level(level), ch, strict, addrs...); err != nil {
							t.Fatalf("addresses are not dispersed: %v", err)
						}
					})
					t.Run("latency", func(t *testing.T) {
						for i, latency := range latencies {
							multiplier := latency / replicas.RetryInterval
							fmt.Printf("%d,lat=%v,mm=%v\n", i, latency, multiplier)

Check failure on line 139 in pkg/replicas/getter_test.go (GitHub Actions / Lint): use of `fmt.Printf` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)

							if multiplier > 0 && i < replicas.Counts[multiplier-1] {
								t.Fatalf("incorrect latency for retrieving replica %d: %v", i, err)
							}
						}
					})
				})
			}
		}
	})
}
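To make the latency subtest's expectation concrete, here is a standalone back-of-the-envelope sketch (not part of the commit) of the expansion schedule implied by the getter's loop: the batch size starts at 2 and doubles at each RetryInterval tick, capped by the total replica count of the level (16 at level 4). The interval value below is a placeholder.

package main

import (
	"fmt"
	"time"
)

func main() {
	retryInterval := 300 * time.Millisecond // placeholder value
	total := 16                             // replicas at redundancy level 4
	target, tried := 2, 0
	for tick := 0; tried < total; tick++ {
		batch := target
		if tried+batch > total {
			batch = total - tried // final batch is capped by the level's total
		}
		fmt.Printf("t=%v: dispatch %d replica fetches (replicas %d..%d)\n",
			time.Duration(tick)*retryInterval, batch, tried, tried+batch-1)
		tried += batch
		target *= 2 // the getter doubles the batch each RetryInterval
	}
	// Output:
	// t=0s: dispatch 2 replica fetches (replicas 0..1)
	// t=300ms: dispatch 4 replica fetches (replicas 2..5)
	// t=600ms: dispatch 8 replica fetches (replicas 6..13)
	// t=900ms: dispatch 2 replica fetches (replicas 14..15)
}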
60 changes: 60 additions & 0 deletions pkg/replicas/putter.go
@@ -0,0 +1,60 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The code below implements the integration of dispersed replicas in chunk
// upload using the storage.Putter interface.
package replicas

import (
	"context"
	"errors"
	"sync"

	"github.com/ethersphere/bee/pkg/soc"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// putter is the private implementation of the public storage.Putter interface.
// putter extends the original putter to a concurrent multiputter.
type putter struct {
	putter storage.Putter
}

// NewPutter is the putter constructor.
func NewPutter(p storage.Putter) storage.Putter {
	return &putter{p}
}

// Put makes the putter satisfy the storage.Putter interface.
func (p *putter) Put(ctx context.Context, ch swarm.Chunk) (err error) {
	rlevel := getLevelFromContext(ctx)
	errs := []error{p.putter.Put(ctx, ch)}
	if rlevel == 0 {
		return errs[0]
	}

	rr := newReplicator(ch.Address(), uint8(rlevel))
	errc := make(chan error, counts[rlevel])
	wg := sync.WaitGroup{}
	for r := range rr.c {
		r := r
		wg.Add(1)
		go func() {
			defer wg.Done()
			sch, err := soc.New(r.id, ch).Sign(signer)
			if err == nil {
				err = p.putter.Put(ctx, sch)
			}
			errc <- err
		}()
	}

	wg.Wait()
	close(errc)
	for err := range errc {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}
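A minimal upload-side sketch (not part of the commit), mirroring the getter example. storeWithReplicas and store are hypothetical; the putter reads the redundancy level from the context via getLevelFromContext, whose setter counterpart lives in one of the changed files not shown on this page, so the placeholder withRedundancyLevel stands in for it here.

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// withRedundancyLevel is a placeholder for the package's real context helper
// (not shown on this page); the real one records the level so that
// getLevelFromContext can read it back inside Put.
func withRedundancyLevel(ctx context.Context, level int) context.Context {
	return ctx // placeholder only
}

// storeWithReplicas is a hypothetical helper: it uploads the original chunk
// plus its dispersed SOC replicas concurrently and returns the joined errors.
func storeWithReplicas(ctx context.Context, store storage.Putter, ch swarm.Chunk) error {
	p := replicas.NewPutter(store)
	ctx = withRedundancyLevel(ctx, 2) // 2^2 = 4 replicas
	return p.Put(ctx, ch)
}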
