forked from fmitra/authenticator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
service.go
75 lines (58 loc) · 1.57 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Package msgrepo provides message storage for consumers and publishers.
package msgrepo
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/go-kit/kit/log"
auth "github.com/fmitra/authenticator"
)
// service is an implementation of auth.MessageRepository
// that keeps pending messages on a Go channel.
type service struct {
// logger is the go-kit logger available to the service.
logger log.Logger
// messageQueue carries published messages; Publish sends on it and
// Recent hands it out to readers as a receive-only channel.
messageQueue chan *auth.Message
}
// Publish schedules an unsent message for delivery on the message queue.
//
// A nil message, or one whose ExpiresAt is already in the past, is rejected
// with an error and nothing is queued. Otherwise the message's
// DeliveryAttempts counter is incremented and the message is handed to a
// background goroutine: a first attempt is queued right away, any other
// attempt is queued after the back-off computed by delay. Publish itself
// never blocks on the queue and returns nil once the goroutine is started.
//
// NOTE(review): ctx is not consulted. If callers pass request-scoped
// contexts, honouring cancellation here would drop messages whose request
// has already finished, so confirm caller behaviour before wiring it in.
func (s *service) Publish(ctx context.Context, msg *auth.Message) error {
	if msg == nil {
		return fmt.Errorf("cannot publish nil message")
	}
	if time.Now().After(msg.ExpiresAt) {
		return fmt.Errorf("cannot publish expired message")
	}

	// Count the attempt before returning so the caller, who still holds msg,
	// never races with a background write to it.
	msg.DeliveryAttempts++
	attempts := msg.DeliveryAttempts

	go func() {
		// Recent closes the queue when its context is cancelled. A send that
		// is still waiting out its back-off at that point would panic and
		// take the whole process down, so the panic is contained here and
		// the message is dropped instead.
		defer func() {
			if r := recover(); r != nil && s.logger != nil {
				_ = s.logger.Log("msg", "message not queued", "err", fmt.Sprint(r))
			}
		}()

		// Only the first attempt skips the back-off.
		if attempts != 1 {
			time.Sleep(delay(attempts))
		}
		s.messageQueue <- msg
	}()

	return nil
}
// Recent exposes the queue of published, not yet delivered messages.
//
// It returns the service's message queue as a receive-only channel together
// with an error channel. A background goroutine waits for ctx to finish and
// then, in order, delivers ctx.Err() on the error channel, closes the message
// queue, and closes the error channel.
//
// The queue that gets closed is the service's own channel (the one Publish
// sends on), not a per-call copy.
func (s *service) Recent(ctx context.Context) (<-chan *auth.Message, <-chan error) {
	// One slot of buffer lets the single error be stored without a reader.
	done := make(chan error, 1)

	go func() {
		<-ctx.Done()
		done <- ctx.Err()
		close(s.messageQueue)
		close(done)
	}()

	return s.messageQueue, done
}
// delay calculates the amount of time to wait before
// publishing a message back into the queue.
//
// The wait is two seconds per delivery attempt plus a random jitter of up to
// (but not including) three seconds, capped at thirty seconds. Attempt counts
// of fifteen or more therefore always yield the cap, and negative counts are
// treated as zero so the result is never negative.
func delay(deliveryAttempts int) time.Duration {
	const (
		perAttempt   = 2 * time.Second
		maxCountdown = 30 * time.Second
		maxJitterMs  = 3000
	)

	// Saturate before multiplying: once the base delay alone reaches the cap
	// the jitter is irrelevant, and very large counts would otherwise
	// overflow time.Duration and come out negative.
	if deliveryAttempts >= int(maxCountdown/perAttempt) {
		return maxCountdown
	}
	if deliveryAttempts < 0 {
		deliveryAttempts = 0
	}

	// The package-level generator is used as-is. Re-seeding it on every call
	// is unnecessary (Go 1.20+ seeds it automatically) and would reset the
	// sequence for every other user of math/rand in the process. On older
	// toolchains, seed once at program start instead.
	jitter := time.Duration(rand.Intn(maxJitterMs)) * time.Millisecond

	countdown := time.Duration(deliveryAttempts)*perAttempt + jitter
	if countdown < maxCountdown {
		return countdown
	}
	return maxCountdown
}