From 9a274a16fba6c50627cda086524384a0062918eb Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Wed, 22 May 2024 21:38:41 +0530
Subject: [PATCH] add heartbeats for kafka

---
 flow/connectors/kafka/qrep.go             |  6 ++++-
 .../postgres/qrep_query_executor.go       | 25 +++++++++++--------
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
index 21bd08de97..7733a0f196 100644
--- a/flow/connectors/kafka/qrep.go
+++ b/flow/connectors/kafka/qrep.go
@@ -3,6 +3,7 @@ package connkafka
 import (
 	"context"
 	"fmt"
+	"log/slog"
 	"sync/atomic"
 	"time"
 
@@ -30,13 +31,16 @@ func (c *KafkaConnector) SyncQRepRecords(
 	schema := stream.Schema()
 
 	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
+		msg := fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
+		c.logger.Info(msg)
+		return msg
 	})
 	defer shutdown()
 
 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
 	if err != nil {
+		c.logger.Error("failed to create pool", slog.Any("error", err))
 		return 0, err
 	}
 	defer pool.Close()
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 05c4d5b71a..313916f4fc 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -4,12 +4,14 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"sync/atomic"
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgtype"
 	"go.temporal.io/sdk/log"
 
+	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	datatypes "github.com/PeerDB-io/peer-flow/datatypes"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -136,13 +138,19 @@ func (qe *QRepQueryExecutor) processRowsStream(
 	rows pgx.Rows,
 	fieldDescriptions []pgconn.FieldDescription,
 ) (int, error) {
-	numRows := 0
-	const heartBeatNumRows = 10000
+	var numRows atomic.Int64
+
+	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+		msg := fmt.Sprintf("processed %d rows", numRows.Load())
+		qe.logger.Info(msg)
+		return msg
+	})
+	defer shutdown()
 
 	for rows.Next() {
 		if err := ctx.Err(); err != nil {
 			qe.logger.Info("Context canceled, exiting processRowsStream early")
-			return numRows, err
+			return int(numRows.Load()), err
 		}
 
 		record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
@@ -153,16 +161,11 @@
 		}
 
 		stream.Records <- record
-
-		if numRows%heartBeatNumRows == 0 {
-			qe.logger.Info("processing row stream", slog.String("cursor", cursorName), slog.Int("records", numRows))
-		}
-
-		numRows++
+		numRows.Add(1)
 	}
 
-	qe.logger.Info("processed row stream", slog.String("cursor", cursorName), slog.Int("records", numRows))
-	return numRows, nil
+	qe.logger.Info("processed row stream", slog.String("cursor", cursorName), slog.Int64("records", numRows.Load()))
+	return int(numRows.Load()), nil
 }
 
 func (qe *QRepQueryExecutor) processFetchedRows(
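
For context on the pattern both hunks use: the in-loop "log every N rows" counter is replaced by an atomic.Int64 that a background heartbeat goroutine reads and logs, and the goroutine is stopped via the returned shutdown function. The sketch below is a minimal, self-contained illustration of that pattern only; it is not PeerDB's utils.HeartbeatRoutine, and the heartbeatRoutine name, the one-second interval, and the simulated loop are assumptions made for the example.

// heartbeat_sketch.go — illustrative only; assumes a ticker-based heartbeat
// and plain slog logging, not the actual PeerDB helper.
package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync/atomic"
	"time"
)

// heartbeatRoutine runs report on a fixed interval until ctx is done or the
// returned shutdown function is called.
func heartbeatRoutine(ctx context.Context, interval time.Duration, report func() string) func() {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				slog.Info(report())
			}
		}
	}()
	return cancel
}

func main() {
	// Counter is written by the processing loop and read by the heartbeat
	// goroutine, so it must be atomic to stay race-free.
	var numRows atomic.Int64

	shutdown := heartbeatRoutine(context.Background(), time.Second, func() string {
		return fmt.Sprintf("processed %d rows", numRows.Load())
	})
	defer shutdown()

	// Simulated row-processing loop standing in for rows.Next().
	for i := 0; i < 25; i++ {
		numRows.Add(1)
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Printf("done, processed %d rows\n", numRows.Load())
}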