diff --git a/pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go b/pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go index 71113958a..1fbb4b5a1 100644 --- a/pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go +++ b/pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go @@ -68,6 +68,7 @@ func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcom } } } + a.lggr.Debugw("collected latestReportPerFeed", "len", len(latestReportPerFeed)) currentState := &DataFeedsOutcomeMetadata{} if previousOutcome != nil { @@ -117,6 +118,7 @@ func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcom return nil, err } + a.lggr.Debugw("Aggregate complete", "nReportsNeedingUpdate", len(reportsNeedingUpdate)) return &types.AggregationOutcome{ EncodableOutcome: reportsProto.GetMapValue(), Metadata: marshalledState, @@ -143,7 +145,7 @@ func NewDataFeedsAggregator(config values.Map, mercuryCodec MercuryCodec, lggr l return &dataFeedsAggregator{ config: parsedConfig, mercuryCodec: mercuryCodec, - lggr: lggr, + lggr: logger.Named(lggr, "DataFeedsAggregator"), }, nil } diff --git a/pkg/capabilities/consensus/ocr3/reporting_plugin.go b/pkg/capabilities/consensus/ocr3/reporting_plugin.go index 5c2c328e1..a0924f409 100644 --- a/pkg/capabilities/consensus/ocr3/reporting_plugin.go +++ b/pkg/capabilities/consensus/ocr3/reporting_plugin.go @@ -99,7 +99,7 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc obs.Observations = append(obs.Observations, r) } - r.lggr.Debugw("Observation complete", "len", len(obs.Observations)) + r.lggr.Debugw("Observation complete", "len", len(obs.Observations), "queryLen", len(queryReq.Ids)) return proto.Marshal(obs) } @@ -198,7 +198,7 @@ func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Q o.Outcomes[weid.WorkflowId] = outcome } - r.lggr.Debugw("Outcome complete", "len", len(o.Outcomes)) + r.lggr.Debugw("Outcome complete", "len", len(o.Outcomes), "nReportsToGenerate", len(o.ReportsToGenerate)) return proto.Marshal(o) } @@ -214,6 +214,11 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc // This doesn't handle a query which contains the same workflowId multiple times. for _, report := range o.ReportsToGenerate { outcome, id := report.Outcome, report.Id + outcome, err := pbtypes.AppendWorkflowIDs(outcome, id.WorkflowId, id.WorkflowExecutionId) + if err != nil { + r.lggr.Errorw("could not append IDs") + continue + } mv, err := values.FromMapValueProto(outcome.EncodableOutcome) if err != nil { r.lggr.Errorw("could not convert outcome to value", "workflowID", id.WorkflowId) @@ -244,6 +249,7 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc }) } + r.lggr.Debugw("Reports complete", "len", len(reports)) return reports, nil } @@ -261,9 +267,10 @@ func (r *reportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr return false, err } + r.lggr.Debugw("ShouldAcceptAttestedReport transmitting", "len", len(b.Underlying)) err = r.r.transmitResponse(ctx, response{ Value: b, - WorkflowExecutionID: id.WorkflowId, + WorkflowExecutionID: id.WorkflowExecutionId, }) if err != nil { r.lggr.Errorw("could not transmit response", "error", err, "weid", id.WorkflowExecutionId) diff --git a/pkg/capabilities/consensus/ocr3/store.go b/pkg/capabilities/consensus/ocr3/store.go index ccdd7a771..8b2c23cf1 100644 --- a/pkg/capabilities/consensus/ocr3/store.go +++ b/pkg/capabilities/consensus/ocr3/store.go @@ -33,6 +33,9 @@ func (s *store) add(ctx context.Context, req *request) error { s.mu.Lock() defer s.mu.Unlock() + if _, ok := s.requests[req.WorkflowExecutionID]; ok { + return fmt.Errorf("request with id %s already exists", req.WorkflowExecutionID) + } now := s.clock.Now() req.ExpiresAt = now.Add(s.requestTimeout) s.requestIDs = append(s.requestIDs, req.WorkflowExecutionID) diff --git a/pkg/capabilities/consensus/ocr3/store_test.go b/pkg/capabilities/consensus/ocr3/store_test.go index ce5c4fe35..f23fe1de7 100644 --- a/pkg/capabilities/consensus/ocr3/store_test.go +++ b/pkg/capabilities/consensus/ocr3/store_test.go @@ -30,6 +30,11 @@ func TestOCR3Store(t *testing.T) { require.NoError(t, err) }) + t.Run("add duplicate", func(t *testing.T) { + err := s.add(ctx, req) + require.Error(t, err) + }) + t.Run("get", func(t *testing.T) { got, err := s.get(ctx, rid) require.NoError(t, err) diff --git a/pkg/capabilities/mercury/mercury.go b/pkg/capabilities/mercury/mercury.go index 647efea9a..340c47d4f 100644 --- a/pkg/capabilities/mercury/mercury.go +++ b/pkg/capabilities/mercury/mercury.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "errors" "strings" + "time" "github.com/shopspring/decimal" @@ -55,11 +56,24 @@ type Codec struct { } func (m Codec) Unwrap(raw values.Value) (ReportSet, error) { + now := uint32(time.Now().Unix()) return ReportSet{ Reports: map[FeedID]Report{ - FeedID("012345678901234567890123456789012345678901234567890123456789000000"): { + FeedID("0x1111111111111111111100000000000000000000000000000000000000000000"): { Info: ReportInfo{ - Timestamp: 42, + Timestamp: now, + Price: 100.00, + }, + }, + FeedID("0x2222222222222222222200000000000000000000000000000000000000000000"): { + Info: ReportInfo{ + Timestamp: now, + Price: 100.00, + }, + }, + FeedID("0x3333333333333333333300000000000000000000000000000000000000000000"): { + Info: ReportInfo{ + Timestamp: now, Price: 100.00, }, }, @@ -70,7 +84,7 @@ func (m Codec) Unwrap(raw values.Value) (ReportSet, error) { func (m Codec) Wrap(reportSet ReportSet) (values.Value, error) { return values.NewMap( map[string]any{ - "0123456789": map[string]any{ + "0x1111111111111111111100000000000000000000000000000000000000000000": map[string]any{ "timestamp": 42, "price": decimal.NewFromFloat(100.00), },