Skip to content

Commit

Permalink
Remove InitializeTableSchema (#1078)
Browse files Browse the repository at this point in the history
Split from #893
  • Loading branch information
serprex authored Jan 15, 2024
1 parent aebe35c commit 826e8fa
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 81 deletions.
20 changes: 5 additions & 15 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}
activity.RecordHeartbeat(ctx, "initialized table schema")
slog.InfoContext(ctx, "pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
Expand Down Expand Up @@ -398,17 +393,12 @@ func (a *FlowableActivity) StartNormalize(
})
defer shutdown()

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}

res, err := dstConn.NormalizeRecords(&model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
Expand Down
23 changes: 8 additions & 15 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,13 @@ type BigQueryServiceAccount struct {

// BigQueryConnector is a Connector implementation for BigQuery.
type BigQueryConnector struct {
ctx context.Context
bqConfig *protos.BigqueryConfig
client *bigquery.Client
storageClient *storage.Client
tableNameSchemaMapping map[string]*protos.TableSchema
datasetID string
catalogPool *pgxpool.Pool
logger slog.Logger
ctx context.Context
bqConfig *protos.BigqueryConfig
client *bigquery.Client
storageClient *storage.Client
datasetID string
catalogPool *pgxpool.Pool
logger slog.Logger
}

// Create BigQueryServiceAccount from BigqueryConfig
Expand Down Expand Up @@ -211,12 +210,6 @@ func (c *BigQueryConnector) NeedsSetupMetadataTables() bool {
return err != nil
}

// InitializeTableSchema initializes the schema for a table, implementing the Connector interface.
// It caches the destination-table schema mapping on the connector so that a later
// NormalizeRecords call can look up each table's schema by name.
func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
	// Store the mapping as-is; no validation is performed, so this never fails.
	c.tableNameSchemaMapping = req
	return nil
}

func (c *BigQueryConnector) waitForTableReady(datasetTable *datasetTable) error {
table := c.client.Dataset(datasetTable.dataset).Table(datasetTable.table)
maxDuration := 5 * time.Minute
Expand Down Expand Up @@ -596,7 +589,7 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
},
dstTableName: tableName,
dstDatasetTable: dstDatasetTable,
normalizedTableSchema: c.tableNameSchemaMapping[tableName],
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
peerdbCols: &protos.PeerDBColumns{
Expand Down
6 changes: 0 additions & 6 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ type CDCSyncConnector interface {
// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)

// InitializeTableSchema initializes the table schema of all the destination tables for the connector.
InitializeTableSchema(req map[string]*protos.TableSchema) error

// CreateRawTable creates a raw table for the connector with a given name and a fixed schema.
CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)

Expand All @@ -89,9 +86,6 @@ type CDCSyncConnector interface {
type CDCNormalizeConnector interface {
Connector

// InitializeTableSchema initializes the table schema of all the destination tables for the connector.
InitializeTableSchema(req map[string]*protos.TableSchema) error

// NormalizeRecords merges records pushed earlier into the destination table.
// This method should be idempotent, and should be able to be called multiple times with the same request.
NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error)
Expand Down
18 changes: 6 additions & 12 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import (
)

type EventHubConnector struct {
ctx context.Context
config *protos.EventHubGroupConfig
pgMetadata *metadataStore.PostgresMetadataStore
tableSchemas map[string]*protos.TableSchema
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
logger slog.Logger
ctx context.Context
config *protos.EventHubGroupConfig
pgMetadata *metadataStore.PostgresMetadataStore
creds *azidentity.DefaultAzureCredential
hubManager *EventHubManager
logger slog.Logger
}

// NewEventHubConnector creates a new EventHubConnector.
Expand Down Expand Up @@ -83,11 +82,6 @@ func (c *EventHubConnector) ConnectionActive() error {
return nil
}

// InitializeTableSchema caches the destination-table schema mapping on the
// connector, implementing the Connector interface. The mapping is stored
// without validation, so this always returns nil.
func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
	c.tableSchemas = req
	return nil
}

func (c *EventHubConnector) NeedsSetupMetadataTables() bool {
return c.pgMetadata.NeedsSetupMetadata()
}
Expand Down
9 changes: 1 addition & 8 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type PostgresConnector struct {
pool *SSHWrappedPostgresPool
replConfig *pgxpool.Config
replPool *SSHWrappedPostgresPool
tableSchemaMapping map[string]*protos.TableSchema
customTypesMapping map[uint32]string
metadataSchema string
logger slog.Logger
Expand Down Expand Up @@ -465,7 +464,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
normalizeStmtGen := &normalizeStmtGenerator{
rawTableName: rawTableIdentifier,
dstTableName: destinationTableName,
normalizedTableSchema: c.tableSchemaMapping[destinationTableName],
normalizedTableSchema: req.TableNameSchemaMapping[destinationTableName],
unchangedToastColumns: unchangedToastColsMap[destinationTableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
Expand Down Expand Up @@ -701,12 +700,6 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab
}, nil
}

// InitializeTableSchema initializes the schema for a table, implementing the Connector interface.
// It caches the destination-table schema mapping on the connector so that a later
// NormalizeRecords call can look up each table's schema by name.
func (c *PostgresConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
	// Store the mapping as-is; no validation is performed, so this never fails.
	c.tableSchemaMapping = req
	return nil
}

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
Expand Down
5 changes: 0 additions & 5 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.C
return nil, nil
}

// InitializeTableSchema implements the Connector interface for S3.
// S3 has no table schemas to initialize, so this is a deliberate no-op:
// it only logs and always returns nil.
func (c *S3Connector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
	c.logger.Info("InitializeTableSchema for S3 is a no-op")
	return nil
}

func (c *S3Connector) Close() error {
c.logger.Debug("Closing metadata store connection")
return c.pgMetadata.Close()
Expand Down
25 changes: 9 additions & 16 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ const (
)

type SnowflakeConnector struct {
ctx context.Context
database *sql.DB
tableSchemaMapping map[string]*protos.TableSchema
metadataSchema string
logger slog.Logger
ctx context.Context
database *sql.DB
metadataSchema string
logger slog.Logger
}

// creating this to capture array results from snowflake.
Expand Down Expand Up @@ -149,11 +148,10 @@ func NewSnowflakeConnector(ctx context.Context,

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
return &SnowflakeConnector{
ctx: ctx,
database: database,
tableSchemaMapping: nil,
metadataSchema: metadataSchema,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
ctx: ctx,
database: database,
metadataSchema: metadataSchema,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
}, nil
}

Expand Down Expand Up @@ -448,11 +446,6 @@ func (c *SnowflakeConnector) SetupNormalizedTables(
}, nil
}

// InitializeTableSchema caches the destination-table schema mapping on the
// connector, implementing the Connector interface. A later NormalizeRecords
// call reads this mapping to look up each table's schema by name. The mapping
// is stored without validation, so this always returns nil.
func (c *SnowflakeConnector) InitializeTableSchema(req map[string]*protos.TableSchema) error {
	c.tableSchemaMapping = req
	return nil
}

// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string,
Expand Down Expand Up @@ -641,7 +634,7 @@ func (c *SnowflakeConnector) NormalizeRecords(req *model.NormalizeRecordsRequest
dstTableName: tableName,
syncBatchID: batchIDs.SyncBatchID,
normalizeBatchID: batchIDs.NormalizeBatchID,
normalizedTableSchema: c.tableSchemaMapping[tableName],
normalizedTableSchema: req.TableNameSchemaMapping[tableName],
unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDelete: req.SoftDelete,
Expand Down
9 changes: 5 additions & 4 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,11 @@ type SyncRecordsRequest struct {
}

type NormalizeRecordsRequest struct {
FlowJobName string
SoftDelete bool
SoftDeleteColName string
SyncedAtColName string
FlowJobName string
SoftDelete bool
SoftDeleteColName string
SyncedAtColName string
TableNameSchemaMapping map[string]*protos.TableSchema
}

type SyncResponse struct {
Expand Down

0 comments on commit 826e8fa

Please sign in to comment.