Skip to content

Commit

Permalink
schema changes for QRep BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 9, 2023
1 parent 858bcbc commit 7db266f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 17 deletions.
22 changes: 11 additions & 11 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
schemaDeltas []*protos.TableSchemaDelta) error {
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
return nil
continue
}

for _, addedColumn := range schemaDelta.AddedColumns {
Expand Down Expand Up @@ -1088,6 +1088,16 @@ func (c *BigQueryConnector) SetupNormalizedTables(
) (*protos.SetupNormalizedTableBatchOutput, error) {
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
table := c.client.Dataset(c.datasetID).Table(tableIdentifier)

// check if the table exists
_, err := table.Metadata(c.ctx)
if err == nil {
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
continue
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
idx := 0
Expand All @@ -1102,16 +1112,6 @@ func (c *BigQueryConnector) SetupNormalizedTables(

// create the table using the columns
schema := bigquery.Schema(columns)
table := c.client.Dataset(c.datasetID).Table(tableIdentifier)

// check if the table exists
_, err := table.Metadata(c.ctx)
if err == nil {
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
continue
}

err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
if err != nil {
return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
Expand Down
57 changes: 54 additions & 3 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connbigquery
import (
"fmt"
"reflect"
"strings"
"time"

"cloud.google.com/go/bigquery"
Expand All @@ -20,10 +21,13 @@ func (c *BigQueryConnector) SyncQRepRecords(
) (int, error) {
// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
bqTable := c.client.Dataset(c.datasetID).Table(destTable)
tblMetadata, err := bqTable.Metadata(c.ctx)
srcSchema, err := stream.Schema()
if err != nil {
return 0, fmt.Errorf("failed to get schema of source table %s: %w", config.WatermarkTable, err)
}
tblMetadata, err := c.replayTableSchemaDeltasQRep(config, partition, srcSchema)
if err != nil {
return 0, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err)
return 0, err
}

done, err := c.isPartitionSynced(partition.PartitionId)
Expand Down Expand Up @@ -57,6 +61,53 @@ func (c *BigQueryConnector) SyncQRepRecords(
}
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
srcSchema *model.QRecordSchema) (*bigquery.TableMetadata, error) {
destTable := config.DestinationTableIdentifier
bqTable := c.client.Dataset(c.datasetID).Table(destTable)
dstTableMetadata, err := bqTable.Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err)
}

tableSchemaDelta := &protos.TableSchemaDelta{
SrcTableName: config.WatermarkTable,
DstTableName: config.DestinationTableIdentifier,
}

for _, col := range srcSchema.Fields {
hasColumn := false
// check ignoring case
for _, dstCol := range dstTableMetadata.Schema {
if strings.EqualFold(col.Name, dstCol.Name) {
hasColumn = true
break
}
}

if !hasColumn {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
"partitionID": partition.PartitionId,
}).Infof("adding column %s to destination table %s", col.Name, config.DestinationTableIdentifier)
tableSchemaDelta.AddedColumns = append(tableSchemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: col.Name,
ColumnType: string(col.Type),
})
}
}

err = c.ReplayTableSchemaDeltas(config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
if err != nil {
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
}
dstTableMetadata, err = bqTable.Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err)
}
return dstTableMetadata, nil
}

func (c *BigQueryConnector) createMetadataInsertStatement(
partition *protos.QRepPartition,
jobName string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
log "github.com/sirupsen/logrus"
)

type QRepSyncMethod interface {
type QRepSQLSyncMethod interface {
SyncQRepRecords(
flowJobName string,
dstTableName string,
Expand All @@ -38,6 +38,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
startTime := time.Now()

// generate a 128 bit random runID for this run
//nolint:gosec
runID := rand.Int63()

// create a staging table with the same schema as the destination table if it doesn't exist
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,

for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
return nil
continue
}

for _, addedColumn := range schemaDelta.AddedColumns {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string,

for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
return nil
continue
}

for _, addedColumn := range schemaDelta.AddedColumns {
Expand Down

0 comments on commit 7db266f

Please sign in to comment.