diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index d1fc44b7e6..0bf8958fb5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -43,6 +43,7 @@ type PostgresCDCSource struct { // for storing chema delta audit logs to catalog catalogPool *pgxpool.Pool + flowJobName string } type PostgresCDCConfig struct { @@ -53,7 +54,8 @@ type PostgresCDCConfig struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude RelationMessageMapping model.RelationMessageMapping - CatalogConnection *pgxpool.Pool + CatalogPool *pgxpool.Pool + FlowJobName string } // Create a new PostgresCDCSource @@ -77,7 +79,8 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 commitLock: false, customTypeMapping: customTypeMap, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), - catalogPool: cdcConfig.CatalogConnection, + catalogPool: cdcConfig.CatalogPool, + flowJobName: cdcConfig.FlowJobName, }, nil } @@ -193,7 +196,7 @@ func (p *PostgresCDCSource) consumeStream( } var standByLastLogged time.Time - cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName) + cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName) defer func() { if cdcRecordsStorage.IsEmpty() { records.SignalAsEmpty() @@ -207,7 +210,7 @@ func (p *PostgresCDCSource) consumeStream( }() shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string { - jobName := req.FlowJobName + jobName := p.flowJobName currRecords := cdcRecordsStorage.Len() return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords) }) @@ -271,7 +274,7 @@ func (p *PostgresCDCSource) consumeStream( if waitingForCommit && !p.commitLock { p.logger.Info(fmt.Sprintf( "[%s] commit received, returning currently accumulated records - %d", - req.FlowJobName, + p.flowJobName, cdcRecordsStorage.Len()), ) return nil @@ -281,7 +284,7 @@ func (p *PostgresCDCSource) consumeStream( if time.Now().After(nextStandbyMessageDeadline) { if !cdcRecordsStorage.IsEmpty() { p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit", - req.FlowJobName, + p.flowJobName, cdcRecordsStorage.Len()), ) @@ -293,7 +296,7 @@ func (p *PostgresCDCSource) consumeStream( waitingForCommit = true } else { p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", - req.FlowJobName), + p.flowJobName), ) } nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) @@ -356,7 +359,7 @@ func (p *PostgresCDCSource) consumeStream( p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos) + rec, err := p.processMessage(records, xld, clientXLogPos) if err != nil { return fmt.Errorf("error processing message: %w", err) @@ -474,7 +477,7 @@ func (p *PostgresCDCSource) consumeStream( } func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData, - flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) { + currentClientXlogPos pglogrepl.LSN) (model.Record, error) { logicalMsg, err := pglogrepl.Parse(xld.WALData) if err != nil { return nil, fmt.Errorf("error parsing logical message: %w", err) @@ -513,7 +516,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl if p.relationMessageMapping[msg.RelationID] == nil { p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { - return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName) + return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg)) } case *pglogrepl.TruncateMessage: @@ -756,7 +759,7 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat } } -func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error { +func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.RelationRecord) error { activityInfo := activity.GetInfo(p.ctx) workflowID := activityInfo.WorkflowExecution.ID runID := activityInfo.WorkflowExecution.RunID @@ -768,7 +771,7 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode _, err = p.catalogPool.Exec(p.ctx, `INSERT INTO peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info) - VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`, + VALUES($1,$2,$3,$4)`, flowJobName, workflowID, runID, recJSON) if err != nil { return fmt.Errorf("failed to insert row into table: %w", err) @@ -780,7 +783,6 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode func (p *PostgresCDCSource) processRelationMessage( lsn pglogrepl.LSN, currRel *protos.RelationMessage, - flowJobName string, ) (model.Record, error) { // retrieve initial RelationMessage for table changed. prevRel := p.relationMessageMapping[currRel.RelationId] @@ -839,7 +841,7 @@ func (p *PostgresCDCSource) processRelationMessage( TableSchemaDelta: schemaDelta, CheckPointID: int64(lsn), } - return rec, p.genSchemaDeltaAuditLog(flowJobName, rec) + return rec, p.auditSchemaDelta(p.flowJobName, rec) } func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index af968fc5e4..06c1cfd62a 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -236,7 +236,8 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, - CatalogConnection: catalogPool, + CatalogPool: catalogPool, + FlowJobName: req.FlowJobName, }, c.customTypesMapping) if err != nil { return fmt.Errorf("failed to create cdc source: %w", err)