From d2b3837a0b5ff150b2438de12863381482c79c6e Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 10 Nov 2023 03:58:55 +0530 Subject: [PATCH] schema changes for QRep BigQuery --- flow/connectors/bigquery/bigquery.go | 22 +++---- flow/connectors/bigquery/qrep.go | 57 ++++++++++++++++++- .../{qrep_sync_method.go => qrep_sql_sync.go} | 3 +- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/snowflake/snowflake.go | 2 +- 5 files changed, 69 insertions(+), 17 deletions(-) rename flow/connectors/bigquery/{qrep_sync_method.go => qrep_sql_sync.go} (98%) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7e9089ee95..776b6cc828 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -226,7 +226,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { - return nil + continue } for _, addedColumn := range schemaDelta.AddedColumns { @@ -1127,6 +1127,16 @@ func (c *BigQueryConnector) SetupNormalizedTables( ) (*protos.SetupNormalizedTableBatchOutput, error) { tableExistsMapping := make(map[string]bool) for tableIdentifier, tableSchema := range req.TableNameSchemaMapping { + table := c.client.Dataset(c.datasetID).Table(tableIdentifier) + + // check if the table exists + _, err := table.Metadata(c.ctx) + if err == nil { + // table exists, go to next table + tableExistsMapping[tableIdentifier] = true + continue + } + // convert the column names and types to bigquery types columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns)) idx := 0 @@ -1141,16 +1151,6 @@ func (c *BigQueryConnector) SetupNormalizedTables( // create the table using the columns schema := bigquery.Schema(columns) - table := c.client.Dataset(c.datasetID).Table(tableIdentifier) - - // check if the table exists - _, err := table.Metadata(c.ctx) - if err == nil { - // table exists, go to next table - tableExistsMapping[tableIdentifier] = true - continue - } - err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index c0aeea0f93..7f4a5cdb8a 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -3,6 +3,7 @@ package connbigquery import ( "fmt" "reflect" + "strings" "time" "cloud.google.com/go/bigquery" @@ -20,10 +21,13 @@ func (c *BigQueryConnector) SyncQRepRecords( ) (int, error) { // Ensure the destination table is available. destTable := config.DestinationTableIdentifier - bqTable := c.client.Dataset(c.datasetID).Table(destTable) - tblMetadata, err := bqTable.Metadata(c.ctx) + srcSchema, err := stream.Schema() + if err != nil { + return 0, fmt.Errorf("failed to get schema of source table %s: %w", config.WatermarkTable, err) + } + tblMetadata, err := c.replayTableSchemaDeltasQRep(config, partition, srcSchema) if err != nil { - return 0, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err) + return 0, err } done, err := c.isPartitionSynced(partition.PartitionId) @@ -57,6 +61,53 @@ func (c *BigQueryConnector) SyncQRepRecords( } } +func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, + srcSchema *model.QRecordSchema) (*bigquery.TableMetadata, error) { + destTable := config.DestinationTableIdentifier + bqTable := c.client.Dataset(c.datasetID).Table(destTable) + dstTableMetadata, err := bqTable.Metadata(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err) + } + + tableSchemaDelta := &protos.TableSchemaDelta{ + SrcTableName: config.WatermarkTable, + DstTableName: config.DestinationTableIdentifier, + } + + for _, col := range srcSchema.Fields { + hasColumn := false + // check ignoring case + for _, dstCol := range dstTableMetadata.Schema { + if strings.EqualFold(col.Name, dstCol.Name) { + hasColumn = true + break + } + } + + if !hasColumn { + log.WithFields(log.Fields{ + "flowName": config.FlowJobName, + "partitionID": partition.PartitionId, + }).Infof("adding column %s to destination table %s", col.Name, config.DestinationTableIdentifier) + tableSchemaDelta.AddedColumns = append(tableSchemaDelta.AddedColumns, &protos.DeltaAddedColumn{ + ColumnName: col.Name, + ColumnType: string(col.Type), + }) + } + } + + err = c.ReplayTableSchemaDeltas(config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta}) + if err != nil { + return nil, fmt.Errorf("failed to add columns to destination table: %w", err) + } + dstTableMetadata, err = bqTable.Metadata(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to get metadata of table %s: %w", destTable, err) + } + return dstTableMetadata, nil +} + func (c *BigQueryConnector) createMetadataInsertStatement( partition *protos.QRepPartition, jobName string, diff --git a/flow/connectors/bigquery/qrep_sync_method.go b/flow/connectors/bigquery/qrep_sql_sync.go similarity index 98% rename from flow/connectors/bigquery/qrep_sync_method.go rename to flow/connectors/bigquery/qrep_sql_sync.go index 8a9e42a3b5..3580731dbc 100644 --- a/flow/connectors/bigquery/qrep_sync_method.go +++ b/flow/connectors/bigquery/qrep_sql_sync.go @@ -12,7 +12,7 @@ import ( log "github.com/sirupsen/logrus" ) -type QRepSyncMethod interface { +type QRepSQLSyncMethod interface { SyncQRepRecords( flowJobName string, dstTableName string, @@ -38,6 +38,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( startTime := time.Now() // generate a 128 bit random runID for this run + //nolint:gosec runID := rand.Int63() // create a staging table with the same schema as the destination table if it doesn't exist diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 923beb265b..9b70da8b9d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -699,7 +699,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { - return nil + continue } for _, addedColumn := range schemaDelta.AddedColumns { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 326b44c109..c2ccc91a1c 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -460,7 +460,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { - return nil + continue } for _, addedColumn := range schemaDelta.AddedColumns {