Use log.With to have flowName always included in workflow logs (#1357)
Fixes #1353
serprex authored Feb 22, 2024
1 parent efa1521 commit 7ba5085
Showing 8 changed files with 44 additions and 43 deletions.
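The core change in the workflow files: instead of fetching a bare workflow.GetLogger(ctx) and repeating slog.String("flowName", ...) on every log call, the flow name is bound to the logger once via the Temporal SDK's log.With helper, so every subsequent log line carries it automatically. A minimal sketch of the idea, assuming an slog-backed workflow logger as used in this repository (the workflow name and field key below are illustrative, not part of the commit):

package peerflow

import (
	"log/slog"

	"go.temporal.io/sdk/log"
	"go.temporal.io/sdk/workflow"
)

// ExampleFlowWorkflow is a hypothetical workflow showing the pattern:
// bind the flow name to the logger once, then log without repeating it.
func ExampleFlowWorkflow(ctx workflow.Context, flowJobName string) error {
	// log.With returns a logger that attaches the given key/value pairs to
	// every subsequent call; passing an slog.Attr works here because the
	// underlying logger is backed by log/slog.
	logger := log.With(workflow.GetLogger(ctx), slog.String("flowName", flowJobName))

	logger.Info("starting flow")  // "flowName" is attached automatically
	logger.Info("flow finished")  // ...and here, with no extra argument
	return nil
}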
2 changes: 1 addition & 1 deletion flow/cmd/api.go
@@ -83,7 +83,7 @@ func killExistingScheduleFlows(
err := tc.CancelWorkflow(ctx,
workflow.Execution.WorkflowId, workflow.Execution.RunId)
if err != nil && err.Error() != "workflow execution already completed" {
return fmt.Errorf("unable to terminate workflow: %w", err)
return fmt.Errorf("unable to cancel workflow: %w", err)
}
}
return nil
10 changes: 5 additions & 5 deletions flow/workflows/cdc_flow.go
@@ -102,10 +102,10 @@ type CDCFlowWorkflowExecution struct {
}

// NewCDCFlowWorkflowExecution creates a new instance of PeerFlowWorkflowExecution.
- func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution {
+ func NewCDCFlowWorkflowExecution(ctx workflow.Context, flowName string) *CDCFlowWorkflowExecution {
return &CDCFlowWorkflowExecution{
flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
- logger: workflow.GetLogger(ctx),
+ logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), flowName)),
}
}

@@ -217,7 +217,7 @@ func CDCFlowWorkflowWithConfig(
state = NewCDCFlowWorkflowState(cfg)
}

- w := NewCDCFlowWorkflowExecution(ctx)
+ w := NewCDCFlowWorkflowExecution(ctx, cfg.FlowJobName)

err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (CDCFlowWorkflowState, error) {
return *state, nil
@@ -518,7 +518,7 @@ func CDCFlowWorkflowWithConfig(
break
}
currentSyncFlowNum += 1
w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum), slog.String("flowName", cfg.FlowJobName))
w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum))

// execute the sync flow
syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
@@ -527,7 +527,7 @@
WaitForCancellation: true,
})

w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName))
w.logger.Info("executing sync flow")
syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID)

var syncDone, syncErr bool
4 changes: 2 additions & 2 deletions flow/workflows/drop_flow.go
@@ -3,6 +3,7 @@ package peerflow
import (
"errors"
"fmt"
"log/slog"
"time"

"go.temporal.io/sdk/workflow"
@@ -12,8 +13,7 @@ import (
)

func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error {
- logger := workflow.GetLogger(ctx)
- logger.Info("performing cleanup for flow ", req.FlowJobName)
+ workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), req.FlowJobName))

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
9 changes: 5 additions & 4 deletions flow/workflows/normalize_flow.go
@@ -4,6 +4,7 @@ import (
"log/slog"
"time"

"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -16,7 +17,7 @@ func NormalizeFlowWorkflow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeFlowResponse, error) {
- logger := workflow.GetLogger(ctx)
+ logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
@@ -54,16 +55,16 @@ func NormalizeFlowWorkflow(
}
if canceled || (stopLoop && lastSyncBatchID == syncBatchID) {
if canceled {
logger.Info("normalize canceled - ", config.FlowJobName)
logger.Info("normalize canceled")
} else {
logger.Info("normalize finished - ", config.FlowJobName)
logger.Info("normalize finished")
}
break
}
if lastSyncBatchID != syncBatchID {
lastSyncBatchID = syncBatchID

logger.Info("executing normalize", slog.String("flowName", config.FlowJobName))
logger.Info("executing normalize")
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
TableNameSchemaMapping: tableNameSchemaMapping,
28 changes: 14 additions & 14 deletions flow/workflows/qrep_flow.go
@@ -65,7 +65,7 @@ func NewQRepFlowExecution(ctx workflow.Context, config *protos.QRepConfig, runUU
return &QRepFlowExecution{
config: config,
flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
- logger: workflow.GetLogger(ctx),
+ logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
runUUID: runUUID,
childPartitionWorkflows: nil,
activeSignal: shared.NoopSignal,
@@ -79,14 +79,14 @@ func NewQRepPartitionFlowExecution(ctx workflow.Context,
return &QRepPartitionFlowExecution{
config: config,
flowExecutionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
- logger: workflow.GetLogger(ctx),
+ logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
runUUID: runUUID,
}
}

// SetupMetadataTables creates the metadata tables for query based replication.
func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
q.logger.Info("setting up metadata tables for qrep flow - ", q.config.FlowJobName)
q.logger.Info("setting up metadata tables for qrep flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
@@ -96,7 +96,7 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

q.logger.Info("metadata tables setup for qrep flow - ", q.config.FlowJobName)
q.logger.Info("metadata tables setup for qrep flow")
return nil
}

@@ -125,7 +125,7 @@ func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName strin

func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Context) error {
if q.config.SetupWatermarkTableOnDestination {
q.logger.Info("setting up watermark table on destination for qrep flow: ", q.config.FlowJobName)
q.logger.Info("setting up watermark table on destination for qrep flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
@@ -153,7 +153,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
q.logger.Error("failed to create watermark table: ", err)
return fmt.Errorf("failed to create watermark table: %w", err)
}
q.logger.Info("finished setting up watermark table for qrep flow: ", q.config.FlowJobName)
q.logger.Info("finished setting up watermark table for qrep flow")
}
return nil
}
@@ -163,7 +163,7 @@ func (q *QRepFlowExecution) GetPartitions(
ctx workflow.Context,
last *protos.QRepPartition,
) (*protos.QRepParitionResult, error) {
q.logger.Info("fetching partitions to replicate for peer flow - ", q.config.FlowJobName)
q.logger.Info("fetching partitions to replicate for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Hour,
@@ -430,7 +430,6 @@ func QRepFlowWorkflow(

originalRunID := workflow.GetInfo(ctx).OriginalRunID
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
- logger := workflow.GetLogger(ctx)

maxParallelWorkers := 16
if config.MaxParallelWorkers > 0 {
@@ -452,6 +451,7 @@
}

q := NewQRepFlowExecution(ctx, config, originalRunID)
+ logger := q.logger

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
@@ -462,7 +462,7 @@
if err != nil {
return fmt.Errorf("failed to setup metadata tables: %w", err)
}
q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName)
logger.Info("metadata tables setup for peer flow - ", config.FlowJobName)

err = q.handleTableCreationForResync(ctx, state)
if err != nil {
@@ -480,13 +480,13 @@
return err
}

logger.Info("consolidating partitions for peer flow - ", slog.String("flowName", config.FlowJobName))
logger.Info("consolidating partitions for peer flow")
if err := q.consolidatePartitions(ctx); err != nil {
return err
}

if config.InitialCopyOnly {
q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
return nil
}

@@ -495,7 +495,7 @@
return err
}

q.logger.Info("partitions processed - ", len(partitions.Partitions))
logger.Info("partitions processed - ", len(partitions.Partitions))
state.NumPartitionsProcessed += uint64(len(partitions.Partitions))

if len(partitions.Partitions) > 0 {
@@ -510,7 +510,7 @@
}
}

workflow.GetLogger(ctx).Info("Continuing as new workflow",
logger.Info("Continuing as new workflow",
"Last Partition", state.LastPartition,
"Number of Partitions Processed", state.NumPartitionsProcessed)

@@ -524,7 +524,7 @@
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
19 changes: 10 additions & 9 deletions flow/workflows/setup_flow.go
@@ -13,6 +13,7 @@ import (

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

// SetupFlow is the workflow that is responsible for ensuring all the
@@ -43,7 +44,7 @@ func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]str
tableNameMapping: tableNameMapping,
cdcFlowName: cdcFlowName,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
- logger: workflow.GetLogger(ctx),
+ logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)),
}
}

@@ -53,7 +54,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) error {
s.logger.Info("checking connections for CDC flow - ", s.cdcFlowName)
s.logger.Info("checking connections for CDC flow")

checkCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: time.Minute,
@@ -81,7 +82,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
return fmt.Errorf("failed to check destination peer connection: %w", err)
}

s.logger.Info("ensuring metadata table exists - ", s.cdcFlowName)
s.logger.Info("ensuring metadata table exists")

// then setup the destination peer metadata tables
if destConnStatus.NeedsSetupMetadataTables {
@@ -105,7 +106,7 @@ func (s *SetupFlowExecution) ensurePullability(
config *protos.FlowConnectionConfigs,
checkConstraints bool,
) (map[uint32]string, error) {
s.logger.Info("ensuring pullability for peer flow - ", s.cdcFlowName)
s.logger.Info("ensuring pullability for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 4 * time.Hour,
@@ -146,7 +147,7 @@ func (s *SetupFlowExecution) createRawTable(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) error {
s.logger.Info("creating raw table on destination - ", s.cdcFlowName)
s.logger.Info("creating raw table on destination")
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
@@ -171,7 +172,7 @@ func (s *SetupFlowExecution) createRawTable(
func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
ctx workflow.Context, flowConnectionConfigs *protos.FlowConnectionConfigs,
) (map[string]*protos.TableSchema, error) {
s.logger.Info("fetching table schema for peer flow - ", s.cdcFlowName)
s.logger.Info("fetching table schema for peer flow")

ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Hour,
@@ -199,7 +200,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
sortedSourceTables := maps.Keys(tableNameSchemaMapping)
sort.Strings(sortedSourceTables)

s.logger.Info("setting up normalized tables for peer flow - ", s.cdcFlowName)
s.logger.Info("setting up normalized tables for peer flow")
normalizedTableMapping := make(map[string]*protos.TableSchema)
for _, srcTableName := range sortedSourceTables {
tableSchema := tableNameSchemaMapping[srcTableName]
@@ -244,7 +245,7 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
return nil, fmt.Errorf("failed to create normalized tables: %w", err)
}

s.logger.Info("finished setting up normalized tables for peer flow - ", s.cdcFlowName)
s.logger.Info("finished setting up normalized tables for peer flow")
return normalizedTableMapping, nil
}

@@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*protos.SetupFlowOutput, error) {
s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName))
s.logger.Info("executing setup flow")

// first check the connectionsAndSetupMetadataTables
if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil {
4 changes: 1 addition & 3 deletions flow/workflows/snapshot_flow.go
@@ -243,11 +243,9 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
}

func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
- logger := workflow.GetLogger(ctx)
-
se := &SnapshotFlowExecution{
config: config,
- logger: logger,
+ logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)),
}

numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))
11 changes: 6 additions & 5 deletions flow/workflows/xmin_flow.go
@@ -26,6 +26,7 @@ func XminFlowWorkflow(
}

q := NewQRepFlowExecution(ctx, config, originalRunID)
+ logger := q.logger

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
@@ -36,7 +37,7 @@
if err != nil {
return fmt.Errorf("failed to setup metadata tables: %w", err)
}
q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName)
logger.Info("metadata tables setup for peer flow - ", config.FlowJobName)

err = q.handleTableCreationForResync(ctx, state)
if err != nil {
@@ -64,7 +65,7 @@
}

if config.InitialCopyOnly {
q.logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
logger.Info("initial copy completed for peer flow - ", config.FlowJobName)
return nil
}

@@ -78,7 +79,7 @@
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

workflow.GetLogger(ctx).Info("Continuing as new workflow",
logger.Info("Continuing as new workflow",
"Last Partition", state.LastPartition,
"Number of Partitions Processed", state.NumPartitionsProcessed)

@@ -92,11 +93,11 @@
var signalVal shared.CDCFlowSignal

for q.activeSignal == shared.PauseSignal {
q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
// only place we block on receive, so signal processing is immediate
ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
if ok {
- q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger)
+ q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, logger)
} else if err := ctx.Err(); err != nil {
return err
}
