diff --git a/dev-peerdb.sh b/dev-peerdb.sh index 53050655f7..09e93defe5 100755 --- a/dev-peerdb.sh +++ b/dev-peerdb.sh @@ -7,4 +7,5 @@ then exit 1 fi -docker compose -f docker-compose-dev.yml up --build +docker compose -f docker-compose-dev.yml up --build\ + --no-attach temporal --no-attach pyroscope --no-attach temporal-ui diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index db9c6ef6c0..7689975602 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -92,8 +92,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( }).Errorf("failed to delete staging table %s: %v", stagingTable, err) } - log.Printf("loaded stage into %s.%s", - datasetID, dstTableName) + log.Printf("loaded stage into %s.%s", datasetID, dstTableName) return numRecords, nil } diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 14bba3d561..285b4882b8 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -10,6 +10,7 @@ import ( peersql "github.com/PeerDB-io/peer-flow/connectors/sql" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" util "github.com/PeerDB-io/peer-flow/utils" ) @@ -55,7 +56,7 @@ func NewSnowflakeClient(ctx context.Context, config *protos.SnowflakeConfig) (*S } genericExecutor := *peersql.NewGenericSQLQueryExecutor( - ctx, database, snowflakeTypeToQValueKindMap, qValueKindToSnowflakeTypeMap) + ctx, database, snowflakeTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap) return &SnowflakeClient{ GenericSQLQueryExecutor: genericExecutor, diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 8202d769fc..8aaa6c73da 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" util "github.com/PeerDB-io/peer-flow/utils" log "github.com/sirupsen/logrus" _ "github.com/snowflakedb/gosnowflake" @@ -122,6 +123,17 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords( "partitionID": partition.PartitionId, }).Infof("sync function called and schema acquired") + err = s.addMissingColumns( + config.FlowJobName, + schema, + dstTableSchema, + dstTableName, + partition, + ) + if err != nil { + return 0, err + } + avroSchema, err := s.getAvroSchema(dstTableName, schema, config.FlowJobName) if err != nil { return 0, err @@ -170,6 +182,78 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords( return numRecords, nil } +func (s *SnowflakeAvroSyncMethod) addMissingColumns( + flowJobName string, + schema *model.QRecordSchema, + dstTableSchema []*sql.ColumnType, + dstTableName string, + partition *protos.QRepPartition, +) error { + // check if avro schema has additional columns compared to destination table + // if so, we need to add those columns to the destination table + colsToTypes := map[string]qvalue.QValueKind{} + for _, col := range schema.Fields { + hasColumn := false + // check ignoring case + for _, dstCol := range dstTableSchema { + if strings.EqualFold(col.Name, dstCol.Name()) { + hasColumn = true + break + } + } + + if !hasColumn { + log.WithFields(log.Fields{ + "flowName": flowJobName, + "partitionID": partition.PartitionId, + }).Infof("adding column %s to destination table %s", col.Name, dstTableName) + colsToTypes[col.Name] = col.Type + } + } + + if len(colsToTypes) > 0 { + tx, err := s.connector.database.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + for colName, colType := range colsToTypes { + sfColType, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake) + if err != nil { + return fmt.Errorf("failed to convert QValueKind to Snowflake column type: %w", err) + } + upperCasedColName := strings.ToUpper(colName) + alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName) + alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType) + + log.WithFields(log.Fields{ + "flowName": flowJobName, + "partitionID": partition.PartitionId, + }).Infof("altering destination table %s with command `%s`", dstTableName, alterTableCmd) + + if _, err := tx.Exec(alterTableCmd); err != nil { + return fmt.Errorf("failed to alter destination table: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.WithFields(log.Fields{ + "flowName": flowJobName, + "partitionID": partition.PartitionId, + }).Infof("successfully added missing columns to destination table %s", dstTableName) + } else { + log.WithFields(log.Fields{ + "flowName": flowJobName, + "partitionID": partition.PartitionId, + }).Infof("no missing columns found in destination table %s", dstTableName) + } + + return nil +} + func (s *SnowflakeAvroSyncMethod) getAvroSchema( dstTableName string, schema *model.QRecordSchema, diff --git a/flow/connectors/snowflake/qvalue_convert.go b/flow/connectors/snowflake/qvalue_convert.go index b88517856d..421281834c 100644 --- a/flow/connectors/snowflake/qvalue_convert.go +++ b/flow/connectors/snowflake/qvalue_convert.go @@ -6,39 +6,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -var qValueKindToSnowflakeTypeMap = map[qvalue.QValueKind]string{ - qvalue.QValueKindBoolean: "BOOLEAN", - qvalue.QValueKindInt16: "INTEGER", - qvalue.QValueKindInt32: "INTEGER", - qvalue.QValueKindInt64: "INTEGER", - qvalue.QValueKindFloat32: "FLOAT", - qvalue.QValueKindFloat64: "FLOAT", - qvalue.QValueKindNumeric: "NUMBER(38, 9)", - qvalue.QValueKindString: "STRING", - qvalue.QValueKindJSON: "VARIANT", - qvalue.QValueKindTimestamp: "TIMESTAMP_NTZ", - qvalue.QValueKindTimestampTZ: "TIMESTAMP_TZ", - qvalue.QValueKindTime: "TIME", - qvalue.QValueKindDate: "DATE", - qvalue.QValueKindBit: "BINARY", - qvalue.QValueKindBytes: "BINARY", - qvalue.QValueKindStruct: "STRING", - qvalue.QValueKindUUID: "STRING", - qvalue.QValueKindTimeTZ: "STRING", - qvalue.QValueKindInvalid: "STRING", - qvalue.QValueKindHStore: "STRING", - qvalue.QValueKindGeography: "GEOGRAPHY", - qvalue.QValueKindGeometry: "GEOMETRY", - qvalue.QValueKindPoint: "GEOMETRY", - - // array types will be mapped to VARIANT - qvalue.QValueKindArrayFloat32: "VARIANT", - qvalue.QValueKindArrayFloat64: "VARIANT", - qvalue.QValueKindArrayInt32: "VARIANT", - qvalue.QValueKindArrayInt64: "VARIANT", - qvalue.QValueKindArrayString: "VARIANT", -} - var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{ "INT": qvalue.QValueKindInt32, "BIGINT": qvalue.QValueKindInt64, @@ -67,11 +34,13 @@ var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{ "GEOGRAPHY": qvalue.QValueKindGeography, } -func qValueKindToSnowflakeType(colType qvalue.QValueKind) string { - if val, ok := qValueKindToSnowflakeTypeMap[colType]; ok { - return val +func qValueKindToSnowflakeType(colType qvalue.QValueKind) (string, error) { + val, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake) + if err != nil { + return "", err } - return "STRING" + + return val, err } func snowflakeTypeToQValueKind(name string) (qvalue.QValueKind, error) { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 2d96cf314e..84701ad05d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -452,9 +452,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { + sfColtype, err := qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType)) + if err != nil { + return fmt.Errorf("failed to convert column type %s to snowflake type: %w", + addedColumn.ColumnType, err) + } _, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", - schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), - qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType)))) + schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), sfColtype)) if err != nil { return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, schemaDelta.DstTableName, err) @@ -891,8 +895,12 @@ func generateCreateTableSQLForNormalizedTable( createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)) for columnName, genericColumnType := range sourceTableSchema.Columns { columnNameUpper := strings.ToUpper(columnName) - createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`"%s" %s,`, columnNameUpper, - qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)))) + sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) + if err != nil { + log.Warnf("failed to convert column type %s to snowflake type: %v", genericColumnType, err) + continue + } + createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`"%s" %s,`, columnNameUpper, sfColType)) } // add a _peerdb_is_deleted column to the normalized table @@ -957,7 +965,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns)) for columnName, genericColumnType := range normalizedTableSchema.Columns { - sfType := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) + sfType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) + if err != nil { + return 0, fmt.Errorf("failed to convert column type %s to snowflake type: %w", + genericColumnType, err) + } + targetColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)) switch qvalue.QValueKind(genericColumnType) { case qvalue.QValueKindBytes, qvalue.QValueKindBit: diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 1e8e5f5099..b2eb1b1e48 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -1,5 +1,7 @@ package qvalue +import "fmt" + type QValueKind string const ( @@ -47,3 +49,48 @@ func QValueKindIsArray(kind QValueKind) bool { return false } } + +var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ + QValueKindBoolean: "BOOLEAN", + QValueKindInt16: "INTEGER", + QValueKindInt32: "INTEGER", + QValueKindInt64: "INTEGER", + QValueKindFloat32: "FLOAT", + QValueKindFloat64: "FLOAT", + QValueKindNumeric: "NUMBER(38, 9)", + QValueKindString: "STRING", + QValueKindJSON: "VARIANT", + QValueKindTimestamp: "TIMESTAMP_NTZ", + QValueKindTimestampTZ: "TIMESTAMP_TZ", + QValueKindTime: "TIME", + QValueKindDate: "DATE", + QValueKindBit: "BINARY", + QValueKindBytes: "BINARY", + QValueKindStruct: "STRING", + QValueKindUUID: "STRING", + QValueKindTimeTZ: "STRING", + QValueKindInvalid: "STRING", + QValueKindHStore: "STRING", + QValueKindGeography: "GEOGRAPHY", + QValueKindGeometry: "GEOMETRY", + QValueKindPoint: "GEOMETRY", + + // array types will be mapped to VARIANT + QValueKindArrayFloat32: "VARIANT", + QValueKindArrayFloat64: "VARIANT", + QValueKindArrayInt32: "VARIANT", + QValueKindArrayInt64: "VARIANT", + QValueKindArrayString: "VARIANT", +} + +func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) { + if dwhType != QDWHTypeSnowflake { + return "", fmt.Errorf("unsupported DWH type: %v", dwhType) + } + + if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok { + return val, nil + } else { + return "STRING", nil + } +}