Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logs schema deltas to catalog as soon as they are read #842

Merged
merged 4 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"log/slog"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
"go.temporal.io/sdk/activity"
)

type PostgresCDCSource struct {
Expand All @@ -38,6 +40,10 @@ type PostgresCDCSource struct {
// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing chema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string
}

type PostgresCDCConfig struct {
Expand All @@ -48,6 +54,8 @@ type PostgresCDCConfig struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
CatalogPool *pgxpool.Pool
FlowJobName string
}

// Create a new PostgresCDCSource
Expand All @@ -71,6 +79,8 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
}

Expand Down Expand Up @@ -186,7 +196,7 @@ func (p *PostgresCDCSource) consumeStream(
}

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName)
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
defer func() {
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
Expand All @@ -200,7 +210,7 @@ func (p *PostgresCDCSource) consumeStream(
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
jobName := req.FlowJobName
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})
Expand Down Expand Up @@ -264,7 +274,7 @@ func (p *PostgresCDCSource) consumeStream(
if waitingForCommit && !p.commitLock {
p.logger.Info(fmt.Sprintf(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)
return nil
Expand All @@ -274,7 +284,7 @@ func (p *PostgresCDCSource) consumeStream(
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)

Expand All @@ -286,7 +296,7 @@ func (p *PostgresCDCSource) consumeStream(
waitingForCommit = true
} else {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName),
p.flowJobName),
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
Expand Down Expand Up @@ -329,8 +339,9 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
}

p.logger.Debug(fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))
p.logger.Debug(
fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))

if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
Expand All @@ -348,7 +359,8 @@ func (p *PostgresCDCSource) consumeStream(

p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld)
rec, err := p.processMessage(records, xld, clientXLogPos)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
Expand Down Expand Up @@ -464,7 +476,8 @@ func (p *PostgresCDCSource) consumeStream(
}
}

func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData) (model.Record, error) {
func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData,
currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
Expand Down Expand Up @@ -503,7 +516,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
return p.processRelationMessage(xld.WALStart, convertRelationMessageToProto(msg))
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
//nolint:lll
// https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg))
serprex marked this conversation as resolved.
Show resolved Hide resolved
}

case *pglogrepl.TruncateMessage:
Expand Down Expand Up @@ -746,7 +762,27 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat
}
}

// processRelationMessage processes a delete message and returns a TableSchemaDelta
func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.RelationRecord) error {
activityInfo := activity.GetInfo(p.ctx)
workflowID := activityInfo.WorkflowExecution.ID
runID := activityInfo.WorkflowExecution.RunID
recJSON, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("failed to marshal schema delta to JSON: %w", err)
}

_, err = p.catalogPool.Exec(p.ctx,
`INSERT INTO
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
VALUES($1,$2,$3,$4)`,
flowJobName, workflowID, runID, recJSON)
if err != nil {
return fmt.Errorf("failed to insert row into table: %w", err)
}
return nil
}

// processRelationMessage processes a RelationMessage and returns a TableSchemaDelta
func (p *PostgresCDCSource) processRelationMessage(
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
Expand Down Expand Up @@ -804,10 +840,11 @@ func (p *PostgresCDCSource) processRelationMessage(
}

p.relationMessageMapping[currRel.RelationId] = currRel
return &model.RelationRecord{
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckPointID: int64(lsn),
}, nil
}
return rec, p.auditSchemaDelta(p.flowJobName, rec)
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {

var result pgtype.Int8
if !rows.Next() {
c.logger.Info("No row found ,returning 0")
c.logger.Info("No row found, returning 0")
return 0, nil
}
err = rows.Scan(&result)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down Expand Up @@ -365,7 +367,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
len(records), syncedRecordsCount)
}

c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier))
c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
syncedRecordsCount, rawTableIdentifier))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions nexus/catalog/migrations/V15__schema_deltas_audit_log.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
flow_job_name TEXT NOT NULL,
read_timestamp TIMESTAMP DEFAULT now(),
workflow_id TEXT NOT NULL,
run_id TEXT NOT NULL,
delta_info JSONB NOT NULL
);
Loading