Skip to content

Commit

Permalink
Minor fixes to OCR capability (#364)
Browse files Browse the repository at this point in the history
1. Prevent adding duplicate request IDs
2. Add missing AppendWorkflowIDs() call
3. Fix WorkflowExecutionID in transmitResponse
4. More logging
5. Better test data in Mercury codec (soon to be replaced with real logic)
  • Loading branch information
bolekk authored Feb 22, 2024
1 parent e19522a commit d1a00d5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcom
}
}
}
a.lggr.Debugw("collected latestReportPerFeed", "len", len(latestReportPerFeed))

currentState := &DataFeedsOutcomeMetadata{}
if previousOutcome != nil {
Expand Down Expand Up @@ -117,6 +118,7 @@ func (a *dataFeedsAggregator) Aggregate(previousOutcome *types.AggregationOutcom
return nil, err
}

a.lggr.Debugw("Aggregate complete", "nReportsNeedingUpdate", len(reportsNeedingUpdate))
return &types.AggregationOutcome{
EncodableOutcome: reportsProto.GetMapValue(),
Metadata: marshalledState,
Expand All @@ -143,7 +145,7 @@ func NewDataFeedsAggregator(config values.Map, mercuryCodec MercuryCodec, lggr l
return &dataFeedsAggregator{
config: parsedConfig,
mercuryCodec: mercuryCodec,
lggr: lggr,
lggr: logger.Named(lggr, "DataFeedsAggregator"),
}, nil
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/capabilities/consensus/ocr3/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *reportingPlugin) Observation(ctx context.Context, outctx ocr3types.Outc
obs.Observations = append(obs.Observations, r)
}

r.lggr.Debugw("Observation complete", "len", len(obs.Observations))
r.lggr.Debugw("Observation complete", "len", len(obs.Observations), "queryLen", len(queryReq.Ids))
return proto.Marshal(obs)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *reportingPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Q
o.Outcomes[weid.WorkflowId] = outcome
}

r.lggr.Debugw("Outcome complete", "len", len(o.Outcomes))
r.lggr.Debugw("Outcome complete", "len", len(o.Outcomes), "nReportsToGenerate", len(o.ReportsToGenerate))
return proto.Marshal(o)
}

Expand All @@ -214,6 +214,11 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
// This doesn't handle a query which contains the same workflowId multiple times.
for _, report := range o.ReportsToGenerate {
outcome, id := report.Outcome, report.Id
outcome, err := pbtypes.AppendWorkflowIDs(outcome, id.WorkflowId, id.WorkflowExecutionId)
if err != nil {
r.lggr.Errorw("could not append IDs")
continue
}
mv, err := values.FromMapValueProto(outcome.EncodableOutcome)
if err != nil {
r.lggr.Errorw("could not convert outcome to value", "workflowID", id.WorkflowId)
Expand Down Expand Up @@ -244,6 +249,7 @@ func (r *reportingPlugin) Reports(seqNr uint64, outcome ocr3types.Outcome) ([]oc
})
}

r.lggr.Debugw("Reports complete", "len", len(reports))
return reports, nil
}

Expand All @@ -261,9 +267,10 @@ func (r *reportingPlugin) ShouldAcceptAttestedReport(ctx context.Context, seqNr
return false, err
}

r.lggr.Debugw("ShouldAcceptAttestedReport transmitting", "len", len(b.Underlying))
err = r.r.transmitResponse(ctx, response{
Value: b,
WorkflowExecutionID: id.WorkflowId,
WorkflowExecutionID: id.WorkflowExecutionId,
})
if err != nil {
r.lggr.Errorw("could not transmit response", "error", err, "weid", id.WorkflowExecutionId)
Expand Down
3 changes: 3 additions & 0 deletions pkg/capabilities/consensus/ocr3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (s *store) add(ctx context.Context, req *request) error {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.requests[req.WorkflowExecutionID]; ok {
return fmt.Errorf("request with id %s already exists", req.WorkflowExecutionID)
}
now := s.clock.Now()
req.ExpiresAt = now.Add(s.requestTimeout)
s.requestIDs = append(s.requestIDs, req.WorkflowExecutionID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/capabilities/consensus/ocr3/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func TestOCR3Store(t *testing.T) {
require.NoError(t, err)
})

t.Run("add duplicate", func(t *testing.T) {
err := s.add(ctx, req)
require.Error(t, err)
})

t.Run("get", func(t *testing.T) {
got, err := s.get(ctx, rid)
require.NoError(t, err)
Expand Down
20 changes: 17 additions & 3 deletions pkg/capabilities/mercury/mercury.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"strings"
"time"

"github.com/shopspring/decimal"

Expand Down Expand Up @@ -55,11 +56,24 @@ type Codec struct {
}

func (m Codec) Unwrap(raw values.Value) (ReportSet, error) {
now := uint32(time.Now().Unix())
return ReportSet{
Reports: map[FeedID]Report{
FeedID("012345678901234567890123456789012345678901234567890123456789000000"): {
FeedID("0x1111111111111111111100000000000000000000000000000000000000000000"): {
Info: ReportInfo{
Timestamp: 42,
Timestamp: now,
Price: 100.00,
},
},
FeedID("0x2222222222222222222200000000000000000000000000000000000000000000"): {
Info: ReportInfo{
Timestamp: now,
Price: 100.00,
},
},
FeedID("0x3333333333333333333300000000000000000000000000000000000000000000"): {
Info: ReportInfo{
Timestamp: now,
Price: 100.00,
},
},
Expand All @@ -70,7 +84,7 @@ func (m Codec) Unwrap(raw values.Value) (ReportSet, error) {
func (m Codec) Wrap(reportSet ReportSet) (values.Value, error) {
return values.NewMap(
map[string]any{
"0123456789": map[string]any{
"0x1111111111111111111100000000000000000000000000000000000000000000": map[string]any{
"timestamp": 42,
"price": decimal.NewFromFloat(100.00),
},
Expand Down

0 comments on commit d1a00d5

Please sign in to comment.