From 74866be06cc4fa6c51527f194f95785b950e352d Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 31 Jan 2024 11:01:55 -0700 Subject: [PATCH] NATS poc --- .../relay/evm/mercury/nats_transmitter.go | 33 +++++++++++++++++++ .../services/relay/evm/mercury/transmitter.go | 12 ++++++- go.mod | 5 +++ go.sum | 11 +++++++ 4 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 core/services/relay/evm/mercury/nats_transmitter.go diff --git a/core/services/relay/evm/mercury/nats_transmitter.go b/core/services/relay/evm/mercury/nats_transmitter.go new file mode 100644 index 00000000000..fd34ad20452 --- /dev/null +++ b/core/services/relay/evm/mercury/nats_transmitter.go @@ -0,0 +1,33 @@ +package mercury + +import ( + "context" + + "github.com/nats-io/nats.go" + + mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" +) + +type natsTransmitter struct { + feedID mercuryutils.FeedID + url string + + conn *nats.Conn +} + +func newNATSTransmitter(feedID mercuryutils.FeedID, url string) *natsTransmitter { + return &natsTransmitter{feedID, url, nil} +} + +func (nt *natsTransmitter) Start(context.Context) error { + // TODO: advanced connect/dialler that uses context + // user: system + // password: nOS0OJbtBh4RUA1P5KJ5FQYLU6bl1Vso + nc, err := nats.Connect(nt.url, nats.UserInfo("system", "nOS0OJbtBh4RUA1P5KJ5FQYLU6bl1Vso")) + nt.conn = nc + return err +} +func (nt *natsTransmitter) Close() error { + nt.conn.Close() + return nil +} diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 40a51b9d92d..69d50e18e83 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -127,6 +127,8 @@ type mercuryTransmitter struct { transmitQueueDeleteErrorCount prometheus.Counter transmitQueueInsertErrorCount prometheus.Counter transmitQueuePushErrorCount prometheus.Counter + + nt *natsTransmitter } var PayloadTypes = getPayloadTypes() @@ -171,6 +173,7 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex), transmitQueueInsertErrorCount.WithLabelValues(feedIDHex), transmitQueuePushErrorCount.WithLabelValues(feedIDHex), + newNATSTransmitter(feedID, "nats://0.0.0.0:53474"), } } @@ -196,7 +199,7 @@ func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) { go mt.runDeleteQueueLoop() mt.wg.Add(1) go mt.runQueueLoop() - return nil + return mt.nt.Start(ctx) }) } @@ -210,6 +213,7 @@ func (mt *mercuryTransmitter) Close() error { } close(mt.stopCh) mt.wg.Wait() + mt.nt.Close() return mt.rpcClient.Close() }) } @@ -353,6 +357,12 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R return pkgerrors.Wrap(err, "abi.Pack failed") } + // HACK: insert NATS transmission + p := []byte(fmt.Sprintf("report: 0x%x", payload)) + if err := mt.nt.conn.Publish(mt.feedID.Hex(), p); err != nil { + panic(err) + } + req := &pb.TransmitRequest{ Payload: payload, } diff --git a/go.mod b/go.mod index cbcad20919f..448b43d3735 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/smartcontractkit/chainlink/v2 go 1.21.3 +toolchain go1.21.5 + require ( github.com/Depado/ginprom v1.8.0 github.com/Masterminds/semver/v3 v3.2.1 @@ -48,6 +50,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/mapstructure v1.5.0 github.com/mr-tron/base58 v1.2.0 + github.com/nats-io/nats.go v1.12.1 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/gomega v1.30.0 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -258,6 +261,8 @@ require ( github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.1.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect diff --git a/go.sum b/go.sum index c8f6620af85..fb435795e0b 100644 --- a/go.sum +++ b/go.sum @@ -989,9 +989,19 @@ github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjW github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.5.0 h1:wsnVaaXH9VRSg+A2MVg5Q727/CqxnmPLGFQ3YZYKTQg= +github.com/nats-io/nats-server/v2 v2.5.0/go.mod h1:Kj86UtrXAL6LwYRA6H4RqzkHhK0Vcv2ZnKD5WbQ1t3g= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nats.go v1.12.1 h1:+0ndxwUPz3CmQ2vjbXdkC1fo3FdiOQDim4gl3Mge8Qo= +github.com/nats-io/nats.go v1.12.1/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= @@ -1431,6 +1441,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=