Skip to content

Commit

Permalink
Fly: Remove references to SignedObservation
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jan 27, 2025
1 parent 50325da commit 9dc8b09
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.21
go-version: 1.23
- name: Build
run: cd fly && go build -v ./...
14 changes: 7 additions & 7 deletions fly/cmd/healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
defer localCancel()
hbReceived := false
var addrInfo peer.AddrInfo
observationsReceived := 0
observationBatchesReceived := 0
components := p2p.DefaultComponents()
components.Port = p2pPort
host, err := p2p.NewHost(logger, localContext, p2pNetworkID, p2pBootstrap, components, priv)
Expand Down Expand Up @@ -116,10 +116,10 @@ func main() {
hbReceived = true
}
}
case *gossipv1.GossipMessage_SignedObservation:
logger.Debug("received observation")
if bytes.Equal(m.SignedObservation.Addr, guardianPubKey) {
observationsReceived++
case *gossipv1.GossipMessage_SignedObservationBatch:
logger.Debug("received observation batch")
if bytes.Equal(m.SignedObservationBatch.Addr, guardianPubKey) {
observationBatchesReceived++
}
}
}
Expand All @@ -144,8 +144,8 @@ func main() {
} else {
fmt.Println("❌ NO HEARTBEAT RECEIVED")
}
if observationsReceived > 0 {
fmt.Printf("✅ %d observations received\n", observationsReceived)
if observationBatchesReceived > 0 {
fmt.Printf("✅ %d observationBatches received\n", observationBatchesReceived)
} else {
fmt.Println("❌ NO OBSERVATIONS RECEIVED")
}
Expand Down
51 changes: 9 additions & 42 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ func main() {
type GossipMsgType int16

const (
GSM_signedObservation GossipMsgType = iota
GSM_signedObservationInBatch
GSM_signedObservationInBatch GossipMsgType = iota
GSM_signedObservationBatch
GSM_tbObservation
GSM_signedHeartbeat
Expand All @@ -196,7 +195,6 @@ func main() {
)

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
Expand Down Expand Up @@ -266,12 +264,12 @@ func main() {

gossipMsgTable := table.NewWriter()
gossipMsgTable.SetOutputMirror(os.Stdout)
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.SetStyle(table.StyleColoredDark)

obsvRateTable := table.NewWriter()
obsvRateTable.SetOutputMirror(os.Stdout)
obsvRateTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "1%", "2%", "3%", "4%", "5%", "6%", "7%", "8%", "9%", "10%"})
obsvRateTable.AppendHeader(table.Row{"#", "Guardian", "1%", "2%", "3%", "4%", "5%", "6%", "7%", "8%", "9%", "10%"})
obsvRateTable.SetStyle(table.StyleColoredDark)

guardianTable := table.NewWriter()
Expand Down Expand Up @@ -338,41 +336,11 @@ func main() {

// Just count observations
go func() {
uniqueObs := map[string]struct{}{}
uniqueObsInBatch := map[string]struct{}{}
for {
select {
case <-rootCtx.Done():
return
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
gossipCounter[totalsRow][GSM_tbObservation]++
}
if handleObsv(uint(idx)) {
obsvRateTable.ResetRows()
for i := 0; i < numGuardians; i++ {
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount, obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2], obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5], obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8], obsvRateRows[uint(i)].percents[9]})
}
}
gossipCounter[idx][GSM_signedObservation]++
gossipCounter[totalsRow][GSM_signedObservation]++

if *loadTesting {
uniqueObs[hex.EncodeToString(o.Msg.Hash)] = struct{}{}
gossipCounter[uniqueRow][GSM_signedObservation] = len(uniqueObs)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
case batch := <-batchObsvC:
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
Expand Down Expand Up @@ -406,7 +374,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
}
Expand All @@ -428,7 +396,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -460,7 +428,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -529,7 +497,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
if activeTable == 0 {
Expand Down Expand Up @@ -564,7 +532,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
}
Expand All @@ -585,7 +553,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
}
gossipLock.Unlock()
}
Expand All @@ -610,7 +578,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
Expand Down
12 changes: 0 additions & 12 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,9 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Add channel capacity checks
go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC)
go monitorChannelCapacity(rootCtx, logger, "batchObsvC", batchObsvC)

// Heartbeat updates
Expand Down Expand Up @@ -370,15 +368,6 @@ func main() {
}
logger.Info("Observation cleanup completed.")
return
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)
observationBatch = append(observationBatch, obs)

// if it reaches batchSize then process this batch
if len(observationBatch) >= batchSize {
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
observationBatch = observationBatch[:0] // Clear the batch
}
case batch := <-batchObsvC:
for _, signedObs := range batch.Msg.Observations {
obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
Expand Down Expand Up @@ -477,7 +466,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
)
if err != nil {
Expand Down
13 changes: 0 additions & 13 deletions fly/cmd/observation_stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Guardian set state managed by processor
Expand All @@ -93,17 +92,6 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
if o.Msg.MessageId[:3] != "26/" && o.Msg.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
if _, ok := obsvByHash[o.Msg.MessageId]; !ok {
obsvByHash[o.Msg.MessageId] = map[string]time.Time{}
}
if _, ok := obsvByHash[o.Msg.MessageId][ga]; !ok {
obsvByHash[o.Msg.MessageId][ga] = time.Now()
}
logger.Warn("status", zap.String("id", o.Msg.MessageId), zap.Any("msg", obsvByHash[o.Msg.MessageId]))
}
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
if o.MessageId[:3] != "26/" && o.MessageId[:2] != "7/" {
Expand Down Expand Up @@ -139,7 +127,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
)
if err != nil {
Expand Down
25 changes: 0 additions & 25 deletions fly/cmd/prom_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
Expand Down Expand Up @@ -248,29 +247,6 @@ func main() {
afterCount := len(uniqueObs)
logger.Info("Cleaned up unique observations cache", zap.Int("beforeCount", beforeCount), zap.Int("afterCount", afterCount), zap.Int("cleanedUpCount", beforeCount-afterCount))
timer.Reset(delay)
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
gossipByType.WithLabelValues("observation").Inc()
spl := strings.Split(o.Msg.MessageId, "/")
chain, err := parseChainID(spl[0])
if err != nil {
chain = vaa.ChainIDUnset
}
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
name := addr
idx, found := guardianIndexMap[strings.ToLower(addr)]
if found {
name = guardianIndexToNameMap[idx]
}
observationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
if knownEmitters[emitter] {
tbObservationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
}
hash := hex.EncodeToString(o.Msg.Hash)
if _, exists := uniqueObs[hash]; !exists {
uniqueObservationsCounter.Inc()
}
uniqueObs[hash] = time.Now()
case batch := <-batchObsvC:
gossipByType.WithLabelValues("batch_observation").Inc()
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
Expand Down Expand Up @@ -431,7 +407,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
Expand Down
4 changes: 0 additions & 4 deletions fly/cmd/track_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound observation requests
Expand Down Expand Up @@ -128,8 +127,6 @@ func main() {
select {
case <-rootCtx.Done():
return
case <-obsvC: // TODO: Rip out this code once we cut over to batching.
numObs++
case batch := <-batchObsvC:
numObs += len(batch.Msg.Observations)
case <-signedInC:
Expand Down Expand Up @@ -193,7 +190,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
Expand Down
12 changes: 1 addition & 11 deletions fly/cmd/track_pyth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func main() {
defer rootCtxCancel()

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound signed VAAs
Expand Down Expand Up @@ -160,14 +159,6 @@ func main() {
select {
case <-rootCtx.Done():
return
case m := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := &gossipv1.Observation{
Hash: m.Msg.Hash,
Signature: m.Msg.Signature,
TxHash: m.Msg.TxHash,
MessageId: m.Msg.MessageId,
}
handleObservation(logger, gs, m.Msg.Addr, obs)
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
handleObservation(logger, gs, batch.Msg.Addr, o)
Expand Down Expand Up @@ -235,7 +226,6 @@ func main() {
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
)
Expand Down Expand Up @@ -403,7 +393,7 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, addr []byte, m
return
}

// Hooray! Now, we have verified all fields on SignedObservation and know that it includes
// Hooray! Now, we have verified all fields on observation and know that it includes
// a valid signature by an active guardian. We still don't fully trust them, as they may be
// byzantine, but now we know who we're dealing with.

Expand Down
Loading

0 comments on commit 9dc8b09

Please sign in to comment.