Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Insert and query rows from history lookup tables with one query #5415

Merged
merged 15 commits into from
Aug 23, 2024
170 changes: 120 additions & 50 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
)

// FutureAccountID represents a future history account.
Expand All @@ -24,8 +23,6 @@ type FutureAccountID struct {
loader *AccountLoader
}

const loaderLookupBatchSize = 50000
tamirms marked this conversation as resolved.
Show resolved Hide resolved

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address)
Expand Down Expand Up @@ -85,22 +82,6 @@ func (a *AccountLoader) GetNow(address string) (int64, error) {
}
}

// lookupKeys queries the ids of the given addresses and records them in a.ids.
//
// The addresses are looked up in consecutive batches of at most
// loaderLookupBatchSize entries, one q.AccountsByAddresses call per batch.
// Every account returned by a query is stored in a.ids keyed by its address;
// addresses for which the query returns no account are left without an entry.
// The first query failure stops the lookup and is returned wrapped with the
// message "could not select accounts".
func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
	total := len(addresses)
	for start := 0; start < total; start += loaderLookupBatchSize {
		// Clamp the end of the final batch so the slice never runs past the input.
		stop := ordered.Min(total, start+loaderLookupBatchSize)
		batch := addresses[start:stop]

		var found []Account
		err := q.AccountsByAddresses(ctx, &found, batch)
		if err != nil {
			return errors.Wrap(err, "could not select accounts")
		}

		for idx := range found {
			a.ids[found[idx].Address] = found[idx].ID
		}
	}
	return nil
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Expand All @@ -122,47 +103,65 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
for address := range a.set {
addresses = append(addresses, address)
}

if err := a.lookupKeys(ctx, q, addresses); err != nil {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
return err
}
a.stats.Total += len(addresses)

insert := 0
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
addresses[insert] = address
insert++
}
if insert == 0 {
return nil
}
addresses = addresses[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)

var accounts []Account
err := bulkInsert(
ctx,
q,
"history_accounts",
[]string{"address"},
[]bulkInsertField{
[]columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
},
&accounts,
)
if err != nil {
return err
}
a.stats.Inserted += insert
for _, account := range accounts {
a.ids[account.Address] = account.ID
a.stats.Inserted++
}
a.stats.Total += len(accounts)

remaining := make([]string, 0, len(addresses))
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
remaining = append(remaining, address)
}
if len(remaining) > 0 {
var remainingAccounts []Account
err = bulkGet(
ctx,
q,
"history_accounts",
[]columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: remaining,
},
},
&remainingAccounts,
)
if err != nil {
return err
}
for _, account := range remainingAccounts {
a.ids[account.Address] = account.ID
}
a.stats.Total += len(remainingAccounts)
}

return a.lookupKeys(ctx, q, addresses)
return nil
}

// Stats returns the number of addresses registered in the loader and the number of addresses
Expand All @@ -175,17 +174,40 @@ func (a *AccountLoader) Name() string {
return "AccountLoader"
}

type bulkInsertField struct {
type columnValues struct {
name string
dbType string
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

// In the code below we are building the bulk insert query which looks like:
//
// WITH rows AS
// (SELECT
// /* unnestPart */
// unnest(?::type1[]), /* field1 */
// unnest(?::type2[]), /* field2 */
// ...
// )
// INSERT INTO table (
// /* insertFieldsPart */
// field1,
// field2,
// ...
// )
// SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING *
//
// Using unnest allows us to get around the maximum limit of 65,535 query parameters,
// see https://www.postgresql.org/docs/12/limits.html and
// https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/
//
// Without using unnest we would have to use multiple insert statements to insert
// all the rows for large datasets.
for _, field := range fields {
unnestPart = append(
unnestPart,
Expand All @@ -200,21 +222,69 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
pq.Array(field.objects),
)
}
columns := strings.Join(insertFieldsPart, ",")
tamirms marked this conversation as resolved.
Show resolved Hide resolved

sql := `
WITH r AS
WITH rows AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`
(` + columns + `)
SELECT * FROM rows
ON CONFLICT (` + columns + `) DO NOTHING
RETURNING *`

return q.SelectRaw(
ctx,
response,
sql,
pqArrays...,
)
}

func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error {
unnestPart := make([]string, 0, len(fields))
columns := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
// In the code below we are building the bulk get query which looks like:
//
// SELECT * FROM table WHERE (field1, field2, ...) IN
// (SELECT
// /* unnestPart */
// unnest(?::type1[]), /* field1 */
// unnest(?::type2[]), /* field2 */
// ...
// )
//
// Using unnest allows us to get around the maximum limit of 65,535 query parameters,
// see https://www.postgresql.org/docs/12/limits.html and
// https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/
//
// Without using unnest we would have to use multiple select statements to obtain
// all the rows for large datasets.
for _, field := range fields {
unnestPart = append(
unnestPart,
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
)
columns = append(
columns,
field.name,
)
pqArrays = append(
pqArrays,
pq.Array(field.objects),
)
}
sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN
(SELECT ` + strings.Join(unnestPart, ",") + `)`

return q.SelectRaw(
ctx,
response,
sql,
pqArrays...,
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
Expand Down
26 changes: 26 additions & 0 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,30 @@ func TestAccountLoader(t *testing.T) {
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
tamirms marked this conversation as resolved.
Show resolved Hide resolved
loader = NewAccountLoader()
for _, address := range addresses {
future := loader.GetFuture(address)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 0,
}, loader.Stats())

for _, address := range addresses {
var internalId int64
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

}
Loading
Loading