Geospatial support for Snowflake #516

Merged: 12 commits, Oct 16, 2023
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -16,7 +16,7 @@ jobs:
timeout-minutes: 30
services:
pg_cdc:
image: postgres:15.4-alpine
image: postgis/postgis:15-3.4-alpine
ports:
- 7132:5432
env:
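Note: swapping the CI service image from plain Postgres to postgis/postgis makes the PostGIS extension available to the test database, so geometry and geography columns can be exercised in the flow tests. A hypothetical test-setup step (illustrative only, not part of this diff) is then enough to activate it:

    // Assumes an existing pgxpool.Pool connected to the CI database and a *testing.T in scope.
    if _, err := pool.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS postgis"); err != nil {
        t.Fatalf("failed to enable postgis: %v", err)
    }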
19 changes: 17 additions & 2 deletions flow/connectors/postgres/cdc.go
@@ -30,6 +30,7 @@ type PostgresCDCSource struct {
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string
}

type PostgresCDCConfig struct {
@@ -43,7 +44,7 @@ type PostgresCDCConfig struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
@@ -54,6 +55,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
commitLock: false,
customTypeMapping: customTypeMap,
}, nil
}

@@ -527,6 +529,12 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
}
return retVal, nil
}
typeName, ok := p.customTypeMapping[dataType]
if ok {
return &qvalue.QValue{Kind: customTypeToQKind(typeName),
Value: string(data)}, nil
}

return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
}

@@ -577,9 +585,16 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypeMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
}
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(postgresOIDToQValueKind(column.DataType)),
ColumnType: string(qKind),
})
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
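The customTypeMapping handed to the CDC source comes from utils.GetCustomDataTypes, which the later hunks call but this page never shows. A minimal sketch of what such a helper could look like, assuming a plain pg_type/pg_namespace lookup with pgx v5 — the actual PeerDB implementation may filter differently:

    package utils

    import (
        "context"
        "fmt"

        "github.com/jackc/pgx/v5/pgxpool"
    )

    // GetCustomDataTypes maps the OIDs of non-catalog types (for example the
    // PostGIS geometry and geography types) to their type names, so later code
    // can translate those names into QValueKinds.
    func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]string, error) {
        rows, err := pool.Query(ctx, `
            SELECT t.oid, t.typname
            FROM pg_type t
            JOIN pg_namespace n ON n.oid = t.typnamespace
            WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')`)
        if err != nil {
            return nil, fmt.Errorf("failed to query pg_type for custom types: %w", err)
        }
        defer rows.Close()

        customTypes := make(map[uint32]string)
        for rows.Next() {
            var typeOID uint32
            var typeName string
            if err := rows.Scan(&typeOID, &typeName); err != nil {
                return nil, fmt.Errorf("failed to scan pg_type row: %w", err)
            }
            customTypes[typeOID] = typeName
        }
        return customTypes, rows.Err()
    }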
27 changes: 19 additions & 8 deletions flow/connectors/postgres/postgres.go
@@ -31,6 +31,7 @@ type PostgresConnector struct {
pool *pgxpool.Pool
replPool *pgxpool.Pool
tableSchemaMapping map[string]*protos.TableSchema
customTypesMapping map[uint32]string
}

// SchemaTable is a table in a schema.
@@ -56,6 +57,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
return nil, fmt.Errorf("failed to create connection pool: %w", err)
}

customTypeMap, err := utils.GetCustomDataTypes(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to get custom type map: %w", err)
}

// ensure that replication is set to database
connConfig, err := pgxpool.ParseConfig(connectionString)
if err != nil {
@@ -72,11 +78,12 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}

return &PostgresConnector{
connStr: connectionString,
ctx: ctx,
config: pgConfig,
pool: pool,
replPool: replPool,
connStr: connectionString,
ctx: ctx,
config: pgConfig,
pool: pool,
replPool: replPool,
customTypesMapping: customTypeMap,
}, nil
}

@@ -217,7 +224,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
})
}, c.customTypesMapping)
if err != nil {
return nil, fmt.Errorf("failed to create cdc source: %w", err)
}
@@ -590,8 +597,12 @@ func (c *PostgresConnector) getTableSchemaForTable(
for _, fieldDescription := range rows.FieldDescriptions() {
genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID)
if genericColType == qvalue.QValueKindInvalid {
// we use string for invalid types
genericColType = qvalue.QValueKindString
typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID]
if ok {
genericColType = customTypeToQKind(typeName)
} else {
genericColType = qvalue.QValueKindString
}
}

res.Columns[fieldDescription.Name] = string(genericColType)
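The resolution order above — built-in OID mapping first, then the custom type map, then a string fallback — is the same order the CDC decode path uses. A condensed, hypothetical restatement of it (the names mirror this diff, but this helper is not part of the PR):

    // resolveColumnKind illustrates the lookup order used in getTableSchemaForTable
    // after this change.
    func resolveColumnKind(oid uint32, customTypes map[uint32]string) qvalue.QValueKind {
        kind := postgresOIDToQValueKind(oid)
        if kind != qvalue.QValueKindInvalid {
            return kind // built-in Postgres type
        }
        if typeName, ok := customTypes[oid]; ok {
            return customTypeToQKind(typeName) // e.g. "geometry" -> QValueKindGeometry
        }
        return qvalue.QValueKindString // unknown custom types still fall back to string
    }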
25 changes: 20 additions & 5 deletions flow/connectors/postgres/qrep.go
@@ -290,8 +290,11 @@ func (c *PostgresConnector) PullQRepRecords(
log.WithFields(log.Fields{
"partitionId": partition.PartitionId,
}).Infof("pulling full table partition for flow job %s", config.FlowJobName)
executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}
query := config.Query
return executor.ExecuteAndProcessQuery(query)
}
@@ -336,8 +339,12 @@ func (c *PostgresConnector) PullQRepRecords(
return nil, err
}

executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}

records, err := executor.ExecuteAndProcessQuery(query,
rangeStart, rangeEnd)
if err != nil {
@@ -362,10 +369,14 @@ func (c *PostgresConnector) PullQRepRecordStream(
"flowName": config.FlowJobName,
"partitionId": partition.PartitionId,
}).Infof("pulling full table partition for flow job %s", config.FlowJobName)
executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

query := config.Query
_, err := executor.ExecuteAndProcessQueryStream(stream, query)
_, err = executor.ExecuteAndProcessQueryStream(stream, query)
return 0, err
}
log.WithFields(log.Fields{
@@ -409,8 +420,12 @@ func (c *PostgresConnector) PullQRepRecordStream(
return 0, err
}

executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

numRecords, err := executor.ExecuteAndProcessQueryStream(stream, query, rangeStart, rangeEnd)
if err != nil {
return 0, err
75 changes: 52 additions & 23 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -7,6 +7,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"
@@ -17,12 +18,13 @@
)

type QRepQueryExecutor struct {
pool *pgxpool.Pool
ctx context.Context
snapshot string
testEnv bool
flowJobName string
partitionID string
pool *pgxpool.Pool
ctx context.Context
snapshot string
testEnv bool
flowJobName string
partitionID string
customTypeMap map[uint32]string
}

func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context,
Expand All @@ -37,18 +39,23 @@ func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context,
}

func NewQRepQueryExecutorSnapshot(pool *pgxpool.Pool, ctx context.Context, snapshot string,
flowJobName string, partitionID string) *QRepQueryExecutor {
flowJobName string, partitionID string) (*QRepQueryExecutor, error) {
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
}).Info("Declared new qrep executor for snapshot")
return &QRepQueryExecutor{
pool: pool,
ctx: ctx,
snapshot: snapshot,
flowJobName: flowJobName,
partitionID: partitionID,
CustomTypeMap, err := utils.GetCustomDataTypes(ctx, pool)
if err != nil {
return nil, fmt.Errorf("failed to get custom data types: %w", err)
}
return &QRepQueryExecutor{
pool: pool,
ctx: ctx,
snapshot: snapshot,
flowJobName: flowJobName,
partitionID: partitionID,
customTypeMap: CustomTypeMap,
}, nil
}

func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) {
@@ -89,11 +96,22 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
}

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
if ctype == qvalue.QValueKindInvalid {
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
}
}
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
@@ -118,7 +136,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
}).Info("Processing rows")
// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions)
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
return nil, fmt.Errorf("failed to map row to QRecord: %w", err)
}
@@ -133,7 +151,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: fieldDescriptionsToSchema(fieldDescriptions),
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
}

log.WithFields(log.Fields{
@@ -155,7 +173,7 @@ func (qe *QRepQueryExecutor) processRowsStream(

// Iterate over the rows
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions)
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
stream.Records <- &model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
@@ -214,7 +232,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(

fieldDescriptions := rows.FieldDescriptions()
if !stream.IsSchemaSet() {
schema := fieldDescriptionsToSchema(fieldDescriptions)
schema := qe.fieldDescriptionsToSchema(fieldDescriptions)
_ = stream.SetSchema(schema)
}

@@ -395,7 +413,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
return totalRecordsFetched, nil
}

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecord, error) {
func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string) (*model.QRecord, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))

@@ -405,11 +424,21 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecor
}

for i, fd := range fds {
// Check if it's a custom type first
typeName, ok := customTypeMap[fd.DataTypeOID]
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, *tmp)
} else {
customTypeVal := qvalue.QValue{
Kind: customTypeToQKind(typeName),
Value: values[i],
}
record.Set(i, customTypeVal)
}
}

return record, nil
24 changes: 23 additions & 1 deletion flow/connectors/postgres/qvalue_convert.go
@@ -55,6 +55,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindArrayInt32
case pgtype.Int8ArrayOID:
return qvalue.QValueKindArrayInt64
case pgtype.PointOID:
return qvalue.QValueKindPoint
case pgtype.Float4ArrayOID:
return qvalue.QValueKindArrayFloat32
case pgtype.Float8ArrayOID:
Expand All @@ -77,8 +79,10 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindString
} else if recvOID == uint32(oid.T_tsquery) { // TSQUERY
return qvalue.QValueKindString
} else if recvOID == uint32(oid.T_point) { // POINT
return qvalue.QValueKindPoint
}
// log.Warnf("failed to get type name for oid: %v", recvOID)

return qvalue.QValueKindInvalid
} else {
log.Warnf("unsupported field type: %v - type name - %s; returning as string", recvOID, typeName.Name)
@@ -337,6 +341,11 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
return nil, fmt.Errorf("failed to parse hstore: %w", err)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
val = &qvalue.QValue{Kind: qvalue.QValueKindPoint,
Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)}
default:
// log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind)
textVal, ok := value.(string)
@@ -380,3 +389,16 @@ func numericToRat(numVal *pgtype.Numeric) (*big.Rat, error) {
// handle invalid numeric
return nil, errors.New("invalid numeric")
}

func customTypeToQKind(typeName string) qvalue.QValueKind {
var qValueKind qvalue.QValueKind
switch typeName {
case "geometry":
qValueKind = qvalue.QValueKindGeometry
case "geography":
qValueKind = qvalue.QValueKindGeography
default:
qValueKind = qvalue.QValueKindString
}
return qValueKind
}
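For reference, a small hypothetical illustration of how the new conversions behave; the names mirror this file, but the snippet itself is not part of the PR:

    // exampleGeoConversions shows the mappings added above: custom type names
    // resolve to the new geospatial kinds, and built-in points become WKT.
    func exampleGeoConversions() {
        geomKind := customTypeToQKind("geometry")  // returns qvalue.QValueKindGeometry
        geogKind := customTypeToQKind("geography") // returns qvalue.QValueKindGeography
        otherKind := customTypeToQKind("hstore")   // default branch: qvalue.QValueKindString
        _, _, _ = geomKind, geogKind, otherKind

        // parseFieldFromQValueKind renders a pgtype.Point as well-known text.
        p := pgtype.Point{P: pgtype.Vec2{X: 1.5, Y: 2.5}, Valid: true}
        wkt := fmt.Sprintf("POINT(%f %f)", p.P.X, p.P.Y) // "POINT(1.500000 2.500000)"
        _ = wkt
    }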