diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index ffceae7c53..1c3a385d3f 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -16,7 +16,7 @@ jobs: timeout-minutes: 30 services: pg_cdc: - image: postgres:15.4-alpine + image: postgis/postgis:15-3.4-alpine ports: - 7132:5432 env: diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 81e9ef790d..c20f0f20d6 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -30,6 +30,7 @@ type PostgresCDCSource struct { typeMap *pgtype.Map startLSN pglogrepl.LSN commitLock bool + customTypeMapping map[uint32]string } type PostgresCDCConfig struct { @@ -43,7 +44,7 @@ type PostgresCDCConfig struct { } // Create a new PostgresCDCSource -func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) { +func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) { return &PostgresCDCSource{ ctx: cdcConfig.AppContext, replPool: cdcConfig.Connection, @@ -54,6 +55,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err relationMessageMapping: cdcConfig.RelationMessageMapping, typeMap: pgtype.NewMap(), commitLock: false, + customTypeMapping: customTypeMap, }, nil } @@ -527,6 +529,12 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma } return retVal, nil } + typeName, ok := p.customTypeMapping[dataType] + if ok { + return &qvalue.QValue{Kind: customTypeToQKind(typeName), + Value: string(data)}, nil + } + return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil } @@ -577,9 +585,16 @@ func (p *PostgresCDCSource) processRelationMessage( for _, column := range currRel.Columns { // not present in previous relation message, but in current one, so added. if prevRelMap[column.Name] == nil { + qKind := postgresOIDToQValueKind(column.DataType) + if qKind == qvalue.QValueKindInvalid { + typeName, ok := p.customTypeMapping[column.DataType] + if ok { + qKind = customTypeToQKind(typeName) + } + } schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{ ColumnName: column.Name, - ColumnType: string(postgresOIDToQValueKind(column.DataType)), + ColumnType: string(qKind), }) // present in previous and current relation messages, but data types have changed. // so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first. diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 9725a6a2af..e7588a74df 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -31,6 +31,7 @@ type PostgresConnector struct { pool *pgxpool.Pool replPool *pgxpool.Pool tableSchemaMapping map[string]*protos.TableSchema + customTypesMapping map[uint32]string } // SchemaTable is a table in a schema. 
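Note on the CDC decode path above: an OID that postgresOIDToQValueKind cannot resolve now goes through a two-step fallback — the connector's custom-type map first, plain string otherwise. A minimal sketch of the intended behaviour, using a made-up OID since extension OIDs are assigned per database:

// Sketch only: the OID is a placeholder; real OIDs come from pg_type at runtime
// via utils.GetCustomDataTypes and are carried on the CDC source as customTypeMapping.
customTypeMapping := map[uint32]string{16391: "geometry"}

kind := postgresOIDToQValueKind(16391) // QValueKindInvalid for extension types
if kind == qvalue.QValueKindInvalid {
	if typeName, ok := customTypeMapping[16391]; ok {
		kind = customTypeToQKind(typeName) // "geometry" -> QValueKindGeometry
	} else {
		kind = qvalue.QValueKindString // unknown custom types still fall back to string
	}
}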
@@ -56,6 +57,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) return nil, fmt.Errorf("failed to create connection pool: %w", err) } + customTypeMap, err := utils.GetCustomDataTypes(ctx, pool) + if err != nil { + return nil, fmt.Errorf("failed to get custom type map: %w", err) + } + // ensure that replication is set to database connConfig, err := pgxpool.ParseConfig(connectionString) if err != nil { @@ -72,11 +78,12 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) } return &PostgresConnector{ - connStr: connectionString, - ctx: ctx, - config: pgConfig, - pool: pool, - replPool: replPool, + connStr: connectionString, + ctx: ctx, + config: pgConfig, + pool: pool, + replPool: replPool, + customTypesMapping: customTypeMap, }, nil } @@ -217,7 +224,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) (*model.R Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, - }) + }, c.customTypesMapping) if err != nil { return nil, fmt.Errorf("failed to create cdc source: %w", err) } @@ -590,8 +597,12 @@ func (c *PostgresConnector) getTableSchemaForTable( for _, fieldDescription := range rows.FieldDescriptions() { genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID) if genericColType == qvalue.QValueKindInvalid { - // we use string for invalid types - genericColType = qvalue.QValueKindString + typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID] + if ok { + genericColType = customTypeToQKind(typeName) + } else { + genericColType = qvalue.QValueKindString + } } res.Columns[fieldDescription.Name] = string(genericColType) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 3c0d3124b8..830ff453bf 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -290,8 +290,11 @@ func (c *PostgresConnector) PullQRepRecords( log.WithFields(log.Fields{ "partitionId": partition.PartitionId, }).Infof("pulling full table partition for flow job %s", config.FlowJobName) - executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, + executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) + if err != nil { + return nil, err + } query := config.Query return executor.ExecuteAndProcessQuery(query) } @@ -336,8 +339,12 @@ func (c *PostgresConnector) PullQRepRecords( return nil, err } - executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, + executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) + if err != nil { + return nil, err + } + records, err := executor.ExecuteAndProcessQuery(query, rangeStart, rangeEnd) if err != nil { @@ -362,10 +369,14 @@ func (c *PostgresConnector) PullQRepRecordStream( "flowName": config.FlowJobName, "partitionId": partition.PartitionId, }).Infof("pulling full table partition for flow job %s", config.FlowJobName) - executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, + executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) + if err != nil { + return 0, err + } + query := config.Query - _, err := executor.ExecuteAndProcessQueryStream(stream, query) + _, err = executor.ExecuteAndProcessQueryStream(stream, query) return 0, err } 
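For reference, the map returned by utils.GetCustomDataTypes (added later in this diff) is keyed by type OID and loaded once when the connector is created; the snapshot executor below loads its own copy. A hedged illustration of its contents on a PostGIS-enabled source — the OIDs are placeholders, which is exactly why the map is built from pg_type at runtime rather than hard-coded:

// Illustrative contents only (placeholder OIDs):
customTypes := map[uint32]string{
	16391: "geometry",  // postgis -> QValueKindGeometry
	16689: "geography", // postgis -> QValueKindGeography
	17001: "mood",      // e.g. a user-defined enum -> QValueKindString via customTypeToQKind
}
_ = customTypes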
log.WithFields(log.Fields{ @@ -409,8 +420,12 @@ func (c *PostgresConnector) PullQRepRecordStream( return 0, err } - executor := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, + executor, err := NewQRepQueryExecutorSnapshot(c.pool, c.ctx, c.config.TransactionSnapshot, config.FlowJobName, partition.PartitionId) + if err != nil { + return 0, err + } + numRecords, err := executor.ExecuteAndProcessQueryStream(stream, query, rangeStart, rangeEnd) if err != nil { return 0, err diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 5e2ef59d59..94943132bb 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -7,6 +7,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5" @@ -17,12 +18,13 @@ import ( ) type QRepQueryExecutor struct { - pool *pgxpool.Pool - ctx context.Context - snapshot string - testEnv bool - flowJobName string - partitionID string + pool *pgxpool.Pool + ctx context.Context + snapshot string + testEnv bool + flowJobName string + partitionID string + customTypeMap map[uint32]string } func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context, @@ -37,18 +39,23 @@ func NewQRepQueryExecutor(pool *pgxpool.Pool, ctx context.Context, } func NewQRepQueryExecutorSnapshot(pool *pgxpool.Pool, ctx context.Context, snapshot string, - flowJobName string, partitionID string) *QRepQueryExecutor { + flowJobName string, partitionID string) (*QRepQueryExecutor, error) { log.WithFields(log.Fields{ "flowName": flowJobName, "partitionID": partitionID, }).Info("Declared new qrep executor for snapshot") - return &QRepQueryExecutor{ - pool: pool, - ctx: ctx, - snapshot: snapshot, - flowJobName: flowJobName, - partitionID: partitionID, + customTypeMap, err := utils.GetCustomDataTypes(ctx, pool) + if err != nil { + return nil, fmt.Errorf("failed to get custom data types: %w", err) } + return &QRepQueryExecutor{ + pool: pool, + ctx: ctx, + snapshot: snapshot, + flowJobName: flowJobName, + partitionID: partitionID, + customTypeMap: customTypeMap, + }, nil } func (qe *QRepQueryExecutor) SetTestEnv(testEnv bool) { @@ -89,11 +96,19 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc } // FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema. -func fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema { +func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema { qfields := make([]*model.QField, len(fds)) for i, fd := range fds { cname := fd.Name ctype := postgresOIDToQValueKind(fd.DataTypeOID) + if ctype == qvalue.QValueKindInvalid { + typeName, ok := qe.customTypeMap[fd.DataTypeOID] + if ok { + ctype = customTypeToQKind(typeName) + } else { + ctype = qvalue.QValueKindString + } + } // there isn't a way to know if a column is nullable or not // TODO fix this.
cnullable := true @@ -118,7 +136,7 @@ func (qe *QRepQueryExecutor) ProcessRows( }).Info("Processing rows") // Iterate over the rows for rows.Next() { - record, err := mapRowToQRecord(rows, fieldDescriptions) + record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) if err != nil { return nil, fmt.Errorf("failed to map row to QRecord: %w", err) } @@ -133,7 +151,7 @@ func (qe *QRepQueryExecutor) ProcessRows( batch := &model.QRecordBatch{ NumRecords: uint32(len(records)), Records: records, - Schema: fieldDescriptionsToSchema(fieldDescriptions), + Schema: qe.fieldDescriptionsToSchema(fieldDescriptions), } log.WithFields(log.Fields{ @@ -155,7 +173,7 @@ func (qe *QRepQueryExecutor) processRowsStream( // Iterate over the rows for rows.Next() { - record, err := mapRowToQRecord(rows, fieldDescriptions) + record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) if err != nil { stream.Records <- &model.QRecordOrError{ Err: fmt.Errorf("failed to map row to QRecord: %w", err), @@ -214,7 +232,7 @@ func (qe *QRepQueryExecutor) processFetchedRows( fieldDescriptions := rows.FieldDescriptions() if !stream.IsSchemaSet() { - schema := fieldDescriptionsToSchema(fieldDescriptions) + schema := qe.fieldDescriptionsToSchema(fieldDescriptions) _ = stream.SetSchema(schema) } @@ -395,7 +413,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( return totalRecordsFetched, nil } -func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecord, error) { +func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, + customTypeMap map[uint32]string) (*model.QRecord, error) { // make vals an empty array of QValue of size len(fds) record := model.NewQRecord(len(fds)) @@ -405,11 +424,21 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription) (*model.QRecor } for i, fd := range fds { - tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) - if err != nil { - return nil, fmt.Errorf("failed to parse field: %w", err) + // Check if it's a custom type first + typeName, ok := customTypeMap[fd.DataTypeOID] + if !ok { + tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) + if err != nil { + return nil, fmt.Errorf("failed to parse field: %w", err) + } + record.Set(i, *tmp) + } else { + customTypeVal := qvalue.QValue{ + Kind: customTypeToQKind(typeName), + Value: values[i], + } + record.Set(i, customTypeVal) } - record.Set(i, *tmp) } return record, nil diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index b9c7dcc904..21f802f6c0 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -55,6 +55,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindArrayInt32 case pgtype.Int8ArrayOID: return qvalue.QValueKindArrayInt64 + case pgtype.PointOID: + return qvalue.QValueKindPoint case pgtype.Float4ArrayOID: return qvalue.QValueKindArrayFloat32 case pgtype.Float8ArrayOID: @@ -77,8 +79,10 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindString } else if recvOID == uint32(oid.T_tsquery) { // TSQUERY return qvalue.QValueKindString + } else if recvOID == uint32(oid.T_point) { // POINT + return qvalue.QValueKindPoint } - // log.Warnf("failed to get type name for oid: %v", recvOID) + return qvalue.QValueKindInvalid } else { log.Warnf("unsupported field type: %v - type name - %s; returning as string", recvOID, typeName.Name) @@ -337,6 +341,11 @@ func 
parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( return nil, fmt.Errorf("failed to parse hstore: %w", err) } val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal} + case qvalue.QValueKindPoint: + xCoord := value.(pgtype.Point).P.X + yCoord := value.(pgtype.Point).P.Y + val = &qvalue.QValue{Kind: qvalue.QValueKindPoint, + Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)} default: // log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind) textVal, ok := value.(string) @@ -380,3 +389,16 @@ func numericToRat(numVal *pgtype.Numeric) (*big.Rat, error) { // handle invalid numeric return nil, errors.New("invalid numeric") } + +func customTypeToQKind(typeName string) qvalue.QValueKind { + var qValueKind qvalue.QValueKind + switch typeName { + case "geometry": + qValueKind = qvalue.QValueKindGeometry + case "geography": + qValueKind = qvalue.QValueKindGeography + default: + qValueKind = qvalue.QValueKindString + } + return qValueKind +} diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index ce0cc48511..094dbcaff9 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -253,7 +253,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: return fmt.Errorf("multi-insert sync mode not supported for snowflake") case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - allCols, err := c.getColsFromTable(destTable) + colInfo, err := c.getColsFromTable(destTable) if err != nil { log.WithFields(log.Fields{ "flowName": config.FlowJobName, @@ -261,6 +261,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig return fmt.Errorf("failed to get columns from table %s: %w", destTable, err) } + allCols := colInfo.Columns err = CopyStageToDestination(c, config, destTable, stageName, allCols) if err != nil { log.WithFields(log.Fields{ @@ -283,7 +284,7 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error { return c.dropStage(config.StagingPath, config.FlowJobName) } -func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, error) { +func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnInformation, error) { // parse the table name to get the schema and table name components, err := parseTableName(tableName) if err != nil { @@ -296,7 +297,7 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, error //nolint:gosec queryString := fmt.Sprintf(` - SELECT column_name + SELECT column_name, data_type FROM information_schema.columns WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s' `, components.tableIdentifier, components.schemaIdentifier) @@ -307,16 +308,24 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) ([]string, error } defer rows.Close() - var cols []string + columnMap := map[string]string{} for rows.Next() { - var col string - if err := rows.Scan(&col); err != nil { + var colName string + var colType string + if err := rows.Scan(&colName, &colType); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } - cols = append(cols, col) + columnMap[colName] = colType + } + var cols []string + for k := range columnMap { + cols = append(cols, k) } - return cols, nil + return &model.ColumnInformation{ + ColumnMap: columnMap, + Columns: cols, + }, nil } // dropStage drops the stage for the given job. 
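The extra data_type column fetched by getColsFromTable above feeds GetCopyTransformation in the next file, which replaces MATCH_BY_COLUMN_NAME with an explicit per-column select from the Avro stage. Roughly, the generated statement is expected to look like the sketch below (table, stage, and column names are invented for illustration):

// Shape of the COPY produced by GetCopyTransformation + HandleAppendMode (illustrative only):
copyCmd := `COPY INTO dst_table("id","shape","name") FROM (SELECT ` +
	`$1:"id" AS "id", ` + // NUMBER columns are passed through as-is
	`TO_GEOGRAPHY($1:"shape"::string) AS "shape", ` + // GEOGRAPHY/GEOMETRY go through TO_GEOGRAPHY/TO_GEOMETRY
	`($1:"name")::VARCHAR AS "name" ` + // everything else gets a plain cast to its column type
	`FROM @peerdb_stage) FILE_FORMAT = (TYPE = AVRO),PURGE = TRUE,ON_ERROR = 'CONTINUE'`
_ = copyCmd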
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 71c7d74d9a..4f5a9b8fc5 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -18,6 +18,11 @@ import ( "go.temporal.io/sdk/activity" ) +type CopyInfo struct { + transformationSQL string + columnsSQL string +} + type SnowflakeAvroSyncMethod struct { config *protos.QRepConfig connector *SnowflakeConnector @@ -73,11 +78,12 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( "flowName": flowJobName, }).Infof("Created stage %s", stage) - allCols, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) + colInfo, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier) if err != nil { return 0, err } + allCols := colInfo.Columns err = s.putFileToStage(localFilePath, stage) if err != nil { return 0, err @@ -251,6 +257,46 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage str return nil } +func (sc *SnowflakeConnector) GetCopyTransformation(dstTableName string) (*CopyInfo, error) { + colInfo, colsErr := sc.getColsFromTable(dstTableName) + if colsErr != nil { + return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr) + } + + var transformations []string + var columnOrder []string + for col, colType := range colInfo.ColumnMap { + if col == "_PEERDB_IS_DELETED" { + continue + } + colName := strings.ToLower(col) + // No need to quote raw table columns + if strings.Contains(dstTableName, "_PEERDB_RAW") { + columnOrder = append(columnOrder, colName) + } else { + columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName)) + } + + switch colType { + case "GEOGRAPHY": + transformations = append(transformations, + fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string) AS \"%s\"", colName, colName)) + case "GEOMETRY": + transformations = append(transformations, + fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string) AS \"%s\"", colName, colName)) + case "NUMBER": + transformations = append(transformations, + fmt.Sprintf("$1:\"%s\" AS \"%s\"", colName, colName)) + default: + transformations = append(transformations, + fmt.Sprintf("($1:\"%s\")::%s AS \"%s\"", colName, colType, colName)) + } + } + transformationSQL := strings.Join(transformations, ",") + columnsSQL := strings.Join(columnOrder, ",") + return &CopyInfo{transformationSQL, columnsSQL}, nil +} + func CopyStageToDestination( connector *SnowflakeConnector, config *protos.QRepConfig, @@ -263,7 +309,6 @@ func CopyStageToDestination( }).Infof("Copying stage to destination %s", dstTableName) copyOpts := []string{ "FILE_FORMAT = (TYPE = AVRO)", - "MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'", "PURGE = TRUE", "ON_ERROR = 'CONTINUE'", } @@ -278,9 +323,13 @@ func CopyStageToDestination( } } + copyTransformation, err := connector.GetCopyTransformation(dstTableName) + if err != nil { + return fmt.Errorf("failed to get copy transformation: %w", err) + } switch appendMode { case true: - err := writeHandler.HandleAppendMode(config.FlowJobName) + err := writeHandler.HandleAppendMode(config.FlowJobName, copyTransformation) if err != nil { return fmt.Errorf("failed to handle append mode: %w", err) } @@ -288,7 +337,7 @@ func CopyStageToDestination( case false: upsertKeyCols := config.WriteMode.UpsertKeyColumns err := writeHandler.HandleUpsertMode(allCols, upsertKeyCols, config.WatermarkColumn, - config.FlowJobName) + config.FlowJobName, copyTransformation) if err != nil { return fmt.Errorf("failed to handle upsert mode: %w", err) } @@ 
-348,9 +397,12 @@ func NewSnowflakeAvroWriteHandler( } } -func (s *SnowflakeAvroWriteHandler) HandleAppendMode(flowJobName string) error { +func (s *SnowflakeAvroWriteHandler) HandleAppendMode( + flowJobName string, + copyInfo *CopyInfo) error { //nolint:gosec - copyCmd := fmt.Sprintf("COPY INTO %s FROM @%s %s", s.dstTableName, s.stage, strings.Join(s.copyOpts, ",")) + copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", + s.dstTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) log.Infof("running copy command: %s", copyCmd) _, err := s.connector.database.Exec(copyCmd) if err != nil { @@ -424,6 +476,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( upsertKeyCols []string, watermarkCol string, flowJobName string, + copyInfo *CopyInfo, ) error { runID, err := util.RandomUInt64() if err != nil { @@ -443,8 +496,8 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( }).Infof("created temp table %s", tempTableName) //nolint:gosec - copyCmd := fmt.Sprintf("COPY INTO %s FROM @%s %s", - tempTableName, s.stage, strings.Join(s.copyOpts, ",")) + copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", + tempTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) _, err = s.connector.database.Exec(copyCmd) if err != nil { return fmt.Errorf("failed to run COPY INTO command: %w", err) diff --git a/flow/connectors/snowflake/qvalue_convert.go b/flow/connectors/snowflake/qvalue_convert.go index 32b84dd07e..b88517856d 100644 --- a/flow/connectors/snowflake/qvalue_convert.go +++ b/flow/connectors/snowflake/qvalue_convert.go @@ -27,8 +27,11 @@ var qValueKindToSnowflakeTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindTimeTZ: "STRING", qvalue.QValueKindInvalid: "STRING", qvalue.QValueKindHStore: "STRING", + qvalue.QValueKindGeography: "GEOGRAPHY", + qvalue.QValueKindGeometry: "GEOMETRY", + qvalue.QValueKindPoint: "GEOMETRY", - // array types will be mapped to STRING + // array types will be mapped to VARIANT qvalue.QValueKindArrayFloat32: "VARIANT", qvalue.QValueKindArrayFloat64: "VARIANT", qvalue.QValueKindArrayInt32: "VARIANT", @@ -60,6 +63,8 @@ var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{ "DECIMAL": qvalue.QValueKindNumeric, "NUMERIC": qvalue.QValueKindNumeric, "VARIANT": qvalue.QValueKindJSON, + "GEOMETRY": qvalue.QValueKindGeometry, + "GEOGRAPHY": qvalue.QValueKindGeography, } func qValueKindToSnowflakeType(colType qvalue.QValueKind) string { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 7046746ecc..d691fef970 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -958,6 +958,9 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( case qvalue.QValueKindBytes, qvalue.QValueKindBit: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+ "AS %s,", toVariantColumnName, columnName, targetColumnName)) + case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS STRING) AS %s,", + toVariantColumnName, columnName, targetColumnName)) // TODO: https://github.com/PeerDB-io/peerdb/issues/189 - handle time types and interval types // case model.ColumnTypeTime: // flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TIME_FROM_PARTS(0,0,0,%s:%s:"+ diff --git 
a/flow/connectors/utils/postgres.go b/flow/connectors/utils/postgres.go index 883cf36791..2080d2f3dd 100644 --- a/flow/connectors/utils/postgres.go +++ b/flow/connectors/utils/postgres.go @@ -1,10 +1,12 @@ package utils import ( + "context" "fmt" "net/url" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/jackc/pgx/v5/pgxpool" ) func GetPGConnectionString(pgConfig *protos.PostgresConfig) string { @@ -20,3 +22,28 @@ func GetPGConnectionString(pgConfig *protos.PostgresConfig) string { ) return connString } + +func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]string, error) { + rows, err := pool.Query(ctx, ` + SELECT t.oid, t.typname as type + FROM pg_type t + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace + WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid)) + AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid) + AND n.nspname NOT IN ('pg_catalog', 'information_schema'); + `) + if err != nil { + return nil, fmt.Errorf("failed to get custom types: %w", err) + } + + customTypeMap := map[uint32]string{} + for rows.Next() { + var typeID uint32 + var typeName string + if err := rows.Scan(&typeID, &typeName); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + customTypeMap[typeID] = typeName + } + return customTypeMap, nil +} diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index fb0fc3b876..5e6374cc1a 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -3,8 +3,6 @@ package e2e_bigquery import ( "context" "fmt" - "sort" - "strings" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -29,32 +27,6 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { fmt.Printf("created table on bigquery: %s.%s. 
%v\n", s.bqHelper.Config.DatasetId, dstTable, err) } -func (s *PeerFlowE2ETestSuiteBQ) compareTableSchemasBQ(tableName string) { - // read rows from source table - pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") - pgQueryExecutor.SetTestEnv(true) - - pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM e2e_test_%s.%s ORDER BY id", bigquerySuffix, tableName), - ) - s.NoError(err) - sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1 - }) - - // read rows from destination table - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqRows, err := s.bqHelper.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM %s ORDER BY id", qualifiedTableName), - ) - s.NoError(err) - sort.Slice(bqRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(bqRows.Schema.Fields[i].Name, bqRows.Schema.Fields[j].Name) == -1 - }) - - s.True(pgRows.Schema.EqualNames(bqRows.Schema), "schemas from source and destination tables are not equal") -} - func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 5d40d9a0e0..7ea629ad2b 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -15,14 +16,14 @@ import ( ) const ( - peerName string = "test_s3_peer" - prefixName string = "test-s3" + peerName string = "test_s3_peer" ) type S3TestHelper struct { client *s3.S3 s3Config *protos.S3Config bucketName string + prefix string } func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { @@ -51,10 +52,11 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { if err != nil { return nil, err } + prefix := fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano()) return &S3TestHelper{ client, &protos.S3Config{ - Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + Url: fmt.Sprintf("s3://%s/%s", bucketName, prefix), AccessKeyId: &config.AccessKeyID, SecretAccessKey: &config.SecretAccessKey, Region: &config.Region, @@ -68,6 +70,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { }, }, bucketName, + prefix, }, nil } @@ -89,7 +92,7 @@ func (h *S3TestHelper) ListAllFiles( ) ([]*s3.Object, error) { Bucket := h.bucketName - Prefix := fmt.Sprintf("%s/%s/", prefixName, jobName) + Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName) files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, Prefix: &Prefix, @@ -105,7 +108,7 @@ func (h *S3TestHelper) ListAllFiles( // Delete all generated objects during the test func (h *S3TestHelper) CleanUp() error { Bucket := h.bucketName - Prefix := prefixName + Prefix := h.prefix files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, Prefix: &Prefix, diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index de8e8ba9c8..418bcfd63e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -596,7 +596,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 
MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), + c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -637,7 +638,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'); + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), + 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', + 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -656,7 +660,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) if err != nil { fmt.Println("error %w", err) } @@ -679,7 +683,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY, c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT, c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, - c39 TXID_SNAPSHOT,c40 UUID,c41 XML); + c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), + c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer) RETURNS bytea AS $body$ SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex') @@ -721,7 +726,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, 'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector, txid_current_snapshot(), - '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'); + '66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'), + 'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', + 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) s.NoError(err) fmt.Println("Executed an insert with all types") @@ -740,7 +748,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c21", "c22", "c23", "c24", "c28", 
"c29", "c30", "c31", "c33", "c34", "c35", "c36", - "c37", "c38", "c7", "c8", "c32"}) + "c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46"}) if err != nil { fmt.Println("error %w", err) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 01d2532e51..8da1df3c5f 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,8 +3,6 @@ package e2e_snowflake import ( "context" "fmt" - "sort" - "strings" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -32,33 +30,6 @@ func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableSchemasSF(tableName string) { - // read rows from source table - pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") - pgQueryExecutor.SetTestEnv(true) - pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM e2e_test_%s.%s LIMIT 0", snowflakeSuffix, tableName), - ) - require.NoError(s.T(), err) - sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1 - }) - - // read rows from destination table - qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - // excluding soft-delete column during schema conversion - sfSelQuery := fmt.Sprintf(`SELECT * EXCLUDE _PEERDB_IS_DELETED FROM %s LIMIT 0`, qualifiedTableName) - fmt.Printf("running query on snowflake: %s\n", sfSelQuery) - - sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) - sort.Slice(sfRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(sfRows.Schema.Fields[i].Name, sfRows.Schema.Fields[j].Name) == -1 - }) - - s.True(pgRows.Schema.EqualNames(sfRows.Schema), "schemas from source and destination tables are not equal") -} - func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index f3b3b00fa3..29b7a66ed0 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -152,7 +152,14 @@ func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) "f7 jsonb", "f8 smallint", } - + if strings.Contains(tableName, "sf") { + tblFields = append(tblFields, "geometry_point geometry(point)", + "geography_point geography(point)", + "geometry_linestring geometry(linestring)", + "geography_linestring geography(linestring)", + "geometry_polygon geometry(polygon)", + "geography_polygon geography(polygon)") + } tblFieldStr := strings.Join(tblFields, ",") _, err := pool.Exec(context.Background(), fmt.Sprintf(` @@ -187,6 +194,15 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro for i := 0; i < rowCount-1; i++ { id := uuid.New().String() ids = append(ids, id) + geoValues := "" + if strings.Contains(tableName, "sf") { + // geo types + geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)', + 'LINESTRING(0 0, 1 1, 2 2)', + 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)', + 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'POLYGON((-74.0060 40.7128, 
-73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'` + } row := fmt.Sprintf(` ( '%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP, @@ -198,13 +214,19 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012], ARRAY['varchar1', 'varchar2'], '{"key": 8.5}', '[{"key1": "value1", "key2": "value2", "key3": "value3"}]', - '{"key": "value"}', 15 + '{"key": "value"}', 15 %s )`, id, uuid.New().String(), uuid.New().String(), - uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String()) + uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues) rows = append(rows, row) } + geoColumns := "" + if strings.Contains(tableName, "sf") { + geoColumns = ",geometry_point, geography_point," + + "geometry_linestring, geography_linestring," + + "geometry_polygon, geography_polygon" + } _, err := pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s.%s ( id, card_id, "from", price, created_at, @@ -213,9 +235,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8 + %s ) VALUES %s; - `, suffix, tableName, strings.Join(rows, ","))) + `, suffix, tableName, geoColumns, strings.Join(rows, ","))) if err != nil { return err } diff --git a/flow/model/column.go b/flow/model/column.go new file mode 100644 index 0000000000..5cbf25dc2e --- /dev/null +++ b/flow/model/column.go @@ -0,0 +1,8 @@ +package model + +type ColumnInformation struct { + // This is a mapping from column name to column type + // Example: "name" -> "VARCHAR" + ColumnMap map[string]string + Columns []string // List of column names +} diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 62fefe698d..9b8db89d78 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -36,6 +36,10 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, nullable bool) (*QValueKindAvr return &QValueKindAvroSchema{ AvroLogicalSchema: "string", }, nil + case QValueKindGeometry, QValueKindGeography, QValueKindPoint: + return &QValueKindAvroSchema{ + AvroLogicalSchema: "string", + }, nil case QValueKindInt16, QValueKindInt32, QValueKindInt64: return &QValueKindAvroSchema{ AvroLogicalSchema: "long", @@ -202,6 +206,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return c.processArrayString() case QValueKindUUID: return c.processUUID() + case QValueKindGeography, QValueKindGeometry, QValueKindPoint: + return c.processGeospatial() default: return nil, fmt.Errorf("[toavro] unsupported QValueKind: %s", c.Value.Kind) } @@ -330,6 +336,22 @@ func (c *QValueAvroConverter) processUUID() (interface{}, error) { return uuidString, nil } +func (c *QValueAvroConverter) processGeospatial() (interface{}, error) { + if c.Value.Value == nil { + return nil, nil + } + + geoString, ok := c.Value.Value.(string) + if !ok { + return nil, fmt.Errorf("[conversion] invalid geospatial value %v", c.Value.Value) + } + + if c.Nullable { + return goavro.Union("string", geoString), nil + } + return geoString, nil +} + func (c *QValueAvroConverter) processArrayInt32() 
(interface{}, error) { if c.Value.Value == nil && c.Nullable { return nil, nil diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 1def708611..1e8e5f5099 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -23,6 +23,9 @@ const ( QValueKindJSON QValueKind = "json" QValueKindBit QValueKind = "bit" QValueKindHStore QValueKind = "hstore" + QValueKindGeography QValueKind = "geography" + QValueKindGeometry QValueKind = "geometry" + QValueKindPoint QValueKind = "point" // array types QValueKindArrayFloat32 QValueKind = "array_float32" diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index fb47ff8ffd..8029a10408 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -336,7 +336,6 @@ func CDCFlowWorkflowWithConfig( cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] } - } }
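Taken together, the expected path for the new geo columns exercised by the e2e tests is: the Postgres connector tags the column QValueKindGeometry/QValueKindGeography (or QValueKindPoint for the built-in point type), the value travels through Avro as a string, and Snowflake re-materializes it during the COPY. A small sketch of that mapping, using names from this diff; the WKT literal is only an example of the string payload:

q := qvalue.QValue{Kind: qvalue.QValueKindGeography, Value: "POINT(40.7128 -74.0060)"}
// Avro schema for this kind is "string" (GetAvroSchemaFromQValueKind),
// the Snowflake column type is GEOGRAPHY (qValueKindToSnowflakeTypeMap),
// and the COPY transformation wraps the staged field in TO_GEOGRAPHY($1:"col"::string).
_ = q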