Skip to content

Commit

Permalink
fix(gsoc): improvements (#4899)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent 2c00152 commit 4cf5ab6
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 181 deletions.
3 changes: 1 addition & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/settlement/swap"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook"
"github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/steward"
storage "github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -687,7 +686,7 @@ type putterSessionWrapper struct {
}

func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error {
idAddress, err := soc.IdentityAddress(chunk)
idAddress, err := storage.IdentityAddress(chunk)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address []byte `map:"address" validate:"required"`
Address swarm.Address `map:"address,resolve" validate:"required"`
}{}

if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
Expand All @@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
defer s.wsWg.Done()

var (
Expand All @@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
select {
case dataC <- m:
case <-gone:
Expand Down
34 changes: 16 additions & 18 deletions pkg/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)

type Listener interface {
Subscribe(address [32]byte, handler Handler) (cleanup func())
Subscribe(address swarm.Address, handler Handler) (cleanup func())
Handle(c *soc.SOC)
Close() error
}

type listener struct {
handlers map[[32]byte][]*Handler
handlers map[string][]*Handler
handlersMu sync.Mutex
quit chan struct{}
logger log.Logger
Expand All @@ -29,26 +33,26 @@ type listener struct {
func New(logger log.Logger) Listener {
return &listener{
logger: logger,
handlers: make(map[[32]byte][]*Handler),
handlers: make(map[string][]*Handler),
quit: make(chan struct{}),
}
}

// Subscribe allows the definition of a Handler func on a specific GSOC address.
func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) {
func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers[address] = append(l.handlers[address], &handler)
l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler)

return func() {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

h := l.handlers[address]
h := l.handlers[address.ByteString()]
for i := 0; i < len(h); i++ {
if h[i] == &handler {
l.handlers[address] = append(h[:i], h[i+1:]...)
l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...)
return
}
}
Expand All @@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) {
if err != nil {
return // no handler
}
h := l.getHandlers([32]byte(addr.Bytes()))
h := l.getHandlers(addr)
if h == nil {
return // no handler
}
l.logger.Info("new incoming GSOC message",
"GSOC Address", addr,
"wrapped chunk address", c.WrappedChunk().Address())
l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address())

for _, hh := range h {
go func(hh Handler) {
Expand All @@ -76,23 +78,19 @@ func (l *listener) Handle(c *soc.SOC) {
}
}

func (p *listener) getHandlers(address [32]byte) []*Handler {
func (p *listener) getHandlers(address swarm.Address) []*Handler {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()

return p.handlers[address]
return p.handlers[address.ByteString()]
}

func (l *listener) Close() error {
close(l.quit)
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown
l.handlers = make(map[string][]*Handler) //unset handlers on shutdown

return nil
}

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)
8 changes: 4 additions & 4 deletions pkg/gsoc/gsoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRegister(t *testing.T) {
t.Parallel()

var (
g = gsoc.New(log.NewLogger("test"))
g = gsoc.New(log.Noop)
h1Calls = 0
h2Calls = 0
h3Calls = 0
Expand Down Expand Up @@ -52,8 +52,8 @@ func TestRegister(t *testing.T) {
msgChan <- struct{}{}
}
)
_ = g.Subscribe([32]byte(address1.Bytes()), h1)
_ = g.Subscribe([32]byte(address2.Bytes()), h2)
_ = g.Subscribe(address1, h1)
_ = g.Subscribe(address2, h2)

ch1, _ := cac.New(payload1)
socCh1 := soc.New(socId1, ch1)
Expand All @@ -74,7 +74,7 @@ func TestRegister(t *testing.T) {
ensureCalls(t, &h2Calls, 0)

// register another handler on the first address
cleanup := g.Subscribe([32]byte(address1.Bytes()), h3)
cleanup := g.Subscribe(address1, h3)

g.Handle(socCh1)

Expand Down
4 changes: 2 additions & 2 deletions pkg/postage/stamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing"
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestValidStamp(t *testing.T) {
// stamp on execution
ch := chunktesting.GenerateTestRandomChunk()

idAddress, err := soc.IdentityAddress(ch)
idAddress, err := storage.IdentityAddress(ch)
if err != nil {
t.Fatal(err)
}
Expand Down
23 changes: 10 additions & 13 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/pushsync"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
Expand All @@ -36,6 +35,8 @@ type Op struct {
Err chan error
Direct bool
Span opentracing.Span

identityAddress swarm.Address
}

type OpChan <-chan *Op
Expand Down Expand Up @@ -215,10 +216,12 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
for {
select {
case op := <-cc:
idAddress, err := soc.IdentityAddress(op.Chunk)
idAddress, err := storage.IdentityAddress(op.Chunk)
if err != nil {
op.Err <- err
continue
}
op.identityAddress = idAddress
if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) {
if op.Direct {
select {
Expand All @@ -245,12 +248,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {

func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return true, err
}

defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

if _, err := s.validStamp(op.Chunk); err != nil {
loggerV1.Warning(
Expand Down Expand Up @@ -278,7 +277,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
return true, err
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if retry := s.shallowReceipt(idAddress); retry {
if retry := s.shallowReceipt(op.identityAddress); retry {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
Expand All @@ -300,13 +299,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (

func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return err
}

var err error

defer func() {
s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
select {
case op.Err <- err:
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestPusherRetryShallow(t *testing.T) {
// generate a chunk at PO 1 with closestPeer, meaning that we get a
// receipt which is shallower than the pivot peer's depth, resulting
// in retries
chunk := testingc.GenerateTestRandomChunkAt(t, closestPeer, 1)
chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, 1)

storer.chunks <- chunk

Expand Down
2 changes: 1 addition & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
sentErrorsLeft = maxPushErrors
}

idAddress, err := soc.IdentityAddress(ch)
idAddress, err := storage.IdentityAddress(ch)
if err != nil {
return nil, err
}
Expand Down
34 changes: 0 additions & 34 deletions pkg/soc/utils.go

This file was deleted.

33 changes: 33 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

var (
ErrOverwriteNewerChunk = errors.New("overwriting chunk with newer timestamp")
ErrUnknownChunkType = errors.New("unknown chunk type")
)

// Result represents the item returned by the read operation, which returns
Expand Down Expand Up @@ -293,3 +294,35 @@ func ChunkType(ch swarm.Chunk) swarm.ChunkType {
}
return swarm.ChunkTypeUnspecified
}

// IdentityAddress returns the internally used address for the chunk
// since the single owner chunk address is not a unique identifier for the chunk,
// but hashing the soc address and the wrapped chunk address is.
// it is used in the reserve sampling and other places where a key is needed to represent a chunk.
func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) {

if cac.Valid(chunk) {
return chunk.Address(), nil
}

// check the chunk is single owner chunk or cac
if sch, err := soc.FromChunk(chunk); err == nil {
socAddress, err := sch.Address()
if err != nil {
return swarm.ZeroAddress, err
}
h := swarm.NewHasher()
_, err = h.Write(socAddress.Bytes())
if err != nil {
return swarm.ZeroAddress, err
}
_, err = h.Write(sch.WrappedChunk().Address().Bytes())
if err != nil {
return swarm.ZeroAddress, err
}

return swarm.NewAddress(h.Sum(nil)), nil
}

return swarm.ZeroAddress, fmt.Errorf("identity address failed on chunk %s: %w", chunk, ErrUnknownChunkType)
}
8 changes: 4 additions & 4 deletions pkg/soc/utils_test.go → pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package soc_test
package storage_test

import (
"encoding/hex"
Expand All @@ -10,10 +10,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// TestIdentityAddress tests the IdentityAddress function.
func TestIdentityAddress(t *testing.T) {
t.Run("single owner chunk", func(t *testing.T) {
// Create a single owner chunk (SOC)
Expand Down Expand Up @@ -43,7 +43,7 @@ func TestIdentityAddress(t *testing.T) {
t.Fatal(err)
}

idAddr, err := soc.IdentityAddress(schChunk)
idAddr, err := storage.IdentityAddress(schChunk)
if err != nil {
t.Fatalf("IdentityAddress returned error: %v", err)
}
Expand All @@ -66,7 +66,7 @@ func TestIdentityAddress(t *testing.T) {
}

// Call IdentityAddress with the CAC
addr, err := soc.IdentityAddress(cacChunk)
addr, err := storage.IdentityAddress(cacChunk)
if err != nil {
t.Fatalf("IdentityAddress returned error: %v", err)
}
Expand Down
Loading

0 comments on commit 4cf5ab6

Please sign in to comment.