diff --git a/api-notification.go b/api-notification.go index 85e57805b..347b160ee 100644 --- a/api-notification.go +++ b/api-notification.go @@ -22,6 +22,7 @@ import ( "io" "net/http" "net/url" + "time" ) // GetBucketNotification - get bucket notification at a given path. @@ -143,7 +144,14 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even } // Continously run and listen on bucket notification. - for { + // Create a done channel to control 'ListObjects' go routine. + doneCh := make(chan struct{}, 1) + + // Indicate to our routine to exit cleanly upon return. + defer close(doneCh) + + // Wait on the jitter retry loop. + for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, doneCh) { urlValues := make(url.Values) urlValues.Set("prefix", prefix) urlValues.Set("suffix", suffix) @@ -155,10 +163,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even queryValues: urlValues, }) if err != nil { - notificationInfoCh <- NotificationInfo{ - Err: err, - } - return + continue } // Validate http response, upon error return quickly. @@ -180,10 +185,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even for bio.Scan() { var notificationInfo NotificationInfo if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil { - notificationInfoCh <- NotificationInfo{ - Err: err, - } - return + continue } // Send notifications on channel only if there are events received. if len(notificationInfo.Records) > 0 { @@ -200,12 +202,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even // and re-connect. if err == io.ErrUnexpectedEOF { resp.Body.Close() - continue } - notificationInfoCh <- NotificationInfo{ - Err: err, - } - return } } }(notificationInfoCh) diff --git a/retry-continous.go b/retry-continous.go new file mode 100644 index 000000000..e300af69c --- /dev/null +++ b/retry-continous.go @@ -0,0 +1,52 @@ +package minio + +import "time" + +// newRetryTimerContinous creates a timer with exponentially increasing delays forever. +func (c Client) newRetryTimerContinous(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { + attemptCh := make(chan int) + + // normalize jitter to the range [0, 1.0] + if jitter < NoJitter { + jitter = NoJitter + } + if jitter > MaxJitter { + jitter = MaxJitter + } + + // computes the exponential backoff duration according to + // https://www.awsarchitectureblog.com/2015/03/backoff.html + exponentialBackoffWait := func(attempt int) time.Duration { + // 1< maxAttempt { + attempt = maxAttempt + } + //sleep = random_between(0, min(cap, base * 2 ** attempt)) + sleep := unit * time.Duration(1< cap { + sleep = cap + } + if jitter != NoJitter { + sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) + } + return sleep + } + + go func() { + defer close(attemptCh) + var nextBackoff int + for { + select { + // Attempts starts. + case attemptCh <- nextBackoff: + nextBackoff++ + case <-doneCh: + // Stop the routine. + return + } + time.Sleep(exponentialBackoffWait(nextBackoff)) + } + }() + return attemptCh +}