Skip to content

Commit

Permalink
- remove sync_getter
Browse files Browse the repository at this point in the history
- test case with untrusted peer
  • Loading branch information
ilia.bulavintsev committed Nov 8, 2024
1 parent cb51900 commit 2a4a324
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 167 deletions.
2 changes: 1 addition & 1 deletion p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
return head.(H), err
}
span.SetStatus(codes.Ok, "")
return head.(H), err
return head.(H), nil
}

func (ex *Exchange[H]) head(ctx context.Context, span trace.Span, opts ...header.HeadOption[H]) (H, error) {
Expand Down
43 changes: 19 additions & 24 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,39 +164,32 @@ func TestExchange_RequestHeadFlightProtection(t *testing.T) {
hosts := createMocknet(t, 3)
exchg, trustedStore := createP2PExAndServer(t, hosts[0], hosts[1])

// create new server-side exchange that will act as the tracked peer
// it will have a higher chain head than the trusted peer so that the
// test can determine which peer was asked
trackedStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 50)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](hosts[2], trackedStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
err = serverSideEx.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = serverSideEx.Stop(ctx)
require.NoError(t, err)
})
// create the same requests
tests := make([]struct {
tests := []struct {
requestFromTrusted bool
lastHeader *headertest.DummyHeader
expectedHeight uint64
expectedHash header.Hash
}, 10)
for i := 0; i < 10; i++ {
tests[i] = struct {
requestFromTrusted bool
lastHeader *headertest.DummyHeader
expectedHeight uint64
expectedHash header.Hash
}{
}{
{
requestFromTrusted: true,
lastHeader: trustedStore.Headers[trustedStore.HeadHeight-1],
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
}
},
{
requestFromTrusted: true,
lastHeader: trustedStore.Headers[trustedStore.HeadHeight-1],
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
},
{
// request from untrusted peer should be the same as trusted bc of single-preflight
requestFromTrusted: false,
lastHeader: trustedStore.Headers[trustedStore.HeadHeight-1],
expectedHeight: trustedStore.HeadHeight,
expectedHash: trustedStore.Headers[trustedStore.HeadHeight].Hash(),
},
}

var wg sync2.WaitGroup
Expand All @@ -222,6 +215,8 @@ func TestExchange_RequestHeadFlightProtection(t *testing.T) {
assert.Equal(t, testStruct.expectedHash, h.Hash())

}(tt, i)
// ensure first Head will be locked by request from trusted peer
time.Sleep(time.Microsecond)
}
wg.Wait()
}
Expand Down
4 changes: 2 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var log = logging.Logger("header/sync")
type Syncer[H header.Header[H]] struct {
sub header.Subscriber[H] // to subscribe for new Network Heads
store syncStore[H] // to store all the headers to
getter syncGetter[H] // to fetch headers from
getter header.Getter[H] // to fetch headers from
metrics *metrics

// stateLk protects state which represents the current or latest sync
Expand Down Expand Up @@ -80,7 +80,7 @@ func NewSyncer[H header.Header[H]](
return &Syncer[H]{
sub: sub,
store: syncStore[H]{Store: store},
getter: syncGetter[H]{Getter: getter},
getter: getter,
metrics: metrics,
triggerSync: make(chan struct{}, 1), // should be buffered
Params: &params,
Expand Down
52 changes: 0 additions & 52 deletions sync/sync_getter.go

This file was deleted.

72 changes: 0 additions & 72 deletions sync/sync_getter_test.go

This file was deleted.

16 changes: 0 additions & 16 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
return sbjHead, nil
}

// single-flight protection ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.Head(ctx)
}
defer s.getter.Unlock()

s.metrics.outdatedHead(s.ctx)

reqCtx, cancel := context.WithTimeout(ctx, headRequestTimeout)
Expand Down Expand Up @@ -80,14 +72,6 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
}
// otherwise, request head from a trusted peer
log.Infow("stored head header expired", "height", storeHead.Height())
// single-flight protection
// ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.subjectiveHead(ctx)
}
defer s.getter.Unlock()

trustHead, err := s.getter.Head(ctx)
if err != nil {
Expand Down

0 comments on commit 2a4a324

Please sign in to comment.