Skip to content

Commit

Permalink
Don't spam logs about unknown types being sent as text (#1291)
Browse files Browse the repository at this point in the history
Logs of this nature should only be generated once per connector

PostgresCDCSource & QRepQueryExecutor now extend PostgresConnector,
which stops QRepQueryExecutor from querying for customTypesMapping for snapshots

QValue equality shifted because we're now using a connector with type mappings when retrieving records
  • Loading branch information
serprex authored Feb 14, 2024
1 parent cc61dfb commit 758727a
Show file tree
Hide file tree
Showing 22 changed files with 405 additions and 416 deletions.
5 changes: 1 addition & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type BigQueryServiceAccount struct {
ClientX509CertURL string `json:"client_x509_cert_url"`
}

// BigQueryConnector is a Connector implementation for BigQuery.
type BigQueryConnector struct {
bqConfig *protos.BigqueryConfig
client *bigquery.Client
Expand All @@ -59,7 +58,6 @@ type BigQueryConnector struct {
logger log.Logger
}

// Create BigQueryServiceAccount from BigqueryConfig
func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) {
var serviceAccount BigQueryServiceAccount
serviceAccount.Type = bqConfig.AuthType
Expand Down Expand Up @@ -178,7 +176,6 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr
return nil
}

// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig.
func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
logger := logger.LoggerFromCtx(ctx)

Expand Down Expand Up @@ -246,7 +243,7 @@ func (c *BigQueryConnector) Close(_ context.Context) error {
return c.client.Close()
}

// ConnectionActive returns true if the connection is active.
// ConnectionActive returns nil if the connection is active.
func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error {
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(ctx)
if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type PostgresCDCSource struct {
*PostgresConnector
replConn *pgx.Conn
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
Expand All @@ -35,11 +35,9 @@ type PostgresCDCSource struct {
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
commitLock bool
customTypeMapping map[uint32]string

// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
Expand All @@ -64,14 +62,14 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, cdcConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
PostgresConnector: c,
replConn: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
Expand All @@ -81,8 +79,6 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus
typeMap: pgtype.NewMap(),
childToParentRelIDMapping: childToParentRelIDMap,
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
Expand Down Expand Up @@ -731,20 +727,20 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
}
return qvalue.QValue{}, err
}
retVal, err := parseFieldFromPostgresOID(dataType, parsedData)
retVal, err := p.parseFieldFromPostgresOID(dataType, parsedData)
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
} else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding.
retVal, err := parseFieldFromPostgresOID(dataType, string(data))
retVal, err := p.parseFieldFromPostgresOID(dataType, string(data))
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
}

typeName, ok := p.customTypeMapping[dataType]
typeName, ok := p.customTypesMapping[dataType]
if ok {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand Down Expand Up @@ -835,9 +831,9 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := postgresOIDToQValueKind(column.DataType)
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypeMapping[column.DataType]
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
Expand Down
16 changes: 10 additions & 6 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

// PostgresConnector is a Connector implementation for Postgres.
type PostgresConnector struct {
connStr string
config *protos.PostgresConfig
Expand All @@ -33,10 +32,10 @@ type PostgresConnector struct {
replConfig *pgx.ConnConfig
customTypesMapping map[uint32]string
metadataSchema string
hushWarnOID map[uint32]struct{}
logger log.Logger
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

Expand Down Expand Up @@ -84,6 +83,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replConfig: replConfig,
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
hushWarnOID: make(map[uint32]struct{}),
logger: logger.LoggerFromCtx(ctx),
}, nil
}
Expand All @@ -107,7 +107,11 @@ func (c *PostgresConnector) Close(ctx context.Context) error {
return nil
}

// ConnectionActive returns true if the connection is active.
// Conn returns the connector's underlying *pgx.Conn without checking whether it is nil.
func (c *PostgresConnector) Conn() *pgx.Conn {
return c.conn
}

// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
Expand Down Expand Up @@ -214,7 +218,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
}
defer replConn.Close(ctx)

cdc, err := NewPostgresCDCSource(ctx, &PostgresCDCConfig{
cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
Connection: replConn,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Expand All @@ -223,7 +227,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
})
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
}
Expand Down Expand Up @@ -603,7 +607,7 @@ func (c *PostgresConnector) getTableSchemaForTable(
columnNames := make([]string, 0, len(fields))
columns := make([]*protos.FieldDescription, 0, len(fields))
for _, fieldDescription := range fields {
genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID)
genericColType := c.postgresOIDToQValueKind(fieldDescription.DataTypeOID)
if genericColType == qvalue.QValueKindInvalid {
typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID]
if ok {
Expand Down
33 changes: 7 additions & 26 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,8 @@ func (c *PostgresConnector) PullQRepRecords(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(ctx,
c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}
query := config.Query
return executor.ExecuteAndProcessQuery(ctx, query)
}
Expand Down Expand Up @@ -368,12 +364,8 @@ func (c *PostgresConnector) PullQRepRecords(
return nil, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}

records, err := executor.ExecuteAndProcessQuery(ctx, query, rangeStart, rangeEnd)
if err != nil {
Expand All @@ -392,15 +384,11 @@ func (c *PostgresConnector) PullQRepRecordStream(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

query := config.Query
_, err = executor.ExecuteAndProcessQueryStream(ctx, stream, query)
_, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query)
return 0, err
}
c.logger.Info("Obtained ranges for partition for PullQRepStream", partitionIdLog)
Expand Down Expand Up @@ -438,12 +426,8 @@ func (c *PostgresConnector) PullQRepRecordStream(
return 0, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

numRecords, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query, rangeStart, rangeEnd)
if err != nil {
Expand Down Expand Up @@ -539,13 +523,10 @@ func (c *PostgresConnector) PullXminRecordStream(
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, currentSnapshotXmin, err
}

var err error
var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
Expand Down
18 changes: 11 additions & 7 deletions flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import (
"context"
"testing"

"github.com/jackc/pgx/v5"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func BenchmarkQRepQueryExecutor(b *testing.B) {
connectionString := "postgres://postgres:postgres@localhost:7132/postgres"
query := "SELECT * FROM bench.large_table"

ctx := context.Background()

// Create a separate connection for non-replication queries
conn, err := pgx.Connect(ctx, connectionString)
connector, err := NewPostgresConnector(ctx,
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
defer conn.Close(context.Background())
defer connector.Close(ctx)

// Create a new QRepQueryExecutor instance
qe := NewQRepQueryExecutor(conn, context.Background(), "test flow", "test part")
qe := connector.NewQRepQueryExecutor("test flow", "test part")

// Run the benchmark
b.ResetTimer()
Expand Down
Loading

0 comments on commit 758727a

Please sign in to comment.