diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d63433a0f5..96166de6ef 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -147,11 +147,12 @@ func (a *FlowableActivity) CreateNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, ) (*protos.SetupNormalizedTableBatchOutput, error) { + logger := activity.GetLogger(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) conn, err := connectors.GetConnectorAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig) if err != nil { if err == connectors.ErrUnsupportedFunctionality { - activity.GetLogger(ctx).Info("Connector does not implement normalized tables") + logger.Info("Connector does not implement normalized tables") return nil, nil } return nil, fmt.Errorf("failed to get connector: %w", err) @@ -172,7 +173,6 @@ func (a *FlowableActivity) CreateNormalizedTable( }) defer shutdown() - logger := activity.GetLogger(ctx) tableExistsMapping := make(map[string]bool) for tableIdentifier, tableSchema := range config.TableNameSchemaMapping { created, err := conn.SetupNormalizedTable( diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index edc2546903..a7cf31ce9c 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -652,7 +652,7 @@ func (c *BigQueryConnector) StartSetupNormalizedTables(_ context.Context) (inter return make(map[datasetTable]struct{}), nil } -func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context) error { +func (c *BigQueryConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error { return nil } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 225aa2845c..0a43ed0353 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -23,7 +23,7 @@ func (c *ClickhouseConnector) StartSetupNormalizedTables(_ context.Context) (int return nil, nil } -func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context) error { +func (c *ClickhouseConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error { return nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 23bbe4e04d..6b2e470ac6 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -244,6 +244,11 @@ var ( _ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{} _ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{} + _ NormalizedTablesConnector = &connpostgres.PostgresConnector{} + _ NormalizedTablesConnector = &connbigquery.BigQueryConnector{} + _ NormalizedTablesConnector = &connsnowflake.SnowflakeConnector{} + _ NormalizedTablesConnector = &connclickhouse.ClickhouseConnector{} + _ QRepPullConnector = &connpostgres.PostgresConnector{} _ QRepPullConnector = &connsqlserver.SQLServerConnector{} diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index ab13a52a34..befebcf284 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -333,7 +333,7 @@ func (c *SnowflakeConnector) StartSetupNormalizedTables(_ context.Context) (inte return nil, nil } -func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context) error { +func (c *SnowflakeConnector) FinishSetupNormalizedTables(_ context.Context, _ interface{}) error { return nil }