Skip to content

Commit

Permalink
[Functions] Fix bugs in s4 plugin (#10742)
Browse files Browse the repository at this point in the history
1. Observation was appending unnecessary entries
2. Report wasn't following the order of entries from observation set
  • Loading branch information
bolekk authored Sep 21, 2023
1 parent 48ad8fb commit 15e8d93
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 10 deletions.
15 changes: 10 additions & 5 deletions core/services/ocr2/plugins/s4/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que
c.addressRange.Advance()

c.logger.Debug("S4StorageReporting Query", commontypes.LogFields{
"epoch": ts.Epoch,
"round": ts.Round,
"nRows": len(rows),
"epoch": ts.Epoch,
"round": ts.Round,
"nSnapshotRows": len(rows),
})

return queryBytes, err
Expand Down Expand Up @@ -129,18 +129,20 @@ func (c *plugin) Observation(ctx context.Context, ts types.ReportTimestamp, quer

snapshotVersionsMap := snapshotToVersionMap(snapshot)
toBeAdded := make([]rkey, 0)
// Add rows from query snapshot that have a higher version locally.
for _, qr := range queryRows {
address := UnmarshalAddress(qr.Address)
k := key{address: address.String(), slotID: uint(qr.Slotid)}
if version, ok := snapshotVersionsMap[k]; ok && version > qr.Version {
toBeAdded = append(toBeAdded, rkey{address: address, slotID: uint(qr.Slotid)})
delete(snapshotVersionsMap, k)
}
delete(snapshotVersionsMap, k)
}

if len(toBeAdded) > maxRemainingRows {
toBeAdded = toBeAdded[:maxRemainingRows]
} else {
// Add rows from query address range that exist locally but are missing from query snapshot.
for _, sr := range snapshot {
if !sr.Confirmed {
continue
Expand Down Expand Up @@ -180,6 +182,7 @@ func (c *plugin) Report(_ context.Context, ts types.ReportTimestamp, _ types.Que
promReportingPluginReport.WithLabelValues(c.config.ProductName).Inc()

reportMap := make(map[key]*Row)
reportKeys := []key{}

for _, ao := range aos {
observationRows, err := UnmarshalRows(ao.Observation)
Expand All @@ -202,11 +205,13 @@ func (c *plugin) Report(_ context.Context, ts types.ReportTimestamp, _ types.Que
continue
}
reportMap[mkey] = row
reportKeys = append(reportKeys, mkey)
}
}

reportRows := make([]*Row, 0)
for _, row := range reportMap {
for _, key := range reportKeys {
row := reportMap[key]
reportRows = append(reportRows, row)

if len(reportRows) >= int(c.config.MaxReportEntries) {
Expand Down
78 changes: 73 additions & 5 deletions core/services/ocr2/plugins/s4/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestPlugin_Observation(t *testing.T) {
ormRows := generateTestOrmRows(t, int(config.MaxObservationEntries), time.Minute)
snapshot := make([]*s4_svc.SnapshotRow, len(ormRows))
for i, or := range ormRows {
or.Confirmed = i < numUnconfirmed
or.Confirmed = i < numUnconfirmed // First half are confirmed
or.Version = uint64(i)
snapshot[i] = &s4_svc.SnapshotRow{
Address: or.Address,
Expand All @@ -355,21 +355,23 @@ func TestPlugin_Observation(t *testing.T) {
}
}
orm.On("DeleteExpired", uint(10), mock.Anything, mock.Anything).Return(int64(10), nil).Once()
orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return(ormRows[:numUnconfirmed], nil).Once()
orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return(ormRows[numUnconfirmed:], nil).Once()
orm.On("GetSnapshot", mock.Anything, mock.Anything).Return(snapshot, nil).Once()

snapshotRows := rowsToShapshotRows(ormRows)
query := &s4.Query{
Rows: make([]*s4.SnapshotRow, len(snapshotRows)),
}
numHigherVersion := 2
for i, v := range snapshotRows {
query.Rows[i] = &s4.SnapshotRow{
Address: v.Address.Bytes(),
Slotid: uint32(v.SlotId),
Version: v.Version,
}
if i < 5 {
if i < numHigherVersion {
ormRows[i].Version++
snapshot[i].Version++
orm.On("Get", v.Address, v.SlotId, mock.Anything).Return(ormRows[i], nil).Once()
}
}
Expand All @@ -382,11 +384,66 @@ func TestPlugin_Observation(t *testing.T) {
rows := &s4.Rows{}
err = proto.Unmarshal(observation, rows)
assert.NoError(t, err)
assert.Len(t, rows.Rows, int(config.MaxObservationEntries))
assert.Len(t, rows.Rows, numUnconfirmed+numHigherVersion)

for i := 0; i < numUnconfirmed; i++ {
assert.Equal(t, ormRows[i].Version, rows.Rows[i].Version)
assert.Equal(t, ormRows[numUnconfirmed+i].Version, rows.Rows[i].Version)
}
for i := 0; i < numHigherVersion; i++ {
assert.Equal(t, ormRows[i].Version, rows.Rows[numUnconfirmed+i].Version)
}
})

t.Run("missing from query", func(t *testing.T) {
vLow, vHigh := uint64(2), uint64(5)
ormRows := generateTestOrmRows(t, 3, time.Minute)
// Follower node has 3 confirmed entries with latest versions.
snapshot := make([]*s4_svc.SnapshotRow, len(ormRows))
for i, or := range ormRows {
or.Confirmed = true
or.Version = vHigh
snapshot[i] = &s4_svc.SnapshotRow{
Address: or.Address,
SlotId: or.SlotId,
Version: or.Version,
Confirmed: or.Confirmed,
}
}

// Query snapshot has:
// - First entry with same version
// - Second entry with lower version
// - Third entry missing
query := &s4.Query{
Rows: []*s4.SnapshotRow{
&s4.SnapshotRow{
Address: snapshot[0].Address.Bytes(),
Slotid: uint32(snapshot[0].SlotId),
Version: vHigh,
},
&s4.SnapshotRow{
Address: snapshot[1].Address.Bytes(),
Slotid: uint32(snapshot[1].SlotId),
Version: vLow,
},
},
}
queryBytes, err := proto.Marshal(query)
assert.NoError(t, err)

orm.On("DeleteExpired", uint(10), mock.Anything, mock.Anything).Return(int64(10), nil).Once()
orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return([]*s4_svc.Row{}, nil).Once()
orm.On("GetSnapshot", mock.Anything, mock.Anything).Return(snapshot, nil).Once()
orm.On("Get", snapshot[1].Address, snapshot[1].SlotId, mock.Anything).Return(ormRows[1], nil).Once()
orm.On("Get", snapshot[2].Address, snapshot[2].SlotId, mock.Anything).Return(ormRows[2], nil).Once()

observation, err := plugin.Observation(testutils.Context(t), types.ReportTimestamp{}, queryBytes)
assert.NoError(t, err)

rows := &s4.Rows{}
err = proto.Unmarshal(observation, rows)
assert.NoError(t, err)
assert.Len(t, rows.Rows, 2)
})
}

Expand Down Expand Up @@ -419,4 +476,15 @@ func TestPlugin_Report(t *testing.T) {
err = proto.Unmarshal(report, reportRows)
assert.NoError(t, err)
assert.Len(t, reportRows.Rows, 10)

ok2, report2, err2 := plugin.Report(testutils.Context(t), types.ReportTimestamp{}, nil, aos)
assert.NoError(t, err2)
assert.True(t, ok2)

reportRows2 := &s4.Rows{}
err = proto.Unmarshal(report2, reportRows2)
assert.NoError(t, err)

// Verify that the same report was produced
assert.Equal(t, reportRows, reportRows2)
}

0 comments on commit 15e8d93

Please sign in to comment.