diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 3d1c226737..3f1d57495f 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + log "github.com/sirupsen/logrus" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" @@ -254,5 +255,8 @@ func CloseConnector(conn Connector) { return } - conn.Close() + err := conn.Close() + if err != nil { + log.Errorf("error closing connector: %v", err) + } } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index a99142c42a..fa5e156c48 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -166,6 +166,8 @@ func (p *PostgresCDCSource) consumeStream( shutdown <- true }() + firstProcessed := false + for { if time.Now().After(nextStandbyMessageDeadline) || (len(records.Records) >= int(req.MaxBatchSize)) { @@ -214,8 +216,6 @@ func (p *PostgresCDCSource) consumeStream( continue } - firstProcessed := false - switch msg.Data[0] { case pglogrepl.PrimaryKeepaliveMessageByteID: pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])