Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CreateNormalizedTable: lift loop, make it based on separate connector #1280

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -141,25 +142,69 @@ func (a *FlowableActivity) GetTableSchema(
return srcConn.GetTableSchema(ctx, config)
}

// CreateNormalizedTable creates a normalized table in the destination flowable.
// CreateNormalizedTable creates normalized tables in destination.
func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
logger := activity.GetLogger(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetConnectorAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
if err != nil {
if err == connectors.ErrUnsupportedFunctionality {
logger.Info("Connector does not implement normalized tables")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, conn)

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(ctx, config)
tx, err := conn.StartSetupNormalizedTables(ctx)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
return nil, fmt.Errorf("failed to setup normalized tables tx: %w", err)
}
defer conn.AbortSetupNormalizedTables(ctx, tx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer aborts feels like we will always Abort. I'm wondering if there is a way to say this RollbackIfError or something,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considered MaybeRollback but went with Cleanup to keep it a general "defer cleanup of whatever your tx object is"


numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(config.TableNameSchemaMapping))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
defer shutdown()

return setupNormalizedTablesOutput, nil
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
created, err := conn.SetupNormalizedTable(
ctx,
tx,
tableIdentifier,
tableSchema,
config.SoftDeleteColName,
config.SyncedAtColName,
)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to setup normalized table %s: %w", tableIdentifier, err)
}
tableExistsMapping[tableIdentifier] = created

numTablesSetup.Add(1)
if created {
logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
} else {
logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
}
}

err = conn.FinishSetupNormalizedTables(ctx, tx)
if err != nil {
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err)
}

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
Expand Down
230 changes: 111 additions & 119 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"reflect"
"regexp"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -521,7 +520,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
unchangedToastColumns := tableNametoUnchangedToastCols[tableName]
dstDatasetTable, _ := c.convertToDatasetTable(tableName)
mergeGen := &mergeStmtGenerator{
rawDatasetTable: &datasetTable{
rawDatasetTable: datasetTable{
project: c.projectID,
dataset: c.datasetID,
table: rawTableName,
Expand Down Expand Up @@ -648,138 +647,131 @@ func (c *BigQueryConnector) CreateRawTable(ctx context.Context, req *protos.Crea
}, nil
}

// SetupNormalizedTables sets up normalized tables, implementing the Connector interface.
func (c *BigQueryConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) {
// needed since CreateNormalizedTable duplicate check isn't accurate enough
return make(map[datasetTable]struct{}), nil
}

func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error {
return nil
}

func (c *BigQueryConnector) AbortSetupNormalizedTables(_ context.Context, _ interface{}) {
}

// This runs CREATE TABLE IF NOT EXISTS on bigquery, using the schema and table name provided.
func (c *BigQueryConnector) SetupNormalizedTables(
func (c *BigQueryConnector) SetupNormalizedTable(
ctx context.Context,
req *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(req.TableNameSchemaMapping))

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
defer shutdown()

tableExistsMapping := make(map[string]bool)
datasetTablesSet := make(map[datasetTable]struct{})
for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
// only place where we check for parsing errors
datasetTable, err := c.convertToDatasetTable(tableIdentifier)
if err != nil {
return nil, err
}
_, ok := datasetTablesSet[*datasetTable]
if ok {
return nil, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
datasetTable.string())
tx interface{},
tableIdentifier string,
tableSchema *protos.TableSchema,
softDeleteColName string,
syncedAtColName string,
) (bool, error) {
datasetTablesSet := tx.(map[datasetTable]struct{})

// only place where we check for parsing errors
datasetTable, err := c.convertToDatasetTable(tableIdentifier)
if err != nil {
return false, err
}
_, ok := datasetTablesSet[datasetTable]
if ok {
return false, fmt.Errorf("invalid mirror: two tables mirror to the same BigQuery table %s",
datasetTable.string())
}
datasetTablesSet[datasetTable] = struct{}{}
dataset := c.client.DatasetInProject(c.projectID, datasetTable.dataset)
_, err = dataset.Metadata(ctx)
// just assume this means dataset don't exist, and create it
if err != nil {
// if err message does not contain `notFound`, then other error happened.
if !strings.Contains(err.Error(), "notFound") {
return false, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
datasetTable.dataset, err)
}
dataset := c.client.DatasetInProject(c.projectID, datasetTable.dataset)
_, err = dataset.Metadata(ctx)
// just assume this means dataset don't exist, and create it
c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
err = dataset.Create(ctx, nil)
if err != nil {
// if err message does not contain `notFound`, then other error happened.
if !strings.Contains(err.Error(), "notFound") {
return nil, fmt.Errorf("error while checking metadata for BigQuery dataset %s: %w",
datasetTable.dataset, err)
}
c.logger.Info(fmt.Sprintf("creating dataset %s...", dataset.DatasetID))
err = dataset.Create(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
}
}
table := dataset.Table(datasetTable.table)

// check if the table exists
_, err = table.Metadata(ctx)
if err == nil {
// table exists, go to next table
tableExistsMapping[tableIdentifier] = true
datasetTablesSet[*datasetTable] = struct{}{}

c.logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
numTablesSetup.Add(1)
continue
return false, fmt.Errorf("failed to create BigQuery dataset %s: %w", dataset.DatasetID, err)
}
}
table := dataset.Table(datasetTable.table)

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
for _, column := range tableSchema.Columns {
genericColType := column.Type
if genericColType == "numeric" {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 38 || scale > 37 {
precision = numeric.PeerDBNumericPrecision
scale = numeric.PeerDBNumericScale
}
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: bigquery.BigNumericFieldType,
Repeated: qvalue.QValueKind(genericColType).IsArray(),
Precision: int64(precision),
Scale: int64(scale),
})
} else {
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: qValueKindToBigQueryType(genericColType),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
})
// check if the table exists
_, err = table.Metadata(ctx)
if err == nil {
// table exists, go to next table
return true, nil
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
for _, column := range tableSchema.Columns {
genericColType := column.Type
if genericColType == "numeric" {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 38 || scale > 37 {
precision = numeric.PeerDBNumericPrecision
scale = numeric.PeerDBNumericScale
}
}

if req.SoftDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SoftDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
Name: column.Name,
Type: bigquery.BigNumericFieldType,
Repeated: qvalue.QValueKind(genericColType).IsArray(),
Precision: int64(precision),
Scale: int64(scale),
})
}

if req.SyncedAtColName != "" {
} else {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SyncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
Name: column.Name,
Type: qValueKindToBigQueryType(genericColType),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
})
}
}

if softDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: softDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
})
}

// create the table using the columns
schema := bigquery.Schema(columns)
if syncedAtColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: syncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
})
}

// cluster by the primary key if < 4 columns.
var clustering *bigquery.Clustering
numPkeyCols := len(tableSchema.PrimaryKeyColumns)
if numPkeyCols > 0 && numPkeyCols < 4 {
clustering = &bigquery.Clustering{
Fields: tableSchema.PrimaryKeyColumns,
}
}
// create the table using the columns
schema := bigquery.Schema(columns)

metadata := &bigquery.TableMetadata{
Schema: schema,
Name: datasetTable.table,
Clustering: clustering,
// cluster by the primary key if < 4 columns.
var clustering *bigquery.Clustering
numPkeyCols := len(tableSchema.PrimaryKeyColumns)
if numPkeyCols > 0 && numPkeyCols < 4 {
clustering = &bigquery.Clustering{
Fields: tableSchema.PrimaryKeyColumns,
}
}

err = table.Create(ctx, metadata)
if err != nil {
return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
}
metadata := &bigquery.TableMetadata{
Schema: schema,
Name: datasetTable.table,
Clustering: clustering,
}

tableExistsMapping[tableIdentifier] = false
datasetTablesSet[*datasetTable] = struct{}{}
// log that table was created
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
numTablesSetup.Add(1)
err = table.Create(ctx, metadata)
if err != nil {
return false, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
}

return &protos.SetupNormalizedTableBatchOutput{
TableExistsMapping: tableExistsMapping,
}, nil
datasetTablesSet[datasetTable] = struct{}{}
return false, nil
}

func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
Expand Down Expand Up @@ -982,25 +974,25 @@ func (d *datasetTable) string() string {
return fmt.Sprintf("%s.%s.%s", d.project, d.dataset, d.table)
}

func (c *BigQueryConnector) convertToDatasetTable(tableName string) (*datasetTable, error) {
func (c *BigQueryConnector) convertToDatasetTable(tableName string) (datasetTable, error) {
parts := strings.Split(tableName, ".")
if len(parts) == 1 {
return &datasetTable{
return datasetTable{
dataset: c.datasetID,
table: parts[0],
}, nil
} else if len(parts) == 2 {
return &datasetTable{
return datasetTable{
dataset: parts[0],
table: parts[1],
}, nil
} else if len(parts) == 3 {
return &datasetTable{
return datasetTable{
project: parts[0],
dataset: parts[1],
table: parts[2],
}, nil
} else {
return nil, fmt.Errorf("invalid BigQuery table name: %s", tableName)
return datasetTable{}, fmt.Errorf("invalid BigQuery table name: %s", tableName)
}
}
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (

type mergeStmtGenerator struct {
// dataset + raw table
rawDatasetTable *datasetTable
rawDatasetTable datasetTable
// destination table name, used to retrieve records from raw table
dstTableName string
// dataset + destination table
dstDatasetTable *datasetTable
dstDatasetTable datasetTable
// last synced batchID.
syncBatchID int64
// last normalized batchID.
Expand Down
Loading
Loading