Skip to content

Commit

Permalink
full table resync for Snowflake QRep
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 6, 2023
1 parent 781b6a7 commit 2f9a76e
Show file tree
Hide file tree
Showing 14 changed files with 1,609 additions and 375 deletions.
24 changes: 23 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
return nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -742,3 +743,24 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena

return sfConn.RenameTables(config)
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

// check if destination is snowflake, if not error out
if req.Peer.Type != protos.DBType_SNOWFLAKE {
return nil, fmt.Errorf("rename tables is only supported for snowflake")
}

sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
}

return sfConn.CreateTablesFromExisting(req)
}
11 changes: 3 additions & 8 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(

func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}

cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand All @@ -225,14 +220,14 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
numPartitionsProcessed := 0

state := peerflow.NewQRepFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.QRepFlowWorkflow, // workflow function
cfg, // workflow input
lastPartition, // last partition
numPartitionsProcessed, // number of partitions processed
state,
)
if err != nil {
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down
56 changes: 54 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
log.Infof("sync mode is for flow %s is AVRO", req.FlowJobName)
log.Infof("sync mode for flow %s is AVRO", req.FlowJobName)
res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
return nil, err
Expand All @@ -516,7 +516,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
}()

if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
log.Infof("sync mode is for flow %s is MULTI_INSERT", req.FlowJobName)
log.Infof("sync mode for flow %s is MULTI_INSERT", req.FlowJobName)
res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1154,6 +1154,14 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
if err != nil {
return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err)
}
defer func() {
deferErr := renameTablesTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Errorf("unexpected error rolling back transaction for renaming tables: %v", err)
}
}()

for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down Expand Up @@ -1193,3 +1201,47 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
FlowJobName: req.FlowJobName,
}, nil
}

func (c *SnowflakeConnector) CreateTablesFromExisting(req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
createTablesFromExistingTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err)
}
defer func() {
deferErr := createTablesFromExistingTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Errorf("unexpected error rolling back transaction for creating tables: %v", err)
}
}()

for newTable, existingTable := range req.NewToExistingTableMapping {
log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("creating table '%s' similar to '%s'", newTable, existingTable)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

// rename the src table to dst
_, err = createTablesFromExistingTx.ExecContext(c.ctx,
fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s LIKE %s", newTable, existingTable))
if err != nil {
return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
}

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("successfully created table '%s'", newTable)
}

err = createTablesFromExistingTx.Commit()
if err != nil {
return nil, fmt.Errorf("unable to commit transaction for creating tables: %w", err)
}

return &protos.CreateTablesFromExistingOutput{
FlowJobName: req.FlowJobName,
}, nil
}
8 changes: 2 additions & 6 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,8 @@ func CreateQRepWorkflowConfig(
}

func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) {
lastPartition := &protos.QRepPartition{
PartitionId: "not-applicable-partition",
Range: nil,
}
numPartitionsProcessed := 0
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, lastPartition, numPartitionsProcessed)
state := peerflow.NewQRepFlowState()
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state)
}

func GetOwnersSchema() *model.QRecordSchema {
Expand Down
Loading

0 comments on commit 2f9a76e

Please sign in to comment.