From d8a7bd61046b9463846e793dc686c55ec4f2612e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 7 Mar 2024 14:56:43 +0000 Subject: [PATCH] Fix cases of !BADKEY in datadog (#1443) Explicitly passing flow name is no longer necessary after #1357 --- flow/activities/flowable.go | 10 +++++----- flow/activities/snapshot_activity.go | 6 +++--- flow/cmd/mirror_status.go | 3 ++- flow/workflows/cdc_flow.go | 4 ++-- flow/workflows/qrep_flow.go | 6 +++--- flow/workflows/setup_flow.go | 2 +- flow/workflows/snapshot_flow.go | 10 +++++----- flow/workflows/xmin_flow.go | 2 +- 8 files changed, 22 insertions(+), 21 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 3900d07385..5d9c4e690f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow( } shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "transferring records for job - " + flowName + return "transferring records for job" }) defer shutdown() @@ -474,7 +474,7 @@ func (a *FlowableActivity) StartNormalize( defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName + return "normalizing records from batch for job" }) defer shutdown() @@ -542,7 +542,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "getting partitions for job - " + config.FlowJobName + return "getting partitions for job" }) defer shutdown() @@ -725,7 +725,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "consolidating partitions for job - " + config.FlowJobName + return "consolidating partitions for job" 
}) defer shutdown() @@ -980,7 +980,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return "renaming tables for job - " + config.FlowJobName + return "renaming tables for job" }) defer shutdown() diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 262d3d0dbd..84df6ecfab 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s connectors.CloseConnector(ctx, s.connector) delete(a.SnapshotConnections, flowJobName) } - a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName) + a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job") return nil } @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication( return nil, nil } - a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName) + a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig) if err != nil { @@ -84,7 +84,7 @@ func (a *SnapshotActivity) SetupReplication( var slotInfo connpostgres.SlotCreationResult select { case slotInfo = <-slotSignal.SlotCreated: - logger.Info("slot created", slotInfo.SlotName) + logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) case err := <-replicationErr: closeConnectionForError(err) return nil, fmt.Errorf("failed to setup replication: %w", err) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index e8160277f5..f72cd5311e 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -144,7 +144,8 @@ func (h *FlowRequestHandler) cloneTableSummary( rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%") if err != nil { - slog.Error("unable to 
query initial load partition - "+flowJobName, slog.Any("error", err)) + slog.Error("unable to query initial load partition", + slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err)) return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err) } diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index bab81b3b23..93e1d4659b 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -236,7 +236,7 @@ func CDCFlowWorkflow( for state.ActiveSignal == model.PauseSignal { // only place we block on receive, so signal processing is immediate for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) selector.Select(ctx) } if err := ctx.Err(); err != nil { @@ -255,7 +255,7 @@ func CDCFlowWorkflow( } } - logger.Info("mirror has been resumed after ", time.Since(startTime)) + logger.Info(fmt.Sprintf("mirror has been resumed after %s", time.Since(startTime).Round(time.Second))) state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 839eab9ddc..2f0124e717 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -119,7 +119,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex // fetch the schema for the watermark table watermarkTableSchema, err := q.getTableSchema(ctx, q.config.WatermarkTable) if err != nil { - q.logger.Error("failed to fetch schema for watermark table: ", err) + q.logger.Error("failed to fetch schema for watermark table", slog.Any("error", err)) return fmt.Errorf("failed to fetch schema for watermark table: %w", err) } @@ -161,7 +161,7 @@ func (q *QRepFlowExecution) GetPartitions( return nil, fmt.Errorf("failed to fetch 
partitions to replicate: %w", err) } - q.logger.Info("partitions to replicate - ", slog.Int("num_partitions", len(partitions.Partitions))) + q.logger.Info("partitions to replicate", slog.Int("num_partitions", len(partitions.Partitions))) return partitions, nil } @@ -439,7 +439,7 @@ func QRepFlowWorkflow( state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED for q.activeSignal == model.PauseSignal { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index ac3e66f93c..0574f0d24e 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -192,7 +192,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( var tblSchemaOutput *protos.GetTableSchemaBatchOutput if err := future.Get(ctx, &tblSchemaOutput); err != nil { - s.logger.Error("failed to fetch schema for source tables: ", err) + s.logger.Error("failed to fetch schema for source tables", slog.Any("error", err)) return nil, fmt.Errorf("failed to fetch schema for source table %s: %w", sourceTables, err) } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 050bc604e5..ce9ab27d78 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -30,7 +30,7 @@ func (s *SnapshotFlowExecution) setupReplication( ctx workflow.Context, ) (*protos.SetupReplicationOutput, error) { flowName := s.config.FlowJobName - s.logger.Info("setting up replication on source for peer flow - ", flowName) + s.logger.Info("setting up replication on source for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, @@ -59,7 +59,7 @@ func (s 
*SnapshotFlowExecution) setupReplication( return nil, fmt.Errorf("failed to setup replication on source peer: %w", err) } - s.logger.Info("replication slot live for on source for peer flow - ", flowName) + s.logger.Info("replication slot live for on source for peer flow") return res, nil } @@ -68,7 +68,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( ctx workflow.Context, ) error { flowName := s.config.FlowJobName - s.logger.Info("closing slot keep alive for peer flow - ", flowName) + s.logger.Info("closing slot keep alive for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 15 * time.Minute, @@ -78,7 +78,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive( return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err) } - s.logger.Info("closed slot keep alive for peer flow - ", flowName) + s.logger.Info("closed slot keep alive for peer flow") return nil } @@ -226,7 +226,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( return fmt.Errorf("failed to setup replication: %w", err) } - logger.Info("cloning tables in parallel: ", numTablesInParallel) + logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 777daba38b..4cd6deece7 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -38,7 +38,7 @@ func XminFlowWorkflow( state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED for q.activeSignal == model.PauseSignal { - logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second))) // only place we block on receive, so signal processing is immediate val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute) if ok {