Skip to content

Commit

Permalink
Merge branch 'main' into heartbeat-in-setup-normalized
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 13, 2024
2 parents 0840060 + 977b1c7 commit fb506ed
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,23 @@ type QRepConsolidateConnector interface {
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
switch config.Config.(type) {
switch inner := config.Config.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
return connbigquery.NewBigQueryConnector(ctx, inner.BigqueryConfig)
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, inner.SnowflakeConfig)
case *protos.Peer_EventhubConfig:
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
return conneventhub.NewEventHubConnector(ctx, inner.EventhubGroupConfig)
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
return conns3.NewS3Connector(ctx, inner.S3Config)
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig)
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down Expand Up @@ -227,3 +227,31 @@ func CloseConnector(ctx context.Context, conn Connector) {
logger.LoggerFromCtx(ctx).Error("error closing connector", slog.Any("error", err))
}
}

// create type assertions to cause compile time error if connector interface not implemented
var (
_ CDCPullConnector = &connpostgres.PostgresConnector{}

_ CDCSyncConnector = &connpostgres.PostgresConnector{}
_ CDCSyncConnector = &connbigquery.BigQueryConnector{}
_ CDCSyncConnector = &connsnowflake.SnowflakeConnector{}
_ CDCSyncConnector = &conneventhub.EventHubConnector{}
_ CDCSyncConnector = &conns3.S3Connector{}
_ CDCSyncConnector = &connclickhouse.ClickhouseConnector{}

_ CDCNormalizeConnector = &connpostgres.PostgresConnector{}
_ CDCNormalizeConnector = &connbigquery.BigQueryConnector{}
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
_ CDCNormalizeConnector = &connclickhouse.ClickhouseConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

_ QRepSyncConnector = &connpostgres.PostgresConnector{}
_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
)

0 comments on commit fb506ed

Please sign in to comment.