From 192db3332204401783fbb8dadfbcc0740a10fe83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 22 Feb 2024 14:20:25 +0000 Subject: [PATCH] log how long snapshot export maintained, & when snapshot flow fails (#1352) Mix up had #1347 merge before logging feedback implemented --- flow/activities/snapshot_activity.go | 6 +++++- flow/workflows/cdc_flow.go | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 4383388d50..0e75ea2e06 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -124,8 +124,12 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee a.SnapshotConnections[sessionID] = sss a.SnapshotConnectionsMutex.Unlock() + logger := activity.GetLogger(ctx) + start := time.Now() for { - activity.RecordHeartbeat(ctx, "maintaining export snapshot transaction") + msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second)) + logger.Info(msg) + activity.RecordHeartbeat(ctx, msg) if ctx.Err() != nil { a.SnapshotConnectionsMutex.Lock() delete(a.SnapshotConnections, sessionID) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 38daea5df9..e58351926d 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -307,6 +307,7 @@ func CDCFlowWorkflowWithConfig( snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts) snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg) if err := snapshotFlowFuture.Get(snapshotFlowCtx, nil); err != nil { + w.logger.Error("snapshot flow failed", slog.Any("error", err)) return state, fmt.Errorf("failed to execute child workflow: %w", err) }