From 90c13e0a415c13cd6e628dd127e80aff623a2d34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Wed, 10 Jul 2024 15:10:27 +0900 Subject: [PATCH 1/5] keystone: Add a mock-streams-trigger --- core/capabilities/streams/mock_trigger.go | 471 ++++++++++++++++++++++ core/services/chainlink/application.go | 8 + 2 files changed, 479 insertions(+) create mode 100644 core/capabilities/streams/mock_trigger.go diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go new file mode 100644 index 00000000000..387ee65bd01 --- /dev/null +++ b/core/capabilities/streams/mock_trigger.go @@ -0,0 +1,471 @@ +package streams + +// NOTE: this file is an amalgamation of MercuryTrigger and the streams trigger load tests +// the mercury trigger was modified to contain non-empty meta and sign the report with mock keys + +import ( + "context" + "crypto/ecdsa" + "fmt" + "math/big" + "strconv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" + ocrTypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" + "github.com/smartcontractkit/chainlink-common/pkg/values" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" +) + +const ( + baseTimestamp = 1000000000 +) + +func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { + ctx := context.TODO() + trigger := NewMockTriggerService(100, lggr) + if err := trigger.Start(ctx); err != nil { + return nil, err + } + if err := capRegistry.Add(ctx, trigger); err != nil { + return nil, err + } + + producer := NewMockDataProducer(trigger, lggr) + if err := producer.Start(ctx); err != nil { + return nil, err + } + + return trigger, nil +} + +// NOTE: duplicated from trigger_test.go +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { + v3Codec := reportcodec.NewReportCodec(feedID, lggr) + raw, err := v3Codec.BuildReport(v3.ReportFields{ + BenchmarkPrice: price, + Timestamp: uint32(timestamp), + ValidFromTimestamp: uint32(timestamp), + Bid: price, + Ask: price, + LinkFee: price, + NativeFee: price, + ExpiresAt: uint32(timestamp + 1000000), + }) + if err != nil { + panic(err) + } + return raw +} + +func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { + rc := evmutil.RawReportContext(reportCtx) + flat := []byte{} + for _, r := range rc { + flat = append(flat, r[:]...) + } + return flat +} + +const triggerID = "mock-streams-trigger@1.0.0" + +var capInfo = capabilities.MustNewCapabilityInfo( + triggerID, + capabilities.CapabilityTypeTrigger, + "Mock Streams Trigger", +) + +const defaultTickerResolutionMs = 1000 + +// TODO pending capabilities configuration implementation - this should be configurable with a sensible default +const defaultSendChannelBufferSize = 1000 + +type config struct { + // strings should be hex-encoded 32-byte values, prefixed with "0x", all lowercase, minimum 1 item + FeedIDs []string `json:"feedIds" jsonschema:"pattern=^0x[0-9a-f]{64}$,minItems=1"` + // must be greater than 0 + MaxFrequencyMs int `json:"maxFrequencyMs" jsonschema:"minimum=1"` +} + +type inputs struct { + TriggerID string `json:"triggerId"` +} + +var mercuryTriggerValidator = capabilities.NewValidator[config, inputs, capabilities.TriggerEvent](capabilities.ValidatorArgs{Info: capInfo}) + +// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service. +type MockTriggerService struct { + capabilities.Validator[config, inputs, capabilities.TriggerEvent] + capabilities.CapabilityInfo + tickerResolutionMs int64 + subscribers map[string]*subscriber + latestReports map[datastreams.FeedID]datastreams.FeedReport + mu sync.Mutex + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger + + // + meta datastreams.SignersMetadata + signers []*ecdsa.PrivateKey + // +} + +var _ capabilities.TriggerCapability = (*MockTriggerService)(nil) +var _ services.Service = &MockTriggerService{} + +type subscriber struct { + ch chan<- capabilities.CapabilityResponse + workflowID string + config config +} + +// Mock Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber). +// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore, +// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs. +func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { + if tickerResolutionMs == 0 { + tickerResolutionMs = defaultTickerResolutionMs + } + // + f := 1 + meta := datastreams.SignersMetadata{MinRequiredSignatures: 2*f + 1} + // gen private keys for MinRequiredSignatures + signers := []*ecdsa.PrivateKey{} + for i := 0; i < meta.MinRequiredSignatures; i++ { + // test keys: need to be the same across nodes + bytes := make([]byte, 32) + bytes[31] = uint8(i + 1) + + privKey, err := crypto.ToECDSA(bytes) + if err != nil { + panic(err) + } + signers = append(signers, privKey) + + signerAddr := crypto.PubkeyToAddress(privKey.PublicKey).Bytes() + meta.Signers = append(meta.Signers, signerAddr) + } + // + return &MockTriggerService{ + Validator: mercuryTriggerValidator, + CapabilityInfo: capInfo, + tickerResolutionMs: tickerResolutionMs, + subscribers: make(map[string]*subscriber), + latestReports: make(map[datastreams.FeedID]datastreams.FeedReport), + stopCh: make(services.StopChan), + lggr: lggr.Named("MockTriggerService"), + meta: meta, + signers: signers} +} + +func (o *MockTriggerService) ProcessReport(reports []datastreams.FeedReport) error { + o.mu.Lock() + defer o.mu.Unlock() + o.lggr.Debugw("ProcessReport", "nReports", len(reports)) + for _, report := range reports { + feedID := datastreams.FeedID(report.FeedID) + o.latestReports[feedID] = report + } + return nil +} + +func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + wid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + config, err := o.ValidateConfig(req.Config) + if err != nil { + return nil, err + } + + inputs, err := o.ValidateInputs(req.Inputs) + if err != nil { + return nil, err + } + + triggerID := o.getTriggerID(inputs.TriggerID, wid) + // If triggerId is already registered, return an error + if _, ok := o.subscribers[triggerID]; ok { + return nil, fmt.Errorf("triggerId %s already registered", triggerID) + } + + if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 { + return nil, fmt.Errorf("MaxFrequencyMs must be a multiple of %d", o.tickerResolutionMs) + } + + ch := make(chan capabilities.CapabilityResponse, defaultSendChannelBufferSize) + o.subscribers[triggerID] = + &subscriber{ + ch: ch, + workflowID: wid, + config: *config, + } + return ch, nil +} + +func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error { + wid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + inputs, err := o.ValidateInputs(req.Inputs) + if err != nil { + return err + } + triggerID := o.getTriggerID(inputs.TriggerID, wid) + + subscriber, ok := o.subscribers[triggerID] + if !ok { + return fmt.Errorf("triggerId %s not registered", triggerID) + } + close(subscriber.ch) + delete(o.subscribers, triggerID) + return nil +} + +func (o *MockTriggerService) getTriggerID(triggerID string, wid string) string { + tid := wid + "|" + triggerID + return tid +} + +func (o *MockTriggerService) loop() { + defer o.wg.Done() + now := time.Now().UnixMilli() + nextWait := o.tickerResolutionMs - now%o.tickerResolutionMs + + for { + select { + case <-o.stopCh: + return + case <-time.After(time.Duration(nextWait) * time.Millisecond): + startTs := time.Now().UnixMilli() + // find closest timestamp that is a multiple of o.tickerResolutionMs + aligned := (startTs + o.tickerResolutionMs/2) / o.tickerResolutionMs * o.tickerResolutionMs + o.process(aligned) + endTs := time.Now().UnixMilli() + if endTs-startTs > o.tickerResolutionMs { + o.lggr.Errorw("processing took longer than ticker resolution", "duration", endTs-startTs, "tickerResolutionMs", o.tickerResolutionMs) + } + nextWait = getNextWaitIntervalMs(aligned, o.tickerResolutionMs, endTs) + } + } +} + +func getNextWaitIntervalMs(lastTs, tickerResolutionMs, currentTs int64) int64 { + desiredNext := lastTs + tickerResolutionMs + nextWait := desiredNext - currentTs + if nextWait <= 0 { + nextWait = 0 + } + return nextWait +} + +func (o *MockTriggerService) process(timestamp int64) { + o.mu.Lock() + defer o.mu.Unlock() + for _, sub := range o.subscribers { + if timestamp%int64(sub.config.MaxFrequencyMs) == 0 { + reportList := make([]datastreams.FeedReport, 0) + for _, feedID := range sub.config.FeedIDs { + if latest, ok := o.latestReports[datastreams.FeedID(feedID)]; ok { + reportList = append(reportList, latest) + } + } + + // use 32-byte-padded timestamp as EventID (human-readable) + eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) + // --- + // sign reports with mock signers + for i := range reportList { + report := reportList[i] + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < o.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, o.signers[n]) + if err != nil { + panic(err) + } + reportList[i].Signatures = append(reportList[i].Signatures, sig) + } + } + // --- + capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.meta) + + if err != nil { + o.lggr.Errorw("error wrapping reports", "err", err) + continue + } + + o.lggr.Debugw("ProcessReport pushing event", "nReports", len(reportList), "eventID", eventID) + select { + case sub.ch <- capabilityResponse: + default: + o.lggr.Errorw("subscriber channel full, dropping event", "eventID", eventID, "workflowID", sub.workflowID) + } + } + } +} + +func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.SignersMetadata) (capabilities.CapabilityResponse, error) { + val, err := values.Wrap(reportList) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + metaVal, err := values.Wrap(meta) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + triggerEvent := capabilities.TriggerEvent{ + TriggerType: triggerID, + ID: eventID, + Timestamp: strconv.FormatInt(timestamp, 10), + Metadata: metaVal, + Payload: val, + } + + eventVal, err := values.Wrap(triggerEvent) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + // Create a new CapabilityResponse with the MockTriggerEvent + return capabilities.CapabilityResponse{ + Value: eventVal.(*values.Map), + }, nil +} + +func (o *MockTriggerService) Start(ctx context.Context) error { + o.wg.Add(1) + go o.loop() + o.lggr.Info("MockTriggerService started") + return nil +} + +func (o *MockTriggerService) Close() error { + close(o.stopCh) + o.wg.Wait() + o.lggr.Info("MockTriggerService closed") + return nil +} + +func (o *MockTriggerService) Ready() error { + return nil +} + +func (o *MockTriggerService) HealthReport() map[string]error { + return nil +} + +func (o *MockTriggerService) Name() string { + return "MockTriggerService" +} + +type mockDataProducer struct { + trigger *MockTriggerService + wg sync.WaitGroup + closeCh chan struct{} + lggr logger.Logger +} + +var _ services.Service = &mockDataProducer{} + +func NewMockDataProducer(trigger *MockTriggerService, lggr logger.Logger) *mockDataProducer { + return &mockDataProducer{ + trigger: trigger, + closeCh: make(chan struct{}), + lggr: lggr, + } +} + +func (m *mockDataProducer) Start(ctx context.Context) error { + m.wg.Add(1) + go m.loop() + return nil +} + +func (m *mockDataProducer) loop() { + defer m.wg.Done() + + sleepSec := 15 + ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) + defer ticker.Stop() + + prices := []int64{300000, 40000, 5000000} + + j := 0 + + for range ticker.C { + for i := range prices { + prices[i] = prices[i] + 1 + } + j++ + + // https://github.com/smartcontractkit/chainlink/blob/41f9428c3aa8231e8834a230fca4c2ccffd4e6c3/core/capabilities/streams/trigger_test.go#L117-L122 + + timestamp := time.Now().Unix() + // TODO: shouldn't we increment round rather than epoch? + reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} + + reports := []datastreams.FeedReport{ + { + FeedID: "0x1111111111111111111100000000000000000000000000000000000000000000", + FullReport: newReport(m.lggr, common.HexToHash("0x1111111111111111111100000000000000000000000000000000000000000000"), big.NewInt(prices[0]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + }, + { + FeedID: "0x2222222222222222222200000000000000000000000000000000000000000000", + FullReport: newReport(m.lggr, common.HexToHash("0x2222222222222222222200000000000000000000000000000000000000000000"), big.NewInt(prices[1]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + }, + { + FeedID: "0x3333333333333333333300000000000000000000000000000000000000000000", + FullReport: newReport(m.lggr, common.HexToHash("0x3333333333333333333300000000000000000000000000000000000000000000"), big.NewInt(prices[2]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + }, + } + + m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) + err := m.trigger.ProcessReport(reports) + if err != nil { + m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) + } + } +} + +func (m *mockDataProducer) Close() error { + close(m.closeCh) + m.wg.Wait() + return nil +} + +func (m *mockDataProducer) HealthReport() map[string]error { + return nil +} + +func (m *mockDataProducer) Ready() error { + return nil +} + +func (m *mockDataProducer) Name() string { + return "mockDataProducer" +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 6a381b1ffa8..cb19b0a49c7 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -30,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + capStreams "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -206,6 +207,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) } + // Use a recurring trigger with mock data for testing purposes + // TODO: proper component shutdown via srvcs() + _, err := capStreams.RegisterMockTrigger(globalLogger, opts.CapabilitiesRegistry) + if err != nil { + return nil, err + } + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher From 8920dc57724509e49a3b79b528f0618da6b6f493 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Tue, 30 Jul 2024 02:30:20 +0900 Subject: [PATCH 2/5] capabilities: Set up a dummy registry if peering is disabled --- core/capabilities/registry.go | 30 ++++++++++++++++++++++++++ core/services/chainlink/application.go | 3 +++ 2 files changed, 33 insertions(+) diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 4da51a27b6b..6b4b2aecd6d 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" ) @@ -199,3 +200,32 @@ func NewRegistry(lggr logger.Logger) *Registry { lggr: lggr.Named("CapabilitiesRegistry"), } } + +type mockMetadataRegistry struct{} + +func (r *mockMetadataRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) { + pid := p2ptypes.PeerID([32]byte{}) + + return capabilities.Node{ + PeerID: &pid, + WorkflowDON: capabilities.DON{ + ID: 0, + ConfigVersion: 0, + Members: []p2ptypes.PeerID{}, + F: 1, + IsPublic: true, + AcceptsWorkflows: true, + }, + CapabilityDONs: []capabilities.DON{}, + }, nil +} + +func (r *mockMetadataRegistry) ConfigForCapability(ctx context.Context, capabilityID string, donID uint32) (registrysyncer.CapabilityConfiguration, error) { + return registrysyncer.CapabilityConfiguration{ + Config: []byte{}, + }, nil +} + +func NewMockMetadataRegistry() metadataRegistry { + return &mockMetadataRegistry{} +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index cb19b0a49c7..4227afd9ef6 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -266,6 +266,9 @@ func NewApplication(opts ApplicationOpts) (Application, error) { srvcs = append(srvcs, wfLauncher, registrySyncer) } + } else { + // If registry syncer is not set up we use a dummy local registry so that local capabilities can still be used + opts.CapabilitiesRegistry.SetLocalRegistry(capabilities.NewMockMetadataRegistry()) } // LOOPs can be created as options, in the case of LOOP relayers, or From 22ae40407c1102d93566692cb3e7583fe5654788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Fri, 2 Aug 2024 23:17:00 +0900 Subject: [PATCH 3/5] capabilities: Only start/stop the mock producer when capability is in use --- core/capabilities/streams/mock_trigger.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index 387ee65bd01..995dfda0c15 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -42,11 +42,6 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist return nil, err } - producer := NewMockDataProducer(trigger, lggr) - if err := producer.Start(ctx); err != nil { - return nil, err - } - return trigger, nil } @@ -117,8 +112,9 @@ type MockTriggerService struct { lggr logger.Logger // - meta datastreams.SignersMetadata - signers []*ecdsa.PrivateKey + meta datastreams.SignersMetadata + signers []*ecdsa.PrivateKey + producer *mockDataProducer // } @@ -214,6 +210,13 @@ func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabiliti workflowID: wid, config: *config, } + + // Only start the producer once a workflow is registered + o.producer = NewMockDataProducer(o, o.lggr) + if err := o.producer.Start(ctx); err != nil { + return nil, err + } + return ch, nil } @@ -235,6 +238,12 @@ func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabili } close(subscriber.ch) delete(o.subscribers, triggerID) + if len(o.subscribers) == 0 { + if err := o.producer.Close(); err != nil { + return err + } + o.producer = nil + } return nil } From 8df8dc9ad7748e5af223b9f3a470ccf90d3291ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Thu, 8 Aug 2024 19:24:12 +0900 Subject: [PATCH 4/5] capabilities: Use configured feedIDs for mock trigger events --- core/capabilities/streams/mock_trigger.go | 29 ++++++++--------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index 995dfda0c15..a029bb6e346 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -212,7 +212,7 @@ func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabiliti } // Only start the producer once a workflow is registered - o.producer = NewMockDataProducer(o, o.lggr) + o.producer = NewMockDataProducer(o, config.FeedIDs, o.lggr) if err := o.producer.Start(ctx); err != nil { return nil, err } @@ -390,15 +390,17 @@ type mockDataProducer struct { trigger *MockTriggerService wg sync.WaitGroup closeCh chan struct{} + feedIDs []string lggr logger.Logger } var _ services.Service = &mockDataProducer{} -func NewMockDataProducer(trigger *MockTriggerService, lggr logger.Logger) *mockDataProducer { +func NewMockDataProducer(trigger *MockTriggerService, feedIDs []string, lggr logger.Logger) *mockDataProducer { return &mockDataProducer{ trigger: trigger, closeCh: make(chan struct{}), + feedIDs: feedIDs, lggr: lggr, } } @@ -432,25 +434,14 @@ func (m *mockDataProducer) loop() { // TODO: shouldn't we increment round rather than epoch? reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} - reports := []datastreams.FeedReport{ - { - FeedID: "0x1111111111111111111100000000000000000000000000000000000000000000", - FullReport: newReport(m.lggr, common.HexToHash("0x1111111111111111111100000000000000000000000000000000000000000000"), big.NewInt(prices[0]), timestamp), + reports := []datastreams.FeedReport{} + for _, feedID := range m.feedIDs { + reports = append(reports, datastreams.FeedReport{ + FeedID: feedID, + FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), ReportContext: rawReportContext(reportCtx), ObservationTimestamp: timestamp, - }, - { - FeedID: "0x2222222222222222222200000000000000000000000000000000000000000000", - FullReport: newReport(m.lggr, common.HexToHash("0x2222222222222222222200000000000000000000000000000000000000000000"), big.NewInt(prices[1]), timestamp), - ReportContext: rawReportContext(reportCtx), - ObservationTimestamp: timestamp, - }, - { - FeedID: "0x3333333333333333333300000000000000000000000000000000000000000000", - FullReport: newReport(m.lggr, common.HexToHash("0x3333333333333333333300000000000000000000000000000000000000000000"), big.NewInt(prices[2]), timestamp), - ReportContext: rawReportContext(reportCtx), - ObservationTimestamp: timestamp, - }, + }) } m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) From c94d1570f3dbd5b0a7bdba6ce3b62a527c1e200d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Mon, 12 Aug 2024 19:39:13 +0900 Subject: [PATCH 5/5] capabilities: Move signing into mock producer to minimize diff --- core/capabilities/streams/mock_trigger.go | 39 ++++++++++++----------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index a029bb6e346..d4d32cc94d2 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -212,7 +212,7 @@ func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabiliti } // Only start the producer once a workflow is registered - o.producer = NewMockDataProducer(o, config.FeedIDs, o.lggr) + o.producer = NewMockDataProducer(o, o.meta, o.signers, config.FeedIDs, o.lggr) if err := o.producer.Start(ctx); err != nil { return nil, err } @@ -298,21 +298,6 @@ func (o *MockTriggerService) process(timestamp int64) { // use 32-byte-padded timestamp as EventID (human-readable) eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) - // --- - // sign reports with mock signers - for i := range reportList { - report := reportList[i] - sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) - hash := crypto.Keccak256(sigData) - for n := 0; n < o.meta.MinRequiredSignatures; n++ { - sig, err := crypto.Sign(hash, o.signers[n]) - if err != nil { - panic(err) - } - reportList[i].Signatures = append(reportList[i].Signatures, sig) - } - } - // --- capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.meta) if err != nil { @@ -390,16 +375,20 @@ type mockDataProducer struct { trigger *MockTriggerService wg sync.WaitGroup closeCh chan struct{} + meta datastreams.SignersMetadata + signers []*ecdsa.PrivateKey feedIDs []string lggr logger.Logger } var _ services.Service = &mockDataProducer{} -func NewMockDataProducer(trigger *MockTriggerService, feedIDs []string, lggr logger.Logger) *mockDataProducer { +func NewMockDataProducer(trigger *MockTriggerService, meta datastreams.SignersMetadata, signers []*ecdsa.PrivateKey, feedIDs []string, lggr logger.Logger) *mockDataProducer { return &mockDataProducer{ trigger: trigger, closeCh: make(chan struct{}), + meta: meta, + signers: signers, feedIDs: feedIDs, lggr: lggr, } @@ -436,12 +425,24 @@ func (m *mockDataProducer) loop() { reports := []datastreams.FeedReport{} for _, feedID := range m.feedIDs { - reports = append(reports, datastreams.FeedReport{ + report := datastreams.FeedReport{ FeedID: feedID, FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), ReportContext: rawReportContext(reportCtx), ObservationTimestamp: timestamp, - }) + } + // sign report with mock signers + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) + if err != nil { + panic(err) + } + report.Signatures = append(report.Signatures, sig) + } + + reports = append(reports, report) } m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports)