diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 6802e46102..bca12d770a 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -639,6 +639,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, if rowsSynced == 0 { slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId)) + pullCancel() } else { wg.Wait() if goroutineErr != nil { @@ -655,11 +656,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) - if err != nil { - return err - } - - return nil + return err } func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig, diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 0bbd10561d..627c2e2fcf 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -168,27 +168,36 @@ func (qe *QRepQueryExecutor) processRowsStream( numRows := 0 const heartBeatNumRows = 5000 + ctx := qe.ctx + // Iterate over the rows for rows.Next() { - record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) - if err != nil { - qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) + select { + case <-ctx.Done(): + qe.logger.Info("Context canceled, exiting processRowsStream early") + return numRows, ctx.Err() + default: + // Process the row as before + record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) + if err != nil { + qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err)) + stream.Records <- model.QRecordOrError{ + Err: fmt.Errorf("failed to map row to QRecord: %w", err), + } + return 0, fmt.Errorf("failed to map row to QRecord: %w", err) + } + stream.Records <- model.QRecordOrError{ - Err: fmt.Errorf("failed to map row to QRecord: %w", err), + Record: record, + Err: nil, } - return 0, fmt.Errorf("failed to map row to QRecord: %w", err) - } - stream.Records <- model.QRecordOrError{ - Record: record, - Err: nil, - } + if numRows%heartBeatNumRows == 0 { + qe.recordHeartbeat("cursor: %s - fetched %d records", cursorName, numRows) + } - if numRows%heartBeatNumRows == 0 { - qe.recordHeartbeat("cursor: %s - fetched %d records", cursorName, numRows) + numRows++ } - - numRows++ } qe.recordHeartbeat("cursor %s - fetch completed - %d records", cursorName, numRows)