From f2f4a0e085ed09426df7da6e13c299f8365b1b40 Mon Sep 17 00:00:00 2001 From: urvisavla Date: Thu, 15 Aug 2024 09:05:49 -0700 Subject: [PATCH 1/5] ingest: Add BufferedStorate ledger backend in README (#5427) --- ingest/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingest/README.md b/ingest/README.md index 277807f978..bace92c2d0 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -23,9 +23,9 @@ From a high level, the ingestion library is broken down into a few modular compo [ Ledger Backend ] | - | - Captive - Core + |---+---| + Captive Buffered + Core Storage ``` This is described in a little more detail in [`doc.go`](./doc.go), its accompanying examples, the documentation within this package, and the rest of this tutorial. From 26ab1acd813b9f2df77e8a4a139cbab90f1c952e Mon Sep 17 00:00:00 2001 From: urvisavla Date: Mon, 19 Aug 2024 16:55:51 -0700 Subject: [PATCH 2/5] Fix inconsistent debian version in Dockerfile (#5432) --- services/galexie/docker/Dockerfile | 2 +- services/horizon/docker/Dockerfile.dev | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/galexie/docker/Dockerfile b/services/galexie/docker/Dockerfile index 6614a40692..42ff2e43d9 100644 --- a/services/galexie/docker/Dockerfile +++ b/services/galexie/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.22-bullseye AS builder +FROM golang:1.22-bookworm AS builder WORKDIR /go/src/github.com/stellar/go diff --git a/services/horizon/docker/Dockerfile.dev b/services/horizon/docker/Dockerfile.dev index 5cef8d89d1..91b0f8aeea 100644 --- a/services/horizon/docker/Dockerfile.dev +++ b/services/horizon/docker/Dockerfile.dev @@ -1,4 +1,4 @@ -FROM golang:1.22-bullseye AS builder +FROM golang:1.22-bookworm AS builder ARG VERSION="devel" WORKDIR /go/src/github.com/stellar/go From dfabe31b5b89cb235e81732d708a3f5bdeff507a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 20 Aug 2024 10:19:02 +1000 Subject: [PATCH 3/5] Remove CLA reference from contributing document (#5435) * Remove CLA reference from contributing document * Update CONTRIBUTING.md --- services/horizon/CONTRIBUTING.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/horizon/CONTRIBUTING.md b/services/horizon/CONTRIBUTING.md index 26ba4d1ad0..c8c598382d 100644 --- a/services/horizon/CONTRIBUTING.md +++ b/services/horizon/CONTRIBUTING.md @@ -1,6 +1,3 @@ # How to contribute -Please read the [Contribution Guide](https://github.com/stellar/docs/blob/master/CONTRIBUTING.md). - -Then please [sign the Contributor License Agreement](https://docs.google.com/forms/d/1g7EF6PERciwn7zfmfke5Sir2n10yddGGSXyZsq98tVY/viewform?usp=send_form). - +Please read the [Contribution Guide](https://github.com/stellar/.github/blob/master/CONTRIBUTING.md). From 8257415ed3f62654d03f979132c8637632c6e230 Mon Sep 17 00:00:00 2001 From: mlo Date: Thu, 22 Aug 2024 13:52:21 -0700 Subject: [PATCH 4/5] Update Core version to v21.3.1 across CI (#5439) * Bump core versions to v21.3.0 * Update RPC image and incorporate into cache-busting * Remove unsupported Horizon / Captive Core flag in config --------- Co-authored-by: George --- .github/workflows/galexie-release.yml | 2 +- .github/workflows/galexie.yml | 2 +- .github/workflows/horizon.yml | 10 +++++----- .../captive-core-integration-tests.soroban-rpc.cfg | 2 +- services/horizon/internal/docs/GUIDE_FOR_DEVELOPERS.md | 4 ++-- services/horizon/internal/docs/TESTING_NOTES.md | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/galexie-release.yml b/.github/workflows/galexie-release.yml index 18c441eae3..b84f3c1cf0 100644 --- a/.github/workflows/galexie-release.yml +++ b/.github/workflows/galexie-release.yml @@ -16,7 +16,7 @@ jobs: # this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing' GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f GALEXIE_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false" - STELLAR_CORE_VERSION: 21.1.0-1921.b3aeb14cc.focal + STELLAR_CORE_VERSION: 21.3.1-2007.4ede19620.focal steps: - name: Set VERSION run: | diff --git a/.github/workflows/galexie.yml b/.github/workflows/galexie.yml index 458f23ca37..6406a7a897 100644 --- a/.github/workflows/galexie.yml +++ b/.github/workflows/galexie.yml @@ -10,7 +10,7 @@ jobs: name: Test runs-on: ubuntu-latest env: - CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal + CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.3.1-2007.4ede19620.focal GALEXIE_INTEGRATION_TESTS_ENABLED: "true" GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core # this pins to a version of quickstart:testing that has the same version as GALEXIE_INTEGRATION_TESTS_CAPTIVE_CORE_BIN diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index 117e11bb6f..f315e27791 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -33,9 +33,9 @@ jobs: HORIZON_INTEGRATION_TESTS_ENABLED: true HORIZON_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }} HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_USE_DB: true - PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.0.0-1872.c6f474133.focal - PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:21 - PROTOCOL_21_SOROBAN_RPC_DOCKER_IMG: stellar/soroban-rpc:21.0.0-rc2-73 + PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.3.1-2007.4ede19620.focal + PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:21.3.1-2007.4ede19620.focal + PROTOCOL_21_SOROBAN_RPC_DOCKER_IMG: stellar/soroban-rpc:21.4.1 PGHOST: localhost PGPORT: 5432 PGUSER: postgres @@ -98,7 +98,7 @@ jobs: - name: Calculate the source hash id: calculate_source_hash run: | - combined_hash=$(echo "horizon-hash-${{ hashFiles('./horizon') }}-${{ hashFiles('./clients/horizonclient/**') }}-${{ hashFiles('./protocols/horizon/**') }}-${{ hashFiles('./txnbuild/**') }}-${{ hashFiles('./ingest/**') }}-${{ hashFiles('./xdr/**') }}-${{ hashFiles('./services/**') }}-${{ env.PROTOCOL_21_CORE_DOCKER_IMG }}-${{ env.PREFIX }}" | sha256sum | cut -d ' ' -f 1) + combined_hash=$(echo "horizon-hash-${{ hashFiles('./horizon') }}-${{ hashFiles('./clients/horizonclient/**') }}-${{ hashFiles('./protocols/horizon/**') }}-${{ hashFiles('./txnbuild/**') }}-${{ hashFiles('./ingest/**') }}-${{ hashFiles('./xdr/**') }}-${{ hashFiles('./services/**') }}-${{ env.PROTOCOL_21_CORE_DOCKER_IMG }}-${{ env.PROTOCOL_21_RPC_DOCKER_IMG }}-${{ env.PROTOCOL_21_CORE_DEBIAN_PKG_VERSION }}-${{ env.PREFIX }}" | sha256sum | cut -d ' ' -f 1) echo "COMBINED_SOURCE_HASH=$combined_hash" >> "$GITHUB_ENV" - name: Restore Horizon binary and integration tests source hash to cache @@ -123,7 +123,7 @@ jobs: name: Test (and push) verify-range image runs-on: ubuntu-22.04 env: - STELLAR_CORE_VERSION: 21.0.0-1872.c6f474133.focal + STELLAR_CORE_VERSION: 21.3.1-2007.4ede19620.focal CAPTIVE_CORE_STORAGE_PATH: /tmp steps: - uses: actions/checkout@v3 diff --git a/services/horizon/docker/captive-core-integration-tests.soroban-rpc.cfg b/services/horizon/docker/captive-core-integration-tests.soroban-rpc.cfg index 8d76504de2..0ce81a7f5c 100644 --- a/services/horizon/docker/captive-core-integration-tests.soroban-rpc.cfg +++ b/services/horizon/docker/captive-core-integration-tests.soroban-rpc.cfg @@ -1,4 +1,4 @@ -EXPERIMENTAL_BUCKETLIST_DB = true +DEPRECATED_SQL_LEDGER_STATE=false PEER_PORT=11725 ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true diff --git a/services/horizon/internal/docs/GUIDE_FOR_DEVELOPERS.md b/services/horizon/internal/docs/GUIDE_FOR_DEVELOPERS.md index cdbc1b97c7..9464aea2a5 100644 --- a/services/horizon/internal/docs/GUIDE_FOR_DEVELOPERS.md +++ b/services/horizon/internal/docs/GUIDE_FOR_DEVELOPERS.md @@ -171,14 +171,14 @@ By default, the Docker Compose file is configured to use version 21 of Protocol ```bash export PROTOCOL_VERSION="21" export CORE_IMAGE="stellar/stellar-core:21" -export STELLAR_CORE_VERSION="21.0.0-1872.c6f474133.focal" +export STELLAR_CORE_VERSION="21.3.1-2007.4ede19620.focal" ``` Example: Runs Stellar Protocol and Core version 21, for any mode of testnet, standalone, pubnet ```bash -PROTOCOL_VERSION=21 CORE_IMAGE=stellar/stellar-core:21 STELLAR_CORE_VERSION=21.0.0-1872.c6f474133.focal ./start.sh [standalone|pubnet] +PROTOCOL_VERSION=21 CORE_IMAGE=stellar/stellar-core:21 STELLAR_CORE_VERSION=21.3.1-2007.4ede19620.focal ./start.sh [standalone|pubnet] ``` ## **Logging** diff --git a/services/horizon/internal/docs/TESTING_NOTES.md b/services/horizon/internal/docs/TESTING_NOTES.md index c7db9cd465..4dcc5ce6f2 100644 --- a/services/horizon/internal/docs/TESTING_NOTES.md +++ b/services/horizon/internal/docs/TESTING_NOTES.md @@ -16,7 +16,7 @@ Before running integration tests, you also need to set some environment variable ```bash export HORIZON_INTEGRATION_TESTS_ENABLED=true export HORIZON_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL=21 -export HORIZON_INTEGRATION_TESTS_DOCKER_IMG=stellar/stellar-core:21.0.0-1872.c6f474133.focal +export HORIZON_INTEGRATION_TESTS_DOCKER_IMG=stellar/stellar-core:21.3.1-2007.4ede19620.focal ``` Make sure to check [horizon.yml](/.github/workflows/horizon.yml) for the latest core image version. From 2349c8fbd5f3d51df3c81b2d25f0c8e93f579cc0 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 23 Aug 2024 23:31:37 +0100 Subject: [PATCH 5/5] services/horizon/internal/db2/history: Insert and query rows from history lookup tables with one query (#5415) --- .../internal/db2/history/account_loader.go | 336 ++++++++++++------ .../db2/history/account_loader_test.go | 33 +- .../internal/db2/history/asset_loader.go | 207 +++-------- .../internal/db2/history/asset_loader_test.go | 56 ++- .../db2/history/claimable_balance_loader.go | 151 +------- .../history/claimable_balance_loader_test.go | 41 ++- .../db2/history/liquidity_pool_loader.go | 151 +------- .../db2/history/liquidity_pool_loader_test.go | 37 +- 8 files changed, 466 insertions(+), 546 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e7e7e90854..9e15920609 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -1,6 +1,7 @@ package history import ( + "cmp" "context" "database/sql/driver" "fmt" @@ -12,37 +13,29 @@ import ( "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) +var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") + +// LoaderStats describes the result of executing a history lookup id Loader +type LoaderStats struct { + // Total is the number of elements registered to the Loader + Total int + // Inserted is the number of elements inserted into the lookup table + Inserted int +} + // FutureAccountID represents a future history account. // A FutureAccountID is created by an AccountLoader and // the account id is available after calling Exec() on // the AccountLoader. -type FutureAccountID struct { - address string - loader *AccountLoader -} - -const loaderLookupBatchSize = 50000 - -// Value implements the database/sql/driver Valuer interface. -func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.GetNow(a.address) -} +type FutureAccountID = future[string, Account] // AccountLoader will map account addresses to their history // account ids. If there is no existing mapping for a given address, // the AccountLoader will insert into the history_accounts table to // establish a mapping. -type AccountLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} - -var errSealed = errors.New("cannot register more entries to loader after calling Exec()") +type AccountLoader = loader[string, Account] // NewAccountLoader will construct a new AccountLoader instance. func NewAccountLoader() *AccountLoader { @@ -51,141 +44,222 @@ func NewAccountLoader() *AccountLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, + name: "AccountLoader", + table: "history_accounts", + columnsForKeys: func(addresses []string) []columnValues { + return []columnValues{ + { + name: "address", + dbType: "character varying(64)", + objects: addresses, + }, + } + }, + mappingFromRow: func(account Account) (string, int64) { + return account.Address, account.ID + }, + less: cmp.Less[string], } } -// GetFuture registers the given account address into the loader and -// returns a FutureAccountID which will hold the history account id for -// the address after Exec() is called. -func (a *AccountLoader) GetFuture(address string) FutureAccountID { - if a.sealed { +type loader[K comparable, T any] struct { + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool +} + +type future[K comparable, T any] struct { + key K + loader *loader[K, T] +} + +// Value implements the database/sql/driver Valuer interface. +func (f future[K, T]) Value() (driver.Value, error) { + return f.loader.GetNow(f.key) +} + +// GetFuture registers the given key into the Loader and +// returns a future which will hold the history id for +// the key after Exec() is called. +func (l *loader[K, T]) GetFuture(key K) future[K, T] { + if l.sealed { panic(errSealed) } - a.set.Add(address) - return FutureAccountID{ - address: address, - loader: a, + l.set.Add(key) + return future[K, T]{ + key: key, + loader: l, } } -// GetNow returns the history account id for the given address. +// GetNow returns the history id for the given key. // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) GetNow(address string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid account loader state, - Exec was not called yet to properly seal and resolve %v id`, address) +func (l *loader[K, T]) GetNow(key K) (int64, error) { + if !l.sealed { + return 0, fmt.Errorf(`invalid loader state, + Exec was not called yet to properly seal and resolve %v id`, key) } - if internalID, ok := a.ids[address]; !ok { - return 0, fmt.Errorf(`account loader address %q was not found`, address) + if internalID, ok := l.ids[key]; !ok { + return 0, fmt.Errorf(`loader key %v was not found`, key) } else { return internalID, nil } } -func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error { - for i := 0; i < len(addresses); i += loaderLookupBatchSize { - end := ordered.Min(len(addresses), i+loaderLookupBatchSize) +// Exec will look up all the history ids for the keys registered in the Loader. +// If there are no history ids for a given set of keys, Exec will insert rows +// into the corresponding history table to establish a mapping between each key and its history id. +func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { + l.sealed = true + if len(l.set) == 0 { + return nil + } + q := &Q{session} + keys := make([]K, 0, len(l.set)) + for key := range l.set { + keys = append(keys, key) + } + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return l.less(keys[i], keys[j]) + }) - var accounts []Account - if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil { - return errors.Wrap(err, "could not select accounts") - } + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } - for _, account := range accounts { - a.ids[account.Address] = account.ID - } + if count, err := l.query(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count } + return nil } -// LoaderStats describes the result of executing a history lookup id loader -type LoaderStats struct { - // Total is the number of elements registered to the loader - Total int - // Inserted is the number of elements inserted into the lookup table - Inserted int +// Stats returns the number of addresses registered in the Loader and the number of rows +// inserted into the history table. +func (l *loader[K, T]) Stats() LoaderStats { + return l.stats } -// Exec will look up all the history account ids for the addresses registered in the loader. -// If there are no history account ids for a given set of addresses, Exec will insert rows -// into the history_accounts table to establish a mapping between address and history account id. -func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - addresses := make([]string, 0, len(a.set)) - for address := range a.set { - addresses = append(addresses, address) - } +func (l *loader[K, T]) Name() string { + return l.name +} - if err := a.lookupKeys(ctx, q, addresses); err != nil { - return err +func (l *loader[K, T]) filter(keys []K) []K { + if len(l.ids) == 0 { + return keys } - a.stats.Total += len(addresses) - insert := 0 - for _, address := range addresses { - if _, ok := a.ids[address]; ok { + remaining := make([]K, 0, len(keys)) + for _, key := range keys { + if _, ok := l.ids[key]; ok { continue } - addresses[insert] = address - insert++ + remaining = append(remaining, key) } - if insert == 0 { - return nil + return remaining +} + +func (l *loader[K, T]) updateMap(rows []T) { + for _, row := range rows { + key, id := l.mappingFromRow(row) + l.ids[key] = id + } +} + +func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil } - addresses = addresses[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(addresses) + var rows []T err := bulkInsert( ctx, q, - "history_accounts", - []string{"address"}, - []bulkInsertField{ - { - name: "address", - dbType: "character varying(64)", - objects: addresses, - }, - }, + l.table, + l.columnsForKeys(keys), + &rows, ) if err != nil { - return err + return 0, err } - a.stats.Inserted += insert - return a.lookupKeys(ctx, q, addresses) + l.updateMap(rows) + return len(rows), nil } -// Stats returns the number of addresses registered in the loader and the number of addresses -// inserted into the history_accounts table. -func (a *AccountLoader) Stats() LoaderStats { - return a.stats -} +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } -func (a *AccountLoader) Name() string { - return "AccountLoader" + var rows []T + err := bulkGet( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil } -type bulkInsertField struct { +type columnValues struct { name string dbType string objects []string } -func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error { +func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { unnestPart := make([]string, 0, len(fields)) insertFieldsPart := make([]string, 0, len(fields)) pqArrays := make([]interface{}, 0, len(fields)) + // In the code below we are building the bulk insert query which looks like: + // + // WITH rows AS + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // INSERT INTO table ( + // /* insertFieldsPart */ + // field1, + // field2, + // ... + // ) + // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple insert statements to insert + // all the rows for large datasets. for _, field := range fields { unnestPart = append( unnestPart, @@ -200,21 +274,69 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string pq.Array(field.objects), ) } + columns := strings.Join(insertFieldsPart, ",") sql := ` - WITH r AS + WITH rows AS (SELECT ` + strings.Join(unnestPart, ",") + `) INSERT INTO ` + table + ` - (` + strings.Join(insertFieldsPart, ",") + `) - SELECT * from r - ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING` + (` + columns + `) + SELECT * FROM rows + ON CONFLICT (` + columns + `) DO NOTHING + RETURNING *` + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} + +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { + unnestPart := make([]string, 0, len(fields)) + columns := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk get query which looks like: + // + // SELECT * FROM table WHERE (field1, field2, ...) IN + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple select statements to obtain + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + columns = append( + columns, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN + (SELECT ` + strings.Join(unnestPart, ",") + `)` - _, err := q.ExecRaw( - context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + return q.SelectRaw( + ctx, + response, sql, pqArrays..., ) - return err } // AccountLoaderStub is a stub wrapper around AccountLoader which allows diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index ed30b43bd9..9a9fb30445 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -26,7 +26,7 @@ func TestAccountLoader(t *testing.T) { future := loader.GetFuture(address) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid account loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(address) assert.Equal(t, future, duplicateFuture) } @@ -55,4 +55,35 @@ func TestAccountLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewAccountLoader() + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + for _, address := range addresses { + future := loader.GetFuture(address) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, address := range addresses { + var internalId int64 + internalId, err = loader.GetNow(address) + assert.NoError(t, err) + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, internalId) + assert.Equal(t, account.Address, address) + } + } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index fe17dc17be..cdd2a0d714 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -1,16 +1,9 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" "strings" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" "github.com/stellar/go/xdr" ) @@ -40,26 +33,13 @@ func AssetKeyFromXDR(asset xdr.Asset) AssetKey { // A FutureAssetID is created by an AssetLoader and // the asset id is available after calling Exec() on // the AssetLoader. -type FutureAssetID struct { - asset AssetKey - loader *AssetLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.GetNow(a.asset) -} +type FutureAssetID = future[AssetKey, Asset] // AssetLoader will map assets to their history // asset ids. If there is no existing mapping for a given sset, // the AssetLoader will insert into the history_assets table to // establish a mapping. -type AssetLoader struct { - sealed bool - set set.Set[AssetKey] - ids map[AssetKey]int64 - stats LoaderStats -} +type AssetLoader = loader[AssetKey, Asset] // NewAssetLoader will construct a new AssetLoader instance. func NewAssetLoader() *AssetLoader { @@ -68,152 +48,47 @@ func NewAssetLoader() *AssetLoader { set: set.Set[AssetKey]{}, ids: map[AssetKey]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given asset into the loader and -// returns a FutureAssetID which will hold the history asset id for -// the asset after Exec() is called. -func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { - if a.sealed { - panic(errSealed) - } - a.set.Add(asset) - return FutureAssetID{ - asset: asset, - loader: a, - } -} - -// GetNow returns the history asset id for the given asset. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid asset loader state, - Exec was not called yet to properly seal and resolve %v id`, asset) - } - if internalID, ok := a.ids[asset]; !ok { - return 0, fmt.Errorf(`asset loader id %v was not found`, asset) - } else { - return internalID, nil - } -} - -func (a *AssetLoader) lookupKeys(ctx context.Context, q *Q, keys []AssetKey) error { - var rows []Asset - for i := 0; i < len(keys); i += loaderLookupBatchSize { - end := ordered.Min(len(keys), i+loaderLookupBatchSize) - subset := keys[i:end] - args := make([]interface{}, 0, 3*len(subset)) - placeHolders := make([]string, 0, len(subset)) - for _, key := range subset { - args = append(args, key.Code, key.Type, key.Issuer) - placeHolders = append(placeHolders, "(?, ?, ?)") - } - rawSQL := fmt.Sprintf( - "SELECT * FROM history_assets WHERE (asset_code, asset_type, asset_issuer) in (%s)", - strings.Join(placeHolders, ", "), - ) - err := q.SelectRaw(ctx, &rows, rawSQL, args...) - if err != nil { - return errors.Wrap(err, "could not select assets") - } - - for _, row := range rows { - a.ids[AssetKey{ - Type: row.Type, - Code: row.Code, - Issuer: row.Issuer, - }] = row.ID - } - } - return nil -} - -// Exec will look up all the history asset ids for the assets registered in the loader. -// If there are no history asset ids for a given set of assets, Exec will insert rows -// into the history_assets table. -func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - keys := make([]AssetKey, 0, len(a.set)) - for key := range a.set { - keys = append(keys, key) - } - - if err := a.lookupKeys(ctx, q, keys); err != nil { - return err - } - a.stats.Total += len(keys) - - assetTypes := make([]string, 0, len(a.set)-len(a.ids)) - assetCodes := make([]string, 0, len(a.set)-len(a.ids)) - assetIssuers := make([]string, 0, len(a.set)-len(a.ids)) - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - return keys[i].String() < keys[j].String() - }) - insert := 0 - for _, key := range keys { - if _, ok := a.ids[key]; ok { - continue - } - assetTypes = append(assetTypes, key.Type) - assetCodes = append(assetCodes, key.Code) - assetIssuers = append(assetIssuers, key.Issuer) - keys[insert] = key - insert++ - } - if insert == 0 { - return nil - } - keys = keys[:insert] - - err := bulkInsert( - ctx, - q, - "history_assets", - []string{"asset_code", "asset_type", "asset_issuer"}, - []bulkInsertField{ - { - name: "asset_code", - dbType: "character varying(12)", - objects: assetCodes, - }, - { - name: "asset_issuer", - dbType: "character varying(56)", - objects: assetIssuers, - }, - { - name: "asset_type", - dbType: "character varying(64)", - objects: assetTypes, - }, + name: "AssetLoader", + table: "history_assets", + columnsForKeys: func(keys []AssetKey) []columnValues { + assetTypes := make([]string, 0, len(keys)) + assetCodes := make([]string, 0, len(keys)) + assetIssuers := make([]string, 0, len(keys)) + for _, key := range keys { + assetTypes = append(assetTypes, key.Type) + assetCodes = append(assetCodes, key.Code) + assetIssuers = append(assetIssuers, key.Issuer) + } + + return []columnValues{ + { + name: "asset_code", + dbType: "character varying(12)", + objects: assetCodes, + }, + { + name: "asset_type", + dbType: "character varying(64)", + objects: assetTypes, + }, + { + name: "asset_issuer", + dbType: "character varying(56)", + objects: assetIssuers, + }, + } + }, + mappingFromRow: func(asset Asset) (AssetKey, int64) { + return AssetKey{ + Type: asset.Type, + Code: asset.Code, + Issuer: asset.Issuer, + }, asset.ID + }, + less: func(a AssetKey, b AssetKey) bool { + return a.String() < b.String() }, - ) - if err != nil { - return err } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, keys) -} - -// Stats returns the number of assets registered in the loader and the number of assets -// inserted into the history_assets table. -func (a *AssetLoader) Stats() LoaderStats { - return a.stats -} - -func (a *AssetLoader) Name() string { - return "AssetLoader" } // AssetLoaderStub is a stub wrapper around AssetLoader which allows diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index f097561e4a..ca65cebb7e 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -71,7 +71,7 @@ func TestAssetLoader(t *testing.T) { future := loader.GetFuture(key) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid asset loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(key) assert.Equal(t, future, duplicateFuture) } @@ -106,4 +106,58 @@ func TestAssetLoader(t *testing.T) { _, err = loader.GetNow(AssetKey{}) assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewAssetLoader() + for i := 0; i < 10; i++ { + var key AssetKey + if i%2 == 0 { + code := [4]byte{0, 0, 0, 0} + copy(code[:], fmt.Sprintf("ab%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum4, + AlphaNum4: &xdr.AlphaNum4{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + } else { + code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + copy(code[:], fmt.Sprintf("abcdef%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum12, + AlphaNum12: &xdr.AlphaNum12{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + + } + keys = append(keys, key) + } + + for _, key := range keys { + future := loader.GetFuture(key) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, key := range keys { + var internalID int64 + internalID, err = loader.GetNow(key) + assert.NoError(t, err) + var assetXDR xdr.Asset + if key.Type == "native" { + assetXDR = xdr.MustNewNativeAsset() + } else { + assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer) + } + var assetID int64 + assetID, err = q.GetAssetID(context.Background(), assetXDR) + assert.NoError(t, err) + assert.Equal(t, assetID, internalID) + } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index ef18683cb6..f775ea4b24 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -1,41 +1,22 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" + "cmp" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) // FutureClaimableBalanceID represents a future history claimable balance. // A FutureClaimableBalanceID is created by a ClaimableBalanceLoader and // the claimable balance id is available after calling Exec() on // the ClaimableBalanceLoader. -type FutureClaimableBalanceID struct { - id string - loader *ClaimableBalanceLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureClaimableBalanceID) Value() (driver.Value, error) { - return a.loader.getNow(a.id) -} +type FutureClaimableBalanceID = future[string, HistoryClaimableBalance] // ClaimableBalanceLoader will map claimable balance ids to their internal // history ids. If there is no existing mapping for a given claimable balance id, // the ClaimableBalanceLoader will insert into the history_claimable_balances table to // establish a mapping. -type ClaimableBalanceLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} +type ClaimableBalanceLoader = loader[string, HistoryClaimableBalance] // NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance. func NewClaimableBalanceLoader() *ClaimableBalanceLoader { @@ -44,118 +25,20 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given claimable balance into the loader and -// returns a FutureClaimableBalanceID which will hold the internal history id for -// the claimable balance after Exec() is called. -func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID { - if a.sealed { - panic(errSealed) - } - - a.set.Add(id) - return FutureClaimableBalanceID{ - id: id, - loader: a, - } -} - -// getNow returns the internal history id for the given claimable balance. -// getNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any getNow -// call can succeed. -func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid claimable balance loader state, - Exec was not called yet to properly seal and resolve %v id`, id) - } - if internalID, ok := a.ids[id]; !ok { - return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id) - } else { - return internalID, nil - } -} - -func (a *ClaimableBalanceLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error { - for i := 0; i < len(ids); i += loaderLookupBatchSize { - end := ordered.Min(len(ids), i+loaderLookupBatchSize) - - cbs, err := q.ClaimableBalancesByIDs(ctx, ids[i:end]) - if err != nil { - return errors.Wrap(err, "could not select claimable balances") - } - - for _, cb := range cbs { - a.ids[cb.BalanceID] = cb.InternalID - } - } - return nil -} - -// Exec will look up all the internal history ids for the claimable balances registered in the loader. -// If there are no internal ids for a given set of claimable balances, Exec will insert rows -// into the history_claimable_balances table. -func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - ids := make([]string, 0, len(a.set)) - for id := range a.set { - ids = append(ids, id) - } - - if err := a.lookupKeys(ctx, q, ids); err != nil { - return err - } - a.stats.Total += len(ids) - - insert := 0 - for _, id := range ids { - if _, ok := a.ids[id]; ok { - continue - } - ids[insert] = id - insert++ - } - if insert == 0 { - return nil - } - ids = ids[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(ids) - - err := bulkInsert( - ctx, - q, - "history_claimable_balances", - []string{"claimable_balance_id"}, - []bulkInsertField{ - { - name: "claimable_balance_id", - dbType: "text", - objects: ids, - }, + name: "ClaimableBalanceLoader", + table: "history_claimable_balances", + columnsForKeys: func(keys []string) []columnValues { + return []columnValues{ + { + name: "claimable_balance_id", + dbType: "text", + objects: keys, + }, + } }, - ) - if err != nil { - return err + mappingFromRow: func(row HistoryClaimableBalance) (string, int64) { + return row.BalanceID, row.InternalID + }, + less: cmp.Less[string], } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, ids) -} - -// Stats returns the number of claimable balances registered in the loader and the number of claimable balances -// inserted into the history_claimable_balances table. -func (a *ClaimableBalanceLoader) Stats() LoaderStats { - return a.stats -} - -func (a *ClaimableBalanceLoader) Name() string { - return "ClaimableBalanceLoader" } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index aaf91ccdcc..f5759015c7 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -35,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) { futures = append(futures, future) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -63,8 +63,45 @@ func TestClaimableBalanceLoader(t *testing.T) { assert.Equal(t, cb.InternalID, internalID) } - futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader} + futureCb := &FutureClaimableBalanceID{key: "not-present", loader: loader} _, err = futureCb.Value() assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewClaimableBalanceLoader() + for i := 100; i < 110; i++ { + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{byte(i)}, + } + var id string + id, err = xdr.MarshalHex(balanceID) + tt.Assert.NoError(err) + ids = append(ids, id) + } + + for _, id := range ids { + future := loader.GetFuture(id) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, id := range ids { + internalID, err := loader.GetNow(id) + assert.NoError(t, err) + var cb HistoryClaimableBalance + cb, err = q.ClaimableBalanceByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, cb.BalanceID, id) + assert.Equal(t, cb.InternalID, internalID) + } } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index d619fa3bb4..a03caaa988 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -1,41 +1,22 @@ package history import ( - "context" - "database/sql/driver" - "fmt" - "sort" + "cmp" "github.com/stellar/go/support/collections/set" - "github.com/stellar/go/support/db" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/ordered" ) // FutureLiquidityPoolID represents a future history liquidity pool. // A FutureLiquidityPoolID is created by an LiquidityPoolLoader and // the liquidity pool id is available after calling Exec() on // the LiquidityPoolLoader. -type FutureLiquidityPoolID struct { - id string - loader *LiquidityPoolLoader -} - -// Value implements the database/sql/driver Valuer interface. -func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.GetNow(a.id) -} +type FutureLiquidityPoolID = future[string, HistoryLiquidityPool] // LiquidityPoolLoader will map liquidity pools to their internal // history ids. If there is no existing mapping for a given liquidity pool, // the LiquidityPoolLoader will insert into the history_liquidity_pools table to // establish a mapping. -type LiquidityPoolLoader struct { - sealed bool - set set.Set[string] - ids map[string]int64 - stats LoaderStats -} +type LiquidityPoolLoader = loader[string, HistoryLiquidityPool] // NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance. func NewLiquidityPoolLoader() *LiquidityPoolLoader { @@ -44,120 +25,22 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader { set: set.Set[string]{}, ids: map[string]int64{}, stats: LoaderStats{}, - } -} - -// GetFuture registers the given liquidity pool into the loader and -// returns a FutureLiquidityPoolID which will hold the internal history id for -// the liquidity pool after Exec() is called. -func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { - if a.sealed { - panic(errSealed) - } - - a.set.Add(id) - return FutureLiquidityPoolID{ - id: id, - loader: a, - } -} - -// GetNow returns the internal history id for the given liquidity pool. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) { - if !a.sealed { - return 0, fmt.Errorf(`invalid liquidity pool loader state, - Exec was not called yet to properly seal and resolve %v id`, id) - } - if internalID, ok := a.ids[id]; !ok { - return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id) - } else { - return internalID, nil - } -} - -func (a *LiquidityPoolLoader) lookupKeys(ctx context.Context, q *Q, ids []string) error { - for i := 0; i < len(ids); i += loaderLookupBatchSize { - end := ordered.Min(len(ids), i+loaderLookupBatchSize) - - lps, err := q.LiquidityPoolsByIDs(ctx, ids[i:end]) - if err != nil { - return errors.Wrap(err, "could not select accounts") - } - - for _, lp := range lps { - a.ids[lp.PoolID] = lp.InternalID - } - } - return nil -} - -// Exec will look up all the internal history ids for the liquidity pools registered in the loader. -// If there are no internal history ids for a given set of liquidity pools, Exec will insert rows -// into the history_liquidity_pools table. -func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterface) error { - a.sealed = true - if len(a.set) == 0 { - return nil - } - q := &Q{session} - ids := make([]string, 0, len(a.set)) - for id := range a.set { - ids = append(ids, id) - } - - if err := a.lookupKeys(ctx, q, ids); err != nil { - return err - } - a.stats.Total += len(ids) - - insert := 0 - for _, id := range ids { - if _, ok := a.ids[id]; ok { - continue - } - ids[insert] = id - insert++ - } - if insert == 0 { - return nil - } - ids = ids[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Strings(ids) - - err := bulkInsert( - ctx, - q, - "history_liquidity_pools", - []string{"liquidity_pool_id"}, - []bulkInsertField{ - { - name: "liquidity_pool_id", - dbType: "text", - objects: ids, - }, + name: "LiquidityPoolLoader", + table: "history_liquidity_pools", + columnsForKeys: func(keys []string) []columnValues { + return []columnValues{ + { + name: "liquidity_pool_id", + dbType: "text", + objects: keys, + }, + } }, - ) - if err != nil { - return err + mappingFromRow: func(row HistoryLiquidityPool) (string, int64) { + return row.PoolID, row.InternalID + }, + less: cmp.Less[string], } - a.stats.Inserted += insert - - return a.lookupKeys(ctx, q, ids) -} - -// Stats returns the number of liquidity pools registered in the loader and the number of liquidity pools -// inserted into the history_liquidity_pools table. -func (a *LiquidityPoolLoader) Stats() LoaderStats { - return a.stats -} - -func (a *LiquidityPoolLoader) Name() string { - return "LiquidityPoolLoader" } // LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index 25ca80826c..aec2fcd886 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -29,7 +29,7 @@ func TestLiquidityPoolLoader(t *testing.T) { future := loader.GetFuture(id) _, err := future.Value() assert.Error(t, err) - assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) + assert.Contains(t, err.Error(), `invalid loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -59,4 +59,39 @@ func TestLiquidityPoolLoader(t *testing.T) { _, err = loader.GetNow("not present") assert.Error(t, err) assert.Contains(t, err.Error(), `was not found`) + + // check that Loader works when all the previous values are already + // present in the db and also add 10 more rows to insert + loader = NewLiquidityPoolLoader() + for i := 100; i < 110; i++ { + poolID := xdr.PoolId{byte(i)} + var id string + id, err = xdr.MarshalHex(poolID) + tt.Assert.NoError(err) + ids = append(ids, id) + } + + for _, id := range ids { + future := loader.GetFuture(id) + _, err = future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid loader state,`) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Equal(t, LoaderStats{ + Total: 110, + Inserted: 10, + }, loader.Stats()) + + for _, id := range ids { + var internalID int64 + internalID, err = loader.GetNow(id) + assert.NoError(t, err) + var lp HistoryLiquidityPool + lp, err = q.LiquidityPoolByID(context.Background(), id) + assert.NoError(t, err) + assert.Equal(t, lp.PoolID, id) + assert.Equal(t, lp.InternalID, internalID) + } }