From 2412230b65c565604214dbfdb676941fe68d79b6 Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 13:13:36 +0200 Subject: [PATCH 01/10] generate protos for submitter store --- Makefile | 6 + proto/.clang-format | 7 ++ proto/Dockerfile | 36 ++++++ proto/checkpoint.pb.go | 239 +++++++++++++++++++++++++++++++++++++ proto/checkpoint.proto | 19 +++ proto/gen_protos_docker.sh | 24 ++++ proto/protocgen.sh | 41 +++++++ submitter/store/store.go | 63 ++++++++++ 8 files changed, 435 insertions(+) create mode 100644 proto/.clang-format create mode 100644 proto/Dockerfile create mode 100644 proto/checkpoint.pb.go create mode 100644 proto/checkpoint.proto create mode 100755 proto/gen_protos_docker.sh create mode 100755 proto/protocgen.sh create mode 100644 submitter/store/store.go diff --git a/Makefile b/Makefile index 66370d9..32eaca4 100644 --- a/Makefile +++ b/Makefile @@ -75,3 +75,9 @@ update-changelog: ./scripts/update_changelog.sh $(sinceTag) $(upcomingTag) .PHONY: build test test-e2e build-docker rm-docker mocks update-changelog + +proto-gen: + @$(call print, "Compiling protos.") + cd ./proto; ./gen_protos_docker.sh + +.PHONY: proto-gen \ No newline at end of file diff --git a/proto/.clang-format b/proto/.clang-format new file mode 100644 index 0000000..f191427 --- /dev/null +++ b/proto/.clang-format @@ -0,0 +1,7 @@ +--- +Language: Proto +BasedOnStyle: Google +IndentWidth: 4 +AllowShortFunctionsOnASingleLine: None +SpaceBeforeParens: Always +CompactNamespaces: false diff --git a/proto/Dockerfile b/proto/Dockerfile new file mode 100644 index 0000000..69525c1 --- /dev/null +++ b/proto/Dockerfile @@ -0,0 +1,36 @@ +# If you change this value, please change it in the following files as well: +# /.github/workflows/main.yaml +# /Dockerfile +# /dev.Dockerfile +# /make/builder.Dockerfile +# /tools/Dockerfile +FROM golang:1.20.5-buster + +RUN apt-get update && apt-get install -y \ + git \ + protobuf-compiler='3.6.1*' \ + clang-format='1:7.0*' + +# We don't want any default values for these variables to make sure they're +# explicitly provided by parsing the go.mod file. Otherwise we might forget to +# update them here if we bump the versions. +ARG PROTOBUF_VERSION +ARG GRPC_GATEWAY_VERSION + +ENV PROTOC_GEN_GO_GRPC_VERSION="v1.1.0" +ENV GOCACHE=/tmp/build/.cache +ENV GOMODCACHE=/tmp/build/.modcache + +RUN cd /tmp \ + && mkdir -p /tmp/build/.cache \ + && mkdir -p /tmp/build/.modcache \ + && go install google.golang.org/protobuf/cmd/protoc-gen-go@${PROTOBUF_VERSION} \ + && go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@${PROTOC_GEN_GO_GRPC_VERSION} \ + && go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@${GRPC_GATEWAY_VERSION} \ + && go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@${GRPC_GATEWAY_VERSION} \ + && go install golang.org/x/tools/cmd/goimports@v0.1.7 \ + && chmod -R 777 /tmp/build/ + +WORKDIR /build + +CMD ["/bin/bash", "/build/proto/protocgen.sh"] diff --git a/proto/checkpoint.pb.go b/proto/checkpoint.pb.go new file mode 100644 index 0000000..f3c6d39 --- /dev/null +++ b/proto/checkpoint.pb.go @@ -0,0 +1,239 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc v3.6.1 +// source: checkpoint.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// StoredTx stores a BTC transaction and its ID +type StoredTx struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TxId []byte `protobuf:"bytes,1,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"` // chainhash.Hash serialized as bytes + Tx []byte `protobuf:"bytes,2,opt,name=tx,proto3" json:"tx,omitempty"` // wire.MsgTx serialized as bytes +} + +func (x *StoredTx) Reset() { + *x = StoredTx{} + if protoimpl.UnsafeEnabled { + mi := &file_checkpoint_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StoredTx) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StoredTx) ProtoMessage() {} + +func (x *StoredTx) ProtoReflect() protoreflect.Message { + mi := &file_checkpoint_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StoredTx.ProtoReflect.Descriptor instead. +func (*StoredTx) Descriptor() ([]byte, []int) { + return file_checkpoint_proto_rawDescGZIP(), []int{0} +} + +func (x *StoredTx) GetTxId() []byte { + if x != nil { + return x.TxId + } + return nil +} + +func (x *StoredTx) GetTx() []byte { + if x != nil { + return x.Tx + } + return nil +} + +// StoredCheckpoint holds two transactions and an epoch number +type StoredCheckpoint struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tx1 *StoredTx `protobuf:"bytes,1,opt,name=tx1,proto3" json:"tx1,omitempty"` + Tx2 *StoredTx `protobuf:"bytes,2,opt,name=tx2,proto3" json:"tx2,omitempty"` + Epoch uint64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` +} + +func (x *StoredCheckpoint) Reset() { + *x = StoredCheckpoint{} + if protoimpl.UnsafeEnabled { + mi := &file_checkpoint_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StoredCheckpoint) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StoredCheckpoint) ProtoMessage() {} + +func (x *StoredCheckpoint) ProtoReflect() protoreflect.Message { + mi := &file_checkpoint_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StoredCheckpoint.ProtoReflect.Descriptor instead. +func (*StoredCheckpoint) Descriptor() ([]byte, []int) { + return file_checkpoint_proto_rawDescGZIP(), []int{1} +} + +func (x *StoredCheckpoint) GetTx1() *StoredTx { + if x != nil { + return x.Tx1 + } + return nil +} + +func (x *StoredCheckpoint) GetTx2() *StoredTx { + if x != nil { + return x.Tx2 + } + return nil +} + +func (x *StoredCheckpoint) GetEpoch() uint64 { + if x != nil { + return x.Epoch + } + return 0 +} + +var File_checkpoint_proto protoreflect.FileDescriptor + +var file_checkpoint_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2f, 0x0a, 0x08, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x64, 0x54, 0x78, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x74, 0x78, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x78, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x74, 0x78, 0x22, 0x6e, 0x0a, 0x10, 0x53, 0x74, + 0x6f, 0x72, 0x65, 0x64, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x21, + 0x0a, 0x03, 0x74, 0x78, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x54, 0x78, 0x52, 0x03, 0x74, 0x78, + 0x31, 0x12, 0x21, 0x0a, 0x03, 0x74, 0x78, 0x32, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x54, 0x78, 0x52, + 0x03, 0x74, 0x78, 0x32, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x61, 0x62, 0x79, 0x6c, 0x6f, 0x6e, + 0x6c, 0x61, 0x62, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x67, 0x69, 0x6c, 0x61, 0x6e, 0x74, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_checkpoint_proto_rawDescOnce sync.Once + file_checkpoint_proto_rawDescData = file_checkpoint_proto_rawDesc +) + +func file_checkpoint_proto_rawDescGZIP() []byte { + file_checkpoint_proto_rawDescOnce.Do(func() { + file_checkpoint_proto_rawDescData = protoimpl.X.CompressGZIP(file_checkpoint_proto_rawDescData) + }) + return file_checkpoint_proto_rawDescData +} + +var file_checkpoint_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_checkpoint_proto_goTypes = []interface{}{ + (*StoredTx)(nil), // 0: proto.StoredTx + (*StoredCheckpoint)(nil), // 1: proto.StoredCheckpoint +} +var file_checkpoint_proto_depIdxs = []int32{ + 0, // 0: proto.StoredCheckpoint.tx1:type_name -> proto.StoredTx + 0, // 1: proto.StoredCheckpoint.tx2:type_name -> proto.StoredTx + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_checkpoint_proto_init() } +func file_checkpoint_proto_init() { + if File_checkpoint_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_checkpoint_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StoredTx); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_checkpoint_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StoredCheckpoint); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_checkpoint_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_checkpoint_proto_goTypes, + DependencyIndexes: file_checkpoint_proto_depIdxs, + MessageInfos: file_checkpoint_proto_msgTypes, + }.Build() + File_checkpoint_proto = out.File + file_checkpoint_proto_rawDesc = nil + file_checkpoint_proto_goTypes = nil + file_checkpoint_proto_depIdxs = nil +} diff --git a/proto/checkpoint.proto b/proto/checkpoint.proto new file mode 100644 index 0000000..fc899be --- /dev/null +++ b/proto/checkpoint.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package proto; + +option go_package = "github.com/babylonlabs-io/vigilante/proto"; + + +// StoredTx stores a BTC transaction and its ID +message StoredTx { + bytes tx_id = 1; // chainhash.Hash serialized as bytes + bytes tx = 2; // wire.MsgTx serialized as bytes +} + +// StoredCheckpoint holds two transactions and an epoch number +message StoredCheckpoint { + StoredTx tx1 = 1; + StoredTx tx2 = 2; + uint64 epoch = 3; +} \ No newline at end of file diff --git a/proto/gen_protos_docker.sh b/proto/gen_protos_docker.sh new file mode 100755 index 0000000..ee18475 --- /dev/null +++ b/proto/gen_protos_docker.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +set -e + +# Directory of the script file, independent of where it's called from. +DIR="$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)" + +PROTOBUF_VERSION=$(go list -f '{{.Version}}' -m google.golang.org/protobuf) +GRPC_GATEWAY_VERSION=$(go list -f '{{.Version}}' -m github.com/grpc-ecosystem/grpc-gateway/v2) + +echo "Building protobuf compiler docker image..." +docker build -t vigilante-protobuf-builder \ + --build-arg PROTOBUF_VERSION="$PROTOBUF_VERSION" \ + --build-arg GRPC_GATEWAY_VERSION="$GRPC_GATEWAY_VERSION" \ + . + +echo "Compiling and formatting *.proto files..." +docker run \ + --rm \ + --user "$UID:$(id -g)" \ + -e UID=$UID \ + -e SUBSERVER_PREFIX \ + -v "$DIR/../:/build" \ + vigilante-protobuf-builder diff --git a/proto/protocgen.sh b/proto/protocgen.sh new file mode 100755 index 0000000..866ccc7 --- /dev/null +++ b/proto/protocgen.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +set -e + +# generate compiles the *.pb.go stubs from the *.proto files. +function generate() { + echo "Generating vigilatne protos" + + PROTOS="checkpoint.proto" + + # For each of the sub-servers, we then generate their protos, but a restricted + # set as they don't yet require REST proxies, or swagger docs. + for file in $PROTOS; do + DIRECTORY=$(dirname "${file}") + echo "Generating protos from ${file}, into ${DIRECTORY}" + + # Generate the protos. + protoc --go_out . --go_opt paths=source_relative \ + --go-grpc_out . --go-grpc_opt paths=source_relative \ + "${file}" --proto_path=$GOPATH/src/ --proto_path=. + done +} + +# format formats the *.proto files with the clang-format utility. +function format() { + echo "Formatting protos" + #| xargs -0 clang-format --style=file -i + find . -name "*.proto" -print0 | xargs -0 +} + +# Compile and format the proto package. +pushd proto +format +generate +popd + +if [[ "$COMPILE_MOBILE" == "1" ]]; then + pushd mobile + ./gen_bindings.sh $FALAFEL_VERSION + popd +fi diff --git a/submitter/store/store.go b/submitter/store/store.go new file mode 100644 index 0000000..d67930c --- /dev/null +++ b/submitter/store/store.go @@ -0,0 +1,63 @@ +package store + +import ( + "errors" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/kvdb" +) + +type SubmitterStore struct { + db kvdb.Backend +} + +var ( + // storing submitted txns + storedCheckpointBucketName = []byte("storedckpt") + lastSubmittedCkptKey = []byte("lastsubckpt") +) + +var ( + // ErrCorruptedDb For some reason, db on disk representation have changed + ErrCorruptedDb = errors.New("db is corrupted") + // ErrNotFound Value not found + ErrNotFound = errors.New("not found") +) + +type StoredTx struct { + TxId *chainhash.Hash // we store this to check against the mempool + Tx *wire.MsgTx // we store this in case we need to resubmit +} + +type StoredCheckpoint struct { + Tx1 StoredTx + Tx2 StoredTx + Epoch uint64 +} + +func NewSubmitterStore(backend kvdb.Backend) (*SubmitterStore, error) { + store := &SubmitterStore{db: backend} + if err := store.createBuckets(); err != nil { + return nil, err + } + + return store, nil +} + +func (s *SubmitterStore) createBuckets() error { + buckets := [][]byte{storedCheckpointBucketName} + for _, bucket := range buckets { + if err := s.db.Update(func(tx kvdb.RwTx) error { + _, err := tx.CreateTopLevelBucket(bucket) + if err != nil { + return err + } + + return nil + }, func() {}); err != nil { + return err + } + } + + return nil +} From a367c1f0217f49df8cca7c72a0eb0d1bfeff3422 Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 14:01:14 +0200 Subject: [PATCH 02/10] adds submitter store --- proto/checkpoint.pb.go | 126 ++++++++------------------------------- proto/checkpoint.proto | 11 +--- submitter/store/store.go | 124 +++++++++++++++++++++++++++++++++++--- utils/serialize.go | 27 +++++++++ 4 files changed, 170 insertions(+), 118 deletions(-) create mode 100644 utils/serialize.go diff --git a/proto/checkpoint.pb.go b/proto/checkpoint.pb.go index f3c6d39..3a3454c 100644 --- a/proto/checkpoint.pb.go +++ b/proto/checkpoint.pb.go @@ -20,77 +20,21 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// StoredTx stores a BTC transaction and its ID -type StoredTx struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - TxId []byte `protobuf:"bytes,1,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"` // chainhash.Hash serialized as bytes - Tx []byte `protobuf:"bytes,2,opt,name=tx,proto3" json:"tx,omitempty"` // wire.MsgTx serialized as bytes -} - -func (x *StoredTx) Reset() { - *x = StoredTx{} - if protoimpl.UnsafeEnabled { - mi := &file_checkpoint_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StoredTx) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StoredTx) ProtoMessage() {} - -func (x *StoredTx) ProtoReflect() protoreflect.Message { - mi := &file_checkpoint_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StoredTx.ProtoReflect.Descriptor instead. -func (*StoredTx) Descriptor() ([]byte, []int) { - return file_checkpoint_proto_rawDescGZIP(), []int{0} -} - -func (x *StoredTx) GetTxId() []byte { - if x != nil { - return x.TxId - } - return nil -} - -func (x *StoredTx) GetTx() []byte { - if x != nil { - return x.Tx - } - return nil -} - // StoredCheckpoint holds two transactions and an epoch number type StoredCheckpoint struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Tx1 *StoredTx `protobuf:"bytes,1,opt,name=tx1,proto3" json:"tx1,omitempty"` - Tx2 *StoredTx `protobuf:"bytes,2,opt,name=tx2,proto3" json:"tx2,omitempty"` - Epoch uint64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` + Tx1 []byte `protobuf:"bytes,1,opt,name=tx1,proto3" json:"tx1,omitempty"` // wire.MsgTx serialized as bytes + Tx2 []byte `protobuf:"bytes,2,opt,name=tx2,proto3" json:"tx2,omitempty"` + Epoch uint64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` } func (x *StoredCheckpoint) Reset() { *x = StoredCheckpoint{} if protoimpl.UnsafeEnabled { - mi := &file_checkpoint_proto_msgTypes[1] + mi := &file_checkpoint_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -103,7 +47,7 @@ func (x *StoredCheckpoint) String() string { func (*StoredCheckpoint) ProtoMessage() {} func (x *StoredCheckpoint) ProtoReflect() protoreflect.Message { - mi := &file_checkpoint_proto_msgTypes[1] + mi := &file_checkpoint_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -116,17 +60,17 @@ func (x *StoredCheckpoint) ProtoReflect() protoreflect.Message { // Deprecated: Use StoredCheckpoint.ProtoReflect.Descriptor instead. func (*StoredCheckpoint) Descriptor() ([]byte, []int) { - return file_checkpoint_proto_rawDescGZIP(), []int{1} + return file_checkpoint_proto_rawDescGZIP(), []int{0} } -func (x *StoredCheckpoint) GetTx1() *StoredTx { +func (x *StoredCheckpoint) GetTx1() []byte { if x != nil { return x.Tx1 } return nil } -func (x *StoredCheckpoint) GetTx2() *StoredTx { +func (x *StoredCheckpoint) GetTx2() []byte { if x != nil { return x.Tx2 } @@ -144,20 +88,15 @@ var File_checkpoint_proto protoreflect.FileDescriptor var file_checkpoint_proto_rawDesc = []byte{ 0x0a, 0x10, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2f, 0x0a, 0x08, 0x53, 0x74, 0x6f, - 0x72, 0x65, 0x64, 0x54, 0x78, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x78, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x74, 0x78, 0x49, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x78, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x02, 0x74, 0x78, 0x22, 0x6e, 0x0a, 0x10, 0x53, 0x74, - 0x6f, 0x72, 0x65, 0x64, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x21, - 0x0a, 0x03, 0x74, 0x78, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x54, 0x78, 0x52, 0x03, 0x74, 0x78, - 0x31, 0x12, 0x21, 0x0a, 0x03, 0x74, 0x78, 0x32, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x64, 0x54, 0x78, 0x52, - 0x03, 0x74, 0x78, 0x32, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x61, 0x62, 0x79, 0x6c, 0x6f, 0x6e, - 0x6c, 0x61, 0x62, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x67, 0x69, 0x6c, 0x61, 0x6e, 0x74, - 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4c, 0x0a, 0x10, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x64, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x10, 0x0a, + 0x03, 0x74, 0x78, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x74, 0x78, 0x31, 0x12, + 0x10, 0x0a, 0x03, 0x74, 0x78, 0x32, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x74, 0x78, + 0x32, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x61, 0x62, 0x79, 0x6c, 0x6f, 0x6e, 0x6c, 0x61, 0x62, + 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x67, 0x69, 0x6c, 0x61, 0x6e, 0x74, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -172,19 +111,16 @@ func file_checkpoint_proto_rawDescGZIP() []byte { return file_checkpoint_proto_rawDescData } -var file_checkpoint_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_checkpoint_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_checkpoint_proto_goTypes = []interface{}{ - (*StoredTx)(nil), // 0: proto.StoredTx - (*StoredCheckpoint)(nil), // 1: proto.StoredCheckpoint + (*StoredCheckpoint)(nil), // 0: proto.StoredCheckpoint } var file_checkpoint_proto_depIdxs = []int32{ - 0, // 0: proto.StoredCheckpoint.tx1:type_name -> proto.StoredTx - 0, // 1: proto.StoredCheckpoint.tx2:type_name -> proto.StoredTx - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } func init() { file_checkpoint_proto_init() } @@ -194,18 +130,6 @@ func file_checkpoint_proto_init() { } if !protoimpl.UnsafeEnabled { file_checkpoint_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StoredTx); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_checkpoint_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*StoredCheckpoint); i { case 0: return &v.state @@ -224,7 +148,7 @@ func file_checkpoint_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_checkpoint_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 1, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/checkpoint.proto b/proto/checkpoint.proto index fc899be..97b8215 100644 --- a/proto/checkpoint.proto +++ b/proto/checkpoint.proto @@ -4,16 +4,9 @@ package proto; option go_package = "github.com/babylonlabs-io/vigilante/proto"; - -// StoredTx stores a BTC transaction and its ID -message StoredTx { - bytes tx_id = 1; // chainhash.Hash serialized as bytes - bytes tx = 2; // wire.MsgTx serialized as bytes -} - // StoredCheckpoint holds two transactions and an epoch number message StoredCheckpoint { - StoredTx tx1 = 1; - StoredTx tx2 = 2; + bytes tx1 = 1; // wire.MsgTx serialized as bytes + bytes tx2 = 2; uint64 epoch = 3; } \ No newline at end of file diff --git a/submitter/store/store.go b/submitter/store/store.go index d67930c..3195b47 100644 --- a/submitter/store/store.go +++ b/submitter/store/store.go @@ -2,9 +2,12 @@ package store import ( "errors" - "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/babylonlabs-io/vigilante/proto" + "github.com/babylonlabs-io/vigilante/utils" "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/walletdb" "github.com/lightningnetwork/lnd/kvdb" + pm "google.golang.org/protobuf/proto" ) type SubmitterStore struct { @@ -24,14 +27,9 @@ var ( ErrNotFound = errors.New("not found") ) -type StoredTx struct { - TxId *chainhash.Hash // we store this to check against the mempool - Tx *wire.MsgTx // we store this in case we need to resubmit -} - type StoredCheckpoint struct { - Tx1 StoredTx - Tx2 StoredTx + Tx1 *wire.MsgTx + Tx2 *wire.MsgTx Epoch uint64 } @@ -61,3 +59,113 @@ func (s *SubmitterStore) createBuckets() error { return nil } + +func (s *SubmitterStore) LatestCheckpoint() (*StoredCheckpoint, bool, error) { + data, f, err := s.get(lastSubmittedCkptKey, storedCheckpointBucketName) + if err != nil { + return nil, false, err + } else if !f { + return nil, false, nil + } + + protoCkpt := &proto.StoredCheckpoint{} + if err := pm.Unmarshal(data, protoCkpt); err != nil { + return nil, false, err + } + + var storedCkpt StoredCheckpoint + if err := storedCkpt.FromProto(protoCkpt); err != nil { + return nil, false, err + } + + return &storedCkpt, true, err +} + +func (s *SubmitterStore) PutCheckpoint(ckpt *StoredCheckpoint) error { + protoCkpt, err := ckpt.ToProto() + if err != nil { + return err + } + + data, err := pm.Marshal(protoCkpt) + if err != nil { + return err + } + + return s.put(lastSubmittedCkptKey, data, storedCheckpointBucketName) +} + +func (s *SubmitterStore) get(key, bucketName []byte) ([]byte, bool, error) { + var returnVal []byte + + err := s.db.View(func(tx walletdb.ReadTx) error { + b := tx.ReadBucket(bucketName) + if b == nil { + return ErrCorruptedDb + } + + byteVal := b.Get(key) + if byteVal == nil { + return ErrNotFound + } + + returnVal = byteVal + + return nil + }, func() {}) + + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, false, nil + } + return nil, false, err + } + + return returnVal, true, nil +} + +func (s *SubmitterStore) put(key []byte, val []byte, bucketName []byte) error { + return kvdb.Batch(s.db, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(bucketName) + if bucket == nil { + return ErrCorruptedDb + } + + return bucket.Put(key, val) + }) +} + +// ToProto converts StoredTx to its Protobuf equivalent +func (s *StoredCheckpoint) ToProto() (*proto.StoredCheckpoint, error) { + bufTx1, err := utils.SerializeMsgTx(s.Tx1) + if err != nil { + return nil, err + } + + bufTx2, err := utils.SerializeMsgTx(s.Tx1) + if err != nil { + return nil, err + } + + return &proto.StoredCheckpoint{ + Tx1: bufTx1, + Tx2: bufTx2, + Epoch: s.Epoch, + }, nil +} + +// FromProto converts a Protobuf StoredTx to the Go struct +func (s *StoredCheckpoint) FromProto(protoTx *proto.StoredCheckpoint) error { + var err error + s.Tx1, err = utils.DeserializeMsgTx(protoTx.Tx1) + if err != nil { + return err + } + + s.Tx2, err = utils.DeserializeMsgTx(protoTx.Tx2) + if err != nil { + return err + } + + return nil +} diff --git a/utils/serialize.go b/utils/serialize.go new file mode 100644 index 0000000..7a90724 --- /dev/null +++ b/utils/serialize.go @@ -0,0 +1,27 @@ +package utils + +import ( + "bytes" + "github.com/btcsuite/btcd/wire" +) + +// SerializeMsgTx serializes wire.MsgTx to bytes +func SerializeMsgTx(tx *wire.MsgTx) ([]byte, error) { + var txBuf bytes.Buffer + if err := tx.Serialize(&txBuf); err != nil { + return nil, err + } + + return txBuf.Bytes(), nil +} + +// DeserializeMsgTx deserializes bytes to wire.MsgTx +func DeserializeMsgTx(data []byte) (*wire.MsgTx, error) { + var tx wire.MsgTx + + if err := tx.Deserialize(bytes.NewReader(data)); err != nil { + return nil, err + } + + return &tx, nil +} From e45a4f9b1e7133f1a6b27b7c572d977916a74aad Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 14:24:38 +0200 Subject: [PATCH 03/10] adds store test --- submitter/store/store.go | 18 +++++++++++++--- submitter/store/store_test.go | 40 +++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 submitter/store/store_test.go diff --git a/submitter/store/store.go b/submitter/store/store.go index 3195b47..9fe2702 100644 --- a/submitter/store/store.go +++ b/submitter/store/store.go @@ -33,6 +33,14 @@ type StoredCheckpoint struct { Epoch uint64 } +func NewStoredCheckpoint(tx1, tx2 *wire.MsgTx, epoch uint64) *StoredCheckpoint { + return &StoredCheckpoint{ + Tx1: tx1, + Tx2: tx2, + Epoch: epoch, + } +} + func NewSubmitterStore(backend kvdb.Backend) (*SubmitterStore, error) { store := &SubmitterStore{db: backend} if err := store.createBuckets(); err != nil { @@ -142,7 +150,7 @@ func (s *StoredCheckpoint) ToProto() (*proto.StoredCheckpoint, error) { return nil, err } - bufTx2, err := utils.SerializeMsgTx(s.Tx1) + bufTx2, err := utils.SerializeMsgTx(s.Tx2) if err != nil { return nil, err } @@ -157,15 +165,19 @@ func (s *StoredCheckpoint) ToProto() (*proto.StoredCheckpoint, error) { // FromProto converts a Protobuf StoredTx to the Go struct func (s *StoredCheckpoint) FromProto(protoTx *proto.StoredCheckpoint) error { var err error - s.Tx1, err = utils.DeserializeMsgTx(protoTx.Tx1) + tx1, err := utils.DeserializeMsgTx(protoTx.Tx1) if err != nil { return err } - s.Tx2, err = utils.DeserializeMsgTx(protoTx.Tx2) + tx2, err := utils.DeserializeMsgTx(protoTx.Tx2) if err != nil { return err } + s.Tx1 = tx1 + s.Tx2 = tx2 + s.Epoch = protoTx.Epoch + return nil } diff --git a/submitter/store/store_test.go b/submitter/store/store_test.go new file mode 100644 index 0000000..6af130a --- /dev/null +++ b/submitter/store/store_test.go @@ -0,0 +1,40 @@ +package store_test + +import ( + bbndatagen "github.com/babylonlabs-io/babylon/testutil/datagen" + "github.com/babylonlabs-io/vigilante/submitter/store" + "github.com/babylonlabs-io/vigilante/testutil" + "github.com/babylonlabs-io/vigilante/testutil/datagen" + "github.com/stretchr/testify/require" + "math/rand" + "testing" +) + +func FuzzStoringCkpt(f *testing.F) { + bbndatagen.AddRandomSeedsToFuzzer(f, 3) + + f.Fuzz(func(t *testing.T, seed int64) { + t.Parallel() + r := rand.New(rand.NewSource(seed)) + db := testutil.MakeTestBackend(t) + s, err := store.NewSubmitterStore(db) + require.NoError(t, err) + + _, exists, err := s.LatestCheckpoint() + require.NoError(t, err) + require.False(t, exists) + + epoch := uint64(r.Int63n(1000) + 1) + tx1 := datagen.GenRandomTx(r) + tx2 := datagen.GenRandomTx(r) + err = s.PutCheckpoint(store.NewStoredCheckpoint(tx1, tx2, epoch)) + require.NoError(t, err) + + storedCkpt, exists, err := s.LatestCheckpoint() + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, storedCkpt.Epoch, epoch) + require.Equal(t, storedCkpt.Tx1.TxHash().String(), tx1.TxHash().String()) + require.Equal(t, storedCkpt.Tx2.TxHash().String(), tx2.TxHash().String()) + }) +} From cf23e565df014d4bd244700293cac38b88582dbe Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 16:58:08 +0200 Subject: [PATCH 04/10] adds store to submitter --- btcclient/client_wallet.go | 4 ++ btcclient/interface.go | 1 + cmd/vigilante/cmd/submitter.go | 6 ++ config/submitter.go | 3 +- e2etest/monitor_e2e_test.go | 1 + e2etest/submitter_e2e_test.go | 2 + submitter/relayer/change_address_test.go | 3 +- submitter/relayer/relayer.go | 81 ++++++++++++++++++++++-- submitter/submitter.go | 3 + 9 files changed, 98 insertions(+), 6 deletions(-) diff --git a/btcclient/client_wallet.go b/btcclient/client_wallet.go index cc3a50a..d591bbf 100644 --- a/btcclient/client_wallet.go +++ b/btcclient/client_wallet.go @@ -121,3 +121,7 @@ func (c *Client) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransact func (c *Client) SignRawTransactionWithWallet(tx *wire.MsgTx) (*wire.MsgTx, bool, error) { return c.Client.SignRawTransactionWithWallet(tx) } + +func (c *Client) GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return c.Client.GetRawTransaction(txHash) +} diff --git a/btcclient/interface.go b/btcclient/interface.go index 71b9bca..f9d89aa 100644 --- a/btcclient/interface.go +++ b/btcclient/interface.go @@ -38,4 +38,5 @@ type BTCWallet interface { GetHighUTXOAndSum() (*btcjson.ListUnspentResult, float64, error) FundRawTransaction(tx *wire.MsgTx, opts btcjson.FundRawTransactionOpts, isWitness *bool) (*btcjson.FundRawTransactionResult, error) SignRawTransactionWithWallet(tx *wire.MsgTx) (*wire.MsgTx, bool, error) + GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) } diff --git a/cmd/vigilante/cmd/submitter.go b/cmd/vigilante/cmd/submitter.go index bc97ae4..b90acd6 100644 --- a/cmd/vigilante/cmd/submitter.go +++ b/cmd/vigilante/cmd/submitter.go @@ -62,6 +62,11 @@ func GetSubmitterCmd() *cobra.Command { // register submitter metrics submitterMetrics := metrics.NewSubmitterMetrics() + dbBackend, err := cfg.Submitter.DatabaseConfig.GetDbBackend() + if err != nil { + panic(err) + } + // create submitter vigilantSubmitter, err := submitter.New( &cfg.Submitter, @@ -73,6 +78,7 @@ func GetSubmitterCmd() *cobra.Command { cfg.Common.MaxRetrySleepTime, cfg.Common.MaxRetryTimes, submitterMetrics, + dbBackend, ) if err != nil { panic(fmt.Errorf("failed to create vigilante submitter: %w", err)) diff --git a/config/submitter.go b/config/submitter.go index b507f1b..511deee 100644 --- a/config/submitter.go +++ b/config/submitter.go @@ -25,7 +25,8 @@ type SubmitterConfig struct { PollingIntervalSeconds uint `mapstructure:"polling-interval-seconds"` // ResendIntervalSeconds defines the time (in seconds) which the submitter awaits // before resubmitting checkpoints to BTC - ResendIntervalSeconds uint `mapstructure:"resend-interval-seconds"` + ResendIntervalSeconds uint `mapstructure:"resend-interval-seconds"` + DatabaseConfig *DBConfig `mapstructure:"database-config"` } func (cfg *SubmitterConfig) Validate() error { diff --git a/e2etest/monitor_e2e_test.go b/e2etest/monitor_e2e_test.go index 6f1c050..8debc3b 100644 --- a/e2etest/monitor_e2e_test.go +++ b/e2etest/monitor_e2e_test.go @@ -61,6 +61,7 @@ func TestMonitorBootstrap(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() diff --git a/e2etest/submitter_e2e_test.go b/e2etest/submitter_e2e_test.go index 7fcb4f6..4d9db1d 100644 --- a/e2etest/submitter_e2e_test.go +++ b/e2etest/submitter_e2e_test.go @@ -64,6 +64,7 @@ func TestSubmitterSubmission(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() @@ -138,6 +139,7 @@ func TestSubmitterSubmissionReplace(t *testing.T) { tm.Config.Common.MaxRetrySleepTime, tm.Config.Common.MaxRetryTimes, metrics.NewSubmitterMetrics(), + testutil.MakeTestBackend(t), ) vigilantSubmitter.Start() diff --git a/submitter/relayer/change_address_test.go b/submitter/relayer/change_address_test.go index 2029f5c..96022a4 100644 --- a/submitter/relayer/change_address_test.go +++ b/submitter/relayer/change_address_test.go @@ -1,6 +1,7 @@ package relayer_test import ( + "github.com/babylonlabs-io/vigilante/testutil" "testing" "github.com/btcsuite/btcd/btcjson" @@ -48,7 +49,7 @@ func TestGetChangeAddress(t *testing.T) { logger, err := config.NewRootLogger("auto", "debug") require.NoError(t, err) testRelayer := relayer.New(wallet, []byte("bbnt"), btctxformatter.CurrentVersion, submitterAddr, - submitterMetrics.RelayerMetrics, nil, &cfg, logger) + submitterMetrics.RelayerMetrics, nil, &cfg, logger, testutil.MakeTestBackend(t)) // 1. only SegWit Bech32 addresses segWitBech32Addrs := append(SegWitBech32p2wshAddrsStr, SegWitBech32p2wpkhAddrsStr...) diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 4d16793..3567ef2 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -5,7 +5,9 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/babylonlabs-io/vigilante/submitter/store" "github.com/btcsuite/btcd/btcjson" + "github.com/lightningnetwork/lnd/kvdb" "math" "strconv" "time" @@ -34,6 +36,7 @@ const ( type Relayer struct { chainfee.Estimator btcclient.BTCWallet + store *store.SubmitterStore lastSubmittedCheckpoint *types.CheckpointInfo tag btctxformatter.BabylonTag version btctxformatter.FormatVersion @@ -52,11 +55,18 @@ func New( est chainfee.Estimator, config *config.SubmitterConfig, parentLogger *zap.Logger, + db kvdb.Backend, ) *Relayer { + subStore, err := store.NewSubmitterStore(db) + if err != nil { + panic(fmt.Errorf("error setting up store: %w", err)) + } + metrics.ResendIntervalSecondsGauge.Set(float64(config.ResendIntervalSeconds)) return &Relayer{ Estimator: est, BTCWallet: wallet, + store: subStore, tag: tag, version: version, submitterAddress: submitterAddress, @@ -81,25 +91,40 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } + fnStoreCkpt := func(tx1, tx2 *wire.MsgTx, epochNum uint64) error { + storedCkpt := store.NewStoredCheckpoint(tx1, tx2, epochNum) + return rl.store.PutCheckpoint(storedCkpt) + } + if rl.shouldSendCompleteCkpt(ckptEpoch) { rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch) - submittedCheckpoint, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) + submittedCkpt, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) if err != nil { return err } - rl.lastSubmittedCheckpoint = submittedCheckpoint + rl.lastSubmittedCheckpoint = submittedCkpt + + err = fnStoreCkpt(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + if err != nil { + return err + } return nil } else if rl.shouldSendTx2(ckptEpoch) { rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId) - submittedCheckpoint, err := rl.retrySendTx2(ckpt.Ckpt) + submittedCkpt, err := rl.retrySendTx2(ckpt.Ckpt) if err != nil { return err } - rl.lastSubmittedCheckpoint = submittedCheckpoint + rl.lastSubmittedCheckpoint = submittedCkpt + + err = fnStoreCkpt(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + if err != nil { + return err + } return nil } @@ -150,6 +175,15 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp // update the second tx of the last submitted checkpoint as it is replaced rl.lastSubmittedCheckpoint.Tx2 = resubmittedTx2 + + err = fnStoreCkpt( + rl.lastSubmittedCheckpoint.Tx1.Tx, + rl.lastSubmittedCheckpoint.Tx2.Tx, + rl.lastSubmittedCheckpoint.Epoch, + ) + if err != nil { + return err + } } return nil @@ -542,3 +576,42 @@ func (rl *Relayer) sendTxToBTC(tx *wire.MsgTx) (*chainhash.Hash, error) { return ha, nil } + +// checkResendFromStore - checks if we need to resubmit txns from a store +// in case "submitter" service has restarted we want to ensure that we don't send txns again for a checkpoint +// that has already been processeds +func (rl *Relayer) checkResendFromStore(epoch uint64) error { + storedCkpt, exists, err := rl.store.LatestCheckpoint() + if err != nil { + return err + } else if !exists { + return nil + } + + if storedCkpt.Epoch != epoch { + return nil + } + + maybeResend := func(tx *wire.MsgTx) error { + txID := tx.TxHash() + _, err = rl.GetRawTransaction(&txID) // todo(lazar): check for specific not found err + if err != nil { + _, err := rl.sendTxToBTC(tx) + if err != nil { + return err + } + } + + return nil + } + + if err := maybeResend(storedCkpt.Tx1); err != nil { + return err + } + + if err := maybeResend(storedCkpt.Tx2); err != nil { + return err + } + + return nil +} diff --git a/submitter/submitter.go b/submitter/submitter.go index 82f467f..c031e9a 100644 --- a/submitter/submitter.go +++ b/submitter/submitter.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "fmt" "github.com/babylonlabs-io/vigilante/retrywrap" + "github.com/lightningnetwork/lnd/kvdb" "sync" "time" @@ -43,6 +44,7 @@ func New( submitterAddr sdk.AccAddress, retrySleepTime, maxRetrySleepTime time.Duration, maxRetryTimes uint, submitterMetrics *metrics.SubmitterMetrics, + db kvdb.Backend, ) (*Submitter, error) { logger := parentLogger.With(zap.String("module", "submitter")) var ( @@ -85,6 +87,7 @@ func New( est, cfg, logger, + db, ) return &Submitter{ From 2beb0ea717f0e8e31da4a3b5c9571ee5ba7ab16f Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 16:59:08 +0200 Subject: [PATCH 05/10] regen mocks --- testutil/mocks/btcclient.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/testutil/mocks/btcclient.go b/testutil/mocks/btcclient.go index 76581b6..240994f 100644 --- a/testutil/mocks/btcclient.go +++ b/testutil/mocks/btcclient.go @@ -284,6 +284,21 @@ func (mr *MockBTCWalletMockRecorder) GetRawChangeAddress(account interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawChangeAddress", reflect.TypeOf((*MockBTCWallet)(nil).GetRawChangeAddress), account) } +// GetRawTransaction mocks base method. +func (m *MockBTCWallet) GetRawTransaction(txHash *chainhash.Hash) (*btcutil.Tx, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRawTransaction", txHash) + ret0, _ := ret[0].(*btcutil.Tx) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRawTransaction indicates an expected call of GetRawTransaction. +func (mr *MockBTCWalletMockRecorder) GetRawTransaction(txHash interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawTransaction", reflect.TypeOf((*MockBTCWallet)(nil).GetRawTransaction), txHash) +} + // GetWalletLockTime mocks base method. func (m *MockBTCWallet) GetWalletLockTime() int64 { m.ctrl.T.Helper() From e7ddb7f4bf5c03a274fb9571a00a48b44bfdb7aa Mon Sep 17 00:00:00 2001 From: lazar Date: Thu, 12 Sep 2024 17:44:38 +0200 Subject: [PATCH 06/10] cleanup --- submitter/relayer/relayer.go | 58 ++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 3567ef2..3c65d01 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -97,6 +97,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp } if rl.shouldSendCompleteCkpt(ckptEpoch) { + inMempool, err := rl.maybeResendFromStore(ckptEpoch) + if err != nil { + return err + } + + // we've already processed this + if inMempool { + return nil + } + rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch) submittedCkpt, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) @@ -113,6 +123,16 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } else if rl.shouldSendTx2(ckptEpoch) { + inMempool, err := rl.maybeResendFromStore(ckptEpoch) + if err != nil { + return err + } + + // we've already processed this + if inMempool { + return nil + } + rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId) submittedCkpt, err := rl.retrySendTx2(ckpt.Ckpt) if err != nil { @@ -577,41 +597,49 @@ func (rl *Relayer) sendTxToBTC(tx *wire.MsgTx) (*chainhash.Hash, error) { return ha, nil } -// checkResendFromStore - checks if we need to resubmit txns from a store -// in case "submitter" service has restarted we want to ensure that we don't send txns again for a checkpoint -// that has already been processeds -func (rl *Relayer) checkResendFromStore(epoch uint64) error { +// maybeResendFromStore - checks if we need to resubmit txns from a store +// in case "submitter" service was restarted, we want to ensure that we don't send txns again for a checkpoint +// that has already been processed. +// Returns true if the first transactions are in the mempool (no resubmission needed), +// and false if any transaction was re-sent from the store. +func (rl *Relayer) maybeResendFromStore(epoch uint64) (bool, error) { storedCkpt, exists, err := rl.store.LatestCheckpoint() if err != nil { - return err + return false, err } else if !exists { - return nil + return false, nil } if storedCkpt.Epoch != epoch { - return nil + return false, nil } - maybeResend := func(tx *wire.MsgTx) error { + maybeResend := func(tx *wire.MsgTx) (bool, error) { txID := tx.TxHash() _, err = rl.GetRawTransaction(&txID) // todo(lazar): check for specific not found err if err != nil { _, err := rl.sendTxToBTC(tx) if err != nil { - return err + return false, err } + + // we know about this tx, but we needed to resend it from already constructed tx from db + return true, nil } - return nil + // tx exists in mempool and is known to us + return true, nil } - if err := maybeResend(storedCkpt.Tx1); err != nil { - return err + inMempoolTx1, err := maybeResend(storedCkpt.Tx1) + if err != nil { + return false, err } - if err := maybeResend(storedCkpt.Tx2); err != nil { - return err + _, err = maybeResend(storedCkpt.Tx2) + if err != nil { + return false, err } - return nil + return inMempoolTx1, nil } From bd97a57ea136fdaefddf52153ffe760995b046bc Mon Sep 17 00:00:00 2001 From: lazar Date: Fri, 13 Sep 2024 11:18:43 +0200 Subject: [PATCH 07/10] fun fun function --- e2etest/submitter_e2e_test.go | 1 + submitter/relayer/relayer.go | 53 ++++++++++++++++++++++------------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/e2etest/submitter_e2e_test.go b/e2etest/submitter_e2e_test.go index 4d9db1d..1806329 100644 --- a/e2etest/submitter_e2e_test.go +++ b/e2etest/submitter_e2e_test.go @@ -4,6 +4,7 @@ package e2etest import ( + "github.com/babylonlabs-io/vigilante/testutil" "math/rand" "testing" "time" diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 3c65d01..336354f 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -33,6 +33,10 @@ const ( dustThreshold btcutil.Amount = 546 ) +type GetLatestCheckpointFunc func() (*store.StoredCheckpoint, bool, error) +type GetRawTransactionFunc func(txHash *chainhash.Hash) (*btcutil.Tx, error) +type SendTransactionFunc func(tx *wire.MsgTx) (*chainhash.Hash, error) + type Relayer struct { chainfee.Estimator btcclient.BTCWallet @@ -97,13 +101,17 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp } if rl.shouldSendCompleteCkpt(ckptEpoch) { - inMempool, err := rl.maybeResendFromStore(ckptEpoch) + hasBeenProcessed, err := maybeResendFromStore( + ckptEpoch, + rl.store.LatestCheckpoint, + rl.GetRawTransaction, + rl.sendTxToBTC, + ) if err != nil { return err } - // we've already processed this - if inMempool { + if hasBeenProcessed { return nil } @@ -123,13 +131,17 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } else if rl.shouldSendTx2(ckptEpoch) { - inMempool, err := rl.maybeResendFromStore(ckptEpoch) + hasBeenProcessed, err := maybeResendFromStore( + ckptEpoch, + rl.store.LatestCheckpoint, + rl.GetRawTransaction, + rl.sendTxToBTC, + ) if err != nil { return err } - // we've already processed this - if inMempool { + if hasBeenProcessed { return nil } @@ -602,8 +614,13 @@ func (rl *Relayer) sendTxToBTC(tx *wire.MsgTx) (*chainhash.Hash, error) { // that has already been processed. // Returns true if the first transactions are in the mempool (no resubmission needed), // and false if any transaction was re-sent from the store. -func (rl *Relayer) maybeResendFromStore(epoch uint64) (bool, error) { - storedCkpt, exists, err := rl.store.LatestCheckpoint() +func maybeResendFromStore( + epoch uint64, + getLatestStoreCheckpoint GetLatestCheckpointFunc, + getRawTransaction GetRawTransactionFunc, + sendTransaction SendTransactionFunc, +) (bool, error) { + storedCkpt, exists, err := getLatestStoreCheckpoint() if err != nil { return false, err } else if !exists { @@ -614,32 +631,30 @@ func (rl *Relayer) maybeResendFromStore(epoch uint64) (bool, error) { return false, nil } - maybeResend := func(tx *wire.MsgTx) (bool, error) { + maybeResend := func(tx *wire.MsgTx) error { txID := tx.TxHash() - _, err = rl.GetRawTransaction(&txID) // todo(lazar): check for specific not found err + _, err = getRawTransaction(&txID) // todo(lazar): check for specific not found err if err != nil { - _, err := rl.sendTxToBTC(tx) + _, err := sendTransaction(tx) if err != nil { - return false, err + return err } // we know about this tx, but we needed to resend it from already constructed tx from db - return true, nil + return nil } // tx exists in mempool and is known to us - return true, nil + return nil } - inMempoolTx1, err := maybeResend(storedCkpt.Tx1) - if err != nil { + if err := maybeResend(storedCkpt.Tx1); err != nil { return false, err } - _, err = maybeResend(storedCkpt.Tx2) - if err != nil { + if err := maybeResend(storedCkpt.Tx2); err != nil { return false, err } - return inMempoolTx1, nil + return true, nil } From 5ff99c8686040d9d2064da879ebfdac7601b362b Mon Sep 17 00:00:00 2001 From: lazar Date: Fri, 13 Sep 2024 13:23:26 +0200 Subject: [PATCH 08/10] adds unit test --- submitter/relayer/relayer_test.go | 116 ++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 submitter/relayer/relayer_test.go diff --git a/submitter/relayer/relayer_test.go b/submitter/relayer/relayer_test.go new file mode 100644 index 0000000..75cf016 --- /dev/null +++ b/submitter/relayer/relayer_test.go @@ -0,0 +1,116 @@ +package relayer + +import ( + "errors" + "github.com/babylonlabs-io/vigilante/submitter/store" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_maybeResendFromStore(t *testing.T) { + t.Parallel() + tests := []struct { + name string + epoch uint64 + getLatestCheckpoint GetLatestCheckpointFunc + getRawTransaction GetRawTransactionFunc + sendTransaction SendTransactionFunc + expectedResult bool + expectedError error + }{ + { + name: "Checkpoint not found", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return nil, false, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: nil, + }, + { + name: "Error retrieving checkpoint", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return nil, false, errors.New("checkpoint error") + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: errors.New("checkpoint error"), + }, + { + name: "Epoch mismatch", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 456, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, nil + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, nil + }, + expectedResult: false, + expectedError: nil, + }, + { + name: "Successful resends", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 123, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, errors.New("transaction not found") // Simulate transaction not found + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return &chainhash.Hash{}, nil // Simulate successful send + }, + expectedResult: true, + expectedError: nil, + }, + { + name: "Error resending transaction", + epoch: 123, + getLatestCheckpoint: func() (*store.StoredCheckpoint, bool, error) { + return &store.StoredCheckpoint{Epoch: 123, Tx1: &wire.MsgTx{}, Tx2: &wire.MsgTx{}}, true, nil + }, + getRawTransaction: func(txHash *chainhash.Hash) (*btcutil.Tx, error) { + return nil, errors.New("transaction not found") + }, + sendTransaction: func(tx *wire.MsgTx) (*chainhash.Hash, error) { + return nil, errors.New("send error") + }, + expectedResult: false, + expectedError: errors.New("send error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result, err := maybeResendFromStore(tt.epoch, tt.getLatestCheckpoint, tt.getRawTransaction, tt.sendTransaction) + + if tt.expectedError != nil { + assert.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tt.expectedResult, result) + }) + } +} From c7b5e6c593c836118fb6f2c7cb46abf2152d58f3 Mon Sep 17 00:00:00 2001 From: lazar Date: Fri, 13 Sep 2024 13:29:44 +0200 Subject: [PATCH 09/10] cleanup --- submitter/relayer/relayer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 336354f..45f641f 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -95,7 +95,7 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } - fnStoreCkpt := func(tx1, tx2 *wire.MsgTx, epochNum uint64) error { + storeCkptFunc := func(tx1, tx2 *wire.MsgTx, epochNum uint64) error { storedCkpt := store.NewStoredCheckpoint(tx1, tx2, epochNum) return rl.store.PutCheckpoint(storedCkpt) } @@ -124,7 +124,7 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp rl.lastSubmittedCheckpoint = submittedCkpt - err = fnStoreCkpt(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + err = storeCkptFunc(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) if err != nil { return err } @@ -153,7 +153,7 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp rl.lastSubmittedCheckpoint = submittedCkpt - err = fnStoreCkpt(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) + err = storeCkptFunc(submittedCkpt.Tx1.Tx, submittedCkpt.Tx2.Tx, submittedCkpt.Epoch) if err != nil { return err } @@ -208,7 +208,7 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp // update the second tx of the last submitted checkpoint as it is replaced rl.lastSubmittedCheckpoint.Tx2 = resubmittedTx2 - err = fnStoreCkpt( + err = storeCkptFunc( rl.lastSubmittedCheckpoint.Tx1.Tx, rl.lastSubmittedCheckpoint.Tx2.Tx, rl.lastSubmittedCheckpoint.Epoch, @@ -631,7 +631,7 @@ func maybeResendFromStore( return false, nil } - maybeResend := func(tx *wire.MsgTx) error { + maybeResendFunc := func(tx *wire.MsgTx) error { txID := tx.TxHash() _, err = getRawTransaction(&txID) // todo(lazar): check for specific not found err if err != nil { @@ -648,11 +648,11 @@ func maybeResendFromStore( return nil } - if err := maybeResend(storedCkpt.Tx1); err != nil { + if err := maybeResendFunc(storedCkpt.Tx1); err != nil { return false, err } - if err := maybeResend(storedCkpt.Tx2); err != nil { + if err := maybeResendFunc(storedCkpt.Tx2); err != nil { return false, err } From 5a742874b0abea2ef235f8342ee5738b728a315a Mon Sep 17 00:00:00 2001 From: lazar Date: Mon, 16 Sep 2024 09:59:28 +0200 Subject: [PATCH 10/10] pr comments --- submitter/relayer/relayer.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/submitter/relayer/relayer.go b/submitter/relayer/relayer.go index 45f641f..4f3977b 100644 --- a/submitter/relayer/relayer.go +++ b/submitter/relayer/relayer.go @@ -100,7 +100,7 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return rl.store.PutCheckpoint(storedCkpt) } - if rl.shouldSendCompleteCkpt(ckptEpoch) { + if rl.shouldSendCompleteCkpt(ckptEpoch) || rl.shouldSendTx2(ckptEpoch) { hasBeenProcessed, err := maybeResendFromStore( ckptEpoch, rl.store.LatestCheckpoint, @@ -114,7 +114,9 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp if hasBeenProcessed { return nil } + } + if rl.shouldSendCompleteCkpt(ckptEpoch) { rl.logger.Infof("Submitting a raw checkpoint for epoch %v", ckptEpoch) submittedCkpt, err := rl.convertCkptToTwoTxAndSubmit(ckpt.Ckpt) @@ -131,20 +133,6 @@ func (rl *Relayer) SendCheckpointToBTC(ckpt *ckpttypes.RawCheckpointWithMetaResp return nil } else if rl.shouldSendTx2(ckptEpoch) { - hasBeenProcessed, err := maybeResendFromStore( - ckptEpoch, - rl.store.LatestCheckpoint, - rl.GetRawTransaction, - rl.sendTxToBTC, - ) - if err != nil { - return err - } - - if hasBeenProcessed { - return nil - } - rl.logger.Infof("Retrying to send tx2 for epoch %v, tx1 %s", ckptEpoch, rl.lastSubmittedCheckpoint.Tx1.TxId) submittedCkpt, err := rl.retrySendTx2(ckpt.Ckpt) if err != nil {