Skip to content

Commit

Permalink
passes flowname to more structs
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 5, 2023
1 parent 5f44454 commit 233ec0b
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 320 deletions.
40 changes: 22 additions & 18 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type FlowableActivity struct {
func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.Peer,
flowName string,
) (*CheckConnectionResult, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config, flowName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand All @@ -61,8 +62,9 @@ func (a *FlowableActivity) CheckConnection(
}

// SetupMetadataTables implements SetupMetadataTables.
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.Peer) error {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context,
config *protos.Peer, flowName string) error {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config, flowName)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
}
Expand All @@ -80,7 +82,7 @@ func (a *FlowableActivity) GetLastSyncedID(
ctx context.Context,
config *protos.GetLastSyncedIDInput,
) (*protos.LastSyncState, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig, config.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand Down Expand Up @@ -114,7 +116,8 @@ func (a *FlowableActivity) CreateRawTable(
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
dstConn, err := connectors.GetCDCSyncConnector(ctx,
config.PeerConnectionConfig, config.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand Down Expand Up @@ -151,7 +154,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig, config.FlowName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand All @@ -168,7 +171,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -339,9 +342,9 @@ func (a *FlowableActivity) StartNormalize(
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination, input.FlowConnectionConfigs.FlowJobName)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %v", err)
}
Expand Down Expand Up @@ -404,7 +407,8 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(
ctx context.Context,
input *protos.ReplayTableSchemaDeltaInput,
) error {
dest, err := connectors.GetCDCNormalizeConnector(ctx, input.FlowConnectionConfigs.Destination)
dest, err := connectors.GetCDCNormalizeConnector(ctx,
input.FlowConnectionConfigs.Destination, input.FlowConnectionConfigs.FlowJobName)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
} else if err != nil {
Expand All @@ -417,7 +421,7 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas(

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
conn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
conn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
}
Expand Down Expand Up @@ -511,7 +515,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get qrep destination connector: %w", err)
}
Expand Down Expand Up @@ -606,7 +610,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer, config.FlowJobName)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
} else if err != nil {
Expand All @@ -630,7 +634,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer, config.FlowJobName)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
} else if err != nil {
Expand All @@ -647,7 +651,7 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
}
defer connectors.CloseConnector(srcConn)

dstConn, err := connectors.GetCDCSyncConnector(ctx, config.DestinationPeer)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.DestinationPeer, config.FlowJobName)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -801,7 +805,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer, config.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand All @@ -825,7 +829,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
*protos.CreateTablesFromExistingOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer, req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand Down Expand Up @@ -864,7 +868,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer, config.FlowJobName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
Expand Down
18 changes: 9 additions & 9 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne
}
}

func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConnector, error) {
func GetCDCSyncConnector(ctx context.Context, config *protos.Peer, flowName string) (CDCSyncConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig(), flowName)
case *protos.Peer_EventhubConfig:
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
Expand All @@ -158,15 +158,15 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
}

func GetCDCNormalizeConnector(ctx context.Context,
config *protos.Peer) (CDCNormalizeConnector, error) {
config *protos.Peer, flowName string) (CDCNormalizeConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig(), flowName)
default:
return nil, ErrUnsupportedFunctionality
}
Expand All @@ -184,15 +184,15 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon
}
}

func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncConnector, error) {
func GetQRepSyncConnector(ctx context.Context, config *protos.Peer, flowName string) (QRepSyncConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig(), flowName)
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
default:
Expand Down Expand Up @@ -222,7 +222,7 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
if sfConfig == nil {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig, "")
case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
Expand All @@ -243,11 +243,11 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
config *protos.Peer, flowName string) (QRepConsolidateConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig(), flowName)

default:
return nil, ErrUnsupportedFunctionality
Expand Down
Loading

0 comments on commit 233ec0b

Please sign in to comment.