Skip to content

Commit

Permalink
log full table partitions to QRep stats
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Mar 7, 2024
1 parent cbf23bd commit 1b08589
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,43 +224,38 @@ func AppendSlotSizeInfo(
func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
runUUID string, partition *protos.QRepPartition,
) error {
if partition.Range == nil && partition.FullTablePartition {
logger.LoggerFromCtx(ctx).Info("partition"+partition.PartitionId+
" is a full table partition. Metrics logging is skipped.",
slog.String(string(shared.FlowNameKey), flowJobName))
return nil
}

var rangeStart, rangeEnd string
switch x := partition.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
rangeStart = strconv.FormatInt(x.IntRange.Start, 10)
rangeEnd = strconv.FormatInt(x.IntRange.End, 10)
case *protos.PartitionRange_TimestampRange:
rangeStart = x.TimestampRange.Start.AsTime().String()
rangeEnd = x.TimestampRange.End.AsTime().String()
case *protos.PartitionRange_TidRange:
rangeStartValue, err := pgtype.TID{
BlockNumber: x.TidRange.Start.BlockNumber,
OffsetNumber: uint16(x.TidRange.Start.OffsetNumber),
Valid: true,
}.Value()
if err != nil {
return fmt.Errorf("unable to encode TID as string: %w", err)
}
rangeStart = rangeStartValue.(string)

rangeEndValue, err := pgtype.TID{
BlockNumber: x.TidRange.End.BlockNumber,
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}.Value()
if err != nil {
return fmt.Errorf("unable to encode TID as string: %w", err)
if partition.Range != nil && !partition.FullTablePartition {
switch x := partition.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
rangeStart = strconv.FormatInt(x.IntRange.Start, 10)
rangeEnd = strconv.FormatInt(x.IntRange.End, 10)
case *protos.PartitionRange_TimestampRange:
rangeStart = x.TimestampRange.Start.AsTime().String()
rangeEnd = x.TimestampRange.End.AsTime().String()
case *protos.PartitionRange_TidRange:
rangeStartValue, err := pgtype.TID{
BlockNumber: x.TidRange.Start.BlockNumber,
OffsetNumber: uint16(x.TidRange.Start.OffsetNumber),
Valid: true,
}.Value()
if err != nil {
return fmt.Errorf("unable to encode TID as string: %w", err)
}
rangeStart = rangeStartValue.(string)

rangeEndValue, err := pgtype.TID{
BlockNumber: x.TidRange.End.BlockNumber,
OffsetNumber: uint16(x.TidRange.End.OffsetNumber),
Valid: true,
}.Value()
if err != nil {
return fmt.Errorf("unable to encode TID as string: %w", err)
}
rangeEnd = rangeEndValue.(string)
default:
return fmt.Errorf("unknown range type: %v", x)
}
rangeEnd = rangeEndValue.(string)
default:
return fmt.Errorf("unknown range type: %v", x)
}

_, err := pool.Exec(ctx,
Expand Down

0 comments on commit 1b08589

Please sign in to comment.