Skip to content

Commit

Permalink
log how long snapshot export maintained, & when snapshot flow fails (#…
Browse files Browse the repository at this point in the history
…1352)

Mix up had #1347 merge before logging feedback implemented
  • Loading branch information
serprex authored Feb 22, 2024
1 parent 3ccb4f6 commit 192db33
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
6 changes: 5 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,12 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
a.SnapshotConnections[sessionID] = sss
a.SnapshotConnectionsMutex.Unlock()

logger := activity.GetLogger(ctx)
start := time.Now()
for {
activity.RecordHeartbeat(ctx, "maintaining export snapshot transaction")
msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second))
logger.Info(msg)
activity.RecordHeartbeat(ctx, msg)
if ctx.Err() != nil {
a.SnapshotConnectionsMutex.Lock()
delete(a.SnapshotConnections, sessionID)
Expand Down
1 change: 1 addition & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func CDCFlowWorkflowWithConfig(
snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil {
w.logger.Error("snapshot flow failed", slog.Any("error", err))
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}

Expand Down

0 comments on commit 192db33

Please sign in to comment.