diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9591cab251..66a4a2033f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot } defer connectors.CloseConnector(dstConn) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) if err := dstConn.SetupMetadataTables(); err != nil { - a.Alerter.LogFlowError(ctx, config.Name, err) + a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable( setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, slog.Error("failed to pull records", slog.Any("error", err)) goroutineErr = err } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, + a.CatalogPool, runUUID, partition, numRecords) if err != nil { slog.Error(fmt.Sprintf("%v", err)) goroutineErr = err @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }, } } - updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun( + ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition( + errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { slog.Error(fmt.Sprintf("%v", err)) return err