-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
308 lines (252 loc) · 8.46 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
package rpc
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)
const (
	// DefaultBlockInterval is the Block duration used for the XREADGROUP
	// call issued by Server.Run.
	DefaultBlockInterval = 10 * time.Second
	// DefaultConcurency is the capacity of the server's semaphore channel
	// created in NewServer and the initial Count of the read arguments built
	// in Server.Run.
	// NOTE(review): the identifier is misspelled ("Concurency"); it is
	// exported, so renaming it would break callers.
	DefaultConcurency = 25
)
// Handler is the signature of an RPC handler registered with
// Server.AddHandler. It receives the request and returns a result value and
// an error; responseWrapper forwards both to NewResponse.
type Handler func(req *Request) (any, error)
// token is the empty element type of the Server.sem semaphore channel.
type token struct{}
// Server reads RPC requests from a Redis stream as a member of a consumer
// group, dispatches them to registered handlers, and publishes the responses.
type Server struct {
	// ctx is created in NewServer; it is passed to Redis calls and is the
	// parent of every request context.
	ctx context.Context
	// redis is the client used for all stream and publish operations.
	redis *redis.Client
	// handlers maps a method name to its wrapped handler; guarded by handlersLock.
	handlers map[string]RequestHandler
	handlersLock *sync.RWMutex
	// cancel cancels ctx; it is called by Close.
	cancel context.CancelFunc
	// sem is a buffered channel used as a semaphore: processMessage puts one
	// token in before starting a handler goroutine, which removes it when done.
	sem chan token
	// wg counts running handler goroutines so Close can wait for them.
	wg *sync.WaitGroup
	// stream, group and consumer name the Redis stream, consumer group and
	// consumer this server reads as.
	stream string
	group string
	consumer string
	// interceptors are passed to useInterceptors for each handler in AddHandler.
	interceptors []Interceptor
}
// ServerOption is a function that configures a Server.
// It receives a pointer to the Server and may modify its fields. NewServer
// calls the options in the order given, after the fields it sets itself have
// been populated.
type ServerOption func(*Server)
// NewServer builds a Server that reads from the given stream as the named
// consumer of the named consumer group, using redisClient for all Redis calls.
//
// The server gets its own cancellable context derived from
// context.Background, an empty handler table, and a semaphore with capacity
// DefaultConcurency. Any opts are then applied in order. NewServer itself
// issues no Redis commands.
func NewServer(redisClient *redis.Client, stream, group, consumer string, opts ...ServerOption) *Server {
	server := new(Server)

	// Redis coordinates.
	server.redis = redisClient
	server.stream = stream
	server.group = group
	server.consumer = consumer

	// Handler registry.
	server.handlers = make(map[string]RequestHandler)
	server.handlersLock = &sync.RWMutex{}

	// Lifecycle and concurrency control.
	server.ctx, server.cancel = context.WithCancel(context.Background())
	server.sem = make(chan token, DefaultConcurency)
	server.wg = &sync.WaitGroup{}

	for i := range opts {
		opts[i](server)
	}

	return server
}
// Run initializes the stream, consumer group and consumer (see initReader),
// then reads messages from the Redis stream in a loop and hands each one to
// processMessage.
//
// Before every read it waits until the semaphore has at least one free slot
// and asks Redis for at most as many messages as there are free slots.
//
// Run returns nil once the server context is cancelled (see Close). It
// returns an error if initialization fails or if a read fails for any other
// reason. A read that yields redis.Nil (no messages) is retried.
//
// NOTE(review): messages are read with NoAck set to false and no XAck call
// is visible in this code, so entries presumably remain in the group's
// pending list after processing — confirm this is intended.
func (s *Server) Run() error {
	if err := s.initReader(); err != nil {
		return err
	}

	readArgs := &redis.XReadGroupArgs{
		Group:    s.group,
		Consumer: s.consumer,
		Streams:  []string{s.stream, ">"},
		Block:    DefaultBlockInterval,
		Count:    DefaultConcurency,
		NoAck:    false,
	}

	for {
		// Wait for shutdown or for a free slot. The token is released right
		// away: this only probes for capacity, the real slot is taken in
		// processMessage.
		select {
		case <-s.ctx.Done():
			return nil
		case s.sem <- token{}:
			<-s.sem
		}

		// Never request more messages than there are free slots.
		readArgs.Count = int64(cap(s.sem) - len(s.sem))

		streams, err := s.redis.XReadGroup(s.ctx, readArgs).Result()
		// errors.Is also matches wrapped sentinels, so a cancellation that
		// arrives wrapped is still treated as a clean shutdown.
		switch {
		case errors.Is(err, redis.Nil):
			continue
		case errors.Is(err, context.Canceled):
			return nil
		case err != nil:
			return fmt.Errorf("error reading stream: %w", err)
		}

		for _, stream := range streams {
			for _, message := range stream.Messages {
				s.processMessage(message)
			}
		}
	}
}
// initReader prepares Redis for reading. It first issues
// XGroupCreateMkStream for the server's stream and group with start ID "$",
// then registers the server's consumer in that group.
//
// A BUSYGROUP reply, meaning the consumer group already exists, is not
// treated as an error. Any other failure is wrapped and returned.
func (s *Server) initReader() error {
	switch groupErr := s.redis.XGroupCreateMkStream(s.ctx, s.stream, s.group, "$").Err(); {
	case groupErr == nil:
		// Group (and, if needed, stream) created.
	case redis.HasErrorPrefix(groupErr, "BUSYGROUP Consumer Group name already exists"):
		// Group is already there; nothing to do.
	default:
		return fmt.Errorf("error creating stream: %w", groupErr)
	}

	consumerErr := s.redis.XGroupCreateConsumer(s.ctx, s.stream, s.group, s.consumer).Err()
	if consumerErr != nil {
		return fmt.Errorf("error creating consumer: %w", consumerErr)
	}

	return nil
}
// processMessage dispatches one stream message to its handler.
//
// The "method" field selects the handler. A message with no method, or with
// a method that has no registered handler, is dropped without logging.
// Otherwise a semaphore slot is taken and the request is handled in its own
// goroutine: the message is parsed into a Request (parseMessage), the handler
// is invoked, and the response is passed to handleResponse.
//
// Parse, handler and publish errors are logged and end the goroutine. A panic
// inside the goroutine is recovered and logged. In every case the semaphore
// slot is released and the wait group is decremented.
func (s *Server) processMessage(msg redis.XMessage) {
	method := getField(msg, "method")
	if method == "" {
		return
	}

	handler, found := s.getHandler(method)
	if !found {
		return
	}

	// Take a slot and register the goroutine before starting it.
	s.sem <- token{}
	s.wg.Add(1)

	go func() {
		// Deferred calls run last-in-first-out, so on exit the order is:
		// request cancel (if set), panic recovery, slot release, wg.Done.
		defer s.wg.Done()
		defer func() { <-s.sem }()
		defer func() {
			if r := recover(); r != nil {
				slog.Error(fmt.Sprintf("RPC panic for %s: %v", method, r))
			}
		}()

		req, cancel, parseErr := parseMessage(s.ctx, method, msg)
		if parseErr != nil {
			slog.Error(fmt.Sprintf("RPC error parsing message for %s: %v", method, parseErr))
			return
		}
		defer cancel()

		resp, handlerErr := handler(req)
		if handlerErr != nil {
			slog.Error(fmt.Sprintf("RPC error handling request for %s: %v", method, handlerErr))
			return
		}

		if sendErr := s.handleResponse(req, resp); sendErr != nil {
			slog.Error(fmt.Sprintf("RPC error sending result for %s: %v", method, sendErr))
		}
	}()
}
// Close shuts the server down. It cancels the server context, which Run
// observes in order to return and which is the parent of every request
// context, and then blocks until all handler goroutines started by
// processMessage have finished.
func (s *Server) Close() {
	// Signal shutdown first so no new work is started while waiting.
	s.cancel()
	// Wait for in-flight handler goroutines.
	s.wg.Wait()
}
// AddHandler registers handler under rpcName.
//
// The handler is wrapped with responseWrapper and then with the server's
// interceptors as they are at the time of the call. Registering a second
// handler under the same name panics.
func (s *Server) AddHandler(rpcName string, handler Handler) {
	s.handlersLock.Lock()
	defer s.handlersLock.Unlock()

	_, exists := s.handlers[rpcName]
	if exists {
		panic("rpc handler already exists for " + rpcName)
	}

	s.handlers[rpcName] = useInterceptors(responseWrapper(handler), s.interceptors)
}
// getHandler looks up the handler registered under rpcName while holding the
// read lock. The boolean reports whether a handler was found.
func (s *Server) getHandler(rpcName string) (RequestHandler, bool) {
	s.handlersLock.RLock()
	registered, found := s.handlers[rpcName]
	s.handlersLock.RUnlock()

	return registered, found
}
// getField returns the string stored under field in msg.Values.
// It returns "" when the field is missing or its value is not a string.
func getField(msg redis.XMessage, field string) string {
	// A missing key yields a nil interface value, for which the comma-ok
	// assertion reports false, so both failure cases fall through together.
	if text, isString := msg.Values[field].(string); isString {
		return text
	}
	return ""
}
// parseMessage builds a Request from a Redis stream message.
//
// The request context is derived from ctx. If the message has a non-empty
// "deadline" field, it is parsed as a base-10 Unix timestamp in seconds and
// used as the context deadline; otherwise the context is only cancellable.
// A non-empty "stash" field is attached to the context with putStash. The
// "id", "params" and "reply_to" fields are passed to NewRequest as they are.
//
// On success the caller must call the returned CancelFunc to release the
// request context. If the deadline cannot be parsed, an error is returned
// together with a nil Request and a nil CancelFunc; no context is created in
// that case.
func parseMessage(ctx context.Context, method string, msg redis.XMessage) (*Request, context.CancelFunc, error) {
	// Exactly one derived context is created, and only after the deadline is
	// known to be valid. This way no cancel function is dropped on the error
	// path or overwritten by a second context.
	var cancel context.CancelFunc
	if deadline := getField(msg, "deadline"); deadline != "" {
		epochTime, err := strconv.ParseInt(deadline, 10, 64)
		if err != nil {
			return nil, nil, fmt.Errorf("error parsing deadline: %w", err)
		}
		ctx, cancel = context.WithDeadline(ctx, time.Unix(epochTime, 0))
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}

	if stash := getField(msg, "stash"); stash != "" {
		ctx = putStash(ctx, stash)
	}

	id := getField(msg, "id")
	params := getField(msg, "params")
	replyTo := getField(msg, "reply_to")

	return NewRequest(ctx, method, id, params, replyTo), cancel, nil
}
// handleResponse delivers resp to the caller of req.
//
// If the request's ReplyTo field is empty, nothing is sent and nil is
// returned. Otherwise resp is marshalled to JSON and published on the ReplyTo
// channel using the request's context. Marshalling and publish failures are
// wrapped and returned.
func (s *Server) handleResponse(req *Request, resp *Response) error {
	channel := req.ReplyTo
	if channel == "" {
		return nil
	}

	payload, err := json.Marshal(resp)
	if err != nil {
		return fmt.Errorf("error marshalling response: %w", err)
	}

	err = s.redis.Publish(req.Context(), channel, payload).Err()
	if err != nil {
		return fmt.Errorf("error publishing response: %w", err)
	}

	return nil
}
// responseWrapper adapts a Handler to a RequestHandler. The returned function
// calls handler, passes the request ID together with the handler's result and
// error to NewResponse, and returns the Response that NewResponse builds. If
// NewResponse itself fails, that error is returned with a nil Response.
func responseWrapper(handler Handler) RequestHandler {
	return func(req *Request) (*Response, error) {
		result, handlerErr := handler(req)

		response, buildErr := NewResponse(req.ID, result, handlerErr)
		if buildErr != nil {
			return nil, buildErr
		}

		return response, nil
	}
}
// WithServerInterceptors returns a ServerOption that sets the server's
// interceptor list to the given interceptors, replacing any list set before.
// The list is passed to useInterceptors for each handler registered through
// AddHandler.
func WithServerInterceptors(interceptors ...Interceptor) ServerOption {
	apply := func(srv *Server) {
		srv.interceptors = interceptors
	}
	return apply
}