Skip to content

Commit

Permalink
services/horizon: use COPY for inserting into accounts_data table
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla committed Nov 16, 2023
1 parent 4bd1d00 commit 85a0de6
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

type AccountDataBatchInsertBuilder interface {
Add(data Data) error
Exec(ctx context.Context) error
}

type accountDataBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

func (q *Q) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder {
return &accountDataBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "accounts_data",
}
}

// Add adds a new account data to the batch
func (i *accountDataBatchInsertBuilder) Add(data Data) error {
ledgerKey, err := accountDataKeyToString(AccountDataKey{
AccountID: data.AccountID,
DataName: data.Name,
})
if err != nil {
return err
}
return i.builder.Row(map[string]interface{}{
"ledger_key": ledgerKey,
"account_id": data.AccountID,
"name": data.Name,
"value": data.Value,
"last_modified_ledger": data.LastModifiedLedger,
"sponsor": data.Sponsor,
})
}

// Exec writes the batch of account data to the database.
func (i *accountDataBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (t *AccountDataValue) Scan(src interface{}) error {

// Value implements driver.Valuer
func (value AccountDataValue) Value() (driver.Value, error) {
return driver.Value([]uint8(base64.StdEncoding.EncodeToString(value))), nil
return driver.Value(base64.StdEncoding.EncodeToString(value)), nil
}

func (value AccountDataValue) Base64() string {
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ type QData interface {
GetAccountDataByKeys(ctx context.Context, keys []AccountDataKey) ([]Data, error)
UpsertAccountData(ctx context.Context, data []Data) error
RemoveAccountData(ctx context.Context, keys []AccountDataKey) (int64, error)
NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder
}

// Asset is a row of data from the `history_assets` table
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockAccountDataBatchInsertBuilder struct {
mock.Mock
}

func (m *MockAccountDataBatchInsertBuilder) Add(data Data) error {
a := m.Called(data)
return a.Error(0)
}

func (m *MockAccountDataBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ func (m *MockQData) RemoveAccountData(ctx context.Context, keys []AccountDataKey
a := m.Called(ctx, keys)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQData) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder {
a := m.Called()
return a.Get(0).(AccountDataBatchInsertBuilder)
}
28 changes: 28 additions & 0 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -158,6 +163,11 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -215,6 +225,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Once()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -263,6 +277,10 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
Expand Down Expand Up @@ -405,6 +423,11 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")).
Return(int64(0), nil)

Expand Down Expand Up @@ -606,6 +629,11 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil)
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder)

mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{}
mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQData.On("NewAccountDataBatchInsertBuilder").
Return(mockAccountDataBatchInsertBuilder).Twice()

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
type AccountDataProcessor struct {
dataQ history.QData

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
batchInsertBuilder history.AccountDataBatchInsertBuilder
}

func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor {
Expand All @@ -23,6 +24,7 @@ func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor {

func (p *AccountDataProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder()
}

func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -41,13 +43,13 @@ func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
}

func (p *AccountDataProcessor) Commit(ctx context.Context) error {
defer p.reset()
var (
datasToUpsert []history.Data
datasToDelete []history.AccountDataKey
Expand All @@ -57,7 +59,10 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error {
switch {
case change.Pre == nil && change.Post != nil:
// Created
datasToUpsert = append(datasToUpsert, p.ledgerEntryToRow(change.Post))
err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.Wrap(err, "Error adding to AccountDataBatchInsertBuilder")
}
case change.Pre != nil && change.Post == nil:
// Removed
data := change.Pre.Data.MustData()
Expand All @@ -72,6 +77,11 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "Error executing AccountDataBatchInsertBuilder")
}

if len(datasToUpsert) > 0 {
if err := p.dataQ.UpsertAccountData(ctx, datasToUpsert); err != nil {
return errors.Wrap(err, "error executing upsert")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@ func TestAccountsDataProcessorTestSuiteState(t *testing.T) {

type AccountsDataProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *AccountDataProcessor
mockQ *history.MockQData
ctx context.Context
processor *AccountDataProcessor
mockQ *history.MockQData
mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder
}

func (s *AccountsDataProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQData{}

s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{}
s.mockQ.On("NewAccountDataBatchInsertBuilder").
Return(s.mockAccountDataBatchInsertBuilder)
s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil)

s.processor = NewAccountDataProcessor(s.mockQ)
}

Expand Down Expand Up @@ -60,7 +66,7 @@ func (s *AccountsDataProcessorTestSuiteState) TestCreatesAccounts() {
Value: history.AccountDataValue(data.DataValue),
LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq),
}
s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once()
s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeData,
Expand All @@ -76,15 +82,21 @@ func TestAccountsDataProcessorTestSuiteLedger(t *testing.T) {

type AccountsDataProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *AccountDataProcessor
mockQ *history.MockQData
ctx context.Context
processor *AccountDataProcessor
mockQ *history.MockQData
mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder
}

func (s *AccountsDataProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQData{}

s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{}
s.mockQ.On("NewAccountDataBatchInsertBuilder").
Return(s.mockAccountDataBatchInsertBuilder)
s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil)

s.processor = NewAccountDataProcessor(s.mockQ)
}

Expand Down Expand Up @@ -152,7 +164,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccountData() {
Value: history.AccountDataValue(updatedData.DataValue),
LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq),
}
s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once()
s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once()
}

func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() {
Expand Down Expand Up @@ -196,6 +208,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() {
Value: history.AccountDataValue(updatedData.DataValue),
LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq),
}
s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once()
s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once()
}

Expand Down

0 comments on commit 85a0de6

Please sign in to comment.