diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index c1ee5db2944..dab4f6c98bf 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -180,7 +180,7 @@ func (d *dispatcher) receive() { receiver, ok := d.receivers[k] d.mu.RUnlock() if !ok { - d.lggr.Debugw("received message for unregistered capability", "capabilityId", k.capId, "donId", k.donId) + d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId) d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND) continue } diff --git a/core/capabilities/remote/target/client.go b/core/capabilities/remote/target/client.go index 5b65bf63e44..4273169d23e 100644 --- a/core/capabilities/remote/target/client.go +++ b/core/capabilities/remote/target/client.go @@ -9,6 +9,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -151,7 +152,11 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) { c.mutex.Lock() defer c.mutex.Unlock() - messageID := GetMessageID(msg) + messageID, err := GetMessageID(msg) + if err != nil { + c.lggr.Errorw("invalid message ID", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId))) + return + } c.lggr.Debugw("Remote client target receiving message", "messageID", messageID) @@ -167,8 +172,8 @@ func (c *client) Receive(ctx context.Context, msg *types.MessageBody) { } func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) { - if req.Metadata.WorkflowID == "" || req.Metadata.WorkflowExecutionID == "" { - return "", errors.New("workflow ID and workflow execution ID must be set in request metadata") + if !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) || !remote.IsValidWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID) { + return "", errors.New("workflow ID and workflow execution ID in request metadata are invalid") } return req.Metadata.WorkflowID + req.Metadata.WorkflowExecutionID, nil diff --git a/core/capabilities/remote/target/client_test.go b/core/capabilities/remote/target/client_test.go index 6d26b51b8ae..2198636a7a2 100644 --- a/core/capabilities/remote/target/client_test.go +++ b/core/capabilities/remote/target/client_test.go @@ -21,6 +21,11 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +const ( + workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" +) + func Test_Client_DonTopologies(t *testing.T) { ctx := testutils.Context(t) @@ -192,8 +197,8 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo responseCh, err := caller.Execute(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, Config: transmissionSchedule, Inputs: executeInputs, @@ -234,7 +239,10 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo defer t.mux.Unlock() sender := toPeerID(msg.Sender) - messageID := target.GetMessageID(msg) + messageID, err := target.GetMessageID(msg) + if err != nil { + panic(err) + } if t.messageIDToSenders[messageID] == nil { t.messageIDToSenders[messageID] = make(map[p2ptypes.PeerID]bool) diff --git a/core/capabilities/remote/target/endtoend_test.go b/core/capabilities/remote/target/endtoend_test.go index cfab50f0fe7..31bdc83e266 100644 --- a/core/capabilities/remote/target/endtoend_test.go +++ b/core/capabilities/remote/target/endtoend_test.go @@ -261,8 +261,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta responseCh, err := caller.Execute(ctx, commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, Config: transmissionSchedule, Inputs: executeInputs, diff --git a/core/capabilities/remote/target/request/client_request.go b/core/capabilities/remote/target/request/client_request.go index 50a742c2188..0370fd229cf 100644 --- a/core/capabilities/remote/target/request/client_request.go +++ b/core/capabilities/remote/target/request/client_request.go @@ -170,7 +170,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err } } } else { - c.lggr.Warnw("received error response", "error", msg.ErrorMsg) + c.lggr.Warnw("received error response", "error", remote.SanitizeLogString(msg.ErrorMsg)) c.errorCount[msg.ErrorMsg]++ if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses { c.sendResponse(commoncap.CapabilityResponse{Err: errors.New(msg.ErrorMsg)}) diff --git a/core/capabilities/remote/target/request/client_request_test.go b/core/capabilities/remote/target/request/client_request_test.go index 07f43dbc71f..7edb2f5e534 100644 --- a/core/capabilities/remote/target/request/client_request_test.go +++ b/core/capabilities/remote/target/request/client_request_test.go @@ -20,6 +20,11 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +const ( + workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" +) + func Test_ClientRequest_MessageValidation(t *testing.T) { lggr := logger.TestLogger(t) @@ -68,8 +73,8 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { capabilityRequest := commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, Inputs: executeInputs, Config: transmissionSchedule, diff --git a/core/capabilities/remote/target/request/server_request.go b/core/capabilities/remote/target/request/server_request.go index b8ae05bc316..16e90a034bc 100644 --- a/core/capabilities/remote/target/request/server_request.go +++ b/core/capabilities/remote/target/request/server_request.go @@ -134,7 +134,7 @@ func (e *ServerRequest) executeRequest(ctx context.Context, payload []byte) erro return fmt.Errorf("failed to marshal capability response: %w", err) } - e.lggr.Debugw("received execution results", "metadata", capabilityRequest.Metadata, "error", capResponse.Err) + e.lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID, "error", capResponse.Err) e.setResult(responsePayload) return nil } diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go index 39023ffb3fa..56cad3739b6 100644 --- a/core/capabilities/remote/target/server.go +++ b/core/capabilities/remote/target/server.go @@ -11,6 +11,7 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target/request" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -129,9 +130,14 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { r.receiveLock.Lock() defer r.receiveLock.Unlock() - r.lggr.Debugw("received request for msg", "msgId", msg.MessageId) if msg.Method != types.MethodExecute { - r.lggr.Errorw("received request for unsupported method type", "method", msg.Method) + r.lggr.Errorw("received request for unsupported method type", "method", remote.SanitizeLogString(msg.Method)) + return + } + + messageId, err := GetMessageID(msg) + if err != nil { + r.lggr.Errorw("invalid message id", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId))) return } @@ -143,9 +149,10 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { // A request is uniquely identified by the message id and the hash of the payload to prevent a malicious // actor from sending a different payload with the same message id - messageId := GetMessageID(msg) requestID := messageId + hex.EncodeToString(msgHash[:]) + r.lggr.Debugw("received request", "msgId", msg.MessageId, "requestID", requestID) + if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok { requestIDs[requestID] = requestIDs[requestID] + 1 } else { @@ -156,7 +163,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { if len(requestIDs) > 1 { // This is a potential attack vector as well as a situation that will occur if the client is sending non-deterministic payloads // so a warning is logged - r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "requestIDToCount", requestIDs) + r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "lenRequestIDs", len(requestIDs)) } if _, ok := r.requestIDToRequest[requestID]; !ok { @@ -177,7 +184,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { err = reqAndMsgID.request.OnMessage(ctx, msg) if err != nil { - r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err) + r.lggr.Errorw("request failed to OnMessage new message", "messageID", reqAndMsgID.messageID, "err", err) } } @@ -201,8 +208,12 @@ func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) { return hash, nil } -func GetMessageID(msg *types.MessageBody) string { - return string(msg.MessageId) +func GetMessageID(msg *types.MessageBody) (string, error) { + idStr := string(msg.MessageId) + if !remote.IsValidID(idStr) { + return "", fmt.Errorf("invalid message id") + } + return idStr, nil } func (r *server) Ready() error { diff --git a/core/capabilities/remote/target/server_test.go b/core/capabilities/remote/target/server_test.go index 2460a2dd0f7..505a2dcce5d 100644 --- a/core/capabilities/remote/target/server_test.go +++ b/core/capabilities/remote/target/server_test.go @@ -39,8 +39,8 @@ func Test_Server_ExcludesNonDeterministicInputAttributes(t *testing.T) { _, err = caller.Execute(context.Background(), commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, Inputs: inputs, }) @@ -67,8 +67,8 @@ func Test_Server_RespondsAfterSufficientRequests(t *testing.T) { _, err := caller.Execute(context.Background(), commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, }) require.NoError(t, err) @@ -94,8 +94,8 @@ func Test_Server_InsufficientCallers(t *testing.T) { _, err := caller.Execute(context.Background(), commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, }) require.NoError(t, err) @@ -121,8 +121,8 @@ func Test_Server_CapabilityError(t *testing.T) { _, err := caller.Execute(context.Background(), commoncap.CapabilityRequest{ Metadata: commoncap.RequestMetadata{ - WorkflowID: "workflowID", - WorkflowExecutionID: "workflowExecutionID", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, }, }) require.NoError(t, err) diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 146b8789689..b4d749754d4 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -102,6 +102,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender) return } + if !IsValidWorkflowOrExecutionID(req.Metadata.WorkflowID) { + p.lggr.Errorw("received trigger request with invalid workflow ID", "capabilityId", p.capInfo.ID, "workflowId", SanitizeLogString(req.Metadata.WorkflowID)) + return + } p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender) key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID} nowMs := time.Now().UnixMilli() @@ -145,7 +149,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) } } else { - p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender) + p.lggr.Errorw("received trigger request with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender) } } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 2d038e45c08..d957614886a 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -189,7 +189,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { registration, found := s.registeredWorkflows[workflowId] s.mu.RUnlock() if !found { - s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", workflowId, "sender", sender) + s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowId), "sender", sender) continue } key := triggerEventKey{ @@ -217,7 +217,7 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { } } } else { - s.lggr.Errorw("received trigger event with unknown method", "method", msg.Method, "sender", sender) + s.lggr.Errorw("received trigger event with unknown method", "method", SanitizeLogString(msg.Method), "sender", sender) } } diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index 2e34b03ec5c..c834a271d56 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/values" @@ -22,7 +21,7 @@ import ( const ( peerID1 = "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC" peerID2 = "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8" - workflowID1 = "workflowID1" + workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" ) var ( @@ -63,7 +62,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { }) // register trigger - config := &capabilities.RemoteTriggerConfig{ + config := &commoncap.RemoteTriggerConfig{ RegistrationRefresh: 100 * time.Millisecond, RegistrationExpiry: 100 * time.Second, MinResponsesToAggregate: 1, diff --git a/core/capabilities/remote/utils.go b/core/capabilities/remote/utils.go index dba24b843cc..10e4e3082c9 100644 --- a/core/capabilities/remote/utils.go +++ b/core/capabilities/remote/utils.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "errors" "fmt" + "unicode" "google.golang.org/protobuf/proto" @@ -16,6 +17,12 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) +const ( + maxLoggedStringLen = 256 + validWorkflowIDLen = 64 + maxIDLen = 128 +) + func ValidateMessage(msg p2ptypes.Message, expectedReceiver p2ptypes.PeerID) (*remotetypes.MessageBody, error) { var topLevelMessage remotetypes.Message err := proto.Unmarshal(msg.Payload, &topLevelMessage) @@ -93,3 +100,39 @@ func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, } return found, nil } + +func SanitizeLogString(s string) string { + tooLongSuffix := "" + if len(s) > maxLoggedStringLen { + s = s[:maxLoggedStringLen] + tooLongSuffix = " [TRUNCATED]" + } + for i := 0; i < len(s); i++ { + if !unicode.IsPrint(rune(s[i])) { + return "[UNPRINTABLE] " + hex.EncodeToString([]byte(s)) + tooLongSuffix + } + } + return s + tooLongSuffix +} + +// Workflow IDs and Execution IDs are 32-byte hex-encoded strings +func IsValidWorkflowOrExecutionID(id string) bool { + if len(id) != validWorkflowIDLen { + return false + } + _, err := hex.DecodeString(id) + return err == nil +} + +// Trigger event IDs and message IDs can only contain printable characters and must be non-empty +func IsValidID(id string) bool { + if len(id) == 0 || len(id) > maxIDLen { + return false + } + for i := 0; i < len(id); i++ { + if !unicode.IsPrint(rune(id[i])) { + return false + } + } + return true +} diff --git a/core/capabilities/remote/utils_test.go b/core/capabilities/remote/utils_test.go index 8bebf71fb66..177ab5a7d14 100644 --- a/core/capabilities/remote/utils_test.go +++ b/core/capabilities/remote/utils_test.go @@ -118,3 +118,26 @@ func TestDefaultModeAggregator_Aggregate(t *testing.T) { require.NoError(t, err) require.Equal(t, res, capResponse1) } + +func TestSanitizeLogString(t *testing.T) { + require.Equal(t, "hello", remote.SanitizeLogString("hello")) + require.Equal(t, "[UNPRINTABLE] 0a", remote.SanitizeLogString("\n")) + + longString := "" + for i := 0; i < 100; i++ { + longString += "aa-aa-aa-" + } + require.Equal(t, longString[:256]+" [TRUNCATED]", remote.SanitizeLogString(longString)) +} + +func TestIsValidWorkflowID(t *testing.T) { + require.False(t, remote.IsValidWorkflowOrExecutionID("too_short")) + require.False(t, remote.IsValidWorkflowOrExecutionID("nothex--95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0")) + require.True(t, remote.IsValidWorkflowOrExecutionID("15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0")) +} + +func TestIsValidTriggerEventID(t *testing.T) { + require.False(t, remote.IsValidID("")) + require.False(t, remote.IsValidID("\n\n")) + require.True(t, remote.IsValidID("id_id_2")) +}