diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 85d5fd41b8..33c944f993 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -537,11 +537,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, "flowName": config.FlowJobName, }).Errorf("failed to pull records: %v", err) goroutineErr = err - } - err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) - if err != nil { - log.Errorf("%v", err) - goroutineErr = err + } else { + err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords) + if err != nil { + log.Errorf("%v", err) + goroutineErr = err + } } wg.Done() } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 3c265fdbdc..db09b7abc9 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -239,7 +239,7 @@ func XminFlowWorkflow( } state.LastPartition = &protos.QRepPartition{ - PartitionId: uuid.New().String(), + PartitionId: q.runUUID, Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, }