Skip to content

Commit

Permalink
only lookup snapshot name when ParentMirrorName set
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 22, 2024
1 parent 8c2c30d commit a850f6c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
13 changes: 6 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,13 +489,12 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
})
defer shutdown()

snapshotFlowName := config.ParentMirrorName
if snapshotFlowName == "" {
snapshotFlowName = config.FlowJobName
}
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName)
if err != nil {
return nil, err
snapshotName := ""
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return nil, err
}
}

partitions, err := srcConn.GetQRepPartitions(ctx, config, last, snapshotName)
Expand Down
23 changes: 13 additions & 10 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,12 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
snapshotFlowName := config.ParentMirrorName
if snapshotFlowName == "" {
snapshotFlowName = config.FlowJobName
}
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, snapshotFlowName)
if err != nil {
return err
snapshotName := ""
if config.ParentMirrorName != "" {
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return err
}
}

srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName)
Expand Down Expand Up @@ -514,9 +513,13 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
_, snapshotName, _, err := shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.FlowJobName)
if err != nil {
return err
snapshotName := ""
if config.ParentMirrorName != "" {
var err error
_, snapshotName, _, err = shared.LoadSnapshotNameFromCatalog(ctx, a.CatalogPool, config.ParentMirrorName)
if err != nil {
return err
}
}

srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, config.Env, a.CatalogPool, config.SourceName)
Expand Down

0 comments on commit a850f6c

Please sign in to comment.