Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

full table resync for Snowflake QRep #617

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
return nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -742,3 +743,24 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena

return sfConn.RenameTables(config)
}

// CreateTablesFromExisting creates new tables on the destination peer that
// mirror existing ones, by delegating to the Snowflake connector's
// CreateTablesFromExisting.
//
// Only Snowflake destinations are supported: the peer type is validated
// before any connection is opened, so an unsupported peer is rejected without
// creating (and immediately closing) a connector. The connector obtained for
// a supported peer is closed when this activity returns.
//
// It returns an error if the peer is not Snowflake, if the connector cannot
// be obtained or is not a *connsnowflake.SnowflakeConnector, or if the
// connector call itself fails.
func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
	*protos.CreateTablesFromExistingOutput, error) {
	// check if destination is snowflake, if not error out before connecting
	if req.Peer.Type != protos.DBType_SNOWFLAKE {
		return nil, fmt.Errorf("create tables from existing is only supported on snowflake")
	}

	dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
	if err != nil {
		return nil, fmt.Errorf("failed to get connector: %w", err)
	}
	defer connectors.CloseConnector(dstConn)

	sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
	if !ok {
		return nil, fmt.Errorf("failed to cast connector to snowflake connector")
	}

	return sfConn.CreateTablesFromExisting(req)
}
11 changes: 3 additions & 8 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}

cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand All @@ -225,14 +220,14 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
numPartitionsProcessed := 0

state := peerflow.NewQRepFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.QRepFlowWorkflow, // workflow function
cfg, // workflow input
lastPartition, // last partition
numPartitionsProcessed, // number of partitions processed
state,
)
if err != nil {
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down
56 changes: 54 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
log.Infof("sync mode is for flow %s is AVRO", req.FlowJobName)
log.Infof("sync mode for flow %s is AVRO", req.FlowJobName)
res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
return nil, err
Expand All @@ -516,7 +516,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
}()

if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
log.Infof("sync mode is for flow %s is MULTI_INSERT", req.FlowJobName)
log.Infof("sync mode for flow %s is MULTI_INSERT", req.FlowJobName)
res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1154,6 +1154,14 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
if err != nil {
return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err)
}
defer func() {
deferErr := renameTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Errorf("unexpected error rolling back transaction for renaming tables: %v", err)
}
}()

for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down Expand Up @@ -1193,3 +1201,47 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
FlowJobName: req.FlowJobName,
}, nil
}

// CreateTablesFromExisting creates one new table per entry of
// req.NewToExistingTableMapping (key: new table name, value: existing table
// name) by executing `CREATE TABLE IF NOT EXISTS <new> LIKE <existing>`.
//
// All statements are issued through a single transaction that is committed
// once every table has been processed. A deferred Rollback covers the early
// return paths; after a successful Commit it reports sql.ErrTxDone, which is
// deliberately ignored. An activity heartbeat is recorded before each table.
//
// NOTE(review): table names are interpolated directly into the SQL text, so
// callers are presumably expected to pass trusted, already-qualified
// identifiers — confirm against the workflow that builds the mapping.
//
// It returns an output carrying req.FlowJobName on success, or a wrapped
// error if the transaction cannot be started, a statement fails, or the
// commit fails.
func (c *SnowflakeConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) (
	*protos.CreateTablesFromExistingOutput, error) {
	createTablesFromExistingTx, err := c.database.BeginTx(c.ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("unable to begin transaction for creating tables: %w", err)
	}
	defer func() {
		deferErr := createTablesFromExistingTx.Rollback()
		if deferErr != sql.ErrTxDone && deferErr != nil {
			// Log the rollback failure itself, not the enclosing function's err.
			log.WithFields(log.Fields{
				"flowName": req.FlowJobName,
			}).Errorf("unexpected error rolling back transaction for creating tables: %v", deferErr)
		}
	}()

	for newTable, existingTable := range req.NewToExistingTableMapping {
		log.WithFields(log.Fields{
			"flowName": req.FlowJobName,
		}).Infof("creating table '%s' similar to '%s'", newTable, existingTable)

		activity.RecordHeartbeat(c.ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

		// create the new table with the same definition as the existing one
		_, err = createTablesFromExistingTx.ExecContext(c.ctx,
			fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s LIKE %s", newTable, existingTable))
		if err != nil {
			return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
		}

		log.WithFields(log.Fields{
			"flowName": req.FlowJobName,
		}).Infof("successfully created table '%s'", newTable)
	}

	err = createTablesFromExistingTx.Commit()
	if err != nil {
		return nil, fmt.Errorf("unable to commit transaction for creating tables: %w", err)
	}

	return &protos.CreateTablesFromExistingOutput{
		FlowJobName: req.FlowJobName,
	}, nil
}
8 changes: 2 additions & 6 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,8 @@ func CreateQRepWorkflowConfig(
}

func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}
numPartitionsProcessed := 0
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, lastPartition, numPartitionsProcessed)
state := peerflow.NewQRepFlowState()
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state)
}

func GetOwnersSchema() *model.QRecordSchema {
Expand Down
Loading
Loading