diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6130094cd7..e8137381b1 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -317,7 +317,11 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre case *pglogrepl.BeginMessage: log.Debugf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid) log.Debugf("Locking PullRecords at BeginMessage, awaiting CommitMessage") - p.commitLock = true + if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) { + p.commitLock = false + } else { + p.commitLock = true + } case *pglogrepl.InsertMessage: return p.processInsertMessage(xld.WALStart, msg) case *pglogrepl.UpdateMessage: