Skip to content

Commit

Permalink
[postgres] reduce unnecessary queries (#2214)
Browse files Browse the repository at this point in the history
closes #2210
  • Loading branch information
heavycrystal authored Nov 13, 2024
1 parent 0fbd7f7 commit 713f10b
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 154 deletions.
62 changes: 39 additions & 23 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
relationMessageMapping: cdcConfig.RelationMessageMapping,
slot: cdcConfig.Slot,
publication: cdcConfig.Publication,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
typeMap: pgtype.NewMap(),
commitLock: nil,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
Expand All @@ -85,21 +85,18 @@ func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u
WHERE parent.relkind='p';
`

rows, err := conn.Query(ctx, query, pgx.QueryExecModeSimpleProtocol)
rows, err := conn.Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}
defer rows.Close()

childToParentRelIDMap := make(map[uint32]uint32)
var parentRelID pgtype.Uint32
var childRelID pgtype.Uint32
for rows.Next() {
err := rows.Scan(&parentRelID, &childRelID)
if err != nil {
return nil, fmt.Errorf("error scanning child to parent relid map: %w", err)
}
var parentRelID, childRelID pgtype.Uint32
if _, err := pgx.ForEachRow(rows, []any{&parentRelID, &childRelID}, func() error {
childToParentRelIDMap[childRelID.Uint32] = parentRelID.Uint32
return nil
}); err != nil {
return nil, fmt.Errorf("error iterating over child to parent relid map: %w", err)
}

return childToParentRelIDMap, nil
Expand All @@ -114,6 +111,7 @@ type replProcessor[Items model.Items] interface {
p *PostgresCDCSource,
tuple *pglogrepl.TupleDataColumn,
col *pglogrepl.RelationMessageColumn,
customTypeMapping map[uint32]string,
) error
}

Expand All @@ -128,6 +126,7 @@ func (pgProcessor) Process(
p *PostgresCDCSource,
tuple *pglogrepl.TupleDataColumn,
col *pglogrepl.RelationMessageColumn,
customTypeMapping map[uint32]string,
) error {
switch tuple.DataType {
case 'n': // null
Expand Down Expand Up @@ -158,21 +157,22 @@ func (qProcessor) Process(
p *PostgresCDCSource,
tuple *pglogrepl.TupleDataColumn,
col *pglogrepl.RelationMessageColumn,
customTypeMapping map[uint32]string,
) error {
switch tuple.DataType {
case 'n': // null
items.AddColumn(col.Name, qvalue.QValueNull(qvalue.QValueKindInvalid))
case 't': // text
// bytea also appears here, as a hex-encoded string
data, err := p.decodeColumnData(tuple.Data, col.DataType, pgtype.TextFormatCode)
data, err := p.decodeColumnData(tuple.Data, col.DataType, pgtype.TextFormatCode, customTypeMapping)
if err != nil {
p.logger.Error("error decoding text column data", slog.Any("error", err),
slog.String("columnName", col.Name), slog.Int64("dataType", int64(col.DataType)))
return fmt.Errorf("error decoding text column data: %w", err)
}
items.AddColumn(col.Name, data)
case 'b': // binary
data, err := p.decodeColumnData(tuple.Data, col.DataType, pgtype.BinaryFormatCode)
data, err := p.decodeColumnData(tuple.Data, col.DataType, pgtype.BinaryFormatCode, customTypeMapping)
if err != nil {
return fmt.Errorf("error decoding binary column data: %w", err)
}
Expand All @@ -189,6 +189,7 @@ func processTuple[Items model.Items](
tuple *pglogrepl.TupleData,
rel *pglogrepl.RelationMessage,
exclude map[string]struct{},
customTypeMapping map[uint32]string,
) (Items, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
Expand All @@ -208,15 +209,17 @@ func processTuple[Items model.Items](
unchangedToastColumns = make(map[string]struct{})
}
unchangedToastColumns[rcol.Name] = struct{}{}
} else if err := processor.Process(items, p, tcol, rcol); err != nil {
} else if err := processor.Process(items, p, tcol, rcol, customTypeMapping); err != nil {
var none Items
return none, nil, err
}
}
return items, unchangedToastColumns, nil
}

func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, formatCode int16) (qvalue.QValue, error) {
func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32,
formatCode int16, customTypeMapping map[uint32]string,
) (qvalue.QValue, error) {
var parsedData any
var err error
if dt, ok := p.typeMap.TypeForOID(dataType); ok {
Expand Down Expand Up @@ -260,7 +263,7 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
return retVal, nil
}

typeName, ok := p.customTypesMapping[dataType]
typeName, ok := customTypeMapping[dataType]
if ok {
customQKind := customTypeToQKind(typeName)
switch customQKind {
Expand Down Expand Up @@ -634,17 +637,21 @@ func processMessage[Items model.Items](
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
}
customTypeMapping, err := p.fetchCustomTypeMapping(ctx)
if err != nil {
return nil, err
}

switch msg := logicalMsg.(type) {
case *pglogrepl.BeginMessage:
logger.Debug("BeginMessage", slog.Any("FinalLSN", msg.FinalLSN), slog.Any("XID", msg.Xid))
p.commitLock = msg
case *pglogrepl.InsertMessage:
return processInsertMessage(p, xld.WALStart, msg, processor)
return processInsertMessage(p, xld.WALStart, msg, processor, customTypeMapping)
case *pglogrepl.UpdateMessage:
return processUpdateMessage(p, xld.WALStart, msg, processor)
return processUpdateMessage(p, xld.WALStart, msg, processor, customTypeMapping)
case *pglogrepl.DeleteMessage:
return processDeleteMessage(p, xld.WALStart, msg, processor)
return processDeleteMessage(p, xld.WALStart, msg, processor, customTypeMapping)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
logger.Debug("CommitMessage", slog.Any("CommitLSN", msg.CommitLSN), slog.Any("TransactionEndLSN", msg.TransactionEndLSN))
Expand Down Expand Up @@ -694,6 +701,7 @@ func processInsertMessage[Items model.Items](
lsn pglogrepl.LSN,
msg *pglogrepl.InsertMessage,
processor replProcessor[Items],
customTypeMapping map[uint32]string,
) (model.Record[Items], error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

Expand All @@ -710,7 +718,7 @@ func processInsertMessage[Items model.Items](
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

items, _, err := processTuple(processor, p, msg.Tuple, rel, p.tableNameMapping[tableName].Exclude)
items, _, err := processTuple(processor, p, msg.Tuple, rel, p.tableNameMapping[tableName].Exclude, customTypeMapping)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}
Expand All @@ -729,6 +737,7 @@ func processUpdateMessage[Items model.Items](
lsn pglogrepl.LSN,
msg *pglogrepl.UpdateMessage,
processor replProcessor[Items],
customTypeMapping map[uint32]string,
) (model.Record[Items], error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

Expand All @@ -745,13 +754,14 @@ func processUpdateMessage[Items model.Items](
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

oldItems, _, err := processTuple(processor, p, msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
oldItems, _, err := processTuple(processor, p, msg.OldTuple, rel,
p.tableNameMapping[tableName].Exclude, customTypeMapping)
if err != nil {
return nil, fmt.Errorf("error converting old tuple to map: %w", err)
}

newItems, unchangedToastColumns, err := processTuple(
processor, p, msg.NewTuple, rel, p.tableNameMapping[tableName].Exclude)
processor, p, msg.NewTuple, rel, p.tableNameMapping[tableName].Exclude, customTypeMapping)
if err != nil {
return nil, fmt.Errorf("error converting new tuple to map: %w", err)
}
Expand Down Expand Up @@ -785,6 +795,7 @@ func processDeleteMessage[Items model.Items](
lsn pglogrepl.LSN,
msg *pglogrepl.DeleteMessage,
processor replProcessor[Items],
customTypeMapping map[uint32]string,
) (model.Record[Items], error) {
relID := p.getParentRelIDIfPartitioned(msg.RelationID)

Expand All @@ -801,7 +812,8 @@ func processDeleteMessage[Items model.Items](
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

items, _, err := processTuple(processor, p, msg.OldTuple, rel, p.tableNameMapping[tableName].Exclude)
items, _, err := processTuple(processor, p, msg.OldTuple, rel,
p.tableNameMapping[tableName].Exclude, customTypeMapping)
if err != nil {
return nil, fmt.Errorf("error converting tuple to map: %w", err)
}
Expand Down Expand Up @@ -844,6 +856,10 @@ func processRelationMessage[Items model.Items](
slog.Uint64("relId", uint64(currRel.RelationID)))
return nil, nil
}
customTypeMapping, err := p.fetchCustomTypeMapping(ctx)
if err != nil {
return nil, err
}

// retrieve the current TableSchema for the changed table; the mapping is keyed by destination table name, so the source name must be translated first
currRelDstInfo, ok := p.tableNameMapping[currRelName]
Expand All @@ -867,7 +883,7 @@ func processRelationMessage[Items model.Items](
case protos.TypeSystem_Q:
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypesMapping[column.DataType]
typeName, ok := customTypeMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,14 @@ func (c *PostgresConnector) jobMetadataExists(ctx context.Context, jobName strin
}

// MajorVersion returns the major version of the Postgres server behind c.conn.
//
// The value is obtained from shared.GetMajorVersion on the first successful
// call and stored in c.pgVersion; subsequent calls return the stored value
// without calling shared.GetMajorVersion again. A zero c.pgVersion is treated
// as "not fetched yet". When the lookup fails, nothing is stored and the error
// is returned unchanged, so the next call retries.
//
// NOTE(review): c.pgVersion is read and written without synchronization —
// confirm the connector is not used from multiple goroutines concurrently.
func (c *PostgresConnector) MajorVersion(ctx context.Context) (shared.PGVersion, error) {
	// Fast path: the version has already been resolved for this connector.
	if c.pgVersion != 0 {
		return c.pgVersion, nil
	}

	pgVersion, err := shared.GetMajorVersion(ctx, c.conn)
	if err != nil {
		return 0, err
	}
	c.pgVersion = pgVersion
	return pgVersion, nil
}

func (c *PostgresConnector) updateSyncMetadata(ctx context.Context, flowJobName string, lastCP int64, syncBatchID int64,
Expand Down
Loading

0 comments on commit 713f10b

Please sign in to comment.