Skip to content

Commit

Permalink
connectors: build on a single GetConnector using generic GetConnector…
Browse files Browse the repository at this point in the history
…As function

Split out from moving normalized table creation to a separate connector interface
  • Loading branch information
serprex committed Feb 13, 2024
1 parent a1657f7 commit 467c31f
Showing 1 changed file with 37 additions and 119 deletions.
156 changes: 37 additions & 119 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,147 +136,65 @@ type QRepConsolidateConnector interface {
CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
}

func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
default:
return nil, ErrUnsupportedFunctionality
}
}

func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConnector, error) {
inner := config.Config
switch inner.(type) {
func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
switch inner := config.Config.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
return connpostgres.NewPostgresConnector(ctx, inner.PostgresConfig)
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
return connbigquery.NewBigQueryConnector(ctx, inner.BigqueryConfig)
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
return connsnowflake.NewSnowflakeConnector(ctx, inner.SnowflakeConfig)
case *protos.Peer_EventhubConfig:
return nil, fmt.Errorf("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
return conneventhub.NewEventHubConnector(ctx, inner.EventhubGroupConfig)
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
return conns3.NewS3Connector(ctx, inner.S3Config)
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig)
default:
return nil, ErrUnsupportedFunctionality
}
}

func GetCDCNormalizeConnector(ctx context.Context,
config *protos.Peer,
) (CDCNormalizeConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
default:
return nil, ErrUnsupportedFunctionality
func GetConnectorAs[T Connector](ctx context.Context, config *protos.Peer) (T, error) {
var none T
conn, err := GetConnector(ctx, config)
if err != nil {
return none, err
}
}

func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
default:
return nil, ErrUnsupportedFunctionality
if conn, ok := conn.(T); ok {
return conn, nil
} else {
return none, ErrUnsupportedFunctionality
}
}

func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_PostgresConfig:
return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig())
case *protos.Peer_BigqueryConfig:
return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
default:
return nil, ErrUnsupportedFunctionality
}
func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConnector, error) {
return GetConnectorAs[CDCPullConnector](ctx, config)
}

func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
inner := peer.Type
switch inner {
case protos.DBType_POSTGRES:
pgConfig := peer.GetPostgresConfig()

if pgConfig == nil {
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
// we can't decide if a PG peer should have replication permissions on it because we don't know
// what the user wants to do with it, so defaulting to being permissive.
// can be revisited in the future or we can use some UI wizardry.
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
return nil, fmt.Errorf("missing bigquery config for %s peer %s", peer.Type.String(), peer.Name)
}
return connbigquery.NewBigQueryConnector(ctx, bqConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := peer.GetSnowflakeConfig()
if sfConfig == nil {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
case protos.DBType_S3:
s3Config := peer.GetS3Config()
if s3Config == nil {
return nil, fmt.Errorf("missing s3 config for %s peer %s", peer.Type.String(), peer.Name)
}
return conns3.NewS3Connector(ctx, s3Config)
case protos.DBType_CLICKHOUSE:
clickhouseConfig := peer.GetClickhouseConfig()
if clickhouseConfig == nil {
return nil, fmt.Errorf("missing clickhouse config for %s peer %s", peer.Type.String(), peer.Name)
}
return connclickhouse.NewClickhouseConnector(ctx, clickhouseConfig)
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConnector, error) {
return GetConnectorAs[CDCSyncConnector](ctx, config)
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer,
) (QRepConsolidateConnector, error) {
inner := config.Config
switch inner.(type) {
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
default:
return nil, ErrUnsupportedFunctionality
}
func GetCDCNormalizeConnector(ctx context.Context, config *protos.Peer) (CDCNormalizeConnector, error) {
return GetConnectorAs[CDCNormalizeConnector](ctx, config)
}

func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullConnector, error) {
return GetConnectorAs[QRepPullConnector](ctx, config)
}

func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncConnector, error) {
return GetConnectorAs[QRepSyncConnector](ctx, config)
}

func GetQRepConsolidateConnector(ctx context.Context, config *protos.Peer) (QRepConsolidateConnector, error) {
return GetConnectorAs[QRepConsolidateConnector](ctx, config)
}

func CloseConnector(ctx context.Context, conn Connector) {
Expand Down

0 comments on commit 467c31f

Please sign in to comment.