Skip to content

Commit

Permalink
fly: updated p2p deps
Browse files Browse the repository at this point in the history
  • Loading branch information
kev1n-peters committed Nov 16, 2023
1 parent b791923 commit baebb75
Show file tree
Hide file tree
Showing 7 changed files with 874 additions and 562 deletions.
41 changes: 39 additions & 2 deletions fly/cmd/fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -337,6 +337,8 @@ func main() {
"height": strconv.FormatInt(network.Height, 10),
"contractAddress": network.ContractAddress,
"errorCount": strconv.FormatUint(network.ErrorCount, 10),
"safeHeight": strconv.FormatInt(network.SafeHeight, 10),
"finalizedHeight": strconv.FormatInt(network.FinalizedHeight, 10),
})
}

Expand Down Expand Up @@ -497,8 +499,43 @@ func main() {
}

// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort
// Reduce number of connected peers to reduce network egress
components.GossipParams.D = 1 // default: 6
components.GossipParams.Dlo = 1 // default: 5
components.GossipParams.Dhi = 2 // default: 12
components.GossipParams.Dout = 1 // default: 2
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, obsvReqC, nil, sendC, signedInC, priv, nil, gst, p2pPort, p2pNetworkID, p2pBootstrap, "", false, rootCtxCancel, nil, govConfigC, govStatusC)); err != nil {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
return err
}

Expand Down
38 changes: 34 additions & 4 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 1024)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 1024)
Expand Down Expand Up @@ -311,9 +311,9 @@ func main() {
case <-rootCtx.Done():
return
case o := <-obsvC:
spl := strings.Split(o.MessageId, "/")
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Addr))
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
Expand Down Expand Up @@ -510,8 +510,38 @@ func main() {
}

// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, obsvReqC, nil, sendC, signedInC, priv, nil, gst, p2pPort, p2pNetworkID, p2pBootstrap, "", false, rootCtxCancel, nil, govConfigC, govStatusC)); err != nil {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
return err
}

Expand Down
48 changes: 39 additions & 9 deletions fly/cmd/observation_stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -110,16 +110,16 @@ func main() {
case <-rootCtx.Done():
return
case o := <-obsvC:
if o.MessageId[:3] != "26/" && o.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(o.Addr).String()
if o.Msg.MessageId[:3] != "26/" && o.Msg.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
// logger.Warn("observation", zap.String("id",o.MessageId), zap.String("addr",ga))
if _, ok := obsvByHash[o.MessageId]; !ok {
obsvByHash[o.MessageId] = map[string]time.Time{}
if _, ok := obsvByHash[o.Msg.MessageId]; !ok {
obsvByHash[o.Msg.MessageId] = map[string]time.Time{}
}
if _, ok := obsvByHash[o.MessageId][ga]; !ok {
obsvByHash[o.MessageId][ga] = time.Now()
if _, ok := obsvByHash[o.Msg.MessageId][ga]; !ok {
obsvByHash[o.Msg.MessageId][ga] = time.Now()
}
logger.Warn("status", zap.String("id", o.MessageId), zap.Any("msg", obsvByHash[o.MessageId]))
logger.Warn("status", zap.String("id", o.Msg.MessageId), zap.Any("msg", obsvByHash[o.Msg.MessageId]))
}
}
}
Expand Down Expand Up @@ -189,8 +189,38 @@ func main() {
}

// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, obsvReqC, nil, sendC, signedInC, priv, nil, gst, p2pPort, p2pNetworkID, p2pBootstrap, "", false, rootCtxCancel, nil, govConfigC, govStatusC)); err != nil {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
return err
}

Expand Down
34 changes: 32 additions & 2 deletions fly/cmd/track_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -234,8 +234,38 @@ func main() {
}

// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, obsvReqC, nil, sendC, signedInC, priv, nil, gst, p2pPort, p2pNetworkID, p2pBootstrap, "", false, rootCtxCancel, nil, govConfigC, govStatusC)); err != nil {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
return err
}

Expand Down
45 changes: 34 additions & 11 deletions fly/cmd/track_pyth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -171,7 +171,7 @@ func main() {
case <-rootCtx.Done():
return
case m := <-obsvC:
handleObservation(logger, gs, m)
handleObservation(logger, gs, m.Msg)
}
}
}()
Expand Down Expand Up @@ -259,8 +259,38 @@ func main() {
}

// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, obsvReqC, nil, sendC, signedInC, priv, nil, gst, p2pPort, p2pNetworkID, p2pBootstrap, "", false, rootCtxCancel, nil, govConfigC, govStatusC)); err != nil {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
return err
}

Expand Down Expand Up @@ -289,16 +319,9 @@ func handleSignedVAAWithQuorum(logger *zap.Logger, gs common.GuardianSet, m *gos
return
}

// Calculate digest for logging
digest := v.SigningMsg()
hash := hex.EncodeToString(digest.Bytes())

// Check if guardianSet doesn't have any keys
if len(gs.Keys) == 0 {
logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys",
zap.String("digest", hash),
zap.Any("message", m),
)
logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys")
return
}

Expand Down
Loading

0 comments on commit baebb75

Please sign in to comment.