From b370d9d132f6a410010a96c424f88d171d631a48 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 29 Oct 2023 11:36:22 -0400 Subject: [PATCH] more fixes --- flow/activities/flowable.go | 25 +++++++++++++------ flow/connectors/postgres/cdc.go | 3 +-- flow/connectors/postgres/postgres.go | 4 +++ flow/connectors/snowflake/snowflake.go | 12 ++++++--- .../connectors/utils/monitoring/monitoring.go | 4 +-- flow/model/model.go | 3 ++- 6 files changed, 35 insertions(+), 16 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 98cea2c049..fb18a9e8ab 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -165,11 +165,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics) ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) - srcConn, err := connectors.GetCDCPullConnector(ctx, conn.Source) - if err != nil { - return nil, fmt.Errorf("failed to get source connector: %w", err) - } - defer connectors.CloseConnector(srcConn) dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -200,7 +195,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, startTime := time.Now() - errGroup, ctx := errgroup.WithContext(ctx) + errGroup, errCtx := errgroup.WithContext(ctx) + srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source) + if err != nil { + return nil, fmt.Errorf("failed to get source connector: %w", err) + } + defer connectors.CloseConnector(srcConn) // start a goroutine to pull records from the source errGroup.Go(func() error { @@ -215,11 +215,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, RelationMessageMapping: input.RelationMessageMapping, - RecordStream: recordBatch, + RecordStream: recordBatch, }) }) - hasRecords := recordBatch.WaitAndCheckEmpty() + hasRecords := !recordBatch.WaitAndCheckEmpty() + log.WithFields(log.Fields{ + "flowName": input.FlowConnectionConfigs.FlowJobName, + }).Infof("the current sync flow has records: %v", hasRecords) if a.CatalogMirrorMonitor.IsActive() && hasRecords { syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) @@ -242,6 +245,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } if !hasRecords { + // wait for the pull goroutine to finish + err = errGroup.Wait() + if err != nil { + return nil, fmt.Errorf("failed to pull records: %w", err) + } + log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push") syncResponse := &model.SyncResponse{} syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 1843d381af..28c940ae33 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -191,7 +191,6 @@ func (p *PostgresCDCSource) consumeStream( records.SignalAsEmpty() } records.RelationMessageMapping <- &p.relationMessageMapping - records.Close() }() shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string { @@ -246,7 +245,7 @@ func (p *PostgresCDCSource) consumeStream( cancel() if err != nil && !p.commitLock { if pgconn.Timeout(err) { - log.Infof("Idle timeout reached, returning currently accumulated records") + log.Infof("Idle timeout reached, returning currently accumulated records - %d", len(localRecords)) return nil } else { return fmt.Errorf("ReceiveMessage failed: %w", err) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 951841bae4..7b75213f26 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -175,6 +175,10 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (*protos.LastSyncState // PullRecords pulls records from the source. func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { + defer func() { + req.RecordStream.Close() + }() + // Slotname would be the job name prefixed with "peerflow_slot_" slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName) if req.OverrideReplicationSlotName != "" { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index de9ac3d972..f9e430dc44 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -478,7 +478,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) - log.Printf("pushing records to Snowflake table %s", rawTableIdentifier) + log.Infof("pushing records to Snowflake table %s", rawTableIdentifier) syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) if err != nil { @@ -488,12 +488,15 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. var res *model.SyncResponse if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { + log.Infof("sync mode is for flow %s is AVRO", req.FlowJobName) res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) if err != nil { return nil, err } } + log.Infof("sync mode is for flow %s is MULTI_INSERT", req.FlowJobName) + // transaction for SyncRecords syncRecordsTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { @@ -633,8 +636,11 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra }, nil } -func (c *SnowflakeConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest, rawTableIdentifier string, - syncBatchID int64) (*model.SyncResponse, error) { +func (c *SnowflakeConnector) syncRecordsViaAvro( + req *model.SyncRecordsRequest, + rawTableIdentifier string, + syncBatchID int64, +) (*model.SyncResponse, error) { lastCP := req.Records.LastCheckPointID tableNameRowsMapping := make(map[string]uint32) streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, 0, syncBatchID) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 8c5e9a22e7..f7031815ee 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -67,7 +67,7 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtSourceForCDCFlow(ctx context.Con "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2", uint64(latestLSNAtSource), flowJobName) if err != nil { - return fmt.Errorf("error while updating flow in cdc_flows: %w", err) + return fmt.Errorf("[source] error while updating flow in cdc_flows: %w", err) } return nil } @@ -82,7 +82,7 @@ func (c *CatalogMirrorMonitor) UpdateLatestLSNAtTargetForCDCFlow(ctx context.Con "UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2", uint64(latestLSNAtTarget), flowJobName) if err != nil { - return fmt.Errorf("error while updating flow in cdc_flows: %w", err) + return fmt.Errorf("[target] error while updating flow in cdc_flows: %w", err) } return nil } diff --git a/flow/model/model.go b/flow/model/model.go index aa5b1ecb83..a14c6b73a6 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -321,7 +321,8 @@ func (r *CDCRecordStream) SignalAsNotEmpty() { } func (r *CDCRecordStream) WaitAndCheckEmpty() bool { - return <-r.emptySignal + isEmpty := <-r.emptySignal + return isEmpty } func (r *CDCRecordStream) WaitForSchemaDeltas() []*protos.TableSchemaDelta {