Skip to content

Commit

Permalink
Fix cases of !BADKEY in datadog
Browse files Browse the repository at this point in the history
Explicitly passing flow name in no longer necessary after #1357
  • Loading branch information
serprex committed Mar 6, 2024
1 parent 373bfb2 commit b8ffb7a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 13 deletions.
10 changes: 5 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow(
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "transferring records for job - " + flowName
return "transferring records for job"
})
defer shutdown()

Expand Down Expand Up @@ -474,7 +474,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
return "normalizing records from batch for job"
})
defer shutdown()

Expand Down Expand Up @@ -542,7 +542,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "getting partitions for job - " + config.FlowJobName
return "getting partitions for job"
})
defer shutdown()

Expand Down Expand Up @@ -725,7 +725,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "consolidating partitions for job - " + config.FlowJobName
return "consolidating partitions for job"
})
defer shutdown()

Expand Down Expand Up @@ -980,7 +980,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "renaming tables for job - " + config.FlowJobName
return "renaming tables for job"
})
defer shutdown()

Expand Down
4 changes: 2 additions & 2 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

return nil
}
Expand All @@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func (h *FlowRequestHandler) cloneTableSummary(

rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err))
slog.Error("unable to query initial load partition",
slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (q *QRepFlowExecution) GetPartitions(
return nil, fmt.Errorf("failed to fetch partitions to replicate: %w", err)
}

q.logger.Info("partitions to replicate - ", slog.Int("num_partitions", len(partitions.Partitions)))
q.logger.Info("partitions to replicate", slog.Int("num_partitions", len(partitions.Partitions)))
return partitions, nil
}

Expand Down
8 changes: 4 additions & 4 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *SnapshotFlowExecution) setupReplication(
ctx workflow.Context,
) (*protos.SetupReplicationOutput, error) {
flowName := s.config.FlowJobName
s.logger.Info("setting up replication on source for peer flow - ", flowName)
s.logger.Info("setting up replication on source for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 4 * time.Hour,
Expand Down Expand Up @@ -59,7 +59,7 @@ func (s *SnapshotFlowExecution) setupReplication(
return nil, fmt.Errorf("failed to setup replication on source peer: %w", err)
}

s.logger.Info("replication slot live for on source for peer flow - ", flowName)
s.logger.Info("replication slot live for on source for peer flow")

return res, nil
}
Expand All @@ -68,7 +68,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(
ctx workflow.Context,
) error {
flowName := s.config.FlowJobName
s.logger.Info("closing slot keep alive for peer flow - ", flowName)
s.logger.Info("closing slot keep alive for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 15 * time.Minute,
Expand All @@ -78,7 +78,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(
return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err)
}

s.logger.Info("closed slot keep alive for peer flow - ", flowName)
s.logger.Info("closed slot keep alive for peer flow")

return nil
}
Expand Down

0 comments on commit b8ffb7a

Please sign in to comment.