Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Keystone] Minor bugfixes #13436

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-masks-divide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal #bugfix keystone bugfixes
5 changes: 5 additions & 0 deletions contracts/.changeset/witty-onions-relate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@chainlink/contracts': patch
---

#internal KeystoneFeedsConsumer bytes10 decoding bugfix
4 changes: 2 additions & 2 deletions contracts/src/v0.8/keystone/KeystoneFeedsConsumer.sol
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ contract KeystoneFeedsConsumer is IReceiver, ConfirmedOwner {
// workflow_owner // offset 74, size 20
// report_name // offset 94, size 2
assembly {
// shift right by 22 bytes to get the actual value
workflowName := shr(mul(22, 8), mload(add(metadata, 64)))
// no shifting needed for bytes10 type
workflowName := mload(add(metadata, 64))
Comment on lines +91 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I fixed this in the forwarder but didn't realize there's a matching snippet in the consumer

// shift right by 12 bytes to get the actual value
workflowOwner := shr(mul(12, 8), mload(add(metadata, 74)))
}
Expand Down
28 changes: 20 additions & 8 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type triggerEventKey struct {
}

type subRegState struct {
callback chan<- commoncap.CapabilityResponse
callback chan commoncap.CapabilityResponse
rawRequest []byte
}

Expand Down Expand Up @@ -103,14 +103,20 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
regState, ok := s.registeredWorkflows[request.Metadata.WorkflowID]
if !ok {
regState = &subRegState{
callback: make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize),
rawRequest: rawRequest,
}
s.registeredWorkflows[request.Metadata.WorkflowID] = regState
} else {
regState.rawRequest = rawRequest
s.lggr.Warnw("RegisterTrigger re-registering trigger", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return callback, nil
return regState.callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand All @@ -124,6 +130,9 @@ func (s *triggerSubscriber) registrationLoop() {
case <-ticker.C:
s.mu.RLock()
s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows))
if len(s.registeredWorkflows) == 0 {
s.lggr.Infow("no workflows to register")
}
for _, registration := range s.registeredWorkflows {
// NOTE: send to all by default, introduce different strategies later (KS-76)
for _, peerID := range s.capDonInfo.Members {
Expand All @@ -149,7 +158,10 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
state := s.registeredWorkflows[request.Metadata.WorkflowID]
if state != nil && state.callback != nil {
close(state.callback)
}
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
krehermann marked this conversation as resolved.
Show resolved Hide resolved
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 5 additions & 2 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
req := commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
})
}
triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req)
require.NoError(t, err)
<-awaitRegistrationMessageCh

Expand All @@ -99,5 +100,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
response := <-triggerEventCallbackCh
require.Equal(t, response.Value, triggerEventValue)

require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
require.NoError(t, subscriber.Close())
}
2 changes: 1 addition & 1 deletion core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (cap *WriteTarget) Execute(ctx context.Context, request capabilities.Capabi
return nil, err
}

if inputs.Report == nil {
if len(inputs.Report) == 0 {
// We received any empty report -- this means we should skip transmission.
cap.lggr.Debugw("Skipping empty report", "request", request)
return success(), nil
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/targets/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWriteTarget(t *testing.T) {
t.Run("succeeds with empty report", func(t *testing.T) {
emptyInputs, err2 := values.NewMap(map[string]any{
"signed_report": map[string]any{
"report": nil,
"report": []byte{},
},
"signatures": [][]byte{},
})
Expand Down
8 changes: 4 additions & 4 deletions core/services/relay/evm/cap_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,22 @@ func prependMetadataFields(meta consensustypes.Metadata, userPayload []byte) ([]
binary.BigEndian.PutUint32(cfgVersionBytes, meta.DONConfigVersion)
result = append(result, cfgVersionBytes...)

// 5. Workflow ID / spec hash (32 bytes)
// 6. Workflow ID / spec hash (32 bytes)
if result, err = decodeAndAppend(meta.WorkflowID, 32, result, "WorkflowID"); err != nil {
return nil, err
}

// 6. Workflow Name (10 bytes)
// 7. Workflow Name (10 bytes)
if result, err = decodeAndAppend(meta.WorkflowName, 10, result, "WorkflowName"); err != nil {
return nil, err
}

// 7. Workflow Owner (20 bytes)
// 8. Workflow Owner (20 bytes)
if result, err = decodeAndAppend(meta.WorkflowOwner, 20, result, "WorkflowOwner"); err != nil {
return nil, err
}

// 8. Report ID (2 bytes)
// 9. Report ID (2 bytes)
if result, err = decodeAndAppend(meta.ReportID, 2, result, "ReportID"); err != nil {
return nil, err
}
Expand Down
Loading