Skip to content

Commit

Permalink
use protos instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 28, 2023
1 parent ef6d01e commit 8ca421f
Show file tree
Hide file tree
Showing 12 changed files with 1,090 additions and 2,029 deletions.
18 changes: 8 additions & 10 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ type FlowableActivity struct {
// CheckConnection implements CheckConnection.
func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.Peer,
flowName string,
config *protos.SetupInput,
) (*CheckConnectionResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
Expand All @@ -67,16 +66,16 @@ func (a *FlowableActivity) CheckConnection(
}

// SetupMetadataTables implements SetupMetadataTables.
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.Peer, flowName string) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config)
func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *protos.SetupInput) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -153,9 +152,8 @@ func (a *FlowableActivity) CreateRawTable(
func (a *FlowableActivity) GetTableSchema(
ctx context.Context,
config *protos.GetTableSchemaBatchInput,
flowName string,
) (*protos.GetTableSchemaBatchOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand Down
Loading

0 comments on commit 8ca421f

Please sign in to comment.