forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevents.go
180 lines (156 loc) · 6.05 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package nakadi
import (
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/pkg/errors"
)
// An EventType defines a kind of event that can be processed on a Nakadi service.
type EventType struct {
Name string `json:"name"`
OwningApplication string `json:"owning_application"`
Category string `json:"category"`
EnrichmentStrategies []string `json:"enrichment_strategies,omitempty"`
PartitionStrategy string `json:"partition_strategy,omitempty"`
CompatibilityMode string `json:"compatibility_mode,omitempty"`
Schema *EventTypeSchema `json:"schema"`
PartitionKeyFields []string `json:"partition_key_fields"`
DefaultStatistics *EventTypeStatistics `json:"default_statistics,omitempty"`
Options *EventTypeOptions `json:"options,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
}
// EventTypeSchema is a non optional description of the schema on an event type.
type EventTypeSchema struct {
Version string `json:"version,omitempty"`
Type string `json:"type"`
Schema string `json:"schema"`
CreatedAt time.Time `json:"created_at,omitempty"`
}
// EventTypeStatistics describe operational statistics for an event type. This statistics are
// used by Nakadi to optimize the throughput events from a certain kind. They are provided on
// event type creation.
type EventTypeStatistics struct {
MessagesPerMinute int `json:"messages_per_minute"`
MessageSize int `json:"message_size"`
ReadParallelism int `json:"read_parallelism"`
WriteParallelism int `json:"write_parallelism"`
}
// EventTypeOptions provide additional parameters for tuning Nakadi.
type EventTypeOptions struct {
RetentionTime int64 `json:"retention_time"`
}
// EventOptions is a set of optional parameters used to configure the EventAPI.
type EventOptions struct {
// Whether or not methods of the EventAPI retry when a request fails. If
// set to true InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
// no effect (default: false).
Retry bool
// The initial (minimal) retry interval used for the exponential backoff algorithm
// when retry is enables.
InitialRetryInterval time.Duration
// MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches
// this value the retry intervals remain constant.
MaxRetryInterval time.Duration
// MaxElapsedTime is the maximum time spent on retries when when performing a request.
// Once this value was reached the exponential backoff is halted and the request will
// fail with an error.
MaxElapsedTime time.Duration
}
func (o *EventOptions) withDefaults() *EventOptions {
var copyOptions EventOptions
if o != nil {
copyOptions = *o
}
if copyOptions.InitialRetryInterval == 0 {
copyOptions.InitialRetryInterval = defaultInitialRetryInterval
}
if copyOptions.MaxRetryInterval == 0 {
copyOptions.MaxRetryInterval = defaultMaxRetryInterval
}
if copyOptions.MaxElapsedTime == 0 {
copyOptions.MaxElapsedTime = defaultMaxElapsedTime
}
return ©Options
}
// NewEventAPI creates a new instance of a EventAPI implementation which can be used to
// manage event types on a specific Nakadi service. The last parameter is a struct containing only
// optional parameters. The options may be nil.
func NewEventAPI(client *Client, options *EventOptions) *EventAPI {
options = options.withDefaults()
return &EventAPI{
client: client,
backOffConf: backOffConfiguration{
Retry: options.Retry,
InitialRetryInterval: options.InitialRetryInterval,
MaxRetryInterval: options.MaxRetryInterval,
MaxElapsedTime: options.MaxElapsedTime}}
}
// EventAPI is a sub API that allows to inspect and manage event types on a Nakadi instance.
type EventAPI struct {
client *Client
backOffConf backOffConfiguration
}
// List returns all registered event types.
func (e *EventAPI) List() ([]*EventType, error) {
eventTypes := []*EventType{}
err := e.client.httpGET(e.backOffConf.create(), e.eventBaseURL(), &eventTypes, "unable to request event types")
if err != nil {
return nil, err
}
return eventTypes, nil
}
// Get returns an event type based on its name.
func (e *EventAPI) Get(name string) (*EventType, error) {
eventType := &EventType{}
err := e.client.httpGET(e.backOffConf.create(), e.eventURL(name), eventType, "unable to request event types")
if err != nil {
return nil, err
}
return eventType, nil
}
// Create saves a new event type.
func (e *EventAPI) Create(eventType *EventType) error {
const errMsg = "unable to create event type"
response, err := e.client.httpPOST(e.backOffConf.create(), e.eventBaseURL(), eventType, errMsg)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != http.StatusCreated {
buffer, err := ioutil.ReadAll(response.Body)
if err != nil {
return errors.Wrapf(err, "%s: unable to read response body", errMsg)
}
return decodeResponseToError(buffer, errMsg)
}
return nil
}
// Update updates an existing event type.
func (e *EventAPI) Update(eventType *EventType) error {
const errMsg = "unable to update event type"
response, err := e.client.httpPUT(e.backOffConf.create(), e.eventURL(eventType.Name), eventType, errMsg)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
buffer, err := ioutil.ReadAll(response.Body)
if err != nil {
return errors.Wrapf(err, "%s: unable to read response body", errMsg)
}
return decodeResponseToError(buffer, "unable to update event type")
}
return nil
}
// Delete removes an event type.
func (e *EventAPI) Delete(name string) error {
return e.client.httpDELETE(e.backOffConf.create(), e.eventURL(name), "unable to delete event type")
}
func (e *EventAPI) eventURL(name string) string {
return fmt.Sprintf("%s/event-types/%s", e.client.nakadiURL, name)
}
func (e *EventAPI) eventBaseURL() string {
return fmt.Sprintf("%s/event-types", e.client.nakadiURL)
}