Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ks 296 dispatcher context and threading #13551

Merged
merged 1 commit into from
Jun 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rare-carpets-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal capability dispatcher threading and context usage
83 changes: 67 additions & 16 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
sync "sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -24,7 +26,7 @@ type dispatcher struct {
peerID p2ptypes.PeerID
signer p2ptypes.Signer
registry core.CapabilitiesRegistry
receivers map[key]remotetypes.Receiver
receivers map[key]*receiver
mu sync.RWMutex
stopCh services.StopChan
wg sync.WaitGroup
Expand All @@ -45,7 +47,7 @@ func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, reg
peerWrapper: peerWrapper,
signer: signer,
registry: registry,
receivers: make(map[key]remotetypes.Receiver),
receivers: make(map[key]*receiver),
stopCh: make(services.StopChan),
lggr: lggr.Named("Dispatcher"),
}
Expand All @@ -58,29 +60,79 @@ func (d *dispatcher) Start(ctx context.Context) error {
return fmt.Errorf("peer is not initialized")
}
d.wg.Add(1)
go d.receive()
go func() {
defer d.wg.Done()
d.receive()
}()

d.lggr.Info("dispatcher started")
return nil
}

func (d *dispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
// Close stops the dispatcher: it closes stopCh to signal every goroutine tied to
// it, then blocks until all goroutines registered on wg have returned.
// It always returns nil.
// NOTE(review): nothing visible here guards against a second call, which would
// panic on closing an already-closed channel — confirm callers invoke Close once.
func (d *dispatcher) Close() error {
// Signal shutdown first; waiting before closing would block forever.
close(d.stopCh)
d.wg.Wait()
d.lggr.Info("dispatcher closed")
return nil
}

var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "capability_receive_channel_usage",
ettec marked this conversation as resolved.
Show resolved Hide resolved
Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.",
}, []string{"capabilityId", "donId"})

// receiverBufferSize is the capacity of the buffered channel created for each
// receiver in SetReceiver; the receive loop also divides by it to compute the
// channel-usage gauge.
const receiverBufferSize = 10000

// receiver is the dispatcher's bookkeeping entry for one registered
// (capability, DON) pair.
type receiver struct {
// cancel cancels the context of the goroutine that SetReceiver starts for this entry.
cancel context.CancelFunc
// ch is the buffered queue onto which the dispatcher's receive loop pushes incoming message bodies.
ch chan *remotetypes.MessageBody
}

// SetReceiver registers rec as the handler for messages addressed to the given
// capability and DON. It creates a buffered channel for the pair and starts a
// goroutine that hands each queued message to rec.Receive, so a slow receiver
// does not stall the dispatcher's receive loop.
//
// It returns an error if a receiver is already registered for the same
// capability and DON; otherwise it returns nil.
func (d *dispatcher) SetReceiver(capabilityId string, donId string, rec remotetypes.Receiver) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	k := key{capabilityId, donId}
	_, ok := d.receivers[k]
	if ok {
		return fmt.Errorf("receiver already exists for capability %s and don %s", capabilityId, donId)
	}

	receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize)

	// The context is derived from stopCh, and its cancel func is stored on the
	// map entry so RemoveReceiver can stop this goroutine individually.
	ctx, cancelCtx := d.stopCh.NewCtx()
	d.wg.Add(1)
	go func() {
		defer cancelCtx()
		defer d.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-receiverCh:
				rec.Receive(ctx, msg)
			}
		}
	}()

	d.receivers[k] = &receiver{
		cancel: cancelCtx,
		ch:     receiverCh,
	}

	d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
	return nil
}

// RemoveReceiver unregisters the receiver for the given capability and DON.
// If an entry exists, its context is cancelled so the delivery goroutine started
// by SetReceiver exits, and the entry is removed from the map. It is a no-op when
// no receiver is registered for the pair.
func (d *dispatcher) RemoveReceiver(capabilityId string, donId string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	receiverKey := key{capabilityId, donId}
	// Look the entry up before deleting it: the cancel func must be invoked,
	// otherwise the delivery goroutine keeps running until Close.
	if rec, ok := d.receivers[receiverKey]; ok {
		rec.cancel()
		delete(d.receivers, receiverKey)
		d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
	}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
Expand All @@ -105,7 +157,6 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo
}

func (d *dispatcher) receive() {
defer d.wg.Done()
recvCh := d.peer.Receive()
for {
select {
Expand All @@ -128,7 +179,14 @@ func (d *dispatcher) receive() {
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}
receiver.Receive(body)

receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize
capReceiveChannelUsage.WithLabelValues(k.capId, k.donId).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId)
}
}
}
}
Expand All @@ -150,13 +208,6 @@ func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetyp
}
}

func (d *dispatcher) Close() error {
close(d.stopCh)
d.wg.Wait()
d.lggr.Info("dispatcher closed")
return nil
}

// Ready reports the dispatcher's readiness; it always returns nil.
func (d *dispatcher) Ready() error {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remote_test

import (
"context"
"errors"
"testing"

Expand All @@ -26,7 +27,7 @@ func newReceiver() *testReceiver {
}
}

func (r *testReceiver) Receive(msg *remotetypes.MessageBody) {
func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody) {
r.ch <- msg
}

Expand Down
9 changes: 2 additions & 7 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

// TODO confirm reasons for below workaround and see if can be resolved
// The context passed in by the workflow engine is cancelled prior to the results being read from the response channel
// The wrapping of the context with 'WithoutCancel' is a workaround for that behaviour.
requestCtx := context.WithoutCancel(ctx)
req, err := request.NewClientRequest(requestCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
req, err := request.NewClientRequest(ctx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
Expand All @@ -149,10 +145,9 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return req.ResponseChan(), nil
}

func (c *client) Receive(msg *types.MessageBody) {
func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
c.mutex.Lock()
defer c.mutex.Unlock()
ctx, _ := c.stopCh.NewCtx()

messageID := GetMessageID(msg)

Expand Down
6 changes: 3 additions & 3 deletions core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
ID: "workflow-don",
}

broker := newTestAsyncMessageBroker(100)
broker := newTestAsyncMessageBroker(t, 100)

receivers := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
Expand Down Expand Up @@ -229,7 +229,7 @@ func newTestServer(peerID p2ptypes.PeerID, dispatcher remotetypes.Dispatcher, wo
}
}

func (t *clientTestServer) Receive(msg *remotetypes.MessageBody) {
func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBody) {
t.mux.Lock()
defer t.mux.Unlock()

Expand Down Expand Up @@ -297,7 +297,7 @@ func NewTestDispatcher() *TestDispatcher {
}

// SendToReceiver delivers msgBody directly to the registered receiver, using a
// background context.
func (t *TestDispatcher) SendToReceiver(msgBody *remotetypes.MessageBody) {
	t.receiver.Receive(context.Background(), msgBody)
}

func (t *TestDispatcher) SetReceiver(capabilityId string, donId string, receiver remotetypes.Receiver) error {
Expand Down
10 changes: 7 additions & 3 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
Expand Down Expand Up @@ -215,7 +216,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
F: workflowDonF,
}

broker := newTestAsyncMessageBroker(1000)
broker := newTestAsyncMessageBroker(t, 1000)

workflowDONs := map[string]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand Down Expand Up @@ -276,6 +277,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta

type testAsyncMessageBroker struct {
services.StateMachine
t *testing.T

nodes map[p2ptypes.PeerID]remotetypes.Receiver

sendCh chan *remotetypes.MessageBody
Expand All @@ -292,8 +295,9 @@ func (a *testAsyncMessageBroker) Name() string {
return "testAsyncMessageBroker"
}

func newTestAsyncMessageBroker(sendChBufferSize int) *testAsyncMessageBroker {
func newTestAsyncMessageBroker(t *testing.T, sendChBufferSize int) *testAsyncMessageBroker {
return &testAsyncMessageBroker{
t: t,
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
stopCh: make(services.StopChan),
sendCh: make(chan *remotetypes.MessageBody, sendChBufferSize),
Expand All @@ -318,7 +322,7 @@ func (a *testAsyncMessageBroker) Start(ctx context.Context) error {
panic("server not found for peer id")
}

receiver.Receive(msg)
receiver.Receive(tests.Context(a.t), msg)
}
}
}()
Expand Down
6 changes: 1 addition & 5 deletions core/capabilities/remote/target/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ func (r *server) expireRequests() {
}
}

// Receive handles incoming messages from remote nodes and dispatches them to the corresponding request.
func (r *server) Receive(msg *types.MessageBody) {
func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
r.receiveLock.Lock()
defer r.receiveLock.Unlock()

Expand Down Expand Up @@ -135,9 +134,6 @@ func (r *server) Receive(msg *types.MessageBody) {

req := r.requestIDToRequest[requestID]

// TODO context should be received from the dispatcher here - pending KS-296
ctx, cancel := r.stopCh.NewCtx()
defer cancel()
err := req.OnMessage(ctx, msg)
if err != nil {
r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err)
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/target/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func testRemoteTargetServer(ctx context.Context, t *testing.T,
}

var srvcs []services.Service
broker := newTestAsyncMessageBroker(1000)
broker := newTestAsyncMessageBroker(t, 1000)
err := broker.Start(context.Background())
require.NoError(t, err)
srvcs = append(srvcs, broker)
Expand Down Expand Up @@ -183,7 +183,7 @@ type serverTestClient struct {
callerDonID string
}

func (r *serverTestClient) Receive(msg *remotetypes.MessageBody) {
func (r *serverTestClient) Receive(_ context.Context, msg *remotetypes.MessageBody) {
r.receivedMessages <- msg
}

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (p *triggerPublisher) Start(ctx context.Context) error {
return nil
}

func (p *triggerPublisher) Receive(msg *types.MessageBody) {
func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
sender := ToPeerID(msg.Sender)
if msg.Method == types.MethodRegisterTrigger {
req, err := pb.UnmarshalCapabilityRequest(msg.Payload)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
CallerDonId: workflowDonInfo.ID,
Payload: marshaled,
}
publisher.Receive(regEvent)
publisher.Receive(ctx, regEvent)
forwarded := <-underlying.registrationsCh
require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)

Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commo
return nil
}

func (s *triggerSubscriber) Receive(msg *types.MessageBody) {
func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
sender := ToPeerID(msg.Sender)
if _, found := s.capDonMembers[sender]; !found {
s.lggr.Errorw("received message from unexpected node", "capabilityId", s.capInfo.ID, "sender", sender)
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
},
Payload: marshaled,
}
subscriber.Receive(triggerEvent)
subscriber.Receive(ctx, triggerEvent)
response := <-triggerEventCallbackCh
require.Equal(t, response.Value, triggerEventValue)

Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/types/mocks/receiver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package types

import (
"context"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand All @@ -25,7 +27,7 @@ type Dispatcher interface {

// Receiver is implemented by components that handle message bodies handed to
// them together with a context.
//
//go:generate mockery --quiet --name Receiver --output ./mocks/ --case=underscore
type Receiver interface {
	// Receive handles a single message; ctx is supplied by the caller.
	Receive(ctx context.Context, msg *MessageBody)
}

type Aggregator interface {
Expand Down
Loading