Skip to content

Commit

Permalink
fixed some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Mar 6, 2024
1 parent e436b0c commit 10adad0
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 14 deletions.
3 changes: 1 addition & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream)
tmp, err := srcConn.PullQRepRecordStream(errCtx, config, partition, stream)
numRecords := int64(tmp)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down
14 changes: 4 additions & 10 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,11 @@ func (h *FlowRequestHandler) GetAllTables(
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

defer rows.Close()
var tables []string
for rows.Next() {
var table pgtype.Text
err := rows.Scan(&table)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

tables = append(tables, table.String)

tables, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return nil, fmt.Errorf("failed to fetch all tables: %w", err)
}
return &protos.AllTablesResponse{Tables: tables}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type QRepPullConnector interface {
// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)

// PullQRepRecords returns the records for a given partition.
// PullQRepRecordStream streams the records for a given partition.
PullQRepRecordStream(ctx context.Context, config *protos.QRepConfig,
partition *protos.QRepPartition, stream *model.QRecordStream) (int, error)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
defer func() {
rows.Close()

// description of .Close() says it should only be called once rows are closed
// description of .Err() says it should only be called once rows are closed
if rows.Err() != nil {
stream.Records <- model.QRecordOrError{
Err: rows.Err(),
Expand Down

0 comments on commit 10adad0

Please sign in to comment.