diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3a6e6c44f7..bf808b0ee6 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -658,8 +658,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, errGroup, errCtx := errgroup.WithContext(ctx) stream := model.NewQRecordStream(bufferSize) errGroup.Go(func() error { - pgConn := srcConn.(*connpostgres.PostgresConnector) - tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream) + tmp, err := srcConn.PullQRepRecordStream(errCtx, config, partition, stream) numRecords := int64(tmp) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 4a03373c8c..73d1b8310e 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -201,17 +201,11 @@ func (h *FlowRequestHandler) GetAllTables( if err != nil { return &protos.AllTablesResponse{Tables: nil}, err } - defer rows.Close() - var tables []string - for rows.Next() { - var table pgtype.Text - err := rows.Scan(&table) - if err != nil { - return &protos.AllTablesResponse{Tables: nil}, err - } - - tables = append(tables, table.String) + + tables, err := pgx.CollectRows[string](rows, pgx.RowTo) + if err != nil { + return nil, fmt.Errorf("failed to fetch all tables: %w", err) } return &protos.AllTablesResponse{Tables: tables}, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 08641b86f8..7d47c1d62f 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -142,7 +142,7 @@ type QRepPullConnector interface { // GetQRepPartitions returns the partitions for a given table that haven't been synced yet. GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error) - // PullQRepRecords returns the records for a given partition. + // PullQRepRecordStream streams the records for a given partition. PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QRecordStream) (int, error) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 3f80d0920a..a8c5179448 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -226,7 +226,7 @@ func (qe *QRepQueryExecutor) processFetchedRows( defer func() { rows.Close() - // description of .Close() says it should only be called once rows are closed + // description of .Err() says it should only be called once rows are closed if rows.Err() != nil { stream.Records <- model.QRecordOrError{ Err: rows.Err(),