diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index ffd6eba45..a0c4a989e 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -598,10 +598,15 @@ func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCB return nil, err } + var page int32 + if req.Limit != 0 { + page = rowsBehind/int32(req.Limit) + 1 + } + return &protos.GetCDCBatchesResponse{ CdcBatches: batches, Total: total, - Page: rowsBehind/int32(req.Limit) + 1, + Page: page, }, nil } @@ -755,7 +760,7 @@ func (h *FlowRequestHandler) ListMirrorLogs( } page := req.Page - if page == 0 { + if page == 0 && req.NumPerPage != 0 { page = rowsBehind/req.NumPerPage + 1 } diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 2a65ec353..b393a4691 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -84,7 +84,6 @@ func (c *PostgresConnector) getNumRowsPartitions( config *protos.QRepConfig, last *protos.QRepPartition, ) ([]*protos.QRepPartition, error) { - var err error numRowsPerPartition := int64(config.NumRowsPerPartition) quotedWatermarkColumn := QuoteIdentifier(config.WatermarkColumn) @@ -116,7 +115,7 @@ func (c *PostgresConnector) getNumRowsPartitions( } var totalRows pgtype.Int8 - if err = row.Scan(&totalRows); err != nil { + if err := row.Scan(&totalRows); err != nil { return nil, fmt.Errorf("failed to query for total rows: %w", err) } @@ -177,19 +176,16 @@ func (c *PostgresConnector) getNumRowsPartitions( return nil, fmt.Errorf("failed to scan row: %w", err) } - err = partitionHelper.AddPartition(start, end) - if err != nil { + if err := partitionHelper.AddPartition(start, end); err != nil { return nil, fmt.Errorf("failed to add partition: %w", err) } } - err = rows.Err() - if err != nil { + if err := rows.Err(); err != nil { return nil, fmt.Errorf("failed to read rows: %w", err) } - err = tx.Commit(ctx) - if err != nil { + if err := tx.Commit(ctx); err != nil { return nil, fmt.Errorf("failed to commit transaction: %w", err) }