Skip to content

Commit

Permalink
add heartbeats for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 22, 2024
1 parent aa76f4e commit 9a274a1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
6 changes: 5 additions & 1 deletion flow/connectors/kafka/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connkafka
import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"

Expand Down Expand Up @@ -30,13 +31,16 @@ func (c *KafkaConnector) SyncQRepRecords(
schema := stream.Schema()

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
msg := fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
c.logger.Info(msg)
return msg
})
defer shutdown()

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
if err != nil {
c.logger.Error("failed to create pool", slog.Any("error", err))
return 0, err
}
defer pool.Close()
Expand Down
25 changes: 14 additions & 11 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"
"log/slog"
"sync/atomic"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peer-flow/connectors/utils"
datatypes "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -136,13 +138,19 @@ func (qe *QRepQueryExecutor) processRowsStream(
rows pgx.Rows,
fieldDescriptions []pgconn.FieldDescription,
) (int, error) {
numRows := 0
const heartBeatNumRows = 10000
var numRows atomic.Int64

shutdown := utils.HeartbeatRoutine(ctx, func() string {
msg := fmt.Sprintf("processed %d rows", numRows.Load())
qe.logger.Info(msg)
return msg
})
defer shutdown()

for rows.Next() {
if err := ctx.Err(); err != nil {
qe.logger.Info("Context canceled, exiting processRowsStream early")
return numRows, err
return int(numRows.Load()), err
}

record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
Expand All @@ -153,16 +161,11 @@ func (qe *QRepQueryExecutor) processRowsStream(
}

stream.Records <- record

if numRows%heartBeatNumRows == 0 {
qe.logger.Info("processing row stream", slog.String("cursor", cursorName), slog.Int("records", numRows))
}

numRows++
numRows.Add(1)
}

qe.logger.Info("processed row stream", slog.String("cursor", cursorName), slog.Int("records", numRows))
return numRows, nil
qe.logger.Info("processed row stream", slog.String("cursor", cursorName), slog.Int64("records", numRows.Load()))
return int(numRows.Load()), nil
}

func (qe *QRepQueryExecutor) processFetchedRows(
Expand Down

0 comments on commit 9a274a1

Please sign in to comment.