Skip to content

Commit

Permalink
logs schema deltas to catalog as soon as they are read
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 18, 2023
1 parent bc2bdee commit c02b731
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
47 changes: 39 additions & 8 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"log/slog"
"time"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
"go.temporal.io/sdk/activity"
)

type PostgresCDCSource struct {
Expand All @@ -38,6 +40,9 @@ type PostgresCDCSource struct {
// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing chema delta audit logs to catalog
catalogPool *pgxpool.Pool
}

type PostgresCDCConfig struct {
Expand All @@ -48,6 +53,7 @@ type PostgresCDCConfig struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
CatalogConnection *pgxpool.Pool
}

// Create a new PostgresCDCSource
Expand All @@ -71,6 +77,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
catalogPool: cdcConfig.CatalogConnection,
}, nil
}

Expand Down Expand Up @@ -329,8 +336,9 @@ func (p *PostgresCDCSource) consumeStream(
return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
}

p.logger.Debug(fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))
p.logger.Debug(
fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))

if pkm.ServerWALEnd > clientXLogPos {
clientXLogPos = pkm.ServerWALEnd
Expand All @@ -348,7 +356,7 @@ func (p *PostgresCDCSource) consumeStream(

p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld)
rec, err := p.processMessage(records, xld, req.FlowJobName, clientXLogPos)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
Expand Down Expand Up @@ -465,7 +473,8 @@ func (p *PostgresCDCSource) consumeStream(
}
}

func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData) (model.Record, error) {
func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData,
flowJobName string, currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
Expand Down Expand Up @@ -504,7 +513,7 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
if p.relationMessageMapping[msg.RelationID] == nil {
p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg)
} else {
return p.processRelationMessage(xld.WALStart, convertRelationMessageToProto(msg))
return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg), flowJobName)
}

case *pglogrepl.TruncateMessage:
Expand Down Expand Up @@ -747,10 +756,31 @@ func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.Relat
}
}

// processRelationMessage processes a delete message and returns a TableSchemaDelta
func (p *PostgresCDCSource) genSchemaDeltaAuditLog(flowJobName string, rec *model.RelationRecord) error {
activityInfo := activity.GetInfo(p.ctx)
workflowID := activityInfo.WorkflowExecution.ID
runID := activityInfo.WorkflowExecution.RunID
recJSON, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("failed to marshal schema delta to JSON: %w", err)
}

_, err = p.catalogPool.Exec(p.ctx,
`INSERT INTO
peerdb_stats.schema_deltas_audit_log(flow_job_name,workflow_id,run_id,delta_info)
VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`,
flowJobName, workflowID, runID, recJSON)
if err != nil {
return fmt.Errorf("failed to insert row into table: %w", err)
}
return nil
}

// processRelationMessage processes a RelationMessage and returns a TableSchemaDelta
func (p *PostgresCDCSource) processRelationMessage(
lsn pglogrepl.LSN,
currRel *protos.RelationMessage,
flowJobName string,
) (model.Record, error) {
// retrieve initial RelationMessage for table changed.
prevRel := p.relationMessageMapping[currRel.RelationId]
Expand Down Expand Up @@ -805,10 +835,11 @@ func (p *PostgresCDCSource) processRelationMessage(
}

p.relationMessageMapping[currRel.RelationId] = currRel
return &model.RelationRecord{
rec := &model.RelationRecord{
TableSchemaDelta: schemaDelta,
CheckPointID: int64(lsn),
}, nil
}
return rec, p.genSchemaDeltaAuditLog(flowJobName, rec)
}

func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) {

var result pgtype.Int8
if !rows.Next() {
c.logger.Info("No row found ,returning 0")
c.logger.Info("No row found, returning 0")
return 0, nil
}
err = rows.Scan(&result)
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
CatalogConnection: catalogPool,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down Expand Up @@ -363,7 +364,8 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
len(records), syncedRecordsCount)
}

c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY", syncedRecordsCount, rawTableIdentifier))
c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
syncedRecordsCount, rawTableIdentifier))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions nexus/catalog/migrations/V15__schema_deltas_audit_log.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS peerdb_stats.schema_deltas_audit_log (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
flow_job_name TEXT NOT NULL,
read_timestamp TIMESTAMP DEFAULT now(),
workflow_id TEXT NOT NULL,
run_id TEXT NOT NULL,
delta_info JSONB NOT NULL
);

0 comments on commit c02b731

Please sign in to comment.