Skip to content

Commit

Permalink
[snowflake] Add basic schema changes support for QRep (#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 31, 2023
1 parent 7e1723e commit 0982cb7
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 46 deletions.
3 changes: 2 additions & 1 deletion dev-peerdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ then
exit 1
fi

docker compose -f docker-compose-dev.yml up --build
docker compose -f docker-compose-dev.yml up --build\
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}).Errorf("failed to delete staging table %s: %v", stagingTable, err)
}

log.Printf("loaded stage into %s.%s",
datasetID, dstTableName)
log.Printf("loaded stage into %s.%s", datasetID, dstTableName)

return numRecords, nil
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func NewSnowflakeClient(ctx context.Context, config *protos.SnowflakeConfig) (*S
}

genericExecutor := *peersql.NewGenericSQLQueryExecutor(
ctx, database, snowflakeTypeToQValueKindMap, qValueKindToSnowflakeTypeMap)
ctx, database, snowflakeTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap)

return &SnowflakeClient{
GenericSQLQueryExecutor: genericExecutor,
Expand Down
84 changes: 84 additions & 0 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
log "github.com/sirupsen/logrus"
_ "github.com/snowflakedb/gosnowflake"
Expand Down Expand Up @@ -122,6 +123,17 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
"partitionID": partition.PartitionId,
}).Infof("sync function called and schema acquired")

err = s.addMissingColumns(
config.FlowJobName,
schema,
dstTableSchema,
dstTableName,
partition,
)
if err != nil {
return 0, err
}

avroSchema, err := s.getAvroSchema(dstTableName, schema, config.FlowJobName)
if err != nil {
return 0, err
Expand Down Expand Up @@ -170,6 +182,78 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords(
return numRecords, nil
}

// addMissingColumns compares the Avro source schema against the destination
// table's columns (case-insensitively) and, for any source column that is
// absent from the destination, issues an ALTER TABLE ... ADD COLUMN inside a
// single transaction. Returns an error if the type mapping, any ALTER, or the
// commit fails; it is a no-op when no columns are missing.
func (s *SnowflakeAvroSyncMethod) addMissingColumns(
	flowJobName string,
	schema *model.QRecordSchema,
	dstTableSchema []*sql.ColumnType,
	dstTableName string,
	partition *protos.QRepPartition,
) error {
	// check if avro schema has additional columns compared to destination table
	// if so, we need to add those columns to the destination table
	colsToTypes := map[string]qvalue.QValueKind{}
	for _, col := range schema.Fields {
		hasColumn := false
		// check ignoring case
		for _, dstCol := range dstTableSchema {
			if strings.EqualFold(col.Name, dstCol.Name()) {
				hasColumn = true
				break
			}
		}

		if !hasColumn {
			log.WithFields(log.Fields{
				"flowName":    flowJobName,
				"partitionID": partition.PartitionId,
			}).Infof("adding column %s to destination table %s", col.Name, dstTableName)
			colsToTypes[col.Name] = col.Type
		}
	}

	if len(colsToTypes) == 0 {
		log.WithFields(log.Fields{
			"flowName":    flowJobName,
			"partitionID": partition.PartitionId,
		}).Infof("no missing columns found in destination table %s", dstTableName)
		return nil
	}

	tx, err := s.connector.database.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Release the transaction on every error path below. Rollback after a
	// successful Commit is a harmless no-op (returns sql.ErrTxDone).
	defer func() {
		_ = tx.Rollback()
	}()

	for colName, colType := range colsToTypes {
		sfColType, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake)
		if err != nil {
			return fmt.Errorf("failed to convert QValueKind to Snowflake column type: %w", err)
		}
		// Snowflake folds unquoted identifiers to upper case; quote the
		// upper-cased name so the new column matches existing conventions.
		upperCasedColName := strings.ToUpper(colName)
		alterTableCmd := fmt.Sprintf("ALTER TABLE %s ", dstTableName)
		alterTableCmd += fmt.Sprintf("ADD COLUMN IF NOT EXISTS \"%s\" %s;", upperCasedColName, sfColType)

		log.WithFields(log.Fields{
			"flowName":    flowJobName,
			"partitionID": partition.PartitionId,
		}).Infof("altering destination table %s with command `%s`", dstTableName, alterTableCmd)

		if _, err := tx.Exec(alterTableCmd); err != nil {
			return fmt.Errorf("failed to alter destination table: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	log.WithFields(log.Fields{
		"flowName":    flowJobName,
		"partitionID": partition.PartitionId,
	}).Infof("successfully added missing columns to destination table %s", dstTableName)

	return nil
}

func (s *SnowflakeAvroSyncMethod) getAvroSchema(
dstTableName string,
schema *model.QRecordSchema,
Expand Down
43 changes: 6 additions & 37 deletions flow/connectors/snowflake/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// qValueKindToSnowflakeTypeMap maps each generic QValueKind to the Snowflake
// column type used when creating or altering destination tables.
var qValueKindToSnowflakeTypeMap = map[qvalue.QValueKind]string{
qvalue.QValueKindBoolean: "BOOLEAN",
qvalue.QValueKindInt16: "INTEGER",
qvalue.QValueKindInt32: "INTEGER",
qvalue.QValueKindInt64: "INTEGER",
qvalue.QValueKindFloat32: "FLOAT",
qvalue.QValueKindFloat64: "FLOAT",
qvalue.QValueKindNumeric: "NUMBER(38, 9)",
qvalue.QValueKindString: "STRING",
qvalue.QValueKindJSON: "VARIANT",
qvalue.QValueKindTimestamp: "TIMESTAMP_NTZ",
qvalue.QValueKindTimestampTZ: "TIMESTAMP_TZ",
qvalue.QValueKindTime: "TIME",
qvalue.QValueKindDate: "DATE",
qvalue.QValueKindBit: "BINARY",
qvalue.QValueKindBytes: "BINARY",
qvalue.QValueKindStruct: "STRING",
qvalue.QValueKindUUID: "STRING",
qvalue.QValueKindTimeTZ: "STRING",
qvalue.QValueKindInvalid: "STRING",
qvalue.QValueKindHStore: "STRING",
qvalue.QValueKindGeography: "GEOGRAPHY",
qvalue.QValueKindGeometry: "GEOMETRY",
qvalue.QValueKindPoint: "GEOMETRY",

// array types will be mapped to VARIANT
qvalue.QValueKindArrayFloat32: "VARIANT",
qvalue.QValueKindArrayFloat64: "VARIANT",
qvalue.QValueKindArrayInt32: "VARIANT",
qvalue.QValueKindArrayInt64: "VARIANT",
qvalue.QValueKindArrayString: "VARIANT",
}

var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{
"INT": qvalue.QValueKindInt32,
"BIGINT": qvalue.QValueKindInt64,
Expand Down Expand Up @@ -67,11 +34,13 @@ var snowflakeTypeToQValueKindMap = map[string]qvalue.QValueKind{
"GEOGRAPHY": qvalue.QValueKindGeography,
}

func qValueKindToSnowflakeType(colType qvalue.QValueKind) string {
if val, ok := qValueKindToSnowflakeTypeMap[colType]; ok {
return val
func qValueKindToSnowflakeType(colType qvalue.QValueKind) (string, error) {
val, err := colType.ToDWHColumnType(qvalue.QDWHTypeSnowflake)
if err != nil {
return "", err
}
return "STRING"

return val, err
}

func snowflakeTypeToQValueKind(name string) (qvalue.QValueKind, error) {
Expand Down
23 changes: 18 additions & 5 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string,
}

for _, addedColumn := range schemaDelta.AddedColumns {
sfColtype, err := qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType))
if err != nil {
return fmt.Errorf("failed to convert column type %s to snowflake type: %w",
addedColumn.ColumnType, err)
}
_, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s",
schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName),
qValueKindToSnowflakeType(qvalue.QValueKind(addedColumn.ColumnType))))
schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), sfColtype))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.DstTableName, err)
Expand Down Expand Up @@ -891,8 +895,12 @@ func generateCreateTableSQLForNormalizedTable(
createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns))
for columnName, genericColumnType := range sourceTableSchema.Columns {
columnNameUpper := strings.ToUpper(columnName)
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`"%s" %s,`, columnNameUpper,
qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))))
sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))
if err != nil {
log.Warnf("failed to convert column type %s to snowflake type: %v", genericColumnType, err)
continue
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`"%s" %s,`, columnNameUpper, sfColType))
}

// add a _peerdb_is_deleted column to the normalized table
Expand Down Expand Up @@ -957,7 +965,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(

flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
for columnName, genericColumnType := range normalizedTableSchema.Columns {
sfType := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))
sfType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))
if err != nil {
return 0, fmt.Errorf("failed to convert column type %s to snowflake type: %w",
genericColumnType, err)
}

targetColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName))
switch qvalue.QValueKind(genericColumnType) {
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
Expand Down
47 changes: 47 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package qvalue

import "fmt"

type QValueKind string

const (
Expand Down Expand Up @@ -47,3 +49,48 @@ func QValueKindIsArray(kind QValueKind) bool {
return false
}
}

// QValueKindToSnowflakeTypeMap maps each QValueKind to the Snowflake column
// type used for destination tables. Kinds absent from this map fall back to
// "STRING" in ToDWHColumnType.
var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindBoolean: "BOOLEAN",
QValueKindInt16: "INTEGER",
QValueKindInt32: "INTEGER",
QValueKindInt64: "INTEGER",
QValueKindFloat32: "FLOAT",
QValueKindFloat64: "FLOAT",
QValueKindNumeric: "NUMBER(38, 9)",
QValueKindString: "STRING",
QValueKindJSON: "VARIANT",
QValueKindTimestamp: "TIMESTAMP_NTZ",
QValueKindTimestampTZ: "TIMESTAMP_TZ",
QValueKindTime: "TIME",
QValueKindDate: "DATE",
QValueKindBit: "BINARY",
QValueKindBytes: "BINARY",
QValueKindStruct: "STRING",
QValueKindUUID: "STRING",
QValueKindTimeTZ: "STRING",
QValueKindInvalid: "STRING",
QValueKindHStore: "STRING",
QValueKindGeography: "GEOGRAPHY",
QValueKindGeometry: "GEOMETRY",
QValueKindPoint: "GEOMETRY",

// array types will be mapped to VARIANT
QValueKindArrayFloat32: "VARIANT",
QValueKindArrayFloat64: "VARIANT",
QValueKindArrayInt32: "VARIANT",
QValueKindArrayInt64: "VARIANT",
QValueKindArrayString: "VARIANT",
}

// ToDWHColumnType returns the data-warehouse column type for this QValueKind.
// Only Snowflake is currently supported; any other DWH type is an error.
// Kinds without an explicit mapping fall back to "STRING".
func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) {
	if dwhType != QDWHTypeSnowflake {
		return "", fmt.Errorf("unsupported DWH type: %v", dwhType)
	}

	if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok {
		return val, nil
	}
	// Unknown kinds are stored as STRING rather than failing the sync.
	return "STRING", nil
}

0 comments on commit 0982cb7

Please sign in to comment.