Skip to content

Commit

Permalink
fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 18, 2023
1 parent 0ef99e1 commit 76a6002
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
30 changes: 16 additions & 14 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type PostgresCDCSource struct {

// for storing chema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string
}

type PostgresCDCConfig struct {
Expand All @@ -53,7 +54,8 @@ type PostgresCDCConfig struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
CatalogConnection *pgxpool.Pool
CatalogPool *pgxpool.Pool
FlowJobName string
}

// Create a new PostgresCDCSource
Expand All @@ -77,7 +79,8 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogConnection,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
}

Expand Down Expand Up @@ -193,7 +196,7 @@ func (p *PostgresCDCSource) consumeStream(
}

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName)
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
defer func() {
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
Expand All @@ -207,7 +210,7 @@ func (p *PostgresCDCSource) consumeStream(
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
jobName := req.FlowJobName
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})
Expand Down Expand Up @@ -271,7 +274,7 @@ func (p *PostgresCDCSource) consumeStream(
if waitingForCommit && !p.commitLock {
p.logger.Info(fmt.Sprintf(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)
return nil
Expand All @@ -281,7 +284,7 @@ func (p *PostgresCDCSource) consumeStream(
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)

Expand All @@ -293,7 +296,7 @@ func (p *PostgresCDCSource) consumeStream(
waitingForCommit = true
} else {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName),
p.flowJobName),
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
Expand Down Expand Up @@ -356,7 +359,7 @@ func (p *PostgresCDCSource) consumeStream(

p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos)
rec, err := p.processMessage(records, xld, clientXLogPos)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
Expand Down Expand Up @@ -474,7 +477,7 @@ func (p *PostgresCDCSource) consumeStream(
}

func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData,
flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
Expand Down Expand Up @@ -513,7 +516,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName)
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg))
}

case *pglogrepl.TruncateMessage:
Expand Down Expand Up @@ -756,7 +759,7 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat
}
}

func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error {
func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.RelationRecord) error {
activityInfo := activity.GetInfo(p.ctx)
workflowID := activityInfo.WorkflowExecution.ID
runID := activityInfo.WorkflowExecution.RunID
Expand All @@ -768,7 +771,7 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode
_, err = p.catalogPool.Exec(p.ctx,
`INSERT INTO
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`,
VALUES($1,$2,$3,$4)`,
flowJobName, workflowID, runID, recJSON)
if err != nil {
return fmt.Errorf("failed to insert row into table: %w", err)
Expand All @@ -780,7 +783,6 @@ func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *mode
func (p *PostgresCDCSource) processRelationMessage(
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
flowJobName string,
) (model.Record, error) {
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
Expand Down Expand Up @@ -839,7 +841,7 @@ func (p *PostgresCDCSource) processRelationMessage(
TableSchemaDelta: schemaDelta,
CheckPointID: int64(lsn),
}
return rec, p.genSchemaDeltaAuditLog(flowJobName, rec)
return rec, p.auditSchemaDelta(p.flowJobName, rec)
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
CatalogConnection: catalogPool,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down

0 comments on commit 76a6002

Please sign in to comment.