Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optional custom validation #128

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type Exchange[H header.Header[H]] struct {
peerTracker *peerTracker
metrics *exchangeMetrics

Params ClientParameters
Params ClientParameters
CustomValidate func(H) error
}

func NewExchange[H header.Header[H]](
Expand Down Expand Up @@ -253,6 +254,7 @@ func (ex *Exchange[H]) GetRangeByHeight(
) ([]H, error) {
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
func(s *session[H]) { s.customValidate = ex.CustomValidate },
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
Expand Down Expand Up @@ -335,7 +337,7 @@ func (ex *Exchange[H]) request(
return nil, err
}

hdrs, err := processResponses[H](responses)
hdrs, err := processResponses[H](responses, ex.CustomValidate)
if err != nil {
return nil, err
}
Expand Down
18 changes: 13 additions & 5 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type session[H header.Header[H]] struct {
from H
requestTimeout time.Duration

ctx context.Context
cancel context.CancelFunc
reqCh chan *p2p_pb.HeaderRequest
ctx context.Context
cancel context.CancelFunc
reqCh chan *p2p_pb.HeaderRequest
customValidate func(H) error
}

func newSession[H header.Header[H]](
Expand Down Expand Up @@ -222,7 +223,7 @@ func (s *session[H]) doRequest(

// processResponses converts HeaderResponse to Header.
func (s *session[H]) processResponses(responses []*p2p_pb.HeaderResponse) ([]H, error) {
hdrs, err := processResponses[H](responses)
hdrs, err := processResponses[H](responses, s.customValidate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,7 +289,7 @@ func prepareRequests(from, amount, headersPerPeer uint64) []*p2p_pb.HeaderReques
}

// processResponses converts HeaderResponses to Headers
func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H, error) {
func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse, customValidate func(H) error) ([]H, error) {
if len(resps) == 0 {
return nil, errEmptyResponse
}
Expand All @@ -311,6 +312,13 @@ func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H,
return nil, err
}

if customValidate != nil {
err = customValidate(hdr)
if err != nil {
return nil, err
}
}

hdrs = append(hdrs, hdr)
}

Expand Down
20 changes: 16 additions & 4 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type SubscriberParams struct {
type Subscriber[H header.Header[H]] struct {
pubsubTopicID string

metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
CustomValidate func(H) error
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
Expand Down Expand Up @@ -118,6 +119,17 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
return pubsub.ValidationReject
}

if s.CustomValidate != nil {
err = s.CustomValidate(hdr)
if err != nil {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
}

var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
Expand Down