-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CAPPL-19] Gateway Connector service wrapper #14356
Changes from 10 commits
e951bdb
d4ce0db
7784141
ac908dd
cb3088d
b5bafec
79d6e4c
a2d5ab6
be130db
ef63be6
2b66be0
6519107
493f94c
cfaf0ca
3b1e3ee
6b68bff
f945bd2
3971608
1cfa38f
5ac44fa
8f26d15
c3f5616
558f577
f2f8d9c
e119169
d572c97
5e0ae26
d4e0068
e256b2f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,151 @@ | ||||||
package gatewayconnector | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"crypto/ecdsa" | ||||||
"errors" | ||||||
"math/big" | ||||||
"slices" | ||||||
|
||||||
"github.com/ethereum/go-ethereum/common" | ||||||
"github.com/jonboulle/clockwork" | ||||||
|
||||||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/config" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" | ||||||
gwCommon "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore" | ||||||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" | ||||||
) | ||||||
|
||||||
type serviceWrapper struct { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realized that the Wrapper also needs to have a getter for the underlying Connector object (we need to extract it once it's created). Please add it and also make ServiceWrapper public (uppercase) so we can pass it around in application.go. |
||||||
services.StateMachine | ||||||
|
||||||
config *config.GatewayConnector | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No pointer here - GatewayConnector in an interface. |
||||||
keystore keystore.Eth | ||||||
connector connector.GatewayConnector | ||||||
lggr logger.Logger | ||||||
} | ||||||
|
||||||
type connectorSigner struct { | ||||||
services.StateMachine | ||||||
|
||||||
connector connector.GatewayConnector | ||||||
signerKey *ecdsa.PrivateKey | ||||||
nodeAddress string | ||||||
lggr logger.Logger | ||||||
} | ||||||
|
||||||
var _ connector.Signer = &connectorSigner{} | ||||||
|
||||||
func NewConnectorSigner(config *config.GatewayConnector, signerKey *ecdsa.PrivateKey, lggr logger.Logger) (*connectorSigner, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No pointer needed for config. |
||||||
return &connectorSigner{ | ||||||
nodeAddress: (*config).NodeAddress(), | ||||||
signerKey: signerKey, | ||||||
lggr: lggr.Named("ConnectorSigner"), | ||||||
}, nil | ||||||
} | ||||||
|
||||||
func (h *connectorSigner) Sign(data ...[]byte) ([]byte, error) { | ||||||
return gwCommon.SignData(h.signerKey, data...) | ||||||
} | ||||||
|
||||||
func (h *connectorSigner) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this just meant to be a stub for the handler? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we leave the curly brackets off? Makes it more clear There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wait, no, this is not needed any more, you can remove this method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Signer is just a signer. Handler will be added to the Connector object separately. |
||||||
} | ||||||
func (h *connectorSigner) Start(ctx context.Context) error { | ||||||
return h.StartOnce("ConnectorSigner", func() error { | ||||||
return nil | ||||||
}) | ||||||
} | ||||||
func (h *connectorSigner) Close() error { | ||||||
return h.StopOnce("ConnectorSigner", func() (err error) { | ||||||
return nil | ||||||
}) | ||||||
} | ||||||
|
||||||
func (h *connectorSigner) SetConnector(connector connector.GatewayConnector) { | ||||||
h.connector = connector | ||||||
} | ||||||
|
||||||
func translateConfigs(f config.GatewayConnector) connector.ConnectorConfig { | ||||||
r := connector.ConnectorConfig{} | ||||||
r.NodeAddress = f.NodeAddress() | ||||||
r.DonId = f.DonID() | ||||||
|
||||||
if len(f.Gateways()) != 0 { | ||||||
r.Gateways = make([]connector.ConnectorGatewayConfig, len(f.Gateways())) | ||||||
for index, element := range f.Gateways() { | ||||||
r.Gateways[index] = connector.ConnectorGatewayConfig{Id: element.ID(), URL: element.URL()} | ||||||
} | ||||||
} | ||||||
|
||||||
r.WsClientConfig = network.WebSocketClientConfig{HandshakeTimeoutMillis: f.WSHandshakeTimeoutMillis()} | ||||||
r.AuthMinChallengeLen = f.AuthMinChallengeLen() | ||||||
r.AuthTimestampToleranceSec = f.AuthTimestampToleranceSec() | ||||||
return r | ||||||
} | ||||||
|
||||||
// NOTE: this wrapper is needed to make sure that our services are started after Keystore. | ||||||
func NewGatewayConnectorServiceWrapper(config *config.GatewayConnector, keystore keystore.Eth, lggr logger.Logger) *serviceWrapper { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GatewayConnector is an interface, so you don't need a pointer (*) |
||||||
return &serviceWrapper{ | ||||||
config: config, | ||||||
keystore: keystore, | ||||||
lggr: lggr, | ||||||
} | ||||||
} | ||||||
|
||||||
func (e *serviceWrapper) Start(ctx context.Context) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a unit test that creates the ServiceWrapper object and then calls Start() can Close() on it, please? Ideally you'd test valid NodeAddress and an invalid one (i.e. key doesn't exit). |
||||||
return e.StartOnce("GatewayConnectorServiceWrapper", func() error { | ||||||
conf := *e.config | ||||||
e.lggr.Infow("Starting GatewayConnectorServiceWrapper", "chainID", conf.ChainIDForNodeKey()) | ||||||
chainID, _ := new(big.Int).SetString(conf.ChainIDForNodeKey(), 0) | ||||||
enabledKeys, err := e.keystore.EnabledKeysForChain(ctx, chainID) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
if len(enabledKeys) == 0 { | ||||||
return errors.New("no available keys found") | ||||||
} | ||||||
configuredNodeAddress := common.HexToAddress(conf.NodeAddress()) | ||||||
idx := slices.IndexFunc(enabledKeys, func(key ethkey.KeyV2) bool { return key.Address == configuredNodeAddress }) | ||||||
if idx == -1 { | ||||||
return errors.New("key for configured node address not found") | ||||||
} | ||||||
signerKey := enabledKeys[idx].ToEcdsaPrivKey() | ||||||
if enabledKeys[idx].ID() != conf.NodeAddress() { | ||||||
return errors.New("node address mismatch") | ||||||
} | ||||||
|
||||||
signer, err := NewConnectorSigner(e.config, signerKey, e.lggr) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
|
||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
translated := translateConfigs(conf) | ||||||
e.connector, err = connector.NewGatewayConnector(&translated, signer, clockwork.NewRealClock(), e.lggr) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of using a real clock here, we may want to use a parameterized clock, such as keeping one on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bolekk your thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. makes sense to me |
||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
return e.connector.Start(ctx) | ||||||
}) | ||||||
} | ||||||
|
||||||
func (e *serviceWrapper) Close() error { | ||||||
return e.StopOnce("GatewayConnectorServiceWrapper", func() (err error) { | ||||||
return e.connector.Close() | ||||||
}) | ||||||
} | ||||||
|
||||||
func (e *serviceWrapper) Ready() error { | ||||||
return nil | ||||||
} | ||||||
|
||||||
func (e *serviceWrapper) HealthReport() map[string]error { | ||||||
return nil | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be thorough, how about filling this in from
Suggested change
|
||||||
} | ||||||
|
||||||
func (e *serviceWrapper) Name() string { | ||||||
return "GatewayConnectorServiceWrapper" | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package gatewayconnector | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/smartcontractkit/chainlink/v2/core/config/toml" | ||
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
chainlink "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" | ||
ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"github.com/test-go/testify/mock" | ||
) | ||
|
||
func generateConfig(addr common.Address) (chainlink.GeneralConfig, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can put more shared logic in this helper. Instead of returning config, return the whole Wrapper object with mocks already set. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I'd wanted to do that but apparently misunderstood your earlier comment. |
||
return chainlink.GeneralConfigOpts{ | ||
Config: chainlink.Config{ | ||
Core: toml.Core{ | ||
Capabilities: toml.Capabilities{ | ||
GatewayConnector: toml.GatewayConnector{ | ||
ChainIDForNodeKey: ptr("1"), | ||
NodeAddress: ptr(addr.Hex()), | ||
DonID: ptr("5"), | ||
WSHandshakeTimeoutMillis: ptr[uint32](100), | ||
AuthMinChallengeLen: ptr[int](0), | ||
AuthTimestampToleranceSec: ptr[uint32](10), | ||
Gateways: []toml.ConnectorGateway{{ID: ptr("example_gateway"), URL: ptr("wss://localhost:8081/node")}}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}.New() | ||
} | ||
|
||
// Unit test that creates the ServiceWrapper object and then calls Start() can Close() on it. | ||
// Take inspiration from functions/plugin_test.go and functions/connector_handler_test.go on how to mock the dependencies. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can remove all these comments and instead name the tests:
|
||
// | ||
// Test valid NodeAddress and an invalid one (i.e. key doesn't exit). | ||
|
||
func TestGatewayConnectorServiceWrapper(t *testing.T) { | ||
t.Parallel() | ||
|
||
logger := logger.TestLogger(t) | ||
_, addr := testutils.NewPrivateKeyAndAddress(t) | ||
|
||
config, err := generateConfig(addr) | ||
ethKeystore := ksmocks.NewEth(t) | ||
ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{{Address: addr}}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EnabledKeysForChain takes two arguments, not one. |
||
|
||
gc := config.Capabilities().GatewayConnector() | ||
handler := NewGatewayConnectorServiceWrapper(&gc, ethKeystore, logger) | ||
require.NoError(t, err) | ||
|
||
t.Cleanup(func() { | ||
assert.NoError(t, handler.Close()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can't have that and also call Close() manually below (Close will error if called more than once) - but please confirm. |
||
}) | ||
|
||
t.Run("Start & Stop Success", func(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't really need this t.Run() section here, especially that it's only a single test case. |
||
ctx := testutils.Context(t) | ||
|
||
err := handler.Start(ctx) | ||
require.NoError(t, err) | ||
err = handler.Close() | ||
require.NoError(t, err) | ||
}) | ||
} | ||
|
||
func TestGatewayConnectorServiceWrapperConfigError(t *testing.T) { | ||
t.Parallel() | ||
|
||
logger := logger.TestLogger(t) | ||
_, addr := testutils.NewPrivateKeyAndAddress(t) | ||
|
||
config, err := generateConfig(addr) | ||
ethKeystore := ksmocks.NewEth(t) | ||
_, addr2 := testutils.NewPrivateKeyAndAddress(t) | ||
|
||
ethKeystore.On("EnabledKeysForChain", mock.Anything).Return([]ethkey.KeyV2{{Address: addr2}}) | ||
|
||
gc := config.Capabilities().GatewayConnector() | ||
handler := NewGatewayConnectorServiceWrapper(&gc, ethKeystore, logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be named wrapper, not handler. |
||
require.NoError(t, err) | ||
|
||
t.Cleanup(func() { | ||
assert.NoError(t, handler.Close()) | ||
}) | ||
|
||
t.Run("Start Error", func(t *testing.T) { | ||
ctx := testutils.Context(t) | ||
err := handler.Start(ctx) | ||
require.Error(t, err) | ||
}) | ||
} | ||
|
||
func ptr[T any](t T) *T { return &t } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: stick to go conventional package naming
https://go.dev/blog/package-names