From 0e49e6eedfbe74a3b5dc0a6f7fa5ae6e04dd7747 Mon Sep 17 00:00:00 2001 From: Urvi Date: Wed, 15 Nov 2023 23:03:32 -0800 Subject: [PATCH] services/horizon: use COPY for inserting into accounts_data table --- .../account_data_batch_insert_builder.go | 50 +++++++++++++++++++ .../db2/history/account_data_value.go | 4 +- services/horizon/internal/db2/history/main.go | 1 + .../mock_account_data_batch_insert_builder.go | 21 ++++++++ .../internal/db2/history/mock_q_data.go | 5 ++ .../internal/ingest/processor_runner_test.go | 8 +++ .../processors/account_data_processor.go | 16 ++++-- .../accounts_data_processor_test.go | 29 ++++++++--- 8 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 services/horizon/internal/db2/history/account_data_batch_insert_builder.go create mode 100644 services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go diff --git a/services/horizon/internal/db2/history/account_data_batch_insert_builder.go b/services/horizon/internal/db2/history/account_data_batch_insert_builder.go new file mode 100644 index 0000000000..75c8ff6124 --- /dev/null +++ b/services/horizon/internal/db2/history/account_data_batch_insert_builder.go @@ -0,0 +1,50 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +type AccountDataBatchInsertBuilder interface { + Add(data Data) error + Exec(ctx context.Context) error +} + +type accountDataBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +func (q *Q) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder { + return &accountDataBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "accounts_data", + } +} + +// Add adds a new account data to the batch +func (i *accountDataBatchInsertBuilder) Add(data Data) error { + ledgerKey, err := accountDataKeyToString(AccountDataKey{ + AccountID: data.AccountID, + DataName: data.Name, + }) + if err != nil { + return err + } + return i.builder.Row(map[string]interface{}{ + "ledger_key": ledgerKey, + "account_id": data.AccountID, + "name": data.Name, + "value": data.Value, + "last_modified_ledger": data.LastModifiedLedger, + "sponsor": data.Sponsor, + }) +} + +// Exec writes the batch of account data to the database. +func (i *accountDataBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/db2/history/account_data_value.go b/services/horizon/internal/db2/history/account_data_value.go index efcd8d319b..693420e034 100644 --- a/services/horizon/internal/db2/history/account_data_value.go +++ b/services/horizon/internal/db2/history/account_data_value.go @@ -22,7 +22,9 @@ func (t *AccountDataValue) Scan(src interface{}) error { // Value implements driver.Valuer func (value AccountDataValue) Value() (driver.Value, error) { - return driver.Value([]uint8(base64.StdEncoding.EncodeToString(value))), nil + // Return string to bypass buggy encoding in pq driver for []byte. + // More info https://github.com/stellar/go/issues/5086#issuecomment-1773215436) + return driver.Value(base64.StdEncoding.EncodeToString(value)), nil } func (value AccountDataValue) Base64() string { diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 850b8bef3c..428ba7d894 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -356,6 +356,7 @@ type QData interface { GetAccountDataByKeys(ctx context.Context, keys []AccountDataKey) ([]Data, error) UpsertAccountData(ctx context.Context, data []Data) error RemoveAccountData(ctx context.Context, keys []AccountDataKey) (int64, error) + NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder } // Asset is a row of data from the `history_assets` table diff --git a/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go new file mode 100644 index 0000000000..aa5af10730 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockAccountDataBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockAccountDataBatchInsertBuilder) Add(data Data) error { + a := m.Called(data) + return a.Error(0) +} + +func (m *MockAccountDataBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_data.go b/services/horizon/internal/db2/history/mock_q_data.go index 3316aaa51b..8d418e896e 100644 --- a/services/horizon/internal/db2/history/mock_q_data.go +++ b/services/horizon/internal/db2/history/mock_q_data.go @@ -30,3 +30,8 @@ func (m *MockQData) RemoveAccountData(ctx context.Context, keys []AccountDataKey a := m.Called(ctx, keys) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQData) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder { + a := m.Called() + return a.Get(0).(AccountDataBatchInsertBuilder) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index b932a05129..c25156a0a7 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -523,10 +523,18 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec q.MockQOffers.On("NewOffersBatchInsertBuilder"). Return(mockOfferBatchInsertBuilder).Twice() + mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} + if mockExec { + mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + } + q.MockQData.On("NewAccountDataBatchInsertBuilder"). + Return(mockAccountDataBatchInsertBuilder).Twice() + return []interface{}{mockAccountSignersBatchInsertBuilder, mockClaimableBalanceBatchInsertBuilder, mockClaimableBalanceClaimantBatchInsertBuilder, mockLiquidityPoolBatchInsertBuilder, mockOfferBatchInsertBuilder, + mockAccountDataBatchInsertBuilder, } } diff --git a/services/horizon/internal/ingest/processors/account_data_processor.go b/services/horizon/internal/ingest/processors/account_data_processor.go index dfdbecb43e..f774700fbd 100644 --- a/services/horizon/internal/ingest/processors/account_data_processor.go +++ b/services/horizon/internal/ingest/processors/account_data_processor.go @@ -12,7 +12,8 @@ import ( type AccountDataProcessor struct { dataQ history.QData - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + batchInsertBuilder history.AccountDataBatchInsertBuilder } func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor { @@ -23,6 +24,7 @@ func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor { func (p *AccountDataProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder() } func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -41,13 +43,13 @@ func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest. if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil } func (p *AccountDataProcessor) Commit(ctx context.Context) error { + defer p.reset() var ( datasToUpsert []history.Data datasToDelete []history.AccountDataKey @@ -57,7 +59,10 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error { switch { case change.Pre == nil && change.Post != nil: // Created - datasToUpsert = append(datasToUpsert, p.ledgerEntryToRow(change.Post)) + err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post)) + if err != nil { + return errors.Wrap(err, "Error adding to AccountDataBatchInsertBuilder") + } case change.Pre != nil && change.Post == nil: // Removed data := change.Pre.Data.MustData() @@ -72,6 +77,11 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "Error executing AccountDataBatchInsertBuilder") + } + if len(datasToUpsert) > 0 { if err := p.dataQ.UpsertAccountData(ctx, datasToUpsert); err != nil { return errors.Wrap(err, "error executing upsert") diff --git a/services/horizon/internal/ingest/processors/accounts_data_processor_test.go b/services/horizon/internal/ingest/processors/accounts_data_processor_test.go index 86ff1e13de..273cfcca4b 100644 --- a/services/horizon/internal/ingest/processors/accounts_data_processor_test.go +++ b/services/horizon/internal/ingest/processors/accounts_data_processor_test.go @@ -18,15 +18,21 @@ func TestAccountsDataProcessorTestSuiteState(t *testing.T) { type AccountsDataProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *AccountDataProcessor - mockQ *history.MockQData + ctx context.Context + processor *AccountDataProcessor + mockQ *history.MockQData + mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder } func (s *AccountsDataProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQData{} + s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{} + s.mockQ.On("NewAccountDataBatchInsertBuilder"). + Return(s.mockAccountDataBatchInsertBuilder) + s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil) + s.processor = NewAccountDataProcessor(s.mockQ) } @@ -60,7 +66,7 @@ func (s *AccountsDataProcessorTestSuiteState) TestCreatesAccounts() { Value: history.AccountDataValue(data.DataValue), LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq), } - s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeData, @@ -76,15 +82,21 @@ func TestAccountsDataProcessorTestSuiteLedger(t *testing.T) { type AccountsDataProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *AccountDataProcessor - mockQ *history.MockQData + ctx context.Context + processor *AccountDataProcessor + mockQ *history.MockQData + mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder } func (s *AccountsDataProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQData{} + s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{} + s.mockQ.On("NewAccountDataBatchInsertBuilder"). + Return(s.mockAccountDataBatchInsertBuilder) + s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil) + s.processor = NewAccountDataProcessor(s.mockQ) } @@ -152,7 +164,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccountData() { Value: history.AccountDataValue(updatedData.DataValue), LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq), } - s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() } func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() { @@ -196,6 +208,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() { Value: history.AccountDataValue(updatedData.DataValue), LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq), } + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() }