From 93e0bd322f91941b907c748dc6595fcc64ea5346 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 13 Feb 2024 08:25:42 -0500 Subject: [PATCH] better CDC error logging --- flow/connectors/postgres/cdc.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 89805659f4..44e81c8227 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -286,17 +286,21 @@ func (p *PostgresCDCSource) consumeStream( // if we are past the next standby deadline (?) if time.Now().After(nextStandbyMessageDeadline) { if !cdcRecordsStorage.IsEmpty() { - p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit", + p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records", p.flowJobName, cdcRecordsStorage.Len()), ) if !p.commitLock { - // immediate return if we are not waiting for a commit + p.logger.Info( + fmt.Sprintf("no commit lock, returning currently accumulated records - %d", + cdcRecordsStorage.Len())) return nil + } else { + p.logger.Info(fmt.Sprintf("commit lock, waiting for commit to return records - %d", + cdcRecordsStorage.Len())) + waitingForCommit = true } - - waitingForCommit = true } else { p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", p.flowJobName),