Skip to content

Commit

Permalink
removed activity alerts for now, added slot size alert
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 8, 2023
1 parent 25a7889 commit 83f19b2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
5 changes: 2 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,12 @@ func (a *FlowableActivity) EnsurePullability(
) (*protos.EnsurePullabilityBatchOutput, error) {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
if err != nil {
a.vigilForActivityFailures(config.FlowJobName, err)
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.vigilForActivityFailures(config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -967,7 +965,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}},
}
}
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID,
[]*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
Expand Down
27 changes: 26 additions & 1 deletion flow/utils/evervigil/ever_vigil.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type slackServiceConfig struct {
ChannelIDs []string `json:"channel_ids"`
}

func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) {
func registerServicesForNotifier(catalogPool *pgxpool.Pool) (*notify.Notify, error) {
notifier := notify.New()

rows, err := catalogPool.Query(context.Background(),
Expand All @@ -33,6 +33,7 @@ func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) {
return nil, fmt.Errorf("failed to read everVigil config from catalog: %w", err)
}

registeredAtleastOneService := false
var serviceType, serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
switch serviceType {
Expand All @@ -46,12 +47,26 @@ func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) {
slackService := slack.New(slackServiceConfig.AuthToken)
slackService.AddReceivers(slackServiceConfig.ChannelIDs...)
notifier.UseServices(slackService)
registeredAtleastOneService = true
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

// no notification service was registered, so the notifier has nothing to send to; mark it disabled
if !registeredAtleastOneService {
notifier.Disabled = true
}
return notifier, nil
}

func NewVigil(catalogPool *pgxpool.Pool) (*EverVigil, error) {
notifier, err := registerServicesForNotifier(catalogPool)
if err != nil {
return nil, err
}

return &EverVigil{
notifier: notifier,
catalogPool: catalogPool,
Expand All @@ -68,6 +83,16 @@ func (ev *EverVigil) Close() {
// in the past 15 minutes
func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) {
if ev.catalogPool != nil && ev.notifier != nil {
// the notifier is disabled (no services were registered), so try registering services again before alerting
if ev.notifier.Disabled {
var err error
ev.notifier, err = registerServicesForNotifier(ev.catalogPool)
if err != nil {
logrus.Warnf("failed to register services for vigil: %v", err)
return
}
}

row := ev.catalogPool.QueryRow(context.Background(),
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
Expand Down

0 comments on commit 83f19b2

Please sign in to comment.