From 7ba50853d2b49c5f26ca8facf5f3dccd23d1a4cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 22 Feb 2024 15:15:15 +0000 Subject: [PATCH] Use log.With to have flowName always included in workflow logs (#1357) Fixes #1353 --- flow/cmd/api.go | 2 +- flow/workflows/cdc_flow.go | 10 +++++----- flow/workflows/drop_flow.go | 4 ++-- flow/workflows/normalize_flow.go | 9 +++++---- flow/workflows/qrep_flow.go | 28 ++++++++++++++-------------- flow/workflows/setup_flow.go | 19 ++++++++++--------- flow/workflows/snapshot_flow.go | 4 +--- flow/workflows/xmin_flow.go | 11 ++++++----- 8 files changed, 44 insertions(+), 43 deletions(-) diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 656fad33d0..ae64b372be 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -83,7 +83,7 @@ func killExistingScheduleFlows( err := tc.CancelWorkflow(ctx, workflow.Execution.WorkflowId, workflow.Execution.RunId) if err != nil && err.Error() != "workflow execution already completed" { - return fmt.Errorf("unable to terminate workflow: %w", err) + return fmt.Errorf("unable to cancel workflow: %w", err) } } return nil diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 7d195d56bf..8158e40f35 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -102,10 +102,10 @@ type CDCFlowWorkflowExecution struct { } // NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution. -func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution { +func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlowWorkflowExecution { return &CDCFlowWorkflowExecution{ flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName)), } } @@ -217,7 +217,7 @@ func CDCFlowWorkflowWithConfig( state = NewCDCFlowWorkflowState(cfg) } - w := NewCDCFlowWorkflowExecution(ctx) + w := NewCDCFlowWorkflowExecution(ctx, cfg.FlowJobName) err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) { return *state, nil @@ -518,7 +518,7 @@ func CDCFlowWorkflowWithConfig( break } currentSyncFlowNum += 1 - w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum), slog.String("flowName", cfg.FlowJobName)) + w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum)) // execute the sync flow syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -527,7 +527,7 @@ func CDCFlowWorkflowWithConfig( WaitForCancellation: true, }) - w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName)) + w.logger.Info("executing sync flow") syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID) var syncDone, syncErr bool diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 2686cdfaa0..5263d7b97d 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -3,6 +3,7 @@ package peerflow import ( "errors" "fmt" + "log/slog" "time" "go.temporal.io/sdk/workflow" @@ -12,8 +13,7 @@ import ( ) func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error { - logger := workflow.GetLogger(ctx) - logger.Info("performing cleanup for flow ", req.FlowJobName) + workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), req.FlowJobName)) ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index 16c53ba7a5..84005e2b47 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -4,6 +4,7 @@ import ( "log/slog" "time" + "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -16,7 +17,7 @@ func NormalizeFlowWorkflow( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (*model.NormalizeFlowResponse, error) { - logger := workflow.GetLogger(ctx) + logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 7 * 24 * time.Hour, @@ -54,16 +55,16 @@ func NormalizeFlowWorkflow( } if canceled || (stopLoop && lastSyncBatchID == syncBatchID) { if canceled { - logger.Info("normalize canceled - ", config.FlowJobName) + logger.Info("normalize canceled") } else { - logger.Info("normalize finished - ", config.FlowJobName) + logger.Info("normalize finished") } break } if lastSyncBatchID != syncBatchID { lastSyncBatchID = syncBatchID - logger.Info("executing normalize", slog.String("flowName", config.FlowJobName)) + logger.Info("executing normalize") startNormalizeInput := &protos.StartNormalizeInput{ FlowConnectionConfigs: config, TableNameSchemaMapping: tableNameSchemaMapping, diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 7656ca47e6..4014efdc72 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -65,7 +65,7 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU return &QRepFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), runUUID: runUUID, childPartitionWorkflows: nil, activeSignal: shared.NoopSignal, @@ -79,14 +79,14 @@ func NewQRepPartitionFlowExecution(ctx workflow.Context, return &QRepPartitionFlowExecution{ config: config, flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), runUUID: runUUID, } } // SetupMetadataTables creates the metadata tables for query based replication. func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { - q.logger.Info("setting up metadata tables for qrep flow - ", q.config.FlowJobName) + q.logger.Info("setting up metadata tables for qrep flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, @@ -96,7 +96,7 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { return fmt.Errorf("failed to setup metadata tables: %w", err) } - q.logger.Info("metadata tables setup for qrep flow - ", q.config.FlowJobName) + q.logger.Info("metadata tables setup for qrep flow") return nil } @@ -125,7 +125,7 @@ func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName strin func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Context) error { if q.config.SetupWatermarkTableOnDestination { - q.logger.Info("setting up watermark table on destination for qrep flow: ", q.config.FlowJobName) + q.logger.Info("setting up watermark table on destination for qrep flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, @@ -153,7 +153,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex q.logger.Error("failed to create watermark table: ", err) return fmt.Errorf("failed to create watermark table: %w", err) } - q.logger.Info("finished setting up watermark table for qrep flow: ", q.config.FlowJobName) + q.logger.Info("finished setting up watermark table for qrep flow") } return nil } @@ -163,7 +163,7 @@ func (q *QRepFlowExecution) GetPartitions( ctx workflow.Context, last *protos.QRepPartition, ) (*protos.QRepParitionResult, error) { - q.logger.Info("fetching partitions to replicate for peer flow - ", q.config.FlowJobName) + q.logger.Info("fetching partitions to replicate for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Hour, @@ -430,7 +430,6 @@ func QRepFlowWorkflow( originalRunID := workflow.GetInfo(ctx).OriginalRunID ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - logger := workflow.GetLogger(ctx) maxParallelWorkers := 16 if config.MaxParallelWorkers > 0 { @@ -452,6 +451,7 @@ func QRepFlowWorkflow( } q := NewQRepFlowExecution(ctx, config, originalRunID) + logger := q.logger err = q.SetupWatermarkTableOnDestination(ctx) if err != nil { @@ -462,7 +462,7 @@ func QRepFlowWorkflow( if err != nil { return fmt.Errorf("failed to setup metadata tables: %w", err) } - q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) + logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) err = q.handleTableCreationForResync(ctx, state) if err != nil { @@ -480,13 +480,13 @@ func QRepFlowWorkflow( return err } - logger.Info("consolidating partitions for peer flow - ", slog.String("flowName", config.FlowJobName)) + logger.Info("consolidating partitions for peer flow") if err := q.consolidatePartitions(ctx); err != nil { return err } if config.InitialCopyOnly { - q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName) + logger.Info("initial copy completed for peer flow - ", config.FlowJobName) return nil } @@ -495,7 +495,7 @@ func QRepFlowWorkflow( return err } - q.logger.Info("partitions processed - ", len(partitions.Partitions)) + logger.Info("partitions processed - ", len(partitions.Partitions)) state.NumPartitionsProcessed += uint64(len(partitions.Partitions)) if len(partitions.Partitions) > 0 { @@ -510,7 +510,7 @@ func QRepFlowWorkflow( } } - workflow.GetLogger(ctx).Info("Continuing as new workflow", + logger.Info("Continuing as new workflow", "Last Partition", state.LastPartition, "Number of Partitions Processed", state.NumPartitionsProcessed) @@ -524,7 +524,7 @@ func QRepFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 6ee6529dd6..ccb699d8a4 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" ) // SetupFlow is the workflow that is responsible for ensuring all the @@ -43,7 +44,7 @@ func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]str tableNameMapping: tableNameMapping, cdcFlowName: cdcFlowName, executionID: workflow.GetInfo(ctx).WorkflowExecution.ID, - logger: workflow.GetLogger(ctx), + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)), } } @@ -53,7 +54,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("checking connections for CDC flow - ", s.cdcFlowName) + s.logger.Info("checking connections for CDC flow") checkCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ StartToCloseTimeout: time.Minute, @@ -81,7 +82,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables( return fmt.Errorf("failed to check destination peer connection: %w", err) } - s.logger.Info("ensuring metadata table exists - ", s.cdcFlowName) + s.logger.Info("ensuring metadata table exists") // then setup the destination peer metadata tables if destConnStatus.NeedsSetupMetadataTables { @@ -105,7 +106,7 @@ func (s *SetupFlowExecution) ensurePullability( config *protos.FlowConnectionConfigs, checkConstraints bool, ) (map[uint32]string, error) { - s.logger.Info("ensuring pullability for peer flow - ", s.cdcFlowName) + s.logger.Info("ensuring pullability for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 4 * time.Hour, @@ -146,7 +147,7 @@ func (s *SetupFlowExecution) createRawTable( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) error { - s.logger.Info("creating raw table on destination - ", s.cdcFlowName) + s.logger.Info("creating raw table on destination") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -171,7 +172,7 @@ func (s *SetupFlowExecution) createRawTable( func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs, ) (map[string]*protos.TableSchema, error) { - s.logger.Info("fetching table schema for peer flow - ", s.cdcFlowName) + s.logger.Info("fetching table schema for peer flow") ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Hour, @@ -199,7 +200,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( sortedSourceTables := maps.Keys(tableNameSchemaMapping) sort.Strings(sortedSourceTables) - s.logger.Info("setting up normalized tables for peer flow - ", s.cdcFlowName) + s.logger.Info("setting up normalized tables for peer flow") normalizedTableMapping := make(map[string]*protos.TableSchema) for _, srcTableName := range sortedSourceTables { tableSchema := tableNameSchemaMapping[srcTableName] @@ -244,7 +245,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( return nil, fmt.Errorf("failed to create normalized tables: %w", err) } - s.logger.Info("finished setting up normalized tables for peer flow - ", s.cdcFlowName) + s.logger.Info("finished setting up normalized tables for peer flow") return normalizedTableMapping, nil } @@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow( ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (*protos.SetupFlowOutput, error) { - s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName)) + s.logger.Info("executing setup flow") // first check the connectionsAndSetupMetadataTables if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil { diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 100ef22b69..8d2ee29d74 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -243,11 +243,9 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( } func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error { - logger := workflow.GetLogger(ctx) - se := &SnapshotFlowExecution{ config: config, - logger: logger, + logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), } numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1)) diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 52fc9cd47f..878805094d 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -26,6 +26,7 @@ func XminFlowWorkflow( } q := NewQRepFlowExecution(ctx, config, originalRunID) + logger := q.logger err = q.SetupWatermarkTableOnDestination(ctx) if err != nil { @@ -36,7 +37,7 @@ func XminFlowWorkflow( if err != nil { return fmt.Errorf("failed to setup metadata tables: %w", err) } - q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) + logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) err = q.handleTableCreationForResync(ctx, state) if err != nil { @@ -64,7 +65,7 @@ func XminFlowWorkflow( } if config.InitialCopyOnly { - q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName) + logger.Info("initial copy completed for peer flow - ", config.FlowJobName) return nil } @@ -78,7 +79,7 @@ func XminFlowWorkflow( Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}}, } - workflow.GetLogger(ctx).Info("Continuing as new workflow", + logger.Info("Continuing as new workflow", "Last Partition", state.LastPartition, "Number of Partitions Processed", state.NumPartitionsProcessed) @@ -92,11 +93,11 @@ func XminFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) + logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { - q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, logger) } else if err := ctx.Err(); err != nil { return err }