added capability for BQ CDC across datasets
heavycrystal committed Dec 26, 2023
1 parent 93d754a commit 737b755
Showing 9 changed files with 248 additions and 147 deletions.
175 changes: 98 additions & 77 deletions flow/connectors/bigquery/bigquery.go
@@ -29,9 +29,7 @@ import (
const (
/*
Different batch IDs in code/BigQuery
1. batchID - identifier in raw/staging tables on target to depict which batch a row was inserted.
2. stagingBatchID - the random batch id we generate before ingesting into staging table.
helps filter rows in the current batch before inserting into raw table.
1. batchID - identifier in the raw table on the target, indicating which batch a row was inserted in.
3. syncBatchID - batch id that was last synced or will be synced
4. normalizeBatchID - batch id that was last normalized or will be normalized.
*/
@@ -233,24 +231,25 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

func (c *BigQueryConnector) waitForTableReady(tblName string) error {
table := c.client.Dataset(c.datasetID).Table(tblName)
func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
table := c.client.Dataset(datasetTable.dataset).Table(datasetTable.table)
maxDuration := 5 * time.Minute
deadline := time.Now().Add(maxDuration)
sleepInterval := 5 * time.Second
attempt := 0

for {
if time.Now().After(deadline) {
return fmt.Errorf("timeout reached while waiting for table %s to be ready", tblName)
return fmt.Errorf("timeout reached while waiting for table %s to be ready", datasetTable)
}

_, err := table.Metadata(c.ctx)
if err == nil {
return nil
}

slog.Info("waiting for table to be ready", slog.String("table", tblName), slog.Int("attempt", attempt))
slog.Info("waiting for table to be ready",
slog.String("table", datasetTable.table), slog.Int("attempt", attempt))
attempt++
time.Sleep(sleepInterval)
}
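
The loop above follows a standard poll-until-deadline pattern. A minimal standalone sketch of the same idea, with a hypothetical check callback standing in for table.Metadata (illustrative only, not part of this commit):

package example

import (
	"context"
	"fmt"
	"time"
)

// waitUntilReady polls check every pollInterval until it succeeds or the
// deadline passes, mirroring waitForTableReady above.
func waitUntilReady(ctx context.Context, check func(context.Context) error) error {
	const (
		maxDuration  = 5 * time.Minute
		pollInterval = 5 * time.Second
	)
	deadline := time.Now().Add(maxDuration)
	for attempt := 0; ; attempt++ {
		if time.Now().After(deadline) {
			return fmt.Errorf("not ready after %d attempts", attempt)
		}
		if err := check(ctx); err == nil {
			return nil
		}
		time.Sleep(pollInterval)
	}
}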
@@ -267,9 +266,10 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
}

for _, addedColumn := range schemaDelta.AddedColumns {
dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
_, err := c.client.Query(fmt.Sprintf(
"ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", c.datasetID,
schemaDelta.DstTableName, addedColumn.ColumnName,
"ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", dstDatasetTable.dataset,
dstDatasetTable.table, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
@@ -593,16 +593,11 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
var entries [10]qvalue.QValue
switch r := record.(type) {
case *model.InsertRecord:

itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: r.DestinationTableName,
}
entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
@@ -626,16 +621,11 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
if err != nil {
return nil, fmt.Errorf("failed to create new items to json: %v", err)
}

oldItemsJSON, err := r.OldItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create old items to json: %v", err)
}

entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: r.DestinationTableName,
}
entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: newItemsJSON,
@@ -660,10 +650,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: r.DestinationTableName,
}
entries[4] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
Expand Down Expand Up @@ -698,6 +684,10 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
Kind: qvalue.QValueKindInt64,
Value: time.Now().UnixNano(),
}
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: record.GetDestinationTableName(),
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
@@ -787,14 +777,18 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
c.datasetID, rawTableName, distinctTableNames))

for _, tableName := range distinctTableNames {
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
Dataset: c.datasetID,
NormalizedTable: tableName,
RawTable: rawTableName,
NormalizedTableSchema: c.tableNameSchemaMapping[tableName],
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
rawDatasetTable: &datasetTable{
dataset: c.datasetID,
table: rawTableName,
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: c.tableNameSchemaMapping[tableName],
syncBatchID: syncBatchID,
normalizeBatchID: normalizeBatchID,
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
@@ -846,19 +840,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
{Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType},
}

stagingSchema := bigquery.Schema{
{Name: "_peerdb_uid", Type: bigquery.StringFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType},
{Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_destination_table_name", Type: bigquery.StringFieldType},
{Name: "_peerdb_data", Type: bigquery.StringFieldType},
{Name: "_peerdb_record_type", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_match_data", Type: bigquery.StringFieldType},
{Name: "_peerdb_batch_id", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_staging_batch_id", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType},
}

// create the table
table := c.client.Dataset(c.datasetID).Table(rawTableName)

@@ -883,16 +864,6 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err)
}

// also create a staging table for this raw table
stagingTableName := c.getStagingTableName(req.FlowJobName)
stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
err = stagingTable.Create(c.ctx, &bigquery.TableMetadata{
Schema: stagingSchema,
})
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, stagingTableName, err)
}

return &protos.CreateRawTableOutput{
TableIdentifier: rawTableName,
}, nil
@@ -952,14 +923,41 @@ func (c *BigQueryConnector) SetupNormalizedTables(
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
tableExistsMapping := make(map[string]bool)
datasetTablesSet := make(map[datasetTable]struct{})
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
table := c.client.Dataset(c.datasetID).Table(tableIdentifier)
// the only place where parsing errors are checked; later conversions of these names discard the error
datasetTable, err := c.convertToDatasetTable(tableIdentifier)
if err != nil {
return nil, err
}
_, ok := datasetTablesSet[*datasetTable]
if ok {
return nil, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
datasetTable.string())
}
dataset := c.client.Dataset(datasetTable.dataset)
_, err = dataset.Metadata(c.ctx)
// assume this means the dataset doesn't exist, and create it
if err != nil {
// if the error message does not contain `notFound`, some other error occurred.
if !strings.Contains(err.Error(), "notFound") {
return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
datasetTable.dataset, err)
}
c.logger.InfoContext(c.ctx, fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
err = dataset.Create(c.ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
}
}
table := dataset.Table(datasetTable.table)

// check if the table exists
_, err := table.Metadata(c.ctx)
_, err = table.Metadata(c.ctx)
if err == nil {
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
datasetTablesSet[*datasetTable] = struct{}{}
continue
}

@@ -999,6 +997,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

tableExistsMapping[tableIdentifier] = false
datasetTablesSet[*datasetTable] = struct{}{}
// log that table was created
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
}
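
The dataset auto-creation logic above can be read as a small create-if-missing helper. A sketch assuming a *bigquery.Client, using the same notFound error-string heuristic as the connector (inspecting a typed googleapi error would be a stricter alternative):

package example

import (
	"context"
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// ensureDataset probes the dataset's metadata and creates the dataset
// only when the lookup reports notFound, as in SetupNormalizedTables.
func ensureDataset(ctx context.Context, client *bigquery.Client, id string) error {
	ds := client.Dataset(id)
	if _, err := ds.Metadata(ctx); err != nil {
		if !strings.Contains(err.Error(), "notFound") {
			return fmt.Errorf("checking dataset %s: %w", id, err)
		}
		if err := ds.Create(ctx, nil); err != nil {
			return fmt.Errorf("creating dataset %s: %w", id, err)
		}
	}
	return nil
}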
@@ -1015,10 +1014,6 @@ func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}
err = dataset.Table(c.getStagingTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete staging table: %w", err)
}

// deleting job from metadata table
query := fmt.Sprintf("DELETE FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
@@ -1036,35 +1031,33 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
}

// getStagingTableName returns the staging table name for the given table identifier.
func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
// replace all non-alphanumeric characters with _
flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_")
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))
srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName)
dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName)
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))
activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

// drop the dst table if exists
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", c.datasetID, dst)).Run(c.ctx)
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dst, err)
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}

// rename the src table to dst
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s",
c.datasetID, src, dst)).Run(c.ctx)
srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err)
return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(),
dstDatasetTable.string(), err)
}

c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", src, dst))
c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
dstDatasetTable.string()))
}

return &protos.RenameTablesOutput{
Expand All @@ -1076,13 +1069,15 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro
*protos.CreateTablesFromExistingOutput, error,
) {
for newTable, existingTable := range req.NewToExistingTableMapping {
newDatasetTable, _ := c.convertToDatasetTable(newTable)
existingDatasetTable, _ := c.convertToDatasetTable(existingTable)
c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

// create the new table with the same schema as the existing table
_, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s LIKE %s.%s",
c.datasetID, newTable, c.datasetID, existingTable)).Run(c.ctx)
_, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
}
@@ -1094,3 +1089,29 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro
FlowJobName: req.FlowJobName,
}, nil
}

type datasetTable struct {
dataset string
table string
}

func (d *datasetTable) string() string {
return fmt.Sprintf("%s.%s", d.dataset, d.table)
}

func (c *BigQueryConnector) convertToDatasetTable(tableName string) (*datasetTable, error) {
parts := strings.Split(tableName, ".")
if len(parts) == 1 {
return &datasetTable{
dataset: c.datasetID,
table: parts[0],
}, nil
} else if len(parts) == 2 {
return &datasetTable{
dataset: parts[0],
table: parts[1],
}, nil
} else {
return nil, fmt.Errorf("invalid BigQuery table name: %s", tableName)
}
}
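
To illustrate the new naming scheme: unqualified destination names resolve against the connector's default dataset, while dataset.table names select their own dataset. A self-contained sketch of the same parsing rules (the default dataset name here is made up):

package main

import (
	"fmt"
	"strings"
)

type dsTable struct{ dataset, table string }

// parse mirrors convertToDatasetTable, with the connector's default
// dataset passed in explicitly.
func parse(defaultDataset, name string) (dsTable, error) {
	parts := strings.Split(name, ".")
	switch len(parts) {
	case 1:
		return dsTable{defaultDataset, parts[0]}, nil
	case 2:
		return dsTable{parts[0], parts[1]}, nil
	default:
		return dsTable{}, fmt.Errorf("invalid BigQuery table name: %s", name)
	}
}

func main() {
	for _, name := range []string{"events", "analytics.users", "a.b.c"} {
		dt, err := parse("peerdb_ds", name)
		fmt.Printf("%-16s -> %+v, err=%v\n", name, dt, err)
	}
}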