From 0ebda72436fb188f432a86d97ccc46bf82777023 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 20 Nov 2024 15:33:19 -0500 Subject: [PATCH 1/2] sql: apply PCR AOST time stamp for authentication flow Previously, when using a PCR reader catalog, it was possible to hit "PCR reader timestamp has moved forward" when attempting to execute workloads. This was because internal executors are exempt from the AOST timestamp. To address this, the authentication code paths are updated to set fixed timestamps when connecting to a PCR reader catalog. Additionally, tests are added that open new connections and prepare queries to confirm this problem is fixed. Fixes: #135829 Release note: None --- pkg/sql/catalog/descs/collection.go | 16 ++ pkg/sql/catalog/descs/leased_descriptors.go | 2 + pkg/sql/catalog/replication/BUILD.bazel | 1 + .../replication/reader_catalog_test.go | 46 ++++- pkg/sql/regions/region_provider_test.go | 4 + pkg/sql/sessioninit/cache.go | 3 + pkg/sql/user.go | 157 +++++++++--------- 7 files changed, 152 insertions(+), 77 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 48fb9a6ceae9..098911b92847 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -1271,6 +1271,22 @@ func (tc *Collection) GetIndexComment( return tc.GetComment(catalogkeys.MakeCommentKey(uint32(tableID), uint32(indexID), catalogkeys.IndexCommentType)) } +// MaybeSetReplicationSafeTS modifies a txn to apply the replication safe timestamp, +// if we are executing against a PCR reader catalog. 
+func (tc *Collection) MaybeSetReplicationSafeTS(ctx context.Context, txn *kv.Txn) error { + now := txn.DB().Clock().Now() + desc, err := tc.leased.lm.Acquire(ctx, now, keys.SystemDatabaseID) + if err != nil { + return err + } + defer desc.Release(ctx) + + if desc.Underlying().(catalog.DatabaseDescriptor).GetReplicatedPCRVersion() == 0 { + return nil + } + return txn.SetFixedTimestamp(ctx, tc.leased.lm.GetSafeReplicationTS()) +} + // GetConstraintComment implements the scdecomp.CommentGetter interface. func (tc *Collection) GetConstraintComment( tableID descpb.ID, constraintID catid.ConstraintID, diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 2a25469138c4..57398d437f17 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -36,6 +36,8 @@ type LeaseManager interface { IncGaugeAfterLeaseDuration( gaugeType lease.AfterLeaseDurationGauge, ) (decrAfterWait func()) + + GetSafeReplicationTS() hlc.Timestamp } type deadlineHolder interface { diff --git a/pkg/sql/catalog/replication/BUILD.bazel b/pkg/sql/catalog/replication/BUILD.bazel index fa3242085d62..dbacb4a9a6a9 100644 --- a/pkg/sql/catalog/replication/BUILD.bazel +++ b/pkg/sql/catalog/replication/BUILD.bazel @@ -49,6 +49,7 @@ go_test( "//pkg/util/randutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_jackc_pgx_v4//:pgx", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/catalog/replication/reader_catalog_test.go b/pkg/sql/catalog/replication/reader_catalog_test.go index 6b87e7358344..5f0065e3374a 100644 --- a/pkg/sql/catalog/replication/reader_catalog_test.go +++ b/pkg/sql/catalog/replication/reader_catalog_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ 
-270,6 +271,8 @@ func TestReaderCatalogTSAdvance(t *testing.T) { srcRunner := sqlutils.MakeSQLRunner(srcConn) ddlToExec := []string{ + "CREATE USER bob password 'bob'", + "GRANT ADMIN TO bob;", "CREATE SEQUENCE sq1;", "CREATE TYPE IF NOT EXISTS status AS ENUM ('open', 'closed', 'inactive');", "CREATE TABLE t1(j int default nextval('sq1'), val status);", @@ -297,6 +300,9 @@ func TestReaderCatalogTSAdvance(t *testing.T) { // Connect only after the reader catalog is setup, so the connection // executor is aware. destConn := destTenant.SQLConn(t) + destURL, destURLCleanup := destTenant.PGUrl(t, serverutils.UserPassword("bob", "bob"), serverutils.ClientCerts(false)) + defer destURLCleanup() + require.NoError(t, err) destRunner := sqlutils.MakeSQLRunner(destConn) check := func(query string, isEqual bool) { @@ -322,6 +328,16 @@ func TestReaderCatalogTSAdvance(t *testing.T) { } else { require.NotEqualValues(t, srcRes, destRes) } + + // Sanity: Execute the same query as prepared statement inside the reader + // catalog . 
+ destPgxConn, err := pgx.Connect(ctx, destURL.String()) + _, err = destPgxConn.Prepare(ctx, query, query) + require.NoError(t, err) + rows, err := destPgxConn.Query(ctx, query) + require.NoError(t, err) + defer rows.Close() + require.NoError(t, destPgxConn.Close(ctx)) } compareEqual := func(query string) { @@ -333,6 +349,10 @@ func TestReaderCatalogTSAdvance(t *testing.T) { var newTS hlc.Timestamp descriptorRefreshHookEnabled.Store(true) + existingPgxConn, err := pgx.Connect(ctx, destURL.String()) + require.NoError(t, err) + _, err = existingPgxConn.Prepare(ctx, "basic select", "SELECT * FROM t1, v1, t2") + require.NoError(t, err) for _, useAOST := range []bool{false, true} { if useAOST { closeWaitForRefresh() @@ -385,7 +405,9 @@ func TestReaderCatalogTSAdvance(t *testing.T) { destRunner.Exec(t, "SET bypass_pcr_reader_catalog_aost='on'") } iterationsDone := false + uniqueIdx := 0 for !iterationsDone { + uniqueIdx++ if !useAOST { select { case waitForRefresh <- struct{}{}: @@ -397,8 +419,27 @@ func TestReaderCatalogTSAdvance(t *testing.T) { case <-iterationsDoneCh: iterationsDone = true default: + // Prepare on an existing connection. + rows, err := existingPgxConn.Query(ctx, "SELECT * FROM t1, v1, t2") + require.NoError(t, err) + rows.Close() + uniqueQuery := fmt.Sprintf("SELECT a.j + %d FROM t1 as a, v1 as b, t2 as c ", uniqueIdx) + _, err = existingPgxConn.Prepare(ctx, fmt.Sprintf("q%d", uniqueIdx), uniqueQuery) + require.NoError(t, err) + rows, err = existingPgxConn.Query(ctx, uniqueQuery) + require.NoError(t, err) + rows.Close() + // Open new connections. 
+ newPgxConn, err := pgx.Connect(ctx, destURL.String()) + require.NoError(t, err) + _, err = newPgxConn.Prepare(ctx, "basic select", "SELECT * FROM t1, v1, t2") + require.NoError(t, err) + rows, err = newPgxConn.Query(ctx, "SELECT * FROM t1, v1, t2") + require.NoError(t, err) + require.NoError(t, newPgxConn.Close(ctx)) + tx := destRunner.Begin(t) - _, err := tx.Exec("SELECT * FROM t1") + _, err = tx.Exec("SELECT * FROM t1") checkAOSTError(err) _, err = tx.Exec("SELECT * FROM v1") checkAOSTError(err) @@ -414,13 +455,13 @@ func TestReaderCatalogTSAdvance(t *testing.T) { checkAOSTError(err) } } - // Finally ensure the queries actually match. require.NoError(t, grp.Wait()) // Check if the error was detected. require.Equalf(t, !useAOST, errorDetected, "error was detected unexpectedly (AOST = %t on connection)", useAOST) } + require.NoError(t, existingPgxConn.Close(ctx)) now = newTS compareEqual("SELECT * FROM t1 ORDER BY j") compareEqual("SELECT * FROM v1 ORDER BY 1") @@ -459,7 +500,6 @@ func TestReaderCatalogTSAdvanceWithLongTxn(t *testing.T) { ddlToExec := []string{ "CREATE USER roacher WITH CREATEROLE;", - "GRANT ADMIN TO roacher;", "ALTER USER roacher SET timezone='America/New_York';", "CREATE SEQUENCE sq1;", "CREATE TABLE t1(n int default nextval('sq1'), val TEXT);", diff --git a/pkg/sql/regions/region_provider_test.go b/pkg/sql/regions/region_provider_test.go index bd8a2bd37e63..da76caacf0a7 100644 --- a/pkg/sql/regions/region_provider_test.go +++ b/pkg/sql/regions/region_provider_test.go @@ -210,6 +210,10 @@ func (f fakeLeaseManager) IncGaugeAfterLeaseDuration(gauge lease.AfterLeaseDurat return func() {} } +func (f fakeLeaseManager) GetSafeReplicationTS() hlc.Timestamp { + return hlc.Timestamp{} +} + var _ descs.LeaseManager = (*fakeLeaseManager)(nil) type fakeSystemDatabase struct { diff --git a/pkg/sql/sessioninit/cache.go b/pkg/sql/sessioninit/cache.go index c999c0dc3b63..18a477cac738 100644 --- a/pkg/sql/sessioninit/cache.go +++ b/pkg/sql/sessioninit/cache.go 
@@ -123,6 +123,9 @@ func (a *Cache) GetAuthInfo( err = db.DescsTxn(ctx, func( ctx context.Context, txn descs.Txn, ) error { + if err := txn.Descriptors().MaybeSetReplicationSafeTS(ctx, txn.KV()); err != nil { + return err + } _, usersTableDesc, err = descs.PrefixAndTable(ctx, txn.Descriptors().ByNameWithLeased(txn.KV()).Get(), UsersTableName) if err != nil { return err diff --git a/pkg/sql/user.go b/pkg/sql/user.go index dbbf2ae8c1c2..54f7a89c1194 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -136,6 +136,9 @@ func GetUserSessionInitInfo( return execCfg.InternalDB.DescsTxn(ctx, func( ctx context.Context, txn descs.Txn, ) error { + if err := txn.Descriptors().MaybeSetReplicationSafeTS(ctx, txn.KV()); err != nil { + return err + } memberships, err := MemberOfWithAdminOption(ctx, execCfg, txn, user) if err != nil { return err @@ -288,92 +291,98 @@ func retrieveAuthInfo( // we should always look up the latest data. const getHashedPassword = `SELECT "hashedPassword" FROM system.public.users ` + `WHERE username=$1` - ie := f.Executor() - values, err := ie.QueryRowEx( - ctx, "get-hashed-pwd", nil, /* txn */ - sessiondata.NodeUserSessionDataOverride, - getHashedPassword, user) - - if err != nil { - return aInfo, errors.Wrapf(err, "error looking up user %s", user) - } - var hashedPassword []byte - if values != nil { - aInfo.UserExists = true - if v := values[0]; v != tree.DNull { - hashedPassword = []byte(*(v.(*tree.DBytes))) + err := f.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + if err := txn.Descriptors().MaybeSetReplicationSafeTS(ctx, txn.KV()); err != nil { + return err + } + values, err := txn.QueryRowEx( + ctx, "get-hashed-pwd", txn.KV(), /* txn */ + sessiondata.NodeUserSessionDataOverride, + getHashedPassword, user) + if err != nil { + return err } - } - aInfo.HashedPassword = password.LoadPasswordHash(ctx, hashedPassword) - if !aInfo.UserExists { - return aInfo, nil - } + var hashedPassword []byte + if values != nil { + aInfo.UserExists = 
true + if v := values[0]; v != tree.DNull { + hashedPassword = []byte(*(v.(*tree.DBytes))) + } + } - // None of the rest of the role options are relevant for root. - if user.IsRootUser() { - return aInfo, nil - } + aInfo.HashedPassword = password.LoadPasswordHash(ctx, hashedPassword) - // Use fully qualified table name to avoid looking up "".system.role_options. - const getLoginDependencies = `SELECT option, value FROM system.public.role_options ` + - `WHERE username=$1 AND option IN ('NOLOGIN', 'VALID UNTIL', 'NOSQLLOGIN', 'REPLICATION', 'SUBJECT')` + if !aInfo.UserExists { + return nil + } - roleOptsIt, err := ie.QueryIteratorEx( - ctx, "get-login-dependencies", nil, /* txn */ - sessiondata.NodeUserSessionDataOverride, - getLoginDependencies, - user, - ) + // None of the rest of the role options are relevant for root. + if user.IsRootUser() { + return nil + } - if err != nil { - return aInfo, errors.Wrapf(err, "error looking up user %s", user) - } - // We have to make sure to close the iterator since we might return from - // the for loop early (before Next() returns false). - defer func() { retErr = errors.CombineErrors(retErr, roleOptsIt.Close()) }() + // Use fully qualified table name to avoid looking up "".system.role_options. + const getLoginDependencies = `SELECT option, value FROM system.public.role_options ` + + `WHERE username=$1 AND option IN ('NOLOGIN', 'VALID UNTIL', 'NOSQLLOGIN', 'REPLICATION', 'SUBJECT')` - // To support users created before 20.1, allow all USERS/ROLES to login - // if NOLOGIN is not found. 
- aInfo.CanLoginSQLRoleOpt = true - aInfo.CanLoginDBConsoleRoleOpt = true - var ok bool + roleOptsIt, err := txn.QueryIteratorEx( + ctx, "get-login-dependencies", txn.KV(), /* txn */ + sessiondata.NodeUserSessionDataOverride, + getLoginDependencies, + user, + ) - for ok, err = roleOptsIt.Next(ctx); ok; ok, err = roleOptsIt.Next(ctx) { - row := roleOptsIt.Cur() - option := string(tree.MustBeDString(row[0])) - switch option { - case "NOLOGIN": - aInfo.CanLoginSQLRoleOpt = false - aInfo.CanLoginDBConsoleRoleOpt = false - case "NOSQLLOGIN": - aInfo.CanLoginSQLRoleOpt = false - case "REPLICATION": - aInfo.CanUseReplicationRoleOpt = true - case "VALID UNTIL": - if row[1] != tree.DNull { - ts := string(tree.MustBeDString(row[1])) - // This is okay because the VALID UNTIL is stored as a string - // representation of a TimestampTZ which has the same underlying - // representation in the table as a Timestamp (UTC time). - timeCtx := tree.NewParseContext(timeutil.Now()) - aInfo.ValidUntil, _, err = tree.ParseDTimestamp(timeCtx, ts, time.Microsecond) - if err != nil { - return aInfo, errors.Wrap(err, - "error trying to parse timestamp while retrieving password valid until value") + if err != nil { + return errors.Wrapf(err, "error looking up user %s", user) + } + // We have to make sure to close the iterator since we might return from + // the for loop early (before Next() returns false). + defer func() { retErr = errors.CombineErrors(retErr, roleOptsIt.Close()) }() + + // To support users created before 20.1, allow all USERS/ROLES to login + // if NOLOGIN is not found. 
+ aInfo.CanLoginSQLRoleOpt = true + aInfo.CanLoginDBConsoleRoleOpt = true + var ok bool + + for ok, err = roleOptsIt.Next(ctx); ok; ok, err = roleOptsIt.Next(ctx) { + row := roleOptsIt.Cur() + option := string(tree.MustBeDString(row[0])) + switch option { + case "NOLOGIN": + aInfo.CanLoginSQLRoleOpt = false + aInfo.CanLoginDBConsoleRoleOpt = false + case "NOSQLLOGIN": + aInfo.CanLoginSQLRoleOpt = false + case "REPLICATION": + aInfo.CanUseReplicationRoleOpt = true + case "VALID UNTIL": + if row[1] != tree.DNull { + ts := string(tree.MustBeDString(row[1])) + // This is okay because the VALID UNTIL is stored as a string + // representation of a TimestampTZ which has the same underlying + // representation in the table as a Timestamp (UTC time). + timeCtx := tree.NewParseContext(timeutil.Now()) + aInfo.ValidUntil, _, err = tree.ParseDTimestamp(timeCtx, ts, time.Microsecond) + if err != nil { + return errors.Wrap(err, + "error trying to parse timestamp while retrieving password valid until value") + } } - } - case "SUBJECT": - if row[1] != tree.DNull { - subjectStr := string(tree.MustBeDString(row[1])) - dn, err := distinguishedname.ParseDN(subjectStr) - if err != nil { - return aInfo, err + case "SUBJECT": + if row[1] != tree.DNull { + subjectStr := string(tree.MustBeDString(row[1])) + dn, err := distinguishedname.ParseDN(subjectStr) + if err != nil { + return err + } + aInfo.Subject = dn } - aInfo.Subject = dn } } - } + return err + }) return aInfo, err } From f096bbe929b4ff1f4962024b81720ffa32772469 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Fri, 22 Nov 2024 10:26:43 -0500 Subject: [PATCH 2/2] catalog/lease: mismatchedExternalDataRowTimestamp retryable Previously, when mixed ExternalDataRow time stamps were detected between descriptors during a txn, an error would be returned. While user operations should never see this error because of an AOST set on them (to guarantee a consistent view of descriptors), internal operations could observe this error. 
To address this, this patch marks the mismatchedExternalDataRowTimestamp error as retryable. This will cause internal txns to retry if the error gets hit. Release note: None --- pkg/sql/catalog/descs/leased_descriptors.go | 4 +++ .../replication/reader_catalog_test.go | 32 ++++++++++++------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 57398d437f17..e192d49eb4e2 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -101,6 +101,10 @@ func newMismatchedExternalDataRowTimestampError( } } +// ClientVisibleRetryError implements the ClientVisibleRetryError interface. +func (e *mismatchedExternalDataRowTimestamp) ClientVisibleRetryError() { +} + func (e *mismatchedExternalDataRowTimestamp) SafeFormatError(p errors.Printer) (next error) { p.Printf("PCR reader timestamp has moved forward, "+ "existing descriptor %s(%d) and timestamp: %s "+ diff --git a/pkg/sql/catalog/replication/reader_catalog_test.go b/pkg/sql/catalog/replication/reader_catalog_test.go index 5f0065e3374a..54e5d7328917 100644 --- a/pkg/sql/catalog/replication/reader_catalog_test.go +++ b/pkg/sql/catalog/replication/reader_catalog_test.go @@ -254,7 +254,7 @@ func TestReaderCatalogTSAdvance(t *testing.T) { destTestingKnobs := base.TestingKnobs{ SQLLeaseManager: &lease.ManagerTestingKnobs{ TestingDescriptorRefreshedEvent: func(descriptor *descpb.Descriptor) { - if !descriptorRefreshHookEnabled.Load() { + if !descriptorRefreshHookEnabled.Swap(false) { return } <-waitForRefresh @@ -332,6 +332,7 @@ func TestReaderCatalogTSAdvance(t *testing.T) { // Sanity: Execute the same query as prepared statement inside the reader // catalog . 
destPgxConn, err := pgx.Connect(ctx, destURL.String()) + require.NoError(t, err) _, err = destPgxConn.Prepare(ctx, query, query) require.NoError(t, err) rows, err := destPgxConn.Query(ctx, query) @@ -409,6 +410,9 @@ func TestReaderCatalogTSAdvance(t *testing.T) { for !iterationsDone { uniqueIdx++ if !useAOST { + // Toggle the block on each iteration, so there is some risk + // that not all descriptors have been updated. + descriptorRefreshHookEnabled.Swap(true) select { case waitForRefresh <- struct{}{}: case <-iterationsDoneCh: @@ -436,16 +440,22 @@ func TestReaderCatalogTSAdvance(t *testing.T) { require.NoError(t, err) rows, err = newPgxConn.Query(ctx, "SELECT * FROM t1, v1, t2") require.NoError(t, err) + rows.Close() require.NoError(t, newPgxConn.Close(ctx)) - tx := destRunner.Begin(t) - _, err = tx.Exec("SELECT * FROM t1") - checkAOSTError(err) - _, err = tx.Exec("SELECT * FROM v1") - checkAOSTError(err) - _, err = tx.Exec("SELECT * FROM t2") - checkAOSTError(err) - checkAOSTError(tx.Commit()) + // Only use txns with AOST, which should never hit retryable errors. + // Automatic retries are enabled for PCR errors, but they can still + // happen in a txn. When AOST is off, don't try the txn. + if useAOST { + tx := destRunner.Begin(t) + _, err = tx.Exec("SELECT * FROM t1") + checkAOSTError(err) + _, err = tx.Exec("SELECT * FROM v1") + checkAOSTError(err) + _, err = tx.Exec("SELECT * FROM t2") + checkAOSTError(err) + checkAOSTError(tx.Commit()) + } _, err = destRunner.DB.ExecContext(ctx, "SELECT * FROM t1,v1, t2") checkAOSTError(err) @@ -457,8 +467,8 @@ func TestReaderCatalogTSAdvance(t *testing.T) { } // Finally ensure the queries actually match. require.NoError(t, grp.Wait()) - // Check if the error was detected. - require.Equalf(t, !useAOST, errorDetected, + // When the AOST is shut off we retry, so no errors should occur. 
+ require.Equalf(t, false, errorDetected, "error was detected unexpectedly (AOST = %t on connection)", useAOST) } require.NoError(t, existingPgxConn.Close(ctx))