Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gsoc): improvements #4899

Merged
merged 5 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

paths := struct {
Address []byte `map:"address" validate:"required"`
Address swarm.Address `map:"address,resolve" validate:"required"`
}{}

if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
Expand All @@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
go s.gsocListeningWs(conn, paths.Address)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
defer s.wsWg.Done()

var (
Expand All @@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
select {
case dataC <- m:
case <-gone:
Expand Down
34 changes: 16 additions & 18 deletions pkg/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)

type Listener interface {
Subscribe(address [32]byte, handler Handler) (cleanup func())
Subscribe(address swarm.Address, handler Handler) (cleanup func())
Handle(c *soc.SOC)
Close() error
}

type listener struct {
handlers map[[32]byte][]*Handler
handlers map[string][]*Handler
handlersMu sync.Mutex
quit chan struct{}
logger log.Logger
Expand All @@ -29,26 +33,26 @@ type listener struct {
func New(logger log.Logger) Listener {
return &listener{
logger: logger,
handlers: make(map[[32]byte][]*Handler),
handlers: make(map[string][]*Handler),
quit: make(chan struct{}),
}
}

// Subscribe allows the definition of a Handler func on a specific GSOC address.
func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) {
func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers[address] = append(l.handlers[address], &handler)
l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler)

return func() {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

h := l.handlers[address]
h := l.handlers[address.ByteString()]
for i := 0; i < len(h); i++ {
if h[i] == &handler {
l.handlers[address] = append(h[:i], h[i+1:]...)
l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...)
return
}
}
Expand All @@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) {
if err != nil {
return // no handler
}
h := l.getHandlers([32]byte(addr.Bytes()))
h := l.getHandlers(addr)
if h == nil {
return // no handler
}
l.logger.Info("new incoming GSOC message",
"GSOC Address", addr,
"wrapped chunk address", c.WrappedChunk().Address())
l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address())

for _, hh := range h {
go func(hh Handler) {
Expand All @@ -76,23 +78,19 @@ func (l *listener) Handle(c *soc.SOC) {
}
}

func (p *listener) getHandlers(address [32]byte) []*Handler {
func (p *listener) getHandlers(address swarm.Address) []*Handler {
p.handlersMu.Lock()
defer p.handlersMu.Unlock()

return p.handlers[address]
return p.handlers[address.ByteString()]
}

func (l *listener) Close() error {
close(l.quit)
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown
l.handlers = make(map[string][]*Handler) //unset handlers on shutdown

return nil
}

// Handler defines code to be executed upon reception of a GSOC sub message.
// it is used as a parameter definition.
type Handler func([]byte)
8 changes: 4 additions & 4 deletions pkg/gsoc/gsoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRegister(t *testing.T) {
t.Parallel()

var (
g = gsoc.New(log.NewLogger("test"))
g = gsoc.New(log.Noop)
h1Calls = 0
h2Calls = 0
h3Calls = 0
Expand Down Expand Up @@ -52,8 +52,8 @@ func TestRegister(t *testing.T) {
msgChan <- struct{}{}
}
)
_ = g.Subscribe([32]byte(address1.Bytes()), h1)
_ = g.Subscribe([32]byte(address2.Bytes()), h2)
_ = g.Subscribe(address1, h1)
_ = g.Subscribe(address2, h2)

ch1, _ := cac.New(payload1)
socCh1 := soc.New(socId1, ch1)
Expand All @@ -74,7 +74,7 @@ func TestRegister(t *testing.T) {
ensureCalls(t, &h2Calls, 0)

// register another handler on the first address
cleanup := g.Subscribe([32]byte(address1.Bytes()), h3)
cleanup := g.Subscribe(address1, h3)

g.Handle(socCh1)

Expand Down
20 changes: 9 additions & 11 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Op struct {
Err chan error
Direct bool
Span opentracing.Span

identityAddress swarm.Address
}

type OpChan <-chan *Op
Expand Down Expand Up @@ -218,7 +220,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
op.Err <- err
continue
}
op.identityAddress = idAddress
if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) {
if op.Direct {
select {
Expand All @@ -245,12 +249,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {

func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return true, err
}

defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

if _, err := s.validStamp(op.Chunk); err != nil {
loggerV1.Warning(
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
return true, err
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if retry := s.shallowReceipt(idAddress); retry {
if retry := s.shallowReceipt(op.identityAddress); retry {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
Expand All @@ -300,13 +300,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (

func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return err
}

var err error

defer func() {
s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
select {
case op.Err <- err:
default:
Expand Down
27 changes: 3 additions & 24 deletions pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func Has(_ context.Context, r storage.Reader, addr swarm.Address) (bool, error)

func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
var (
rIdx = &RetrievalIndexItem{Address: ch.Address()}
loc sharky.Location
inserted bool
rIdx = &RetrievalIndexItem{Address: ch.Address()}
loc sharky.Location
)
err := s.Get(rIdx)
switch {
Expand All @@ -86,31 +85,11 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
inserted = true
case err != nil:
return fmt.Errorf("chunk store: failed to read: %w", err)
}

// SOC will be replaced in the chunk store if it is already stored with the newer payload.
// Pull sync should sync the new SOC payload with the new stamp.
// TODO: remove this condition when postage stamping is refactored for GSOC.
chunkType := storage.ChunkType(ch)
if !inserted && chunkType == swarm.ChunkTypeSingleOwner {
// replace old payload
err = sh.Release(ctx, rIdx.Location)
if err != nil {
return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
}

loc, err := sh.Write(ctx, ch.Data())
if err != nil {
return fmt.Errorf("chunk store: write to sharky failed: %w", err)
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
} else {
rIdx.RefCnt++
}
rIdx.RefCnt++

return s.Put(rIdx)
}
Expand Down
61 changes: 0 additions & 61 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package chunkstore_test

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -14,9 +13,7 @@ import (
"os"
"testing"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"

"github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -339,64 +336,6 @@ func TestChunkStore(t *testing.T) {
}
})

// TODO: remove this when postage stamping is refactored for GSOC.
t.Run("put two SOCs with different payloads", func(t *testing.T) {
key, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(key)

// chunk data to upload
chunk1 := chunktest.FixtureChunk("7000")
chunk2 := chunktest.FixtureChunk("0033")
id := make([]byte, swarm.HashSize)
s1 := soc.New(id, chunk1)
s2 := soc.New(id, chunk2)
sch1, err := s1.Sign(signer)
if err != nil {
t.Fatal(err)
}
sch1 = sch1.WithStamp(chunk1.Stamp())
sch2, err := s2.Sign(signer)
if err != nil {
t.Fatal(err)
}
sch2 = sch2.WithStamp(chunk2.Stamp())

// Put the first SOC into the chunk store
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), sch1)
})
if err != nil {
t.Fatalf("failed putting first single owner chunk: %v", err)
}

// Put the second SOC into the chunk store
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), sch2)
})
if err != nil {
t.Fatalf("failed putting second single owner chunk: %v", err)
}

// Retrieve the chunk from the chunk store
var retrievedChunk swarm.Chunk
err = st.Run(context.Background(), func(s transaction.Store) error {
retrievedChunk, err = s.ChunkStore().Get(context.TODO(), sch1.Address())
return err
})
if err != nil {
t.Fatalf("failed retrieving chunk: %v", err)
}
schRetrieved, err := soc.FromChunk(retrievedChunk)
if err != nil {
t.Fatalf("failed converting chunk to SOC: %v", err)
}

// Verify that the retrieved chunk contains the latest payload
if !bytes.Equal(chunk2.Data(), schRetrieved.WrappedChunk().Data()) {
t.Fatalf("expected payload %s, got %s", chunk2.Data(), schRetrieved.WrappedChunk().Data())
}
})

t.Run("close store", func(t *testing.T) {
err := st.Close()
if err != nil {
Expand Down
Loading
Loading