Skip to content

Commit

Permalink
Don't alert when drop flow is stuck dropping slot because active PID (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Oct 24, 2024
1 parent 44f0031 commit 7d388c9
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
lua "github.com/yuin/gopher-lua"
Expand Down Expand Up @@ -627,10 +628,12 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF
}
defer connectors.CloseConnector(ctx, srcConn)

err = srcConn.PullFlowCleanup(ctx, req.FlowJobName)
if err != nil {
if err := srcConn.PullFlowCleanup(ctx, req.FlowJobName); err != nil {
pullCleanupErr := fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
// don't alert when PID active
a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
}
return pullCleanupErr
}

Expand All @@ -647,8 +650,7 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.
}
defer connectors.CloseConnector(ctx, dstConn)

err = dstConn.SyncFlowCleanup(ctx, req.FlowJobName)
if err != nil {
if err := dstConn.SyncFlowCleanup(ctx, req.FlowJobName); err != nil {
syncFlowCleanupErr := fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err)
a.Alerter.LogFlowError(ctx, req.FlowJobName, syncFlowCleanupErr)
return syncFlowCleanupErr
Expand Down

0 comments on commit 7d388c9

Please sign in to comment.