Skip to content

Commit

Permalink
logs schema deltas to catalog as soon as they are read (#842)
Browse files Browse the repository at this point in the history
Currently stores the following info:

```
CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    flow_job_name TEXT NOT NULL,
    read_timestamp TIMESTAMP DEFAULT now(),
    workflow_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    delta_info JSONB NOT NULL
);
```

delta_info is the marshaled version of the `RelationRecord`. Store
happens in the same function where `RelationMessages` are processed.
  • Loading branch information
heavycrystal authored Dec 18, 2023
1 parent 2500b34 commit 6dbdf61
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 15 deletions.
63 changes: 50 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"log/slog"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
"go.temporal.io/sdk/activity"
)

type PostgresCDCSource struct {
Expand All @@ -38,6 +40,10 @@ type PostgresCDCSource struct {
// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
flowJobName string
}

type PostgresCDCConfig struct {
Expand All @@ -48,6 +54,8 @@ type PostgresCDCConfig struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
CatalogPool *pgxpool.Pool
FlowJobName string
}

// Create a new PostgresCDCSource
Expand All @@ -71,6 +79,8 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
}

Expand Down Expand Up @@ -186,7 +196,7 @@ func (p *PostgresCDCSource) consumeStream(
}

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(req.FlowJobName)
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
defer func() {
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
Expand All @@ -200,7 +210,7 @@ func (p *PostgresCDCSource) consumeStream(
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
jobName := req.FlowJobName
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})
Expand Down Expand Up @@ -264,7 +274,7 @@ func (p *PostgresCDCSource) consumeStream(
if waitingForCommit && !p.commitLock {
p.logger.Info(fmt.Sprintf(
"[%s] commit received, returning currently accumulated records - %d",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)
return nil
Expand All @@ -274,7 +284,7 @@ func (p *PostgresCDCSource) consumeStream(
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
p.flowJobName,
cdcRecordsStorage.Len()),
)

Expand All @@ -286,7 +296,7 @@ func (p *PostgresCDCSource) consumeStream(
waitingForCommit = true
} else {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName),
p.flowJobName),
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
Expand Down Expand Up @@ -329,8 +339,9 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
}

p.logger.Debug(fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))
p.logger.Debug(
fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))

if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
Expand All @@ -348,7 +359,8 @@ func (p *PostgresCDCSource) consumeStream(

p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld)
rec, err := p.processMessage(records, xld, clientXLogPos)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
Expand Down Expand Up @@ -464,7 +476,8 @@ func (p *PostgresCDCSource) consumeStream(
}
}

func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData) (model.Record, error) {
func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData,
currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
Expand Down Expand Up @@ -503,7 +516,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
return p.processRelationMessage(xld.WALStart, convertRelationMessageToProto(msg))
// RelationMessages don't contain an LSN, so we use current clientXlogPos instead.
//nolint:lll
// https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg))
}

case *pglogrepl.TruncateMessage:
Expand Down Expand Up @@ -746,7 +762,27 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat
}
}

// processRelationMessage processes a delete message and returns a TableSchemaDelta
// auditSchemaDelta records rec — a schema delta observed for flowJobName — in
// the catalog table peerdb_stats.schema_deltas_audit_log, tagged with the
// workflow and run IDs of the Temporal activity this CDC source runs inside.
func (p *PostgresCDCSource) auditSchemaDelta(flowJobName string, rec *model.RelationRecord) error {
	// Capture the Temporal execution identity so the audit row can be traced
	// back to the exact workflow run that read this delta.
	info := activity.GetInfo(p.ctx)

	deltaJSON, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("failed to marshal schema delta to JSON: %w", err)
	}

	_, err = p.catalogPool.Exec(p.ctx,
		`INSERT INTO
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
VALUES($1,$2,$3,$4)`,
		flowJobName, info.WorkflowExecution.ID, info.WorkflowExecution.RunID, deltaJSON)
	if err != nil {
		return fmt.Errorf("failed to insert row into table: %w", err)
	}
	return nil
}

// processRelationMessage processes a RelationMessage and returns a TableSchemaDelta
func (p *PostgresCDCSource) processRelationMessage(
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
Expand Down Expand Up @@ -804,10 +840,11 @@ func (p *PostgresCDCSource) processRelationMessage(
}

p.relationMessageMapping[currRel.RelationId] = currRel
return &model.RelationRecord{
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckPointID: int64(lsn),
}, nil
}
return rec, p.auditSchemaDelta(p.flowJobName, rec)
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {

var result pgtype.Int8
if !rows.Next() {
c.logger.Info("No row found ,returning 0")
c.logger.Info("No row found, returning 0")
return 0, nil
}
err = rows.Scan(&result)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down Expand Up @@ -365,7 +367,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
len(records), syncedRecordsCount)
}

c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier))
c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
syncedRecordsCount, rawTableIdentifier))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions nexus/catalog/migrations/V15__schema_deltas_audit_log.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Audit log of schema changes (deltas) observed while pulling CDC records.
-- A row is inserted as soon as a delta is read from the replication stream;
-- delta_info is the JSON-marshaled RelationRecord describing the change.
CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    -- name of the flow job (mirror) whose source table changed shape
    flow_job_name TEXT NOT NULL,
    -- when the delta was recorded; NOTE(review): TIMESTAMP without time zone,
    -- so this is catalog-server local time — confirm that is intended
    read_timestamp TIMESTAMP DEFAULT now(),
    -- Temporal workflow execution ID of the activity that logged the delta
    workflow_id TEXT NOT NULL,
    -- Temporal run ID of that workflow execution
    run_id TEXT NOT NULL,
    -- JSON-marshaled RelationRecord payload with the delta details
    delta_info JSONB NOT NULL
);

0 comments on commit 6dbdf61

Please sign in to comment.