Skip to content

Commit

Permalink
Add a resync options for CREATE MIRROR (cdc option) (#580)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 26, 2023
1 parent f8d2f23 commit 9654beb
Show file tree
Hide file tree
Showing 12 changed files with 1,744 additions and 723 deletions.
21 changes: 21 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
Expand Down Expand Up @@ -702,3 +703,23 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,

return nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

// check if destination is snowflake, if not error out
if config.Peer.Type != protos.DBType_SNOWFLAKE {
return nil, fmt.Errorf("rename tables is only supported for snowflake")
}

sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
}

return sfConn.RenameTables(config)
}
46 changes: 46 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/snowflakedb/gosnowflake"
"go.temporal.io/sdk/activity"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -1149,3 +1150,48 @@ func (c *SnowflakeConnector) generateUpdateStatement(allCols []string, unchanged
}
return updateStmts
}

func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
renameTablesTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
return nil, fmt.Errorf("unable to begin transaction for rename tables: %w", err)
}

for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("renaming table '%s' to '%s'...", src, dst)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))

// drop the dst table if exists
_, err = renameTablesTx.ExecContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", dst))
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dst, err)
}

// rename the src table to dst
_, err = renameTablesTx.ExecContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", src, dst))
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", src, dst, err)
}

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("successfully renamed table '%s' to '%s'", src, dst)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", src, dst))
}

err = renameTablesTx.Commit()
if err != nil {
return nil, fmt.Errorf("unable to commit transaction for rename tables: %w", err)
}

return &protos.RenameTablesOutput{
FlowJobName: req.FlowJobName,
}, nil
}
Loading

0 comments on commit 9654beb

Please sign in to comment.