Skip to content

Commit

Permalink
services/horizon: Use COPY for inserting into accounts table
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Nov 16, 2023
1 parent 6cdae54 commit b0660a7
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/accounts_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-39 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// AccountsBatchInsertBuilder is used to insert accounts into the accounts table
type AccountsBatchInsertBuilder interface {
Add(account AccountEntry) error
Exec(ctx context.Context) error
}

// AccountsBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type accountsBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewAccountsBatchInsertBuilder constructs a new AccountsBatchInsertBuilder instance
func (q *Q) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
return &accountsBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts",
}
}

// Add adds a new account to the batch
func (i *accountsBatchInsertBuilder) Add(account AccountEntry) error {
return i.builder.RowStruct(account)
}

// Exec writes the batch of accounts to the database.
func (i *accountsBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ type QAccounts interface {
GetAccountsByIDs(ctx context.Context, ids []string) ([]AccountEntry, error)
UpsertAccounts(ctx context.Context, accounts []AccountEntry) error
RemoveAccounts(ctx context.Context, accountIDs []string) (int64, error)
NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder
}

// AccountSigner is a row of data from the `accounts_signers` table
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockAccountsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountsBatchInsertBuilder) Add(account AccountEntry) error {
a := m.Called(account)
return a.Error(0)
}

func (m *MockAccountsBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ func (m *MockQAccounts) RemoveAccounts(ctx context.Context, accountIDs []string)
a := m.Called(ctx, accountIDs)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQAccounts) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountsBatchInsertBuilder)
}
48 changes: 27 additions & 21 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

q := &mockDBQ{}

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -49,6 +38,16 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
Sponsor: null.String{},
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
SequenceTime: zero.IntFrom(0),
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -94,16 +93,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
nil,
).Once()

q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{
{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
},
}).Return(nil).Once()

batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true)
defer mock.AssertExpectationsForObjects(t, batchBuilders...)

Expand All @@ -114,6 +103,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
Weight: 1,
}).Return(nil).Once()

assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1])
batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{
LastModifiedLedger: 1,
AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7",
Balance: int64(1000000000000000000),
SequenceNumber: 0,
MasterWeight: 1,
}).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -494,6 +492,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQSigners.On("NewAccountSignersBatchInsertBuilder").
Return(mockAccountSignersBatchInsertBuilder).Twice()

mockAccountsBatchInsertBuilder := &history.MockAccountsBatchInsertBuilder{}
if mockExec {
mockAccountsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQAccounts.On("NewAccountsBatchInsertBuilder").
Return(mockAccountsBatchInsertBuilder).Twice()

mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{}
if mockExec {
mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx).
Expand Down Expand Up @@ -524,6 +529,7 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
Return(mockOfferBatchInsertBuilder).Twice()

return []interface{}{mockAccountSignersBatchInsertBuilder,
mockAccountsBatchInsertBuilder,
mockClaimableBalanceBatchInsertBuilder,
mockClaimableBalanceClaimantBatchInsertBuilder,
mockLiquidityPoolBatchInsertBuilder,
Expand Down
23 changes: 19 additions & 4 deletions services/horizon/internal/ingest/processors/accounts_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
type AccountsProcessor struct {
accountsQ history.QAccounts

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
batchInsertBuilder history.AccountsBatchInsertBuilder
}

func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {
Expand All @@ -24,6 +25,7 @@ func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor {

func (p *AccountsProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder()
}

func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -41,13 +43,14 @@ func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Cha
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
}

func (p *AccountsProcessor) Commit(ctx context.Context) error {
defer p.reset()

batchUpsertAccounts := []history.AccountEntry{}
removeBatch := []string{}

Expand All @@ -63,8 +66,15 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error {
}

switch {
case change.Post != nil:
// Created and updated
case change.Pre == nil && change.Post != nil:
// Created
row := p.ledgerEntryToRow(*change.Post)
err := p.batchInsertBuilder.Add(row)
if err != nil {
return errors.Wrap(err, "Error adding to AccountsBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
row := p.ledgerEntryToRow(*change.Post)
batchUpsertAccounts = append(batchUpsertAccounts, row)
case change.Pre != nil && change.Post == nil:
Expand All @@ -77,6 +87,11 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "Error executing AccountsBatchInsertBuilder")
}

// Upsert accounts
if len(batchUpsertAccounts) > 0 {
err := p.accountsQ.UpsertAccounts(ctx, batchUpsertAccounts)
Expand Down
Loading

0 comments on commit b0660a7

Please sign in to comment.