diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 4613c3a089..88d6e3e9ae 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -103,11 +103,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url) } + includeQueryParametersInTraces := config.includeQueryParametersInTraces readPoolConfig, err := pgxpool.ParseConfig(url) if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url) } - err = config.readPoolOpts.ConfigurePgx(readPoolConfig) + err = config.readPoolOpts.ConfigurePgx(readPoolConfig, includeQueryParametersInTraces) if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url) } @@ -116,7 +117,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url) } - err = config.writePoolOpts.ConfigurePgx(writePoolConfig) + err = config.writePoolOpts.ConfigurePgx(writePoolConfig, includeQueryParametersInTraces) if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url) } diff --git a/internal/datastore/crdb/migrations/driver.go b/internal/datastore/crdb/migrations/driver.go index cec5c0a1c5..74dea59706 100644 --- a/internal/datastore/crdb/migrations/driver.go +++ b/internal/datastore/crdb/migrations/driver.go @@ -35,7 +35,7 @@ func NewCRDBDriver(url string) (*CRDBDriver, error) { return nil, fmt.Errorf(errUnableToInstantiate, err) } pgxcommon.ConfigurePGXLogger(connConfig) - pgxcommon.ConfigureOTELTracer(connConfig) + pgxcommon.ConfigureOTELTracer(connConfig, false) db, err := pgx.ConnectConfig(context.Background(), connConfig) if err != nil { diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index db0fab7f7a..67d8933638 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -12,22 +12,23 @@ type crdbOptions struct { readPoolOpts, writePoolOpts pgxcommon.PoolOptions connectRate time.Duration - watchBufferLength uint16 - watchBufferWriteTimeout time.Duration - watchConnectTimeout time.Duration - revisionQuantization time.Duration - followerReadDelay time.Duration - maxRevisionStalenessPercent float64 - gcWindow time.Duration - maxRetries uint8 - overlapStrategy string - overlapKey string - enableConnectionBalancing bool - analyzeBeforeStatistics bool - filterMaximumIDCount uint16 - enablePrometheusStats bool - withIntegrity bool - allowedMigrations []string + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + watchConnectTimeout time.Duration + revisionQuantization time.Duration + followerReadDelay time.Duration + maxRevisionStalenessPercent float64 + gcWindow time.Duration + maxRetries uint8 + overlapStrategy string + overlapKey string + enableConnectionBalancing bool + analyzeBeforeStatistics bool + filterMaximumIDCount uint16 + enablePrometheusStats bool + withIntegrity bool + includeQueryParametersInTraces bool + allowedMigrations []string } const ( @@ -50,11 +51,12 @@ const ( defaultOverlapKey = "defaultsynckey" defaultOverlapStrategy = overlapStrategyStatic - defaultEnablePrometheusStats = false - defaultEnableConnectionBalancing = true - defaultConnectRate = 100 * time.Millisecond - defaultFilterMaximumIDCount = 100 - defaultWithIntegrity = false + defaultEnablePrometheusStats = false + defaultEnableConnectionBalancing = true + defaultConnectRate = 100 * time.Millisecond + defaultFilterMaximumIDCount = 100 + defaultWithIntegrity = false + defaultIncludeQueryParametersInTraces = false ) // Option provides the facility to configure how clients within the CRDB @@ -63,21 +65,22 @@ type Option func(*crdbOptions) func generateConfig(options []Option) (crdbOptions, error) { computed := crdbOptions{ - gcWindow: 24 * time.Hour, - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, - watchConnectTimeout: defaultWatchConnectTimeout, - revisionQuantization: defaultRevisionQuantization, - followerReadDelay: defaultFollowerReadDelay, - maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, - maxRetries: defaultMaxRetries, - overlapKey: defaultOverlapKey, - overlapStrategy: defaultOverlapStrategy, - enablePrometheusStats: defaultEnablePrometheusStats, - enableConnectionBalancing: defaultEnableConnectionBalancing, - connectRate: defaultConnectRate, - filterMaximumIDCount: defaultFilterMaximumIDCount, - withIntegrity: defaultWithIntegrity, + gcWindow: 24 * time.Hour, + watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + watchConnectTimeout: defaultWatchConnectTimeout, + revisionQuantization: defaultRevisionQuantization, + followerReadDelay: defaultFollowerReadDelay, + maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, + maxRetries: defaultMaxRetries, + overlapKey: defaultOverlapKey, + overlapStrategy: defaultOverlapStrategy, + enablePrometheusStats: defaultEnablePrometheusStats, + enableConnectionBalancing: defaultEnableConnectionBalancing, + connectRate: defaultConnectRate, + filterMaximumIDCount: defaultFilterMaximumIDCount, + withIntegrity: defaultWithIntegrity, + includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces, } for _, option := range options { @@ -345,3 +348,8 @@ func WithIntegrity(withIntegrity bool) Option { func AllowedMigrations(allowedMigrations []string) Option { return func(po *crdbOptions) { po.allowedMigrations = allowedMigrations } } + +// IncludeQueryParametersInTraces marks whether query parameters should be included in traces. +func IncludeQueryParametersInTraces(includeQueryParametersInTraces bool) Option { + return func(po *crdbOptions) { po.includeQueryParametersInTraces = includeQueryParametersInTraces } +} diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index 8b778d2258..4187d96334 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -168,7 +168,7 @@ func ParseConfigWithInstrumentation(url string) (*pgx.ConnConfig, error) { } ConfigurePGXLogger(connConfig) - ConfigureOTELTracer(connConfig) + ConfigureOTELTracer(connConfig, false) return connConfig, nil } @@ -281,8 +281,16 @@ func IsSerializationError(err error) bool { } // ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig -func ConfigureOTELTracer(connConfig *pgx.ConnConfig) { - addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName())) +func ConfigureOTELTracer(connConfig *pgx.ConnConfig, includeQueryParameters bool) { + options := []otelpgx.Option{ + otelpgx.WithTrimSQLInSpanName(), + } + + if includeQueryParameters { + options = append(options, otelpgx.WithIncludeQueryParameters()) + } + + addTracer(connConfig, otelpgx.NewTracer(options...)) } func addTracer(connConfig *pgx.ConnConfig, tracer pgx.QueryTracer) { @@ -343,7 +351,7 @@ type PoolOptions struct { } // ConfigurePgx applies PoolOptions to a pgx connection pool confiugration. -func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config) error { +func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config, includeQueryParametersInTraces bool) error { if opts.MaxOpenConns != nil { maxConns, err := safecast.ToInt32(*opts.MaxOpenConns) if err != nil { @@ -385,7 +393,7 @@ func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config) error { } ConfigurePGXLogger(pgxConfig.ConnConfig) - ConfigureOTELTracer(pgxConfig.ConnConfig) + ConfigureOTELTracer(pgxConfig.ConnConfig, includeQueryParametersInTraces) return nil } diff --git a/internal/datastore/postgres/migrations/driver.go b/internal/datastore/postgres/migrations/driver.go index e56efa0b00..e6584e689d 100644 --- a/internal/datastore/postgres/migrations/driver.go +++ b/internal/datastore/postgres/migrations/driver.go @@ -29,7 +29,7 @@ type AlembicPostgresDriver struct { } // NewAlembicPostgresDriver creates a new driver with active connections to the database specified. -func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvider datastore.CredentialsProvider) (*AlembicPostgresDriver, error) { +func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvider datastore.CredentialsProvider, includeQueryParametersInTraces bool) (*AlembicPostgresDriver, error) { ctx, span := tracer.Start(ctx, "NewAlembicPostgresDriver") defer span.End() @@ -38,7 +38,7 @@ func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvid return nil, err } pgxcommon.ConfigurePGXLogger(connConfig) - pgxcommon.ConfigureOTELTracer(connConfig) + pgxcommon.ConfigureOTELTracer(connConfig, includeQueryParametersInTraces) if credentialsProvider != nil { log.Ctx(ctx).Debug().Str("name", credentialsProvider.Name()).Msg("using credentials provider") diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go index d3a9b1f8de..be79c03d9a 100644 --- a/internal/datastore/postgres/options.go +++ b/internal/datastore/postgres/options.go @@ -24,10 +24,11 @@ type postgresOptions struct { maxRetries uint8 filterMaximumIDCount uint16 - enablePrometheusStats bool - analyzeBeforeStatistics bool - gcEnabled bool - readStrictMode bool + enablePrometheusStats bool + analyzeBeforeStatistics bool + gcEnabled bool + readStrictMode bool + includeQueryParametersInTraces bool migrationPhase string allowedMigrations []string @@ -67,6 +68,7 @@ const ( defaultCredentialsProviderName = "" defaultReadStrictMode = false defaultFilterMaximumIDCount = 100 + defaultIncludeQueryParametersInTraces = false ) // Option provides the facility to configure how clients within the @@ -75,20 +77,21 @@ type Option func(*postgresOptions) func generateConfig(options []Option) (postgresOptions, error) { computed := postgresOptions{ - gcWindow: defaultGarbageCollectionWindow, - gcInterval: defaultGarbageCollectionInterval, - gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, - watchBufferLength: defaultWatchBufferLength, - watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, - revisionQuantization: defaultQuantization, - maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, - enablePrometheusStats: defaultEnablePrometheusStats, - maxRetries: defaultMaxRetries, - gcEnabled: defaultGCEnabled, - credentialsProviderName: defaultCredentialsProviderName, - readStrictMode: defaultReadStrictMode, - queryInterceptor: nil, - filterMaximumIDCount: defaultFilterMaximumIDCount, + gcWindow: defaultGarbageCollectionWindow, + gcInterval: defaultGarbageCollectionInterval, + gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime, + watchBufferLength: defaultWatchBufferLength, + watchBufferWriteTimeout: defaultWatchBufferWriteTimeout, + revisionQuantization: defaultQuantization, + maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent, + enablePrometheusStats: defaultEnablePrometheusStats, + maxRetries: defaultMaxRetries, + gcEnabled: defaultGCEnabled, + credentialsProviderName: defaultCredentialsProviderName, + readStrictMode: defaultReadStrictMode, + queryInterceptor: nil, + filterMaximumIDCount: defaultFilterMaximumIDCount, + includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces, } for _, option := range options { @@ -377,3 +380,8 @@ func CredentialsProviderName(credentialsProviderName string) Option { func FilterMaximumIDCount(filterMaximumIDCount uint16) Option { return func(po *postgresOptions) { po.filterMaximumIDCount = filterMaximumIDCount } } + +// IncludeQueryParametersInTraces is a flag to set whether to include query parameters in OTEL traces +func IncludeQueryParametersInTraces(includeQueryParametersInTraces bool) Option { + return func(po *postgresOptions) { po.includeQueryParametersInTraces = includeQueryParametersInTraces } +} diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index dec442c9b8..7de86e6af0 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -182,7 +182,8 @@ func newPostgresDatastore( // Setup the config for each of the read and write pools. readPoolConfig := pgConfig.Copy() - err = config.readPoolOpts.ConfigurePgx(readPoolConfig) + includeQueryParametersInTraces := config.includeQueryParametersInTraces + err = config.readPoolOpts.ConfigurePgx(readPoolConfig, includeQueryParametersInTraces) if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, pgURL) } @@ -195,7 +196,7 @@ func newPostgresDatastore( var writePoolConfig *pgxpool.Config if isPrimary { writePoolConfig = pgConfig.Copy() - err = config.writePoolOpts.ConfigurePgx(writePoolConfig) + err = config.writePoolOpts.ConfigurePgx(writePoolConfig, includeQueryParametersInTraces) if err != nil { return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, pgURL) } @@ -377,21 +378,22 @@ type pgDatastore struct { *revisions.CachedOptimizedRevisions *common.MigrationValidator - dburl string - readPool, writePool pgxcommon.ConnPooler - watchBufferLength uint16 - watchBufferWriteTimeout time.Duration - optimizedRevisionQuery string - validTransactionQuery string - gcWindow time.Duration - gcInterval time.Duration - gcTimeout time.Duration - analyzeBeforeStatistics bool - readTxOptions pgx.TxOptions - maxRetries uint8 - watchEnabled bool - isPrimary bool - inStrictReadMode bool + dburl string + readPool, writePool pgxcommon.ConnPooler + watchBufferLength uint16 + watchBufferWriteTimeout time.Duration + optimizedRevisionQuery string + validTransactionQuery string + gcWindow time.Duration + gcInterval time.Duration + gcTimeout time.Duration + analyzeBeforeStatistics bool + readTxOptions pgx.TxOptions + maxRetries uint8 + watchEnabled bool + isPrimary bool + inStrictReadMode bool + includeQueryParametersInTraces bool credentialsProvider datastore.CredentialsProvider @@ -652,7 +654,7 @@ func errorRetryable(err error) bool { } func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { - pgDriver, err := migrations.NewAlembicPostgresDriver(ctx, pgd.dburl, pgd.credentialsProvider) + pgDriver, err := migrations.NewAlembicPostgresDriver(ctx, pgd.dburl, pgd.credentialsProvider, pgd.includeQueryParametersInTraces) if err != nil { return datastore.ReadyState{}, err } diff --git a/internal/testserver/datastore/postgres.go b/internal/testserver/datastore/postgres.go index f5c08d7ea8..84816770e2 100644 --- a/internal/testserver/datastore/postgres.go +++ b/internal/testserver/datastore/postgres.go @@ -148,7 +148,7 @@ func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore for i := 1; i <= retryCount; i++ { connectStr := b.NewDatabase(t) - migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider) + migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider, false) if err == nil { ctx := context.WithValue(context.Background(), migrate.BackfillBatchSize, uint64(1000)) require.NoError(t, pgmigrations.DatabaseMigrations.Run(ctx, migrationDriver, b.targetMigration, migrate.LiveRun)) diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index e0c23733d1..8b7da6726b 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -107,11 +107,12 @@ type Config struct { FilterMaximumIDCount uint16 `debugmap:"hidden" default:"100"` // Options - ReadConnPool ConnPoolConfig `debugmap:"visible"` - WriteConnPool ConnPoolConfig `debugmap:"visible"` - ReadOnly bool `debugmap:"visible"` - EnableDatastoreMetrics bool `debugmap:"visible"` - DisableStats bool `debugmap:"visible"` + ReadConnPool ConnPoolConfig `debugmap:"visible"` + WriteConnPool ConnPoolConfig `debugmap:"visible"` + ReadOnly bool `debugmap:"visible"` + EnableDatastoreMetrics bool `debugmap:"visible"` + DisableStats bool `debugmap:"visible"` + IncludeQueryParametersInTraces bool `debugmap:"visible"` // Read Replicas ReadReplicaConnPool ConnPoolConfig `debugmap:"visible"` @@ -247,6 +248,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking") flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader") flagSet.DurationVar(&opts.WatchConnectTimeout, flagName("datastore-watch-connect-timeout"), 1*time.Second, "how long the watch connection should wait before timing out (cockroachdb driver only)") + flagSet.BoolVar(&opts.IncludeQueryParametersInTraces, flagName("datastore-include-query-parameters-in-traces"), false, "include query parameters in traces (postgres and CRDB drivers only)") flagSet.BoolVar(&opts.RelationshipIntegrityEnabled, flagName("datastore-relationship-integrity-enabled"), false, "enables relationship integrity checks. only supported on CRDB") flagSet.StringVar(&opts.RelationshipIntegrityCurrentKey.KeyID, flagName("datastore-relationship-integrity-current-key-id"), "", "current key id for relationship integrity checks") @@ -315,6 +317,7 @@ func DefaultDatastoreConfig() *Config { RelationshipIntegrityCurrentKey: RelIntegrityKey{}, RelationshipIntegrityExpiredKeys: []string{}, AllowedMigrations: []string{}, + IncludeQueryParametersInTraces: false, } } @@ -509,6 +512,7 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er crdb.FilterMaximumIDCount(opts.FilterMaximumIDCount), crdb.WithIntegrity(opts.RelationshipIntegrityEnabled), crdb.AllowedMigrations(opts.AllowedMigrations), + crdb.IncludeQueryParametersInTraces(opts.IncludeQueryParametersInTraces), ) } @@ -549,6 +553,7 @@ func commonPostgresDatastoreOptions(opts Config) ([]postgres.Option, error) { postgres.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), postgres.MaxRetries(maxRetries), postgres.FilterMaximumIDCount(opts.FilterMaximumIDCount), + postgres.IncludeQueryParametersInTraces(opts.IncludeQueryParametersInTraces), }, nil } diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go index 7d0774838c..4bf6a58d30 100644 --- a/pkg/cmd/datastore/zz_generated.options.go +++ b/pkg/cmd/datastore/zz_generated.options.go @@ -44,6 +44,7 @@ func (c *Config) ToOption() ConfigOption { to.ReadOnly = c.ReadOnly to.EnableDatastoreMetrics = c.EnableDatastoreMetrics to.DisableStats = c.DisableStats + to.IncludeQueryParametersInTraces = c.IncludeQueryParametersInTraces to.ReadReplicaConnPool = c.ReadReplicaConnPool to.ReadReplicaURIs = c.ReadReplicaURIs to.ReadReplicaCredentialsProviderName = c.ReadReplicaCredentialsProviderName @@ -95,6 +96,7 @@ func (c Config) DebugMap() map[string]any { debugMap["ReadOnly"] = helpers.DebugValue(c.ReadOnly, false) debugMap["EnableDatastoreMetrics"] = helpers.DebugValue(c.EnableDatastoreMetrics, false) debugMap["DisableStats"] = helpers.DebugValue(c.DisableStats, false) + debugMap["IncludeQueryParametersInTraces"] = helpers.DebugValue(c.IncludeQueryParametersInTraces, false) debugMap["ReadReplicaConnPool"] = helpers.DebugValue(c.ReadReplicaConnPool, false) debugMap["ReadReplicaURIs"] = helpers.SensitiveDebugValue(c.ReadReplicaURIs) debugMap["ReadReplicaCredentialsProviderName"] = helpers.DebugValue(c.ReadReplicaCredentialsProviderName, false) @@ -238,6 +240,13 @@ func WithDisableStats(disableStats bool) ConfigOption { } } +// WithIncludeQueryParametersInTraces returns an option that can set IncludeQueryParametersInTraces on a Config +func WithIncludeQueryParametersInTraces(includeQueryParametersInTraces bool) ConfigOption { + return func(c *Config) { + c.IncludeQueryParametersInTraces = includeQueryParametersInTraces + } +} + // WithReadReplicaConnPool returns an option that can set ReadReplicaConnPool on a Config func WithReadReplicaConnPool(readReplicaConnPool ConnPoolConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cmd/migrate.go b/pkg/cmd/migrate.go index 2880f79783..b586cc5047 100644 --- a/pkg/cmd/migrate.go +++ b/pkg/cmd/migrate.go @@ -75,7 +75,7 @@ func migrateRun(cmd *cobra.Command, args []string) error { } } - migrationDriver, err := migrations.NewAlembicPostgresDriver(cmd.Context(), dbURL, credentialsProvider) + migrationDriver, err := migrations.NewAlembicPostgresDriver(cmd.Context(), dbURL, credentialsProvider, false) if err != nil { return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) }