From 4e8d09483647c96ec32f99b58957e5c761bceedf Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 16 Oct 2024 12:49:28 +0200 Subject: [PATCH] Add used inputs tracking --- staker/stakerapp.go | 19 ++- stakerdb/trackedtranactionstore.go | 166 +++++++++++++++++++++-- stakerdb/trackedtransactionstore_test.go | 77 ++++++++++- 3 files changed, 241 insertions(+), 21 deletions(-) diff --git a/staker/stakerapp.go b/staker/stakerapp.go index d232079..ba729e0 100644 --- a/staker/stakerapp.go +++ b/staker/stakerapp.go @@ -1349,7 +1349,7 @@ func (app *StakerApp) handlePostApprovalCmd(cmd *stakingRequestCmd) error { return err } - if err := app.txTracker.AddTransaction( + if err := app.txTracker.AddTransactionSentToBTC( cmd.stakingTx, cmd.stakingOutputIdx, cmd.stakingTime, @@ -1508,14 +1508,25 @@ func (app *StakerApp) handleStakingEvents() { if err := app.txTracker.SetTxUnbondingSignaturesReceived( &ev.stakingTxHash, - ev.delegationActive, babylonCovSigsToDbSigSigs(ev.covenantUnbondingSignatures), ); err != nil { // TODO: handle this error somehow, it means we possilbly make invalid state transition app.logger.Fatalf("Error setting state for tx %s: %s", &ev.stakingTxHash, err) } - if !ev.delegationActive { + if ev.delegationActive { + app.wg.Add(1) + go func(hash chainhash.Hash) { + defer app.wg.Done() + utils.PushOrQuit[*delegationActiveOnBabylonEvent]( + app.delegationActiveOnBabylonEvChan, + &delegationActiveOnBabylonEvent{ + stakingTxHash: hash, + }, + app.quit, + ) + }(ev.stakingTxHash) + } else { storedTx, _ := app.mustGetTransactionAndStakerAddress(&ev.stakingTxHash) // if the delegation is not active here, it can only mean that statking // is going through pre-approvel flow. Fire up task to send staking tx @@ -1528,7 +1539,6 @@ func (app *StakerApp) handleStakingEvents() { ) } - app.m.DelegationsActivatedOnBabylon.Inc() app.logStakingEventProcessed(ev) case ev := <-app.unbondingTxConfirmedOnBtcEvChan: @@ -1560,6 +1570,7 @@ func (app *StakerApp) handleStakingEvents() { // which is seems like programming error. Maybe panic? app.logger.Fatalf("Error setting state for tx %s: %s", ev.stakingTxHash, err) } + app.m.DelegationsActivatedOnBabylon.Inc() app.logStakingEventProcessed(ev) case ev := <-app.criticalErrorEvChan: diff --git a/stakerdb/trackedtranactionstore.go b/stakerdb/trackedtranactionstore.go index 5393a05..9524a16 100644 --- a/stakerdb/trackedtranactionstore.go +++ b/stakerdb/trackedtranactionstore.go @@ -31,6 +31,11 @@ var ( // It holds additional data for staking transaction in watch only mode watchedTxDataBucketName = []byte("watched") + // mapping outpoint -> txHash + // It holds mapping from outpoint to transaction hash + // outpoint: outpoint.txHash || bigendian(outpoint.index) + inputsDataBucketName = []byte("inputs") + // key for next transaction numTxKey = []byte("ntk") ) @@ -245,6 +250,12 @@ func (c *TrackedTransactionStore) initBuckets() error { return err } + _, err = tx.CreateTopLevelBucket(inputsDataBucketName) + + if err != nil { + return err + } + return nil }) } @@ -474,6 +485,7 @@ func saveTrackedTransaction( txHashBytes []byte, tx *proto.TrackedTransaction, watchedTxData *proto.WatchedTxData, + id *inputData, ) error { if tx == nil { return fmt.Errorf("cannot save nil tracked transaciton") @@ -521,6 +533,22 @@ func saveTrackedTransaction( } } + if id != nil { + inputDataBucket := rwTx.ReadWriteBucket(inputsDataBucketName) + if inputDataBucket == nil { + return ErrCorruptedTransactionsDb + } + + for _, input := range id.inputs { + // save all the inputs to the transaction + err = inputDataBucket.Put(input, txHashBytes) + + if err != nil { + return err + } + } + } + // increment counter for the next transaction return txIdxBucket.Put(numTxKey, uint64KeyToBytes(nextTxKey+1)) } @@ -529,6 +557,7 @@ func (c *TrackedTransactionStore) addTransactionInternal( txHashBytes []byte, tt *proto.TrackedTransaction, wd *proto.WatchedTxData, + id *inputData, ) error { return kvdb.Batch(c.db, func(tx kvdb.RwTx) error { transactionsBucketIdxBucket := tx.ReadWriteBucket(transactionIndexName) @@ -548,7 +577,7 @@ func (c *TrackedTransactionStore) addTransactionInternal( return ErrCorruptedTransactionsDb } - return saveTrackedTransaction(tx, transactionsBucketIdxBucket, transactionsBucket, txHashBytes, tt, wd) + return saveTrackedTransaction(tx, transactionsBucketIdxBucket, transactionsBucket, txHashBytes, tt, wd, id) }) } @@ -599,6 +628,58 @@ func CreateTrackedTransaction( return protoTxToStoredTransaction(&msg) } +type inputData struct { + inputs [][]byte + txHash []byte +} + +func outpointBytes(op *wire.OutPoint) ([]byte, error) { + var buf bytes.Buffer + _, err := buf.Write(op.Hash.CloneBytes()) + if err != nil { + return nil, err + } + + binary.Write(&buf, binary.BigEndian, op.Index) + + return buf.Bytes(), nil +} + +func outpointFromBytes(b []byte) (*wire.OutPoint, error) { + if len(b) != 36 { + return nil, fmt.Errorf("invalid outpoint bytes length") + } + + hash, err := chainhash.NewHash(b[:32]) + if err != nil { + return nil, err + } + + return &wire.OutPoint{ + Hash: *hash, + Index: binary.BigEndian.Uint32(b[32:]), + }, nil + +} + +func getInputData(tx *wire.MsgTx) (*inputData, error) { + var inputs [][]byte + + for _, in := range tx.TxIn { + opBytes, err := outpointBytes(&in.PreviousOutPoint) + if err != nil { + return nil, err + } + inputs = append(inputs, opBytes) + } + txHash := tx.TxHash() + + return &inputData{ + inputs: inputs, + txHash: txHash.CloneBytes(), + }, nil +} + func (c *TrackedTransactionStore) AddTransactionSentToBabylon( btcTx *wire.MsgTx, stakingOutputIndex uint32, @@ -653,12 +734,18 @@ func (c *TrackedTransactionStore) AddTransactionSentToBabylon( UnbondingTxData: update, } + inputData, err := getInputData(btcTx) + + if err != nil { + return fmt.Errorf("failed to get input data: %w", err) + } + return c.addTransactionInternal( - txHashBytes[:], &msg, nil, + txHashBytes[:], &msg, nil, inputData, ) } -func (c *TrackedTransactionStore) AddTransaction( +func (c *TrackedTransactionStore) AddTransactionSentToBTC( btcTx *wire.MsgTx, stakingOutputIndex uint32, stakingTime uint16, @@ -705,7 +792,7 @@ func (c *TrackedTransactionStore) AddTransaction( } return c.addTransactionInternal( - txHashBytes[:], &msg, nil, + txHashBytes[:], &msg, nil, nil, ) } @@ -791,7 +878,7 @@ func (c *TrackedTransactionStore) AddWatchedTransaction( } return c.addTransactionInternal( - txHashBytes, &msg, &watchedData, + txHashBytes, &msg, &watchedData, nil, ) } @@ -841,6 +928,39 @@ func (c *TrackedTransactionStore) setTxState( return err } + // delegation has been activaten remove used inputs if any exists + // TODO(konrad): This is not pretty architecture wise and a bit broken in scenario + // that delegation is never activated + if storedTx.State == proto.TransactionState_DELEGATION_ACTIVE { + inputDataBucket := tx.ReadWriteBucket(inputsDataBucketName) + if inputDataBucket == nil { + return ErrCorruptedTransactionsDb + } + + var stakingTx wire.MsgTx + err := stakingTx.Deserialize(bytes.NewReader(storedTx.StakingTransaction)) + + if err != nil { + return err + } + + for _, input := range stakingTx.TxIn { + input, err := outpointBytes(&input.PreviousOutPoint) + + if err != nil { + return err + } + + // if key does not exist, this operation is no-op + err = inputDataBucket.Delete(input) + + if err != nil { + return err + } + } + + } + return nil }) } @@ -915,7 +1035,6 @@ func (c *TrackedTransactionStore) SetDelegationActiveOnBabylon(txHash *chainhash func (c *TrackedTransactionStore) SetTxUnbondingSignaturesReceived( txHash *chainhash.Hash, - delegationActive bool, covenantSignatures []PubKeySigPair, ) error { setUnbondingSignaturesReceived := func(tx *proto.TrackedTransaction) error { @@ -926,12 +1045,7 @@ func (c *TrackedTransactionStore) SetTxUnbondingSignaturesReceived( if len(tx.UnbondingTxData.CovenantSignatures) > 0 { return fmt.Errorf("cannot set unbonding signatures received, because unbonding signatures already exist: %w", ErrInvalidUnbondingDataUpdate) } - - if delegationActive { - tx.State = proto.TransactionState_DELEGATION_ACTIVE - } else { - tx.State = proto.TransactionState_VERIFIED - } + tx.State = proto.TransactionState_VERIFIED tx.UnbondingTxData.CovenantSignatures = covenantSigsToProto(covenantSignatures) return nil } @@ -1200,3 +1314,31 @@ func (c *TrackedTransactionStore) ScanTrackedTransactions(scanFunc StoredTransac }) }, reset) } + +func (c *TrackedTransactionStore) OutpointUsed(op *wire.OutPoint) (bool, error) { + var used bool = false + + err := c.db.View(func(tx kvdb.RTx) error { + inputsBucket := tx.ReadBucket(inputsDataBucketName) + + if inputsBucket == nil { + return ErrCorruptedTransactionsDb + } + + opBytes, err := outpointBytes(op) + + if err != nil { + return fmt.Errorf("invalid outpoint provided: %w", err) + } + + res := inputsBucket.Get(opBytes) + + if res != nil { + used = true + } + + return nil + }, func() {}) + + return used, err +} diff --git a/stakerdb/trackedtransactionstore_test.go b/stakerdb/trackedtransactionstore_test.go index a22544d..2d2be75 100644 --- a/stakerdb/trackedtransactionstore_test.go +++ b/stakerdb/trackedtransactionstore_test.go @@ -123,7 +123,7 @@ func FuzzStoringTxs(f *testing.F) { stakerAddr, err := btcutil.DecodeAddress(storedTx.StakerAddress, &chaincfg.MainNetParams) require.NoError(t, err) require.NoError(t, err) - err = s.AddTransaction( + err = s.AddTransactionSentToBTC( storedTx.StakingTx, storedTx.StakingOutputIndex, storedTx.StakingTime, @@ -182,7 +182,7 @@ func TestStateTransitions(t *testing.T) { stakerAddr, err := btcutil.DecodeAddress(tx.StakerAddress, &chaincfg.MainNetParams) require.NoError(t, err) txHash := tx.StakingTx.TxHash() - err = s.AddTransaction( + err = s.AddTransactionSentToBTC( tx.StakingTx, tx.StakingOutputIndex, tx.StakingTime, @@ -238,8 +238,7 @@ func TestPaginator(t *testing.T) { for _, storedTx := range generatedStoredTxs { stakerAddr, err := btcutil.DecodeAddress(storedTx.StakerAddress, &chaincfg.MainNetParams) require.NoError(t, err) - require.NoError(t, err) - err = s.AddTransaction( + err = s.AddTransactionSentToBTC( storedTx.StakingTx, storedTx.StakingOutputIndex, storedTx.StakingTime, @@ -309,7 +308,7 @@ func FuzzQuerySpendableTx(f *testing.F) { stakerAddr, err := btcutil.DecodeAddress(storedTx.StakerAddress, &chaincfg.MainNetParams) require.NoError(t, err) require.NoError(t, err) - err = s.AddTransaction( + err = s.AddTransactionSentToBTC( storedTx.StakingTx, storedTx.StakingOutputIndex, storedTx.StakingTime, @@ -383,3 +382,71 @@ func FuzzQuerySpendableTx(f *testing.F) { require.Equal(t, storedResult.Total, uint64(maxCreatedTx)) }) } + +func FuzzTrackInputs(f *testing.F) { + // only 3 seeds as this is pretty slow test opening/closing db + datagen.AddRandomSeedsToFuzzer(f, 3) + + f.Fuzz(func(t *testing.T, seed int64) { + r := rand.New(rand.NewSource(seed)) + s := MakeTestStore(t) + numTx := 45 + // all gene + generatedStoredTxs := genNStoredTransactions(t, r, numTx, 200) + + randomUnbondingTx := datagen.GenRandomTx(r) + unbodningTime := uint16(r.Int31n(int32(200)) + 1) + + for _, storedTx := range generatedStoredTxs { + stakerAddr, err := btcutil.DecodeAddress(storedTx.StakerAddress, &chaincfg.MainNetParams) + require.NoError(t, err) + err = s.AddTransactionSentToBabylon( + storedTx.StakingTx, + storedTx.StakingOutputIndex, + storedTx.StakingTime, + storedTx.FinalityProvidersBtcPks, + storedTx.Pop, + stakerAddr, + randomUnbondingTx, + unbodningTime, + ) + require.NoError(t, err) + } + + for _, storedTx := range generatedStoredTxs { + // check all inputs are used + for _, inp := range storedTx.StakingTx.TxIn { + used, err := s.OutpointUsed(&inp.PreviousOutPoint) + require.NoError(t, err) + require.True(t, used) + } + } + + // generate few not saved transactions + notSaved := genNStoredTransactions(t, r, 20, 200) + + // check all input are not used + for _, storedTx := range notSaved { + for _, inp := range storedTx.StakingTx.TxIn { + used, err := s.OutpointUsed(&inp.PreviousOutPoint) + require.NoError(t, err) + require.False(t, used) + } + } + + for _, storedTx := range generatedStoredTxs { + stakingTxHash := storedTx.StakingTx.TxHash() + err := s.SetDelegationActiveOnBabylon(&stakingTxHash) + require.NoError(t, err) + } + + for _, storedTx := range generatedStoredTxs { + // check all inputs are deleted + for _, inp := range storedTx.StakingTx.TxIn { + used, err := s.OutpointUsed(&inp.PreviousOutPoint) + require.NoError(t, err) + require.False(t, used) + } + } + }) +}