From 83807047520100dcc743e10ba3249ee7cd0328b3 Mon Sep 17 00:00:00 2001 From: Lazy Nina Date: Tue, 30 Apr 2024 13:11:35 -0400 Subject: [PATCH] Transfer Z's changes from z/update-state-change-entry-encoder branch to pos --- entries/access_group.go | 6 +- entries/access_group_member.go | 6 +- entries/balance.go | 6 +- entries/block.go | 8 +- entries/bls_pkid_pair.go | 6 +- entries/dao_coin_limit_order.go | 6 +- entries/derived_key.go | 6 +- entries/deso_balance.go | 6 +- entries/diamond.go | 6 +- entries/epoch.go | 4 +- entries/follow.go | 6 +- entries/global_params.go | 6 +- entries/helpers.go | 44 +++++++ entries/jailed_history.go | 6 +- entries/like.go | 6 +- entries/locked_stake.go | 6 +- entries/lockup.go | 6 +- entries/message.go | 6 +- entries/new_message.go | 9 +- entries/nft.go | 6 +- entries/nft_bid.go | 6 +- entries/pkid.go | 12 +- entries/post.go | 8 +- entries/post_association.go | 7 +- entries/profile.go | 6 +- entries/stake.go | 6 +- entries/transaction.go | 8 +- entries/user_association.go | 7 +- entries/utxo_operation.go | 6 +- entries/validator.go | 6 +- entries/yield_curve_point.go | 6 +- go.mod | 2 +- handler/data_handler.go | 196 ++++++++++++++++++++++++++------ handler/db_utils.go | 4 +- main.go | 6 +- 35 files changed, 313 insertions(+), 134 deletions(-) create mode 100644 entries/helpers.go diff --git a/entries/access_group.go b/entries/access_group.go index fe43564..2051d2f 100644 --- a/entries/access_group.go +++ b/entries/access_group.go @@ -52,7 +52,7 @@ func AccessGroupEncoderToPGStruct(accessGroupEntry *lib.AccessGroupEntry, keyByt // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -69,7 +69,7 @@ func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para } // bulkInsertAccessGroupEntry inserts a batch of access_group entries into the database. -func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -94,7 +94,7 @@ func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope } // bulkDeletePostEntry deletes a batch of access_group entries from the database. -func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/access_group_member.go b/entries/access_group_member.go index 78f8338..1722f14 100644 --- a/entries/access_group_member.go +++ b/entries/access_group_member.go @@ -57,7 +57,7 @@ func AccessGroupMemberEncoderToPGStruct(accessGroupMemberEntry *lib.AccessGroupM // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -74,7 +74,7 @@ func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB } // bulkInsertAccessGroupMemberEntry inserts a batch of access_group_member entries into the database. -func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -99,7 +99,7 @@ func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.D } // bulkDeletePostEntry deletes a batch of access_group_member entries from the database. -func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/balance.go b/entries/balance.go index 4f8c2a0..6c6de0a 100644 --- a/entries/balance.go +++ b/entries/balance.go @@ -46,7 +46,7 @@ func BalanceEntryEncoderToPGStruct(balanceEntry *lib.BalanceEntry, keyBytes []by // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func BalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -63,7 +63,7 @@ func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertBalanceEntry inserts a batch of balance entries into the database. 
-func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -88,7 +88,7 @@ func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of balance entries from the database. -func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/block.go b/entries/block.go index 322474c..7429306 100644 --- a/entries/block.go +++ b/entries/block.go @@ -90,7 +90,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *li // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func BlockBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -107,7 +107,7 @@ func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *li } // bulkInsertUtxoOperationsEntry inserts a batch of user_association entries into the database. -func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // If this block is a part of the initial sync, skip it - it will be handled by the utxo operations. if operationType == lib.DbOperationTypeInsert { return nil @@ -217,7 +217,7 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation } // bulkDeleteBlockEntry deletes a batch of block entries from the database. -func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -229,7 +229,7 @@ func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation // bulkDeleteBlockEntriesFromKeysToDelete deletes a batch of block entries from the database. // It also deletes any transactions and utxo operations associated with the block. 
-func bulkDeleteBlockEntriesFromKeysToDelete(db *bun.DB, keysToDelete [][]byte) error { +func bulkDeleteBlockEntriesFromKeysToDelete(db bun.IDB, keysToDelete [][]byte) error { // Execute the delete query on the blocks table. if _, err := db.NewDelete(). Model(&PGBlockEntry{}). diff --git a/entries/bls_pkid_pair.go b/entries/bls_pkid_pair.go index cc96ae8..8d3730f 100644 --- a/entries/bls_pkid_pair.go +++ b/entries/bls_pkid_pair.go @@ -77,7 +77,7 @@ func BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct( // BLSPublicKeyPKIDPairBatchOperation is the entry point for processing a batch of BLSPublicKeyPKIDPair entries. // It determines the appropriate handler based on the operation type and executes it. -func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -95,7 +95,7 @@ func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db *bun // bulkInsertBLSPkidPairEntry inserts a batch of stake entries into the database. func bulkInsertBLSPkidPairEntry( - entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, + entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, ) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -149,7 +149,7 @@ func bulkInsertBLSPkidPairEntry( } // bulkDeleteBLSPkidPairEntry deletes a batch of stake entries from the database. -func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/dao_coin_limit_order.go b/entries/dao_coin_limit_order.go index 7e4a557..5657b74 100644 --- a/entries/dao_coin_limit_order.go +++ b/entries/dao_coin_limit_order.go @@ -48,7 +48,7 @@ func DaoCoinLimitOrderEncoderToPGStruct(daoCoinLimitOrder *lib.DAOCoinLimitOrder // DaoCoinLimitOrderBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -65,7 +65,7 @@ func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB } // bulkInsertDaoCoinLimitOrderEntry inserts a batch of post_association entries into the database. 
-func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -89,7 +89,7 @@ func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.D } // bulkDeletePostEntry deletes a batch of post_association entries from the database. -func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/derived_key.go b/entries/derived_key.go index b14a7f7..2af1204 100644 --- a/entries/derived_key.go +++ b/entries/derived_key.go @@ -56,7 +56,7 @@ func DerivedKeyEncoderToPGStruct(derivedKeyEntry *lib.DerivedKeyEntry, keyBytes // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -73,7 +73,7 @@ func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param } // bulkInsertDerivedKeyEntry inserts a batch of derived_key entries into the database. -func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +101,7 @@ func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, oper } // bulkDeletePostEntry deletes a batch of derived_key entries from the database. -func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/deso_balance.go b/entries/deso_balance.go index 3eebff2..2621a00 100644 --- a/entries/deso_balance.go +++ b/entries/deso_balance.go @@ -36,7 +36,7 @@ func DesoBalanceEncoderToPGStruct(desoBalanceEntry *lib.DeSoBalanceEntry, keyByt // PostBatchOperation is the entry point for processing a batch of post entries. 
It determines the appropriate handler // based on the operation type and executes it. -func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -53,7 +53,7 @@ func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -77,7 +77,7 @@ func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope } // bulkDeletePostEntry deletes a batch of diamond entries from the database. -func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/diamond.go b/entries/diamond.go index dd07bf3..15ac3f8 100644 --- a/entries/diamond.go +++ b/entries/diamond.go @@ -41,7 +41,7 @@ func DiamondEncoderToPGStruct(diamondEntry *lib.DiamondEntry, keyBytes []byte, p // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func DiamondBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -58,7 +58,7 @@ func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -82,7 +82,7 @@ func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of diamond entries from the database. 
-func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/epoch.go b/entries/epoch.go index 39cea4a..25901dc 100644 --- a/entries/epoch.go +++ b/entries/epoch.go @@ -53,7 +53,7 @@ func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, pa // EpochEntryBatchOperation is the entry point for processing a batch of Epoch entries. // It determines the appropriate handler based on the operation type and executes it. -func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -73,7 +73,7 @@ func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param } // bulkInsertEpochEntry inserts a batch of locked stake entries into the database. -func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. diff --git a/entries/follow.go b/entries/follow.go index e808d9b..6179353 100644 --- a/entries/follow.go +++ b/entries/follow.go @@ -36,7 +36,7 @@ func FollowEncoderToPGStruct(followEntry *lib.FollowEntry, keyBytes []byte, para // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func FollowBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func FollowBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -53,7 +53,7 @@ func FollowBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *l } // bulkInsertFollowEntry inserts a batch of follow entries into the database. -func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -78,7 +78,7 @@ func bulkInsertFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operatio } // bulkDeletePostEntry deletes a batch of follow entries from the database. 
-func bulkDeleteFollowEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteFollowEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/global_params.go b/entries/global_params.go index e556dba..fc64f01 100644 --- a/entries/global_params.go +++ b/entries/global_params.go @@ -80,7 +80,7 @@ func GlobalParamsEncoderToPGStruct(globalParamsEntry *lib.GlobalParamsEntry, key // GlobalParamsBatchOperation is the entry point for processing a batch of global params entries. // It determines the appropriate handler based on the operation type and executes it. -func GlobalParamsBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func GlobalParamsBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -97,7 +97,7 @@ func GlobalParamsBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, par } // bulkInsertGlobalParamsEntry inserts a batch of global_params entries into the database. -func bulkInsertGlobalParamsEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertGlobalParamsEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -122,7 +122,7 @@ func bulkInsertGlobalParamsEntry(entries []*lib.StateChangeEntry, db *bun.DB, op } // bulkDeletePostEntry deletes a batch of global_params entries from the database. -func bulkDeleteGlobalParamsEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteGlobalParamsEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/helpers.go b/entries/helpers.go new file mode 100644 index 0000000..ea56c5f --- /dev/null +++ b/entries/helpers.go @@ -0,0 +1,44 @@ +package entries + +import ( + "fmt" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/uptrace/bun" +) + +// GetDbHandle returns the correct interface to use for database operations. +// If a transaction is open, it returns the transaction handle, otherwise it returns the db handle. +func GetDbHandle(tx *bun.Tx, db *bun.DB) bun.IDB { + if tx != nil { + return tx + } + return db +} + +// CreateSavepoint creates a savepoint in the current transaction. If no transaction is open, it returns an empty string. +// The randomly generated savepoint name is returned if the savepoint is created successfully. 
+func CreateSavepoint(tx *bun.Tx) (string, error) { + if tx == nil { + return "", nil + } + savepointName := uuid.New().String() + + // Quote the savepoint name: a raw UUID (hyphens, possible leading digit) is not a valid unquoted SQL identifier. + _, err := tx.Exec(fmt.Sprintf(`SAVEPOINT "%s"`, savepointName)) + if err != nil { + return "", errors.Wrapf(err, "PostgresDataHandler.CreateSavepoint: Error creating savepoint") + } + + return savepointName, nil +} + +// RollbackToSavepoint rolls the current transaction back to the given savepoint. If no transaction is open or no +// savepoint name is provided, this is a no-op. +func RollbackToSavepoint(tx *bun.Tx, savepointName string) error { + if tx == nil || savepointName == "" { + return nil + } + _, err := tx.Exec(fmt.Sprintf(`ROLLBACK TO SAVEPOINT "%s"`, savepointName)) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.RollbackToSavepoint: Error reverting to savepoint") + } + return nil +} diff --git a/entries/jailed_history.go b/entries/jailed_history.go index 16a0f60..32a342b 100644 --- a/entries/jailed_history.go +++ b/entries/jailed_history.go @@ -39,7 +39,7 @@ func UnjailValidatorStateChangeMetadataEncoderToPGStruct( // ValidatorBatchOperation is the entry point for processing a batch of Validator entries. // It determines the appropriate handler based on the operation type and executes it. -func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -57,7 +57,7 @@ func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db *bun.D // bulkInsertJailedHistoryEvent inserts a batch of jailed history events into the database. func bulkInsertJailedHistoryEvent( - entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, + entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, ) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -86,7 +86,7 @@ func bulkInsertJailedHistoryEvent( } // bulkDeleteJailedHistoryEvent deletes a batch of validator entries from the database. -func bulkDeleteJailedHistoryEvent(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteJailedHistoryEvent(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/like.go b/entries/like.go index 8c6d890..18ba75f 100644 --- a/entries/like.go +++ b/entries/like.go @@ -37,7 +37,7 @@ func LikeEncoderToPGStruct(likeEntry *lib.LikeEntry, keyBytes []byte, params *li // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func LikeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LikeBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType @@ -54,7 +54,7 @@ func LikeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertLikeEntry inserts a batch of like entries into the database. -func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -79,7 +79,7 @@ func bulkInsertLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of like entries from the database. -func bulkDeleteLikeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLikeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/locked_stake.go b/entries/locked_stake.go index d8adf88..bcb32f3 100644 --- a/entries/locked_stake.go +++ b/entries/locked_stake.go @@ -55,7 +55,7 @@ func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyByt // LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries. // It determines the appropriate handler based on the operation type and executes it. -func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -72,7 +72,7 @@ func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para } // bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database. -func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -97,7 +97,7 @@ func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope } // bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database. -func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/lockup.go b/entries/lockup.go index 312f997..16c5ee5 100644 --- a/entries/lockup.go +++ b/entries/lockup.go @@ -55,7 +55,7 @@ func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEn // LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries. // It determines the appropriate handler based on the operation type and executes it. -func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -72,7 +72,7 @@ func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.D } // bulkInsertLockedBalanceEntry inserts a batch of locked stake entries into the database. -func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -97,7 +97,7 @@ func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, o } // bulkDeleteLockedBalanceEntry deletes a batch of locked stake entries from the database. -func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/message.go b/entries/message.go index 10ebf77..1a29d2a 100644 --- a/entries/message.go +++ b/entries/message.go @@ -55,7 +55,7 @@ func MessageEncoderToPGStruct(messageEntry *lib.MessageEntry, keyBytes []byte, p // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func MessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func MessageBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -72,7 +72,7 @@ func MessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertMessageEntry inserts a batch of message entries into the database. -func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -96,7 +96,7 @@ func bulkInsertMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of message entries from the database. -func bulkDeleteMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/new_message.go b/entries/new_message.go index 8ac674a..96a856c 100644 --- a/entries/new_message.go +++ b/entries/new_message.go @@ -3,6 +3,7 @@ package entries import ( "bytes" "context" + "encoding/hex" "github.com/deso-protocol/core/lib" "github.com/deso-protocol/state-consumer/consumer" "github.com/pkg/errors" @@ -45,7 +46,7 @@ func NewMessageEncoderToPGStruct(newMessageEntry *lib.NewMessageEntry, keyBytes } pgNewMessageEntry := NewMessageEntry{ - EncryptedText: string(newMessageEntry.EncryptedText[:]), + EncryptedText: hex.EncodeToString(newMessageEntry.EncryptedText[:]), Timestamp: consumer.UnixNanoToTime(newMessageEntry.TimestampNanos), ExtraData: consumer.ExtraDataBytesToString(newMessageEntry.ExtraData), IsGroupChatMessage: isGroupChatMessage, @@ -81,7 +82,7 @@ func NewMessageEncoderToPGStruct(newMessageEntry *lib.NewMessageEntry, keyBytes // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -98,7 +99,7 @@ func NewMessageBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param } // bulkInsertNewMessageEntry inserts a batch of new_message entries into the database. -func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -123,7 +124,7 @@ func bulkInsertNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, oper } // bulkDeletePostEntry deletes a batch of new_message entries from the database. -func bulkDeleteNewMessageEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNewMessageEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/nft.go b/entries/nft.go index 02647f2..59a68c8 100644 --- a/entries/nft.go +++ b/entries/nft.go @@ -62,7 +62,7 @@ func NftEncoderToPGStruct(nftEntry *lib.NFTEntry, keyBytes []byte, params *lib.D // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func NftBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NftBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -79,7 +79,7 @@ func NftBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib. } // bulkInsertNftEntry inserts a batch of nft entries into the database. -func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -104,7 +104,7 @@ func bulkInsertNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationTy } // bulkDeletePostEntry deletes a batch of nft entries from the database. -func bulkDeleteNftEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNftEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/nft_bid.go b/entries/nft_bid.go index 9ea5fef..9648305 100644 --- a/entries/nft_bid.go +++ b/entries/nft_bid.go @@ -47,7 +47,7 @@ func NftBidEncoderToPGStruct(nftBidEntry *lib.NFTBidEntry, keyBytes []byte, para // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func NftBidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func NftBidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -64,7 +64,7 @@ func NftBidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *l } // bulkInsertNftBidEntry inserts a batch of nft_bid entries into the database. -func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. 
@@ -88,7 +88,7 @@ func bulkInsertNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operatio } // bulkDeletePostEntry deletes a batch of nft_bid entries from the database. -func bulkDeleteNftBidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteNftBidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/pkid.go b/entries/pkid.go index 7a72e21..f7058d0 100644 --- a/entries/pkid.go +++ b/entries/pkid.go @@ -67,7 +67,7 @@ func LeaderScheduleEncoderToPGStruct(validatorPKID *lib.PKID, keyBytes []byte, p // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -84,7 +84,7 @@ func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params } // bulkInsertDiamondEntry inserts a batch of diamond entries into the database. -func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -108,7 +108,7 @@ func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of diamond entries from the database. -func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -127,7 +127,7 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT return nil } -func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -144,7 +144,7 @@ func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertPkid inserts a batch of PKIDs into the database. 
-func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -182,7 +182,7 @@ func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l } // bulkDeletePKID deletes a batch of PKIDs from the database. -func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/post.go b/entries/post.go index e64a816..51a6e56 100644 --- a/entries/post.go +++ b/entries/post.go @@ -60,7 +60,7 @@ func PostEntryEncoderToPGStruct(postEntry *lib.PostEntry, keyBytes []byte, param IsNFT: postEntry.IsNFT, NumNFTCopies: postEntry.NumNFTCopies, NumNFTCopiesForSale: postEntry.NumNFTCopiesForSale, - NumNFTCopiesBurned: postEntry.NumNFTCopiesBurned, + NumNFTCopiesBurned: postEntry.NumNFTCopiesBurned, HasUnlockable: postEntry.HasUnlockable, NFTRoyaltyToCreatorBasisPoints: postEntry.NFTRoyaltyToCreatorBasisPoints, NFTRoyaltyToCoinBasisPoints: postEntry.NFTRoyaltyToCoinBasisPoints, @@ -90,7 +90,7 @@ func PostEntryEncoderToPGStruct(postEntry *lib.PostEntry, keyBytes []byte, param // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PostBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PostBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -107,7 +107,7 @@ func PostBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib } // bulkInsertPostEntry inserts a batch of post entries into the database. -func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -135,7 +135,7 @@ func bulkInsertPostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT } // bulkDeletePostEntry deletes a batch of post entries from the database. -func bulkDeletePostEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePostEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/post_association.go b/entries/post_association.go index d6882fd..bd5bae2 100644 --- a/entries/post_association.go +++ b/entries/post_association.go @@ -38,6 +38,7 @@ func PostAssociationEncoderToPGStruct(postAssociationEntry *lib.PostAssociationE pgEntry := PostAssociationEntry{ AssociationType: string(postAssociationEntry.AssociationType[:]), AssociationValue: string(postAssociationEntry.AssociationValue[:]), + BlockHeight: postAssociationEntry.BlockHeight, ExtraData: consumer.ExtraDataBytesToString(postAssociationEntry.ExtraData), BadgerKey: keyBytes, } @@ -60,7 +61,7 @@ func PostAssociationEncoderToPGStruct(postAssociationEntry *lib.PostAssociationE // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -77,7 +78,7 @@ func PostAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkInsertPostAssociationEntry inserts a batch of post_association entries into the database. -func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +102,7 @@ func bulkInsertPostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkDeletePostEntry deletes a batch of post_association entries from the database. -func bulkDeletePostAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeletePostAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/profile.go b/entries/profile.go index 436f84f..a8b2024 100644 --- a/entries/profile.go +++ b/entries/profile.go @@ -60,7 +60,7 @@ func ProfileEntryEncoderToPGStruct(profileEntry *lib.ProfileEntry, keyBytes []by // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. -func ProfileBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func ProfileBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -77,7 +77,7 @@ func ProfileBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params * } // bulkInsertPostEntry inserts a batch of post entries into the database. -func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -100,7 +100,7 @@ func bulkInsertProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati } // bulkDeletePostEntry deletes a batch of profile entries from the database. -func bulkDeleteProfileEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteProfileEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/stake.go b/entries/stake.go index 07248eb..66bbbca 100644 --- a/entries/stake.go +++ b/entries/stake.go @@ -56,7 +56,7 @@ func StakeEncoderToPGStruct(stakeEntry *lib.StakeEntry, keyBytes []byte, params // StakeBatchOperation is the entry point for processing a batch of Stake entries. // It determines the appropriate handler based on the operation type and executes it. -func StakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func StakeBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -73,7 +73,7 @@ func StakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *li } // bulkInsertStakeEntry inserts a batch of stake entries into the database. -func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -98,7 +98,7 @@ func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation } // bulkDeleteStakeEntry deletes a batch of stake entries from the database. -func bulkDeleteStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteStakeEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. 
uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/transaction.go b/entries/transaction.go index 20945e8..4a48073 100644 --- a/entries/transaction.go +++ b/entries/transaction.go @@ -118,7 +118,7 @@ func TransactionEncoderToPGStruct( // TransactionBatchOperation is the entry point for processing a batch of transaction entries. It determines the appropriate handler // based on the operation type and executes it. -func TransactionBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func TransactionBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -176,7 +176,7 @@ func transformTransactionEntry(entries []*lib.StateChangeEntry, params *lib.DeSo return pgTransactionEntrySlice, nil } -func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Bulk insert the entries. transactionQuery := db.NewInsert().Model(&entries) @@ -191,7 +191,7 @@ func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db *bun.DB, opera } // transformAndBulkInsertTransactionEntry inserts a batch of user_association entries into the database. -func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { pgTransactionEntrySlice, err := transformTransactionEntry(entries, params) if err != nil { return errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem transforming transaction entries") @@ -206,7 +206,7 @@ func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db } // bulkDeleteTransactionEntry deletes a batch of transaction entries from the database. -func bulkDeleteTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteTransactionEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/user_association.go b/entries/user_association.go index 6f02be2..029399d 100644 --- a/entries/user_association.go +++ b/entries/user_association.go @@ -38,6 +38,7 @@ func UserAssociationEncoderToPGStruct(userAssociationEntry *lib.UserAssociationE pgEntry := UserAssociationEntry{ AssociationType: string(userAssociationEntry.AssociationType[:]), AssociationValue: string(userAssociationEntry.AssociationValue[:]), + BlockHeight: userAssociationEntry.BlockHeight, ExtraData: consumer.ExtraDataBytesToString(userAssociationEntry.ExtraData), BadgerKey: keyBytes, } @@ -60,7 +61,7 @@ func UserAssociationEncoderToPGStruct(userAssociationEntry *lib.UserAssociationE // PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler // based on the operation type and executes it. 
-func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -77,7 +78,7 @@ func UserAssociationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkInsertUserAssociationEntry inserts a batch of user_association entries into the database. -func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -101,7 +102,7 @@ func bulkInsertUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkDeletePostEntry deletes a batch of user_association entries from the database. -func bulkDeleteUserAssociationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteUserAssociationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/utxo_operation.go b/entries/utxo_operation.go index d934579..87f926c 100644 --- a/entries/utxo_operation.go +++ b/entries/utxo_operation.go @@ -67,7 +67,7 @@ func ConvertUtxoOperationKeyToBlockHashHex(keyBytes []byte) string { // UtxoOperationBatchOperation is the entry point for processing a batch of utxo operations. It determines the appropriate handler // based on the operation type and executes it. -func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -84,7 +84,7 @@ func UtxoOperationBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, pa } // bulkInsertUtxoOperationsEntry inserts a batch of utxo operation entries into the database. -func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) @@ -367,7 +367,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB, } // bulkDeletePostEntry deletes a batch of utxo_operation entries from the database. 
-func bulkDeleteUtxoOperationEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteUtxoOperationEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/validator.go b/entries/validator.go index 1d41230..5604a81 100644 --- a/entries/validator.go +++ b/entries/validator.go @@ -98,7 +98,7 @@ func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []b // ValidatorBatchOperation is the entry point for processing a batch of Validator entries. // It determines the appropriate handler based on the operation type and executes it. -func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. operationType := entries[0].OperationType @@ -115,7 +115,7 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params } // bulkInsertValidatorEntry inserts a batch of validator entries into the database. -func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID) @@ -160,7 +160,7 @@ func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, opera } // bulkDeleteValidatorEntry deletes a batch of validator entries from the database. -func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/entries/yield_curve_point.go b/entries/yield_curve_point.go index 55f0f7e..d545119 100644 --- a/entries/yield_curve_point.go +++ b/entries/yield_curve_point.go @@ -47,7 +47,7 @@ func LockupYieldCurvePointEncoderToPGStruct(lockupYieldCurvePoint *lib.LockupYie // LockupYieldCurvePointBatchOperation is the entry point for processing a batch of LockedBalance entries. // It determines the appropriate handler based on the operation type and executes it. -func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { +func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error { // We check before we call this function that there is at least one operation type. // We also ensure before this that all entries have the same operation type. 
operationType := entries[0].OperationType @@ -64,7 +64,7 @@ func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bu } // bulkInsertLockupYieldCurvePoint inserts a batch of locked stake entries into the database. -func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { +func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) // Create a new array to hold the bun struct. @@ -89,7 +89,7 @@ func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB } // bulkDeleteLockupYieldCurvePoint deletes a batch of locked stake entries from the database. -func bulkDeleteLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error { +func bulkDeleteLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) diff --git a/go.mod b/go.mod index fa23331..0672ae9 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module PostgresDataHandler +module github.com/deso-protocol/postgres-data-handler go 1.22 diff --git a/handler/data_handler.go b/handler/data_handler.go index 86a4c16..2f40acc 100644 --- a/handler/data_handler.go +++ b/handler/data_handler.go @@ -1,11 +1,16 @@ package handler import ( - "PostgresDataHandler/entries" - "PostgresDataHandler/migrations/post_sync_migrations" + "context" + "crypto/rand" + "database/sql" + "encoding/hex" "fmt" "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/postgres-data-handler/entries" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/deso-protocol/state-consumer/consumer" + "github.com/golang/glog" "github.com/pkg/errors" "github.com/uptrace/bun" ) @@ -15,6 +20,8 @@ import ( type PostgresDataHandler struct { // A Postgres DB used for the storage of chain data. DB *bun.DB + // A bun transaction used for executing multiple operations in a single transaction. + Txn *bun.Tx // Params is a struct containing the current blockchain parameters. // It is used to determine which prefix to use for public keys. Params *lib.DeSoParams @@ -31,72 +38,91 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries var err error + // Get the correct db handle. + dbHandle := postgresDataHandler.GetDbHandle() + // Create a savepoint in the current transaction, if the transaction exists. 
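The recurring `*bun.DB` to `bun.IDB` change applied to the entry handlers above is what makes the new `Txn` field workable: `bun.IDB` is an interface satisfied by both `*bun.DB` and `*bun.Tx`, so the same bulk insert and delete helpers can run either directly against the database or inside the transaction returned by `GetDbHandle`. A minimal sketch of the pattern, assuming a hypothetical `insertRows` helper and `Row` model that are not part of this patch:

package example // illustrative only; not part of this patch

import (
	"context"
	"database/sql"

	"github.com/uptrace/bun"
)

// Row is a stand-in model; the real handlers use the PG entry structs.
type Row struct {
	bun.BaseModel `bun:"table:example_rows"`

	ID   int64  `bun:"id,pk,autoincrement"`
	Name string `bun:"name"`
}

// insertRows accepts bun.IDB, so it works identically with the root *bun.DB
// or with a *bun.Tx, mirroring the signature change applied to the handlers.
func insertRows(ctx context.Context, db bun.IDB, rows []*Row) error {
	_, err := db.NewInsert().Model(&rows).Exec(ctx)
	return err
}

func run(ctx context.Context, db *bun.DB) error {
	// Outside a transaction: pass the *bun.DB directly.
	if err := insertRows(ctx, db, []*Row{{Name: "a"}}); err != nil {
		return err
	}
	// Inside a transaction: pass the *bun.Tx instead; GetDbHandle makes the
	// equivalent choice at runtime based on whether Txn is set.
	tx, err := db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return err
	}
	if err := insertRows(ctx, &tx, []*Row{{Name: "b"}}); err != nil {
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}
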
+ savepointName, err := postgresDataHandler.CreateSavepoint() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.HandleEntryBatch: Error creating savepoint") + } + switch encoderType { case lib.EncoderTypePostEntry: - err = entries.PostBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PostBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeProfileEntry: - err = entries.ProfileBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.ProfileBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeLikeEntry: - err = entries.LikeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.LikeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDiamondEntry: - err = entries.DiamondBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DiamondBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeFollowEntry: - err = entries.FollowBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.FollowBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeMessageEntry: - err = entries.MessageBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.MessageBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeBalanceEntry: - err = entries.BalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.BalanceBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeNFTEntry: - err = entries.NftBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NftBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeNFTBidEntry: - err = entries.NftBidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NftBidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDerivedKeyEntry: - err = entries.DerivedKeyBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DerivedKeyBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeAccessGroupEntry: - err = entries.AccessGroupBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.AccessGroupBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeAccessGroupMemberEntry: - err = entries.AccessGroupMemberBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.AccessGroupMemberBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeNewMessageEntry: - err = entries.NewMessageBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.NewMessageBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeUserAssociationEntry: - err = entries.UserAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.UserAssociationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypePostAssociationEntry: - err = 
entries.PostAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PostAssociationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypePKIDEntry: - err = entries.PkidEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PkidEntryBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDeSoBalanceEntry: - err = entries.DesoBalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DesoBalanceBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeDAOCoinLimitOrderEntry: - err = entries.DaoCoinLimitOrderBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.DaoCoinLimitOrderBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeUtxoOperationBundle: - err = entries.UtxoOperationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.UtxoOperationBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeBlock: - err = entries.BlockBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.BlockBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeTxn: - err = entries.TransactionBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.TransactionBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeStakeEntry: - err = entries.StakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.StakeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeValidatorEntry: - err = entries.ValidatorBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.ValidatorBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeLockedStakeEntry: - err = entries.LockedStakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.LockedStakeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeLockedBalanceEntry: - err = entries.LockedBalanceEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.LockedBalanceEntryBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeLockupYieldCurvePoint: - err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeEpochEntry: - err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.EpochEntryBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypePKID: - err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.PkidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) case lib.EncoderTypeGlobalParamsEntry: - err = entries.GlobalParamsBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.GlobalParamsBatchOperation(batchedEntries, 
dbHandle, postgresDataHandler.Params) case lib.EncoderTypeBLSPublicKeyPKIDPairEntry: - err = entries.BLSPublicKeyPKIDPairBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params) + err = entries.BLSPublicKeyPKIDPairBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params) } if err != nil { + // If an error occurs, revert to the savepoint and return the error. + rollbackErr := postgresDataHandler.RevertToSavepoint(savepointName) + if rollbackErr != nil { + return errors.Wrapf(rollbackErr, "PostgresDataHandler.HandleEntryBatch: Error reverting to savepoint") + } return errors.Wrapf(err, "PostgresDataHandler.CallBatchOperationForEncoderType") } + + // Release the savepoint. + err = postgresDataHandler.ReleaseSavepoint(savepointName) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.HandleEntryBatch: Error releasing savepoint") + } return nil } @@ -120,3 +146,109 @@ func (postgresDataHandler *PostgresDataHandler) HandleSyncEvent(syncEvent consum return nil } + +func (postgresDataHandler *PostgresDataHandler) InitiateTransaction() error { + glog.V(2).Info("Initiating Txn\n") + // If a transaction is already open, rollback the current transaction. + if postgresDataHandler.Txn != nil { + err := postgresDataHandler.Txn.Rollback() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.InitiateTransaction: Error rolling back current transaction") + } + } + tx, err := postgresDataHandler.DB.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.InitiateTransaction: Error beginning transaction") + } + postgresDataHandler.Txn = &tx + return nil +} + +func (postgresDataHandler *PostgresDataHandler) CommitTransaction() error { + glog.V(2).Info("Committing Txn\n") + if postgresDataHandler.Txn == nil { + return fmt.Errorf("PostgresDataHandler.CommitTransaction: No transaction to commit") + } + err := postgresDataHandler.Txn.Commit() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.CommitTransaction: Error committing transaction") + } + postgresDataHandler.Txn = nil + return nil +} + +func (postgresDataHandler *PostgresDataHandler) RollbackTransaction() error { + glog.V(2).Info("Rolling back Txn\n") + if postgresDataHandler.Txn == nil { + return fmt.Errorf("PostgresDataHandler.RollbackTransaction: No transaction to rollback") + } + err := postgresDataHandler.Txn.Rollback() + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.RollbackTransaction: Error rolling back transaction") + } + postgresDataHandler.Txn = nil + return nil +} + +// GetDbHandle returns the correct interface to use for database operations. +// If a transaction is open, it returns the transaction handle, otherwise it returns the db handle. +func (postgresDataHandler *PostgresDataHandler) GetDbHandle() bun.IDB { + if postgresDataHandler.Txn != nil { + return postgresDataHandler.Txn + } + return postgresDataHandler.DB +} + +// CreateSavepoint creates a savepoint in the current transaction. If no transaction is open, it returns an empty string. +// The randomly generated savepoint name is returned if the savepoint is created successfully. 
+func (postgresDataHandler *PostgresDataHandler) CreateSavepoint() (string, error) { + if postgresDataHandler.Txn == nil { + return "", nil + } + savepointName, err := generateSavepointName() + if err != nil { + return "", errors.Wrapf(err, "PostgresDataHandler.CreateSavepoint: Error generating savepoint name") + } + + _, err = postgresDataHandler.Txn.Exec(fmt.Sprintf("SAVEPOINT %s", savepointName)) + if err != nil { + return "", errors.Wrapf(err, "PostgresDataHandler.CreateSavepoint: Error creating savepoint") + } + + return savepointName, nil +} + +// RevertToSavepoint reverts the current transaction to the savepoint with the given name. +func (postgresDataHandler *PostgresDataHandler) RevertToSavepoint(savepointName string) error { + if postgresDataHandler.Txn == nil { + return nil + } + _, err := postgresDataHandler.Txn.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT %s", savepointName)) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.RevertToSavepoint: Error reverting to savepoint") + } + return nil +} + +// ReleaseSavepoint releases the savepoint with the given name. +func (postgresDataHandler *PostgresDataHandler) ReleaseSavepoint(savepointName string) error { + if postgresDataHandler.Txn == nil { + return nil + } + _, err := postgresDataHandler.Txn.Exec(fmt.Sprintf("RELEASE SAVEPOINT %s", savepointName)) + if err != nil { + return errors.Wrapf(err, "PostgresDataHandler.ReleaseSavepoint: Error releasing savepoint") + } + return nil +} + +func generateSavepointName() (string, error) { + // Create a byte slice of length 8 for a 64-bit random value + randomBytes := make([]byte, 8) + _, err := rand.Read(randomBytes) + if err != nil { + return "", errors.Wrapf(err, "generateSavepointName: Error generating random bytes") + } + // Convert the byte slice to a hexadecimal string + return "savepoint_" + hex.EncodeToString(randomBytes), nil +} diff --git a/handler/db_utils.go b/handler/db_utils.go index d1fcc2f..51a38ab 100644 --- a/handler/db_utils.go +++ b/handler/db_utils.go @@ -1,9 +1,9 @@ package handler import ( - "PostgresDataHandler/migrations/initial_migrations" - "PostgresDataHandler/migrations/post_sync_migrations" "context" + "github.com/deso-protocol/postgres-data-handler/migrations/initial_migrations" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/golang/glog" "github.com/uptrace/bun" "github.com/uptrace/bun/migrate" diff --git a/main.go b/main.go index 5f279bf..e96d641 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,13 @@ package main import ( - "PostgresDataHandler/handler" - "PostgresDataHandler/migrations/initial_migrations" - "PostgresDataHandler/migrations/post_sync_migrations" "database/sql" "flag" "fmt" "github.com/deso-protocol/core/lib" + "github.com/deso-protocol/postgres-data-handler/handler" + "github.com/deso-protocol/postgres-data-handler/migrations/initial_migrations" + "github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations" "github.com/deso-protocol/state-consumer/consumer" "github.com/golang/glog" "github.com/spf13/viper"
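
The savepoint flow that HandleEntryBatch now follows is the standard Postgres pattern: SAVEPOINT before each batch, ROLLBACK TO SAVEPOINT if that batch fails, RELEASE SAVEPOINT on success, so one bad batch cannot poison the rest of the enclosing transaction. Condensed into a single hypothetical helper (the name `runWithSavepoint` is illustrative, not part of this patch):

package example // illustrative only; not part of this patch

import (
	"fmt"

	"github.com/uptrace/bun"
)

// runWithSavepoint wraps op in a savepoint inside an already-open transaction,
// mirroring the CreateSavepoint / RevertToSavepoint / ReleaseSavepoint calls
// that HandleEntryBatch makes around each batch operation.
func runWithSavepoint(txn *bun.Tx, name string, op func() error) error {
	if _, err := txn.Exec(fmt.Sprintf("SAVEPOINT %s", name)); err != nil {
		return err
	}
	if err := op(); err != nil {
		// Undo only the work done since the savepoint; earlier work in the
		// transaction stays intact.
		if _, rbErr := txn.Exec(fmt.Sprintf("ROLLBACK TO SAVEPOINT %s", name)); rbErr != nil {
			return rbErr
		}
		return err
	}
	// Release the savepoint so Postgres can reclaim its resources.
	_, err := txn.Exec(fmt.Sprintf("RELEASE SAVEPOINT %s", name))
	return err
}

Generating the savepoint name from random hex, as generateSavepointName does, keeps names unique if batches ever nest or interleave within the same transaction.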
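
Taken together, InitiateTransaction, HandleEntryBatch, and CommitTransaction/RollbackTransaction give the consumer a phase-level lifecycle: open one transaction when a sync phase begins, process every batch behind a savepoint, and commit once at the end. The exact wiring lives in the state-consumer and is not shown in this patch; a hypothetical caller, with event plumbing elided, might look like this:

package example // illustrative usage sketch only; not part of this patch

import (
	"github.com/deso-protocol/core/lib"
	"github.com/deso-protocol/postgres-data-handler/handler"
)

// processBatches shows the intended call order of the new transaction
// methods; error handling is shortened for brevity.
func processBatches(pdh *handler.PostgresDataHandler, batches [][]*lib.StateChangeEntry) error {
	// Begin a single transaction for the whole phase.
	if err := pdh.InitiateTransaction(); err != nil {
		return err
	}
	for _, batch := range batches {
		// Each batch runs against the open transaction (via GetDbHandle)
		// and is individually protected by a savepoint.
		if err := pdh.HandleEntryBatch(batch); err != nil {
			// A failed batch already rolled back to its savepoint inside
			// HandleEntryBatch; here we abandon the whole phase.
			_ = pdh.RollbackTransaction()
			return err
		}
	}
	// Make all batches visible at once.
	return pdh.CommitTransaction()
}
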