Skip to content

Commit

Permalink
idle at end of QRepFlowWorkflow unless we detect new rows (#539)
Browse files Browse the repository at this point in the history
Closes #537
  • Loading branch information
heavycrystal authored Oct 19, 2023
1 parent d32dfde commit c65e79a
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 488 deletions.
36 changes: 36 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,3 +655,39 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.

return nil
}

// QRepWaitUntilNewRows blocks until CheckForUpdatedMaxValue reports that the
// source has a value past the end of the last partition's range.
//
// For any source peer whose type is not Postgres it returns nil immediately.
// Otherwise it polls in a loop: it records an activity heartbeat, waits
// config.WaitBetweenBatchesSeconds seconds (5 seconds when that field is not
// positive), and then runs the check.
//
// It returns nil once new rows are detected, ctx.Err() if ctx is done while
// waiting between checks, and a wrapped error if the source connector cannot
// be obtained, is not a Postgres connector, or the check itself fails.
func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
	config *protos.QRepConfig, last *protos.QRepPartition) error {
	if config.SourcePeer.Type != protos.DBType_POSTGRES {
		return nil
	}
	waitBetweenBatches := 5 * time.Second
	if config.WaitBetweenBatchesSeconds > 0 {
		waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
	}

	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
	if err != nil {
		return fmt.Errorf("failed to get qrep source connector: %w", err)
	}
	defer connectors.CloseConnector(srcConn)
	// Checked assertion: report an unexpected connector type as an error
	// instead of panicking inside the activity.
	pgSrcConn, ok := srcConn.(*connpostgres.PostgresConnector)
	if !ok {
		return fmt.Errorf("unexpected qrep source connector type %T, expected postgres", srcConn)
	}

	for attemptCount := 1; ; attemptCount++ {
		activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount))

		// Wait between checks, but stop promptly when the context is done
		// rather than sleeping through it and polling forever.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(waitBetweenBatches):
		}

		result, err := pgSrcConn.CheckForUpdatedMaxValue(config, last)
		if err != nil {
			return fmt.Errorf("failed to check for new rows: %w", err)
		}
		if result {
			return nil
		}
	}
}
59 changes: 51 additions & 8 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ func (c *PostgresConnector) getNumRowsPartitions(
minVal = lastRange.IntRange.End
case *protos.PartitionRange_TimestampRange:
minVal = lastRange.TimestampRange.End.AsTime()
case *protos.PartitionRange_XminRange:
minVal = lastRange.XminRange.End
}

row = tx.QueryRow(c.ctx, countQuery, minVal)
Expand Down Expand Up @@ -253,6 +251,12 @@ func (c *PostgresConnector) getMinMaxValues(
}
case *protos.PartitionRange_TimestampRange:
minValue = lastRange.TimestampRange.End.AsTime()
case *protos.PartitionRange_TidRange:
minValue = lastRange.TidRange.End
maxValue = &protos.TID{
BlockNumber: maxValue.(pgtype.TID).BlockNumber,
OffsetNumber: uint32(maxValue.(pgtype.TID).OffsetNumber),
}
}
} else {
// Otherwise get the minimum value from the database
Expand All @@ -272,6 +276,15 @@ func (c *PostgresConnector) getMinMaxValues(
case int32:
minValue = int64(v)
maxValue = int64(maxValue.(int32))
case pgtype.TID:
minValue = &protos.TID{
BlockNumber: v.BlockNumber,
OffsetNumber: uint32(v.OffsetNumber),
}
maxValue = &protos.TID{
BlockNumber: maxValue.(pgtype.TID).BlockNumber,
OffsetNumber: uint32(maxValue.(pgtype.TID).OffsetNumber),
}
}
}

Expand All @@ -283,6 +296,42 @@ func (c *PostgresConnector) getMinMaxValues(
return minValue, maxValue, nil
}

// CheckForUpdatedMaxValue reports whether the maximum value returned by
// getMinMaxValues lies beyond the end of the range of the last partition.
//
// The lookup runs inside a transaction that is always rolled back when the
// function returns. Only integer and timestamp ranges are supported; any
// other range type yields an error.
//
// It returns (true, nil) when the maximum is strictly greater than (for
// timestamps: after) the end of last's range, (false, nil) when it is not,
// and (false, err) when last carries no range, the transaction cannot be
// started, the min/max lookup fails, or the maximum value does not have the
// Go type expected for the range.
func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig,
	last *protos.QRepPartition) (bool, error) {
	// Guard the dereference of last.Range.Range below.
	if last == nil || last.Range == nil {
		return false, fmt.Errorf("no last partition range to compare the max value against")
	}

	tx, err := c.pool.Begin(c.ctx)
	if err != nil {
		return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err)
	}
	defer func() {
		deferErr := tx.Rollback(c.ctx)
		if deferErr != pgx.ErrTxClosed && deferErr != nil {
			// Log the rollback error itself, not the outer err.
			log.WithFields(log.Fields{
				"flowName": config.FlowJobName,
			}).Errorf("unexpected error rolling back transaction for getting max value: %v", deferErr)
		}
	}()

	_, maxValue, err := c.getMinMaxValues(tx, config, last)
	if err != nil {
		return false, fmt.Errorf("error while getting min and max values: %w", err)
	}

	switch x := last.Range.Range.(type) {
	case *protos.PartitionRange_IntRange:
		maxInt, ok := maxValue.(int64)
		if !ok {
			return false, fmt.Errorf("expected int64 max value for int range, got %T", maxValue)
		}
		return maxInt > x.IntRange.End, nil
	case *protos.PartitionRange_TimestampRange:
		maxTime, ok := maxValue.(time.Time)
		if !ok {
			return false, fmt.Errorf("expected time.Time max value for timestamp range, got %T", maxValue)
		}
		return maxTime.After(x.TimestampRange.End.AsTime()), nil
	default:
		return false, fmt.Errorf("unknown range type: %v", x)
	}
}

func (c *PostgresConnector) PullQRepRecords(
config *protos.QRepConfig,
partition *protos.QRepPartition) (*model.QRecordBatch, error) {
Expand Down Expand Up @@ -321,9 +370,6 @@ func (c *PostgresConnector) PullQRepRecords(
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}
case *protos.PartitionRange_XminRange:
rangeStart = x.XminRange.Start
rangeEnd = x.XminRange.End
default:
return nil, fmt.Errorf("unknown range type: %v", x)
}
Expand Down Expand Up @@ -406,9 +452,6 @@ func (c *PostgresConnector) PullQRepRecordStream(
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}
case *protos.PartitionRange_XminRange:
rangeStart = x.XminRange.Start
rangeEnd = x.XminRange.End
default:
return 0, fmt.Errorf("unknown range type: %v", x)
}
Expand Down
18 changes: 0 additions & 18 deletions flow/connectors/utils/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,6 @@ func createTIDPartition(start pgtype.TID, end pgtype.TID) *protos.QRepPartition
}
}

// createXMINPartition builds a QRepPartition whose ID is a newly generated
// UUID string and whose range is an XMIN range with the given start and end.
func createXMINPartition(start uint32, end uint32) *protos.QRepPartition {
	partitionID := uuid.New().String()

	xminRange := &protos.XMINPartitionRange{
		Start: start,
		End:   end,
	}

	partition := &protos.QRepPartition{PartitionId: partitionID}
	partition.Range = &protos.PartitionRange{
		Range: &protos.PartitionRange_XminRange{XminRange: xminRange},
	}
	return partition
}

type PartitionHelper struct {
prevStart interface{}
prevEnd interface{}
Expand Down Expand Up @@ -196,10 +182,6 @@ func (p *PartitionHelper) AddPartition(start interface{}, end interface{}) error
p.partitions = append(p.partitions, createTIDPartition(v, end.(pgtype.TID)))
p.prevStart = v
p.prevEnd = end
case pgtype.Uint32:
p.partitions = append(p.partitions, createXMINPartition(v.Uint32, end.(uint32)))
p.prevStart = v
p.prevEnd = end
default:
return fmt.Errorf("unsupported type: %T", v)
}
Expand Down
Loading

0 comments on commit c65e79a

Please sign in to comment.