diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index ebb73a0b60..d682406203 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -224,43 +224,38 @@ func AppendSlotSizeInfo( func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string, runUUID string, partition *protos.QRepPartition, ) error { - if partition.Range == nil && partition.FullTablePartition { - logger.LoggerFromCtx(ctx).Info("partition"+partition.PartitionId+ - " is a full table partition. Metrics logging is skipped.", - slog.String(string(shared.FlowNameKey), flowJobName)) - return nil - } - var rangeStart, rangeEnd string - switch x := partition.Range.Range.(type) { - case *protos.PartitionRange_IntRange: - rangeStart = strconv.FormatInt(x.IntRange.Start, 10) - rangeEnd = strconv.FormatInt(x.IntRange.End, 10) - case *protos.PartitionRange_TimestampRange: - rangeStart = x.TimestampRange.Start.AsTime().String() - rangeEnd = x.TimestampRange.End.AsTime().String() - case *protos.PartitionRange_TidRange: - rangeStartValue, err := pgtype.TID{ - BlockNumber: x.TidRange.Start.BlockNumber, - OffsetNumber: uint16(x.TidRange.Start.OffsetNumber), - Valid: true, - }.Value() - if err != nil { - return fmt.Errorf("unable to encode TID as string: %w", err) - } - rangeStart = rangeStartValue.(string) - - rangeEndValue, err := pgtype.TID{ - BlockNumber: x.TidRange.End.BlockNumber, - OffsetNumber: uint16(x.TidRange.End.OffsetNumber), - Valid: true, - }.Value() - if err != nil { - return fmt.Errorf("unable to encode TID as string: %w", err) + if partition.Range != nil && !partition.FullTablePartition { + switch x := partition.Range.Range.(type) { + case *protos.PartitionRange_IntRange: + rangeStart = strconv.FormatInt(x.IntRange.Start, 10) + rangeEnd = strconv.FormatInt(x.IntRange.End, 10) + case *protos.PartitionRange_TimestampRange: + rangeStart = x.TimestampRange.Start.AsTime().String() + rangeEnd = x.TimestampRange.End.AsTime().String() + case *protos.PartitionRange_TidRange: + rangeStartValue, err := pgtype.TID{ + BlockNumber: x.TidRange.Start.BlockNumber, + OffsetNumber: uint16(x.TidRange.Start.OffsetNumber), + Valid: true, + }.Value() + if err != nil { + return fmt.Errorf("unable to encode TID as string: %w", err) + } + rangeStart = rangeStartValue.(string) + + rangeEndValue, err := pgtype.TID{ + BlockNumber: x.TidRange.End.BlockNumber, + OffsetNumber: uint16(x.TidRange.End.OffsetNumber), + Valid: true, + }.Value() + if err != nil { + return fmt.Errorf("unable to encode TID as string: %w", err) + } + rangeEnd = rangeEndValue.(string) + default: + return fmt.Errorf("unknown range type: %v", x) } - rangeEnd = rangeEndValue.(string) - default: - return fmt.Errorf("unknown range type: %v", x) } _, err := pool.Exec(ctx,