diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 20b5a2f2e5..d1fc44b7e6 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -3,6 +3,7 @@ package connpostgres import ( "context" "crypto/sha256" + "encoding/json" "fmt" "log/slog" "time" @@ -20,6 +21,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" + "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -38,6 +40,9 @@ type PostgresCDCSource struct { // for partitioned tables, maps child relid to parent relid childToParentRelIDMapping map[uint32]uint32 logger slog.Logger + + // for storing chema delta audit logs to catalog + catalogPool *pgxpool.Pool } type PostgresCDCConfig struct { @@ -48,6 +53,7 @@ type PostgresCDCConfig struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude RelationMessageMapping model.RelationMessageMapping + CatalogConnection *pgxpool.Pool } // Create a new PostgresCDCSource @@ -71,6 +77,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 commitLock: false, customTypeMapping: customTypeMap, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + catalogPool: cdcConfig.CatalogConnection, }, nil } @@ -329,8 +336,9 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) } - p.logger.Debug(fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", - pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)) + p.logger.Debug( + fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", + pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)) if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd @@ -348,7 +356,7 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - rec, err := p.processMessage(records, xld) + rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos) if err != nil { return fmt.Errorf("error processing message: %w", err) @@ -465,7 +473,8 @@ func (p *PostgresCDCSource) consumeStream( } } -func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData) (model.Record, error) { +func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, + flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) @@ -504,7 +513,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { - return p.processRelationMessage(xld.WALStart, convertRelationMessageToProto(msg)) + return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName) } case *pglogrepl.TruncateMessage: @@ -747,10 +756,31 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat } } -// processRelationMessage processes a delete message and returns a TableSchemaDelta +func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error { + activityInfo := activity.GetInfo(p.ctx) + workflowID := activityInfo.WorkflowExecution.ID + runID := activityInfo.WorkflowExecution.RunID + recJSON, err := json.Marshal(rec) + if err != nil { + return fmt.Errorf("failed to marshal schema delta to JSON: %w", err) + } + + _, err = p.catalogPool.Exec(p.ctx, + `INSERT INTO + peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info) + VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`, + flowJobName, workflowID, runID, recJSON) + if err != nil { + return fmt.Errorf("failed to insert row into table: %w", err) + } + return nil +} + +// processRelationMessage processes a RelationMessage and returns a TableSchemaDelta func (p *PostgresCDCSource) processRelationMessage( lsn pglogrepl.LSN, currRel *protos.RelationMessage, + flowJobName string, ) (model.Record, error) { // retrieve initial RelationMessage for table changed. prevRel := p.relationMessageMapping[currRel.RelationId] @@ -805,10 +835,11 @@ func (p *PostgresCDCSource) processRelationMessage( } p.relationMessageMapping[currRel.RelationId] = currRel - return &model.RelationRecord{ + rec := &model.RelationRecord{ TableSchemaDelta: schemaDelta, CheckPointID: int64(lsn), - }, nil + } + return rec, p.genSchemaDeltaAuditLog(flowJobName, rec) } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 1eec3c5bf7..21a7cfc8eb 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -382,7 +382,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) { var result pgtype.Int8 if !rows.Next() { - c.logger.Info("No row found ,returning 0") + c.logger.Info("No row found, returning 0") return 0, nil } err = rows.Scan(&result) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index cf2a92b5b2..af968fc5e4 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -236,6 +236,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, + CatalogConnection: catalogPool, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) @@ -363,7 +364,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S len(records), syncedRecordsCount) } - c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier)) + c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", + syncedRecordsCount, rawTableIdentifier)) lastCP, err := req.Records.GetLastCheckpoint() if err != nil { diff --git a/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql b/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql new file mode 100644 index 0000000000..92fe24cab9 --- /dev/null +++ b/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + flow_job_name TEXT NOT NULL, + read_timestamp TIMESTAMP DEFAULT now(), + workflow_id TEXT NOT NULL, + run_id TEXT NOT NULL, + delta_info JSONB NOT NULL +); \ No newline at end of file