Skip to content

Commit

Permalink
Cleanup code for reduced error handling (#1314)
Browse files Browse the repository at this point in the history
1. monitoring should remain connector-agnostic, so pglogrepl is no longer used in activities/flowable.go
2. GetLastCheckpoint returning error would only happen due to programmer error; panic instead
3. NewPostgresCDCSource becomes infallible when the caller is responsible for childToParentRelIDMap

Also have CDCRecordStream ignore redundant calls to Close
  • Loading branch information
serprex authored Feb 16, 2024
1 parent 3f653cd commit a0fe0cb
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 77 deletions.
11 changes: 3 additions & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"

"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -352,19 +351,15 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

logger.Info(fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}
lastCheckpoint := recordBatch.GetLastCheckpoint()

err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand All @@ -375,7 +370,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(lastCheckpoint),
lastCheckpoint,
)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
rawTableName, stagingTable)

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
" for destination table %s and sync batch ID %d",
Expand All @@ -98,6 +93,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return nil, fmt.Errorf("failed to execute statements in a transaction: %w", err)
}

lastCP := req.Records.GetLastCheckpoint()
err = s.connector.pgMetadata.FinishBatch(ctx, req.FlowJobName, syncBatchID, lastCP)
if err != nil {
return nil, fmt.Errorf("failed to update metadata: %w", err)
Expand Down
14 changes: 2 additions & 12 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCheckpoint,
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand All @@ -130,12 +125,7 @@ func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID)
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
Expand Down
7 changes: 1 addition & 6 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
c.logger.Error("failed to get last checkpoint", slog.Any("error", err))
return nil, err
}

lastCheckpoint := req.Records.GetLastCheckpoint()
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
Expand Down
20 changes: 7 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PostgresCDCConfig struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
RelationMessageMapping model.RelationMessageMapping
ChildToParentRelIDMap map[uint32]uint32
CatalogPool *pgxpool.Pool
FlowJobName string
}
Expand All @@ -63,12 +64,7 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, cdcConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
replConn: cdcConfig.Connection,
Expand All @@ -78,21 +74,19 @@ func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
childToParentRelIDMapping: childToParentRelIDMap,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
commitLock: false,
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
}
}

func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) {
query := `
SELECT
parent.oid AS parentrelid,
child.oid AS childrelid
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
`

Expand Down
19 changes: 9 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,22 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
}
defer replConn.Close(ctx)

cdc, err := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, replConn)
if err != nil {
return fmt.Errorf("error getting child to parent relid map: %w", err)
}

cdc := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
Connection: replConn,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
ChildToParentRelIDMap: childToParentRelIDMap,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
})
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
}

err = cdc.PullRecords(ctx, req)
if err != nil {
Expand All @@ -242,7 +245,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
if err != nil {
return fmt.Errorf("failed to get current LSN: %w", err)
}
err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, latestLSN)
err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN))
if err != nil {
return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
}
Expand Down Expand Up @@ -373,12 +376,8 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
c.logger.Info(fmt.Sprintf("synced %d records to Postgres table %s via COPY",
syncedRecordsCount, rawTableIdentifier))

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("error getting last checkpoint: %w", err)
}

// updating metadata with new offset and syncBatchID
lastCP := req.Records.GetLastCheckpoint()
err = c.updateSyncMetadata(ctx, req.FlowJobName, lastCP, req.SyncBatchID, syncRecordsTx)
if err != nil {
return nil, err
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq
}
c.logger.Info(fmt.Sprintf("Synced %d records", numRecords))

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

lastCheckpoint := req.Records.GetLastCheckpoint()
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
c.logger.Error("failed to increment id", "error", err)
Expand Down
7 changes: 1 addition & 6 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCheckpoint,
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
Expand Down
9 changes: 4 additions & 5 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand All @@ -21,7 +20,7 @@ import (
type CDCBatchInfo struct {
BatchID int64
RowsInBatch uint32
BatchEndlSN pglogrepl.LSN
BatchEndlSN int64
StartTime time.Time
}

Expand All @@ -36,7 +35,7 @@ func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName stri
}

func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtSource pglogrepl.LSN,
latestLSNAtSource int64,
) error {
_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2",
Expand All @@ -48,7 +47,7 @@ func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool,
}

func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtTarget pglogrepl.LSN,
latestLSNAtTarget int64,
) error {
_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2",
Expand Down Expand Up @@ -80,7 +79,7 @@ func UpdateNumRowsAndEndLSNForCDCBatch(
flowJobName string,
batchID int64,
numRows uint32,
batchEndLSN pglogrepl.LSN,
batchEndLSN int64,
) error {
_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2 WHERE flow_name=$3 AND batch_id=$4",
Expand Down
15 changes: 8 additions & 7 deletions flow/model/cdc_record_stream.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package model

import (
"errors"
"sync/atomic"

"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -41,11 +40,11 @@ func (r *CDCRecordStream) UpdateLatestCheckpoint(val int64) {
}
}

func (r *CDCRecordStream) GetLastCheckpoint() (int64, error) {
func (r *CDCRecordStream) GetLastCheckpoint() int64 {
if !r.lastCheckpointSet {
return 0, errors.New("last checkpoint not set, stream is still active")
panic("last checkpoint not set, stream is still active")
}
return r.lastCheckpointID.Load(), nil
return r.lastCheckpointID.Load()
}

func (r *CDCRecordStream) AddRecord(record Record) {
Expand All @@ -66,9 +65,11 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool {
}

func (r *CDCRecordStream) Close() {
close(r.emptySignal)
close(r.records)
r.lastCheckpointSet = true
if !r.lastCheckpointSet {
close(r.emptySignal)
close(r.records)
r.lastCheckpointSet = true
}
}

func (r *CDCRecordStream) GetRecords() <-chan Record {
Expand Down

0 comments on commit a0fe0cb

Please sign in to comment.