diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index b35045b945..fc0063c1ab 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -498,6 +498,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl // treat all relation messages as corresponding to parent if partitioned. msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) + if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists { + return nil, nil + } + // TODO (kaushik): consider persistent state for a mirror job // to be stored somewhere in temporal state. We might need to persist // the state of the relation message somewhere