Skip to content

Commit

Permalink
Fix flow err log and cleanup flowable.go (#887)
Browse files Browse the repository at this point in the history
A few places we were logging peer names instead of flow name for the
`peerdb_stats.flow_errors` table. Also some long lines have been split
in this PR

Co-authored-by: Kaushik Iska <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and iskakaushik authored Dec 25, 2023
1 parent f1038ba commit b86f106
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
}
defer connectors.CloseConnector(dstConn)

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

Expand Down Expand Up @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
Expand Down Expand Up @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
Expand All @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
return err
Expand Down

0 comments on commit b86f106

Please sign in to comment.