
Commit

Merge branch 'main' into eh-ff
iskakaushik authored Oct 16, 2023
2 parents e8060ad + 5fb024f commit 1a94d94
Showing 20 changed files with 329 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -16,7 +16,7 @@ jobs:
timeout-minutes: 30
services:
pg_cdc:
image: postgres:15.4-alpine
image: postgis/postgis:15-3.4-alpine
ports:
- 7132:5432
env:
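The CI Postgres service image changes from postgres:15.4-alpine to postgis/postgis:15-3.4-alpine so the geometry/geography support added in the Go changes below can be exercised against a database that actually ships PostGIS. As a rough illustration only (not part of this commit; the package name, helper name, and pgxpool usage are assumptions), a test setup step could verify the extension before running spatial CDC tests:

package citest

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// ensurePostGIS checks that the PostGIS extension is installed on the CI
// database started by the workflow above (mapped to host port 7132).
func ensurePostGIS(ctx context.Context, pool *pgxpool.Pool) error {
	var version string
	if err := pool.QueryRow(ctx, "SELECT postgis_version()").Scan(&version); err != nil {
		return fmt.Errorf("PostGIS is not available: %w", err)
	}
	fmt.Printf("PostGIS version: %s\n", version)
	return nil
}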
19 changes: 17 additions & 2 deletions flow/connectors/postgres/cdc.go
@@ -30,6 +30,7 @@ type PostgresCDCSource struct {
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string
}

type PostgresCDCConfig struct {
@@ -43,7 +44,7 @@ type PostgresCDCConfig struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
@@ -54,6 +55,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
commitLock: false,
customTypeMapping: customTypeMap,
}, nil
}

@@ -527,6 +529,12 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
}
return retVal, nil
}
typeName, ok := p.customTypeMapping[dataType]
if ok {
return &qvalue.QValue{Kind: customTypeToQKind(typeName),
Value: string(data)}, nil
}

return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
}

@@ -577,9 +585,16 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypeMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(postgresOIDToQValueKind(column.DataType)),
ColumnType: string(qKind),
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
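Taken together, the cdc.go hunks above add a second lookup step: when an OID has no built-in mapping, the replication source consults the custom type mapping (populated from the database when the connector is constructed) before defaulting to string. A condensed restatement of that decision, written as if it lived inside the same postgres connector package so it can reuse the unexported helpers (sketch only, not code from this commit):

package postgres

import "github.com/PeerDB-io/peer-flow/model/qvalue"

// kindForOID summarizes the fallback order used by decodeColumnData and
// processRelationMessage above: built-in OID mapping first, then the
// connector's custom type mapping (e.g. PostGIS geometry/geography),
// and finally a plain string kind.
func kindForOID(dataType uint32, customTypeMapping map[uint32]string) qvalue.QValueKind {
	if kind := postgresOIDToQValueKind(dataType); kind != qvalue.QValueKindInvalid {
		return kind
	}
	if typeName, ok := customTypeMapping[dataType]; ok {
		return customTypeToQKind(typeName)
	}
	return qvalue.QValueKindString
}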
27 changes: 19 additions & 8 deletions flow/connectors/postgres/postgres.go
@@ -31,6 +31,7 @@ type PostgresConnector struct {
pool *pgxpool.Pool
replPool *pgxpool.Pool
tableSchemaMapping map[string]*protos.TableSchema
customTypesMapping map[uint32]string
}

// SchemaTable is a table in a schema.
@@ -56,6 +57,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to create connection pool: %w", err)
}

customTypeMap, err := utils.GetCustomDataTypes(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// ensure that replication is set to database
connConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
@@ -72,11 +78,12 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}

return &PostgresConnector{
connStr: connectionString,
ctx: ctx,
config: pgConfig,
pool: pool,
replPool: replPool,
connStr: connectionString,
ctx: ctx,
config: pgConfig,
pool: pool,
replPool: replPool,
customTypesMapping: customTypeMap,
}, nil
}

@@ -217,7 +224,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
})
}, c.customTypesMapping)
if err != nil {
return nil, fmt.Errorf("failed to create cdc source: %w", err)
}
@@ -590,8 +597,12 @@ func (c *PostgresConnector) getTableSchemaForTable(
for _, fieldDescription := range rows.FieldDescriptions() {
genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID)
if genericColType == qvalue.QValueKindInvalid {
// we use string for invalid types
genericColType = qvalue.QValueKindString
typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID]
if ok {
genericColType = customTypeToQKind(typeName)
} else {
genericColType = qvalue.QValueKindString
}
}

res.Columns[fieldDescription.Name] = string(genericColType)
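NewPostgresConnector now builds a customTypesMapping via utils.GetCustomDataTypes, which is referenced here but not shown in this diff. One plausible shape for such a helper is sketched below purely for orientation; the query, error messages, and exact behaviour of the real flow/connectors/utils implementation may differ:

package utils

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// GetCustomDataTypes (hedged sketch) maps non-built-in type OIDs, such as
// PostGIS geometry/geography, to their type names by scanning pg_type for
// types outside the system schemas.
func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]string, error) {
	rows, err := pool.Query(ctx, `
		SELECT t.oid, t.typname
		FROM pg_type t
		JOIN pg_namespace n ON t.typnamespace = n.oid
		WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')`)
	if err != nil {
		return nil, fmt.Errorf("failed to query custom types: %w", err)
	}
	defer rows.Close()

	customTypes := make(map[uint32]string)
	for rows.Next() {
		var oid uint32
		var name string
		if err := rows.Scan(&oid, &name); err != nil {
			return nil, fmt.Errorf("failed to scan custom type row: %w", err)
		}
		customTypes[oid] = name
	}
	return customTypes, rows.Err()
}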
25 changes: 20 additions & 5 deletions flow/connectors/postgres/qrep.go
@@ -290,8 +290,11 @@ func (c *PostgresConnector) PullQRepRecords(
log.WithFields(log.Fields{
"partitionId": partition.PartitionId,
}).Infof("pulling full table partition for flow job %s", config.FlowJobName)
executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}
query := config.Query
return executor.ExecuteAndProcessQuery(query)
}
@@ -336,8 +339,12 @@ func (c *PostgresConnector) PullQRepRecords(
return nil, err
}

executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}

records, err := executor.ExecuteAndProcessQuery(query,
rangeStart, rangeEnd)
if err != nil {
@@ -362,10 +369,14 @@ func (c *PostgresConnector) PullQRepRecordStream(
"flowName": config.FlowJobName,
"partitionId": partition.PartitionId,
}).Infof("pulling full table partition for flow job %s", config.FlowJobName)
executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

query := config.Query
_, err := executor.ExecuteAndProcessQueryStream(stream, query)
_, err = executor.ExecuteAndProcessQueryStream(stream, query)
return 0, err
}
log.WithFields(log.Fields{
@@ -409,8 +420,12 @@ func (c *PostgresConnector) PullQRepRecordStream(
return 0, err
}

executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

numRecords, err := executor.ExecuteAndProcessQueryStream(stream, query, rangeStart, rangeEnd)
if err != nil {
return 0, err
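Because NewQRepQueryExecutorSnapshot can now fail (it loads the custom type map when it is constructed), every call site in qrep.go gains an error check, as in the hunks above. In isolation the pattern looks like this (illustrative only, written as if inside the same package; the wrapper name is an assumption):

package postgres

import "fmt"

// newSnapshotExecutor shows the call-site pattern for the now-fallible
// constructor used by PullQRepRecords and PullQRepRecordStream above.
func (c *PostgresConnector) newSnapshotExecutor(
	flowJobName string, partitionID string) (*QRepQueryExecutor, error) {
	executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
		flowJobName, partitionID)
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot query executor: %w", err)
	}
	return executor, nil
}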
75 changes: 52 additions & 23 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -7,6 +7,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"
@@ -17,12 +18,13 @@
)

type QRepQueryExecutor struct {
pool *pgxpool.Pool
ctx context.Context
snapshot string
testEnv bool
flowJobName string
partitionID string
pool *pgxpool.Pool
ctx context.Context
snapshot string
testEnv bool
flowJobName string
partitionID string
customTypeMap map[uint32]string
}

func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context,
@@ -37,18 +39,23 @@ func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context,
}

func NewQRepQueryExecutorSnapshot(pool *pgxpool.Pool, ctx context.Context, snapshot string,
flowJobName string, partitionID string) *QRepQueryExecutor {
flowJobName string, partitionID string) (*QRepQueryExecutor, error) {
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
}).Info("Declared new qrep executor for snapshot")
return &QRepQueryExecutor{
pool: pool,
ctx: ctx,
snapshot: snapshot,
flowJobName: flowJobName,
partitionID: partitionID,
CustomTypeMap, err := utils.GetCustomDataTypes(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to get custom data types: %w", err)
}
return &QRepQueryExecutor{
pool: pool,
ctx: ctx,
snapshot: snapshot,
flowJobName: flowJobName,
partitionID: partitionID,
customTypeMap: CustomTypeMap,
}, nil
}

func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) {
@@ -89,11 +96,22 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
}

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
if ctype == qvalue.QValueKindInvalid {
var err error
ctype = qvalue.QValueKind(qe.customTypeMap[fd.DataTypeOID])
if err != nil {
ctype = qvalue.QValueKindInvalid
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
}
}
}
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
@@ -118,7 +136,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
}).Info("Processing rows")
// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions)
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
return nil, fmt.Errorf("failed to map row to QRecord: %w", err)
}
@@ -133,7 +151,7 @@
batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: fieldDescriptionsToSchema(fieldDescriptions),
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
}

log.WithFields(log.Fields{
@@ -155,7 +173,7 @@ func (qe *QRepQueryExecutor) processRowsStream(

// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions)
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
stream.Records <- &model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
@@ -214,7 +232,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(

fieldDescriptions := rows.FieldDescriptions()
if !stream.IsSchemaSet() {
schema := fieldDescriptionsToSchema(fieldDescriptions)
schema := qe.fieldDescriptionsToSchema(fieldDescriptions)
_ = stream.SetSchema(schema)
}

@@ -395,7 +413,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
return totalRecordsFetched, nil
}

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecord, error) {
func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string) (*model.QRecord, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))

@@ -405,11 +424,21 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecor
}

for i, fd := range fds {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
// Check if it's a custom type first
typeName, ok := customTypeMap[fd.DataTypeOID]
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, *tmp)
} else {
customTypeVal := qvalue.QValue{
Kind: customTypeToQKind(typeName),
Value: values[i],
}
record.Set(i, customTypeVal)
}
record.Set(i, *tmp)
}

return record, nil
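mapRowToQRecord now receives the custom type map and checks it before falling back to the OID-based parser, so values of types such as geometry arrive as QValues keyed by the custom kind rather than failing to parse. The per-column decision, isolated as a sketch (written as if inside the same package; not code from this commit):

package postgres

import "github.com/PeerDB-io/peer-flow/model/qvalue"

// qvalueForColumn isolates the branch mapRowToQRecord takes for each column:
// custom-typed columns are wrapped as-is with the kind derived from the type
// name, everything else goes through parseFieldFromPostgresOID as before.
func qvalueForColumn(dataTypeOID uint32, rawValue interface{},
	customTypeMap map[uint32]string) (*qvalue.QValue, error) {
	if typeName, ok := customTypeMap[dataTypeOID]; ok {
		return &qvalue.QValue{
			Kind:  customTypeToQKind(typeName),
			Value: rawValue,
		}, nil
	}
	return parseFieldFromPostgresOID(dataTypeOID, rawValue)
}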
24 changes: 23 additions & 1 deletion flow/connectors/postgres/qvalue_convert.go
@@ -55,6 +55,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindArrayInt32
case pgtype.Int8ArrayOID:
return qvalue.QValueKindArrayInt64
case pgtype.PointOID:
return qvalue.QValueKindPoint
case pgtype.Float4ArrayOID:
return qvalue.QValueKindArrayFloat32
case pgtype.Float8ArrayOID:
@@ -77,8 +79,10 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindString
} else if recvOID == uint32(oid.T_tsquery) { // TSQUERY
return qvalue.QValueKindString
} else if recvOID == uint32(oid.T_point) { // POINT
return qvalue.QValueKindPoint
}
// log.Warnf("failed to get type name for oid: %v", recvOID)

return qvalue.QValueKindInvalid
} else {
log.Warnf("unsupported field type: %v - type name - %s; returning as string", recvOID, typeName.Name)
@@ -337,6 +341,11 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
return nil, fmt.Errorf("failed to parse hstore: %w", err)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
val = &qvalue.QValue{Kind: qvalue.QValueKindPoint,
Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)}
default:
// log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind)
textVal, ok := value.(string)
@@ -380,3 +389,16 @@ func numericToRat(numVal *pgtype.Numeric) (*big.Rat, error) {
// handle invalid numeric
return nil, errors.New("invalid numeric")
}

func customTypeToQKind(typeName string) qvalue.QValueKind {
var qValueKind qvalue.QValueKind
switch typeName {
case "geometry":
qValueKind = qvalue.QValueKindGeometry
case "geography":
qValueKind = qvalue.QValueKindGeography
default:
qValueKind = qvalue.QValueKindString
}
return qValueKind
}
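postgresOIDToQValueKind now recognises point columns, parseFieldFromQValueKind renders them as well-known text, and customTypeToQKind maps the PostGIS geometry/geography type names onto the new QValue kinds (anything else stays a string). A small standalone illustration of the point formatting (assumption: pgx v5's pgtype package used directly, outside the connector):

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/pgtype"
)

// pointToWKT mirrors the formatting used in parseFieldFromQValueKind above:
// a pgtype.Point becomes its well-known-text form "POINT(x y)".
func pointToWKT(p pgtype.Point) string {
	return fmt.Sprintf("POINT(%f %f)", p.P.X, p.P.Y)
}

func main() {
	pt := pgtype.Point{P: pgtype.Vec2{X: 1.5, Y: -2.25}, Valid: true}
	fmt.Println(pointToWKT(pt)) // prints: POINT(1.500000 -2.250000)
}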