services/horizon/internal/db2/history: Implement account loader and future account ids #5015
@@ -0,0 +1,193 @@
package history

import (
	"context"
	"database/sql/driver"
	"fmt"
	"sort"
	"strings"

	"github.com/lib/pq"

	"github.com/stellar/go/support/db"
	"github.com/stellar/go/support/errors"
)

// FutureAccountID represents a future history account.
// A FutureAccountID is created by an AccountLoader and
// the account id is available after calling Exec() on
// the AccountLoader.
type FutureAccountID struct {
	address string
	loader  *AccountLoader
}

const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
	return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
// account ids. If there is no existing mapping for a given address,
// the AccountLoader will insert into the history_accounts table to
// establish a mapping.
type AccountLoader struct {
	sealed bool
	set    map[string]interface{}
	ids    map[string]int64
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")

// NewAccountLoader will construct a new AccountLoader instance.
func NewAccountLoader() *AccountLoader {
	return &AccountLoader{
		sealed: false,
		set:    map[string]interface{}{},
		ids:    map[string]int64{},
	}
}

// GetFuture registers the given account address into the loader and
// returns a FutureAccountID which will hold the history account id for
// the address after Exec() is called.
func (a *AccountLoader) GetFuture(address string) FutureAccountID {
	if a.sealed {
		panic(errSealed)
	}

	a.set[address] = nil
	return FutureAccountID{
		address: address,
		loader:  a,
	}
}

// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
	if id, ok := a.ids[address]; !ok {
		panic(fmt.Errorf("address %v not present", address))
	} else {
		return id
	}
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
	for i := 0; i < len(addresses); i += loaderLookupBatchSize {
		end := i + loaderLookupBatchSize
		if end > len(addresses) {
			end = len(addresses)
		}

		var accounts []Account
		if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil {
			return errors.Wrap(err, "could not select accounts")
		}

		for _, account := range accounts {
			a.ids[account.Address] = account.ID
		}
	}
	return nil
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error {
	a.sealed = true
	if len(a.set) == 0 {
		return nil
	}
	q := &Q{session}
	addresses := make([]string, 0, len(a.set))
	for address := range a.set {
		addresses = append(addresses, address)
	}
	// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
	// https://github.com/stellar/go/issues/2370
	sort.Strings(addresses)

	if err := a.lookupKeys(ctx, q, addresses); err != nil {
		return err
	}

	insert := 0
	for _, address := range addresses {
		if _, ok := a.ids[address]; ok {
Review comment: nvm, I don't think there could be a case where …
			continue
		}
		addresses[insert] = address
		insert++
	}
	if insert == 0 {
		return nil
	}
	addresses = addresses[:insert]
Review comment: neat, re-used the same array in place
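For readers less familiar with the idiom the reviewer is pointing at, this is the standard in-place slice filter in Go; a generic sketch (with a hypothetical keep predicate) looks like:

	// Compact xs in place, keeping only elements that satisfy keep;
	// the backing array is reused, so no new slice is allocated.
	n := 0
	for _, x := range xs {
		if keep(x) {
			xs[n] = x
			n++
		}
	}
	xs = xs[:n]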
	err := bulkInsert(
		ctx,
		q,
		"history_accounts",
		[]string{"address"},
		[]bulkInsertField{
			{
				name:    "address",
				dbType:  "character varying(64)",
				objects: addresses,
			},
		},
	)
	if err != nil {
		return err
	}

	return a.lookupKeys(ctx, q, addresses)
}

Review comment: should the …
Reply: I intended that the account loader should only be used once and not be reused. I will add some code to enforce that intention.
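To make the intended lifecycle concrete, here is a minimal usage sketch; the function name, address literal, and error handling are illustrative, not part of the PR:

	// Hypothetical example: register addresses during processing,
	// resolve them all in one batch, then read the resulting ids.
	func exampleAccountLoaderUsage(ctx context.Context, session db.SessionInterface) (int64, error) {
		loader := NewAccountLoader()

		// While processing ledger entries, register each address.
		future := loader.GetFuture("GABC...") // hypothetical address

		// One batched lookup/insert resolves every registered address.
		if err := loader.Exec(ctx, session); err != nil {
			return 0, err
		}

		// Ids are now available directly via GetNow ...
		id := loader.GetNow("GABC...")

		// ... or implicitly: FutureAccountID implements driver.Valuer, so
		// `future` can be passed as a bind parameter to statements that
		// execute after Exec().
		_ = future
		return id, nil
	}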
type bulkInsertField struct {
	name    string
	dbType  string
	objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
	unnestPart := make([]string, 0, len(fields))
	insertFieldsPart := make([]string, 0, len(fields))
	pqArrays := make([]interface{}, 0, len(fields))

	for _, field := range fields {
		unnestPart = append(
			unnestPart,
			fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
		)
		insertFieldsPart = append(
			insertFieldsPart,
			field.name,
		)
		pqArrays = append(
			pqArrays,
			pq.Array(field.objects),
		)
	}

	sql := `
	WITH r AS
		(SELECT ` + strings.Join(unnestPart, ",") + `)
	INSERT INTO ` + table + `
		(` + strings.Join(insertFieldsPart, ",") + `)
	SELECT * from r
	ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`

Review comment: Would be compelling at some point, if we have time, to consider evaluating SQL code options that provide compile-time type safety on SQL statements and fewer embedded string fragments/concatenation, like gojet. It may not even be viable for our setup, but worth keeping in mind.
	_, err := q.ExecRaw(
		context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
		sql,
		pqArrays...,
	)
	return err
}
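For reference, with the single address field that AccountLoader.Exec passes in, the statement built above should render roughly as follows (assuming the session rebinds the `?` placeholder to Postgres's `$1` form before execution):

	WITH r AS
		(SELECT unnest($1::character varying(64)[]) /* address */)
	INSERT INTO history_accounts
		(address)
	SELECT * from r
	ON CONFLICT (address) DO NOTHING

The ON CONFLICT ... DO NOTHING clause is what makes the insert idempotent: addresses that already have a history_accounts row are skipped, and the follow-up lookupKeys call picks up their existing ids.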
@@ -0,0 +1,54 @@
package history

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/stellar/go/keypair"
	"github.com/stellar/go/services/horizon/internal/test"
)

func TestAccountLoader(t *testing.T) {
	tt := test.Start(t)
	defer tt.Finish()
	test.ResetHorizonDB(t, tt.HorizonDB)
	session := tt.HorizonSession()

	var addresses []string
	for i := 0; i < 100; i++ {
		addresses = append(addresses, keypair.MustRandom().Address())
	}

	loader := NewAccountLoader()
	var futures []FutureAccountID
	for _, address := range addresses {
		future := loader.GetFuture(address)
		futures = append(futures, future)
		assert.Panics(t, func() {
			loader.GetNow(address)
		})
		assert.Panics(t, func() {
			future.Value()
		})
	}

	assert.NoError(t, loader.Exec(context.Background(), session))
	assert.Panics(t, func() {
		loader.GetFuture(keypair.MustRandom().Address())
	})

	q := &Q{session}
	for i, address := range addresses {
		future := futures[i]
		id := loader.GetNow(address)
		val, err := future.Value()
		assert.NoError(t, err)
		assert.Equal(t, id, val)
		var account Account
		assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
		assert.Equal(t, account.ID, id)
		assert.Equal(t, account.Address, address)
	}
}
Review comment: Is there a prior issue that explains this aspect further? Since ingestion is single threaded and all of this happens in the same db transaction, I'm trying to understand where the deadlock could happen.
Reply: Ingestion is single threaded. However, it is possible to do parallel reingestion with multiple workers, where each worker has a separate but concurrent transaction.
Reply: Ok, then there is the potential for the same accounts to be processed from different ledger ranges by different worker threads at the same time. How does sorting avoid a db deadlock in that case? I'm not suggesting removing it, just trying to understand: it sticks out in the application code as seemingly unrelated complexity that I would have expected to be mitigated at the db level with repeatable read transaction isolation.
Reply: Let's say there are two workers ingesting different ledger ranges, and in both ledger ranges the workers have to insert the same set of accounts into the history_accounts table. If the workers insert the accounts in the same order, they will avoid a deadlock: the worker that wins the race acquires the lock, and the other worker simply blocks until the winning transaction completes. Consider the worst case, where worker 1 inserts accounts A, B, C and worker 2 inserts accounts C, B, A. Say worker 1 is faster, so it inserts accounts A and B while worker 2 inserts account C. When worker 1 tries to insert account C it blocks, because worker 2 already holds a lock on that row; when worker 2 then tries to insert account B, which worker 1 holds, each transaction is waiting on the other and the database detects a deadlock.
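A schematic of that worst case (illustrative pseudo-SQL, not actual Horizon statements):

	-- Without sorting, a lock cycle can form:
	--   worker 1: INSERT 'A'; INSERT 'B'; INSERT 'C';  -- waits on 'C', locked by worker 2
	--   worker 2: INSERT 'C'; INSERT 'B';              -- waits on 'B', locked by worker 1
	--   => each transaction waits on the other: deadlock
	--
	-- With both workers inserting in sorted order 'A', 'B', 'C':
	--   the slower worker blocks on the first contended row and resumes
	--   once the faster worker commits; no wait cycle can form.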
Reply: I don't think changing the transaction isolation level could avoid the deadlock. The row locks taken by these inserts are held until the owning transaction commits regardless of isolation level, so the same wait cycle can still form.