Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check for partition being synced before starting errgroup #1401

Merged
merged 10 commits into from
Mar 7, 2024
27 changes: 19 additions & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,12 +611,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to update start time for partition: %w", err)
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand All @@ -631,6 +625,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(ctx, dstConn)

done, err := dstConn.IsQRepPartitionSynced(ctx, &protos.IsQRepPartitionSyncedInput{
FlowJobName: config.FlowJobName,
PartitionId: partition.PartitionId,
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get fetch status of partition: %w", err)
}
if done {
logger.Info("no records to push for partition " + partition.PartitionId)
return nil
}

err = monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to update start time for partition: %w", err)
}

logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
Expand Down Expand Up @@ -705,8 +718,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return err
}
} else {
logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down
15 changes: 6 additions & 9 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ func (c *BigQueryConnector) SyncQRepRecords(
return 0, err
}

done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId))
return 0, nil
}
c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+
" partition %s of destination table %s",
partition.PartitionId, destTable))
Expand Down Expand Up @@ -111,3 +102,9 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config

return nil
}

func (c *BigQueryConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
}
16 changes: 4 additions & 12 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
slog.String("destinationTable", destTable),
)

done, err := c.isPartitionSynced(ctx, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

tblSchema, err := c.getTableSchema(destTable)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
Expand Down Expand Up @@ -96,9 +86,11 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp
return columnTypes, nil
}

func (c *ClickhouseConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
func (c *ClickhouseConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
//nolint:gosec
queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID)
queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, req.PartitionId)

row := c.database.QueryRowContext(ctx, queryString)

Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type QRepPullConnector interface {
type QRepSyncConnector interface {
Connector

// IsQRepPartitionSynced returns true if a partition has already been synced
IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error)

// SetupQRepMetadataTables sets up the metadata tables for QRep.
SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error

Expand Down Expand Up @@ -261,6 +264,7 @@ var (
_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
_ QRepSyncConnector = &conns3.S3Connector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
Expand Down
19 changes: 6 additions & 13 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,15 +458,6 @@ func (c *PostgresConnector) SyncQRepRecords(
return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema)
}

done, err := c.isPartitionSynced(ctx, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition is synced: %w", err)
}

if done {
c.logger.Info(fmt.Sprintf("partition %s already synced", partition.PartitionId))
return 0, nil
}
c.logger.Info("SyncRecords called and initial checks complete.")

stagingTableSync := &QRepStagingTableSync{connector: c}
Expand Down Expand Up @@ -573,18 +564,20 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
return res, nil
}

// isPartitionSynced checks whether a specific partition is synced
func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
// IsQRepPartitionSynced checks whether a specific partition is synced
func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
// setup the query string
metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName}
queryString := fmt.Sprintf(
"SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;",
"SELECT COUNT(*)>0 FROM %s WHERE partitionID=$1;",
metadataTableIdentifier.Sanitize(),
)

// prepare and execute the query
var result bool
err := c.conn.QueryRow(ctx, queryString, partitionID).Scan(&result)
err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,10 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos.
c.logger.Info("QRep metadata setup not needed for S3.")
return nil
}

// S3 doesn't appear to check if the partition already exists
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment should be updated with reality. It doesn't check. Instead, S3 will always overwrite an existing partition

func (c *S3Connector) IsQRepPartitionSynced(_ context.Context,
config *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return false, nil
}
16 changes: 6 additions & 10 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ func (c *SnowflakeConnector) SyncQRepRecords(
}
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

avroSync := NewSnowflakeAvroSyncHandler(config, c)
return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream)
}
Expand Down Expand Up @@ -282,3 +272,9 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string,
func (c *SnowflakeConnector) getStageNameForJob(job string) string {
return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job)
}

func (c *SnowflakeConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
}
7 changes: 6 additions & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,14 @@ message SetupFlowOutput {
map<string, TableSchema> table_name_schema_mapping = 2;
}

message AddTablesToPublicationInput{
message AddTablesToPublicationInput {
string flow_job_name = 1;
string publication_name = 2;
repeated TableMapping additional_tables = 3;
}

message IsQRepPartitionSyncedInput {
string flow_job_name = 1;
string partition_id = 2;
}

Loading