From 6f22cde5cf87c7ad13d91ff99633a3944b24c76f Mon Sep 17 00:00:00 2001 From: Lazar <12626340+Lazar955@users.noreply.github.com> Date: Mon, 16 Sep 2024 10:19:00 +0200 Subject: [PATCH] feat(submitter): stateful submitter (#43) In case of a restart, we want to avoid wasting fees if we have already submitted transactions for a certain epoch. Basic idea: A crash occurs after sending both checkpoint transactions for epoch `n` to the BTC and recording them in the database. Upon restarting, we find that epoch `n` is still marked as sealed. Before resending the transactions we first check our database and confirm that the transactions for this epoch have already been sent. Since the transactions were previously sent, the next step is to verify their status on our Bitcoin node. If the Bitcoin node is aware of the transactions and they are present in the mempool, no further action is needed. However, if the node does not recognize the transactions, this indicates they were lost, and we must resend them to the network. [References](https://github.com/babylonlabs-io/vigilante/pull/28#issuecomment-2324805680) --- Makefile | 6 + btcclient/client_wallet.go | 4 + btcclient/interface.go | 1 + cmd/vigilante/cmd/submitter.go | 6 + config/submitter.go | 3 +- e2etest/monitor_e2e_test.go | 1 + e2etest/submitter_e2e_test.go | 3 + proto/.clang-format | 7 + proto/Dockerfile | 36 +++++ proto/checkpoint.pb.go | 163 ++++++++++++++++++++ proto/checkpoint.proto | 12 ++ proto/gen_protos_docker.sh | 24 +++ proto/protocgen.sh | 41 +++++ submitter/relayer/change_address_test.go | 3 +- submitter/relayer/relayer.go | 112 +++++++++++++- submitter/relayer/relayer_test.go | 116 ++++++++++++++ submitter/store/store.go | 183 +++++++++++++++++++++++ submitter/store/store_test.go | 40 +++++ submitter/submitter.go | 3 + testutil/mocks/btcclient.go | 15 ++ utils/serialize.go | 27 ++++ 21 files changed, 800 insertions(+), 6 deletions(-) create mode 100644 proto/.clang-format create mode 100644 proto/Dockerfile create mode 100644 proto/checkpoint.pb.go create mode 100644 proto/checkpoint.proto create mode 100755 proto/gen_protos_docker.sh create mode 100755 proto/protocgen.sh create mode 100644 submitter/relayer/relayer_test.go create mode 100644 submitter/store/store.go create mode 100644 submitter/store/store_test.go create mode 100644 utils/serialize.go diff --git a/Makefile b/Makefile index 66370d93..32eaca44 100644 --- a/Makefile +++ b/Makefile @@ -75,3 +75,9 @@ update-changelog: ./scripts/update_changelog.sh $(sinceTag) $(upcomingTag) .PHONY: build test test-e2e build-docker rm-docker mocks update-changelog + +proto-gen: + @$(call print, "Compiling protos.") + cd ./proto; ./gen_protos_docker.sh + +.PHONY: proto-gen \ No newline at end of file diff --git a/btcclient/client_wallet.go b/btcclient/client_wallet.go index cc3a50a0..d591bbfd 100644 --- a/btcclient/client_wallet.go +++ b/btcclient/client_wallet.go @@ -121,3 +121,7 @@ func (c *Client) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransact func (c *Client) SignRawTransactionWithWallet(tx *wire.MsgTx) (*wire.MsgTx, bool, error) { return c.Client.SignRawTransactionWithWallet(tx) } + +func (c *Client) GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return c.Client.GetRawTransaction(txHash) +} diff --git a/btcclient/interface.go b/btcclient/interface.go index 71b9bca5..f9d89aa1 100644 --- a/btcclient/interface.go +++ b/btcclient/interface.go @@ -38,4 +38,5 @@ type BTCWallet interface { GetHighUTXOAndSum() (*btcjson.ListUnspentResult, float64, error) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransactionOpts, isWitness *bool) (*btcjson.FundRawTransactionResult, error) SignRawTransactionWithWallet(tx *wire.MsgTx) (*wire.MsgTx, bool, error) + GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) } diff --git a/cmd/vigilante/cmd/submitter.go b/cmd/vigilante/cmd/submitter.go index bc97ae41..b90acd6f 100644 --- a/cmd/vigilante/cmd/submitter.go +++ b/cmd/vigilante/cmd/submitter.go @@ -62,6 +62,11 @@ func GetSubmitterCmd() *cobra.Command { // register submitter metrics submitterMetrics := metrics.NewSubmitterMetrics() + dbBackend, err := cfg.Submitter.DatabaseConfig.GetDbBackend() + if err != nil { + panic(err) + } + // create submitter vigilantSubmitter, err := submitter.New( &cfg.Submitter, @@ -73,6 +78,7 @@ func GetSubmitterCmd() *cobra.Command { cfg.Common.MaxRetrySleepTime, cfg.Common.MaxRetryTimes, submitterMetrics, + dbBackend, ) if err != nil { panic(fmt.Errorf("failed to create vigilante submitter: %w", err)) diff --git a/config/submitter.go b/config/submitter.go index b507f1bb..511deee3 100644 --- a/config/submitter.go +++ b/config/submitter.go @@ -25,7 +25,8 @@ type SubmitterConfig struct { PollingIntervalSeconds uint `mapstructure:"polling-interval-seconds"` // ResendIntervalSeconds defines the time (in seconds) which the submitter awaits // before resubmitting checkpoints to BTC - ResendIntervalSeconds uint `mapstructure:"resend-interval-seconds"` + ResendIntervalSeconds uint `mapstructure:"resend-interval-seconds"` + DatabaseConfig *DBConfig `mapstructure:"database-config"` } func (cfg *SubmitterConfig) Validate() error { diff --git a/e2etest/monitor_e2e_test.go b/e2etest/monitor_e2e_test.go index 6f1c0507..8debc3b1 100644 --- a/e2etest/monitor_e2e_test.go +++ b/e2etest/monitor_e2e_test.go @@ -61,6 +61,7 @@ func TestMonitorBootstrap(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() diff --git a/e2etest/submitter_e2e_test.go b/e2etest/submitter_e2e_test.go index 7fcb4f6a..18063299 100644 --- a/e2etest/submitter_e2e_test.go +++ b/e2etest/submitter_e2e_test.go @@ -4,6 +4,7 @@ package e2etest import ( + "github.com/babylonlabs-io/vigilante/testutil" "math/rand" "testing" "time" @@ -64,6 +65,7 @@ func TestSubmitterSubmission(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() @@ -138,6 +140,7 @@ func TestSubmitterSubmissionReplace(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() diff --git a/proto/.clang-format b/proto/.clang-format new file mode 100644 index 00000000..f1914278 --- /dev/null +++ b/proto/.clang-format @@ -0,0 +1,7 @@ +--- +Language: Proto +BasedOnStyle: Google +IndentWidth: 4 +AllowShortFunctionsOnASingleLine: None +SpaceBeforeParens: Always +CompactNamespaces: false diff --git a/proto/Dockerfile b/proto/Dockerfile new file mode 100644 index 00000000..69525c13 --- /dev/null +++ b/proto/Dockerfile @@ -0,0 +1,36 @@ +# If you change this value, please change it in the following files as well: +# /.github/workflows/main.yaml +# /Dockerfile +# /dev.Dockerfile +# /make/builder.Dockerfile +# /tools/Dockerfile +FROM golang:1.20.5-buster + +RUN apt-get update && apt-get install -y \ + git \ + protobuf-compiler='3.6.1*' \ + clang-format='1:7.0*' + +# We don't want any default values for these variables to make sure they're +# explicitly provided by parsing the go.mod file. Otherwise we might forget to +# update them here if we bump the versions. +ARG PROTOBUF_VERSION +ARG GRPC_GATEWAY_VERSION + +ENV PROTOC_GEN_GO_GRPC_VERSION="v1.1.0" +ENV GOCACHE=/tmp/build/.cache +ENV GOMODCACHE=/tmp/build/.modcache + +RUN cd /tmp \ + && mkdir -p /tmp/build/.cache \ + && mkdir -p /tmp/build/.modcache \ + && go install google.golang.org/protobuf/cmd/protoc-gen-go@${PROTOBUF_VERSION} \ + && go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@${PROTOC_GEN_GO_GRPC_VERSION} \ + && go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@${GRPC_GATEWAY_VERSION} \ + && go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@${GRPC_GATEWAY_VERSION} \ + && go install golang.org/x/tools/cmd/goimports@v0.1.7 \ + && chmod -R 777 /tmp/build/ + +WORKDIR /build + +CMD ["/bin/bash", "/build/proto/protocgen.sh"] diff --git a/proto/checkpoint.pb.go b/proto/checkpoint.pb.go new file mode 100644 index 00000000..3a3454c9 --- /dev/null +++ b/proto/checkpoint.pb.go @@ -0,0 +1,163 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v3.6.1 +// source: checkpoint.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// StoredCheckpoint holds two transactions and an epoch number +type StoredCheckpoint struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tx1 []byte `protobuf:"bytes,1,opt,name=tx1,proto3" json:"tx1,omitempty"` // wire.MsgTx serialized as bytes + Tx2 []byte `protobuf:"bytes,2,opt,name=tx2,proto3" json:"tx2,omitempty"` + Epoch uint64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` +} + +func (x *StoredCheckpoint) Reset() { + *x = StoredCheckpoint{} + if protoimpl.UnsafeEnabled { + mi := &file_checkpoint_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StoredCheckpoint) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StoredCheckpoint) ProtoMessage() {} + +func (x *StoredCheckpoint) ProtoReflect() protoreflect.Message { + mi := &file_checkpoint_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StoredCheckpoint.ProtoReflect.Descriptor instead. +func (*StoredCheckpoint) Descriptor() ([]byte, []int) { + return file_checkpoint_proto_rawDescGZIP(), []int{0} +} + +func (x *StoredCheckpoint) GetTx1() []byte { + if x != nil { + return x.Tx1 + } + return nil +} + +func (x *StoredCheckpoint) GetTx2() []byte { + if x != nil { + return x.Tx2 + } + return nil +} + +func (x *StoredCheckpoint) GetEpoch() uint64 { + if x != nil { + return x.Epoch + } + return 0 +} + +var File_checkpoint_proto protoreflect.FileDescriptor + +var file_checkpoint_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4c, 0x0a, 0x10, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x64, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x10, 0x0a, + 0x03, 0x74, 0x78, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x74, 0x78, 0x31, 0x12, + 0x10, 0x0a, 0x03, 0x74, 0x78, 0x32, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x74, 0x78, + 0x32, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x61, 0x62, 0x79, 0x6c, 0x6f, 0x6e, 0x6c, 0x61, 0x62, + 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x67, 0x69, 0x6c, 0x61, 0x6e, 0x74, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_checkpoint_proto_rawDescOnce sync.Once + file_checkpoint_proto_rawDescData = file_checkpoint_proto_rawDesc +) + +func file_checkpoint_proto_rawDescGZIP() []byte { + file_checkpoint_proto_rawDescOnce.Do(func() { + file_checkpoint_proto_rawDescData = protoimpl.X.CompressGZIP(file_checkpoint_proto_rawDescData) + }) + return file_checkpoint_proto_rawDescData +} + +var file_checkpoint_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_checkpoint_proto_goTypes = []interface{}{ + (*StoredCheckpoint)(nil), // 0: proto.StoredCheckpoint +} +var file_checkpoint_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_checkpoint_proto_init() } +func file_checkpoint_proto_init() { + if File_checkpoint_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_checkpoint_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StoredCheckpoint); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_checkpoint_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_checkpoint_proto_goTypes, + DependencyIndexes: file_checkpoint_proto_depIdxs, + MessageInfos: file_checkpoint_proto_msgTypes, + }.Build() + File_checkpoint_proto = out.File + file_checkpoint_proto_rawDesc = nil + file_checkpoint_proto_goTypes = nil + file_checkpoint_proto_depIdxs = nil +} diff --git a/proto/checkpoint.proto b/proto/checkpoint.proto new file mode 100644 index 00000000..97b82153 --- /dev/null +++ b/proto/checkpoint.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package proto; + +option go_package = "github.com/babylonlabs-io/vigilante/proto"; + +// StoredCheckpoint holds two transactions and an epoch number +message StoredCheckpoint { + bytes tx1 = 1; // wire.MsgTx serialized as bytes + bytes tx2 = 2; + uint64 epoch = 3; +} \ No newline at end of file diff --git a/proto/gen_protos_docker.sh b/proto/gen_protos_docker.sh new file mode 100755 index 00000000..ee184754 --- /dev/null +++ b/proto/gen_protos_docker.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +set -e + +# Directory of the script file, independent of where it's called from. +DIR="$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)" + +PROTOBUF_VERSION=$(go list -f '{{.Version}}' -m google.golang.org/protobuf) +GRPC_GATEWAY_VERSION=$(go list -f '{{.Version}}' -m github.com/grpc-ecosystem/grpc-gateway/v2) + +echo "Building protobuf compiler docker image..." +docker build -t vigilante-protobuf-builder \ + --build-arg PROTOBUF_VERSION="$PROTOBUF_VERSION" \ + --build-arg GRPC_GATEWAY_VERSION="$GRPC_GATEWAY_VERSION" \ + . + +echo "Compiling and formatting *.proto files..." +docker run \ + --rm \ + --user "$UID:$(id -g)" \ + -e UID=$UID \ + -e SUBSERVER_PREFIX \ + -v "$DIR/../:/build" \ + vigilante-protobuf-builder diff --git a/proto/protocgen.sh b/proto/protocgen.sh new file mode 100755 index 00000000..866ccc7f --- /dev/null +++ b/proto/protocgen.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +set -e + +# generate compiles the *.pb.go stubs from the *.proto files. +function generate() { + echo "Generating vigilatne protos" + + PROTOS="checkpoint.proto" + + # For each of the sub-servers, we then generate their protos, but a restricted + # set as they don't yet require REST proxies, or swagger docs. + for file in $PROTOS; do + DIRECTORY=$(dirname "${file}") + echo "Generating protos from ${file}, into ${DIRECTORY}" + + # Generate the protos. + protoc --go_out . --go_opt paths=source_relative \ + --go-grpc_out . --go-grpc_opt paths=source_relative \ + "${file}" --proto_path=$GOPATH/src/ --proto_path=. + done +} + +# format formats the *.proto files with the clang-format utility. +function format() { + echo "Formatting protos" + #| xargs -0 clang-format --style=file -i + find . -name "*.proto" -print0 | xargs -0 +} + +# Compile and format the proto package. +pushd proto +format +generate +popd + +if [[ "$COMPILE_MOBILE" == "1" ]]; then + pushd mobile + ./gen_bindings.sh $FALAFEL_VERSION + popd +fi diff --git a/submitter/relayer/change_address_test.go b/submitter/relayer/change_address_test.go index 6fc9ab4f..fb7e772e 100644 --- a/submitter/relayer/change_address_test.go +++ b/submitter/relayer/change_address_test.go @@ -1,6 +1,7 @@ package relayer_test import ( + "github.com/babylonlabs-io/vigilante/testutil" "testing" "github.com/btcsuite/btcd/btcjson" @@ -49,7 +50,7 @@ func TestGetChangeAddress(t *testing.T) { logger, err := config.NewRootLogger("auto", "debug") require.NoError(t, err) testRelayer := relayer.New(wallet, []byte("bbnt"), btctxformatter.CurrentVersion, submitterAddr, - submitterMetrics.RelayerMetrics, nil, &cfg, logger) + submitterMetrics.RelayerMetrics, nil, &cfg, logger, testutil.MakeTestBackend(t)) // 1. only SegWit Bech32 addresses segWitBech32Addrs := append(SegWitBech32p2wshAddrsStr, SegWitBech32p2wpkhAddrsStr...) diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 4d167935..4f3977b4 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/babylonlabs-io/vigilante/submitter/store" "github.com/btcsuite/btcd/btcjson" + "github.com/lightningnetwork/lnd/kvdb" "math" "strconv" "time" @@ -31,9 +33,14 @@ const ( dustThreshold btcutil.Amount = 546 ) +type GetLatestCheckpointFunc func() (*store.StoredCheckpoint, bool, error) +type GetRawTransactionFunc func(txHash *chainhash.Hash) (*btcutil.Tx, error) +type SendTransactionFunc func(tx *wire.MsgTx) (*chainhash.Hash, error) + type Relayer struct { chainfee.Estimator btcclient.BTCWallet + store *store.SubmitterStore lastSubmittedCheckpoint *types.CheckpointInfo tag btctxformatter.BabylonTag version btctxformatter.FormatVersion @@ -52,11 +59,18 @@ func New( est chainfee.Estimator, config *config.SubmitterConfig, parentLogger *zap.Logger, + db kvdb.Backend, ) *Relayer { + subStore, err := store.NewSubmitterStore(db) + if err != nil { + panic(fmt.Errorf("error setting up store: %w", err)) + } + metrics.ResendIntervalSecondsGauge.Set(float64(config.ResendIntervalSeconds)) return &Relayer{ Estimator: est, BTCWallet: wallet, + store: subStore, tag: tag, version: version, submitterAddress: submitterAddress, @@ -81,25 +95,56 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } + storeCkptFunc := func(tx1, tx2 *wire.MsgTx, epochNum uint64) error { + storedCkpt := store.NewStoredCheckpoint(tx1, tx2, epochNum) + return rl.store.PutCheckpoint(storedCkpt) + } + + if rl.shouldSendCompleteCkpt(ckptEpoch) || rl.shouldSendTx2(ckptEpoch) { + hasBeenProcessed, err := maybeResendFromStore( + ckptEpoch, + rl.store.LatestCheckpoint, + rl.GetRawTransaction, + rl.sendTxToBTC, + ) + if err != nil { + return err + } + + if hasBeenProcessed { + return nil + } + } + if rl.shouldSendCompleteCkpt(ckptEpoch) { rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch) - submittedCheckpoint, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) + submittedCkpt, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) if err != nil { return err } - rl.lastSubmittedCheckpoint = submittedCheckpoint + rl.lastSubmittedCheckpoint = submittedCkpt + + err = storeCkptFunc(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + if err != nil { + return err + } return nil } else if rl.shouldSendTx2(ckptEpoch) { rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId) - submittedCheckpoint, err := rl.retrySendTx2(ckpt.Ckpt) + submittedCkpt, err := rl.retrySendTx2(ckpt.Ckpt) if err != nil { return err } - rl.lastSubmittedCheckpoint = submittedCheckpoint + rl.lastSubmittedCheckpoint = submittedCkpt + + err = storeCkptFunc(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + if err != nil { + return err + } return nil } @@ -150,6 +195,15 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp // update the second tx of the last submitted checkpoint as it is replaced rl.lastSubmittedCheckpoint.Tx2 = resubmittedTx2 + + err = storeCkptFunc( + rl.lastSubmittedCheckpoint.Tx1.Tx, + rl.lastSubmittedCheckpoint.Tx2.Tx, + rl.lastSubmittedCheckpoint.Epoch, + ) + if err != nil { + return err + } } return nil @@ -542,3 +596,53 @@ func (rl *Relayer) sendTxToBTC(tx *wire.MsgTx) (*chainhash.Hash, error) { return ha, nil } + +// maybeResendFromStore - checks if we need to resubmit txns from a store +// in case "submitter" service was restarted, we want to ensure that we don't send txns again for a checkpoint +// that has already been processed. +// Returns true if the first transactions are in the mempool (no resubmission needed), +// and false if any transaction was re-sent from the store. +func maybeResendFromStore( + epoch uint64, + getLatestStoreCheckpoint GetLatestCheckpointFunc, + getRawTransaction GetRawTransactionFunc, + sendTransaction SendTransactionFunc, +) (bool, error) { + storedCkpt, exists, err := getLatestStoreCheckpoint() + if err != nil { + return false, err + } else if !exists { + return false, nil + } + + if storedCkpt.Epoch != epoch { + return false, nil + } + + maybeResendFunc := func(tx *wire.MsgTx) error { + txID := tx.TxHash() + _, err = getRawTransaction(&txID) // todo(lazar): check for specific not found err + if err != nil { + _, err := sendTransaction(tx) + if err != nil { + return err + } + + // we know about this tx, but we needed to resend it from already constructed tx from db + return nil + } + + // tx exists in mempool and is known to us + return nil + } + + if err := maybeResendFunc(storedCkpt.Tx1); err != nil { + return false, err + } + + if err := maybeResendFunc(storedCkpt.Tx2); err != nil { + return false, err + } + + return true, nil +} diff --git a/submitter/relayer/relayer_test.go b/submitter/relayer/relayer_test.go new file mode 100644 index 00000000..75cf0161 --- /dev/null +++ b/submitter/relayer/relayer_test.go @@ -0,0 +1,116 @@ +package relayer + +import ( + "errors" + "github.com/babylonlabs-io/vigilante/submitter/store" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_maybeResendFromStore(t *testing.T) { + t.Parallel() + tests := []struct { + name string + epoch uint64 + getLatestCheckpoint GetLatestCheckpointFunc + getRawTransaction GetRawTransactionFunc + sendTransaction SendTransactionFunc + expectedResult bool + expectedError error + }{ + { + name: "Checkpoint not found", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return nil, false, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: nil, + }, + { + name: "Error retrieving checkpoint", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return nil, false, errors.New("checkpoint error") + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: errors.New("checkpoint error"), + }, + { + name: "Epoch mismatch", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 456, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: nil, + }, + { + name: "Successful resends", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 123, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, errors.New("transaction not found") // Simulate transaction not found + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return &chainhash.Hash{}, nil // Simulate successful send + }, + expectedResult: true, + expectedError: nil, + }, + { + name: "Error resending transaction", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 123, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, errors.New("transaction not found") + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, errors.New("send error") + }, + expectedResult: false, + expectedError: errors.New("send error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result, err := maybeResendFromStore(tt.epoch, tt.getLatestCheckpoint, tt.getRawTransaction, tt.sendTransaction) + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tt.expectedResult, result) + }) + } +} diff --git a/submitter/store/store.go b/submitter/store/store.go new file mode 100644 index 00000000..9fe2702b --- /dev/null +++ b/submitter/store/store.go @@ -0,0 +1,183 @@ +package store + +import ( + "errors" + "github.com/babylonlabs-io/vigilante/proto" + "github.com/babylonlabs-io/vigilante/utils" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/walletdb" + "github.com/lightningnetwork/lnd/kvdb" + pm "google.golang.org/protobuf/proto" +) + +type SubmitterStore struct { + db kvdb.Backend +} + +var ( + // storing submitted txns + storedCheckpointBucketName = []byte("storedckpt") + lastSubmittedCkptKey = []byte("lastsubckpt") +) + +var ( + // ErrCorruptedDb For some reason, db on disk representation have changed + ErrCorruptedDb = errors.New("db is corrupted") + // ErrNotFound Value not found + ErrNotFound = errors.New("not found") +) + +type StoredCheckpoint struct { + Tx1 *wire.MsgTx + Tx2 *wire.MsgTx + Epoch uint64 +} + +func NewStoredCheckpoint(tx1, tx2 *wire.MsgTx, epoch uint64) *StoredCheckpoint { + return &StoredCheckpoint{ + Tx1: tx1, + Tx2: tx2, + Epoch: epoch, + } +} + +func NewSubmitterStore(backend kvdb.Backend) (*SubmitterStore, error) { + store := &SubmitterStore{db: backend} + if err := store.createBuckets(); err != nil { + return nil, err + } + + return store, nil +} + +func (s *SubmitterStore) createBuckets() error { + buckets := [][]byte{storedCheckpointBucketName} + for _, bucket := range buckets { + if err := s.db.Update(func(tx kvdb.RwTx) error { + _, err := tx.CreateTopLevelBucket(bucket) + if err != nil { + return err + } + + return nil + }, func() {}); err != nil { + return err + } + } + + return nil +} + +func (s *SubmitterStore) LatestCheckpoint() (*StoredCheckpoint, bool, error) { + data, f, err := s.get(lastSubmittedCkptKey, storedCheckpointBucketName) + if err != nil { + return nil, false, err + } else if !f { + return nil, false, nil + } + + protoCkpt := &proto.StoredCheckpoint{} + if err := pm.Unmarshal(data, protoCkpt); err != nil { + return nil, false, err + } + + var storedCkpt StoredCheckpoint + if err := storedCkpt.FromProto(protoCkpt); err != nil { + return nil, false, err + } + + return &storedCkpt, true, err +} + +func (s *SubmitterStore) PutCheckpoint(ckpt *StoredCheckpoint) error { + protoCkpt, err := ckpt.ToProto() + if err != nil { + return err + } + + data, err := pm.Marshal(protoCkpt) + if err != nil { + return err + } + + return s.put(lastSubmittedCkptKey, data, storedCheckpointBucketName) +} + +func (s *SubmitterStore) get(key, bucketName []byte) ([]byte, bool, error) { + var returnVal []byte + + err := s.db.View(func(tx walletdb.ReadTx) error { + b := tx.ReadBucket(bucketName) + if b == nil { + return ErrCorruptedDb + } + + byteVal := b.Get(key) + if byteVal == nil { + return ErrNotFound + } + + returnVal = byteVal + + return nil + }, func() {}) + + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, false, nil + } + return nil, false, err + } + + return returnVal, true, nil +} + +func (s *SubmitterStore) put(key []byte, val []byte, bucketName []byte) error { + return kvdb.Batch(s.db, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(bucketName) + if bucket == nil { + return ErrCorruptedDb + } + + return bucket.Put(key, val) + }) +} + +// ToProto converts StoredTx to its Protobuf equivalent +func (s *StoredCheckpoint) ToProto() (*proto.StoredCheckpoint, error) { + bufTx1, err := utils.SerializeMsgTx(s.Tx1) + if err != nil { + return nil, err + } + + bufTx2, err := utils.SerializeMsgTx(s.Tx2) + if err != nil { + return nil, err + } + + return &proto.StoredCheckpoint{ + Tx1: bufTx1, + Tx2: bufTx2, + Epoch: s.Epoch, + }, nil +} + +// FromProto converts a Protobuf StoredTx to the Go struct +func (s *StoredCheckpoint) FromProto(protoTx *proto.StoredCheckpoint) error { + var err error + tx1, err := utils.DeserializeMsgTx(protoTx.Tx1) + if err != nil { + return err + } + + tx2, err := utils.DeserializeMsgTx(protoTx.Tx2) + if err != nil { + return err + } + + s.Tx1 = tx1 + s.Tx2 = tx2 + s.Epoch = protoTx.Epoch + + return nil +} diff --git a/submitter/store/store_test.go b/submitter/store/store_test.go new file mode 100644 index 00000000..6af130a8 --- /dev/null +++ b/submitter/store/store_test.go @@ -0,0 +1,40 @@ +package store_test + +import ( + bbndatagen "github.com/babylonlabs-io/babylon/testutil/datagen" + "github.com/babylonlabs-io/vigilante/submitter/store" + "github.com/babylonlabs-io/vigilante/testutil" + "github.com/babylonlabs-io/vigilante/testutil/datagen" + "github.com/stretchr/testify/require" + "math/rand" + "testing" +) + +func FuzzStoringCkpt(f *testing.F) { + bbndatagen.AddRandomSeedsToFuzzer(f, 3) + + f.Fuzz(func(t *testing.T, seed int64) { + t.Parallel() + r := rand.New(rand.NewSource(seed)) + db := testutil.MakeTestBackend(t) + s, err := store.NewSubmitterStore(db) + require.NoError(t, err) + + _, exists, err := s.LatestCheckpoint() + require.NoError(t, err) + require.False(t, exists) + + epoch := uint64(r.Int63n(1000) + 1) + tx1 := datagen.GenRandomTx(r) + tx2 := datagen.GenRandomTx(r) + err = s.PutCheckpoint(store.NewStoredCheckpoint(tx1, tx2, epoch)) + require.NoError(t, err) + + storedCkpt, exists, err := s.LatestCheckpoint() + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, storedCkpt.Epoch, epoch) + require.Equal(t, storedCkpt.Tx1.TxHash().String(), tx1.TxHash().String()) + require.Equal(t, storedCkpt.Tx2.TxHash().String(), tx2.TxHash().String()) + }) +} diff --git a/submitter/submitter.go b/submitter/submitter.go index 82f467fb..c031e9aa 100644 --- a/submitter/submitter.go +++ b/submitter/submitter.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "fmt" "github.com/babylonlabs-io/vigilante/retrywrap" + "github.com/lightningnetwork/lnd/kvdb" "sync" "time" @@ -43,6 +44,7 @@ func New( submitterAddr sdk.AccAddress, retrySleepTime, maxRetrySleepTime time.Duration, maxRetryTimes uint, submitterMetrics *metrics.SubmitterMetrics, + db kvdb.Backend, ) (*Submitter, error) { logger := parentLogger.With(zap.String("module", "submitter")) var ( @@ -85,6 +87,7 @@ func New( est, cfg, logger, + db, ) return &Submitter{ diff --git a/testutil/mocks/btcclient.go b/testutil/mocks/btcclient.go index 76581b6c..240994fb 100644 --- a/testutil/mocks/btcclient.go +++ b/testutil/mocks/btcclient.go @@ -284,6 +284,21 @@ func (mr *MockBTCWalletMockRecorder) GetRawChangeAddress(account interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawChangeAddress", reflect.TypeOf((*MockBTCWallet)(nil).GetRawChangeAddress), account) } +// GetRawTransaction mocks base method. +func (m *MockBTCWallet) GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRawTransaction", txHash) + ret0, _ := ret[0].(*btcutil.Tx) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRawTransaction indicates an expected call of GetRawTransaction. +func (mr *MockBTCWalletMockRecorder) GetRawTransaction(txHash interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawTransaction", reflect.TypeOf((*MockBTCWallet)(nil).GetRawTransaction), txHash) +} + // GetWalletLockTime mocks base method. func (m *MockBTCWallet) GetWalletLockTime() int64 { m.ctrl.T.Helper() diff --git a/utils/serialize.go b/utils/serialize.go new file mode 100644 index 00000000..7a907243 --- /dev/null +++ b/utils/serialize.go @@ -0,0 +1,27 @@ +package utils + +import ( + "bytes" + "github.com/btcsuite/btcd/wire" +) + +// SerializeMsgTx serializes wire.MsgTx to bytes +func SerializeMsgTx(tx *wire.MsgTx) ([]byte, error) { + var txBuf bytes.Buffer + if err := tx.Serialize(&txBuf); err != nil { + return nil, err + } + + return txBuf.Bytes(), nil +} + +// DeserializeMsgTx deserializes bytes to wire.MsgTx +func DeserializeMsgTx(data []byte) (*wire.MsgTx, error) { + var tx wire.MsgTx + + if err := tx.Deserialize(bytes.NewReader(data)); err != nil { + return nil, err + } + + return &tx, nil +}