Skip to content

Commit

Permalink
Retry instead of panicking immediately (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
muralov authored Jan 10, 2024
1 parent 1322c17 commit 75e2140
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions testing/eventmeshmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"net/url"
"path"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/oauth2"
kctrllog "sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -71,7 +73,7 @@ func NewEventMeshMockResponseOverride() *EventMeshMockResponseOverride {

type (
ResponseUpdateReq func(w http.ResponseWriter, key string, webhookAuth *emstypes.WebhookAuth)
ResponseUpdateStateReq func(w http.ResponseWriter, key string, state emstypes.State)
ResponseUpdateStateReq func(w http.ResponseWriter, key string, state emstypes.State) error
ResponseWithSub func(w http.ResponseWriter, subscription emstypes.Subscription)
ResponseWithName func(w http.ResponseWriter, subscriptionName string)
Response func(w http.ResponseWriter)
Expand Down Expand Up @@ -203,7 +205,17 @@ func (m *EventMeshMock) handleMessaging() func(w http.ResponseWriter, r *http.Re

// extract get request key from /messaging/events/subscriptions/%s/state
key := strings.TrimSuffix(r.URL.Path, "/state")
m.UpdateStateResponse(w, key, state)
for i := 0; i < 3; i++ {
err := m.UpdateStateResponse(w, key, state)
if err == nil {
break
}
if i < 2 { // Don't sleep after the last attempt
time.Sleep(3 * time.Second)
} else {
panic(err)
}
}
case http.MethodGet:
key := r.URL.Path
// check if any response override defined for this subscription
Expand Down Expand Up @@ -259,7 +271,7 @@ func UpdateSubscriptionResponse(m *EventMeshMock) ResponseUpdateReq {

// UpdateSubscriptionStateResponse updates the EventMesh subscription status in the mock.
func UpdateSubscriptionStateResponse(m *EventMeshMock) ResponseUpdateStateReq {
return func(w http.ResponseWriter, key string, state emstypes.State) {
return func(w http.ResponseWriter, key string, state emstypes.State) error {
if subscription := m.Subscriptions.GetSubscription(key); subscription != nil {
switch state.Action {
case emstypes.StateActionPause:
Expand All @@ -272,7 +284,8 @@ func UpdateSubscriptionStateResponse(m *EventMeshMock) ResponseUpdateStateReq {
}
default:
{
panic(fmt.Sprintf("EventMesh subscription status is not supported: %#v", state))
errEventMeshStatusNotSupported := errors.New("EventMesh subscription status is not supported")
return fmt.Errorf("%w: %#v", errEventMeshStatusNotSupported, state)
}
}

Expand All @@ -281,10 +294,11 @@ func UpdateSubscriptionStateResponse(m *EventMeshMock) ResponseUpdateStateReq {

err := json.NewEncoder(w).Encode(*subscription)
Expect(err).ShouldNot(HaveOccurred())
return
return nil
}

w.WriteHeader(http.StatusNotFound)
return nil
}
}

Expand Down

0 comments on commit 75e2140

Please sign in to comment.