Skip to content

Commit

Permalink
services/horizon: Use COPY for inserting into trust_lines table
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Nov 14, 2023
1 parent 4bd1d00 commit 0e6eee6
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 128 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ type QTrustLines interface {
GetTrustLinesByKeys(ctx context.Context, ledgerKeys []string) ([]TrustLine, error)
UpsertTrustLines(ctx context.Context, trustlines []TrustLine) error
RemoveTrustLines(ctx context.Context, ledgerKeys []string) (int64, error)
NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder
}

func (q *Q) NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder {
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_trust_lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ func (m *MockQTrustLines) RemoveTrustLines(ctx context.Context, ledgerKeys []str
a := m.Called(ctx, ledgerKeys)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQTrustLines) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder {
a := m.Called()
return a.Get(0).(TrustLinesBatchInsertBuilder)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockTrustLinesBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTrustLinesBatchInsertBuilder) Add(line TrustLine) error {
a := m.Called(line)
return a.Error(0)
}

func (m *MockTrustLinesBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/trust_lines_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-39 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// TrustLinesBatchInsertBuilder is used to insert trustlines into the trust_lines table
type TrustLinesBatchInsertBuilder interface {
Add(line TrustLine) error
Exec(ctx context.Context) error
}

// trustLinesBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type trustLinesBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewTrustLinesBatchInsertBuilder constructs a new TrustLinesBatchInsertBuilder instance
func (q *Q) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder {
return &trustLinesBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "trust_lines",
}
}

// Add adds a new trustline to the batch
func (i *trustLinesBatchInsertBuilder) Add(line TrustLine) error {
return i.builder.RowStruct(line)
}

// Exec writes the batch of trust lines to the database.
func (i *trustLinesBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
25 changes: 24 additions & 1 deletion services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -158,6 +162,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -215,6 +223,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Once()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -263,6 +275,9 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder).Twice()

runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
Expand Down Expand Up @@ -405,6 +420,10 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder).Twice()

q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")).
Return(int64(0), nil)

Expand Down Expand Up @@ -603,9 +622,13 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont
mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil)
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder)

mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{}
mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder").Return(mockTrustLinesBatchInsertBuilder)

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
type TrustLinesProcessor struct {
trustLinesQ history.QTrustLines

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
batchInsertBuilder history.TrustLinesBatchInsertBuilder
}

func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcessor {
Expand All @@ -23,6 +24,7 @@ func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcesso

func (p *TrustLinesProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.trustLinesQ.NewTrustLinesBatchInsertBuilder()
}

func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -40,7 +42,6 @@ func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.C
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
Expand Down Expand Up @@ -97,18 +98,31 @@ func xdrToTrustline(ledgerEntry xdr.LedgerEntry) (history.TrustLine, error) {
}

func (p *TrustLinesProcessor) Commit(ctx context.Context) error {
defer p.reset()

var batchUpsertTrustLines []history.TrustLine
var batchRemoveTrustLineKeys []string

changes := p.cache.GetChanges()
for _, change := range changes {
switch {
case change.Post != nil:
tl, err := xdrToTrustline(*change.Post)
case change.Pre == nil && change.Post != nil:
// Created
line, err := xdrToTrustline(*change.Post)
if err != nil {
return errors.Wrap(err, "Error extracting trustline")
}

err = p.batchInsertBuilder.Add(line)
if err != nil {
return errors.Wrap(err, "Error adding to TrustLinesBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
tl, err := xdrToTrustline(*change.Post)
if err != nil {
return errors.Wrap(err, "Error extracting trustline")
}
batchUpsertTrustLines = append(batchUpsertTrustLines, tl)
case change.Pre != nil && change.Post == nil:
// Removed
Expand All @@ -124,6 +138,11 @@ func (p *TrustLinesProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "Error executing TrustLinesBatchInsertBuilder")
}

if len(batchUpsertTrustLines) > 0 {
err := p.trustLinesQ.UpsertTrustLines(ctx, batchUpsertTrustLines)
if err != nil {
Expand Down
Loading

0 comments on commit 0e6eee6

Please sign in to comment.