Skip to content

Commit

Permalink
reduce code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 15, 2024
1 parent c89e633 commit 6e3e306
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 561 deletions.
262 changes: 163 additions & 99 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package history

import (
"cmp"
"context"
"database/sql/driver"
"fmt"
Expand All @@ -14,32 +15,47 @@ import (
"github.com/stellar/go/support/errors"
)

var errSealed = errors.New("cannot register more entries to Loader after calling Exec()")

// LoaderStats describes the result of executing a history lookup id Loader
type LoaderStats struct {
// Total is the number of elements registered to the Loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

type historyAccountSchema struct{}

func (historyAccountSchema) table() string {

Check failure on line 30 in services/horizon/internal/db2/history/account_loader.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

func historyAccountSchema.table is unused (U1000)
return "history_accounts"
}

func (historyAccountSchema) columns(addresses []string) []columnValues {

Check failure on line 34 in services/horizon/internal/db2/history/account_loader.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

func historyAccountSchema.columns is unused (U1000)
return []columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
}
}

func (historyAccountSchema) extract(account Account) (string, int64) {

Check failure on line 44 in services/horizon/internal/db2/history/account_loader.go

View workflow job for this annotation

GitHub Actions / check (ubuntu-22.04, 1.22.1)

func historyAccountSchema.extract is unused (U1000)
return account.Address, account.ID
}

// FutureAccountID represents a future history account.
// A FutureAccountID is created by an AccountLoader and
// the account id is available after calling Exec() on
// the AccountLoader.
type FutureAccountID struct {
address string
loader *AccountLoader
}

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address)
}
type FutureAccountID = future[string, Account]

// AccountLoader will map account addresses to their history
// account ids. If there is no existing mapping for a given address,
// the AccountLoader will insert into the history_accounts table to
// establish a mapping.
type AccountLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")
type AccountLoader = loader[string, Account]

// NewAccountLoader will construct a new AccountLoader instance.
func NewAccountLoader() *AccountLoader {
Expand All @@ -48,130 +64,178 @@ func NewAccountLoader() *AccountLoader {
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
name: "AccountLoader",
schema: historyAccountSchema{},
less: cmp.Less[string],
}
}

// GetFuture registers the given account address into the loader and
type schema[K comparable, T any] interface {
table() string
columns(keys []K) []columnValues
extract(T) (K, int64)
}

type loader[K comparable, T any] struct {
sealed bool
set set.Set[K]
ids map[K]int64
stats LoaderStats
name string
schema schema[K, T]
less func(K, K) bool
}

type future[K comparable, T any] struct {
key K
loader *loader[K, T]
}

// Value implements the database/sql/driver Valuer interface.
func (f future[K, T]) Value() (driver.Value, error) {
return f.loader.GetNow(f.key)
}

// GetFuture registers the given account address into the Loader and
// returns a FutureAccountID which will hold the history account id for
// the address after Exec() is called.
func (a *AccountLoader) GetFuture(address string) FutureAccountID {
if a.sealed {
func (l *loader[K, T]) GetFuture(key K) future[K, T] {
if l.sealed {
panic(errSealed)
}

a.set.Add(address)
return FutureAccountID{
address: address,
loader: a,
l.set.Add(key)
return future[K, T]{
key: key,
loader: l,
}
}

// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid account loader state,
Exec was not called yet to properly seal and resolve %v id`, address)
func (l *loader[K, T]) GetNow(key K) (int64, error) {
if !l.sealed {
return 0, fmt.Errorf(`invalid loader state,
Exec was not called yet to properly seal and resolve %v id`, key)
}
if internalID, ok := a.ids[address]; !ok {
return 0, fmt.Errorf(`account loader address %q was not found`, address)
if internalID, ok := l.ids[key]; !ok {
return 0, fmt.Errorf(`loader key %v was not found`, key)
} else {
return internalID, nil
}
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// Exec will look up all the history account ids for the addresses registered in the Loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error {
a.sealed = true
if len(a.set) == 0 {
func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error {
l.sealed = true
if len(l.set) == 0 {
return nil
}
q := &Q{session}
addresses := make([]string, 0, len(a.set))
for address := range a.set {
addresses = append(addresses, address)
keys := make([]K, 0, len(l.set))
for key := range l.set {
keys = append(keys, key)
}
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)
sort.Slice(keys, func(i, j int) bool {
return l.less(keys[i], keys[j])
})

var accounts []Account
err := bulkInsert(
ctx,
q,
"history_accounts",
[]columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
},
&accounts,
)
if err != nil {
if count, err := l.insert(ctx, q, keys); err != nil {
return err
} else {
l.stats.Total += count
l.stats.Inserted += count
}
for _, account := range accounts {
a.ids[account.Address] = account.ID
a.stats.Inserted++

if count, err := l.query(ctx, q, keys); err != nil {
return err
} else {
l.stats.Total += count
}
a.stats.Total += len(accounts)

remaining := make([]string, 0, len(addresses))
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
remaining = append(remaining, address)
return nil
}

// Stats returns the number of addresses registered in the Loader and the number of addresses
// inserted into the history_accounts table.
func (l *loader[K, T]) Stats() LoaderStats {
return l.stats
}

func (l *loader[K, T]) Name() string {
return l.name
}

func (l *loader[K, T]) filter(keys []K) []K {
if len(l.ids) == 0 {
return keys
}
if len(remaining) > 0 {
var remainingAccounts []Account
err = bulkGet(
ctx,
q,
"history_accounts",
[]columnValues{
{
name: "address",
dbType: "character varying(64)",
objects: remaining,
},
},
&remainingAccounts,
)
if err != nil {
return err
}
for _, account := range remainingAccounts {
a.ids[account.Address] = account.ID

remaining := make([]K, 0, len(keys))
for _, key := range keys {
if _, ok := l.ids[key]; ok {
continue
}
a.stats.Total += len(remainingAccounts)
remaining = append(remaining, key)
}
return remaining
}

return nil
func (l *loader[K, T]) updateMap(rows []T) {
for _, row := range rows {
key, id := l.schema.extract(row)
l.ids[key] = id
}
}

// Stats returns the number of addresses registered in the loader and the number of addresses
// inserted into the history_accounts table.
func (a *AccountLoader) Stats() LoaderStats {
return a.stats
func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) {
keys = l.filter(keys)
if len(keys) == 0 {
return 0, nil
}

var rows []T
err := bulkInsert(
ctx,
q,
l.schema.table(),
l.schema.columns(keys),
&rows,
)
if err != nil {
return 0, err
}

l.updateMap(rows)
return len(rows), nil
}

func (a *AccountLoader) Name() string {
return "AccountLoader"
func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) {
keys = l.filter(keys)
if len(keys) == 0 {
return 0, nil
}

var rows []T
err := bulkGet(
ctx,
q,
l.schema.table(),
l.schema.columns(keys),
&rows,
)
if err != nil {
return 0, err
}

l.updateMap(rows)
return len(rows), nil
}

type columnValues struct {
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAccountLoader(t *testing.T) {
future := loader.GetFuture(address)
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
assert.Contains(t, err.Error(), `invalid loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand Down Expand Up @@ -56,13 +56,13 @@ func TestAccountLoader(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)

// check that loader works when all the values are already present in the db
// check that Loader works when all the values are already present in the db
loader = NewAccountLoader()
for _, address := range addresses {
future := loader.GetFuture(address)
_, err = future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
assert.Contains(t, err.Error(), `invalid loader state,`)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand Down
Loading

0 comments on commit 6e3e306

Please sign in to comment.