forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelper.go
114 lines (101 loc) · 3.07 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package nakadi
import (
"encoding/json"
"net"
"net/http"
"time"
"github.com/cenkalti/backoff/v3"
"github.com/pkg/errors"
)
const (
	// defaultKeepAlive and defaultIdleConnTimeout mirror the defaults used by
	// http.DefaultTransport.
	defaultKeepAlive       = 30 * time.Second
	defaultIdleConnTimeout = 90 * time.Second

	// nakadiHeartbeatInterval is the nakadi specific interval from which the
	// streaming client derives its keep-alive and idle connection timeouts.
	nakadiHeartbeatInterval = 30 * time.Second
)
// newHTTPClient creates an http client which is used for non streaming requests.
//
// The given timeout is applied three times: as the overall request timeout of
// the client, as the dial timeout and as the TLS handshake timeout. Keep-alive
// and idle connection timeouts use the same values as http.DefaultTransport.
func newHTTPClient(timeout time.Duration) *http.Client {
	// Dialer.DualStack is intentionally not set: the field is deprecated and
	// fast fallback is enabled by default.
	dialer := &net.Dialer{
		Timeout:   timeout,
		KeepAlive: defaultKeepAlive,
	}

	return &http.Client{
		Timeout: timeout,
		Transport: &http.Transport{
			Proxy:               http.ProxyFromEnvironment,
			DialContext:         dialer.DialContext,
			MaxIdleConns:        100,
			IdleConnTimeout:     defaultIdleConnTimeout,
			TLSHandshakeTimeout: timeout,
		},
	}
}
// newHTTPStream creates an http client which is used for streaming purposes.
//
// Unlike newHTTPClient no overall client timeout is set; the given timeout only
// limits dialing and the TLS handshake. Keep-alive and idle connection timeouts
// are set to twice the nakadi heartbeat interval.
func newHTTPStream(timeout time.Duration) *http.Client {
	// Dialer.DualStack is intentionally not set: the field is deprecated and
	// fast fallback is enabled by default.
	dialer := &net.Dialer{
		Timeout:   timeout,
		KeepAlive: 2 * nakadiHeartbeatInterval,
	}

	return &http.Client{
		Transport: &http.Transport{
			Proxy:               http.ProxyFromEnvironment,
			DialContext:         dialer.DialContext,
			MaxIdleConns:        100,
			IdleConnTimeout:     2 * nakadiHeartbeatInterval,
			TLSHandshakeTimeout: timeout,
		},
	}
}
// problemJSON is the first format tried when decoding an error response body.
// It maps the JSON members "title", "detail", "status" and "type".
type problemJSON struct {
	Title  string `json:"title"`
	Detail string `json:"detail"`
	Status int    `json:"status"`
	Type   string `json:"type"`
}
// errorJSON is the second format tried when decoding an error response body.
// It maps the JSON members "error" and "error_description".
type errorJSON struct {
	Error            string `json:"error"`
	ErrorDescription string `json:"error_description"`
}
// decodeResponseToError builds an error from the body of a failed response.
//
// The body is decoded as problemJSON first and, if that yields no description,
// as errorJSON. A successful decode alone does not identify the format, because
// any JSON object unmarshals into either struct without error. A format is
// therefore only accepted when one of its descriptive fields is populated:
// Detail (then Title) for problemJSON, ErrorDescription (then Error) for
// errorJSON. If neither format provides a description, the raw body is used.
//
// The first parameter is the response body, the second parameter is the error
// message that prefixes the extracted description in the returned error.
func decodeResponseToError(buffer []byte, msg string) error {
	var problem problemJSON
	if err := json.Unmarshal(buffer, &problem); err == nil {
		switch {
		case problem.Detail != "":
			return errors.Errorf("%s: %s", msg, problem.Detail)
		case problem.Title != "":
			return errors.Errorf("%s: %s", msg, problem.Title)
		}
	}

	var errJSON errorJSON
	if err := json.Unmarshal(buffer, &errJSON); err == nil {
		switch {
		case errJSON.ErrorDescription != "":
			return errors.Errorf("%s: %s", msg, errJSON.ErrorDescription)
		case errJSON.Error != "":
			return errors.Errorf("%s: %s", msg, errJSON.Error)
		}
	}

	// Not a known format (or no description in it): report the body verbatim.
	return errors.Errorf("%s: %s", msg, string(buffer))
}
// backOffConfiguration bundles the parameters from which a backoff for retries
// is initialized.
type backOffConfiguration struct {
	// Retry decides whether failed requests are retried at all.
	Retry bool
	// InitialRetryInterval is the initial (minimal) interval of the
	// exponential backoff.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the upper bound of the retry interval.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries.
	MaxElapsedTime time.Duration
}
// create initializes a new backoff from the configured parameters.
//
// When Retry is disabled a backoff.StopBackOff is returned. Otherwise the result
// is an exponential backoff whose initial interval, maximum interval and maximum
// elapsed time are taken from the configuration.
func (rc *backOffConfiguration) create() backoff.BackOff {
	if rc.Retry {
		exponential := backoff.NewExponentialBackOff()
		exponential.InitialInterval = rc.InitialRetryInterval
		exponential.MaxInterval = rc.MaxRetryInterval
		exponential.MaxElapsedTime = rc.MaxElapsedTime
		// Reset is called after the overrides so that the backoff state is
		// re-initialized with the configured values.
		exponential.Reset()
		return exponential
	}
	return &backoff.StopBackOff{}
}