Skip to content

Commit

Permalink
[CAPPL-43] Wireup gateway to fetch (#14723)
Browse files Browse the repository at this point in the history
* chore: extract webapi connector_handler to outgoing_connector_handler

* feat: wireup gateway to fetch

* fix: import libocr correct version

* feat: HandleGatewayMessage for compute and handleComputeActionMessage implementation

* chore: move OutgoingConnectorHandler to webapi pkg

* chore: extract fetcher creation to outgoingConnectorHandler

* fix: allow single execute call to make multiple requests without mixing results

* chore: dedup on HandleGatewayMessage

* fix: test & lint

* chore: rename handler type to web-api-outgoing

* chore: rename IDGenerator

* chore: move CreateFetcher to compute

* chore: move GetGatewayConnector

* chore: make createFetcher private

* chore: reuse TargetResponsePayload instead of FetchResponse

* chore: rename TargetResponsePayload and TargetRequestPayload

* fix: align with new sdk.FetchResponse update

* test: fix tests with new ExecutionError & ErrorMessage fields

* test: fix engine_test

* fix: gomodtidy dependencies

* chore: switch back rename of WebAPICapabilitiesType

* test: add handler coverage

* fix: use handleWebAPIOutgoingMessage to handle both Target and Compute

* chore: clean unused handleWebAPITargetMessage

* chore: remove IDGenerator as a config parameter and pass it down on NewAction

* chore: validMethod implementation

* test: fix engine test

* test: fix test linting errors
  • Loading branch information
agparadiso authored Oct 22, 2024
1 parent f38b8aa commit 1636059
Show file tree
Hide file tree
Showing 15 changed files with 565 additions and 146 deletions.
75 changes: 67 additions & 8 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package compute
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -18,6 +20,9 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

const (
Expand Down Expand Up @@ -68,7 +73,9 @@ type Compute struct {
registry coretypes.CapabilitiesRegistry
modules *moduleCache

transformer ConfigTransformer
transformer ConfigTransformer
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string
}

func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down Expand Up @@ -104,7 +111,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -115,8 +122,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand Down Expand Up @@ -186,12 +195,62 @@ func (c *Compute) Close() error {
return nil
}

func NewAction(log logger.Logger, registry coretypes.CapabilitiesRegistry) *Compute {
func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
}

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")

fields := req.Headers.GetFields()
headersReq := make(map[string]string, len(fields))
for k, v := range fields {
headersReq[k] = v.String()
}

payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Body: req.Body,
TimeoutMs: req.TimeoutMs,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(context.Background(), messageID, payloadBytes)
if err != nil {
return nil, err
}

c.log.Debugw("received gateway response", "resp", resp)
var response wasmpb.FetchResponse
err = json.Unmarshal(resp.Body.Payload, &response)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}
return &response, nil
}
}

func NewAction(config webapi.ServiceConfig, log logger.Logger, registry coretypes.CapabilitiesRegistry, handler *webapi.OutgoingConnectorHandler, idGenerator func() string) *Compute {
compute := &Compute{
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(),
log: logger.Named(log, "CustomCompute"),
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(),
outgoingConnectorHandler: handler,
idGenerator: idGenerator,
}
return compute
}
181 changes: 150 additions & 31 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package compute

import (
"context"
"encoding/json"
"strings"
"testing"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/capabilities"
Expand All @@ -14,30 +18,72 @@ import (
cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
corecapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

func Test_Compute_Start_AddsToRegistry(t *testing.T) {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)

compute := NewAction(log, registry)
compute.modules.clock = clockwork.NewFakeClock()
const (
fetchBinaryLocation = "test/fetch/cmd/testmodule.wasm"
fetchBinaryCmd = "core/capabilities/compute/test/fetch/cmd"
validRequestUUID = "d2fe6db9-beb4-47c9-b2d6-d3065ace111e"
)

require.NoError(t, compute.Start(tests.Context(t)))
var defaultConfig = webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

cp, err := registry.Get(tests.Context(t), CapabilityIDCompute)
require.NoError(t, err)
assert.Equal(t, compute, cp)
type testHarness struct {
registry *corecapabilities.Registry
connector *gcmocks.GatewayConnector
log logger.Logger
config webapi.ServiceConfig
connectorHandler *webapi.OutgoingConnectorHandler
compute *Compute
}

func Test_Compute_Execute_MissingConfig(t *testing.T) {
func setup(t *testing.T, config webapi.ServiceConfig) testHarness {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)
connector := gcmocks.NewGatewayConnector(t)
idGeneratorFn := func() string { return validRequestUUID }
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute := NewAction(log, registry)
compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
compute.modules.clock = clockwork.NewFakeClock()

require.NoError(t, compute.Start(tests.Context(t)))
return testHarness{
registry: registry,
connector: connector,
log: log,
config: config,
connectorHandler: connectorHandler,
compute: compute,
}
}

func TestComputeStartAddsToRegistry(t *testing.T) {
th := setup(t, defaultConfig)

require.NoError(t, th.compute.Start(tests.Context(t)))

cp, err := th.registry.Get(tests.Context(t), CapabilityIDCompute)
require.NoError(t, err)
assert.Equal(t, th.compute, cp)
}

func TestComputeExecuteMissingConfig(t *testing.T) {
th := setup(t, defaultConfig)
require.NoError(t, th.compute.Start(tests.Context(t)))

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)

Expand All @@ -52,18 +98,14 @@ func Test_Compute_Execute_MissingConfig(t *testing.T) {
ReferenceID: "compute",
},
}
_, err = compute.Execute(tests.Context(t), req)
_, err = th.compute.Execute(tests.Context(t), req)
assert.ErrorContains(t, err, "invalid request: could not find \"config\" in map")
}

func Test_Compute_Execute_MissingBinary(t *testing.T) {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)
func TestComputeExecuteMissingBinary(t *testing.T) {
th := setup(t, defaultConfig)

compute := NewAction(log, registry)
compute.modules.clock = clockwork.NewFakeClock()

require.NoError(t, compute.Start(tests.Context(t)))
require.NoError(t, th.compute.Start(tests.Context(t)))

config, err := values.WrapMap(map[string]any{
"config": []byte(""),
Expand All @@ -76,18 +118,14 @@ func Test_Compute_Execute_MissingBinary(t *testing.T) {
ReferenceID: "compute",
},
}
_, err = compute.Execute(tests.Context(t), req)
_, err = th.compute.Execute(tests.Context(t), req)
assert.ErrorContains(t, err, "invalid request: could not find \"binary\" in map")
}

func Test_Compute_Execute(t *testing.T) {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)

compute := NewAction(log, registry)
compute.modules.clock = clockwork.NewFakeClock()
func TestComputeExecute(t *testing.T) {
th := setup(t, defaultConfig)

require.NoError(t, compute.Start(tests.Context(t)))
require.NoError(t, th.compute.Start(tests.Context(t)))

binary := wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)

Expand All @@ -110,7 +148,7 @@ func Test_Compute_Execute(t *testing.T) {
ReferenceID: "compute",
},
}
resp, err := compute.Execute(tests.Context(t), req)
resp, err := th.compute.Execute(tests.Context(t), req)
assert.NoError(t, err)
assert.True(t, resp.Value.Underlying["Value"].(*values.Bool).Underlying)

Expand All @@ -132,7 +170,88 @@ func Test_Compute_Execute(t *testing.T) {
ReferenceID: "compute",
},
}
resp, err = compute.Execute(tests.Context(t), req)
resp, err = th.compute.Execute(tests.Context(t), req)
assert.NoError(t, err)
assert.False(t, resp.Value.Underlying["Value"].(*values.Bool).Underlying)
}

func TestComputeFetch(t *testing.T) {
workflowID := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0"
workflowExecutionID := "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed"
th := setup(t, defaultConfig)

th.connector.EXPECT().DonID().Return("don-id")
th.connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

msgID := strings.Join([]string{
workflowID,
workflowExecutionID,
ghcapabilities.MethodComputeAction,
validRequestUUID,
}, "/")

gatewayResp := gatewayResponse(t, msgID)
th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
th.connectorHandler.HandleGatewayMessage(context.Background(), "gateway1", gatewayResp)
}).Once()

require.NoError(t, th.compute.Start(tests.Context(t)))

binary := wasmtest.CreateTestBinary(fetchBinaryCmd, fetchBinaryLocation, true, t)

config, err := values.WrapMap(map[string]any{
"config": []byte(""),
"binary": binary,
})
require.NoError(t, err)

req := cappkg.CapabilityRequest{
Config: config,
Metadata: cappkg.RequestMetadata{
WorkflowID: workflowID,
WorkflowExecutionID: workflowExecutionID,
ReferenceID: "compute",
},
}

headers, err := values.NewMap(map[string]any{})
require.NoError(t, err)
expected := cappkg.CapabilityResponse{
Value: &values.Map{
Underlying: map[string]values.Value{
"Value": &values.Map{
Underlying: map[string]values.Value{
"Body": values.NewBytes([]byte("response body")),
"Headers": headers,
"StatusCode": values.NewInt64(200),
"ErrorMessage": values.NewString(""),
"ExecutionError": values.NewBool(false),
},
},
},
},
}

actual, err := th.compute.Execute(tests.Context(t), req)
require.NoError(t, err)
assert.EqualValues(t, expected, actual)
}

func gatewayResponse(t *testing.T, msgID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
Method: ghcapabilities.MethodComputeAction,
Payload: responsePayload,
},
}
}
Loading

0 comments on commit 1636059

Please sign in to comment.