Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-421] Improve logging from remote capabilities #14058

Merged
merged 2 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (d *dispatcher) receive() {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", k.capId, "donId", k.donId)
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}
Expand Down
11 changes: 8 additions & 3 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -151,7 +152,11 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()

messageID := GetMessageID(msg)
messageID, err := GetMessageID(msg)
if err != nil {
c.lggr.Errorw("invalid message ID", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId)))
return
}

c.lggr.Debugw("Remote client target receiving message", "messageID", messageID)

Expand All @@ -167,8 +172,8 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" {
return "", errors.New("workflow ID and workflow execution ID must be set in request metadata")
if !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) || !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID) {
return "", errors.New("workflow ID and workflow execution ID in request metadata are invalid")
}

return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil
Expand Down
14 changes: 11 additions & 3 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
)

func Test_Client_DonTopologies(t *testing.T) {
ctx := testutils.Context(t)

Expand Down Expand Up @@ -192,8 +197,8 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Config: transmissionSchedule,
Inputs: executeInputs,
Expand Down Expand Up @@ -234,7 +239,10 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
defer t.mux.Unlock()

sender := toPeerID(msg.Sender)
messageID := target.GetMessageID(msg)
messageID, err := target.GetMessageID(msg)
if err != nil {
panic(err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: t.Fatal instead?

}

if t.messageIDToSenders[messageID] == nil {
t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool)
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
responseCh, err := caller.Execute(ctx,
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Config: transmissionSchedule,
Inputs: executeInputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
}
}
} else {
c.lggr.Warnw("received error response", "error", msg.ErrorMsg)
c.lggr.Warnw("received error response", "error", remote.SanitizeLogString(msg.ErrorMsg))
c.errorCount[msg.ErrorMsg]++
if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
)

func Test_ClientRequest_MessageValidation(t *testing.T) {
lggr := logger.TestLogger(t)

Expand Down Expand Up @@ -68,8 +73,8 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

capabilityRequest := commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Inputs: executeInputs,
Config: transmissionSchedule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro
return fmt.Errorf("failed to marshal capability response: %w", err)
}

e.lggr.Debugw("received execution results", "metadata", capabilityRequest.Metadata, "error", capResponse.Err)
e.lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", capResponse.Err)
e.setResult(responsePayload)
return nil
}
Expand Down
25 changes: 18 additions & 7 deletions core/capabilities/remote/target/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
Expand Down Expand Up @@ -129,9 +130,14 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
r.receiveLock.Lock()
defer r.receiveLock.Unlock()

r.lggr.Debugw("received request for msg", "msgId", msg.MessageId)
if msg.Method != types.MethodExecute {
r.lggr.Errorw("received request for unsupported method type", "method", msg.Method)
r.lggr.Errorw("received request for unsupported method type", "method", remote.SanitizeLogString(msg.Method))
return
}

messageId, err := GetMessageID(msg)
if err != nil {
r.lggr.Errorw("invalid message id", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId)))
return
}

Expand All @@ -143,9 +149,10 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {

// A request is uniquely identified by the message id and the hash of the payload to prevent a malicious
// actor from sending a different payload with the same message id
messageId := GetMessageID(msg)
requestID := messageId + hex.EncodeToString(msgHash[:])

r.lggr.Debugw("received request", "msgId", msg.MessageId, "requestID", requestID)

if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok {
requestIDs[requestID] = requestIDs[requestID] + 1
} else {
Expand All @@ -156,7 +163,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
if len(requestIDs) > 1 {
// This is a potential attack vector as well as a situation that will occur if the client is sending non-deterministic payloads
// so a warning is logged
r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "requestIDToCount", requestIDs)
r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "lenRequestIDs", len(requestIDs))
}

if _, ok := r.requestIDToRequest[requestID]; !ok {
Expand All @@ -177,7 +184,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {

err = reqAndMsgID.request.OnMessage(ctx, msg)
if err != nil {
r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err)
r.lggr.Errorw("request failed to OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err)
}
}

Expand All @@ -201,8 +208,12 @@ func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) {
return hash, nil
}

func GetMessageID(msg *types.MessageBody) string {
return string(msg.MessageId)
func GetMessageID(msg *types.MessageBody) (string, error) {
idStr := string(msg.MessageId)
if !remote.IsValidID(idStr) {
return "", fmt.Errorf("invalid message id")
}
return idStr, nil
}

func (r *server) Ready() error {
Expand Down
16 changes: 8 additions & 8 deletions core/capabilities/remote/target/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func Test_Server_ExcludesNonDeterministicInputAttributes(t *testing.T) {
_, err = caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
Inputs: inputs,
})
Expand All @@ -67,8 +67,8 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand All @@ -94,8 +94,8 @@ func Test_Server_InsufficientCallers(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand All @@ -121,8 +121,8 @@ func Test_Server_CapabilityError(t *testing.T) {
_, err := caller.Execute(context.Background(),
commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: "workflowID",
WorkflowExecutionID: "workflowExecutionID",
WorkflowID: workflowID1,
WorkflowExecutionID: workflowExecutionID1,
},
})
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender)
return
}
if !IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) {
p.lggr.Errorw("received trigger request with invalid workflow ID", "capabilityId", p.capInfo.ID, "workflowId", SanitizeLogString(req.Metadata.WorkflowID))
return
}
p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender)
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
Expand Down Expand Up @@ -145,7 +149,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
}
} else {
p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender)
p.lggr.Errorw("received trigger request with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
registration, found := s.registeredWorkflows[workflowId]
s.mu.RUnlock()
if !found {
s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", workflowId, "sender", sender)
s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowId), "sender", sender)
continue
}
key := triggerEventKey{
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
}
}
} else {
s.lggr.Errorw("received trigger event with unknown method", "method", msg.Method, "sender", sender)
s.lggr.Errorw("received trigger event with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender)
}
}

Expand Down
5 changes: 2 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
Expand All @@ -22,7 +21,7 @@ import (
const (
peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC"
peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8"
workflowID1 = "workflowID1"
workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
)

var (
Expand Down Expand Up @@ -63,7 +62,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
})

// register trigger
config := &capabilities.RemoteTriggerConfig{
config := &commoncap.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
Expand Down
43 changes: 43 additions & 0 deletions core/capabilities/remote/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"unicode"

"google.golang.org/protobuf/proto"

Expand All @@ -16,6 +17,12 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

const (
maxLoggedStringLen = 256
validWorkflowIDLen = 64
maxIDLen = 128
)

func ValidateMessage(msg p2ptypes.Message, expectedReceiver p2ptypes.PeerID) (*remotetypes.MessageBody, error) {
var topLevelMessage remotetypes.Message
err := proto.Unmarshal(msg.Payload, &topLevelMessage)
Expand Down Expand Up @@ -93,3 +100,39 @@ func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte,
}
return found, nil
}

func SanitizeLogString(s string) string {
tooLongSuffix := ""
if len(s) > maxLoggedStringLen {
s = s[:maxLoggedStringLen]
tooLongSuffix = " [TRUNCATED]"
}
for i := 0; i < len(s); i++ {
if !unicode.IsPrint(rune(s[i])) {
return "[UNPRINTABLE] " + hex.EncodeToString([]byte(s)) + tooLongSuffix
}
}
return s + tooLongSuffix
}

// Workflow IDs and Execution IDs are 32-byte hex-encoded strings
func IsValidWorkflowOrExecutionID(id string) bool {
if len(id) != validWorkflowIDLen {
return false
}
_, err := hex.DecodeString(id)
return err == nil
}

// Trigger event IDs and message IDs can only contain printable characters and must be non-empty
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, is there a reason why we can't have the same ID validation as above? Namely, could Trigger event IDs and message IDs also be 32-byte hex-encoded strings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently trigger event IDs are human readable, e.g. "streams_123456890" and I think that makes it debug-friendly. As for Message IDs we could probably impose some extra restriction ...

func IsValidID(id string) bool {
if len(id) == 0 || len(id) > maxIDLen {
return false
}
for i := 0; i < len(id); i++ {
if !unicode.IsPrint(rune(id[i])) {
return false
}
}
return true
}
23 changes: 23 additions & 0 deletions core/capabilities/remote/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,26 @@ func TestDefaultModeAggregator_Aggregate(t *testing.T) {
require.NoError(t, err)
require.Equal(t, res, capResponse1)
}

func TestSanitizeLogString(t *testing.T) {
require.Equal(t, "hello", remote.SanitizeLogString("hello"))
require.Equal(t, "[UNPRINTABLE] 0a", remote.SanitizeLogString("\n"))

longString := ""
for i := 0; i < 100; i++ {
longString += "aa-aa-aa-"
}
require.Equal(t, longString[:256]+" [TRUNCATED]", remote.SanitizeLogString(longString))
}

func TestIsValidWorkflowID(t *testing.T) {
require.False(t, remote.IsValidWorkflowOrExecutionID("too_short"))
require.False(t, remote.IsValidWorkflowOrExecutionID("nothex--95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"))
require.True(t, remote.IsValidWorkflowOrExecutionID("15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"))
}

func TestIsValidTriggerEventID(t *testing.T) {
require.False(t, remote.IsValidID(""))
require.False(t, remote.IsValidID("\n\n"))
require.True(t, remote.IsValidID("id_id_2"))
}
Loading