Skip to content

Commit

Permalink
fixed edge cases with QRep catalog stats and IntPartitions (#536)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 19, 2023
1 parent 8c9a259 commit d32dfde
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
26 changes: 12 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,16 +562,16 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
return nil
} else {
wg.Wait()
if goroutineErr != nil {
return goroutineErr
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
}

wg.Wait()
if goroutineErr != nil {
return goroutineErr
}
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", res)
err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
if err != nil {
return err
Expand All @@ -585,7 +585,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
} else if err != nil {
return err
}
Expand All @@ -599,13 +599,11 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
}()

err = dstConn.ConsolidateQRepPartitions(config)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
} else if err != nil {
if err != nil {
return err
}
err = a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
return err

return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *PostgresConnector) GetQRepPartitions(
return nil, fmt.Errorf("failed to set transaction snapshot: %w", err)
}

// TODO re-enable locing of the watermark table.
// TODO re-enable locking of the watermark table.
// // lock the table while we get the partitions.
// lockQuery := fmt.Sprintf("LOCK %s IN EXCLUSIVE MODE", config.WatermarkTable)
// if _, err = tx.Exec(c.ctx, lockQuery); err != nil {
Expand Down Expand Up @@ -612,7 +612,7 @@ func (c *PostgresConnector) getIntPartitions(
return nil, fmt.Errorf("batch size cannot be 0")
}

for start <= end {
for start < end {
partitionEnd := start + batchSize
// safeguard against integer overflow
if partitionEnd > end || partitionEnd < start {
Expand Down

0 comments on commit d32dfde

Please sign in to comment.