Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 29, 2023
1 parent f55b3b8 commit b370d9d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 16 deletions.
25 changes: 17 additions & 8 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

srcConn, err := connectors.GetCDCPullConnector(ctx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -200,7 +195,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

startTime := time.Now()

errGroup, ctx := errgroup.WithContext(ctx)
errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
Expand All @@ -215,11 +215,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
RecordStream: recordBatch,
})
})

hasRecords := recordBatch.WaitAndCheckEmpty()
hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

if a.CatalogMirrorMonitor.IsActive() && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
Expand All @@ -242,6 +245,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

if !hasRecords {
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func (p *PostgresCDCSource) consumeStream(
records.SignalAsEmpty()
}
records.RelationMessageMapping <- &p.relationMessageMapping
records.Close()
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
Expand Down Expand Up @@ -246,7 +245,7 @@ func (p *PostgresCDCSource) consumeStream(
cancel()
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records")
log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords))
return nil
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
defer func() {
req.RecordStream.Close()
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
if req.OverrideReplicationSlotName != "" {
Expand Down
12 changes: 9 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string,

func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
log.Printf("pushing records to Snowflake table %s", rawTableIdentifier)
log.Infof("pushing records to Snowflake table %s", rawTableIdentifier)

syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
Expand All @@ -488,12 +488,15 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
log.Infof("sync mode is for flow %s is AVRO", req.FlowJobName)
res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
return nil, err
}
}

log.Infof("sync mode is for flow %s is MULTI_INSERT", req.FlowJobName)

// transaction for SyncRecords
syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
Expand Down Expand Up @@ -633,8 +636,11 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra
}, nil
}

func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, rawTableIdentifier string,
syncBatchID int64) (*model.SyncResponse, error) {
func (c *SnowflakeConnector) syncRecordsViaAvro(
req *model.SyncRecordsRequest,
rawTableIdentifier string,
syncBatchID int64,
) (*model.SyncResponse, error) {
lastCP := req.Records.LastCheckPointID
tableNameRowsMapping := make(map[string]uint32)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, 0, syncBatchID)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Con
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2",
uint64(latestLSNAtSource), flowJobName)
if err != nil {
return fmt.Errorf("error while updating flow in cdc_flows: %w", err)
return fmt.Errorf("[source] error while updating flow in cdc_flows: %w", err)
}
return nil
}
Expand All @@ -82,7 +82,7 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Con
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2",
uint64(latestLSNAtTarget), flowJobName)
if err != nil {
return fmt.Errorf("error while updating flow in cdc_flows: %w", err)
return fmt.Errorf("[target] error while updating flow in cdc_flows: %w", err)
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ func (r *CDCRecordStream) SignalAsNotEmpty() {
}

func (r *CDCRecordStream) WaitAndCheckEmpty() bool {
return <-r.emptySignal
isEmpty := <-r.emptySignal
return isEmpty
}

func (r *CDCRecordStream) WaitForSchemaDeltas() []*protos.TableSchemaDelta {
Expand Down

0 comments on commit b370d9d

Please sign in to comment.