Skip to content

Commit

Permalink
refactor: gsoc handler param soc as reference
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon committed Nov 5, 2024
1 parent 0ce6407 commit d54dff5
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/api/gsoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGsocWebsocketSingleHandler(t *testing.T) {
socCh := soc.New(id, ch)
ch, _ = socCh.Sign(signer)
socCh, _ = soc.FromChunk(ch)
g.Handle(*socCh)
g.Handle(socCh)

go expectMessage(t, cl, respC, payload)
if err := <-respC; err != nil {
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGsocWebsocketMultiHandler(t *testing.T) {
t.Fatal(err)
}

g.Handle(*socCh)
g.Handle(socCh)

go expectMessage(t, cl, respC, payload)
go expectMessage(t, cl2, respC, payload)
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestGsocPong(t *testing.T) {
ch, _ = socCh.Sign(signer)
socCh, _ = soc.FromChunk(ch)

g.Handle(*socCh)
g.Handle(socCh)

go expectMessage(t, cl, respC, nil)
if err := <-respC; err == nil || !strings.Contains(err.Error(), "i/o timeout") {
Expand Down
4 changes: 2 additions & 2 deletions pkg/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type Listener interface {
Subscribe(address [32]byte, handler handler) (cleanup func())
Handle(c soc.SOC)
Handle(c *soc.SOC)
Close() error
}

Expand Down Expand Up @@ -56,7 +56,7 @@ func (l *listener) Subscribe(address [32]byte, handler handler) (cleanup func())
}

// Handle is called by push/pull sync and passes the chunk its registered handler
func (l *listener) Handle(c soc.SOC) {
func (l *listener) Handle(c *soc.SOC) {
addr, err := c.Address()
if err != nil {
return // no handler
Expand Down
8 changes: 4 additions & 4 deletions pkg/gsoc/gsoc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRegister(t *testing.T) {
socCh2, _ = soc.FromChunk(ch2)

// trigger soc upload on address1, check that only h1 is called
g.Handle(*socCh1)
g.Handle(socCh1)

waitHandlerCallback(t, &msgChan, 1)

Expand All @@ -76,7 +76,7 @@ func TestRegister(t *testing.T) {
// register another handler on the first address
cleanup := g.Subscribe([32]byte(address1.Bytes()), h3)

g.Handle(*socCh1)
g.Handle(socCh1)

waitHandlerCallback(t, &msgChan, 2)

Expand All @@ -86,15 +86,15 @@ func TestRegister(t *testing.T) {

cleanup() // remove the last handler

g.Handle(*socCh1)
g.Handle(socCh1)

waitHandlerCallback(t, &msgChan, 1)

ensureCalls(t, &h1Calls, 3)
ensureCalls(t, &h2Calls, 0)
ensureCalls(t, &h3Calls, 1)

g.Handle(*socCh2)
g.Handle(socCh2)

waitHandlerCallback(t, &msgChan, 1)

Expand Down
6 changes: 3 additions & 3 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Syncer struct {
store storer.Reserve
quit chan struct{}
unwrap func(swarm.Chunk)
gsocHandler func(soc.SOC)
gsocHandler func(*soc.SOC)
validStamp postage.ValidStampFn
intervalsSF singleflight.Group[string, *collectAddrsResult]
syncInProgress atomic.Int32
Expand All @@ -88,7 +88,7 @@ func New(
streamer p2p.Streamer,
store storer.Reserve,
unwrap func(swarm.Chunk),
gsocHandler func(soc.SOC),
gsocHandler func(*soc.SOC),
validStamp postage.ValidStampFn,
logger log.Logger,
maxPage uint64,
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
if cac.Valid(chunk) {
go s.unwrap(chunk)
} else if chunk, err := soc.FromChunk(chunk); err == nil {
s.gsocHandler(*chunk)
s.gsocHandler(chunk)
} else {
s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk)
chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pullsync/pullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func newPullSyncWithStamperValidator(
storage := mock.NewReserve(o...)
logger := log.Noop
unwrap := func(swarm.Chunk) {}
socHandler := func(soc.SOC) {}
socHandler := func(*soc.SOC) {}
ps := pullsync.New(
s,
storage,
Expand Down
6 changes: 3 additions & 3 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type PushSync struct {
store Storer
topologyDriver topology.Driver
unwrap func(swarm.Chunk)
gsocHandler func(soc.SOC)
gsocHandler func(*soc.SOC)
logger log.Logger
accounting accounting.Interface
pricer pricer.Interface
Expand Down Expand Up @@ -115,7 +115,7 @@ func New(
topology topology.Driver,
fullNode bool,
unwrap func(swarm.Chunk),
gsocHandler func(soc.SOC),
gsocHandler func(*soc.SOC),
validStamp postage.ValidStampFn,
logger log.Logger,
accounting accounting.Interface,
Expand Down Expand Up @@ -229,7 +229,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
if cac.Valid(chunk) {
go ps.unwrap(chunk)
} else if chunk, err := soc.FromChunk(chunk); err == nil {
ps.gsocHandler(*chunk)
ps.gsocHandler(chunk)
} else {
return swarm.ErrInvalidChunk
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestSocListener(t *testing.T) {
}
sch2 = sch2.WithStamp(chunk2.Stamp())
expectedPayload := chunk1.Data()
gsocListener := func(soc soc.SOC) {
gsocListener := func(soc *soc.SOC) {
if !bytes.Equal(soc.WrappedChunk().Data(), expectedPayload) {
t.Fatalf("unexpected SOC payload on GSOC listener. got %s, want %s", soc.WrappedChunk().Data(), expectedPayload)
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) {
}),
)

psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), pivotAccounting, log.Noop, func(soc.SOC) {}, mock.WithPeers(peer1, peer2, peer3, peer4))
psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), pivotAccounting, log.Noop, func(*soc.SOC) {}, mock.WithPeers(peer1, peer2, peer3, peer4))

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
Expand Down Expand Up @@ -645,15 +645,15 @@ func TestPropagateErrMsg(t *testing.T) {
captureLogger := log.NewLogger("test", log.WithSink(buf))

// Create the closest peer
psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, func(soc.SOC) {}, mock.WithClosestPeerErr(topology.ErrWantSelf))
psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, func(*soc.SOC) {}, mock.WithClosestPeerErr(topology.ErrWantSelf))

// creating the pivot peer
psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner(chunk), accountingmock.NewAccounting(), log.Noop, func(soc.SOC) {}, mock.WithPeers(closestPeer))
psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner(chunk), accountingmock.NewAccounting(), log.Noop, func(*soc.SOC) {}, mock.WithPeers(closestPeer))

combinedRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol(), psClosestPeer.Protocol()), streamtest.WithBaseAddr(triggerPeer))

// Creating the trigger peer
psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner(chunk), accountingmock.NewAccounting(), captureLogger, func(soc.SOC) {}, mock.WithPeers(pivotPeer))
psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner(chunk), accountingmock.NewAccounting(), captureLogger, func(*soc.SOC) {}, mock.WithPeers(pivotPeer))

_, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
if err == nil {
Expand Down Expand Up @@ -829,7 +829,7 @@ func createPushSyncNode(
) (*pushsync.PushSync, *testStorer, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, func(soc.SOC) {}, mockOpts...)
ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, func(*soc.SOC) {}, mockOpts...)
return ps, mstorer, mockAccounting
}

Expand All @@ -838,7 +838,7 @@ func createGsocPushSyncNode(
addr swarm.Address,
prices pricerParameters,
recorder *streamtest.Recorder,
gsocListener func(soc.SOC),
gsocListener func(*soc.SOC),
signer crypto.Signer,
mockOpts ...mock.Option,
) (*pushsync.PushSync, *testStorer, accounting.Interface) {
Expand Down Expand Up @@ -878,7 +878,7 @@ func createPushSyncNodeWithRadius(

radiusFunc := func() (uint8, error) { return radius, nil }

ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1)
ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1)
t.Cleanup(func() { ps.Close() })

return ps, storer
Expand All @@ -893,7 +893,7 @@ func createPushSyncNodeWithAccounting(
signer crypto.Signer,
acct accounting.Interface,
logger log.Logger,
gsocListener func(soc.SOC),
gsocListener func(*soc.SOC),
mockOpts ...mock.Option,
) (*pushsync.PushSync, *testStorer) {
t.Helper()
Expand All @@ -910,7 +910,7 @@ func createPushSyncNodeWithAccounting(
unwrap = func(swarm.Chunk) {}
}
if gsocListener == nil {
gsocListener = func(soc.SOC) {}
gsocListener = func(*soc.SOC) {}
}

validStamp := func(ch swarm.Chunk) (swarm.Chunk, error) {
Expand Down

0 comments on commit d54dff5

Please sign in to comment.