diff --git a/services/horizon/internal/actions/claimable_balance_test.go b/services/horizon/internal/actions/claimable_balance_test.go index ff59b96134..05ff7532c2 100644 --- a/services/horizon/internal/actions/claimable_balance_test.go +++ b/services/horizon/internal/actions/claimable_balance_test.go @@ -1,6 +1,7 @@ package actions import ( + "database/sql" "net/http/httptest" "testing" @@ -20,6 +21,9 @@ func TestGetClaimableBalanceByID(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" asset := xdr.MustNewCreditAsset("USD", accountID) balanceID := xdr.ClaimableBalanceId{ @@ -43,20 +47,22 @@ func TestGetClaimableBalanceByID(t *testing.T) { LastModifiedLedger: 123, } - err = q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance)) - claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ BalanceID: cBalance.BalanceID, Destination: claimant.Destination, LastModifiedLedger: cBalance.LastModifiedLedger, } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) + tt.Assert.NoError(claimantsInsertBuilder.Add(claimant)) } + tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) + handler := GetClaimableBalanceByIDHandler{} response, err := handler.GetResource(httptest.NewRecorder(), makeRequest( t, @@ -148,6 +154,9 @@ func TestGetClaimableBalances(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + entriesMeta := []struct { id xdr.Hash accountID string @@ -187,25 +196,24 @@ func TestGetClaimableBalances(t *testing.T) { hCBs = append(hCBs, cb) } - err := q.UpsertClaimableBalances(tt.Ctx, hCBs) - tt.Assert.NoError(err) - - claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) + balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder() + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() for _, cBalance := range hCBs { + tt.Assert.NoError(balanceInsertbuilder.Add(cBalance)) + for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ BalanceID: cBalance.BalanceID, Destination: claimant.Destination, LastModifiedLedger: cBalance.LastModifiedLedger, } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) + tt.Assert.NoError(claimantsInsertBuilder.Add(claimant)) } } - err = claimantsInsertBuilder.Exec(tt.Ctx) - tt.Assert.NoError(err) + tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) handler := GetClaimableBalancesHandler{} response, err := handler.GetResourcePage(httptest.NewRecorder(), makeRequest( @@ -284,11 +292,8 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 0) - // new claimable balances are ingest and one of them updated, they should appear in the next pages - cbToBeUpdated := hCBs[3] - cbToBeUpdated.Sponsor = null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML") - cbToBeUpdated.LastModifiedLedger = 1238 - q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cbToBeUpdated}) + balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder() + claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder() entriesMeta = []struct { id xdr.Hash @@ -316,23 +321,21 @@ func TestGetClaimableBalances(t *testing.T) { hCBs = append(hCBs, entry) } - err = q.UpsertClaimableBalances(tt.Ctx, hCBs) - tt.Assert.NoError(err) - for _, cBalance := range hCBs { + tt.Assert.NoError(balanceInsertbuilder.Add(cBalance)) + for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ BalanceID: cBalance.BalanceID, Destination: claimant.Destination, LastModifiedLedger: cBalance.LastModifiedLedger, } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) + tt.Assert.NoError(claimantsInsertBuilder.Add(claimant)) } } - err = claimantsInsertBuilder.Exec(tt.Ctx) - tt.Assert.NoError(err) + tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( t, @@ -372,21 +375,6 @@ func TestGetClaimableBalances(t *testing.T) { q, )) - tt.Assert.NoError(err) - tt.Assert.Len(response, 1) - - tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) - - response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( - t, - map[string]string{ - "limit": "2", - "cursor": response[0].(protocol.ClaimableBalance).PagingToken(), - }, - map[string]string{}, - q, - )) - tt.Assert.NoError(err) tt.Assert.Len(response, 0) @@ -404,9 +392,7 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 2) - tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) - - tt.Assert.Equal(hCBs[1].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID) + tt.Assert.Equal(hCBs[1].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( t, @@ -422,8 +408,6 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 1) - tt.Assert.Equal(hCBs[0].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) - // filter by asset response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( t, @@ -492,7 +476,7 @@ func TestGetClaimableBalances(t *testing.T) { )) tt.Assert.NoError(err) - tt.Assert.Len(response, 3) + tt.Assert.Len(response, 2) for _, resource := range response { tt.Assert.Equal( "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", diff --git a/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go b/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go new file mode 100644 index 0000000000..08ab005eb8 --- /dev/null +++ b/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go @@ -0,0 +1,47 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" + "github.com/stellar/go/xdr" +) + +// ClaimableBalanceBatchInsertBuilder is used to insert claimable balance into the +// claimable_balances table +type ClaimableBalanceBatchInsertBuilder interface { + Add(claimableBalance ClaimableBalance) error + Exec(ctx context.Context, session db.SessionInterface) error + Reset() error +} + +// ClaimableBalanceBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder +type claimableBalanceBatchInsertBuilder struct { + encodingBuffer *xdr.EncodingBuffer + builder db.FastBatchInsertBuilder + table string +} + +// NewClaimableBalanceBatchInsertBuilder constructs a new ClaimableBalanceBatchInsertBuilder instance +func (q *Q) NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder { + return &claimableBalanceBatchInsertBuilder{ + encodingBuffer: xdr.NewEncodingBuffer(), + builder: db.FastBatchInsertBuilder{}, + table: "claimable_balances", + } +} + +// Add adds a new claimable balance to the batch +func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalance) error { + return i.builder.RowStruct(claimableBalance) +} + +// Exec inserts claimable balance rows to the database +func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + return i.builder.Exec(ctx, session, i.table) +} + +func (i *claimableBalanceBatchInsertBuilder) Reset() error{ + i.builder.Reset() + return nil +} diff --git a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go index a1a13d84db..60a2af8419 100644 --- a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go @@ -10,33 +10,38 @@ import ( // ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the // history_transactions table type ClaimableBalanceClaimantBatchInsertBuilder interface { - Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error - Exec(ctx context.Context) error + Add(claimableBalanceClaimant ClaimableBalanceClaimant) error + Exec(ctx context.Context, session db.SessionInterface) error + Reset() error } -// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder +// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder type claimableBalanceClaimantBatchInsertBuilder struct { encodingBuffer *xdr.EncodingBuffer - builder db.BatchInsertBuilder + builder db.FastBatchInsertBuilder + table string } // NewClaimableBalanceClaimantBatchInsertBuilder constructs a new ClaimableBalanceClaimantBatchInsertBuilder instance -func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder { +func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder { return &claimableBalanceClaimantBatchInsertBuilder{ encodingBuffer: xdr.NewEncodingBuffer(), - builder: db.BatchInsertBuilder{ - Table: q.GetTable("claimable_balance_claimants"), - MaxBatchSize: maxBatchSize, - Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger", - }, + builder: db.FastBatchInsertBuilder{}, + table: "claimable_balance_claimants", } } -// Add adds a new transaction to the batch -func (i *claimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error { - return i.builder.RowStruct(ctx, claimableBalanceClaimant) +// Add adds a new claimant for a claimable Balance to the batch +func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error { + return i.builder.RowStruct(claimableBalanceClaimant) } -func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) +// Exec flushes the entire batch into the database +func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + return i.builder.Exec(ctx, session, i.table) +} + +func (i *claimableBalanceClaimantBatchInsertBuilder) Reset() error { + i.builder.Reset() + return nil } diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index 48722a11c3..e10536da4e 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -126,12 +126,12 @@ type Claimant struct { // QClaimableBalances defines claimable-balance-related related queries. type QClaimableBalances interface { - UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error) GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error) CountClaimableBalances(ctx context.Context) (int, error) - NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder + NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder + NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) } @@ -171,36 +171,6 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) ( return claimantsMap, err } -// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table. -// There's currently no limit of the number of offers this method can -// accept other than 2GB limit of the query string length what should be enough -// for each ledger with the current limits. -func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { - var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{} - - for _, cb := range cbs { - id = append(id, cb.BalanceID) - claimants = append(claimants, cb.Claimants) - asset = append(asset, cb.Asset) - amount = append(amount, cb.Amount) - sponsor = append(sponsor, cb.Sponsor) - lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger) - flags = append(flags, cb.Flags) - } - - upsertFields := []upsertField{ - {"id", "text", id}, - {"claimants", "jsonb", claimants}, - {"asset", "text", asset}, - {"amount", "bigint", amount}, - {"sponsor", "text", sponsor}, - {"last_modified_ledger", "integer", lastModifiedLedger}, - {"flags", "int", flags}, - } - - return q.upsertRows(ctx, "claimable_balances", "id", upsertFields) -} - // RemoveClaimableBalances deletes claimable balances table. // Returns number of rows affected and error. func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index 238b03da68..c59c958ed8 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -1,6 +1,7 @@ package history import ( + "database/sql" "fmt" "testing" @@ -15,6 +16,8 @@ func TestRemoveClaimableBalance(t *testing.T) { defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" asset := xdr.MustNewCreditAsset("USD", accountID) @@ -39,8 +42,9 @@ func TestRemoveClaimableBalance(t *testing.T) { Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + balanceBatchInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + tt.Assert.NoError(balanceBatchInsertBuilder.Add(cBalance)) + tt.Assert.NoError(balanceBatchInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) r, err := q.FindClaimableBalanceByID(tt.Ctx, id) tt.Assert.NoError(err) @@ -63,6 +67,8 @@ func TestRemoveClaimableBalanceClaimants(t *testing.T) { defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" asset := xdr.MustNewCreditAsset("USD", accountID) @@ -87,20 +93,9 @@ func TestRemoveClaimableBalanceClaimants(t *testing.T) { Amount: 10, } - claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) - - for _, claimant := range cBalance.Claimants { - claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, - LastModifiedLedger: cBalance.LastModifiedLedger, - } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) - } - - err = claimantsInsertBuilder.Exec(tt.Ctx) - tt.Assert.NoError(err) + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() + tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance)) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) removed, err := q.RemoveClaimableBalanceClaimants(tt.Ctx, []string{id}) tt.Assert.NoError(err) @@ -113,6 +108,9 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.Rollback() + dest1 := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" dest2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H" @@ -138,19 +136,11 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() - claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) - for _, claimant := range cBalance.Claimants { - claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, - LastModifiedLedger: cBalance.LastModifiedLedger, - } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) - } + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance)) + tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance)) balanceID = xdr.ClaimableBalanceId{ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, @@ -179,21 +169,11 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance)) + tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance)) - for _, claimant := range cBalance.Claimants { - claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, - LastModifiedLedger: cBalance.LastModifiedLedger, - } - err = claimantsInsertBuilder.Add(tt.Ctx, claimant) - tt.Assert.NoError(err) - } - - err = claimantsInsertBuilder.Exec(tt.Ctx) - tt.Assert.NoError(err) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) query := ClaimableBalancesQuery{ PageQuery: db2.MustPageQuery("", false, "", 10), @@ -233,20 +213,19 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { tt.Assert.Len(cbs, 1) } -func insertClaimants(q *Q, tt *test.T, cBalance ClaimableBalance) error { - claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) +func insertClaimants(claimantsInsertBuilder ClaimableBalanceClaimantBatchInsertBuilder, cBalance ClaimableBalance) error { for _, claimant := range cBalance.Claimants { claimant := ClaimableBalanceClaimant{ BalanceID: cBalance.BalanceID, Destination: claimant.Destination, LastModifiedLedger: cBalance.LastModifiedLedger, } - err := claimantsInsertBuilder.Add(tt.Ctx, claimant) + err := claimantsInsertBuilder.Add(claimant) if err != nil { return err } } - return claimantsInsertBuilder.Exec(tt.Ctx) + return nil } type claimableBalanceQueryResult struct { @@ -279,6 +258,9 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.Rollback() + assetIssuer := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ" asset1 := xdr.MustNewCreditAsset("ASSET1", assetIssuer) asset2 := xdr.MustNewCreditAsset("ASSET2", assetIssuer) @@ -318,8 +300,11 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { LastModifiedLedger: 123, Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance1}) - tt.Assert.NoError(err) + balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() + + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance1)) + tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance1)) claimants2 := []Claimant{ { @@ -345,14 +330,12 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { LastModifiedLedger: 456, Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2}) - tt.Assert.NoError(err) - err = insertClaimants(q, tt, cBalance1) - tt.Assert.NoError(err) + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance2)) + tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance2)) - err = insertClaimants(q, tt, cBalance2) - tt.Assert.NoError(err) + tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) pageQuery := db2.MustPageQuery("", false, "", 1) @@ -414,73 +397,15 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { }) } -func TestUpdateClaimableBalance(t *testing.T) { - tt := test.Start(t) - defer tt.Finish() - test.ResetHorizonDB(t, tt.HorizonDB) - q := &Q{tt.HorizonSession()} - - accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" - lastModifiedLedgerSeq := xdr.Uint32(123) - asset := xdr.MustNewCreditAsset("USD", accountID) - balanceID := xdr.ClaimableBalanceId{ - Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, - V0: &xdr.Hash{1, 2, 3}, - } - id, err := xdr.MarshalHex(balanceID) - tt.Assert.NoError(err) - cBalance := ClaimableBalance{ - BalanceID: id, - Claimants: []Claimant{ - { - Destination: accountID, - Predicate: xdr.ClaimPredicate{ - Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, - }, - }, - }, - Asset: asset, - LastModifiedLedger: 123, - Amount: 10, - } - - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) - - // add sponsor - cBalance2 := ClaimableBalance{ - BalanceID: id, - Claimants: []Claimant{ - { - Destination: accountID, - Predicate: xdr.ClaimPredicate{ - Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, - }, - }, - }, - Asset: asset, - LastModifiedLedger: 123 + 1, - Amount: 10, - Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - } - - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2}) - tt.Assert.NoError(err) - - cbs := []ClaimableBalance{} - err = q.Select(tt.Ctx, &cbs, selectClaimableBalances) - tt.Assert.NoError(err) - tt.Assert.Len(cbs, 1) - tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String) - tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger) -} - func TestFindClaimableBalance(t *testing.T) { tt := test.Start(t) defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" asset := xdr.MustNewCreditAsset("USD", accountID) balanceID := xdr.ClaimableBalanceId{ @@ -504,11 +429,11 @@ func TestFindClaimableBalance(t *testing.T) { Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance)) + tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) cb, err := q.FindClaimableBalanceByID(tt.Ctx, id) - tt.Assert.NoError(err) tt.Assert.Equal(cBalance.BalanceID, cb.BalanceID) tt.Assert.Equal(cBalance.Asset, cb.Asset) @@ -525,6 +450,9 @@ func TestGetClaimableBalancesByID(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" asset := xdr.MustNewCreditAsset("USD", accountID) balanceID := xdr.ClaimableBalanceId{ @@ -548,8 +476,9 @@ func TestGetClaimableBalancesByID(t *testing.T) { Amount: 10, } - err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) - tt.Assert.NoError(err) + balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder() + tt.Assert.NoError(balanceInsertBuilder.Add(cBalance)) + tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface)) r, err := q.GetClaimableBalancesByID(tt.Ctx, []string{id}) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/mock_claimable_balance_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_claimable_balance_batch_insert_builder.go new file mode 100644 index 0000000000..10a0ad699e --- /dev/null +++ b/services/horizon/internal/db2/history/mock_claimable_balance_batch_insert_builder.go @@ -0,0 +1,27 @@ +package history + +import ( + "context" + "github.com/stellar/go/support/db" + + "github.com/stretchr/testify/mock" +) + +type MockClaimableBalanceBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockClaimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalance) error { + a := m.Called(claimableBalance) + return a.Error(0) +} + +func (m *MockClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + a := m.Called(ctx, session) + return a.Error(0) +} + +func (m *MockClaimableBalanceBatchInsertBuilder) Reset() error { + a := m.Called() + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_claimable_balance_claimant_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_claimable_balance_claimant_batch_insert_builder.go index 63e2788e53..2f70d289d1 100644 --- a/services/horizon/internal/db2/history/mock_claimable_balance_claimant_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/mock_claimable_balance_claimant_batch_insert_builder.go @@ -2,6 +2,7 @@ package history import ( "context" + "github.com/stellar/go/support/db" "github.com/stretchr/testify/mock" ) @@ -10,12 +11,17 @@ type MockClaimableBalanceClaimantBatchInsertBuilder struct { mock.Mock } -func (m *MockClaimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error { - a := m.Called(ctx, claimableBalanceClaimant) +func (m *MockClaimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error { + a := m.Called(claimableBalanceClaimant) return a.Error(0) } -func (m *MockClaimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) +func (m *MockClaimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + a := m.Called(ctx, session) + return a.Error(0) +} + +func (m *MockClaimableBalanceClaimantBatchInsertBuilder) Reset() error { + a := m.Called() return a.Error(0) } diff --git a/services/horizon/internal/db2/history/mock_q_claimable_balances.go b/services/horizon/internal/db2/history/mock_q_claimable_balances.go index 2493d9ebea..64b65cf1a3 100644 --- a/services/horizon/internal/db2/history/mock_q_claimable_balances.go +++ b/services/horizon/internal/db2/history/mock_q_claimable_balances.go @@ -21,11 +21,6 @@ func (m *MockQClaimableBalances) GetClaimableBalancesByID(ctx context.Context, i return a.Get(0).([]ClaimableBalance), a.Error(1) } -func (m *MockQClaimableBalances) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { - a := m.Called(ctx, cbs) - return a.Error(0) -} - func (m *MockQClaimableBalances) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { a := m.Called(ctx, ids) return a.Get(0).(int64), a.Error(1) @@ -36,11 +31,16 @@ func (m *MockQClaimableBalances) RemoveClaimableBalanceClaimants(ctx context.Con return a.Get(0).(int64), a.Error(1) } -func (m *MockQClaimableBalances) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder { - a := m.Called(maxBatchSize) +func (m *MockQClaimableBalances) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder { + a := m.Called() return a.Get(0).(ClaimableBalanceClaimantBatchInsertBuilder) } +func (m *MockQClaimableBalances) NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder { + a := m.Called() + return a.Get(0).(ClaimableBalanceBatchInsertBuilder) +} + func (m *MockQClaimableBalances) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) { a := m.Called(ctx, ids) return a.Get(0).(map[string][]ClaimableBalanceClaimant), a.Error(1) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index dbc586e8ff..4856821fa9 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -299,6 +299,7 @@ func NewSystem(config Config) (System, error) { ctx: ctx, config: config, historyQ: historyQ, + session: historyQ, historyAdapter: historyAdapter, filters: filters, }, diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 41264ad262..d77e568bce 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/stellar/go/support/db" "time" "github.com/stellar/go/ingest" @@ -88,6 +89,7 @@ type ProcessorRunner struct { ctx context.Context historyQ history.IngestionQ + session db.SessionInterface historyAdapter historyArchiveAdapterInterface logMemoryStats bool filters filters.Filters @@ -108,6 +110,7 @@ func (s *ProcessorRunner) DisableMemoryStatsLogging() { func buildChangeProcessor( historyQ history.IngestionQ, + session db.SessionInterface, changeStats *ingest.StatsChangeProcessor, source ingestionSource, ledgerSequence uint32, @@ -126,7 +129,7 @@ func buildChangeProcessor( processors.NewAssetStatsProcessor(historyQ, networkPassphrase, useLedgerCache), processors.NewSignersProcessor(historyQ, useLedgerCache), processors.NewTrustLinesProcessor(historyQ), - processors.NewClaimableBalancesChangeProcessor(historyQ), + processors.NewClaimableBalancesChangeProcessor(historyQ, session), processors.NewLiquidityPoolsChangeProcessor(historyQ, ledgerSequence), }) } @@ -226,6 +229,7 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion( changeStats := ingest.StatsChangeProcessor{} changeProcessor := buildChangeProcessor( s.historyQ, + s.session, &changeStats, historyArchiveSource, checkpointLedger, @@ -386,6 +390,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( groupChangeProcessors := buildChangeProcessor( s.historyQ, + s.session, &changeStatsProcessor, ledgerSource, ledger.LedgerSequence(), diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 507ba64da7..71b30d0e01 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -16,6 +16,7 @@ import ( "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -48,9 +49,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() - + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Once() q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -115,9 +117,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() - + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Once() q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -152,9 +155,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() - + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Once() q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -185,8 +189,10 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { // Twice = checking ledgerSource and historyArchiveSource q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(&history.MockAccountSignersBatchInsertBuilder{}).Twice() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Twice() + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Twice() runner := ProcessorRunner{ ctx: ctx, historyQ: q, @@ -194,7 +200,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { } stats := &ingest.StatsChangeProcessor{} - processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "") + processor := buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, ledgerSource, 123, "") assert.IsType(t, &groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.processors[0]) @@ -215,7 +221,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { filters: &MockFilters{}, } - processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "") + processor = buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, historyArchiveSource, 456, "") assert.IsType(t, &groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.processors[0]) @@ -242,7 +248,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). Return(&history.MockTransactionsBatchInsertBuilder{}).Twice() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Twice() runner := ProcessorRunner{ @@ -310,9 +316,12 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder", maxBatchSize). Return(mockTransactionsBatchInsertBuilder) - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Once() + q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) @@ -369,9 +378,12 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). Return(mockTransactionsBatchInsertBuilder).Twice() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(&history.MockClaimableBalanceBatchInsertBuilder{}).Once() + q.MockQLedgers.On("InsertLedger", ctx, ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion). Return(int64(1), nil).Once() diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index a929927323..04579e47fc 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -6,20 +6,24 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) type ClaimableBalancesChangeProcessor struct { - encodingBuffer *xdr.EncodingBuffer - qClaimableBalances history.QClaimableBalances - cache *ingest.ChangeCompactor - claimantsInsertBuilder history.ClaimableBalanceClaimantBatchInsertBuilder + encodingBuffer *xdr.EncodingBuffer + qClaimableBalances history.QClaimableBalances + cache *ingest.ChangeCompactor + claimantsInsertBuilder history.ClaimableBalanceClaimantBatchInsertBuilder + claimableBalanceInsertBuilder history.ClaimableBalanceBatchInsertBuilder + session db.SessionInterface } -func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *ClaimableBalancesChangeProcessor { +func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances, session db.SessionInterface) *ClaimableBalancesChangeProcessor { p := &ClaimableBalancesChangeProcessor{ encodingBuffer: xdr.NewEncodingBuffer(), qClaimableBalances: Q, + session: session, } p.reset() return p @@ -27,7 +31,8 @@ func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *Claimabl func (p *ClaimableBalancesChangeProcessor) reset() { p.cache = ingest.NewChangeCompactor() - p.claimantsInsertBuilder = p.qClaimableBalances.NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize) + p.claimantsInsertBuilder = p.qClaimableBalances.NewClaimableBalanceClaimantBatchInsertBuilder() + p.claimableBalanceInsertBuilder = p.qClaimableBalances.NewClaimableBalanceBatchInsertBuilder() } func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -53,20 +58,19 @@ func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, ch func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { var ( - cbsToUpsert []history.ClaimableBalance + cbsToInsert []history.ClaimableBalance cbIDsToDelete []string ) changes := p.cache.GetChanges() for _, change := range changes { - switch { - case change.Pre == nil && change.Post != nil: + if change.Post != nil { // Created row, err := p.ledgerEntryToRow(change.Post) if err != nil { return err } - cbsToUpsert = append(cbsToUpsert, row) - case change.Pre != nil && change.Post == nil: + cbsToInsert = append(cbsToInsert, row) + } else { // Removed cBalance := change.Pre.Data.MustClaimableBalance() id, err := p.encodingBuffer.MarshalHex(cBalance.BalanceId) @@ -74,39 +78,11 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return err } cbIDsToDelete = append(cbIDsToDelete, id) - default: - // Updated - row, err := p.ledgerEntryToRow(change.Post) - if err != nil { - return err - } - cbsToUpsert = append(cbsToUpsert, row) } } - if len(cbsToUpsert) > 0 { - if err := p.qClaimableBalances.UpsertClaimableBalances(ctx, cbsToUpsert); err != nil { - return errors.Wrap(err, "error executing upsert") - } - - // Add ClaimableBalanceClaimants - for _, cb := range cbsToUpsert { - for _, claimant := range cb.Claimants { - claimant := history.ClaimableBalanceClaimant{ - BalanceID: cb.BalanceID, - Destination: claimant.Destination, - LastModifiedLedger: cb.LastModifiedLedger, - } - if err := p.claimantsInsertBuilder.Add(ctx, claimant); err != nil { - return errors.Wrap(err, "error adding to claimantsInsertBuilder") - } - } - } - - err := p.claimantsInsertBuilder.Exec(ctx) - if err != nil { - return errors.Wrap(err, "error executing claimantsInsertBuilder") - } + if err := p.InsertClaimableBalanceAndClaimants(ctx, cbsToInsert); err != nil { + return errors.Wrap(err, "error inserting claimable balance") } if len(cbIDsToDelete) > 0 { @@ -132,6 +108,45 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return nil } +func (p *ClaimableBalancesChangeProcessor) InsertClaimableBalanceAndClaimants(ctx context.Context, claimableBalances []history.ClaimableBalance) error { + if len(claimableBalances) == 0 { + return nil + } + + defer p.claimantsInsertBuilder.Reset() + defer p.claimableBalanceInsertBuilder.Reset() + + for _, cb := range claimableBalances { + + if err := p.claimableBalanceInsertBuilder.Add(cb); err != nil { + return errors.Wrap(err, "error executing insert") + } + // Add claimants + for _, claimant := range cb.Claimants { + claimant := history.ClaimableBalanceClaimant{ + BalanceID: cb.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cb.LastModifiedLedger, + } + + if err := p.claimantsInsertBuilder.Add(claimant); err != nil { + return errors.Wrap(err, "error adding to claimantsInsertBuilder") + } + } + } + + err := p.claimantsInsertBuilder.Exec(ctx, p.session) + if err != nil { + return errors.Wrap(err, "error executing claimableBalanceInsertBuilder") + } + + err = p.claimableBalanceInsertBuilder.Exec(ctx, p.session) + if err != nil { + return errors.Wrap(err, "error executing claimantsInsertBuilder") + } + return nil +} + func buildClaimants(claimants []xdr.Claimant) history.Claimants { hClaimants := history.Claimants{} for _, c := range claimants { diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index 70cd43bba5..e4fa4c2d2d 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -20,21 +21,32 @@ func TestClaimableBalancesChangeProcessorTestSuiteState(t *testing.T) { type ClaimableBalancesChangeProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *ClaimableBalancesChangeProcessor - mockQ *history.MockQClaimableBalances - mockBatchInsertBuilder *history.MockClaimableBalanceClaimantBatchInsertBuilder + ctx context.Context + processor *ClaimableBalancesChangeProcessor + mockQ *history.MockQClaimableBalances + mockClaimantsBatchInsertBuilder *history.MockClaimableBalanceClaimantBatchInsertBuilder + mockClaimableBalanceBatchInsertBuilder *history.MockClaimableBalanceBatchInsertBuilder + session db.SessionInterface } func (s *ClaimableBalancesChangeProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() - s.mockBatchInsertBuilder = &history.MockClaimableBalanceClaimantBatchInsertBuilder{} + s.mockClaimantsBatchInsertBuilder = &history.MockClaimableBalanceClaimantBatchInsertBuilder{} + s.mockClaimableBalanceBatchInsertBuilder = &history.MockClaimableBalanceBatchInsertBuilder{} + s.mockQ = &history.MockQClaimableBalances{} + s.session = &db.MockSession{} + s.mockQ. + On("NewClaimableBalanceClaimantBatchInsertBuilder"). + Return(s.mockClaimantsBatchInsertBuilder).Once() s.mockQ. - On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() + On("NewClaimableBalanceBatchInsertBuilder"). + Return(s.mockClaimableBalanceBatchInsertBuilder).Once() + + s.mockClaimantsBatchInsertBuilder.On("Reset").Return(nil).Once() + s.mockClaimableBalanceBatchInsertBuilder.On("Reset").Return(nil).Once() - s.processor = NewClaimableBalancesChangeProcessor(s.mockQ) + s.processor = NewClaimableBalancesChangeProcessor(s.mockQ, s.session) } func (s *ClaimableBalancesChangeProcessorTestSuiteState) TearDownTest() { @@ -67,27 +79,27 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteState) TestCreatesClaimableBal } id, err := xdr.MarshalHex(balanceID) s.Assert().NoError(err) - s.mockQ.On("UpsertClaimableBalances", s.ctx, []history.ClaimableBalance{ - { - BalanceID: id, - Claimants: []history.Claimant{ - { - Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - }, + s.mockClaimableBalanceBatchInsertBuilder.On("Add", history.ClaimableBalance{ + BalanceID: id, + Claimants: []history.Claimant{ + { + Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", }, - Asset: cBalance.Asset, - Amount: cBalance.Amount, - LastModifiedLedger: uint32(lastModifiedLedgerSeq), }, + Asset: cBalance.Asset, + Amount: cBalance.Amount, + LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() - s.mockBatchInsertBuilder.On("Add", s.ctx, history.ClaimableBalanceClaimant{ + s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx, s.session).Return(nil).Once() + + s.mockClaimantsBatchInsertBuilder.On("Add", history.ClaimableBalanceClaimant{ BalanceID: id, Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.mockClaimantsBatchInsertBuilder.On("Exec", s.ctx, s.session).Return(nil).Once() err = s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeClaimableBalance, @@ -109,25 +121,39 @@ func TestClaimableBalancesChangeProcessorTestSuiteLedger(t *testing.T) { type ClaimableBalancesChangeProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *ClaimableBalancesChangeProcessor - mockQ *history.MockQClaimableBalances - mockBatchInsertBuilder *history.MockClaimableBalanceClaimantBatchInsertBuilder + ctx context.Context + processor *ClaimableBalancesChangeProcessor + mockQ *history.MockQClaimableBalances + mockClaimantsBatchInsertBuilder *history.MockClaimableBalanceClaimantBatchInsertBuilder + mockClaimableBalanceBatchInsertBuilder *history.MockClaimableBalanceBatchInsertBuilder + session db.SessionInterface } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.mockBatchInsertBuilder = &history.MockClaimableBalanceClaimantBatchInsertBuilder{} + s.mockClaimantsBatchInsertBuilder = &history.MockClaimableBalanceClaimantBatchInsertBuilder{} + s.mockClaimableBalanceBatchInsertBuilder = &history.MockClaimableBalanceBatchInsertBuilder{} s.mockQ = &history.MockQClaimableBalances{} + s.session = &db.MockSession{} + s.mockQ. + On("NewClaimableBalanceClaimantBatchInsertBuilder"). + Return(s.mockClaimantsBatchInsertBuilder).Twice() s.mockQ. - On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() + On("NewClaimableBalanceBatchInsertBuilder"). + Return(s.mockClaimableBalanceBatchInsertBuilder).Twice() + + s.mockClaimantsBatchInsertBuilder.On("Reset").Return(nil).Once() + s.mockClaimableBalanceBatchInsertBuilder.On("Reset").Return(nil).Once() - s.processor = NewClaimableBalancesChangeProcessor(s.mockQ) + s.mockClaimantsBatchInsertBuilder.On("Exec", s.ctx, s.session).Return(nil).Once() + s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx, s.session).Return(nil).Once() + + s.processor = NewClaimableBalancesChangeProcessor(s.mockQ, s.session) } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TearDownTest() { s.Assert().NoError(s.processor.Commit(s.ctx)) + s.processor.reset() s.mockQ.AssertExpectations(s.T()) } @@ -160,9 +186,23 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc }, }, } - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() - err := s.processor.ProcessChange(s.ctx, ingest.Change{ + id, err := xdr.MarshalHex(balanceID) + s.Assert().NoError(err) + + // We use LedgerEntryChangesCache so all changes are squashed + s.mockClaimableBalanceBatchInsertBuilder.On( + "Add", + history.ClaimableBalance{ + BalanceID: id, + Claimants: []history.Claimant{}, + Asset: cBalance.Asset, + Amount: cBalance.Amount, + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + ).Return(nil).Once() + + err = s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeClaimableBalance, Pre: nil, Post: &entry, @@ -192,89 +232,16 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc }) s.Assert().NoError(err) - id, err := xdr.MarshalHex(balanceID) - s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertClaimableBalances", - s.ctx, - []history.ClaimableBalance{ - { - BalanceID: id, - Claimants: []history.Claimant{}, - Asset: cBalance.Asset, - Amount: cBalance.Amount, - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - }, - ).Return(nil).Once() -} - -func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalance() { - balanceID := xdr.ClaimableBalanceId{ - Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, - V0: &xdr.Hash{1, 2, 3}, - } - cBalance := xdr.ClaimableBalanceEntry{ - BalanceId: balanceID, - Claimants: []xdr.Claimant{}, - Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Amount: 10, - } - lastModifiedLedgerSeq := xdr.Uint32(123) - - pre := xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeClaimableBalance, - ClaimableBalance: &cBalance, - }, - LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1, - Ext: xdr.LedgerEntryExt{ - V: 1, - V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: nil, - }, - }, - } - - // add sponsor - updated := xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeClaimableBalance, - ClaimableBalance: &cBalance, - }, - LastModifiedLedgerSeq: lastModifiedLedgerSeq, - Ext: xdr.LedgerEntryExt{ - V: 1, - V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - }, - } - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() - - err := s.processor.ProcessChange(s.ctx, ingest.Change{ - Type: xdr.LedgerEntryTypeClaimableBalance, - Pre: &pre, - Post: &updated, - }) - s.Assert().NoError(err) - - id, err := xdr.MarshalHex(balanceID) - s.Assert().NoError(err) - s.mockQ.On( - "UpsertClaimableBalances", - s.ctx, - []history.ClaimableBalance{ - { - BalanceID: id, - Claimants: []history.Claimant{}, - Asset: cBalance.Asset, - Amount: cBalance.Amount, - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, + s.mockClaimableBalanceBatchInsertBuilder.On( + "Add", + history.ClaimableBalance{ + BalanceID: id, + Claimants: []history.Claimant{}, + Asset: cBalance.Asset, + Amount: cBalance.Amount, + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), }, ).Return(nil).Once() } diff --git a/services/horizon/internal/ingest/verify_test.go b/services/horizon/internal/ingest/verify_test.go index 47d29cb6ee..e001a30076 100644 --- a/services/horizon/internal/ingest/verify_test.go +++ b/services/horizon/internal/ingest/verify_test.go @@ -271,8 +271,11 @@ func TestStateVerifierLockBusy(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{&db.Session{DB: tt.HorizonDB}} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + checkpointLedger := uint32(63) - changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") + changeProcessor := buildChangeProcessor(q, q.SessionInterface, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") gen := randxdr.NewGenerator() var changes []xdr.LedgerEntryChange @@ -324,8 +327,11 @@ func TestStateVerifier(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{&db.Session{DB: tt.HorizonDB}} + q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) + defer q.SessionInterface.Rollback() + checkpointLedger := uint32(63) - changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") + changeProcessor := buildChangeProcessor(q, q.SessionInterface, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") mockChangeReader := &ingest.MockChangeReader{} gen := randxdr.NewGenerator() @@ -354,6 +360,8 @@ func TestStateVerifier(t *testing.T) { tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) tt.Assert.Equal(len(xdr.LedgerEntryTypeMap), len(coverage)) + q.SessionInterface.Commit() + q.UpdateLastLedgerIngest(tt.Ctx, checkpointLedger) mockChangeReader.On("Read").Return(ingest.Change{}, io.EOF).Twice()