From 27a0e3617700e5aa7bb5e105130d206b474ea7a1 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 2 Feb 2024 12:47:27 +0530 Subject: [PATCH] fixed review comments pt.1 --- flow/workflows/cdc_flow.go | 2 +- flow/workflows/normalize_flow.go | 2 +- flow/workflows/qrep_flow.go | 2 +- flow/workflows/setup_flow.go | 3 ++- flow/workflows/sync_flow.go | 2 +- flow/workflows/xmin_flow.go | 2 +- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index fd3145299a..21ccd68472 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -477,7 +477,7 @@ func CDCFlowWorkflowWithConfig( state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED for state.ActiveSignal == shared.PauseSignal { - w.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime))) + w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) if state.ActiveSignal == shared.NoopSignal { diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 33a882d831..8b51faeac3 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -66,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context, if lastSyncBatchID != syncBatchID { lastSyncBatchID = syncBatchID - logger.Info("executing normalize - ", slog.String("flowName", config.FlowJobName)) + logger.Info("executing normalize", slog.String("flowName", config.FlowJobName)) startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, TableNameSchemaMapping: tableNameSchemaMapping, diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 3f7c0a8662..024cdc783a 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -516,7 +516,7 @@ func QRepFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime))) + q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 19581061db..118cb6e221 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -2,6 +2,7 @@ package peerflow import ( "fmt" + "log/slog" "slices" "sort" "time" @@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (*protos.SetupFlowOutput, error) { - s.logger.Info("executing setup flow - ", s.cdcFlowName) + s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName)) // first check the connectionsAndSetupMetadataTables if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 8a8ca98aec..4f52a0c68e 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -39,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow( opts *protos.SyncFlowOptions, relationMessageMapping model.RelationMessageMapping, ) (*model.SyncResponse, error) { - s.logger.Info("executing sync flow - ", slog.String("flowName", s.CDCFlowName)) + s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName)) syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ StartToCloseTimeout: 1 * time.Minute, diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index e67a3e951f..64a71e8104 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -101,7 +101,7 @@ func XminFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime))) + q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok {