From 6d357f53d48a87a42d27cf82dcf8752df8e0350a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Sep 2023 12:41:45 +0530 Subject: [PATCH] adds typecasting to get xmin working --- flow/connectors/postgres/qrep.go | 12 ++++++++++++ flow/connectors/postgres/qrep_query_executor.go | 4 ++++ flow/connectors/utils/partition/partition.go | 8 ++++++++ 3 files changed, 24 insertions(+) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index e80374387..ad4297092 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -114,6 +114,9 @@ func (c *PostgresConnector) getNumRowsPartitions( var err error numRowsPerPartition := int64(config.NumRowsPerPartition) quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn) + if config.WatermarkColumn == "xmin" { + quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn) + } whereClause := "" if last != nil && last.Range != nil { @@ -130,7 +133,10 @@ func (c *PostgresConnector) getNumRowsPartitions( minVal = lastRange.IntRange.End case *protos.PartitionRange_TimestampRange: minVal = lastRange.TimestampRange.End.AsTime() + case *protos.PartitionRange_XminRange: + minVal = lastRange.XminRange.End } + row = tx.QueryRow(c.ctx, countQuery, minVal) } else { row = tx.QueryRow(c.ctx, countQuery) @@ -309,6 +315,9 @@ func (c *PostgresConnector) PullQRepRecords( OffsetNumber: uint16(x.TidRange.End.OffsetNumber), Valid: true, } + case *protos.PartitionRange_XminRange: + rangeStart = x.XminRange.Start + rangeEnd = x.XminRange.End default: return nil, fmt.Errorf("unknown range type: %v", x) } @@ -383,6 +392,9 @@ func (c *PostgresConnector) PullQRepRecordStream( OffsetNumber: uint16(x.TidRange.End.OffsetNumber), Valid: true, } + case *protos.PartitionRange_XminRange: + rangeStart = x.XminRange.Start + rangeEnd = x.XminRange.End default: return 0, fmt.Errorf("unknown range type: %v", x) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 8857e8eb0..5e2ef59d5 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -336,6 +336,10 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint) fetchSize := shared.FetchAndChannelSize cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query) + log.WithFields(log.Fields{ + "flowName": qe.flowJobName, + "partitionID": qe.partitionID, + }).Infof("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args) _, err = tx.Exec(qe.ctx, cursorQuery, args...) if err != nil { stream.Records <- &model.QRecordOrError{ diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index 68a0c30e3..092b4a9cb 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -53,6 +53,14 @@ func compareValues(prevEnd interface{}, start interface{}) int { return 0 } } + case uint32: //xmin + if prevEnd.(uint32) < uint32(v) { + return -1 + } else if prevEnd.(uint32) > uint32(v) { + return 1 + } else { + return 0 + } default: return 0 }