feat(replicas): new replicas pkg for redundancy by dispersed replicas
zelig committed Nov 9, 2023
1 parent 48a603c commit a1aaedb
Showing 3 changed files with 348 additions and 0 deletions.
132 changes: 132 additions & 0 deletions pkg/replicas/getter.go
@@ -0,0 +1,132 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The code below implements the integration of dispersed replicas in chunk
// fetching using the storage.Getter interface.
package replicas

import (
"context"
"errors"
"fmt"
"time"

"github.com/ethersphere/bee/pkg/file/pipeline/redundancy"

Check failure on line 15 in pkg/replicas/getter.go (GitHub Actions / Test): no required module provides package github.com/ethersphere/bee/pkg/file/pipeline/redundancy
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

// getter is the private implementation of storage.Getter, an interface for
// retrieving chunks.
// This getter wraps the original simple chunk getter and extends it to a
// multiplexed variant in which the replicas of a chunk are fetched concurrently.
//
// The exact strategy of concurrent retrieval is configured by a few parameters:
// - RetryInterval: the time that passes before a new batch of replicas is fetched
// - Pivot: replicas with an address in the proximity of the pivot are tried first
// - total: the total number of replicas assumed to exist (16 by default)
type getter struct {
getter storage.Getter
level redundancy.Level
}

// Get retrieves a chunk by racing the original address against its dispersed
// replicas; it satisfies the storage.Getter interface
func (g getter) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// resultC is the channel on which concurrent workers, each fetching a
// replica, deliver retrieved chunks
resultC := make(chan swarm.Chunk)
// errc collects the errors
errc := make(chan error, 18)
var errs []error
errcnt := 0

// concurrently retrieve the chunk using the original CAC address
go func() {
ch, err := g.getter.Get(ctx, addr)
if err != nil {
errc <- err
return
}
select {
case resultC <- ch:
case <-ctx.Done():
}
}()
s := newSorter(addr)
n := 0
target := 2

// set the total number of replicas based on the redundancy level
// (defaults to PARANOID)
rlevel := redundancy.PARANOID
if val, ok := ctx.Value("redundancyLevel").(redundancy.Level); ok {
rlevel = val
}
total := counts[int(rlevel)]

addressC := make(chan swarm.Address, 17)
go s.addresses(ctx, total, addressC)
next := addressC
var wait <-chan time.Time

// the number of addresses tried doubles with each period of search expansion
// (i.e., every RetryInterval)
ticker := time.NewTicker(RetryInterval)
defer ticker.Stop()
for s.level <= uint8(rlevel) {
select {
// at least one chunk is retrieved, cancel the rest and return early
case chunk := <-resultC:
return chunk, nil

case err := <-errc:
errs = append(errs, err)
errcnt++
if errcnt > total {
return nil, fmt.Errorf("unable to retrieve any replicas: %w", errors.Join(errs...))
}
// the ticker fires: switch the address channel back on for the next batch
case <-wait:
wait = nil
next = addressC
continue

// getting the addresses in order
case addr := <-next:
go func() {
ch, err := g.getter.Get(ctx, addr)
if err != nil {
errc <- err
return
}

sch, err := soc.FromChunk(ch)
if err != nil {
errc <- err
return
}

select {
case resultC <- sch.WrappedChunk():
case <-ctx.Done():
}
}()

n++
if n < target {
continue
}

next = nil
wait = ticker.C
target *= 2
s.level++
n = 0
}
}
// the replicas for all levels up to the configured one have been tried
return nil, errors.Join(append(errs, storage.ErrNotFound)...)
}
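
For illustration, a minimal usage sketch (not part of this commit) of the getter above. The store argument stands for any existing storage.Getter, and the import paths, the "redundancyLevel" context key and the PARANOID level mirror what this commit uses (including the redundancy import path that CI reports as missing):

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/file/pipeline/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// fetchWithReplicas races retrieval of the original chunk against its
// dispersed replicas by wrapping an existing storage.Getter.
func fetchWithReplicas(ctx context.Context, store storage.Getter, addr swarm.Address) (swarm.Chunk, error) {
	// Get above reads the redundancy level from this context key
	ctx = context.WithValue(ctx, "redundancyLevel", redundancy.PARANOID)
	g := replicas.NewGetter(store, redundancy.PARANOID)
	return g.Get(ctx, addr)
}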
47 changes: 47 additions & 0 deletions pkg/replicas/putter.go
@@ -0,0 +1,47 @@

// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The code below implements the integration of dispersed replicas in chunk
// upload using the storage.Putter interface.
package replicas

import (
"context"
"errors"
"sync"

"github.com/ethersphere/bee/pkg/file/pipeline/redundancy"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

// putter is the private implementation of storage.Putter: it wraps the
// original chunk putter and also uploads the dispersed replicas of the chunk
type putter struct {
putter storage.Putter
}

// NewPutter wraps the given storage.Putter so that Put also uploads the
// dispersed replicas of each chunk
func NewPutter(p storage.Putter) storage.Putter {
return &putter{p}
}

// Put uploads the chunk together with its dispersed replicas, signed as
// single owner chunks
func (p *putter) Put(ctx context.Context, ch swarm.Chunk) (err error) {
s := newSorter(ch.Address())
rlevel := redundancy.PARANOID
if val, ok := ctx.Value("redundancyLevel").(redundancy.Level); ok {
rlevel = val
}
target := counts[int(rlevel)]

c := make(chan []byte, 17)
s.ids(ctx, target, c)
// ids returns synchronously and the channel is buffered large enough, so the
// channel can be closed here to let the range loop below terminate
close(c)
errc := make(chan error, 2*target)
var errs []error

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Wait()
close(errc)
}()
for id := range c {
id := id // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
wg.Add(1)
go func() {
defer wg.Done()
sc := soc.New(id, ch)
sch, err := sc.Sign(signer)
if err != nil {
errc <- err
return
}
errc <- p.putter.Put(ctx, sch)
}()
}
// release the initial Add so that the collector goroutine above can close
// errc once all workers are done
wg.Done()
for err := range errc {
errs = append(errs, err)
}
return errors.Join(errs...)
}
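
Similarly, a minimal upload-side sketch (not part of this commit), under the same assumptions about import paths and the "redundancyLevel" context key; store stands for any existing storage.Putter:

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/file/pipeline/redundancy"
	"github.com/ethersphere/bee/pkg/replicas"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// storeWithReplicas uploads a chunk together with its dispersed replicas by
// wrapping an existing storage.Putter.
func storeWithReplicas(ctx context.Context, store storage.Putter, ch swarm.Chunk) error {
	// the number of replicas is derived from the redundancy level in the
	// context, defaulting to PARANOID when none is set
	ctx = context.WithValue(ctx, "redundancyLevel", redundancy.PARANOID)
	return replicas.NewPutter(store).Put(ctx, ch)
}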
169 changes: 169 additions & 0 deletions pkg/replicas/replicas.go
@@ -0,0 +1,169 @@
// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package replicas implements a scheme to replicate chunks
// in such a way that
// - the replicas are optimally dispersed to aid cross-neighbourhood redundancy
// - the replicas' addresses can be deduced by retrievers knowing only the
//   address of the original content addressed chunk
// - no new chunk validation rules are introduced
package replicas

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/file/pipeline/redundancy"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)

var (
// RetryInterval is the duration between successive additional requests
RetryInterval = 300 * time.Millisecond
// counts lists the numbers of replicas used for the levels of increasing security.
// The actual numbers of replicas needed to keep the error rate below 1/10^6
// for the five levels of redundancy are 0, 2, 4, 5, 19;
// we use the successive powers of 2 as an approximation.
counts = []int{0, 2, 4, 8, 16}
)

var (
signer crypto.Signer
owner common.Address
)

// the replicas are signed with a deterministic, well-known private key so
// that their owner, and hence their addresses, can be derived by any node
func init() {
privKey, _ := crypto.DecodeSecp256k1PrivateKey(swarm.Address{}.Bytes())
signer = crypto.NewDefaultSigner(privKey)
owner, _ = signer.EthereumAddress()
}

// NewGetter wraps the given storage.Getter so that Get also attempts to
// retrieve the dispersed replicas of the requested chunk
func NewGetter(g storage.Getter, level redundancy.Level) storage.Getter {
return &getter{g, level}
}

// sorter generates and orders the replica addresses of a chunk
type sorter struct {
addr []byte // chunk address
queue [16]*uint8 // orders the addresses according to dispersion
exists [16]bool // marks which of the 16 distinct neighbourhood nibbles are covered
level uint8 // 2^level replicas are dispersed so far
}

// newSorter constructs a sorter for the given chunk address
func newSorter(addr swarm.Address) *sorter {
return &sorter{addr: addr.Bytes()}
}

// socOpt holds the parameters of a candidate replica (a mined SOC chunk)
type socOpt struct {
addr, id []byte // byte slices of the SOC address and the SOC ID
nh uint8 // neighbourhood nibble: the top 4 bits of the address
}

func (so *socOpt) Address() swarm.Address {
return swarm.NewAddress(so.addr)
}

func (s *sorter) newSocOpt(i uint8) (sp *socOpt, err error) {
h := swarm.NewHasher()
// change the last byte of the address to create the SOC ID
id := make([]byte, 32)
copy(id, s.addr)
id[len(id)-1] = i
// calculate the SOC address of the potential replica
for _, v := range [][]byte{id, owner.Bytes()} {
_, err := h.Write(v)
if err != nil {
return nil, err
}
}
addr := h.Sum(nil)
// the top nibble of the address selects the neighbourhood
nh := addr[0] >> 4
return &socOpt{addr, id, nh}, nil
}

// insert takes the neighbourhood index i and places it onto the multi-queue
// so that addresses are always optimally dispersed: each successive set of
// addresses doubles the number of addresses, and optimal dispersion means
// maximum distance between neighbourhoods, i.e., the binary tree representing
// the new addresses is balanced
func (s *sorter) insert(i uint8) (stop bool) {
level := s.level
n := 0
nextFound := true

loop:
for ; n < counts[level]; n++ {
so, err := s.newSocOpt(i)
switch {
case err != nil || s.exists[so.nh]:
continue
case s.queue[n] == nil:
// an empty slot is found; place the index there
break loop
case level == 3:
continue
}
nextFound = false

if po := poi(i, *(s.queue[n])); po >= level {
level = po + 1
n = counts[level]
}
}

s.queue[n] = &i
return nextFound
}

// poi calculates the proximity order of the first 4 bits of a and b;
// it returns 5 if the top nibbles are identical
func poi(a, b uint8) uint8 {
for i, c := range []byte{128, 64, 32, 16} {
if (a^b)&c > 0 {
return uint8(i)
}
}
return 5
}

// addresses generates up to target replica addresses in dispersion order and
// sends them on c
func (s *sorter) addresses(ctx context.Context, target int, c chan swarm.Address) {
n := 0
for i := 0; n < target && i < 256; i++ {
so, err := s.newSocOpt(uint8(i))
if err != nil || s.exists[so.nh] {
continue
}
s.exists[so.nh] = true

n++
select {
case c <- so.Address():
case <-ctx.Done():
return
}
}
}

// ids generates up to target replica SOC IDs in dispersion order and sends
// them on c
func (s *sorter) ids(ctx context.Context, target int, c chan []byte) {
n := 0
for i := 0; n < target && i < 256; i++ {
so, err := s.newSocOpt(uint8(i))
if err != nil || s.exists[so.nh] {
continue
}
s.exists[so.nh] = true

n++
select {
case c <- so.id:
case <-ctx.Done():
return
}
}
}
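
To make the dispersion arithmetic concrete, a small self-contained sketch (independent of this commit) of a proximity-order calculation over the top four bits of two bytes, which is the quantity poi computes and the 16-slot exists table keys on; note that poi above returns 5, rather than 4, when the top nibbles are identical:

package main

import "fmt"

// po4 returns the index of the first differing bit among the top four bits of
// a and b, or 4 if their top nibbles are identical.
func po4(a, b uint8) uint8 {
	for i, mask := range []uint8{128, 64, 32, 16} {
		if (a^b)&mask != 0 {
			return uint8(i)
		}
	}
	return 4
}

func main() {
	fmt.Println(po4(0b10100000, 0b10000000)) // 2: first difference at bit index 2
	fmt.Println(po4(0b01110001, 0b01111111)) // 4: identical top nibbles
}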
