Skip to content

Commit

Permalink
check for partition being synced before starting errgroup (#1401)
Browse files Browse the repository at this point in the history
Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
heavycrystal and serprex authored Mar 7, 2024
1 parent d8a7bd6 commit 0496eb5
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 53 deletions.
27 changes: 19 additions & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,12 +611,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to update start time for partition: %w", err)
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand All @@ -631,6 +625,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(ctx, dstConn)

done, err := dstConn.IsQRepPartitionSynced(ctx, &protos.IsQRepPartitionSyncedInput{
FlowJobName: config.FlowJobName,
PartitionId: partition.PartitionId,
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get fetch status of partition: %w", err)
}
if done {
logger.Info("no records to push for partition " + partition.PartitionId)
return nil
}

err = monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to update start time for partition: %w", err)
}

logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
Expand Down Expand Up @@ -705,8 +718,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return err
}
} else {
logger.Info("no records to push for partition " + partition.PartitionId)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down
15 changes: 6 additions & 9 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ func (c *BigQueryConnector) SyncQRepRecords(
return 0, err
}

done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info(fmt.Sprintf("Partition %s has already been synced", partition.PartitionId))
return 0, nil
}
c.logger.Info(fmt.Sprintf("QRep sync function called and partition existence checked for"+
" partition %s of destination table %s",
partition.PartitionId, destTable))
Expand Down Expand Up @@ -111,3 +102,9 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config

return nil
}

func (c *BigQueryConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
}
16 changes: 4 additions & 12 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ func (c *ClickhouseConnector) SyncQRepRecords(
slog.String("destinationTable", destTable),
)

done, err := c.isPartitionSynced(ctx, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

tblSchema, err := c.getTableSchema(destTable)
if err != nil {
return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
Expand Down Expand Up @@ -96,9 +86,11 @@ func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnTyp
return columnTypes, nil
}

func (c *ClickhouseConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
func (c *ClickhouseConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
//nolint:gosec
queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID)
queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, req.PartitionId)

row := c.database.QueryRowContext(ctx, queryString)

Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type QRepPullConnector interface {
type QRepSyncConnector interface {
Connector

// IsQRepPartitionSynced returns true if a partition has already been synced
IsQRepPartitionSynced(ctx context.Context, req *protos.IsQRepPartitionSyncedInput) (bool, error)

// SetupQRepMetadataTables sets up the metadata tables for QRep.
SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error

Expand Down Expand Up @@ -261,6 +264,7 @@ var (
_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
_ QRepSyncConnector = &conns3.S3Connector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
_ QRepConsolidateConnector = &connclickhouse.ClickhouseConnector{}
Expand Down
19 changes: 6 additions & 13 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,15 +458,6 @@ func (c *PostgresConnector) SyncQRepRecords(
return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema)
}

done, err := c.isPartitionSynced(ctx, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition is synced: %w", err)
}

if done {
c.logger.Info(fmt.Sprintf("partition %s already synced", partition.PartitionId))
return 0, nil
}
c.logger.Info("SyncRecords called and initial checks complete.")

stagingTableSync := &QRepStagingTableSync{connector: c}
Expand Down Expand Up @@ -573,18 +564,20 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
return res, nil
}

// isPartitionSynced checks whether a specific partition is synced
func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID string) (bool, error) {
// IsQRepPartitionSynced checks whether a specific partition is synced
func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
// setup the query string
metadataTableIdentifier := pgx.Identifier{c.metadataSchema, qRepMetadataTableName}
queryString := fmt.Sprintf(
"SELECT COUNT(*)>0 FROM %s WHERE partitionID = $1;",
"SELECT COUNT(*)>0 FROM %s WHERE partitionID=$1;",
metadataTableIdentifier.Sanitize(),
)

// prepare and execute the query
var result bool
err := c.conn.QueryRow(ctx, queryString, partitionID).Scan(&result)
err := c.conn.QueryRow(ctx, queryString, req.PartitionId).Scan(&result)
if err != nil {
return false, fmt.Errorf("failed to execute query: %w", err)
}
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,10 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos.
c.logger.Info("QRep metadata setup not needed for S3.")
return nil
}

// S3 doesn't check if partition is already synced, but file with same name is overwritten
func (c *S3Connector) IsQRepPartitionSynced(_ context.Context,
config *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return false, nil
}
16 changes: 6 additions & 10 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ func (c *SnowflakeConnector) SyncQRepRecords(
}
c.logger.Info("Called QRep sync function and obtained table schema", flowLog)

done, err := c.pgMetadata.IsQrepPartitionSynced(ctx, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err)
}

if done {
c.logger.Info("Partition has already been synced", flowLog)
return 0, nil
}

avroSync := NewSnowflakeAvroSyncHandler(config, c)
return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream)
}
Expand Down Expand Up @@ -282,3 +272,9 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string,
func (c *SnowflakeConnector) getStageNameForJob(job string) string {
return fmt.Sprintf("%s.peerdb_stage_%s", c.rawSchema, job)
}

func (c *SnowflakeConnector) IsQRepPartitionSynced(ctx context.Context,
req *protos.IsQRepPartitionSyncedInput,
) (bool, error) {
return c.pgMetadata.IsQrepPartitionSynced(ctx, req.FlowJobName, req.PartitionId)
}
7 changes: 6 additions & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,14 @@ message SetupFlowOutput {
map<string, TableSchema> table_name_schema_mapping = 2;
}

message AddTablesToPublicationInput{
message AddTablesToPublicationInput {
string flow_job_name = 1;
string publication_name = 2;
repeated TableMapping additional_tables = 3;
}

message IsQRepPartitionSyncedInput {
string flow_job_name = 1;
string partition_id = 2;
}

0 comments on commit 0496eb5

Please sign in to comment.