Commit

CreateNormalizedTable: lift loop, make it based on separate connector
Also introduce a generalized GetConnectorAs, use internally for existing functions
serprex committed Feb 13, 2024
1 parent 9081af8 commit 91e623f
Showing 11 changed files with 332 additions and 371 deletions.
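For context, here is a minimal sketch of what the generalized GetConnectorAs mentioned in the commit message might look like. The helper itself is defined in a connectors file that is not part of this excerpt, so the stand-in Connector interface, Peer type, and GetConnector constructor below are assumptions rather than lines from the diff; only the overall shape (build the peer's connector, then narrow it to a capability interface, returning ErrUnsupportedFunctionality otherwise) is inferred from how CreateNormalizedTable uses it below.

package connectors

import (
	"context"
	"errors"
)

// Stand-ins for identifiers that live elsewhere in the repository.
var ErrUnsupportedFunctionality = errors.New("connector does not support this functionality")

type Connector interface {
	Close(ctx context.Context) error
}

type Peer struct{ Name string } // stand-in for the protobuf peer config

// GetConnector stands in for the existing per-peer constructor that dispatches on peer type.
func GetConnector(ctx context.Context, peer *Peer) (Connector, error) {
	return nil, errors.New("omitted in this sketch")
}

// GetConnectorAs builds the peer's connector and narrows it to the capability
// interface T (for example NormalizedTablesConnector). If the connector does not
// implement T it is closed and ErrUnsupportedFunctionality is returned, which is
// exactly the error CreateNormalizedTable checks for below.
func GetConnectorAs[T Connector](ctx context.Context, peer *Peer) (T, error) {
	var none T
	conn, err := GetConnector(ctx, peer)
	if err != nil {
		return none, err
	}
	if narrowed, ok := conn.(T); ok {
		return narrowed, nil
	}
	_ = conn.Close(ctx)
	return none, ErrUnsupportedFunctionality
}

Under this reading, existing helpers such as GetCDCSyncConnector could become thin wrappers around GetConnectorAs with the corresponding capability interface, matching the "use internally for existing functions" note in the commit message.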
57 changes: 51 additions & 6 deletions flow/activities/flowable.go
@@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/jackc/pglogrepl"
@@ -141,25 +142,69 @@ func (a *FlowableActivity) GetTableSchema(
	return srcConn.GetTableSchema(ctx, config)
}

-// CreateNormalizedTable creates a normalized table in the destination flowable.
+// CreateNormalizedTable creates normalized tables in destination.
func (a *FlowableActivity) CreateNormalizedTable(
	ctx context.Context,
	config *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
-	conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
+	conn, err := connectors.GetConnectorAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
	if err != nil {
+		if err == connectors.ErrUnsupportedFunctionality {
+			activity.GetLogger(ctx).Info("Connector does not implement normalized tables")
+			return nil, nil
+		}
		return nil, fmt.Errorf("failed to get connector: %w", err)
	}
	defer connectors.CloseConnector(ctx, conn)

-	setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(ctx, config)
+	tx, err := conn.StartSetupNormalizedTables(ctx)
	if err != nil {
		a.Alerter.LogFlowError(ctx, config.FlowName, err)
-		return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
+		return nil, fmt.Errorf("failed to setup normalized tables tx: %w", err)
	}
+	defer conn.AbortSetupNormalizedTables(ctx, tx)

+	numTablesSetup := atomic.Uint32{}
+	totalTables := uint32(len(config.TableNameSchemaMapping))
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("setting up normalized tables - %d of %d done",
+			numTablesSetup.Load(), totalTables)
+	})
+	defer shutdown()

-	return setupNormalizedTablesOutput, nil
+	logger := activity.GetLogger(ctx)
+	tableExistsMapping := make(map[string]bool)
+	for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
+		created, err := conn.SetupNormalizedTable(
+			ctx,
+			tx,
+			tableIdentifier,
+			tableSchema,
+			config.SoftDeleteColName,
+			config.SyncedAtColName,
+		)
+		if err != nil {
+			a.Alerter.LogFlowError(ctx, config.FlowName, err)
+			return nil, fmt.Errorf("failed to setup normalized table %s: %w", tableIdentifier, err)
+		}
+		tableExistsMapping[tableIdentifier] = created
+
+		numTablesSetup.Add(1)
+		if created {
+			logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
+		} else {
+			logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
+		}
+	}
+
+	err = conn.FinishSetupNormalizedTables(ctx, tx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
+	}
+
+	return &protos.SetupNormalizedTableBatchOutput{
+		TableExistsMapping: tableExistsMapping,
+	}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
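The loop above drives a new connector capability rather than the CDC sync connector. Below is a sketch of the interface it appears to rely on; the actual definition lives in a connectors file outside this excerpt, so the interface name (taken from the GetConnectorAs type argument), the embedded Connector, and the use of interface{} for the per-batch state are inferred from the calls in CreateNormalizedTable and from the BigQuery implementation in the next file, not copied from the commit.

// NormalizedTablesConnector is the capability CreateNormalizedTable asks for via
// GetConnectorAs; Connector and protos.TableSchema are the repository's own types.
type NormalizedTablesConnector interface {
	Connector

	// StartSetupNormalizedTables returns connector-specific state ("tx") that is threaded
	// through the per-table calls: a SQL transaction for warehouse connectors, or, as in
	// the BigQuery implementation below, a map used only for duplicate detection.
	StartSetupNormalizedTables(ctx context.Context) (interface{}, error)

	// SetupNormalizedTable creates a single destination table; the returned boolean is
	// recorded in TableExistsMapping (the BigQuery implementation returns true when the
	// table already exists).
	SetupNormalizedTable(
		ctx context.Context,
		tx interface{},
		tableIdentifier string,
		tableSchema *protos.TableSchema,
		softDeleteColName string,
		syncedAtColName string,
	) (bool, error)

	// FinishSetupNormalizedTables commits the state; AbortSetupNormalizedTables releases
	// it and is deferred by the activity so it runs on any early return.
	FinishSetupNormalizedTables(ctx context.Context, tx interface{}) error
	AbortSetupNormalizedTables(ctx context.Context, tx interface{})
}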
230 changes: 111 additions & 119 deletions flow/connectors/bigquery/bigquery.go
@@ -9,7 +9,6 @@ import (
"reflect"
"regexp"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
@@ -521,7 +520,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
		unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
		mergeGen := &mergeStmtGenerator{
-			rawDatasetTable: &datasetTable{
+			rawDatasetTable: datasetTable{
				project: c.projectID,
				dataset: c.datasetID,
				table: rawTableName,
@@ -648,138 +647,131 @@ func (c *BigQueryConnector) CreateRawTable(ctx context.Context, req *protos.Crea
	}, nil
}

-// SetupNormalizedTables sets up normalized tables, implementing the Connector interface.
+func (c *BigQueryConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) {
+	// needed since CreateNormalizedTable duplicate check isn't accurate enough
+	return make(map[datasetTable]struct{}), nil
+}
+
+func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error {
+	return nil
+}
+
+func (c *BigQueryConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) {
+}
+
// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
-func (c *BigQueryConnector) SetupNormalizedTables(
+func (c *BigQueryConnector) SetupNormalizedTable(
	ctx context.Context,
-	req *protos.SetupNormalizedTableBatchInput,
-) (*protos.SetupNormalizedTableBatchOutput, error) {
-	numTablesSetup := atomic.Uint32{}
-	totalTables := uint32(len(req.TableNameSchemaMapping))
-
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("setting up normalized tables - %d of %d done",
-			numTablesSetup.Load(), totalTables)
-	})
-	defer shutdown()
-
-	tableExistsMapping := make(map[string]bool)
-	datasetTablesSet := make(map[datasetTable]struct{})
-	for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
-		// only place where we check for parsing errors
-		datasetTable, err := c.convertToDatasetTable(tableIdentifier)
-		if err != nil {
-			return nil, err
-		}
-		_, ok := datasetTablesSet[*datasetTable]
-		if ok {
-			return nil, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
-				datasetTable.string())
-		}
-		dataset := c.client.DatasetInProject(c.projectID, datasetTable.dataset)
-		_, err = dataset.Metadata(ctx)
-		// just assume this means dataset don't exist, and create it
-		if err != nil {
-			// if err message does not contain `notFound`, then other error happened.
-			if !strings.Contains(err.Error(), "notFound") {
-				return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
-					datasetTable.dataset, err)
-			}
-			c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
-			err = dataset.Create(ctx, nil)
-			if err != nil {
-				return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
-			}
-		}
-		table := dataset.Table(datasetTable.table)
-
-		// check if the table exists
-		_, err = table.Metadata(ctx)
-		if err == nil {
-			// table exists, go to next table
-			tableExistsMapping[tableIdentifier] = true
-			datasetTablesSet[*datasetTable] = struct{}{}
-
-			c.logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
-			numTablesSetup.Add(1)
-			continue
-		}
-
-		// convert the column names and types to bigquery types
-		columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
-		for _, column := range tableSchema.Columns {
-			genericColType := column.Type
-			if genericColType == "numeric" {
-				precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
-				if column.TypeModifier == -1 || precision > 38 || scale > 37 {
-					precision = numeric.PeerDBNumericPrecision
-					scale = numeric.PeerDBNumericScale
-				}
-				columns = append(columns, &bigquery.FieldSchema{
-					Name: column.Name,
-					Type: bigquery.BigNumericFieldType,
-					Repeated: qvalue.QValueKind(genericColType).IsArray(),
-					Precision: int64(precision),
-					Scale: int64(scale),
-				})
-			} else {
-				columns = append(columns, &bigquery.FieldSchema{
-					Name: column.Name,
-					Type: qValueKindToBigQueryType(genericColType),
-					Repeated: qvalue.QValueKind(genericColType).IsArray(),
-				})
-			}
-		}
-
-		if req.SoftDeleteColName != "" {
-			columns = append(columns, &bigquery.FieldSchema{
-				Name: req.SoftDeleteColName,
-				Type: bigquery.BooleanFieldType,
-				Repeated: false,
-			})
-		}
-
-		if req.SyncedAtColName != "" {
-			columns = append(columns, &bigquery.FieldSchema{
-				Name: req.SyncedAtColName,
-				Type: bigquery.TimestampFieldType,
-				Repeated: false,
-			})
-		}
-
-		// create the table using the columns
-		schema := bigquery.Schema(columns)
-
-		// cluster by the primary key if < 4 columns.
-		var clustering *bigquery.Clustering
-		numPkeyCols := len(tableSchema.PrimaryKeyColumns)
-		if numPkeyCols > 0 && numPkeyCols < 4 {
-			clustering = &bigquery.Clustering{
-				Fields: tableSchema.PrimaryKeyColumns,
-			}
-		}
-
-		metadata := &bigquery.TableMetadata{
-			Schema: schema,
-			Name: datasetTable.table,
-			Clustering: clustering,
-		}
-
-		err = table.Create(ctx, metadata)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
-		}
-
-		tableExistsMapping[tableIdentifier] = false
-		datasetTablesSet[*datasetTable] = struct{}{}
-		// log that table was created
-		c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
-		numTablesSetup.Add(1)
-	}
-
-	return &protos.SetupNormalizedTableBatchOutput{
-		TableExistsMapping: tableExistsMapping,
-	}, nil
+	tx interface{},
+	tableIdentifier string,
+	tableSchema *protos.TableSchema,
+	softDeleteColName string,
+	syncedAtColName string,
+) (bool, error) {
+	datasetTablesSet := tx.(map[datasetTable]struct{})
+
+	// only place where we check for parsing errors
+	datasetTable, err := c.convertToDatasetTable(tableIdentifier)
+	if err != nil {
+		return false, err
+	}
+	_, ok := datasetTablesSet[datasetTable]
+	if ok {
+		return false, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
+			datasetTable.string())
+	}
+	datasetTablesSet[datasetTable] = struct{}{}
+	dataset := c.client.DatasetInProject(c.projectID, datasetTable.dataset)
+	_, err = dataset.Metadata(ctx)
+	// just assume this means dataset don't exist, and create it
+	if err != nil {
+		// if err message does not contain `notFound`, then other error happened.
+		if !strings.Contains(err.Error(), "notFound") {
+			return false, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
+				datasetTable.dataset, err)
+		}
+		c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
+		err = dataset.Create(ctx, nil)
+		if err != nil {
+			return false, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
+		}
+	}
+	table := dataset.Table(datasetTable.table)
+
+	// check if the table exists
+	_, err = table.Metadata(ctx)
+	if err == nil {
+		// table exists, go to next table
+		return true, nil
+	}
+
+	// convert the column names and types to bigquery types
+	columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
+	for _, column := range tableSchema.Columns {
+		genericColType := column.Type
+		if genericColType == "numeric" {
+			precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
+			if column.TypeModifier == -1 || precision > 38 || scale > 37 {
+				precision = numeric.PeerDBNumericPrecision
+				scale = numeric.PeerDBNumericScale
+			}
+			columns = append(columns, &bigquery.FieldSchema{
+				Name: column.Name,
+				Type: bigquery.BigNumericFieldType,
+				Repeated: qvalue.QValueKind(genericColType).IsArray(),
+				Precision: int64(precision),
+				Scale: int64(scale),
+			})
+		} else {
+			columns = append(columns, &bigquery.FieldSchema{
+				Name: column.Name,
+				Type: qValueKindToBigQueryType(genericColType),
+				Repeated: qvalue.QValueKind(genericColType).IsArray(),
+			})
+		}
+	}
+
+	if softDeleteColName != "" {
+		columns = append(columns, &bigquery.FieldSchema{
+			Name: softDeleteColName,
+			Type: bigquery.BooleanFieldType,
+			Repeated: false,
+		})
+	}
+
+	if syncedAtColName != "" {
+		columns = append(columns, &bigquery.FieldSchema{
+			Name: syncedAtColName,
+			Type: bigquery.TimestampFieldType,
+			Repeated: false,
+		})
+	}
+
+	// create the table using the columns
+	schema := bigquery.Schema(columns)
+
+	// cluster by the primary key if < 4 columns.
+	var clustering *bigquery.Clustering
+	numPkeyCols := len(tableSchema.PrimaryKeyColumns)
+	if numPkeyCols > 0 && numPkeyCols < 4 {
+		clustering = &bigquery.Clustering{
+			Fields: tableSchema.PrimaryKeyColumns,
+		}
+	}
+
+	metadata := &bigquery.TableMetadata{
+		Schema: schema,
+		Name: datasetTable.table,
+		Clustering: clustering,
+	}
+
+	err = table.Create(ctx, metadata)
+	if err != nil {
+		return false, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
+	}
+
+	datasetTablesSet[datasetTable] = struct{}{}
+	return false, nil
}

func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
@@ -982,25 +974,25 @@ func (d *datasetTable) string() string {
	return fmt.Sprintf("%s.%s.%s", d.project, d.dataset, d.table)
}

-func (c *BigQueryConnector) convertToDatasetTable(tableName string) (*datasetTable, error) {
+func (c *BigQueryConnector) convertToDatasetTable(tableName string) (datasetTable, error) {
	parts := strings.Split(tableName, ".")
	if len(parts) == 1 {
-		return &datasetTable{
+		return datasetTable{
			dataset: c.datasetID,
			table: parts[0],
		}, nil
	} else if len(parts) == 2 {
-		return &datasetTable{
+		return datasetTable{
			dataset: parts[0],
			table: parts[1],
		}, nil
	} else if len(parts) == 3 {
-		return &datasetTable{
+		return datasetTable{
			project: parts[0],
			dataset: parts[1],
			table: parts[2],
		}, nil
	} else {
-		return nil, fmt.Errorf("invalid BigQuery table name: %s", tableName)
+		return datasetTable{}, fmt.Errorf("invalid BigQuery table name: %s", tableName)
	}
}
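One plausible reason for convertToDatasetTable returning a value instead of a pointer: the duplicate check in the new SetupNormalizedTable keys a map by datasetTable, and using the struct value directly avoids the *datasetTable dereference the old code needed. A self-contained toy example of that pattern follows; apart from datasetTable's fields, the names here are illustrative and not from the repo.

package main

import "fmt"

// datasetTable mirrors the struct in bigquery.go: all fields are comparable strings,
// so the value itself can be a map key.
type datasetTable struct {
	project string
	dataset string
	table   string
}

func main() {
	seen := make(map[datasetTable]struct{})
	targets := []datasetTable{
		{dataset: "peers", table: "users"},
		{dataset: "peers", table: "orders"},
		{dataset: "peers", table: "users"}, // duplicate mapping
	}
	for _, dt := range targets {
		if _, ok := seen[dt]; ok {
			fmt.Printf("invalid mirror: two tables mirror to %s.%s\n", dt.dataset, dt.table)
			continue
		}
		seen[dt] = struct{}{}
	}
	fmt.Println("distinct destinations:", len(seen))
}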
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator.go
@@ -13,11 +13,11 @@ import (

type mergeStmtGenerator struct {
	// dataset + raw table
-	rawDatasetTable *datasetTable
+	rawDatasetTable datasetTable
	// destination table name, used to retrieve records from raw table
	dstTableName string
	// dataset + destination table
-	dstDatasetTable *datasetTable
+	dstDatasetTable datasetTable
	// last synced batchID.
	syncBatchID int64
	// last normalized batchID.