Skip to content

Commit

Permalink
fixed review comments pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 2, 2024
1 parent b5fe61f commit 27a0e36
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func CDCFlowWorkflowWithConfig(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

for state.ActiveSignal == shared.PauseSignal {
w.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
if lastSyncBatchID != syncBatchID {
lastSyncBatchID = syncBatchID

logger.Info("executing normalize - ", slog.String("flowName", config.FlowJobName))
logger.Info("executing normalize", slog.String("flowName", config.FlowJobName))
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
TableNameSchemaMapping: tableNameSchemaMapping,
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func QRepFlowWorkflow(
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
Expand Down
3 changes: 2 additions & 1 deletion flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peerflow

import (
"fmt"
"log/slog"
"slices"
"sort"
"time"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*protos.SetupFlowOutput, error) {
s.logger.Info("executing setup flow - ", s.cdcFlowName)
s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName))

// first check the connectionsAndSetupMetadataTables
if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
opts *protos.SyncFlowOptions,
relationMessageMapping model.RelationMessageMapping,
) (*model.SyncResponse, error) {
s.logger.Info("executing sync flow - ", slog.String("flowName", s.CDCFlowName))
s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName))

syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func XminFlowWorkflow(
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
Expand Down

0 comments on commit 27a0e36

Please sign in to comment.