From 25d7427e56efc55b07bd7a8a2a29e37f457d6b7c Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sat, 30 Sep 2023 02:42:11 +0530 Subject: [PATCH 1/3] changed WAL update logic to hopefully fix schema changes bugs --- flow/connectors/postgres/cdc.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 65e75acfc0..dd3deea677 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -141,7 +141,9 @@ func (p *PostgresCDCSource) consumeStream( }() for { - if time.Now().After(nextStandbyMessageDeadline) { + if time.Now().After(nextStandbyMessageDeadline) || + earlyReturn || + (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { // update the WALWritePosition to be clientXLogPos - 1 // as the clientXLogPos is the last checkpoint id + 1 // and we want to send the last checkpoint id as the last @@ -160,6 +162,10 @@ func (p *PostgresCDCSource) consumeStream( utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage) log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) + + if earlyReturn || (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + return result, nil + } } ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline) @@ -284,12 +290,9 @@ func (p *PostgresCDCSource) consumeStream( } } - currentPos := xld.WALStart + pglogrepl.LSN(len(xld.WALData)) - records.LastCheckPointID = int64(currentPos) - - if records.Records != nil && - ((len(records.Records) == int(req.MaxBatchSize)) || earlyReturn) { - return result, nil + if xld.WALStart > clientXLogPos { + clientXLogPos = xld.WALStart + records.LastCheckPointID = int64(clientXLogPos) } } } From 696148213ea481b3e7f821f64efb33391b0e25c5 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 30 Sep 2023 09:23:03 -0400 Subject: [PATCH 2/3] use len on slice --- flow/connectors/postgres/cdc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index dd3deea677..3be9406c34 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -143,7 +143,7 @@ func (p *PostgresCDCSource) consumeStream( for { if time.Now().After(nextStandbyMessageDeadline) || earlyReturn || - (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + (len(records.Records) == int(req.MaxBatchSize)) { // update the WALWritePosition to be clientXLogPos - 1 // as the clientXLogPos is the last checkpoint id + 1 // and we want to send the last checkpoint id as the last @@ -163,7 +163,7 @@ func (p *PostgresCDCSource) consumeStream( log.Infof("Sent Standby status message. %s", numRowsProcessedMessage) nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) - if earlyReturn || (records.Records != nil && (len(records.Records) == int(req.MaxBatchSize))) { + if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) { return result, nil } } From 1120236dca4cf1ee6cec427fed952b033012278a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 30 Sep 2023 09:29:29 -0400 Subject: [PATCH 3/3] more fixes to handling logrepl messages See: https://github.com/jackc/pglogrepl/pull/59 --- flow/connectors/postgres/cdc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 3be9406c34..48a94cd40a 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -201,6 +201,9 @@ func (p *PostgresCDCSource) consumeStream( log.Debugf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested) + if pkm.ServerWALEnd > clientXLogPos { + clientXLogPos = pkm.ServerWALEnd + } if pkm.ReplyRequested { nextStandbyMessageDeadline = time.Time{} }