Skip to content

Commit

Permalink
increment rows
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 22, 2024
1 parent 371cffb commit b6403eb
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 0 deletions.
1 change: 1 addition & 0 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *KafkaConnector) createPool(
for _, kr := range result.records {
c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) {
if err != nil {
c.logger.Error("[kafka] produce error", slog.Any("error", err))
queueErr(err)
} else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil {
shared.AtomicInt64Max(lastSeenLSN, result.lsn)
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/kafka/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connkafka
import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"

Expand Down Expand Up @@ -39,6 +40,7 @@ func (c *KafkaConnector) SyncQRepRecords(
queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
if err != nil {
c.logger.Error("failed to create pool", slog.Any("error", err))
return 0, err
}
defer pool.Close()
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
}

stream.Records <- record
numRows.Add(1)
}

qe.logger.Info("processed row stream", slog.String("cursor", cursorName), slog.Int64("records", numRows.Load()))
Expand Down

0 comments on commit b6403eb

Please sign in to comment.