diff --git a/action_transaction.go b/action_transaction.go index aff14c45..53260c6b 100644 --- a/action_transaction.go +++ b/action_transaction.go @@ -24,103 +24,15 @@ import ( // txHex is the raw transaction hex // draftID is the unique draft id from a previously started New() transaction (draft_transaction.ID) // opts are model options and can include "metadata" -func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string, - opts ...ModelOps, -) (*Transaction, error) { - // Check for existing NewRelic transaction +func (c *Client) RecordTransaction(ctx context.Context, xPubKey, txHex, draftID string, opts ...ModelOps) (*Transaction, error) { ctx = c.GetOrStartTxn(ctx, "record_transaction") - // Create the model & set the default options (gives options from client->model) - newOpts := c.DefaultModelOptions(append(opts, WithXPub(xPubKey), New())...) - transaction := newTransactionWithDraftID( - txHex, draftID, newOpts..., - ) - - // Ensure that we have a transaction id (created from the txHex) - id := transaction.GetID() - if len(id) == 0 { - return nil, ErrMissingTxHex - } - - var ( - unlock func() - err error - ) - // Create the lock and set the release for after the function completes - // Waits for the moment when the transaction is unlocked and creates a new lock - // Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming - for { - unlock, err = newWriteLock( - ctx, fmt.Sprintf(lockKeyRecordTx, id), c.Cachestore(), - ) - if err == nil { - break - } - time.Sleep(time.Second * 1) - } - defer unlock() - - // OPTION: check incoming transactions (if enabled, will add to queue for checking on-chain) - if !c.IsITCEnabled() { - transaction.DebugLog("incoming transaction check is disabled") - } else { - - // Incoming (external/unknown) transaction (no draft id was given) - if len(transaction.DraftID) == 0 { - - // Process & save the model - incomingTx := newIncomingTransaction( - transaction.ID, txHex, newOpts..., - ) - if err = incomingTx.Save(ctx); err != nil { - return nil, err - } - - // Check if sync transaction exist. 
And if not, we should create it - if syncTx, _ := GetSyncTransactionByID(ctx, transaction.ID, transaction.client.DefaultModelOptions()...); syncTx == nil { - // Create the sync transaction model - sync := newSyncTransaction( - transaction.GetID(), - transaction.Client().DefaultSyncConfig(), - transaction.GetOptions(true)..., - ) - - // Skip broadcasting and skip P2P (incoming tx should have been broadcasted already) - sync.BroadcastStatus = SyncStatusSkipped // todo: this is an assumption - sync.P2PStatus = SyncStatusSkipped // The owner of the Tx should have already notified paymail providers - - // Use the same metadata - sync.Metadata = transaction.Metadata - - // If all the options are skipped, do not make a new model (ignore the record) - if !sync.isSkipped() { - if err = sync.Save(ctx); err != nil { - return nil, err - } - } - } - // Added to queue - return newTransactionFromIncomingTransaction(incomingTx), nil - } - - // Internal tx (must match draft tx) - if transaction.draftTransaction, err = getDraftTransactionID( - ctx, transaction.XPubID, transaction.DraftID, - transaction.GetOptions(false)..., - ); err != nil { - return nil, err - } else if transaction.draftTransaction == nil { - return nil, ErrDraftNotFound - } - } - - // Process & save the transaction model - if err = transaction.Save(ctx); err != nil { + rts, err := getRecordTxStrategy(ctx, c, xPubKey, txHex, draftID) + if err != nil { return nil, err } - // Return the response - return transaction, nil + return recordTransaction(ctx, c, rts, opts...) } // RecordRawTransaction will parse the transaction and save it into the Datastore directly, without any checks @@ -198,11 +110,30 @@ func (c *Client) recordTxHex(ctx context.Context, txHex string, opts ...ModelOps return nil, err } - // run before create to see whether xpub_in_ids or xpub_out_ids is set - if err = transaction.BeforeCreating(ctx); err != nil { + // Logic moved from BeforeCreating hook - should be refactorized in next iteration + + // If we are external and the user disabled incoming transaction checking, check outputs + if transaction.isExternal() && !transaction.Client().IsITCEnabled() { + // Check that the transaction has >= 1 known destination + if !transaction.TransactionBase.hasOneKnownDestination(ctx, transaction.Client(), transaction.GetOptions(false)...) 
{ + return nil, ErrNoMatchingOutputs + } + } + + // Process the UTXOs + if err = transaction.processUtxos(ctx); err != nil { return nil, err } + // Set the values from the inputs/outputs and draft tx + transaction.TotalValue, transaction.Fee = transaction.getValues() + + // Add values + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + + // /Logic moved from BeforeCreating hook - should be refactorized in next iteration + monitor := c.options.chainstate.Monitor() if monitor != nil { @@ -213,7 +144,7 @@ func (c *Client) recordTxHex(ctx context.Context, txHex string, opts ...ModelOps } } - // Process & save the transaction model + // save the transaction model if err = transaction.Save(ctx); err != nil { return nil, err } @@ -418,7 +349,7 @@ func (c *Client) UpdateTransactionMetadata(ctx context.Context, xPubID, id strin return nil, err } - // Save the model + // Save the model // update existing record if err = transaction.Save(ctx); err != nil { return nil, err } @@ -550,7 +481,7 @@ func (c *Client) RevertTransaction(ctx context.Context, id string) error { transaction.XpubOutputValue = XpubOutputValue{"reverted": 0} transaction.DeletedAt.Valid = true transaction.DeletedAt.Time = time.Now() - err = transaction.Save(ctx) + err = transaction.Save(ctx) // update existing record return err } diff --git a/action_transaction_test.go b/action_transaction_test.go index 47dbaa80..2eb5ef6c 100644 --- a/action_transaction_test.go +++ b/action_transaction_test.go @@ -221,8 +221,16 @@ func initRevertTransactionData(t *testing.T) (context.Context, ClientInterface, require.NoError(t, err) assert.NotEmpty(t, hex) - var transaction *Transaction - transaction, err = client.RecordTransaction(ctx, testXPub, hex, draftTransaction.ID, client.DefaultModelOptions()...) + newOpts := client.DefaultModelOptions(WithXPub(testXPub), New()) + transaction := newTransactionWithDraftID( + hex, draftTransaction.ID, newOpts..., + ) + transaction.draftTransaction = draftTransaction + _hydrateOutgoingWithSync(transaction) + err = transaction.processUtxos(ctx) + require.NoError(t, err) + + err = transaction.Save(ctx) require.NoError(t, err) assert.NotEmpty(t, transaction) diff --git a/beef_tx_bytes.go b/beef_tx_bytes.go index e0e36a05..705ec5b5 100644 --- a/beef_tx_bytes.go +++ b/beef_tx_bytes.go @@ -47,7 +47,7 @@ func (beefTx *beefTx) toBeefBytes() ([]byte, error) { // compose beef buffer := make([]byte, 0, beefSize) - buffer = append(buffer, version...) + buffer = append(buffer, ver...) buffer = append(buffer, nBUMPS...) buffer = append(buffer, bumps...) diff --git a/beef_tx_test.go b/beef_tx_test.go index 16cb0d0f..313e401a 100644 --- a/beef_tx_test.go +++ b/beef_tx_test.go @@ -10,23 +10,7 @@ import ( ) func Test_ToBeefHex(t *testing.T) { - t.Run("all parents txs are already mined", func(t *testing.T) { - // given - ctx, client, deferMe := initSimpleTestCase(t) - defer deferMe() - - ancestorTx := addGrandpaTx(ctx, t, client) - minedParentTx := createTxWithDraft(ctx, t, client, ancestorTx, true) - - newTx := createTxWithDraft(ctx, t, client, minedParentTx, false) - - // when - hex, err := ToBeefHex(ctx, newTx) - - // then - assert.NoError(t, err) - assert.NotEmpty(t, hex) - }) + // TOOD: prepare tests in BUX-168 t.Run("some parents txs are not mined yet", func(t *testing.T) { // Error expected! this should be changed in the future. 
right now the test case has been written to make sure the system doesn't panic in such a situation @@ -64,7 +48,7 @@ func addGrandpaTx(ctx context.Context, t *testing.T, client ClientInterface) *Tr }, } grandpaTx.MerkleProof = MerkleProof(grandpaTxMp) - grandpaTx.BUMP = grandpaTx.MerkleProof.ToBUMP() + grandpaTx.BUMP = grandpaTx.MerkleProof.ToBUMP(grandpaTx.BlockHeight) err := grandpaTx.Save(ctx) require.NoError(t, err) diff --git a/db_model_transactions.go b/db_model_transactions.go index 8ab4da8a..54f63215 100644 --- a/db_model_transactions.go +++ b/db_model_transactions.go @@ -59,80 +59,6 @@ func (m *Transaction) BeforeCreating(ctx context.Context) error { return err } - // m.xPubID is the xpub of the user registering the transaction - if len(m.XPubID) > 0 && len(m.DraftID) > 0 { - // Only get the draft if we haven't already - if m.draftTransaction == nil { - if m.draftTransaction, err = getDraftTransactionID( - ctx, m.XPubID, m.DraftID, m.GetOptions(false)..., - ); err != nil { - return err - } else if m.draftTransaction == nil { - return ErrDraftNotFound - } - } - } - - // Validations and broadcast config check - if m.draftTransaction != nil { - - // No config set? Use the default from the client - if m.draftTransaction.Configuration.Sync == nil { - m.draftTransaction.Configuration.Sync = m.Client().DefaultSyncConfig() - } - - // Create the sync transaction model - sync := newSyncTransaction( - m.GetID(), - m.draftTransaction.Configuration.Sync, - m.GetOptions(true)..., - ) - - // Found any p2p outputs? - p2pStatus := SyncStatusSkipped - if m.draftTransaction.Configuration.Outputs != nil { - for _, output := range m.draftTransaction.Configuration.Outputs { - if output.PaymailP4 != nil && output.PaymailP4.ResolutionType == ResolutionTypeP2P { - p2pStatus = SyncStatusPending - } - } - } - sync.P2PStatus = p2pStatus - - // Use the same metadata - sync.Metadata = m.Metadata - - // set this transaction on the sync transaction object. This is needed for the first broadcast - sync.transaction = m - - // If all the options are skipped, do not make a new model (ignore the record) - if !sync.isSkipped() { - m.syncTransaction = sync - } - } - - // If we are external and the user disabled incoming transaction checking, check outputs - if m.isExternal() && !m.Client().IsITCEnabled() { - // Check that the transaction has >= 1 known destination - if !m.TransactionBase.hasOneKnownDestination(ctx, m.Client(), m.GetOptions(false)...) 
{ - return ErrNoMatchingOutputs - } - } - - // Process the UTXOs - if err = m.processUtxos(ctx); err != nil { - return err - } - - // Set the values from the inputs/outputs and draft tx - m.TotalValue, m.Fee = m.getValues() - - // Add values if found - if m.TransactionBase.parsedTx != nil { - m.NumberOfInputs = uint32(len(m.TransactionBase.parsedTx.Inputs)) - m.NumberOfOutputs = uint32(len(m.TransactionBase.parsedTx.Outputs)) - } - m.DebugLog("end: " + m.Name() + " BeforeCreating hook") m.beforeCreateCalled = true return nil @@ -297,8 +223,7 @@ func (m *Transaction) migrateBUMP() error { return err } for _, tx := range txs { - bump := tx.MerkleProof.ToBUMP() - bump.BlockHeight = tx.BlockHeight + bump := tx.MerkleProof.ToBUMP(tx.BlockHeight) tx.BUMP = bump _ = tx.Save(ctx) } diff --git a/mock_chainstate_test.go b/mock_chainstate_test.go index 39bc5558..77ff6a9b 100644 --- a/mock_chainstate_test.go +++ b/mock_chainstate_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/bitcoin-sv/go-broadcast-client/broadcast" + "github.com/libsv/go-bc" "github.com/tonicpow/go-minercraft/v2" "github.com/BuxOrg/bux/chainstate" @@ -133,6 +134,11 @@ func (c *chainStateEverythingOnChain) QueryTransaction(_ context.Context, id str ID: id, MinerID: "", Provider: "whatsonchain", + MerkleProof: &bc.MerkleProof{ + Index: 37008, + TxOrID: id, + Nodes: []string{"3228f78cfd3c96262ec521225f1b9dd6326b4d3e245d1551bb06258f2101cb65", "05267706279d2e5ebcf89ed0645d4283108c7e850cdb84aeb0974738ae447a8d"}, + }, }, nil } @@ -147,6 +153,11 @@ func (c *chainStateEverythingOnChain) QueryTransactionFastest(_ context.Context, ID: id, MinerID: "", Provider: "whatsonchain", + MerkleProof: &bc.MerkleProof{ + Index: 37008, + TxOrID: id, + Nodes: []string{"3228f78cfd3c96262ec521225f1b9dd6326b4d3e245d1551bb06258f2101cb65", "05267706279d2e5ebcf89ed0645d4283108c7e850cdb84aeb0974738ae447a8d"}, + }, }, nil } diff --git a/model_bump.go b/model_bump.go index 41772230..059fb1cc 100644 --- a/model_bump.go +++ b/model_bump.go @@ -10,13 +10,14 @@ import ( "reflect" "sort" + "github.com/libsv/go-bc" "github.com/libsv/go-bt/v2" ) const maxBumpHeight = 64 // BUMPs represents a slice of BUMPs - BSV Unified Merkle Paths -type BUMPs []BUMP +type BUMPs []*BUMP // BUMP represents BUMP (BSV Unified Merkle Path) format type BUMP struct { @@ -29,43 +30,64 @@ type BUMP struct { // BUMPLeaf represents each BUMP path element type BUMPLeaf struct { Offset uint64 `json:"offset,string"` - Hash string `json:"hash"` + Hash string `json:"hash,omitempty"` TxID bool `json:"txid,omitempty"` Duplicate bool `json:"duplicate,omitempty"` } // CalculateMergedBUMP calculates Merged BUMP from a slice of Merkle Proofs -func CalculateMergedBUMP(mp []MerkleProof) (BUMP, error) { - bump := BUMP{} - - if len(mp) == 0 || mp == nil { - return bump, nil +func CalculateMergedBUMP(bumps []BUMP) (*BUMP, error) { + if len(bumps) == 0 || bumps == nil { + return nil, nil } - height := len(mp[0].Nodes) - if height > maxBumpHeight { - return bump, + blockHeight := bumps[0].BlockHeight + bumpHeight := len(bumps[0].Path) + if bumpHeight > maxBumpHeight { + return nil, fmt.Errorf("BUMP cannot be higher than %d", maxBumpHeight) } - for _, m := range mp { - if height != len(m.Nodes) { - return bump, + for _, b := range bumps { + if bumpHeight != len(b.Path) { + return nil, errors.New("Merged BUMP cannot be obtained from Merkle Proofs of different heights") } + if b.BlockHeight != blockHeight { + return nil, + errors.New("BUMPs have different block heights. 
Cannot merge BUMPs from different blocks") + } + if len(b.Path) == 0 { + return nil, + errors.New("Empty BUMP given") + } } - bump.Path = make([][]BUMPLeaf, height) - bump.allNodes = make([]map[uint64]bool, height) + bump := BUMP{BlockHeight: blockHeight} + bump.Path = make([][]BUMPLeaf, bumpHeight) + bump.allNodes = make([]map[uint64]bool, bumpHeight) for i := range bump.allNodes { bump.allNodes[i] = make(map[uint64]bool, 0) } - for _, m := range mp { - bumpToAdd := m.ToBUMP() - err := bump.add(bumpToAdd) + merkleRoot, err := bumps[0].calculateMerkleRoot() + if err != nil { + return nil, err + } + + for _, b := range bumps { + mr, err := b.calculateMerkleRoot() if err != nil { - return BUMP{}, err + return nil, err + } + + if merkleRoot != mr { + return nil, errors.New("BUMPs have different merkle roots") + } + + err = bump.add(b) + if err != nil { + return nil, err } } @@ -75,7 +97,7 @@ func CalculateMergedBUMP(mp []MerkleProof) (BUMP, error) { }) } - return bump, nil + return &bump, nil } func (bump *BUMP) add(b BUMP) error { @@ -104,6 +126,97 @@ func (bump *BUMP) add(b BUMP) error { return nil } +func (b *BUMP) calculateMerkleRoot() (string, error) { + merkleRoot := "" + + for _, bumpPathElement := range b.Path[0] { + if bumpPathElement.TxID { + calcMerkleRoot, err := calculateMerkleRoot(bumpPathElement, b) + if err != nil { + return "", err + } + + if merkleRoot == "" { + merkleRoot = calcMerkleRoot + continue + } + + if calcMerkleRoot != merkleRoot { + return "", errors.New("different merkle roots for the same block") + } + } + } + return merkleRoot, nil +} + +// calculateMerkleRoots will calculate one merkle root for tx in the BUMPLeaf +func calculateMerkleRoot(baseLeaf BUMPLeaf, bump *BUMP) (string, error) { + calculatedHash := baseLeaf.Hash + offset := baseLeaf.Offset + + for _, bLevel := range bump.Path { + newOffset := getOffsetPair(offset) + leafInPair := findLeafByOffset(newOffset, bLevel) + if leafInPair == nil { + return "", errors.New("could not find pair") + } + + leftNode, rightNode := prepareNodes(baseLeaf, offset, *leafInPair, newOffset) + + str, err := bc.MerkleTreeParentStr(leftNode, rightNode) + if err != nil { + return "", err + } + calculatedHash = str + + offset = offset / 2 + + baseLeaf = BUMPLeaf{ + Hash: calculatedHash, + Offset: offset, + } + } + + return calculatedHash, nil +} + +func findLeafByOffset(offset uint64, bumpLeaves []BUMPLeaf) *BUMPLeaf { + for _, bumpTx := range bumpLeaves { + if bumpTx.Offset == offset { + return &bumpTx + } + } + return nil +} + +func getOffsetPair(offset uint64) uint64 { + if offset%2 == 0 { + return offset + 1 + } + return offset - 1 +} + +func prepareNodes(baseLeaf BUMPLeaf, offset uint64, leafInPair BUMPLeaf, newOffset uint64) (string, string) { + var baseLeafHash, pairLeafHash string + + if baseLeaf.Duplicate { + baseLeafHash = leafInPair.Hash + } else { + baseLeafHash = baseLeaf.Hash + } + + if leafInPair.Duplicate { + pairLeafHash = baseLeaf.Hash + } else { + pairLeafHash = leafInPair.Hash + } + + if newOffset > offset { + return baseLeafHash, pairLeafHash + } + return pairLeafHash, baseLeafHash +} + // Bytes returns BUMPs bytes func (bumps *BUMPs) Bytes() []byte { var buff bytes.Buffer diff --git a/model_bump_test.go b/model_bump_test.go index a0dca28d..64e91567 100644 --- a/model_bump_test.go +++ b/model_bump_test.go @@ -10,45 +10,74 @@ import ( func TestBUMPModel_CalculateBUMP(t *testing.T) { t.Parallel() - t.Run("Single Merkle Proof", func(t *testing.T) { + t.Run("Single BUMP", func(t *testing.T) { // given - 
merkleProofs := []MerkleProof{ + bumps := []BUMP{ { - Index: 1, - TxOrID: "txId", - Nodes: []string{"node0", "node1", "node2", "node3"}, + BlockHeight: 0, + Path: [][]BUMPLeaf{ + { + { + Offset: 0, + Hash: "123b00", // this has to be a valid hex now + }, + { + Offset: 1, + Hash: "123b", + TxID: true, + }, + }, + { + { + Offset: 1, + Hash: "123b01", + }, + }, + { + { + Offset: 1, + Hash: "123b02", + }, + }, + { + { + Offset: 1, + Hash: "123b03", + }, + }, + }, }, } - expectedBUMP := BUMP{ + expectedBUMP := &BUMP{ BlockHeight: 0, Path: [][]BUMPLeaf{ { { Offset: 0, - Hash: "node0", + Hash: "123b00", }, { Offset: 1, - Hash: "txId", + Hash: "123b", TxID: true, }, }, { { Offset: 1, - Hash: "node1", + Hash: "123b01", }, }, { { Offset: 1, - Hash: "node2", + Hash: "123b02", }, }, { { Offset: 1, - Hash: "node3", + Hash: "123b03", }, }, }, @@ -70,260 +99,299 @@ func TestBUMPModel_CalculateBUMP(t *testing.T) { } // when - bump, err := CalculateMergedBUMP(merkleProofs) + bump, err := CalculateMergedBUMP(bumps) // then assert.NoError(t, err) assert.Equal(t, expectedBUMP, bump) }) - t.Run("Slice of Merkle Proofs", func(t *testing.T) { + t.Run("Paired Transactions", func(t *testing.T) { // given - merkleProofs := []MerkleProof{ - { - Index: 2, - TxOrID: "txId1", - Nodes: []string{"D", "AB", "EFGH", "IJKLMNOP"}, - }, + bumps := []BUMP{ { - Index: 7, - TxOrID: "txId2", - Nodes: []string{"G", "EF", "ABCD", "IJKLMNOP"}, - }, - { - Index: 13, - TxOrID: "txId3", - Nodes: []string{"M", "OP", "IJKL", "ABCDEFGH"}, - }, - } - expectedBUMP := BUMP{ - BlockHeight: 0, - Path: [][]BUMPLeaf{ - { - { - Offset: 2, - Hash: "txId1", - TxID: true, - }, + BlockHeight: 0, + Path: [][]BUMPLeaf{ { - Offset: 3, - Hash: "D", + { + Offset: 8, + Hash: "123b09", + TxID: true, + }, + { + Offset: 9, + Hash: "123b10", + }, }, { - Offset: 6, - Hash: "G", + { + Offset: 5, + Hash: "123b1112", + }, }, { - Offset: 7, - Hash: "txId2", - TxID: true, + { + Offset: 3, + Hash: "123b13141516", + }, }, { - Offset: 12, - Hash: "M", + { + Offset: 0, + Hash: "123b0102030405060708", + }, }, + }, + }, + { + BlockHeight: 0, + Path: [][]BUMPLeaf{ { - Offset: 13, - Hash: "txId3", - TxID: true, + { + Offset: 8, + Hash: "123b09", + }, + { + Offset: 9, + Hash: "123b10", + TxID: true, + }, }, - }, - { { - Offset: 0, - Hash: "AB", + { + Offset: 5, + Hash: "123b1112", + }, }, { - Offset: 2, - Hash: "EF", + { + Offset: 3, + Hash: "123b13141516", + }, }, { - Offset: 7, - Hash: "OP", + { + Offset: 0, + Hash: "123b0102030405060708", + }, }, }, + }, + } + expectedBUMP := &BUMP{ + BlockHeight: 0, + Path: [][]BUMPLeaf{ { { - Offset: 0, - Hash: "ABCD", + Offset: 8, + Hash: "123b09", + TxID: true, }, { - Offset: 1, - Hash: "EFGH", + Offset: 9, + Hash: "123b10", + TxID: true, }, + }, + { { - Offset: 2, - Hash: "IJKL", + Offset: 5, + Hash: "123b1112", }, }, { { - Offset: 0, - Hash: "ABCDEFGH", + Offset: 3, + Hash: "123b13141516", }, + }, + { { - Offset: 1, - Hash: "IJKLMNOP", + Offset: 0, + Hash: "123b0102030405060708", }, }, }, allNodes: []map[uint64]bool{ { - 2: true, - 3: true, - 6: true, - 7: true, - 12: true, - 13: true, + 8: true, + 9: true, }, { - 0: true, - 2: true, - 7: true, + 5: true, }, { - 0: true, - 1: true, - 2: true, + 3: true, }, { 0: true, - 1: true, }, }, } // when - bump, err := CalculateMergedBUMP(merkleProofs) + bump, err := CalculateMergedBUMP(bumps) // then assert.NoError(t, err) assert.Equal(t, expectedBUMP, bump) }) - t.Run("Paired Transactions", func(t *testing.T) { + t.Run("Different sizes of BUMPs", func(t *testing.T) { // given - merkleProofs 
:= []MerkleProof{ - { - Index: 8, - TxOrID: "I", - Nodes: []string{"J", "KL", "MNOP", "ABCDEFGH"}, - }, + bumps := []BUMP{ { - Index: 9, - TxOrID: "J", - Nodes: []string{"I", "KL", "MNOP", "ABCDEFGH"}, - }, - } - expectedBUMP := BUMP{ - BlockHeight: 0, - Path: [][]BUMPLeaf{ - { + BlockHeight: 0, + Path: [][]BUMPLeaf{ { - Offset: 8, - Hash: "I", - TxID: true, + { + Offset: 8, + Hash: "123b09", + TxID: true, + }, + { + Offset: 9, + Hash: "123b10", + }, }, { - Offset: 9, - Hash: "J", - TxID: true, + { + Offset: 5, + Hash: "123b1112", + }, }, - }, - { { - Offset: 5, - Hash: "KL", + { + Offset: 3, + Hash: "123b13141516", + }, }, - }, - { { - Offset: 3, - Hash: "MNOP", + { + Offset: 0, + Hash: "123b0102030405060708", + }, }, }, - { + }, + { + BlockHeight: 0, + Path: [][]BUMPLeaf{ { - Offset: 0, - Hash: "ABCDEFGH", + { + Offset: 8, + Hash: "123b09", + }, + { + Offset: 9, + Hash: "123b10", + TxID: true, + }, + }, + { + { + Offset: 5, + Hash: "123b1112", + }, + }, + { + { + Offset: 3, + Hash: "123b0102030405060708", + }, }, - }, - }, - allNodes: []map[uint64]bool{ - { - 8: true, - 9: true, - }, - { - 5: true, - }, - { - 3: true, - }, - { - 0: true, }, }, } // when - bump, err := CalculateMergedBUMP(merkleProofs) + bump, err := CalculateMergedBUMP(bumps) // then - assert.NoError(t, err) - assert.Equal(t, expectedBUMP, bump) + assert.Error(t, err) + assert.Nil(t, bump) }) - t.Run("Different sizes of Merkle Proofs", func(t *testing.T) { + t.Run("BUMPs with different block heights", func(t *testing.T) { // given - merkleProofs := []MerkleProof{ + bumps := []BUMP{ { - Index: 8, - TxOrID: "I", - Nodes: []string{"J", "KL", "MNOP", "ABCDEFGH"}, + BlockHeight: 0, + Path: [][]BUMPLeaf{ + { + { + Offset: 8, + Hash: "123b09", + TxID: true, + }, + { + Offset: 9, + Hash: "123b10", + }, + }, + { + { + Offset: 5, + Hash: "123b1112", + }, + }, + }, }, { - Index: 9, - TxOrID: "J", - Nodes: []string{"I", "KL", "MNOP"}, + BlockHeight: 100, + Path: [][]BUMPLeaf{ + { + { + Offset: 8, + Hash: "123b09", + }, + { + Offset: 9, + Hash: "123b10", + TxID: true, + }, + }, + { + { + Offset: 5, + Hash: "123b1112", + }, + }, + }, }, } // when - bump, err := CalculateMergedBUMP(merkleProofs) + bump, err := CalculateMergedBUMP(bumps) // then assert.Error(t, err) - assert.Equal(t, bump, BUMP{}) + assert.Nil(t, bump) }) - t.Run("Empty slice of Merkle Proofs", func(t *testing.T) { + t.Run("Empty slice of BUMPS", func(t *testing.T) { // given - merkleProof := []MerkleProof{} + bumps := []BUMP{} // when - bump, err := CalculateMergedBUMP(merkleProof) + bump, err := CalculateMergedBUMP(bumps) // then assert.NoError(t, err) - assert.Equal(t, bump, BUMP{}) + assert.Nil(t, bump) }) - t.Run("Slice of empty Merkle Proofs", func(t *testing.T) { + t.Run("Slice of empty BUMPS", func(t *testing.T) { // given - merkleProofs := []MerkleProof{ + bumps := []BUMP{ {}, {}, {}, } // when - bump, err := CalculateMergedBUMP(merkleProofs) + bump, err := CalculateMergedBUMP(bumps) // then - assert.NoError(t, err) - assert.Equal(t, bump, BUMP{ - BlockHeight: 0, - Path: [][]BUMPLeaf{}, - allNodes: []map[uint64]bool{}, - }) + assert.Error(t, err) + assert.Nil(t, bump) }) } @@ -522,7 +590,7 @@ func TestBUMPModel_CalculateMergedBUMPAndHex(t *testing.T) { }, }, } - expectedBUMP := BUMP{ + expectedBUMP := &BUMP{ BlockHeight: 0, Path: [][]BUMPLeaf{ { @@ -727,7 +795,11 @@ func TestBUMPModel_CalculateMergedBUMPAndHex(t *testing.T) { "3d2388f114e6f627fd9dd632e72502699e419338bed5022840f4176e1731f715" // when - bump, err := CalculateMergedBUMP(merkleProof) + bumps := 
make([]BUMP, 0) + for _, mp := range merkleProof { + bumps = append(bumps, mp.ToBUMP(0)) + } + bump, err := CalculateMergedBUMP(bumps) actualHex := bump.Hex() // then diff --git a/model_draft_transactions.go b/model_draft_transactions.go index 114266fc..f82f79c9 100644 --- a/model_draft_transactions.go +++ b/model_draft_transactions.go @@ -380,7 +380,7 @@ func (m *DraftTransaction) createTransactionHex(ctx context.Context) (err error) // final sanity check inputValue := uint64(0) usedUtxos := make([]string, 0) - merkleProofs := make(map[uint64][]MerkleProof) + bumps := make(map[uint64][]BUMP) for _, input := range m.Configuration.Inputs { // check whether an utxo was used twice, this is not valid if utils.StringInSlice(input.Utxo.ID, usedUtxos) { @@ -392,8 +392,8 @@ func (m *DraftTransaction) createTransactionHex(ctx context.Context) (err error) if err != nil { return err } - if tx.MerkleProof.TxOrID != "" { - merkleProofs[tx.BlockHeight] = append(merkleProofs[tx.BlockHeight], tx.MerkleProof) + if len(tx.BUMP.Path) != 0 { + bumps[tx.BlockHeight] = append(bumps[tx.BlockHeight], tx.BUMP) } } outputValue := uint64(0) @@ -410,10 +410,13 @@ func (m *DraftTransaction) createTransactionHex(ctx context.Context) (err error) if inputValue-outputValue != m.Configuration.Fee { return ErrTransactionFeeInvalid } - for _, v := range merkleProofs { - bump, err := CalculateMergedBUMP(v) + for _, b := range bumps { + bump, err := CalculateMergedBUMP(b) if err != nil { - return err + return fmt.Errorf("Error while calculating Merged BUMP: %s", err.Error()) + } + if bump == nil { + continue } m.BUMPs = append(m.BUMPs, bump) } diff --git a/model_draft_transactions_test.go b/model_draft_transactions_test.go index c9c850e8..8037389c 100644 --- a/model_draft_transactions_test.go +++ b/model_draft_transactions_test.go @@ -1559,6 +1559,15 @@ func initSimpleTestCase(t *testing.T) (context.Context, ClientInterface, func()) require.NoError(t, err) transaction := newTransaction(testTxHex, append(client.DefaultModelOptions(), New())...) 
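+	// hydrate fields previously populated by the BeforeCreating hook (that logic was moved out of the hook in this change)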
+ err = transaction.processUtxos(ctx) + require.NoError(t, err) + transaction.TotalValue, transaction.Fee = transaction.getValues() + + if transaction.TransactionBase.parsedTx != nil { + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + } + err = transaction.Save(ctx) require.NoError(t, err) diff --git a/model_incoming_transactions.go b/model_incoming_transactions.go index 72c7b39b..60b01651 100644 --- a/model_incoming_transactions.go +++ b/model_incoming_transactions.go @@ -2,10 +2,9 @@ package bux import ( "context" + "encoding/json" "errors" "fmt" - "runtime/debug" - "strings" "time" "github.com/BuxOrg/bux/chainstate" @@ -31,13 +30,12 @@ type IncomingTransaction struct { } // newIncomingTransaction will start a new model -func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTransaction) { +func newIncomingTransaction(hex string, opts ...ModelOps) (tx *IncomingTransaction) { // Create the model tx = &IncomingTransaction{ Model: *NewBaseModel(ModelIncomingTransaction, opts...), TransactionBase: TransactionBase{ - ID: txID, Hex: hex, }, Status: SyncStatusReady, @@ -45,7 +43,8 @@ func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTra // Attempt to parse if len(hex) > 0 { - tx.TransactionBase.parsedTx, _ = bt.NewTxFromString(hex) + tx.parsedTx, _ = bt.NewTxFromString(hex) + tx.ID = tx.parsedTx.TxID() } return @@ -54,7 +53,7 @@ func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTra // getIncomingTransactionByID will get the incoming transactions to process func getIncomingTransactionByID(ctx context.Context, id string, opts ...ModelOps) (*IncomingTransaction, error) { // Construct an empty tx - tx := newIncomingTransaction("", "", opts...) + tx := newIncomingTransaction("", opts...) 
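+	// an empty hex gives us nothing to parse, so the ID is set explicitly below for the lookup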
tx.ID = id // Get the record @@ -128,6 +127,22 @@ func (m *IncomingTransaction) GetID() string { return m.ID } +func (m *IncomingTransaction) toTransactionDto() *Transaction { + t := Transaction{} + t.Hex = m.Hex + + t.parsedTx = m.parsedTx + t.rawXpubKey = m.rawXpubKey + t.setXPubID() + t.setID() + + t.Metadata = m.Metadata + t.NumberOfOutputs = uint32(len(m.parsedTx.Outputs)) + t.NumberOfInputs = uint32(len(m.parsedTx.Inputs)) + + return &t +} + // BeforeCreating will fire before the model is being inserted into the Datastore func (m *IncomingTransaction) BeforeCreating(ctx context.Context) error { m.DebugLog("starting: [" + m.name.String() + "] BeforeCreating hook...") @@ -189,7 +204,7 @@ func (m *IncomingTransaction) AfterCreated(ctx context.Context) error { // todo: this should be refactored into a task // go func(incomingTx *IncomingTransaction) { - if err := processIncomingTransaction(context.Background(), nil, m); err != nil { + if err := processIncomingTransaction(context.Background(), m.Client().Logger(), m); err != nil { m.Client().Logger().Error(ctx, "error processing incoming transaction: "+err.Error()) } // }(m) @@ -274,21 +289,14 @@ func processIncomingTransactions(ctx context.Context, logClient zLogger.GormLogg func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLoggerInterface, incomingTx *IncomingTransaction) error { - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("processing incoming transaction: %v", incomingTx)) + if logClient == nil { + logClient = incomingTx.client.Logger() } + logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): transaction: %v", incomingTx)) + // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - incomingTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, incomingTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -305,9 +313,7 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout, ); err != nil { - if logClient != nil { - logClient.Error(ctx, fmt.Sprintf("error finding transaction %s on chain: %s", incomingTx.ID, err.Error())) - } + logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): error finding transaction %s on chain. Reason: %s", incomingTx.ID, err)) // TX might not have been broadcast yet? (race condition, or it was never broadcast...) if errors.Is(err, chainstate.ErrTransactionNotFound) { @@ -322,42 +328,51 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge } // Broadcast was successful, so the transaction was accepted by the network, continue processing like before - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("broadcast of transaction was successful using %s", provider)) - } + logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): broadcast of transaction %s was successful using %s. 
Incoming tx will be processed again.", incomingTx.ID, provider)) + // allow propagation time.Sleep(3 * time.Second) - if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest( - ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout, - ); err != nil { - incomingTx.Status = statusReady - incomingTx.StatusMessage = "tx was not found on-chain, attempting to broadcast using provider: " + provider - _ = incomingTx.Save(ctx) - return err - } - } else { - // Actual error occurred - bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error()) - return err + return nil // reprocess it when triggering the task again } + + // Actual error occurred + bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error()) + return err } - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider)) + // validate txInfo + if txInfo.BlockHash == "" || txInfo.MerkleProof == nil || txInfo.MerkleProof.TxOrID == "" || len(txInfo.MerkleProof.Nodes) == 0 { + logClient.Warn(ctx, fmt.Sprintf("processIncomingTransaction(): txInfo for %s is invalid, will try again later", incomingTx.ID)) + + if incomingTx.client.IsDebug() { + txInfoJSON, _ := json.Marshal(txInfo) //nolint:nolintlint,nilerr // error is not needed + incomingTx.DebugLog(string(txInfoJSON)) + } + return nil } - // Create the new transaction model - transaction := newTransactionFromIncomingTransaction(incomingTx) + logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider)) - // Get the transaction by ID - if tx, _ := getTransactionByID( - ctx, transaction.rawXpubKey, transaction.TransactionBase.ID, transaction.client.DefaultModelOptions()..., - ); tx != nil { - transaction = tx + // Check if we have transaction in DB already + transaction, _ := getTransactionByID( + ctx, incomingTx.rawXpubKey, incomingTx.ID, incomingTx.client.DefaultModelOptions()..., + ) + + if transaction == nil { + // Create the new transaction model + transaction = newTransactionFromIncomingTransaction(incomingTx) + + if err = transaction.processUtxos(ctx); err != nil { + logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): processUtxos() for %s failed. 
Reason: %s", incomingTx.ID, err)) + return err + } + + transaction.TotalValue, transaction.Fee = transaction.getValues() + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) } - // Add additional information (if found on-chain) - transaction.BlockHeight = uint64(txInfo.BlockHeight) - transaction.BlockHash = txInfo.BlockHash + + transaction.setChainInfo(txInfo) // Create status message onChain := len(transaction.BlockHash) > 0 || transaction.BlockHeight > 0 diff --git a/model_incoming_transactions_test.go b/model_incoming_transactions_test.go index d3285037..4f75effd 100644 --- a/model_incoming_transactions_test.go +++ b/model_incoming_transactions_test.go @@ -12,7 +12,7 @@ import ( func TestIncomingTransaction_GetModelName(t *testing.T) { t.Parallel() - bTx := newIncomingTransaction(testTxID, testTxHex, New()) + bTx := newIncomingTransaction(testTxHex, New()) assert.Equal(t, ModelIncomingTransaction.String(), bTx.GetModelName()) } diff --git a/model_merkle_proof.go b/model_merkle_proof.go index 075e8167..fb351181 100644 --- a/model_merkle_proof.go +++ b/model_merkle_proof.go @@ -58,8 +58,8 @@ func (m MerkleProof) Value() (driver.Value, error) { } // ToBUMP transform Merkle Proof to BUMP -func (m *MerkleProof) ToBUMP() BUMP { - bump := BUMP{} +func (m *MerkleProof) ToBUMP(blockHeight uint64) BUMP { + bump := BUMP{BlockHeight: blockHeight} height := len(m.Nodes) if height == 0 { @@ -79,7 +79,11 @@ func (m *MerkleProof) ToBUMP() BUMP { } txIDPath2 := BUMPLeaf{ Offset: offsetPair(offset), - Hash: m.Nodes[0], + } + if m.Nodes[0] != "*" { + txIDPath2.Hash = m.Nodes[0] + } else { + txIDPath2.Duplicate = true } if offset < pairOffset { @@ -94,10 +98,17 @@ func (m *MerkleProof) ToBUMP() BUMP { for i := 1; i < height; i++ { p := make([]BUMPLeaf, 0) offset = parentOffset(offset) - p = append(p, BUMPLeaf{ - Offset: offset, - Hash: m.Nodes[i], - }) + + leaf := BUMPLeaf{Offset: offset} + + isDuplicate := m.Nodes[i] == "*" + if !isDuplicate { + leaf.Hash = m.Nodes[i] + } else { + leaf.Duplicate = true + } + + p = append(p, leaf) path = append(path, p) } bump.Path = path diff --git a/model_merkle_proof_test.go b/model_merkle_proof_test.go index 83d45938..19d8a737 100644 --- a/model_merkle_proof_test.go +++ b/model_merkle_proof_test.go @@ -11,12 +11,15 @@ func TestMerkleProofModel_ToBUMP(t *testing.T) { t.Parallel() t.Run("Valid Merkle Proof #1", func(t *testing.T) { + // given + blockHeight := uint64(0) mp := MerkleProof{ Index: 1, TxOrID: "txId", Nodes: []string{"node0", "node1", "node2", "node3"}, } expectedBUMP := BUMP{ + BlockHeight: blockHeight, Path: [][]BUMPLeaf{ { {Offset: 0, Hash: "node0"}, @@ -33,17 +36,24 @@ func TestMerkleProofModel_ToBUMP(t *testing.T) { }, }, } - actualBUMP := mp.ToBUMP() + + // when + actualBUMP := mp.ToBUMP(blockHeight) + + // then assert.Equal(t, expectedBUMP, actualBUMP) }) t.Run("Valid Merkle Proof #2", func(t *testing.T) { + // given + blockHeight := uint64(0) mp := MerkleProof{ Index: 14, TxOrID: "txId", Nodes: []string{"node0", "node1", "node2", "node3", "node4"}, } expectedBUMP := BUMP{ + BlockHeight: blockHeight, Path: [][]BUMPLeaf{ { {Offset: 14, Hash: "txId", TxID: true}, @@ -63,13 +73,55 @@ func TestMerkleProofModel_ToBUMP(t *testing.T) { }, }, } - actualBUMP := mp.ToBUMP() + + // when + actualBUMP := mp.ToBUMP(blockHeight) + + // then + assert.Equal(t, expectedBUMP, actualBUMP) + }) + + t.Run("Valid Merkle Proof #3 - with *", func(t 
*testing.T) { + // given + blockHeight := uint64(0) + mp := MerkleProof{ + Index: 14, + TxOrID: "txId", + Nodes: []string{"*", "node1", "node2", "node3", "node4"}, + } + expectedBUMP := BUMP{ + BlockHeight: blockHeight, + Path: [][]BUMPLeaf{ + { + {Offset: 14, Hash: "txId", TxID: true}, + {Offset: 15, Duplicate: true}, + }, + { + {Offset: 6, Hash: "node1"}, + }, + { + {Offset: 2, Hash: "node2"}, + }, + { + {Offset: 0, Hash: "node3"}, + }, + { + {Offset: 1, Hash: "node4"}, + }, + }, + } + + // when + actualBUMP := mp.ToBUMP(blockHeight) + + // then assert.Equal(t, expectedBUMP, actualBUMP) }) t.Run("Empty Merkle Proof", func(t *testing.T) { + blockHeight := uint64(0) mp := MerkleProof{} - actualBUMP := mp.ToBUMP() - assert.Equal(t, BUMP{}, actualBUMP) + actualBUMP := mp.ToBUMP(blockHeight) + assert.Equal(t, BUMP{BlockHeight: blockHeight}, actualBUMP) }) } diff --git a/model_sync_transactions.go b/model_sync_transactions.go index ab77d261..0a2c2904 100644 --- a/model_sync_transactions.go +++ b/model_sync_transactions.go @@ -108,17 +108,6 @@ func (m *SyncTransaction) BeforeCreating(_ context.Context) error { func (m *SyncTransaction) AfterCreated(ctx context.Context) error { m.DebugLog("starting: " + m.Name() + " AfterCreated hook...") - // Should we broadcast immediately? - if m.Configuration.Broadcast && - m.Configuration.BroadcastInstant { - if err := processBroadcastTransaction( // TODO: remove business logic - ctx, m, - ); err != nil { - // return err (do not return and fail the record creation) - m.Client().Logger().Error(ctx, "error running broadcast tx: "+err.Error()) - } - } - m.DebugLog("end: " + m.Name() + " AfterCreated hook") return nil } diff --git a/model_sync_transactions_test.go b/model_sync_transactions_test.go index ece2f081..7676ad5f 100644 --- a/model_sync_transactions_test.go +++ b/model_sync_transactions_test.go @@ -34,12 +34,12 @@ func Test_areParentsBroadcast(t *testing.T) { txErr := tx.Save(ctx) require.NoError(t, txErr) - tx = newTransaction(testTx2Hex, append(opts, New())...) - txErr = tx.Save(ctx) + tx2 := newTransaction(testTx2Hex, append(opts, New())...) + txErr = tx2.Save(ctx) require.NoError(t, txErr) - tx = newTransaction(testTx3Hex, append(opts, New())...) - txErr = tx.Save(ctx) + tx3 := newTransaction(testTx3Hex, append(opts, New())...) + txErr = tx3.Save(ctx) require.NoError(t, txErr) // input of testTxID @@ -54,7 +54,7 @@ func Test_areParentsBroadcast(t *testing.T) { require.NoError(t, txErr) type args struct { - tx *SyncTransaction + tx *Transaction opts []ModelOps } tests := []struct { @@ -66,7 +66,7 @@ func Test_areParentsBroadcast(t *testing.T) { { name: "no parents", args: args{ - tx: newSyncTransaction(testTxID3, &SyncConfig{SyncOnChain: true, Broadcast: true}, New()), + tx: tx3, opts: opts, }, want: true, @@ -75,7 +75,7 @@ func Test_areParentsBroadcast(t *testing.T) { { name: "parent not broadcast", args: args{ - tx: newSyncTransaction(testTxID2, &SyncConfig{SyncOnChain: true, Broadcast: true}, New()), + tx: tx2, opts: opts, }, want: false, @@ -84,7 +84,7 @@ func Test_areParentsBroadcast(t *testing.T) { { name: "parent broadcast", args: args{ - tx: newSyncTransaction(testTxID, &SyncConfig{SyncOnChain: true, Broadcast: true}, New()), + tx: tx, opts: opts, }, want: true, @@ -93,7 +93,7 @@ func Test_areParentsBroadcast(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := _areParentsBroadcast(ctx, tt.args.tx, tt.args.opts...) + got, err := _areParentsBroadcasted(ctx, tt.args.tx, tt.args.opts...) 
if !tt.wantErr(t, err, fmt.Sprintf("areParentsBroadcast(%v, %v, %v)", ctx, tt.args.tx, tt.args.opts)) { return } diff --git a/model_transactions.go b/model_transactions.go index 39140aad..0e149c57 100644 --- a/model_transactions.go +++ b/model_transactions.go @@ -2,8 +2,8 @@ package bux import ( "context" - "encoding/hex" + "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/utils" "github.com/libsv/go-bt/v2" @@ -123,11 +123,6 @@ func newTransactionFromIncomingTransaction(incomingTx *IncomingTransaction) *Tra _ = tx.setID() } - // Set the fields - tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) - tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) - tx.Status = statusProcessing - return tx } @@ -228,67 +223,20 @@ func (m *Transaction) isExternal() bool { return m.draftTransaction == nil } -// processTxInputs will process the transaction inputs -func (m *Transaction) processInputs(ctx context.Context) (err error) { - // Pre-build the options - opts := m.GetOptions(false) - client := m.Client() - - var utxo *Utxo - - // check whether we are spending an internal utxo - for index := range m.TransactionBase.parsedTx.Inputs { - // todo: optimize this SQL SELECT to get all utxos in one query? - if utxo, err = m.transactionService.getUtxo(ctx, - hex.EncodeToString(m.TransactionBase.parsedTx.Inputs[index].PreviousTxID()), - m.TransactionBase.parsedTx.Inputs[index].PreviousTxOutIndex, - opts..., - ); err != nil { - return - } else if utxo != nil { // Found a UTXO record - - // Is Spent? - if len(utxo.SpendingTxID.String) > 0 { - return ErrUtxoAlreadySpent - } - - // Only if IUC is enabled (or client is nil which means its enabled by default) - if client == nil || client.IsIUCEnabled() { - - // check whether the utxo is spent - isReserved := len(utxo.DraftID.String) > 0 - matchesDraft := m.draftTransaction != nil && utxo.DraftID.String == m.draftTransaction.ID - - // Check whether the spending transaction was reserved by the draft transaction (in the utxo) - if !isReserved { - return ErrUtxoNotReserved - } - if !matchesDraft { - return ErrDraftIDMismatch - } - } - - // Update the output value - if _, ok := m.XpubOutputValue[utxo.XpubID]; !ok { - m.XpubOutputValue[utxo.XpubID] = 0 - } - m.XpubOutputValue[utxo.XpubID] -= int64(utxo.Satoshis) - - // Mark utxo as spent - utxo.SpendingTxID.Valid = true - utxo.SpendingTxID.String = m.ID - m.utxos = append(m.utxos, *utxo) +func (m *Transaction) setChainInfo(txInfo *chainstate.TransactionInfo) { + m.BlockHash = txInfo.BlockHash + m.BlockHeight = uint64(txInfo.BlockHeight) + m.setMerkleRoot(txInfo) +} - // Add the xPub ID - if !utils.StringInSlice(utxo.XpubID, m.XpubInIDs) { - m.XpubInIDs = append(m.XpubInIDs, utxo.XpubID) - } - } +func (m *Transaction) setMerkleRoot(txInfo *chainstate.TransactionInfo) { + if txInfo.MerkleProof != nil { + mp := MerkleProof(*txInfo.MerkleProof) + m.MerkleProof = mp - // todo: what if the utxo is nil (not found)? 
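+		// convert the merkle proof into a BUMP using the block height reported by chainstate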
+ bump := mp.ToBUMP(uint64(txInfo.BlockHeight)) + m.BUMP = bump } - - return } // IsXpubAssociated will check if this key is associated to this transaction diff --git a/model_transactions_test.go b/model_transactions_test.go index c1550a84..4487e679 100644 --- a/model_transactions_test.go +++ b/model_transactions_test.go @@ -359,7 +359,7 @@ func TestTransaction_processInputs(t *testing.T) { transaction.transactionService = transactionServiceMock{} ctx := context.Background() - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.NoError(t, err) assert.Nil(t, transaction.utxos) assert.Nil(t, transaction.XpubInIDs) @@ -392,7 +392,7 @@ func TestTransaction_processInputs(t *testing.T) { } ctx := context.Background() - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.NoError(t, err) require.NotNil(t, transaction.utxos) assert.IsType(t, Utxo{}, transaction.utxos[0]) @@ -436,7 +436,7 @@ func TestTransaction_processInputs(t *testing.T) { } ctx := context.Background() - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.ErrorIs(t, err, ErrUtxoAlreadySpent) }) @@ -463,7 +463,7 @@ func TestTransaction_processInputs(t *testing.T) { } ctx := context.Background() - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.ErrorIs(t, err, ErrUtxoNotReserved) }) @@ -494,7 +494,7 @@ func TestTransaction_processInputs(t *testing.T) { } ctx := context.Background() - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.ErrorIs(t, err, ErrDraftIDMismatch) }) @@ -523,7 +523,7 @@ func TestTransaction_processInputs(t *testing.T) { }, } - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.NoError(t, err) }) @@ -556,7 +556,7 @@ func TestTransaction_processInputs(t *testing.T) { }, } - err := transaction.processInputs(ctx) + err := transaction._processInputs(ctx) require.NoError(t, err) require.NotNil(t, transaction.utxos) assert.IsType(t, Utxo{}, transaction.utxos[0]) @@ -691,6 +691,16 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() { transaction := newTransaction(testTxHex, append(tc.client.DefaultModelOptions(), New())...) require.NotNil(t, transaction) + err = transaction.processUtxos(tc.ctx) + require.NoError(t, err) + + transaction.TotalValue, transaction.Fee = transaction.getValues() + + if transaction.TransactionBase.parsedTx != nil { + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + } + err = transaction.Save(tc.ctx) require.NoError(t, err) @@ -744,6 +754,15 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() { transactionIn := newTransaction(testTx2Hex, append(tc.client.DefaultModelOptions(), New())...) 
require.NotNil(t, transactionIn) + err = transactionIn.processUtxos(tc.ctx) + require.NoError(t, err) + + transactionIn.TotalValue, transactionIn.Fee = transactionIn.getValues() + + if transactionIn.TransactionBase.parsedTx != nil { + transactionIn.NumberOfInputs = uint32(len(transactionIn.TransactionBase.parsedTx.Inputs)) + transactionIn.NumberOfOutputs = uint32(len(transactionIn.TransactionBase.parsedTx.Outputs)) + } err = transactionIn.Save(tc.ctx) require.NoError(t, err) @@ -769,10 +788,23 @@ func (ts *EmbeddedDBTestSuite) TestTransaction_Save() { require.NoError(t, err) // this transaction should spend the utxo of the IN transaction - transaction := newTransaction(testTxHex, + transaction := newTransactionWithDraftID(testTxHex, draftTransaction.ID, append(tc.client.DefaultModelOptions(), WithXPub(xPub.rawXpubKey), New())...) require.NotNil(t, transactionIn) - transaction.DraftID = draftTransaction.ID + + transaction.draftTransaction = draftTransaction + + err = transaction.processUtxos(tc.ctx) + require.NoError(t, err) + + // Set the values from the inputs/outputs and draft tx + transaction.TotalValue, transaction.Fee = transactionIn.getValues() + + // Add values if found + if transaction.TransactionBase.parsedTx != nil { + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + } err = transaction.Save(tc.ctx) require.NoError(t, err) diff --git a/paymail_service_provider.go b/paymail_service_provider.go index 4baadc94..0035dee1 100644 --- a/paymail_service_provider.go +++ b/paymail_service_provider.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "encoding/hex" - "errors" "fmt" "time" @@ -12,8 +11,6 @@ import ( "github.com/bitcoin-sv/go-paymail/server" "github.com/bitcoinschema/go-bitcoin/v2" "github.com/libsv/go-bk/bec" - "github.com/libsv/go-bt/v2" - "github.com/mrz1836/go-datastore" customTypes "github.com/mrz1836/go-datastore/custom_types" "github.com/BuxOrg/bux/chainstate" @@ -149,47 +146,32 @@ func (p *PaymailDefaultServiceProvider) CreateP2PDestinationResponse( } // RecordTransaction will record the transaction -func (p *PaymailDefaultServiceProvider) RecordTransaction( - ctx context.Context, - p2pTx *paymail.P2PTransaction, - requestMetadata *server.RequestMetadata, -) (*paymail.P2PTransactionPayload, error) { +// TODO: rename to HandleReceivedP2pTransaction +func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context, + p2pTx *paymail.P2PTransaction, requestMetadata *server.RequestMetadata) (*paymail.P2PTransactionPayload, error) { + // Create the metadata - metadata := p.createMetadata(requestMetadata, "RecordTransaction") + metadata := p.createMetadata(requestMetadata, "HandleReceivedP2pTransaction") metadata[p2pMetadataField] = p2pTx.MetaData metadata[ReferenceIDField] = p2pTx.Reference - var draftID string - if tx, _ := p.client.GetTransactionByHex(ctx, p2pTx.Hex); tx != nil { - draftID = tx.DraftID - } - // Record the transaction - transaction, err := p.client.RecordTransaction( - ctx, "", p2pTx.Hex, draftID, []ModelOps{WithMetadatas(metadata)}..., - ) - // do not return an error if we already have the transaction - if err != nil && !errors.Is(err, datastore.ErrDuplicateKey) { + rts, err := getRecordTxStrategy(ctx, p.client, "", p2pTx.Hex, "") + if err != nil { return nil, err } - // we need to set the tx ID here, since our transaction will be empty if we already had it in the DB - txID := "" - if transaction != nil { - txID = 
transaction.ID - } else { - var btTx *bt.Tx - btTx, err = bt.NewTxFromString(p2pTx.Hex) - if err != nil { - return nil, err - } - txID = btTx.TxID() + rts.(recordIncomingTxStrategy).ForceBroadcast(true) + + transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata)) + if err != nil { + return nil, err } // Return the response from the p2p request return &paymail.P2PTransactionPayload{ Note: p2pTx.MetaData.Note, - TxID: txID, + TxID: transaction.ID, }, nil } @@ -212,6 +194,8 @@ func (p *PaymailDefaultServiceProvider) createPaymailInformation(ctx context.Con paymailAddress, err = getPaymailAddress(ctx, alias+"@"+domain, opts...) if err != nil { return nil, nil, err + } else if paymailAddress == nil { + return nil, nil, ErrMissingPaymail } unlock, err := newWaitWriteLock(ctx, lockKey(paymailAddress), p.client.Cachestore()) diff --git a/record_tx.go b/record_tx.go new file mode 100644 index 00000000..c51811f8 --- /dev/null +++ b/record_tx.go @@ -0,0 +1,97 @@ +package bux + +import ( + "context" + "fmt" + "time" +) + +type recordTxStrategy interface { + TxID() string + Validate() error + Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) +} + +type recordIncomingTxStrategy interface { + ForceBroadcast(force bool) +} + +func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) { + unlock := waitForRecordTxWriteLock(ctx, c, strategy.TxID()) + defer unlock() + + return strategy.Execute(ctx, c, opts) +} + +func getRecordTxStrategy(ctx context.Context, c ClientInterface, xPubKey, txHex, draftID string) (recordTxStrategy, error) { + var rts recordTxStrategy + + if draftID != "" { + rts = getOutgoingTxRecordStrategy(xPubKey, txHex, draftID) + } else { + var err error + rts, err = getIncomingTxRecordStrategy(ctx, c, txHex) + + if err != nil { + return nil, err + } + } + + if err := rts.Validate(); err != nil { + return nil, err + } + + return rts, nil +} + +func getOutgoingTxRecordStrategy(xPubKey, txHex, draftID string) recordTxStrategy { + return &outgoingTx{ + Hex: txHex, + RelatedDraftID: draftID, + XPubKey: xPubKey, + } +} + +func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordTxStrategy, error) { + tx, err := getTransactionByHex(ctx, txHex, c.DefaultModelOptions()...) 
+ if err != nil { + return nil, err + } + + var rts recordTxStrategy + + if tx != nil { + rts = &internalIncomingTx{ + Tx: tx, + BroadcastNow: false, + } + } else { + rts = &externalIncomingTx{ + Hex: txHex, + BroadcastNow: false, + } + } + + return rts, nil +} + +func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string) func() { + var ( + unlock func() + err error + ) + // Create the lock and set the release for after the function completes + // Waits for the moment when the transaction is unlocked and creates a new lock + // Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming + for { + unlock, err = newWriteLock( + ctx, fmt.Sprintf(lockKeyRecordTx, key), c.Cachestore(), + ) + if err == nil { + break + } + time.Sleep(time.Second * 1) + } + + return unlock +} diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go new file mode 100644 index 00000000..a0dec99a --- /dev/null +++ b/record_tx_strategy_external_incoming_tx.go @@ -0,0 +1,131 @@ +package bux + +import ( + "context" + "fmt" + + "github.com/libsv/go-bt/v2" + zLogger "github.com/mrz1836/go-logger" +) + +type externalIncomingTx struct { + Hex string + BroadcastNow bool // e.g. BEEF must be broadcasted now +} + +func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { + logger := c.Logger() + + // process + if !tx.BroadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it + return _addTxToCheck(ctx, tx, c, opts) + } + + transaction, err := _createExternalTxToRecord(ctx, tx, c, opts) + if err != nil { + return nil, fmt.Errorf("ExternalIncomingTx.Execute(): creation of external incoming tx failed. Reason: %w", err) + } + + logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID)) + + if transaction.syncTransaction.BroadcastStatus == SyncStatusReady { + _externalIncomingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcaset in a cron task + } + + // record + if err = transaction.Save(ctx); err != nil { + return nil, fmt.Errorf("ExternalIncomingTx.Execute(): saving of Transaction failed. Reason: %w", err) + } + + logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): complete, TxID: %s", transaction.ID)) + return transaction, nil +} + +func (tx *externalIncomingTx) Validate() error { + if tx.Hex == "" { + return ErrMissingFieldHex + } + + return nil // is valid +} + +func (tx *externalIncomingTx) TxID() string { + btTx, _ := bt.NewTxFromString(tx.Hex) + return btTx.TxID() +} + +func (tx *externalIncomingTx) ForceBroadcast(force bool) { + tx.BroadcastNow = force +} + +func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { + logger := c.Logger() + + incomingTx := newIncomingTransaction(tx.Hex, c.DefaultModelOptions(append(opts, New())...)...) + + logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start ITC, TxID: %s", incomingTx.ID)) + + if err := incomingTx.Save(ctx); err != nil { + return nil, fmt.Errorf("ExternalIncomingTx.Execute(): addind new IncomingTx to check queue failed. 
Reason: %w", err) + } + + result := incomingTx.toTransactionDto() + result.Status = statusProcessing + + logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): complete ITC, TxID: %s", incomingTx.ID)) + return result, nil +} + +func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { + // Create NEW tx model + tx := newTransaction(eTx.Hex, c.DefaultModelOptions(append(opts, New())...)...) + _hydrateExternalWithSync(tx) + + if !tx.TransactionBase.hasOneKnownDestination(ctx, c, tx.GetOptions(false)...) { + return nil, ErrNoMatchingOutputs + } + + if err := tx.processUtxos(ctx); err != nil { + return nil, err + } + + tx.TotalValue, tx.Fee = tx.getValues() + if tx.TransactionBase.parsedTx != nil { + tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) + tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) + } + + return tx, nil +} + +func _hydrateExternalWithSync(tx *Transaction) { + sync := newSyncTransaction( + tx.ID, + tx.Client().DefaultSyncConfig(), + tx.GetOptions(true)..., + ) + + // to simplfy: broadcast every external incoming txs + sync.BroadcastStatus = SyncStatusReady + + sync.P2PStatus = SyncStatusSkipped // the owner of the Tx should have already notified paymail providers + //sync.SyncStatus = SyncStatusReady + + // Use the same metadata + sync.Metadata = tx.Metadata + sync.transaction = tx + tx.syncTransaction = sync +} + +func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) { + logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start broadcast, TxID: %s", tx.ID)) + + if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil { + // ignore error, transaction will be broadcaset in a cron task + logger. + Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID)) + } else { + logger. + Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcast complete, TxID: %s", tx.ID)) + } +} diff --git a/record_tx_strategy_internal_incoming_tx.go b/record_tx_strategy_internal_incoming_tx.go new file mode 100644 index 00000000..7fedbafb --- /dev/null +++ b/record_tx_strategy_internal_incoming_tx.go @@ -0,0 +1,77 @@ +package bux + +import ( + "context" + "errors" + "fmt" + + zLogger "github.com/mrz1836/go-logger" +) + +type internalIncomingTx struct { + Tx *Transaction + BroadcastNow bool // e.g. BEEF must be broadcasted now +} + +func (tx *internalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) { + logger := c.Logger() + logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start, TxID: %s", tx.Tx.ID)) + + // process + transaction := tx.Tx + syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...) + if err != nil { + return nil, fmt.Errorf("InternalIncomingTx.Execute(): getting syncTx failed. 
+	}
+
+	if tx.BroadcastNow || syncTx.BroadcastStatus == SyncStatusReady {
+		syncTx.transaction = transaction
+		transaction.syncTransaction = syncTx
+
+		_internalIncomingBroadcast(ctx, logger, transaction) // ignore broadcast error - it will be repeated by the task manager
+	}
+
+	logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): complete, TxID: %s", transaction.ID))
+	return transaction, nil
+}
+
+func (tx *internalIncomingTx) Validate() error {
+	if tx.Tx == nil {
+		return errors.New("Tx cannot be nil")
+	}
+
+	return nil // is valid
+}
+
+func (tx *internalIncomingTx) TxID() string {
+	return tx.Tx.ID
+}
+
+func (tx *internalIncomingTx) ForceBroadcast(force bool) {
+	tx.BroadcastNow = force
+}
+
+func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, transaction *Transaction) {
+	logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start broadcast, TxID: %s", transaction.ID))
+
+	syncTx := transaction.syncTransaction
+	err := broadcastSyncTransaction(ctx, syncTx)
+	if err != nil {
+		logger.
+			Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID))
+
+		if syncTx.BroadcastStatus == SyncStatusSkipped { // revert the status to Ready after a failure so broadcasting is retried; this can happen when we received an internal BEEF tx
+			syncTx.BroadcastStatus = SyncStatusReady
+
+			if err = syncTx.Save(ctx); err != nil {
+				logger.
+					Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing syncTx.BroadcastStatus from Skipped to Ready failed. Reason: %s, TxID: %s", err, transaction.ID))
+			}
+		}
+
+		// ignore broadcast error - it will be repeated by the task manager
+	} else {
+		logger.
+			Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID))
+	}
+}
diff --git a/record_tx_strategy_outgoing_tx.go b/record_tx_strategy_outgoing_tx.go
new file mode 100644
index 00000000..b33efe3e
--- /dev/null
+++ b/record_tx_strategy_outgoing_tx.go
@@ -0,0 +1,194 @@
+package bux
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/libsv/go-bt/v2"
+	zLogger "github.com/mrz1836/go-logger"
+)
+
+type outgoingTx struct {
+	Hex            string
+	RelatedDraftID string
+	XPubKey        string
+}
+
+func (tx *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
+	logger := c.Logger()
+
+	// process (check the error before logging, since transaction is nil on failure)
+	transaction, err := _createOutgoingTxToRecord(ctx, tx, c, opts)
+	if err != nil {
+		return nil, fmt.Errorf("OutgoingTx.Execute(): creation of outgoing tx failed. Reason: %w", err)
+	}
+
+	logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start, TxID: %s", transaction.ID))
+
+	if transaction.syncTransaction.P2PStatus == SyncStatusReady {
+		if err = _outgoingNotifyP2p(ctx, logger, transaction); err != nil {
+			return nil, err // reject the transaction if P2P notification failed
+		}
+	}
+
+	if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
+		_outgoingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcast by a cron task
+	}
+
+	// record
+	if err = transaction.Save(ctx); err != nil {
+		return nil, fmt.Errorf("OutgoingTx.Execute(): saving of Transaction failed. 
Reason: %w", err) + } + + logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): complete, TxID: %s", transaction.ID)) + return transaction, nil +} + +func (tx outgoingTx) Validate() error { + if tx.Hex == "" { + return ErrMissingFieldHex + } + + if tx.RelatedDraftID == "" { + return errors.New("empty RelatedDraftID") + } + + if tx.XPubKey == "" { + return errors.New("empty xPubKey") // is it required ? + } + + return nil // is valid +} + +func (tx outgoingTx) TxID() string { + btTx, _ := bt.NewTxFromString(tx.Hex) + return btTx.TxID() +} + +func _createOutgoingTxToRecord(ctx context.Context, oTx *outgoingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { + // Create NEW transaction model + newOpts := c.DefaultModelOptions(append(opts, WithXPub(oTx.XPubKey), New())...) + tx := newTransactionWithDraftID( + oTx.Hex, oTx.RelatedDraftID, newOpts..., + ) + + // hydrate + if err := _hydrateOutgoingWithDraft(ctx, tx); err != nil { + return nil, err + } + + _hydrateOutgoingWithSync(tx) + + if err := tx.processUtxos(ctx); err != nil { + return nil, err + } + + tx.TotalValue, tx.Fee = tx.getValues() + if tx.TransactionBase.parsedTx != nil { + tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) + tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) + } + + return tx, nil +} + +func _hydrateOutgoingWithDraft(ctx context.Context, tx *Transaction) error { + draft, err := getDraftTransactionID(ctx, tx.XPubID, tx.DraftID, tx.GetOptions(false)...) + + if err != nil { + return err + } + + if draft == nil { + return ErrDraftNotFound + } + + if len(draft.Configuration.Outputs) == 0 { + return errors.New("corresponding draft transaction has no outputs") + } + + if draft.Configuration.Sync == nil { + draft.Configuration.Sync = tx.Client().DefaultSyncConfig() + } + + tx.draftTransaction = draft + + return nil // success +} + +func _hydrateOutgoingWithSync(tx *Transaction) { + sync := newSyncTransaction(tx.ID, tx.draftTransaction.Configuration.Sync, tx.GetOptions(true)...) + + // setup synchronization + sync.BroadcastStatus = _getBroadcastSyncStatus(tx) + sync.P2PStatus = _getP2pSyncStatus(tx) + //sync.SyncStatus = SyncStatusReady + + sync.Metadata = tx.Metadata + + sync.transaction = tx + tx.syncTransaction = sync +} + +func _getBroadcastSyncStatus(tx *Transaction) SyncStatus { + // immediately broadcast if is not BEEF + broadcast := SyncStatusReady // broadcast immediately + + outputs := tx.draftTransaction.Configuration.Outputs + + for _, o := range outputs { + if o.PaymailP4 != nil { + if o.PaymailP4.Format == BeefPaymailPayloadFormat { + broadcast = SyncStatusSkipped // postpone broadcasting if tx contains outputs in BEEF + + break + } + } + } + + return broadcast +} + +func _getP2pSyncStatus(tx *Transaction) SyncStatus { + p2pStatus := SyncStatusSkipped + + outputs := tx.draftTransaction.Configuration.Outputs + for _, o := range outputs { + if o.PaymailP4 != nil && o.PaymailP4.ResolutionType == ResolutionTypeP2P { + p2pStatus = SyncStatusReady // notify p2p immediately + + break + } + } + + return p2pStatus +} + +func _outgoingNotifyP2p(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) error { + logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start p2p, TxID: %s", tx.ID)) + + if err := processP2PTransaction(ctx, tx.syncTransaction, tx); err != nil { + logger. + Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): processP2PTransaction failed. 
Reason: %s, TxID: %s", err, tx.ID)) + + return err + } + + logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): p2p complete, TxID: %s", tx.ID)) + return nil +} + +func _outgoingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) { + logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start broadcast, TxID: %s", tx.ID)) + + if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil { + // ignore error, transaction will be broadcasted by cron task + logger. + Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID)) + } else { + logger. + Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcast complete, TxID: %s", tx.ID)) + } +} diff --git a/sync_tx_repository.go b/sync_tx_repository.go index 67702325..a1eaa4d4 100644 --- a/sync_tx_repository.go +++ b/sync_tx_repository.go @@ -37,9 +37,9 @@ func GetSyncTransactionByID(ctx context.Context, id string, opts ...ModelOps) (* // getTransactionsToBroadcast will get the sync transactions to broadcast func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.QueryParams, opts ...ModelOps, -) (map[string][]*SyncTransaction, error) { +) ([]*SyncTransaction, error) { // Get the records by status - txs, err := _getSyncTransactionsByConditions( + scTxs, err := _getSyncTransactionsByConditions( ctx, map[string]interface{}{ broadcastStatusField: SyncStatusReady.String(), @@ -48,19 +48,25 @@ func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.Quer ) if err != nil { return nil, err + } else if len(scTxs) == 0 { + return nil, nil } - // group transactions by xpub and return including the tx itself - txsByXpub := make(map[string][]*SyncTransaction) - for _, tx := range txs { - if tx.transaction, err = getTransactionByID( - ctx, "", tx.ID, opts..., - ); err != nil { + // hydrate and see if it's ready to sync + res := make([]*SyncTransaction, 0, len(scTxs)) + + for _, sTx := range scTxs { + // hydrate + sTx.transaction, err = getTransactionByID( + ctx, "", sTx.ID, opts..., + ) + if err != nil { return nil, err + } else if sTx.transaction == nil { + return nil, ErrMissingTransaction } - var parentsBroadcast bool - parentsBroadcast, err = _areParentsBroadcast(ctx, tx, opts...) + parentsBroadcast, err := _areParentsBroadcasted(ctx, sTx.transaction, opts...) if err != nil { return nil, err } @@ -70,20 +76,10 @@ func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.Quer continue } - xPubID := "" // fallback if we have no input xpubs - if len(tx.transaction.XpubInIDs) > 0 { - // use the first xpub for the grouping - // in most cases when we are broadcasting, there should be only 1 xpub in - xPubID = tx.transaction.XpubInIDs[0] - } - - if txsByXpub[xPubID] == nil { - txsByXpub[xPubID] = make([]*SyncTransaction, 0) - } - txsByXpub[xPubID] = append(txsByXpub[xPubID], tx) + res = append(res, sTx) } - return txsByXpub, nil + return res, nil } // getTransactionsToSync will get the sync transactions to sync @@ -160,25 +156,15 @@ func _getSyncTransactionsByConditions(ctx context.Context, conditions map[string return txs, nil } -func _areParentsBroadcast(ctx context.Context, syncTx *SyncTransaction, opts ...ModelOps) (bool, error) { - tx, err := getTransactionByID(ctx, "", syncTx.ID, opts...) 
- if err != nil { - return false, err - } - - if tx == nil { - return false, ErrMissingTransaction - } - +func _areParentsBroadcasted(ctx context.Context, tx *Transaction, opts ...ModelOps) (bool, error) { // get the sync transaction of all inputs - var btTx *bt.Tx - btTx, err = bt.NewTxFromString(tx.Hex) + btTx, err := bt.NewTxFromString(tx.Hex) if err != nil { return false, err } // check that all inputs we handled have been broadcast, or are not handled by Bux - parentsBroadcast := true + parentsBroadcasted := true for _, input := range btTx.Inputs { var parentTx *SyncTransaction previousTxID := hex.EncodeToString(bt.ReverseBytes(input.PreviousTxID())) @@ -188,9 +174,9 @@ func _areParentsBroadcast(ctx context.Context, syncTx *SyncTransaction, opts ... } // if we have a sync transaction, and it is not complete, then we cannot broadcast if parentTx != nil && parentTx.BroadcastStatus != SyncStatusComplete { - parentsBroadcast = false + parentsBroadcasted = false } } - return parentsBroadcast, nil + return parentsBroadcasted, nil } diff --git a/sync_tx_service.go b/sync_tx_service.go index c4c9363b..b815ccab 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -7,8 +7,6 @@ import ( "errors" "fmt" "runtime" - "runtime/debug" - "strings" "sync" "time" @@ -40,7 +38,7 @@ func processSyncTransactions(ctx context.Context, maxTransactions int, opts ...M // Process the incoming transaction for index := range records { - if err = _processSyncTransaction( + if err = _syncTxDataFromChain( ctx, records[index], nil, ); err != nil { return err @@ -60,20 +58,20 @@ func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts } // Get maxTransactions records, grouped by xpub - txsByXpub, err := getTransactionsToBroadcast( - ctx, queryParams, opts..., - ) + snTxs, err := getTransactionsToBroadcast(ctx, queryParams, opts...) 
if err != nil { return err - } else if len(txsByXpub) == 0 { + } else if len(snTxs) == 0 { return nil } - wg := new(sync.WaitGroup) + // Process the transactions per xpub, in parallel + txsByXpub := _groupByXpub(snTxs) + // we limit the number of concurrent broadcasts to the number of cpus*2, since there is lots of IO wait limit := make(chan bool, runtime.NumCPU()*2) + wg := new(sync.WaitGroup) - // Process the transactions per xpub, in parallel for xPubID := range txsByXpub { limit <- true // limit the number of routines running at the same time wg.Add(1) @@ -82,7 +80,7 @@ func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts defer func() { <-limit }() for _, tx := range txsByXpub[xPubID] { - if err = processBroadcastTransaction( + if err = broadcastSyncTransaction( ctx, tx, ); err != nil { tx.Client().Logger().Error(ctx, @@ -119,7 +117,7 @@ func processP2PTransactions(ctx context.Context, maxTransactions int, opts ...Mo // Process the incoming transaction for index := range records { - if err = _processP2PTransaction( + if err = processP2PTransaction( ctx, records[index], nil, ); err != nil { return err @@ -129,19 +127,10 @@ func processP2PTransactions(ctx context.Context, maxTransactions int, opts ...Mo return nil } -// processBroadcastTransaction will process a sync transaction record and broadcast it -func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) error { +// broadcastSyncTransaction will broadcast transaction related to syncTx record +func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -152,31 +141,26 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e return err } - // Get the transaction - var transaction *Transaction - var incomingTransaction *IncomingTransaction + // Get the transaction HEX var txHex string if syncTx.transaction != nil && syncTx.transaction.Hex != "" { // the transaction has already been retrieved and added to the syncTx object, just use that - transaction = syncTx.transaction - txHex = transaction.Hex + txHex = syncTx.transaction.Hex } else { - if transaction, err = getTransactionByID( + // else get hex from DB + transaction, err := getTransactionByID( ctx, "", syncTx.ID, syncTx.GetOptions(false)..., - ); err != nil { + ) + + if err != nil { return err - } else if transaction == nil { - // maybe this is only an incoming transaction, let's try to find that and broadcast - // the processing of incoming transactions should then pick it up in the next job run - if incomingTransaction, err = getIncomingTransactionByID(ctx, syncTx.ID, syncTx.GetOptions(false)...); err != nil { - return err - } else if incomingTransaction == nil { - return errors.New("transaction was expected but not found, using ID: " + syncTx.ID) - } - txHex = incomingTransaction.Hex - } else { - txHex = transaction.Hex } + + if transaction == nil { + return errors.New("transaction was expected but not found, using ID: " + syncTx.ID) + } + + txHex = transaction.Hex } // Broadcast @@ -193,19 +177,6 @@ func 
processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e // Create status message message := "broadcast success" - // process the incoming transaction before finishing the sync - if incomingTransaction != nil { - // give the transaction some time to propagate through the network - time.Sleep(3 * time.Second) - - // we don't need to handle the error here, this is only to speed up the processing - // job will pick it up later if needed - if err = processIncomingTransaction(ctx, nil, incomingTransaction); err == nil { - // again ignore the error, if an error occurs the transaction will be set and will be processed further - transaction, _ = getTransactionByID(ctx, "", syncTx.ID, syncTx.GetOptions(false)...) - } - } - // Update the sync information syncTx.BroadcastStatus = SyncStatusComplete syncTx.Results.LastMessage = message @@ -244,34 +215,15 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e // Fire a notification notify(notifications.EventTypeBroadcast, syncTx) - // Notify any P2P paymail providers associated to the transaction - // but only if we actually found the transaction in the transactions' collection, otherwise this was an incoming - // transaction that needed to be broadcast and was not successfully processed after the broadcast - if transaction != nil { - if syncTx.P2PStatus == SyncStatusReady { - return _processP2PTransaction(ctx, syncTx, transaction) - } else if syncTx.P2PStatus == SyncStatusSkipped && syncTx.SyncStatus == SyncStatusReady { - return _processSyncTransaction(ctx, syncTx, transaction) - } - } return nil } ///////////////// -// _processSyncTransaction will process the sync transaction record, or save the failure -func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { +// _syncTxDataFromChain will process the sync transaction record, or save the failure +func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -323,13 +275,7 @@ func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, trans return nil } - // Add additional information (if found on-chain) - transaction.BlockHash = txInfo.BlockHash - transaction.BlockHeight = uint64(txInfo.BlockHeight) - transaction.MerkleProof = MerkleProof(*txInfo.MerkleProof) - bump := transaction.MerkleProof.ToBUMP() - bump.BlockHeight = transaction.BlockHeight - transaction.BUMP = bump + transaction.setChainInfo(txInfo) // Create status message message := "transaction was found on-chain by " + chainstate.ProviderBroadcastClient @@ -363,19 +309,10 @@ func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, trans return nil } -// _processP2PTransaction will process the sync transaction record, or save the failure -func _processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { +// processP2PTransaction will process the sync transaction record, or save the failure +func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction 
*Transaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -476,10 +413,32 @@ func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]* // utils +func _groupByXpub(scTxs []*SyncTransaction) map[string][]*SyncTransaction { + txsByXpub := make(map[string][]*SyncTransaction) + + // group transactions by xpub and return including the tx itself + for _, tx := range scTxs { + xPubID := "" // fallback if we have no input xpubs + if len(tx.transaction.XpubInIDs) > 0 { + // use the first xpub for the grouping + // in most cases when we are broadcasting, there should be only 1 xpub in + xPubID = tx.transaction.XpubInIDs[0] + } + + if txsByXpub[xPubID] == nil { + txsByXpub[xPubID] = make([]*SyncTransaction, 0) + } + txsByXpub[xPubID] = append(txsByXpub[xPubID], tx) + } + + return txsByXpub +} + // _bailAndSaveSyncTransaction will save the error message for a sync tx func _bailAndSaveSyncTransaction(ctx context.Context, syncTx *SyncTransaction, status SyncStatus, action, provider, message string, ) { + if action == syncActionSync { syncTx.SyncStatus = status } else if action == syncActionP2P { @@ -500,5 +459,10 @@ func _bailAndSaveSyncTransaction(ctx context.Context, syncTx *SyncTransaction, s Provider: provider, StatusMessage: message, }) + + if syncTx.IsNew() { + return // do not save if new record! caller should decide if want to save new record + } + _ = syncTx.Save(ctx) } diff --git a/tx_repository.go b/tx_repository.go index 663eddef..49562206 100644 --- a/tx_repository.go +++ b/tx_repository.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/libsv/go-bt" "github.com/mrz1836/go-datastore" ) @@ -254,3 +255,12 @@ func getTransactionsToCalculateBUMP(ctx context.Context, queryParams *datastore. } return txs, nil } + +func getTransactionByHex(ctx context.Context, hex string, opts ...ModelOps) (*Transaction, error) { + btTx, err := bt.NewTxFromString(hex) + if err != nil { + return nil, err + } + + return getTransactionByID(ctx, "", btTx.GetTxID(), opts...) +} diff --git a/tx_service.go b/tx_service.go index 1693cc73..cfba9d72 100644 --- a/tx_service.go +++ b/tx_service.go @@ -2,6 +2,7 @@ package bux import ( "context" + "encoding/hex" "errors" "github.com/BuxOrg/bux/chainstate" @@ -55,7 +56,7 @@ func processTransactions(ctx context.Context, maxTransactions int, opts ...Model func (m *Transaction) processUtxos(ctx context.Context) error { // Input should be processed only for outgoing transactions if m.draftTransaction != nil { - if err := m.processInputs(ctx); err != nil { + if err := m._processInputs(ctx); err != nil { return err } } @@ -63,6 +64,69 @@ func (m *Transaction) processUtxos(ctx context.Context) error { return m._processOutputs(ctx) } +// processTxInputs will process the transaction inputs +func (m *Transaction) _processInputs(ctx context.Context) (err error) { + // Pre-build the options + opts := m.GetOptions(false) + client := m.Client() + + var utxo *Utxo + + // check whether we are spending an internal utxo + for index := range m.TransactionBase.parsedTx.Inputs { + // todo: optimize this SQL SELECT to get all utxos in one query? 
+		if utxo, err = m.transactionService.getUtxo(ctx,
+			hex.EncodeToString(m.TransactionBase.parsedTx.Inputs[index].PreviousTxID()),
+			m.TransactionBase.parsedTx.Inputs[index].PreviousTxOutIndex,
+			opts...,
+		); err != nil {
+			return
+		} else if utxo != nil { // Found a UTXO record
+
+			// Is Spent?
+			if len(utxo.SpendingTxID.String) > 0 {
+				return ErrUtxoAlreadySpent
+			}
+
+			// Only if IUC is enabled (or client is nil, which means it's enabled by default)
+			if client == nil || client.IsIUCEnabled() {
+
+				// check whether the utxo was reserved, and by which draft transaction
+				isReserved := len(utxo.DraftID.String) > 0
+				matchesDraft := m.draftTransaction != nil && utxo.DraftID.String == m.draftTransaction.ID
+
+				// Check whether the spending transaction was reserved by the draft transaction (in the utxo)
+				if !isReserved {
+					return ErrUtxoNotReserved
+				}
+				if !matchesDraft {
+					return ErrDraftIDMismatch
+				}
+			}
+
+			// Update the output value
+			if _, ok := m.XpubOutputValue[utxo.XpubID]; !ok {
+				m.XpubOutputValue[utxo.XpubID] = 0
+			}
+			m.XpubOutputValue[utxo.XpubID] -= int64(utxo.Satoshis)
+
+			// Mark utxo as spent
+			utxo.SpendingTxID.Valid = true
+			utxo.SpendingTxID.String = m.ID
+			m.utxos = append(m.utxos, *utxo)
+
+			// Add the xPub ID
+			if !utils.StringInSlice(utxo.XpubID, m.XpubInIDs) {
+				m.XpubInIDs = append(m.XpubInIDs, utxo.XpubID)
+			}
+		}
+
+		// todo: what if the utxo is nil (not found)?
+	}
+
+	return
+}
+
 // processTxOutputs will process the transaction outputs
 func (m *Transaction) _processOutputs(ctx context.Context) (err error) {
 	// Pre-build the options
@@ -131,8 +195,7 @@ func _processTransaction(ctx context.Context, transaction *Transaction) error {
 		return err
 	}
 
-	transaction.BlockHash = txInfo.BlockHash
-	transaction.BlockHeight = uint64(txInfo.BlockHeight)
+	transaction.setChainInfo(txInfo)
 
 	return transaction.Save(ctx)
 }
diff --git a/utils.go b/utils.go
new file mode 100644
index 00000000..67be2791
--- /dev/null
+++ b/utils.go
@@ -0,0 +1,21 @@
+package bux
+
+import (
+	"context"
+	"fmt"
+	"runtime/debug"
+	"strings"
+
+	zLogger "github.com/mrz1836/go-logger"
+)
+
+func recoverAndLog(ctx context.Context, log zLogger.GormLoggerInterface) {
+	if err := recover(); err != nil {
+		log.Error(ctx,
+			fmt.Sprintf(
+				"panic: %v - stack trace: %v", err,
+				strings.ReplaceAll(string(debug.Stack()), "\n", ""),
+			),
+		)
+	}
+}
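Note (for orientation only, not part of this changeset): the three types introduced above - externalIncomingTx, internalIncomingTx and outgoingTx - all implement Execute, Validate and TxID, which is presumably the recordTxStrategy contract that getRecordTxStrategy returns and that RecordTransaction dispatches to. Neither the interface definition nor the body of recordTransaction appears in this diff, so the sketch below is an assumption inferred from the methods shown and from waitForRecordTxWriteLock; names and shapes may differ from the actual source.

// Sketch only - assumed shape of the strategy contract and dispatcher (bux package).
package bux

import "context"

// recordTxStrategy is implied by the methods implemented by externalIncomingTx,
// internalIncomingTx and outgoingTx in this changeset (assumed definition).
type recordTxStrategy interface {
	Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error)
	Validate() error
	TxID() string
}

// recordTransaction (assumed body): validate the chosen strategy, hold the
// per-transaction write lock keyed by the transaction ID, then delegate.
func recordTransaction(ctx context.Context, c ClientInterface, rts recordTxStrategy, opts ...ModelOps) (*Transaction, error) {
	if err := rts.Validate(); err != nil {
		return nil, err
	}

	// waitForRecordTxWriteLock is defined earlier in this changeset
	unlock := waitForRecordTxWriteLock(ctx, c, rts.TxID())
	defer unlock()

	return rts.Execute(ctx, c, opts)
}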