diff --git a/.changeset/chatty-masks-divide.md b/.changeset/chatty-masks-divide.md new file mode 100644 index 00000000000..09d50e12f0f --- /dev/null +++ b/.changeset/chatty-masks-divide.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal #bugfix keystone bugfixes diff --git a/contracts/.changeset/witty-onions-relate.md b/contracts/.changeset/witty-onions-relate.md new file mode 100644 index 00000000000..30d3da6df92 --- /dev/null +++ b/contracts/.changeset/witty-onions-relate.md @@ -0,0 +1,5 @@ +--- +'@chainlink/contracts': patch +--- + +#internal KeystoneFeedsConsumer bytes10 decoding bugfix diff --git a/contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol b/contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol index 1044c70d492..e886c6011dc 100644 --- a/contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol +++ b/contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol @@ -88,8 +88,8 @@ contract KeystoneFeedsConsumer is IReceiver, ConfirmedOwner { // workflow_owner // offset 74, size 20 // report_name // offset 94, size 2 assembly { - // shift right by 22 bytes to get the actual value - workflowName := shr(mul(22, 8), mload(add(metadata, 64))) + // no shifting needed for bytes10 type + workflowName := mload(add(metadata, 64)) // shift right by 12 bytes to get the actual value workflowOwner := shr(mul(12, 8), mload(add(metadata, 74))) } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index e9704bf1865..b8c3d37a596 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -44,7 +44,7 @@ type triggerEventKey struct { } type subRegState struct { - callback chan<- commoncap.CapabilityResponse + callback chan commoncap.CapabilityResponse rawRequest []byte } @@ -103,14 +103,20 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc s.mu.Lock() defer s.mu.Unlock() - callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize) - s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{ - callback: callback, - rawRequest: rawRequest, + s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) + regState, ok := s.registeredWorkflows[request.Metadata.WorkflowID] + if !ok { + regState = &subRegState{ + callback: make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize), + rawRequest: rawRequest, + } + s.registeredWorkflows[request.Metadata.WorkflowID] = regState + } else { + regState.rawRequest = rawRequest + s.lggr.Warnw("RegisterTrigger re-registering trigger", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) } - s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) - return callback, nil + return regState.callback, nil } func (s *triggerSubscriber) registrationLoop() { @@ -124,6 +130,9 @@ func (s *triggerSubscriber) registrationLoop() { case <-ticker.C: s.mu.RLock() s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows)) + if len(s.registeredWorkflows) == 0 { + s.lggr.Infow("no workflows to register") + } for _, registration := range s.registeredWorkflows { // NOTE: send to all by default, introduce different strategies later (KS-76) for _, peerID := range s.capDonInfo.Members { @@ -149,7 +158,10 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo s.mu.Lock() defer s.mu.Unlock() - close(s.registeredWorkflows[request.Metadata.WorkflowID].callback) + state := s.registeredWorkflows[request.Metadata.WorkflowID] + if state != nil && state.callback != nil { + close(state.callback) + } delete(s.registeredWorkflows, request.Metadata.WorkflowID) // Registrations will quickly expire on all remote nodes. // Alternatively, we could send UnregisterTrigger messages right away. diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 4d251d49dc8..1ad360418fb 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -68,11 +68,12 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr) require.NoError(t, subscriber.Start(ctx)) - triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{ + req := commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ WorkflowID: workflowID1, }, - }) + } + triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req) require.NoError(t, err) <-awaitRegistrationMessageCh @@ -99,5 +100,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { response := <-triggerEventCallbackCh require.Equal(t, response.Value, triggerEventValue) + require.NoError(t, subscriber.UnregisterTrigger(ctx, req)) + require.NoError(t, subscriber.UnregisterTrigger(ctx, req)) require.NoError(t, subscriber.Close()) } diff --git a/core/capabilities/targets/write_target.go b/core/capabilities/targets/write_target.go index 9eea8451fd9..97b88330ba3 100644 --- a/core/capabilities/targets/write_target.go +++ b/core/capabilities/targets/write_target.go @@ -91,7 +91,7 @@ func (cap *WriteTarget) Execute(ctx context.Context, request capabilities.Capabi return nil, err } - if inputs.Report == nil { + if len(inputs.Report) == 0 { // We received any empty report -- this means we should skip transmission. cap.lggr.Debugw("Skipping empty report", "request", request) return success(), nil diff --git a/core/capabilities/targets/write_target_test.go b/core/capabilities/targets/write_target_test.go index 6964a9617ea..5d77535bdcb 100644 --- a/core/capabilities/targets/write_target_test.go +++ b/core/capabilities/targets/write_target_test.go @@ -72,7 +72,7 @@ func TestWriteTarget(t *testing.T) { t.Run("succeeds with empty report", func(t *testing.T) { emptyInputs, err2 := values.NewMap(map[string]any{ "signed_report": map[string]any{ - "report": nil, + "report": []byte{}, }, "signatures": [][]byte{}, }) diff --git a/core/services/relay/evm/cap_encoder.go b/core/services/relay/evm/cap_encoder.go index 55cb34a90ac..ef78cb07b48 100644 --- a/core/services/relay/evm/cap_encoder.go +++ b/core/services/relay/evm/cap_encoder.go @@ -113,22 +113,22 @@ func prependMetadataFields(meta consensustypes.Metadata, userPayload []byte) ([] binary.BigEndian.PutUint32(cfgVersionBytes, meta.DONConfigVersion) result = append(result, cfgVersionBytes...) - // 5. Workflow ID / spec hash (32 bytes) + // 6. Workflow ID / spec hash (32 bytes) if result, err = decodeAndAppend(meta.WorkflowID, 32, result, "WorkflowID"); err != nil { return nil, err } - // 6. Workflow Name (10 bytes) + // 7. Workflow Name (10 bytes) if result, err = decodeAndAppend(meta.WorkflowName, 10, result, "WorkflowName"); err != nil { return nil, err } - // 7. Workflow Owner (20 bytes) + // 8. Workflow Owner (20 bytes) if result, err = decodeAndAppend(meta.WorkflowOwner, 20, result, "WorkflowOwner"); err != nil { return nil, err } - // 8. Report ID (2 bytes) + // 9. Report ID (2 bytes) if result, err = decodeAndAppend(meta.ReportID, 2, result, "ReportID"); err != nil { return nil, err }