diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e402129a44..1f2314d63a 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -17,11 +17,32 @@ import ( var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") +// ConcurrencyMode is used to configure the level of thread-safety for a loader type ConcurrencyMode int +func (cm ConcurrencyMode) String() string { + switch cm { + case ConcurrentInserts: + return "ConcurrentInserts" + case ConcurrentDeletes: + return "ConcurrentDeletes" + default: + return "unknown" + } +} + const ( _ ConcurrencyMode = iota + // ConcurrentInserts configures the loader to maintain safety when there are multiple loaders + // inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion. + // Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the + // same table. ConcurrentInserts + // ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking + // reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for + // live ingestion when reaping of lookup tables is enabled. + // Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the + // same table. ConcurrentDeletes ) diff --git a/services/horizon/internal/db2/history/loader_concurrency_test.go b/services/horizon/internal/db2/history/loader_concurrency_test.go new file mode 100644 index 0000000000..d3da832bce --- /dev/null +++ b/services/horizon/internal/db2/history/loader_concurrency_test.go @@ -0,0 +1,189 @@ +package history + +import ( + "context" + "database/sql" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestLoaderConcurrentInserts(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + s1 := tt.HorizonSession() + s2 := s1.Clone() + + for _, testCase := range []struct { + mode ConcurrencyMode + pass bool + }{ + {ConcurrentInserts, true}, + {ConcurrentDeletes, false}, + } { + t.Failed() + t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) { + var addresses []string + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + l1 := NewAccountLoader(testCase.mode) + for _, address := range addresses { + l1.GetFuture(address) + } + + for i := 0; i < 5; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + l2 := NewAccountLoader(testCase.mode) + for _, address := range addresses { + l2.GetFuture(address) + } + + assert.NoError(t, s1.Begin(context.Background())) + assert.NoError(t, l1.Exec(context.Background(), s1)) + + assert.NoError(t, s2.Begin(context.Background())) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-time.After(time.Second * 3) + assert.NoError(t, s1.Commit()) + }() + assert.NoError(t, l2.Exec(context.Background(), s2)) + assert.NoError(t, s2.Commit()) + wg.Wait() + + assert.Equal(t, LoaderStats{ + Total: 10, + Inserted: 10, + }, l1.Stats()) + + if testCase.pass { + assert.Equal(t, LoaderStats{ + Total: 15, + Inserted: 5, + }, l2.Stats()) + } else { + assert.NotEqual(t, LoaderStats{ + Total: 15, + Inserted: 5, + }, l2.Stats()) + return + } + + q := &Q{s1} + for _, address := range addresses[:10] { + l1Id, err := l1.GetNow(address) + assert.NoError(t, err) + + l2Id, err := l2.GetNow(address) + assert.NoError(t, err) + assert.Equal(t, l1Id, l2Id) + + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, l1Id) + assert.Equal(t, account.Address, address) + } + + for _, address := range addresses[10:] { + l2Id, err := l2.GetNow(address) + assert.NoError(t, err) + + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, l2Id) + assert.Equal(t, account.Address, address) + } + }) + } +} + +func TestLoaderConcurrentDeletes(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + s1 := tt.HorizonSession() + s2 := s1.Clone() + + for _, testCase := range []struct { + mode ConcurrencyMode + pass bool + }{ + {ConcurrentInserts, false}, + {ConcurrentDeletes, true}, + } { + t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) { + var addresses []string + for i := 0; i < 10; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + loader := NewAccountLoader(testCase.mode) + for _, address := range addresses { + loader.GetFuture(address) + } + assert.NoError(t, loader.Exec(context.Background(), s1)) + + var ids []int64 + for _, address := range addresses { + id, err := loader.GetNow(address) + assert.NoError(t, err) + ids = append(ids, id) + } + + loader = NewAccountLoader(testCase.mode) + for _, address := range addresses { + loader.GetFuture(address) + } + + assert.NoError(t, s1.Begin(context.Background())) + assert.NoError(t, loader.Exec(context.Background(), s1)) + + assert.NoError(t, s2.Begin(context.Background())) + q2 := &Q{s2} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-time.After(time.Second * 3) + + q1 := &Q{s1} + for _, address := range addresses { + id, err := loader.GetNow(address) + assert.NoError(t, err) + + var account Account + err = q1.AccountByAddress(context.Background(), &account, address) + if testCase.pass { + assert.NoError(t, err) + assert.Equal(t, account.ID, id) + assert.Equal(t, account.Address, address) + } else { + assert.ErrorContains(t, err, sql.ErrNoRows.Error()) + } + } + assert.NoError(t, s1.Commit()) + }() + + deletedCount, err := q2.reapLookupTable(context.Background(), "history_accounts", ids, 1000) + assert.NoError(t, err) + assert.Equal(t, int64(len(addresses)), deletedCount) + assert.NoError(t, s2.Commit()) + + wg.Wait() + }) + } +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index ef285379a6..dd62e17c5e 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -1012,6 +1012,18 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO } defer q.Rollback() + rowsDeleted, err := q.reapLookupTable(ctx, table, ids, newOffset) + if err != nil { + return 0, err + } + + if err := q.Commit(); err != nil { + return 0, fmt.Errorf("could not commit transaction: %w", err) + } + return rowsDeleted, nil +} + +func (q *Q) reapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) { if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { return 0, fmt.Errorf("error updating offset: %w", err) } @@ -1024,10 +1036,6 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO return 0, fmt.Errorf("could not delete orphaned rows: %w", err) } } - - if err := q.Commit(); err != nil { - return 0, fmt.Errorf("could not commit transaction: %w", err) - } return rowsDeleted, nil }