Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't spam logs about unknown types being sent as text #1291

Merged
merged 13 commits into from
Feb 14, 2024
Merged
5 changes: 1 addition & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type BigQueryServiceAccount struct {
ClientX509CertURL string `json:"client_x509_cert_url"`
}

// BigQueryConnector is a Connector implementation for BigQuery.
type BigQueryConnector struct {
bqConfig *protos.BigqueryConfig
client *bigquery.Client
Expand All @@ -59,7 +58,6 @@ type BigQueryConnector struct {
logger log.Logger
}

// Create BigQueryServiceAccount from BigqueryConfig
func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) {
var serviceAccount BigQueryServiceAccount
serviceAccount.Type = bqConfig.AuthType
Expand Down Expand Up @@ -178,7 +176,6 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr
return nil
}

// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig.
func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
logger := logger.LoggerFromCtx(ctx)

Expand Down Expand Up @@ -246,7 +243,7 @@ func (c *BigQueryConnector) Close(_ context.Context) error {
return c.client.Close()
}

// ConnectionActive returns true if the connection is active.
// ConnectionActive returns nil if the connection is active.
func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error {
_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(ctx)
if err != nil {
Expand Down
20 changes: 8 additions & 12 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type PostgresCDCSource struct {
*PostgresConnector
replConn *pgx.Conn
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
Expand All @@ -35,11 +35,9 @@ type PostgresCDCSource struct {
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
commitLock bool
customTypeMapping map[uint32]string

// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
Expand All @@ -64,14 +62,14 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, cdcConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &PostgresCDCSource{
PostgresConnector: c,
replConn: cdcConfig.Connection,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
Expand All @@ -81,8 +79,6 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus
typeMap: pgtype.NewMap(),
childToParentRelIDMapping: childToParentRelIDMap,
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
Expand Down Expand Up @@ -731,20 +727,20 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
}
return qvalue.QValue{}, err
}
retVal, err := parseFieldFromPostgresOID(dataType, parsedData)
retVal, err := p.parseFieldFromPostgresOID(dataType, parsedData)
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
} else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding.
retVal, err := parseFieldFromPostgresOID(dataType, string(data))
retVal, err := p.parseFieldFromPostgresOID(dataType, string(data))
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
}

typeName, ok := p.customTypeMapping[dataType]
typeName, ok := p.customTypesMapping[dataType]
if ok {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand Down Expand Up @@ -835,9 +831,9 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := postgresOIDToQValueKind(column.DataType)
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypeMapping[column.DataType]
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
Expand Down
16 changes: 10 additions & 6 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

// PostgresConnector is a Connector implementation for Postgres.
type PostgresConnector struct {
connStr string
config *protos.PostgresConfig
Expand All @@ -33,10 +32,10 @@ type PostgresConnector struct {
replConfig *pgx.ConnConfig
customTypesMapping map[uint32]string
metadataSchema string
hushWarnOID map[uint32]struct{}
logger log.Logger
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

Expand Down Expand Up @@ -84,6 +83,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replConfig: replConfig,
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
hushWarnOID: make(map[uint32]struct{}),
logger: logger.LoggerFromCtx(ctx),
}, nil
}
Expand All @@ -107,7 +107,11 @@ func (c *PostgresConnector) Close(ctx context.Context) error {
return nil
}

// ConnectionActive returns true if the connection is active.
// Conn returns the pgx connection stored in the connector's conn field.
// The pointer is returned as-is with no nil check, so the result may be nil.
func (c *PostgresConnector) Conn() *pgx.Conn {
return c.conn
}

// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
Expand Down Expand Up @@ -214,7 +218,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
}
defer replConn.Close(ctx)

cdc, err := NewPostgresCDCSource(ctx, &PostgresCDCConfig{
cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
Connection: replConn,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Expand All @@ -223,7 +227,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
})
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
}
Expand Down Expand Up @@ -603,7 +607,7 @@ func (c *PostgresConnector) getTableSchemaForTable(
columnNames := make([]string, 0, len(fields))
columns := make([]*protos.FieldDescription, 0, len(fields))
for _, fieldDescription := range fields {
genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID)
genericColType := c.postgresOIDToQValueKind(fieldDescription.DataTypeOID)
if genericColType == qvalue.QValueKindInvalid {
typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID]
if ok {
Expand Down
33 changes: 7 additions & 26 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,8 @@ func (c *PostgresConnector) PullQRepRecords(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(ctx,
c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}
query := config.Query
return executor.ExecuteAndProcessQuery(ctx, query)
}
Expand Down Expand Up @@ -368,12 +364,8 @@ func (c *PostgresConnector) PullQRepRecords(
return nil, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}

records, err := executor.ExecuteAndProcessQuery(ctx, query, rangeStart, rangeEnd)
if err != nil {
Expand All @@ -392,15 +384,11 @@ func (c *PostgresConnector) PullQRepRecordStream(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

query := config.Query
_, err = executor.ExecuteAndProcessQueryStream(ctx, stream, query)
_, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query)
return 0, err
}
c.logger.Info("Obtained ranges for partition for PullQRepStream", partitionIdLog)
Expand Down Expand Up @@ -438,12 +426,8 @@ func (c *PostgresConnector) PullQRepRecordStream(
return 0, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

numRecords, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query, rangeStart, rangeEnd)
if err != nil {
Expand Down Expand Up @@ -539,13 +523,10 @@ func (c *PostgresConnector) PullXminRecordStream(
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, currentSnapshotXmin, err
}

var err error
var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
Expand Down
18 changes: 11 additions & 7 deletions flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import (
"context"
"testing"

"github.com/jackc/pgx/v5"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func BenchmarkQRepQueryExecutor(b *testing.B) {
connectionString := "postgres://postgres:postgres@localhost:7132/postgres"
query := "SELECT * FROM bench.large_table"

ctx := context.Background()

// Create a separate connection for non-replication queries
conn, err := pgx.Connect(ctx, connectionString)
connector, err := NewPostgresConnector(ctx,
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
defer conn.Close(context.Background())
defer connector.Close(ctx)

// Create a new QRepQueryExecutor instance
qe := NewQRepQueryExecutor(conn, context.Background(), "test flow", "test part")
qe := connector.NewQRepQueryExecutor("test flow", "test part")

// Run the benchmark
b.ResetTimer()
Expand Down
Loading
Loading