Skip to content

Commit

Permalink
Add used inputs tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
KonradStaniec committed Oct 16, 2024
1 parent 16c8dd0 commit 4e8d094
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 21 deletions.
19 changes: 15 additions & 4 deletions staker/stakerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ func (app *StakerApp) handlePostApprovalCmd(cmd *stakingRequestCmd) error {
return err
}

if err := app.txTracker.AddTransaction(
if err := app.txTracker.AddTransactionSentToBTC(
cmd.stakingTx,
cmd.stakingOutputIdx,
cmd.stakingTime,
Expand Down Expand Up @@ -1508,14 +1508,25 @@ func (app *StakerApp) handleStakingEvents() {

if err := app.txTracker.SetTxUnbondingSignaturesReceived(
&ev.stakingTxHash,
ev.delegationActive,
babylonCovSigsToDbSigSigs(ev.covenantUnbondingSignatures),
); err != nil {
// TODO: handle this error somehow, it means we possilbly make invalid state transition
app.logger.Fatalf("Error setting state for tx %s: %s", &ev.stakingTxHash, err)
}

if !ev.delegationActive {
if ev.delegationActive {
app.wg.Add(1)
go func(hash chainhash.Hash) {
defer app.wg.Done()
utils.PushOrQuit[*delegationActiveOnBabylonEvent](
app.delegationActiveOnBabylonEvChan,
&delegationActiveOnBabylonEvent{
stakingTxHash: hash,
},
app.quit,
)
}(ev.stakingTxHash)
} else {
storedTx, _ := app.mustGetTransactionAndStakerAddress(&ev.stakingTxHash)
// if the delegation is not active here, it can only mean that statking
// is going through pre-approvel flow. Fire up task to send staking tx
Expand All @@ -1528,7 +1539,6 @@ func (app *StakerApp) handleStakingEvents() {
)
}

app.m.DelegationsActivatedOnBabylon.Inc()
app.logStakingEventProcessed(ev)

case ev := <-app.unbondingTxConfirmedOnBtcEvChan:
Expand Down Expand Up @@ -1560,6 +1570,7 @@ func (app *StakerApp) handleStakingEvents() {
// which is seems like programming error. Maybe panic?
app.logger.Fatalf("Error setting state for tx %s: %s", ev.stakingTxHash, err)
}
app.m.DelegationsActivatedOnBabylon.Inc()
app.logStakingEventProcessed(ev)

case ev := <-app.criticalErrorEvChan:
Expand Down
166 changes: 154 additions & 12 deletions stakerdb/trackedtranactionstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var (
// It holds additional data for staking transaction in watch only mode
watchedTxDataBucketName = []byte("watched")

// mapping outpoint -> txHash
// It holds mapping from outpoint to transaction hash
// outpoint: outpoint.txHash || bigendian(outpoint.index)
inputsDataBucketName = []byte("inputs")

// key for next transaction
numTxKey = []byte("ntk")
)
Expand Down Expand Up @@ -245,6 +250,12 @@ func (c *TrackedTransactionStore) initBuckets() error {
return err
}

_, err = tx.CreateTopLevelBucket(inputsDataBucketName)

if err != nil {
return err
}

return nil
})
}
Expand Down Expand Up @@ -474,6 +485,7 @@ func saveTrackedTransaction(
txHashBytes []byte,
tx *proto.TrackedTransaction,
watchedTxData *proto.WatchedTxData,
id *inputData,
) error {
if tx == nil {
return fmt.Errorf("cannot save nil tracked transaciton")
Expand Down Expand Up @@ -521,6 +533,22 @@ func saveTrackedTransaction(
}
}

if id != nil {
inputDataBucket := rwTx.ReadWriteBucket(inputsDataBucketName)
if inputDataBucket == nil {
return ErrCorruptedTransactionsDb
}

for _, input := range id.inputs {
// save all the inputs to the transaction
err = inputDataBucket.Put(input, txHashBytes)

if err != nil {
return err
}
}
}

// increment counter for the next transaction
return txIdxBucket.Put(numTxKey, uint64KeyToBytes(nextTxKey+1))
}
Expand All @@ -529,6 +557,7 @@ func (c *TrackedTransactionStore) addTransactionInternal(
txHashBytes []byte,
tt *proto.TrackedTransaction,
wd *proto.WatchedTxData,
id *inputData,
) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
transactionsBucketIdxBucket := tx.ReadWriteBucket(transactionIndexName)
Expand All @@ -548,7 +577,7 @@ func (c *TrackedTransactionStore) addTransactionInternal(
return ErrCorruptedTransactionsDb
}

return saveTrackedTransaction(tx, transactionsBucketIdxBucket, transactionsBucket, txHashBytes, tt, wd)
return saveTrackedTransaction(tx, transactionsBucketIdxBucket, transactionsBucket, txHashBytes, tt, wd, id)
})
}

Expand Down Expand Up @@ -599,6 +628,58 @@ func CreateTrackedTransaction(
return protoTxToStoredTransaction(&msg)
}

type inputData struct {
inputs [][]byte
txHash []byte
}

func outpointBytes(op *wire.OutPoint) ([]byte, error) {
var buf bytes.Buffer
_, err := buf.Write(op.Hash.CloneBytes())
if err != nil {
return nil, err
}

binary.Write(&buf, binary.BigEndian, op.Index)

return buf.Bytes(), nil
}

func outpointFromBytes(b []byte) (*wire.OutPoint, error) {
if len(b) != 36 {
return nil, fmt.Errorf("invalid outpoint bytes length")
}

hash, err := chainhash.NewHash(b[:32])
if err != nil {
return nil, err
}

return &wire.OutPoint{
Hash: *hash,
Index: binary.BigEndian.Uint32(b[32:]),
}, nil

}

func getInputData(tx *wire.MsgTx) (*inputData, error) {
var inputs [][]byte

for _, in := range tx.TxIn {
opBytes, err := outpointBytes(&in.PreviousOutPoint)
if err != nil {
return nil, err
}
inputs = append(inputs, opBytes)
}
txHash := tx.TxHash()

return &inputData{
inputs: inputs,
txHash: txHash.CloneBytes(),
}, nil
}

func (c *TrackedTransactionStore) AddTransactionSentToBabylon(
btcTx *wire.MsgTx,
stakingOutputIndex uint32,
Expand Down Expand Up @@ -653,12 +734,18 @@ func (c *TrackedTransactionStore) AddTransactionSentToBabylon(
UnbondingTxData: update,
}

inputData, err := getInputData(btcTx)

if err != nil {
return fmt.Errorf("failed to get input data: %w", err)
}

return c.addTransactionInternal(
txHashBytes[:], &msg, nil,
txHashBytes[:], &msg, nil, inputData,
)
}

func (c *TrackedTransactionStore) AddTransaction(
func (c *TrackedTransactionStore) AddTransactionSentToBTC(
btcTx *wire.MsgTx,
stakingOutputIndex uint32,
stakingTime uint16,
Expand Down Expand Up @@ -705,7 +792,7 @@ func (c *TrackedTransactionStore) AddTransaction(
}

return c.addTransactionInternal(
txHashBytes[:], &msg, nil,
txHashBytes[:], &msg, nil, nil,
)
}

Expand Down Expand Up @@ -791,7 +878,7 @@ func (c *TrackedTransactionStore) AddWatchedTransaction(
}

return c.addTransactionInternal(
txHashBytes, &msg, &watchedData,
txHashBytes, &msg, &watchedData, nil,
)
}

Expand Down Expand Up @@ -841,6 +928,39 @@ func (c *TrackedTransactionStore) setTxState(
return err
}

// delegation has been activaten remove used inputs if any exists
// TODO(konrad): This is not pretty architecture wise and a bit broken in scenario
// that delegation is never activated
if storedTx.State == proto.TransactionState_DELEGATION_ACTIVE {
inputDataBucket := tx.ReadWriteBucket(inputsDataBucketName)
if inputDataBucket == nil {
return ErrCorruptedTransactionsDb
}

var stakingTx wire.MsgTx
err := stakingTx.Deserialize(bytes.NewReader(storedTx.StakingTransaction))

if err != nil {
return err
}

for _, input := range stakingTx.TxIn {
input, err := outpointBytes(&input.PreviousOutPoint)

if err != nil {
return err
}

// if key does not exist, this operation is no-op
err = inputDataBucket.Delete(input)

if err != nil {
return err
}
}

}

return nil
})
}
Expand Down Expand Up @@ -915,7 +1035,6 @@ func (c *TrackedTransactionStore) SetDelegationActiveOnBabylon(txHash *chainhash

func (c *TrackedTransactionStore) SetTxUnbondingSignaturesReceived(
txHash *chainhash.Hash,
delegationActive bool,
covenantSignatures []PubKeySigPair,
) error {
setUnbondingSignaturesReceived := func(tx *proto.TrackedTransaction) error {
Expand All @@ -926,12 +1045,7 @@ func (c *TrackedTransactionStore) SetTxUnbondingSignaturesReceived(
if len(tx.UnbondingTxData.CovenantSignatures) > 0 {
return fmt.Errorf("cannot set unbonding signatures received, because unbonding signatures already exist: %w", ErrInvalidUnbondingDataUpdate)
}

if delegationActive {
tx.State = proto.TransactionState_DELEGATION_ACTIVE
} else {
tx.State = proto.TransactionState_VERIFIED
}
tx.State = proto.TransactionState_VERIFIED
tx.UnbondingTxData.CovenantSignatures = covenantSigsToProto(covenantSignatures)
return nil
}
Expand Down Expand Up @@ -1200,3 +1314,31 @@ func (c *TrackedTransactionStore) ScanTrackedTransactions(scanFunc StoredTransac
})
}, reset)
}

func (c *TrackedTransactionStore) OutpointUsed(op *wire.OutPoint) (bool, error) {
var used bool = false

err := c.db.View(func(tx kvdb.RTx) error {
inputsBucket := tx.ReadBucket(inputsDataBucketName)

if inputsBucket == nil {
return ErrCorruptedTransactionsDb
}

opBytes, err := outpointBytes(op)

if err != nil {
return fmt.Errorf("invalid outpoint provided: %w", err)
}

res := inputsBucket.Get(opBytes)

if res != nil {
used = true
}

return nil
}, func() {})

return used, err
}
Loading

0 comments on commit 4e8d094

Please sign in to comment.