Fix cases of !BADKEY in datadog (#1443)
Explicitly passing in the flow name is no longer necessary after #1357.
serprex authored Mar 7, 2024
1 parent b213528 commit d8a7bd6
Showing 8 changed files with 22 additions and 21 deletions.
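For context: Go's log/slog pairs its variadic arguments into key/value attributes, and a lone value that cannot be paired is rendered under the placeholder key !BADKEY, which then shows up verbatim in Datadog. A minimal, standalone sketch of the failure mode and the paired forms that fix it (names like "my_flow" are illustrative, not from this repo):

    package main

    import "log/slog"

    func main() {
        flowName := "my_flow"

        // A dangling value has no key, so slog renders it as !BADKEY=my_flow.
        slog.Info("slot created", flowName)

        // Paired forms produce a proper structured attribute instead.
        slog.Info("slot created", "flowName", flowName)
        slog.Info("slot created", slog.String("flowName", flowName))
    }

The other half of the commit drops " - " + flowName message suffixes: per the description above, explicitly passing the flow name is no longer needed after #1357, so concatenating it only multiplied distinct message strings in Datadog.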
10 changes: 5 additions & 5 deletions flow/activities/flowable.go
@@ -301,7 +301,7 @@ func (a *FlowableActivity) SyncFlow(
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "transferring records for job - " + flowName
return "transferring records for job"
})
defer shutdown()

@@ -474,7 +474,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
return "normalizing records from batch for job"
})
defer shutdown()

@@ -542,7 +542,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "getting partitions for job - " + config.FlowJobName
return "getting partitions for job"
})
defer shutdown()

@@ -725,7 +725,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "consolidating partitions for job - " + config.FlowJobName
return "consolidating partitions for job"
})
defer shutdown()

@@ -980,7 +980,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "renaming tables for job - " + config.FlowJobName
return "renaming tables for job"
})
defer shutdown()

6 changes: 3 additions & 3 deletions flow/activities/snapshot_activity.go
@@ -32,7 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
}
- a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)
+ a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

return nil
}
@@ -50,7 +50,7 @@ func (a *SnapshotActivity) SetupReplication(
return nil, nil
}

- a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)
+ a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
@@ -84,7 +84,7 @@ func (a *SnapshotActivity) SetupReplication(
var slotInfo connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
logger.Info("slot created", slotInfo.SlotName)
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
case err := <-replicationErr:
closeConnectionForError(err)
return nil, fmt.Errorf("failed to setup replication: %w", err)
3 changes: 2 additions & 1 deletion flow/cmd/mirror_status.go
@@ -144,7 +144,7 @@ func (h *FlowRequestHandler) cloneTableSummary(

rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err))
slog.Error("unable to query initial load partition",
slog.String(string(shared.FlowNameKey), flowJobName), slog.Any("error", err))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}
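Here the fix also moves the flow name out of the message and into an attribute keyed by shared.FlowNameKey, so every call site tags it consistently. A minimal sketch of that pattern (the type and value of the key are assumptions; the real definition lives in the repo's shared package):

    type ContextKey string

    const FlowNameKey ContextKey = "flowName" // hypothetical value

    // Usage mirrors the diff: the typed key is converted to string at the call site.
    slog.Error("unable to query initial load partition",
        slog.String(string(FlowNameKey), flowJobName), slog.Any("error", err))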

4 changes: 2 additions & 2 deletions flow/workflows/cdc_flow.go
@@ -236,7 +236,7 @@ func CDCFlowWorkflow(
for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second)))
selector.Select(ctx)
}
if err := ctx.Err(); err != nil {
@@ -255,7 +255,7 @@
}
}

logger.Info("mirror has been resumed after ", time.Since(startTime))
logger.Info(fmt.Sprintf("mirror has been resumed after %s", time.Since(startTime).Round(time.Second)))
state.CurrentFlowStatus = protos.FlowStatus_STATUS_RUNNING
}
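The pause/resume logs switch from a slog.Any duration attribute (or a dangling time.Duration value, another !BADKEY source) to a formatted message, with Round(time.Second) trimming sub-second noise from the rendered value. For example:

    elapsed := 207845 * time.Millisecond
    fmt.Println(elapsed)                    // 3m27.845s
    fmt.Println(elapsed.Round(time.Second)) // 3m28s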

6 changes: 3 additions & 3 deletions flow/workflows/qrep_flow.go
@@ -119,7 +119,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
// fetch the schema for the watermark table
watermarkTableSchema, err := q.getTableSchema(ctx, q.config.WatermarkTable)
if err != nil {
q.logger.Error("failed to fetch schema for watermark table: ", err)
q.logger.Error("failed to fetch schema for watermark table", slog.Any("error", err))
return fmt.Errorf("failed to fetch schema for watermark table: %w", err)
}

@@ -161,7 +161,7 @@ func (q *QRepFlowExecution) GetPartitions(
return nil, fmt.Errorf("failed to fetch partitions to replicate: %w", err)
}

q.logger.Info("partitions to replicate - ", slog.Int("num_partitions", len(partitions.Partitions)))
q.logger.Info("partitions to replicate", slog.Int("num_partitions", len(partitions.Partitions)))
return partitions, nil
}

@@ -439,7 +439,7 @@ func QRepFlowWorkflow(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

for q.activeSignal == model.PauseSignal {
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second)))
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
2 changes: 1 addition & 1 deletion flow/workflows/setup_flow.go
@@ -192,7 +192,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(

var tblSchemaOutput *protos.GetTableSchemaBatchOutput
if err := future.Get(ctx, &tblSchemaOutput); err != nil {
s.logger.Error("failed to fetch schema for source tables: ", err)
s.logger.Error("failed to fetch schema for source tables", slog.Any("error", err))
return nil, fmt.Errorf("failed to fetch schema for source table %s: %w", sourceTables, err)
}

10 changes: 5 additions & 5 deletions flow/workflows/snapshot_flow.go
@@ -30,7 +30,7 @@ func (s *SnapshotFlowExecution) setupReplication(
ctx workflow.Context,
) (*protos.SetupReplicationOutput, error) {
flowName := s.config.FlowJobName
s.logger.Info("setting up replication on source for peer flow - ", flowName)
s.logger.Info("setting up replication on source for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 4 * time.Hour,
@@ -59,7 +59,7 @@
return nil, fmt.Errorf("failed to setup replication on source peer: %w", err)
}

s.logger.Info("replication slot live for on source for peer flow - ", flowName)
s.logger.Info("replication slot live for on source for peer flow")

return res, nil
}
@@ -68,7 +68,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(
ctx workflow.Context,
) error {
flowName := s.config.FlowJobName
s.logger.Info("closing slot keep alive for peer flow - ", flowName)
s.logger.Info("closing slot keep alive for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 15 * time.Minute,
@@ -78,7 +78,7 @@
return fmt.Errorf("failed to close slot keep alive for peer flow: %w", err)
}

s.logger.Info("closed slot keep alive for peer flow - ", flowName)
s.logger.Info("closed slot keep alive for peer flow")

return nil
}
@@ -226,7 +226,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
return fmt.Errorf("failed to setup replication: %w", err)
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel))
if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone tables: %w", err)
}
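As in the snapshot activity, the old call above passed a lone value (numTablesInParallel) with no key, which an slog-backed logger renders as !BADKEY. Formatting the count into the message is the committed fix; a key/value pair would be the structured alternative (key name assumed):

    logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel))        // as committed
    logger.Info("cloning tables in parallel", "numTablesInParallel", numTablesInParallel) // structured alternative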
2 changes: 1 addition & 1 deletion flow/workflows/xmin_flow.go
@@ -38,7 +38,7 @@ func XminFlowWorkflow(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

for q.activeSignal == model.PauseSignal {
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info(fmt.Sprintf("mirror has been paused for %s", time.Since(startTime).Round(time.Second)))
// only place we block on receive, so signal processing is immediate
val, ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute)
if ok {
