Skip to content

Commit

Permalink
Merge pull request #2177 from josephschorr/pgx-query-params-in-traces
Browse files Browse the repository at this point in the history
Add option to enable query parameters to appear in traces
  • Loading branch information
tstirrat15 authored Dec 19, 2024
2 parents 8740a3c + e9f1533 commit 68a6ebc
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 89 deletions.
5 changes: 3 additions & 2 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}

includeQueryParametersInTraces := config.includeQueryParametersInTraces
readPoolConfig, err := pgxpool.ParseConfig(url)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
err = config.readPoolOpts.ConfigurePgx(readPoolConfig)
err = config.readPoolOpts.ConfigurePgx(readPoolConfig, includeQueryParametersInTraces)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
Expand All @@ -116,7 +117,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
err = config.writePoolOpts.ConfigurePgx(writePoolConfig)
err = config.writePoolOpts.ConfigurePgx(writePoolConfig, includeQueryParametersInTraces)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, url)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/crdb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewCRDBDriver(url string) (*CRDBDriver, error) {
return nil, fmt.Errorf(errUnableToInstantiate, err)
}
pgxcommon.ConfigurePGXLogger(connConfig)
pgxcommon.ConfigureOTELTracer(connConfig)
pgxcommon.ConfigureOTELTracer(connConfig, false)

db, err := pgx.ConnectConfig(context.Background(), connConfig)
if err != nil {
Expand Down
80 changes: 44 additions & 36 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@ type crdbOptions struct {
readPoolOpts, writePoolOpts pgxcommon.PoolOptions
connectRate time.Duration

watchBufferLength uint16
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
gcWindow time.Duration
maxRetries uint8
overlapStrategy string
overlapKey string
enableConnectionBalancing bool
analyzeBeforeStatistics bool
filterMaximumIDCount uint16
enablePrometheusStats bool
withIntegrity bool
allowedMigrations []string
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
watchConnectTimeout time.Duration
revisionQuantization time.Duration
followerReadDelay time.Duration
maxRevisionStalenessPercent float64
gcWindow time.Duration
maxRetries uint8
overlapStrategy string
overlapKey string
enableConnectionBalancing bool
analyzeBeforeStatistics bool
filterMaximumIDCount uint16
enablePrometheusStats bool
withIntegrity bool
includeQueryParametersInTraces bool
allowedMigrations []string
}

const (
Expand All @@ -50,11 +51,12 @@ const (
defaultOverlapKey = "defaultsynckey"
defaultOverlapStrategy = overlapStrategyStatic

defaultEnablePrometheusStats = false
defaultEnableConnectionBalancing = true
defaultConnectRate = 100 * time.Millisecond
defaultFilterMaximumIDCount = 100
defaultWithIntegrity = false
defaultEnablePrometheusStats = false
defaultEnableConnectionBalancing = true
defaultConnectRate = 100 * time.Millisecond
defaultFilterMaximumIDCount = 100
defaultWithIntegrity = false
defaultIncludeQueryParametersInTraces = false
)

// Option provides the facility to configure how clients within the CRDB
Expand All @@ -63,21 +65,22 @@ type Option func(*crdbOptions)

func generateConfig(options []Option) (crdbOptions, error) {
computed := crdbOptions{
gcWindow: 24 * time.Hour,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
watchConnectTimeout: defaultWatchConnectTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
maxRetries: defaultMaxRetries,
overlapKey: defaultOverlapKey,
overlapStrategy: defaultOverlapStrategy,
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
connectRate: defaultConnectRate,
filterMaximumIDCount: defaultFilterMaximumIDCount,
withIntegrity: defaultWithIntegrity,
gcWindow: 24 * time.Hour,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
watchConnectTimeout: defaultWatchConnectTimeout,
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
maxRetries: defaultMaxRetries,
overlapKey: defaultOverlapKey,
overlapStrategy: defaultOverlapStrategy,
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
connectRate: defaultConnectRate,
filterMaximumIDCount: defaultFilterMaximumIDCount,
withIntegrity: defaultWithIntegrity,
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
}

for _, option := range options {
Expand Down Expand Up @@ -345,3 +348,8 @@ func WithIntegrity(withIntegrity bool) Option {
func AllowedMigrations(allowedMigrations []string) Option {
return func(po *crdbOptions) { po.allowedMigrations = allowedMigrations }
}

// IncludeQueryParametersInTraces marks whether query parameters should be included in traces.
func IncludeQueryParametersInTraces(includeQueryParametersInTraces bool) Option {
return func(po *crdbOptions) { po.includeQueryParametersInTraces = includeQueryParametersInTraces }
}
18 changes: 13 additions & 5 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func ParseConfigWithInstrumentation(url string) (*pgx.ConnConfig, error) {
}

ConfigurePGXLogger(connConfig)
ConfigureOTELTracer(connConfig)
ConfigureOTELTracer(connConfig, false)

return connConfig, nil
}
Expand Down Expand Up @@ -281,8 +281,16 @@ func IsSerializationError(err error) bool {
}

// ConfigureOTELTracer adds OTEL tracing to a pgx.ConnConfig
func ConfigureOTELTracer(connConfig *pgx.ConnConfig) {
addTracer(connConfig, otelpgx.NewTracer(otelpgx.WithTrimSQLInSpanName()))
func ConfigureOTELTracer(connConfig *pgx.ConnConfig, includeQueryParameters bool) {
options := []otelpgx.Option{
otelpgx.WithTrimSQLInSpanName(),
}

if includeQueryParameters {
options = append(options, otelpgx.WithIncludeQueryParameters())
}

addTracer(connConfig, otelpgx.NewTracer(options...))
}

func addTracer(connConfig *pgx.ConnConfig, tracer pgx.QueryTracer) {
Expand Down Expand Up @@ -343,7 +351,7 @@ type PoolOptions struct {
}

// ConfigurePgx applies PoolOptions to a pgx connection pool confiugration.
func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config) error {
func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config, includeQueryParametersInTraces bool) error {
if opts.MaxOpenConns != nil {
maxConns, err := safecast.ToInt32(*opts.MaxOpenConns)
if err != nil {
Expand Down Expand Up @@ -385,7 +393,7 @@ func (opts PoolOptions) ConfigurePgx(pgxConfig *pgxpool.Config) error {
}

ConfigurePGXLogger(pgxConfig.ConnConfig)
ConfigureOTELTracer(pgxConfig.ConnConfig)
ConfigureOTELTracer(pgxConfig.ConnConfig, includeQueryParametersInTraces)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type AlembicPostgresDriver struct {
}

// NewAlembicPostgresDriver creates a new driver with active connections to the database specified.
func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvider datastore.CredentialsProvider) (*AlembicPostgresDriver, error) {
func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvider datastore.CredentialsProvider, includeQueryParametersInTraces bool) (*AlembicPostgresDriver, error) {
ctx, span := tracer.Start(ctx, "NewAlembicPostgresDriver")
defer span.End()

Expand All @@ -38,7 +38,7 @@ func NewAlembicPostgresDriver(ctx context.Context, url string, credentialsProvid
return nil, err
}
pgxcommon.ConfigurePGXLogger(connConfig)
pgxcommon.ConfigureOTELTracer(connConfig)
pgxcommon.ConfigureOTELTracer(connConfig, includeQueryParametersInTraces)

if credentialsProvider != nil {
log.Ctx(ctx).Debug().Str("name", credentialsProvider.Name()).Msg("using credentials provider")
Expand Down
44 changes: 26 additions & 18 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ type postgresOptions struct {
maxRetries uint8
filterMaximumIDCount uint16

enablePrometheusStats bool
analyzeBeforeStatistics bool
gcEnabled bool
readStrictMode bool
enablePrometheusStats bool
analyzeBeforeStatistics bool
gcEnabled bool
readStrictMode bool
includeQueryParametersInTraces bool

migrationPhase string
allowedMigrations []string
Expand Down Expand Up @@ -67,6 +68,7 @@ const (
defaultCredentialsProviderName = ""
defaultReadStrictMode = false
defaultFilterMaximumIDCount = 100
defaultIncludeQueryParametersInTraces = false
)

// Option provides the facility to configure how clients within the
Expand All @@ -75,20 +77,21 @@ type Option func(*postgresOptions)

func generateConfig(options []Option) (postgresOptions, error) {
computed := postgresOptions{
gcWindow: defaultGarbageCollectionWindow,
gcInterval: defaultGarbageCollectionInterval,
gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
revisionQuantization: defaultQuantization,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
enablePrometheusStats: defaultEnablePrometheusStats,
maxRetries: defaultMaxRetries,
gcEnabled: defaultGCEnabled,
credentialsProviderName: defaultCredentialsProviderName,
readStrictMode: defaultReadStrictMode,
queryInterceptor: nil,
filterMaximumIDCount: defaultFilterMaximumIDCount,
gcWindow: defaultGarbageCollectionWindow,
gcInterval: defaultGarbageCollectionInterval,
gcMaxOperationTime: defaultGarbageCollectionMaxOperationTime,
watchBufferLength: defaultWatchBufferLength,
watchBufferWriteTimeout: defaultWatchBufferWriteTimeout,
revisionQuantization: defaultQuantization,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
enablePrometheusStats: defaultEnablePrometheusStats,
maxRetries: defaultMaxRetries,
gcEnabled: defaultGCEnabled,
credentialsProviderName: defaultCredentialsProviderName,
readStrictMode: defaultReadStrictMode,
queryInterceptor: nil,
filterMaximumIDCount: defaultFilterMaximumIDCount,
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
}

for _, option := range options {
Expand Down Expand Up @@ -377,3 +380,8 @@ func CredentialsProviderName(credentialsProviderName string) Option {
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(po *postgresOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
}

// IncludeQueryParametersInTraces is a flag to set whether to include query parameters in OTEL traces
func IncludeQueryParametersInTraces(includeQueryParametersInTraces bool) Option {
return func(po *postgresOptions) { po.includeQueryParametersInTraces = includeQueryParametersInTraces }
}
38 changes: 20 additions & 18 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func newPostgresDatastore(

// Setup the config for each of the read and write pools.
readPoolConfig := pgConfig.Copy()
err = config.readPoolOpts.ConfigurePgx(readPoolConfig)
includeQueryParametersInTraces := config.includeQueryParametersInTraces
err = config.readPoolOpts.ConfigurePgx(readPoolConfig, includeQueryParametersInTraces)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, pgURL)
}
Expand All @@ -195,7 +196,7 @@ func newPostgresDatastore(
var writePoolConfig *pgxpool.Config
if isPrimary {
writePoolConfig = pgConfig.Copy()
err = config.writePoolOpts.ConfigurePgx(writePoolConfig)
err = config.writePoolOpts.ConfigurePgx(writePoolConfig, includeQueryParametersInTraces)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, errUnableToInstantiate, err, pgURL)
}
Expand Down Expand Up @@ -377,21 +378,22 @@ type pgDatastore struct {
*revisions.CachedOptimizedRevisions
*common.MigrationValidator

dburl string
readPool, writePool pgxcommon.ConnPooler
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
optimizedRevisionQuery string
validTransactionQuery string
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
analyzeBeforeStatistics bool
readTxOptions pgx.TxOptions
maxRetries uint8
watchEnabled bool
isPrimary bool
inStrictReadMode bool
dburl string
readPool, writePool pgxcommon.ConnPooler
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
optimizedRevisionQuery string
validTransactionQuery string
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
analyzeBeforeStatistics bool
readTxOptions pgx.TxOptions
maxRetries uint8
watchEnabled bool
isPrimary bool
inStrictReadMode bool
includeQueryParametersInTraces bool

credentialsProvider datastore.CredentialsProvider

Expand Down Expand Up @@ -652,7 +654,7 @@ func errorRetryable(err error) bool {
}

func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
pgDriver, err := migrations.NewAlembicPostgresDriver(ctx, pgd.dburl, pgd.credentialsProvider)
pgDriver, err := migrations.NewAlembicPostgresDriver(ctx, pgd.dburl, pgd.credentialsProvider, pgd.includeQueryParametersInTraces)
if err != nil {
return datastore.ReadyState{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/testserver/datastore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore
for i := 1; i <= retryCount; i++ {
connectStr := b.NewDatabase(t)

migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider)
migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider, false)
if err == nil {
ctx := context.WithValue(context.Background(), migrate.BackfillBatchSize, uint64(1000))
require.NoError(t, pgmigrations.DatabaseMigrations.Run(ctx, migrationDriver, b.targetMigration, migrate.LiveRun))
Expand Down
15 changes: 10 additions & 5 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,12 @@ type Config struct {
FilterMaximumIDCount uint16 `debugmap:"hidden" default:"100"`

// Options
ReadConnPool ConnPoolConfig `debugmap:"visible"`
WriteConnPool ConnPoolConfig `debugmap:"visible"`
ReadOnly bool `debugmap:"visible"`
EnableDatastoreMetrics bool `debugmap:"visible"`
DisableStats bool `debugmap:"visible"`
ReadConnPool ConnPoolConfig `debugmap:"visible"`
WriteConnPool ConnPoolConfig `debugmap:"visible"`
ReadOnly bool `debugmap:"visible"`
EnableDatastoreMetrics bool `debugmap:"visible"`
DisableStats bool `debugmap:"visible"`
IncludeQueryParametersInTraces bool `debugmap:"visible"`

// Read Replicas
ReadReplicaConnPool ConnPoolConfig `debugmap:"visible"`
Expand Down Expand Up @@ -247,6 +248,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking")
flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader")
flagSet.DurationVar(&opts.WatchConnectTimeout, flagName("datastore-watch-connect-timeout"), 1*time.Second, "how long the watch connection should wait before timing out (cockroachdb driver only)")
flagSet.BoolVar(&opts.IncludeQueryParametersInTraces, flagName("datastore-include-query-parameters-in-traces"), false, "include query parameters in traces (postgres and CRDB drivers only)")

flagSet.BoolVar(&opts.RelationshipIntegrityEnabled, flagName("datastore-relationship-integrity-enabled"), false, "enables relationship integrity checks. only supported on CRDB")
flagSet.StringVar(&opts.RelationshipIntegrityCurrentKey.KeyID, flagName("datastore-relationship-integrity-current-key-id"), "", "current key id for relationship integrity checks")
Expand Down Expand Up @@ -315,6 +317,7 @@ func DefaultDatastoreConfig() *Config {
RelationshipIntegrityCurrentKey: RelIntegrityKey{},
RelationshipIntegrityExpiredKeys: []string{},
AllowedMigrations: []string{},
IncludeQueryParametersInTraces: false,
}
}

Expand Down Expand Up @@ -509,6 +512,7 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er
crdb.FilterMaximumIDCount(opts.FilterMaximumIDCount),
crdb.WithIntegrity(opts.RelationshipIntegrityEnabled),
crdb.AllowedMigrations(opts.AllowedMigrations),
crdb.IncludeQueryParametersInTraces(opts.IncludeQueryParametersInTraces),
)
}

Expand Down Expand Up @@ -549,6 +553,7 @@ func commonPostgresDatastoreOptions(opts Config) ([]postgres.Option, error) {
postgres.WithEnablePrometheusStats(opts.EnableDatastoreMetrics),
postgres.MaxRetries(maxRetries),
postgres.FilterMaximumIDCount(opts.FilterMaximumIDCount),
postgres.IncludeQueryParametersInTraces(opts.IncludeQueryParametersInTraces),
}, nil
}

Expand Down
Loading

0 comments on commit 68a6ebc

Please sign in to comment.