From ba453a476a8456d5c091f1623fde326f8a0af60a Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 18 Dec 2023 16:27:31 +0530 Subject: [PATCH 1/3] logs schema deltas to catalog as soon as they are read --- flow/connectors/postgres/cdc.go | 48 +++++++++++++++---- flow/connectors/postgres/client.go | 2 +- flow/connectors/postgres/postgres.go | 4 +- .../V15__schema_deltas_audit_log.sql | 8 ++++ 4 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 nexus/catalog/migrations/V15__schema_deltas_audit_log.sql diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 031ae5a8e3..36a0404f78 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -3,6 +3,7 @@ package connpostgres import ( "context" "crypto/sha256" + "encoding/json" "fmt" "log/slog" "time" @@ -20,6 +21,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" + "go.temporal.io/sdk/activity" ) type PostgresCDCSource struct { @@ -38,6 +40,9 @@ type PostgresCDCSource struct { // for partitioned tables, maps child relid to parent relid childToParentRelIDMapping map[uint32]uint32 logger slog.Logger + + // for storing chema delta audit logs to catalog + catalogPool *pgxpool.Pool } type PostgresCDCConfig struct { @@ -48,6 +53,7 @@ type PostgresCDCConfig struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude RelationMessageMapping model.RelationMessageMapping + CatalogConnection *pgxpool.Pool } // Create a new PostgresCDCSource @@ -71,6 +77,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 commitLock: false, customTypeMapping: customTypeMap, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + catalogPool: cdcConfig.CatalogConnection, }, nil } @@ -329,8 +336,9 @@ func (p *PostgresCDCSource) consumeStream( return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err) } - p.logger.Debug(fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", - pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)) + p.logger.Debug( + fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t", + pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested)) if pkm.ServerWALEnd > clientXLogPos { clientXLogPos = pkm.ServerWALEnd @@ -348,7 +356,8 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - rec, err := p.processMessage(records, xld) + rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos) + if err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -464,7 +473,8 @@ func (p *PostgresCDCSource) consumeStream( } } -func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData) (model.Record, error) { +func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, + flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) @@ -503,7 +513,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { - return p.processRelationMessage(xld.WALStart, convertRelationMessageToProto(msg)) + return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName) } case *pglogrepl.TruncateMessage: @@ -746,10 +756,31 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat } } -// processRelationMessage processes a delete message and returns a TableSchemaDelta +func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error { + activityInfo := activity.GetInfo(p.ctx) + workflowID := activityInfo.WorkflowExecution.ID + runID := activityInfo.WorkflowExecution.RunID + recJSON, err := json.Marshal(rec) + if err != nil { + return fmt.Errorf("failed to marshal schema delta to JSON: %w", err) + } + + _, err = p.catalogPool.Exec(p.ctx, + `INSERT INTO + peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info) + VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`, + flowJobName, workflowID, runID, recJSON) + if err != nil { + return fmt.Errorf("failed to insert row into table: %w", err) + } + return nil +} + +// processRelationMessage processes a RelationMessage and returns a TableSchemaDelta func (p *PostgresCDCSource) processRelationMessage( lsn pglogrepl.LSN, currRel *protos.RelationMessage, + flowJobName string, ) (model.Record, error) { // retrieve initial RelationMessage for table changed. prevRel := p.relationMessageMapping[currRel.RelationId] @@ -804,10 +835,11 @@ func (p *PostgresCDCSource) processRelationMessage( } p.relationMessageMapping[currRel.RelationId] = currRel - return &model.RelationRecord{ + rec := &model.RelationRecord{ TableSchemaDelta: schemaDelta, CheckPointID: int64(lsn), - }, nil + } + return rec, p.genSchemaDeltaAuditLog(flowJobName, rec) } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index dbae60b9e6..77a5413de7 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -383,7 +383,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) { var result pgtype.Int8 if !rows.Next() { - c.logger.Info("No row found ,returning 0") + c.logger.Info("No row found, returning 0") return 0, nil } err = rows.Scan(&result) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index da65b09c0d..33510036c9 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -236,6 +236,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, + CatalogConnection: catalogPool, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) @@ -365,7 +366,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S len(records), syncedRecordsCount) } - c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier)) + c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", + syncedRecordsCount, rawTableIdentifier)) lastCP, err := req.Records.GetLastCheckpoint() if err != nil { diff --git a/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql b/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql new file mode 100644 index 0000000000..92fe24cab9 --- /dev/null +++ b/nexus/catalog/migrations/V15__schema_deltas_audit_log.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + flow_job_name TEXT NOT NULL, + read_timestamp TIMESTAMP DEFAULT now(), + workflow_id TEXT NOT NULL, + run_id TEXT NOT NULL, + delta_info JSONB NOT NULL +); \ No newline at end of file From 25a11590766f7e30cc245bf7e57dedf1c368e8e7 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 18 Dec 2023 19:28:08 +0530 Subject: [PATCH 2/3] fixed review comments --- flow/connectors/postgres/cdc.go | 30 +++++++++++++++------------- flow/connectors/postgres/postgres.go | 3 ++- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 36a0404f78..aeb058dcf6 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -43,6 +43,7 @@ type PostgresCDCSource struct { // for storing chema delta audit logs to catalog catalogPool *pgxpool.Pool + flowJobName string } type PostgresCDCConfig struct { @@ -53,7 +54,8 @@ type PostgresCDCConfig struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude RelationMessageMapping model.RelationMessageMapping - CatalogConnection *pgxpool.Pool + CatalogPool *pgxpool.Pool + FlowJobName string } // Create a new PostgresCDCSource @@ -77,7 +79,8 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 commitLock: false, customTypeMapping: customTypeMap, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), - catalogPool: cdcConfig.CatalogConnection, + catalogPool: cdcConfig.CatalogPool, + flowJobName: cdcConfig.FlowJobName, }, nil } @@ -193,7 +196,7 @@ func (p *PostgresCDCSource) consumeStream( } var standByLastLogged time.Time - cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName) + cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) defer func() { if cdcRecordsStorage.IsEmpty() { records.SignalAsEmpty() @@ -207,7 +210,7 @@ func (p *PostgresCDCSource) consumeStream( }() shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string { - jobName := req.FlowJobName + jobName := p.flowJobName currRecords := cdcRecordsStorage.Len() return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords) }) @@ -271,7 +274,7 @@ func (p *PostgresCDCSource) consumeStream( if waitingForCommit && !p.commitLock { p.logger.Info(fmt.Sprintf( "[%s] commit received, returning currently accumulated records - %d", - req.FlowJobName, + p.flowJobName, cdcRecordsStorage.Len()), ) return nil @@ -281,7 +284,7 @@ func (p *PostgresCDCSource) consumeStream( if time.Now().After(nextStandbyMessageDeadline) { if !cdcRecordsStorage.IsEmpty() { p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit", - req.FlowJobName, + p.flowJobName, cdcRecordsStorage.Len()), ) @@ -293,7 +296,7 @@ func (p *PostgresCDCSource) consumeStream( waitingForCommit = true } else { p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", - req.FlowJobName), + p.flowJobName), ) } nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) @@ -356,7 +359,7 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos) + rec, err := p.processMessage(records, xld, clientXLogPos) if err != nil { return fmt.Errorf("error processing message: %w", err) @@ -474,7 +477,7 @@ func (p *PostgresCDCSource) consumeStream( } func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, - flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) { + currentClientXlogPos pglogrepl.LSN) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) @@ -513,7 +516,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { - return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName) + return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg)) } case *pglogrepl.TruncateMessage: @@ -756,7 +759,7 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat } } -func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error { +func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.RelationRecord) error { activityInfo := activity.GetInfo(p.ctx) workflowID := activityInfo.WorkflowExecution.ID runID := activityInfo.WorkflowExecution.RunID @@ -768,7 +771,7 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode _, err = p.catalogPool.Exec(p.ctx, `INSERT INTO peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info) - VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`, + VALUES($1,$2,$3,$4)`, flowJobName, workflowID, runID, recJSON) if err != nil { return fmt.Errorf("failed to insert row into table: %w", err) @@ -780,7 +783,6 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode func (p *PostgresCDCSource) processRelationMessage( lsn pglogrepl.LSN, currRel *protos.RelationMessage, - flowJobName string, ) (model.Record, error) { // retrieve initial RelationMessage for table changed. prevRel := p.relationMessageMapping[currRel.RelationId] @@ -839,7 +841,7 @@ func (p *PostgresCDCSource) processRelationMessage( TableSchemaDelta: schemaDelta, CheckPointID: int64(lsn), } - return rec, p.genSchemaDeltaAuditLog(flowJobName, rec) + return rec, p.auditSchemaDelta(p.flowJobName, rec) } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 33510036c9..20dd2a5a71 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -236,7 +236,8 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, - CatalogConnection: catalogPool, + CatalogPool: catalogPool, + FlowJobName: req.FlowJobName, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err) From b13b420b09158638ae2d85a50bbf5f744c9ae5c9 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 18 Dec 2023 23:59:26 +0530 Subject: [PATCH 3/3] added comment --- flow/connectors/postgres/cdc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index aeb058dcf6..979723f930 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -516,6 +516,9 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { + // RelationMessages don't contain an LSN, so we use current clientXlogPos instead. + //nolint:lll + // https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670 return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg)) }