Skip to content

Commit

Permalink
adds typecasting to get xmin working
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 20, 2023
1 parent 499dc7e commit 6d357f5
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
12 changes: 12 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func (c *PostgresConnector) getNumRowsPartitions(
var err error
numRowsPerPartition := int64(config.NumRowsPerPartition)
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)
}

whereClause := ""
if last != nil && last.Range != nil {
Expand All @@ -130,7 +133,10 @@ func (c *PostgresConnector) getNumRowsPartitions(
minVal = lastRange.IntRange.End
case *protos.PartitionRange_TimestampRange:
minVal = lastRange.TimestampRange.End.AsTime()
case *protos.PartitionRange_XminRange:
minVal = lastRange.XminRange.End
}

row = tx.QueryRow(c.ctx, countQuery, minVal)
} else {
row = tx.QueryRow(c.ctx, countQuery)
Expand Down Expand Up @@ -309,6 +315,9 @@ func (c *PostgresConnector) PullQRepRecords(
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}
case *protos.PartitionRange_XminRange:
rangeStart = x.XminRange.Start
rangeEnd = x.XminRange.End
default:
return nil, fmt.Errorf("unknown range type: %v", x)
}
Expand Down Expand Up @@ -383,6 +392,9 @@ func (c *PostgresConnector) PullQRepRecordStream(
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}
case *protos.PartitionRange_XminRange:
rangeStart = x.XminRange.Start
rangeEnd = x.XminRange.End
default:
return 0, fmt.Errorf("unknown range type: %v", x)
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint)
fetchSize := shared.FetchAndChannelSize
cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query)
log.WithFields(log.Fields{
"flowName": qe.flowJobName,
"partitionID": qe.partitionID,
}).Infof("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args)
_, err = tx.Exec(qe.ctx, cursorQuery, args...)
if err != nil {
stream.Records <- &model.QRecordOrError{
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/utils/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func compareValues(prevEnd interface{}, start interface{}) int {
return 0
}
}
case uint32: //xmin
if prevEnd.(uint32) < uint32(v) {
return -1
} else if prevEnd.(uint32) > uint32(v) {
return 1
} else {
return 0
}
default:
return 0
}
Expand Down

0 comments on commit 6d357f5

Please sign in to comment.