From 15e8d93b341ab9d89396770759b4c6ecdb89664a Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Thu, 21 Sep 2023 07:23:16 -0700 Subject: [PATCH] [Functions] Fix bugs in s4 plugin (#10742) 1. Observation was appending unnecessary entries 2. Report wasn't following the order of entries from observation set --- core/services/ocr2/plugins/s4/plugin.go | 15 ++-- core/services/ocr2/plugins/s4/plugin_test.go | 78 ++++++++++++++++++-- 2 files changed, 83 insertions(+), 10 deletions(-) diff --git a/core/services/ocr2/plugins/s4/plugin.go b/core/services/ocr2/plugins/s4/plugin.go index 23041b3b91e..0aca93e55e6 100644 --- a/core/services/ocr2/plugins/s4/plugin.go +++ b/core/services/ocr2/plugins/s4/plugin.go @@ -79,9 +79,9 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que c.addressRange.Advance() c.logger.Debug("S4StorageReporting Query", commontypes.LogFields{ - "epoch": ts.Epoch, - "round": ts.Round, - "nRows": len(rows), + "epoch": ts.Epoch, + "round": ts.Round, + "nSnapshotRows": len(rows), }) return queryBytes, err @@ -129,18 +129,20 @@ func (c *plugin) Observation(ctx context.Context, ts types.ReportTimestamp, quer snapshotVersionsMap := snapshotToVersionMap(snapshot) toBeAdded := make([]rkey, 0) + // Add rows from query snapshot that have a higher version locally. for _, qr := range queryRows { address := UnmarshalAddress(qr.Address) k := key{address: address.String(), slotID: uint(qr.Slotid)} if version, ok := snapshotVersionsMap[k]; ok && version > qr.Version { toBeAdded = append(toBeAdded, rkey{address: address, slotID: uint(qr.Slotid)}) - delete(snapshotVersionsMap, k) } + delete(snapshotVersionsMap, k) } if len(toBeAdded) > maxRemainingRows { toBeAdded = toBeAdded[:maxRemainingRows] } else { + // Add rows from query address range that exist locally but are missing from query snapshot. for _, sr := range snapshot { if !sr.Confirmed { continue @@ -180,6 +182,7 @@ func (c *plugin) Report(_ context.Context, ts types.ReportTimestamp, _ types.Que promReportingPluginReport.WithLabelValues(c.config.ProductName).Inc() reportMap := make(map[key]*Row) + reportKeys := []key{} for _, ao := range aos { observationRows, err := UnmarshalRows(ao.Observation) @@ -202,11 +205,13 @@ func (c *plugin) Report(_ context.Context, ts types.ReportTimestamp, _ types.Que continue } reportMap[mkey] = row + reportKeys = append(reportKeys, mkey) } } reportRows := make([]*Row, 0) - for _, row := range reportMap { + for _, key := range reportKeys { + row := reportMap[key] reportRows = append(reportRows, row) if len(reportRows) >= int(c.config.MaxReportEntries) { diff --git a/core/services/ocr2/plugins/s4/plugin_test.go b/core/services/ocr2/plugins/s4/plugin_test.go index 0d37103936f..b85ba053122 100644 --- a/core/services/ocr2/plugins/s4/plugin_test.go +++ b/core/services/ocr2/plugins/s4/plugin_test.go @@ -345,7 +345,7 @@ func TestPlugin_Observation(t *testing.T) { ormRows := generateTestOrmRows(t, int(config.MaxObservationEntries), time.Minute) snapshot := make([]*s4_svc.SnapshotRow, len(ormRows)) for i, or := range ormRows { - or.Confirmed = i < numUnconfirmed + or.Confirmed = i < numUnconfirmed // First half are confirmed or.Version = uint64(i) snapshot[i] = &s4_svc.SnapshotRow{ Address: or.Address, @@ -355,21 +355,23 @@ func TestPlugin_Observation(t *testing.T) { } } orm.On("DeleteExpired", uint(10), mock.Anything, mock.Anything).Return(int64(10), nil).Once() - orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return(ormRows[:numUnconfirmed], nil).Once() + orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return(ormRows[numUnconfirmed:], nil).Once() orm.On("GetSnapshot", mock.Anything, mock.Anything).Return(snapshot, nil).Once() snapshotRows := rowsToShapshotRows(ormRows) query := &s4.Query{ Rows: make([]*s4.SnapshotRow, len(snapshotRows)), } + numHigherVersion := 2 for i, v := range snapshotRows { query.Rows[i] = &s4.SnapshotRow{ Address: v.Address.Bytes(), Slotid: uint32(v.SlotId), Version: v.Version, } - if i < 5 { + if i < numHigherVersion { ormRows[i].Version++ + snapshot[i].Version++ orm.On("Get", v.Address, v.SlotId, mock.Anything).Return(ormRows[i], nil).Once() } } @@ -382,11 +384,66 @@ func TestPlugin_Observation(t *testing.T) { rows := &s4.Rows{} err = proto.Unmarshal(observation, rows) assert.NoError(t, err) - assert.Len(t, rows.Rows, int(config.MaxObservationEntries)) + assert.Len(t, rows.Rows, numUnconfirmed+numHigherVersion) for i := 0; i < numUnconfirmed; i++ { - assert.Equal(t, ormRows[i].Version, rows.Rows[i].Version) + assert.Equal(t, ormRows[numUnconfirmed+i].Version, rows.Rows[i].Version) + } + for i := 0; i < numHigherVersion; i++ { + assert.Equal(t, ormRows[i].Version, rows.Rows[numUnconfirmed+i].Version) + } + }) + + t.Run("missing from query", func(t *testing.T) { + vLow, vHigh := uint64(2), uint64(5) + ormRows := generateTestOrmRows(t, 3, time.Minute) + // Follower node has 3 confirmed entries with latest versions. + snapshot := make([]*s4_svc.SnapshotRow, len(ormRows)) + for i, or := range ormRows { + or.Confirmed = true + or.Version = vHigh + snapshot[i] = &s4_svc.SnapshotRow{ + Address: or.Address, + SlotId: or.SlotId, + Version: or.Version, + Confirmed: or.Confirmed, + } + } + + // Query snapshot has: + // - First entry with same version + // - Second entry with lower version + // - Third entry missing + query := &s4.Query{ + Rows: []*s4.SnapshotRow{ + &s4.SnapshotRow{ + Address: snapshot[0].Address.Bytes(), + Slotid: uint32(snapshot[0].SlotId), + Version: vHigh, + }, + &s4.SnapshotRow{ + Address: snapshot[1].Address.Bytes(), + Slotid: uint32(snapshot[1].SlotId), + Version: vLow, + }, + }, } + queryBytes, err := proto.Marshal(query) + assert.NoError(t, err) + + orm.On("DeleteExpired", uint(10), mock.Anything, mock.Anything).Return(int64(10), nil).Once() + orm.On("GetUnconfirmedRows", config.MaxObservationEntries, mock.Anything).Return([]*s4_svc.Row{}, nil).Once() + orm.On("GetSnapshot", mock.Anything, mock.Anything).Return(snapshot, nil).Once() + orm.On("Get", snapshot[1].Address, snapshot[1].SlotId, mock.Anything).Return(ormRows[1], nil).Once() + orm.On("Get", snapshot[2].Address, snapshot[2].SlotId, mock.Anything).Return(ormRows[2], nil).Once() + + observation, err := plugin.Observation(testutils.Context(t), types.ReportTimestamp{}, queryBytes) + assert.NoError(t, err) + + rows := &s4.Rows{} + err = proto.Unmarshal(observation, rows) + assert.NoError(t, err) + assert.Len(t, rows.Rows, 2) }) } @@ -419,4 +476,15 @@ func TestPlugin_Report(t *testing.T) { err = proto.Unmarshal(report, reportRows) assert.NoError(t, err) assert.Len(t, reportRows.Rows, 10) + + ok2, report2, err2 := plugin.Report(testutils.Context(t), types.ReportTimestamp{}, nil, aos) + assert.NoError(t, err2) + assert.True(t, ok2) + + reportRows2 := &s4.Rows{} + err = proto.Unmarshal(report2, reportRows2) + assert.NoError(t, err) + + // Verify that the same report was produced + assert.Equal(t, reportRows, reportRows2) }