Skip to content

Commit

Permalink
Use FastBatchInsertBuilder to insert to insert into claimable_balance…
Browse files Browse the repository at this point in the history
…s and claimable_balance_claimants tables
  • Loading branch information
urvisavla committed Oct 19, 2023
1 parent 851ad5c commit 83ac240
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 392 deletions.
76 changes: 30 additions & 46 deletions services/horizon/internal/actions/claimable_balance_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actions

import (
"database/sql"
"net/http/httptest"
"testing"

Expand All @@ -20,6 +21,9 @@ func TestGetClaimableBalanceByID(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})
defer q.SessionInterface.Rollback()

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
asset := xdr.MustNewCreditAsset("USD", accountID)
balanceID := xdr.ClaimableBalanceId{
Expand All @@ -43,20 +47,22 @@ func TestGetClaimableBalanceByID(t *testing.T) {
LastModifiedLedger: 123,
}

err = q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cBalance})
tt.Assert.NoError(err)
balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder()
tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}

tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx, q.SessionInterface))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface))

handler := GetClaimableBalanceByIDHandler{}
response, err := handler.GetResource(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -148,6 +154,9 @@ func TestGetClaimableBalances(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})
defer q.SessionInterface.Rollback()

entriesMeta := []struct {
id xdr.Hash
accountID string
Expand Down Expand Up @@ -187,25 +196,24 @@ func TestGetClaimableBalances(t *testing.T) {
hCBs = append(hCBs, cb)
}

err := q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder()
claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()

for _, cBalance := range hCBs {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx, q.SessionInterface))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface))

handler := GetClaimableBalancesHandler{}
response, err := handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
Expand Down Expand Up @@ -284,11 +292,8 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

// new claimable balances are ingest and one of them updated, they should appear in the next pages
cbToBeUpdated := hCBs[3]
cbToBeUpdated.Sponsor = null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")
cbToBeUpdated.LastModifiedLedger = 1238
q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cbToBeUpdated})
balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder()
claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder()

entriesMeta = []struct {
id xdr.Hash
Expand Down Expand Up @@ -316,23 +321,21 @@ func TestGetClaimableBalances(t *testing.T) {
hCBs = append(hCBs, entry)
}

err = q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)

for _, cBalance := range hCBs {
tt.Assert.NoError(balanceInsertbuilder.Add(cBalance))

for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
tt.Assert.NoError(claimantsInsertBuilder.Add(claimant))
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.NoError(balanceInsertbuilder.Exec(tt.Ctx, q.SessionInterface))
tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx, q.SessionInterface))

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -372,21 +375,6 @@ func TestGetClaimableBalances(t *testing.T) {
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 1)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
map[string]string{
"limit": "2",
"cursor": response[0].(protocol.ClaimableBalance).PagingToken(),
},
map[string]string{},
q,
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 0)

Expand All @@ -404,9 +392,7 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 2)

tt.Assert.Equal(cbToBeUpdated.BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

tt.Assert.Equal(hCBs[1].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID)
tt.Assert.Equal(hCBs[1].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand All @@ -422,8 +408,6 @@ func TestGetClaimableBalances(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(response, 1)

tt.Assert.Equal(hCBs[0].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID)

// filter by asset
response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -492,7 +476,7 @@ func TestGetClaimableBalances(t *testing.T) {
))

tt.Assert.NoError(err)
tt.Assert.Len(response, 3)
tt.Assert.Len(response, 2)
for _, resource := range response {
tt.Assert.Equal(
"GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-47 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go:1-47` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

// ClaimableBalanceBatchInsertBuilder is used to insert claimable balance into the
// claimable_balances table
type ClaimableBalanceBatchInsertBuilder interface {
Add(claimableBalance ClaimableBalance) error
Exec(ctx context.Context, session db.SessionInterface) error
Reset() error
}

// ClaimableBalanceBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceBatchInsertBuilder constructs a new ClaimableBalanceBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder {
return &claimableBalanceBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.FastBatchInsertBuilder{},
table: "claimable_balances",
}
}

// Add adds a new claimable balance to the batch
func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalance) error {
return i.builder.RowStruct(claimableBalance)
}

// Exec inserts claimable balance rows to the database
func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}

func (i *claimableBalanceBatchInsertBuilder) Reset() error{
i.builder.Reset()
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,38 @@ import (
// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the
// history_transactions table
type ClaimableBalanceClaimantBatchInsertBuilder interface {
Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context) error
Add(claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context, session db.SessionInterface) error
Reset() error
}

// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type claimableBalanceClaimantBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
builder db.FastBatchInsertBuilder
table string
}

// NewClaimableBalanceClaimantBatchInsertBuilder constructs a new ClaimableBalanceClaimantBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder {
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder {
return &claimableBalanceClaimantBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("claimable_balance_claimants"),
MaxBatchSize: maxBatchSize,
Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger",
},
builder: db.FastBatchInsertBuilder{},
table: "claimable_balance_claimants",
}
}

// Add adds a new transaction to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(ctx, claimableBalanceClaimant)
// Add adds a new claimant for a claimable Balance to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(claimableBalanceClaimant)
}

func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
// Exec flushes the entire batch into the database
func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}

func (i *claimableBalanceClaimantBatchInsertBuilder) Reset() error {
i.builder.Reset()
return nil
}
34 changes: 2 additions & 32 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ type Claimant struct {

// QClaimableBalances defines claimable-balance-related related queries.
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
CountClaimableBalances(ctx context.Context) (int, error)
NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClaimantBatchInsertBuilder
NewClaimableBalanceBatchInsertBuilder() ClaimableBalanceBatchInsertBuilder
GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error)
}

Expand Down Expand Up @@ -171,36 +171,6 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{}

for _, cb := range cbs {
id = append(id, cb.BalanceID)
claimants = append(claimants, cb.Claimants)
asset = append(asset, cb.Asset)
amount = append(amount, cb.Amount)
sponsor = append(sponsor, cb.Sponsor)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
flags = append(flags, cb.Flags)
}

upsertFields := []upsertField{
{"id", "text", id},
{"claimants", "jsonb", claimants},
{"asset", "text", asset},
{"amount", "bigint", amount},
{"sponsor", "text", sponsor},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"flags", "int", flags},
}

return q.upsertRows(ctx, "claimable_balances", "id", upsertFields)
}

// RemoveClaimableBalances deletes claimable balances table.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
Expand Down
Loading

0 comments on commit 83ac240

Please sign in to comment.