Skip to content

Commit

Permalink
Fix interface implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 13, 2024
1 parent fb506ed commit 770cdc3
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 5 deletions.
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,12 @@ func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
) (*protos.SetupNormalizedTableBatchOutput, error) {
logger := activity.GetLogger(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetConnectorAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
if err != nil {
if err == connectors.ErrUnsupportedFunctionality {
activity.GetLogger(ctx).Info("Connector does not implement normalized tables")
logger.Info("Connector does not implement normalized tables")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -172,7 +173,6 @@ func (a *FlowableActivity) CreateNormalizedTable(
})
defer shutdown()

logger := activity.GetLogger(ctx)
tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
created, err := conn.SetupNormalizedTable(
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (c *BigQueryConnector) StartSetupNormalizedTables(_ context.Context) (inter
return make(map[datasetTable]struct{}), nil
}

func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context) error {
func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *ClickhouseConnector) StartSetupNormalizedTables(_ context.Context) (int
return nil, nil
}

func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context) error {
func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error {
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ var (
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
_ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{}

_ NormalizedTablesConnector = &connpostgres.PostgresConnector{}
_ NormalizedTablesConnector = &connbigquery.BigQueryConnector{}
_ NormalizedTablesConnector = &connsnowflake.SnowflakeConnector{}
_ NormalizedTablesConnector = &connclickhouse.ClickhouseConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *SnowflakeConnector) StartSetupNormalizedTables(_ context.Context) (inte
return nil, nil
}

func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context) error {
func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error {
return nil
}

Expand Down

0 comments on commit 770cdc3

Please sign in to comment.