-
Notifications
You must be signed in to change notification settings - Fork 581
/
ipc.c
321 lines (261 loc) · 8.14 KB
/
ipc.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*
* Copyright (C) 2017 OpenSIPS Project
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "ipc.h"
#include "dprint.h"
#include "mem/mem.h"
#include <fcntl.h>
/* maximum length of a handler name, not counting the null terminator */
#define IPC_HANDLER_NAME_MAX 32
/* one registered IPC handler; its index in the ipc_handlers array is the
 * handler type returned by ipc_register_handler() */
typedef struct _ipc_handler {
/* handler function (NULL for the self-registered RPC type, which is
 * dispatched separately in ipc_handle_job()) */
ipc_handler_f *func;
/* short name/description, null terminated; truncated at registration
 * time to IPC_HANDLER_NAME_MAX characters */
char name[IPC_HANDLER_NAME_MAX+1];
} ipc_handler;
/* an IPC job; the whole structure is pushed with a single write() and
 * fetched with a single read() on the IPC pipes */
typedef struct _ipc_job {
/* the ID (internal) of the process sending the job */
unsigned short snd_proc;
/* the job's handler type (index into the registered handlers array) */
ipc_handler_type handler_type;
/* the payload of the job, just pointers; for RPC jobs payload1 is the
 * function to be run and payload2 its parameter, for all the other jobs
 * payload1 is the parameter given to the handler and payload2 is NULL */
void *payload1;
void *payload2;
} ipc_job;
/* array of registered IPC handlers, indexed by handler type; it is replaced
 * by a one-entry-larger copy on each ipc_register_handler() call */
static ipc_handler *ipc_handlers = NULL;
/* number of entries in ipc_handlers; also the type that the next
 * registration will receive */
static unsigned int ipc_handlers_no = 0;
/* shared IPC support: dispatching a job to a random OpenSIPS worker;
 * [0] is the read end (set non-blocking by init_ipc()), [1] the write end */
static int ipc_shared_pipe[2];
/* IPC type used for RPC - a self registered type (assigned in init_ipc()) */
static ipc_handler_type ipc_rpc_type = 0;
/* FD (pipe) used for dispatching IPC jobs between all processes (1 to any);
 * set by init_ipc() to the read end of ipc_shared_pipe */
int ipc_shared_fd_read;
/*
 * Prepares the process-independent part of the IPC layer.
 *
 * Creates the shared pipe, switches its read end to non-blocking mode,
 * exposes that read end as ipc_shared_fd_read and registers the built-in
 * "RPC" job type (with no handler function) into ipc_rpc_type.
 *
 * Returns 0 on success, -1 on failure (the reason is logged).
 */
int init_ipc(void)
{
	int flags;

	/* the shared pipe carries the jobs dispatched to "any" process */
	if (pipe(ipc_shared_pipe) != 0) {
		LM_ERR("failed to create ipc pipe (%s)!\n", strerror(errno));
		return -1;
	}

	/* the read end must never block its reader */
	if ((flags = fcntl(ipc_shared_pipe[0], F_GETFL)) == -1) {
		LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
		return -1;
	}
	flags |= O_NONBLOCK;
	if (fcntl(ipc_shared_pipe[0], F_SETFL, flags) == -1) {
		LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno));
		return -1;
	}

	/* publish the read end for the rest of the code */
	ipc_shared_fd_read = ipc_shared_pipe[0];

	/* RPC is just another job type, registered by the IPC layer itself */
	ipc_rpc_type = ipc_register_handler( NULL, "RPC");
	if (ipc_bad_handler_type(ipc_rpc_type)) {
		LM_ERR("failed to self register RPC type\n");
		return -1;
	}

	return 0;
}
/*
 * Creates the per-process IPC pipes for the first proc_no entries of the
 * process table: one pipe for regular jobs (ipc_pipe_holder) and one for
 * sync replies (ipc_sync_pipe_holder). The write end of each pipe is
 * switched to non-blocking mode.
 *
 * Returns 0 on success, -1 on the first failure (the reason is logged).
 */
int create_ipc_pipes( int proc_no )
{
	int idx, flags;
	int *job_pipe, *sync_pipe;

	for (idx = 0; idx < proc_no; idx++) {
		job_pipe = pt[idx].ipc_pipe_holder;
		sync_pipe = pt[idx].ipc_sync_pipe_holder;

		/* the pipe carrying the jobs towards this process */
		if (pipe(job_pipe) < 0) {
			LM_ERR("failed to create IPC pipe for process %d, err %d/%s\n",
				idx, errno, strerror(errno));
			return -1;
		}

		/* writers must not block on the job pipe */
		flags = fcntl(job_pipe[1], F_GETFL);
		if (flags == -1) {
			LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
			return -1;
		}
		flags |= O_NONBLOCK;
		if (fcntl(job_pipe[1], F_SETFL, flags) == -1) {
			LM_ERR("set non-blocking write failed: (%d) %s\n",
				errno, strerror(errno));
			return -1;
		}

		/* the pipe carrying the sync replies towards this process */
		if (pipe(sync_pipe) < 0) {
			LM_ERR("failed to create IPC sync pipe for process %d, "
				"err %d/%s\n", idx, errno, strerror(errno));
			return -1;
		}

		/* writers must not block on the sync pipe either */
		flags = fcntl(sync_pipe[1], F_GETFL);
		if (flags == -1) {
			LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
			return -1;
		}
		flags |= O_NONBLOCK;
		if (fcntl(sync_pipe[1], F_SETFL, flags) == -1) {
			LM_ERR("set non-blocking write failed: (%d) %s\n",
				errno, strerror(errno));
			return -1;
		}
	}

	return 0;
}
/*
 * Registers a new IPC job type.
 *
 *  hdl  - function to be run when a job of this type is received; may be
 *         NULL (the built-in RPC type is registered this way)
 *  name - short description of the type, used in logs; it is truncated to
 *         IPC_HANDLER_NAME_MAX characters; a NULL name is stored as an
 *         empty string
 *
 * Returns the newly allocated handler type (the index of the handler in the
 * internal array) or -1 if the array could not be enlarged.
 */
ipc_handler_type ipc_register_handler( ipc_handler_f *hdl, char *name)
{
	ipc_handler *grown;

	/* a missing name must not crash the registration in strncpy() */
	if (name==NULL)
		name = "";

	/* allocate an n+1 new buffer to accommodate the new handler */
	grown = (ipc_handler*)
		pkg_malloc( (ipc_handlers_no+1)*sizeof(ipc_handler) );
	if (grown==NULL) {
		LM_ERR("failed to allocate IPC handler array for size %u\n",
			ipc_handlers_no+1);
		return -1;
	}

	/* copy previous records, if any, and drop the old array */
	if (ipc_handlers) {
		memcpy( grown, ipc_handlers, ipc_handlers_no*sizeof(ipc_handler) );
		pkg_free( ipc_handlers );
	}

	/* fill in the new (last) record */
	grown[ipc_handlers_no].func = hdl;
	/* copy the name, truncate it if needed, but keep it null terminated
	 * (strncpy() alone does not terminate on truncation) */
	strncpy( grown[ipc_handlers_no].name , name, IPC_HANDLER_NAME_MAX);
	grown[ipc_handlers_no].name[IPC_HANDLER_NAME_MAX] = 0;

	ipc_handlers = grown;

	LM_DBG("IPC type %u [%s] registered with handler %p\n",
		ipc_handlers_no, ipc_handlers[ipc_handlers_no].name, hdl );

	/* the index of the new record is the type; count it afterwards */
	return ipc_handlers_no++;
}
/*
 * Builds an IPC job and pushes it, with a single write(), into the given
 * pipe descriptor.
 *
 *  fd       - write end of the pipe to be used
 *  dst_proc - index of the destination process in the process table, or -1
 *             if there is no specific destination; used only for logging
 *  type     - handler type of the job
 *  payload1, payload2 - opaque pointers carried by the job
 *
 * A write interrupted by a signal (EINTR) is retried. Returns 0 if the job
 * was written, -1 otherwise (the failure is logged).
 */
static inline int __ipc_send_job(int fd, int dst_proc, ipc_handler_type type,
												void *payload1, void *payload2)
{
	ipc_job job;
	int rc;

	// FIXME - we should check if the destination process really listens
	// for read, otherwise we may end up filling in the pipe and block

	/* clear the whole structure first, as it is written out as raw bytes */
	memset(&job, 0, sizeof job);
	job.payload2 = payload2;
	job.payload1 = payload1;
	job.handler_type = type;
	job.snd_proc = (short)process_no;

	/* The per-proc IPC write fds are set to non-blocking, to be sure we
	 * do not escalate into a global blocking if a single process gets
	 * stuck. In such a case EAGAIN or EWOULDBLOCK is reported and it is
	 * handled as an error, with nothing special to do besides logging.
	 */
	do {
		rc = write(fd, &job, sizeof(job) );
	} while (rc<0 && errno==EINTR);

	if (rc>=0)
		return 0;

	if (errno==EAGAIN || errno==EWOULDBLOCK) {
		LM_CRIT("blocking detected while sending job type %d[%s] on %d "
			" to proc id %d/%d [%s]\n", type, ipc_handlers[type].name, fd,
			dst_proc, (dst_proc==-1)?-1:pt[dst_proc].pid ,
			(dst_proc==-1)?"n/a":pt[dst_proc].desc);
	} else {
		LM_ERR("sending job type %d[%s] on %d failed: %s\n",
			type, ipc_handlers[type].name, fd, strerror(errno));
	}

	return -1;
}
/*
 * Sends a job of the given type, carrying one payload pointer, to the
 * process with index dst_proc, using that process' IPC write descriptor.
 *
 * Returns 0 on success, -1 on failure.
 */
int ipc_send_job(int dst_proc, ipc_handler_type type, void *payload)
{
	const int wfd = IPC_FD_WRITE(dst_proc);

	/* regular jobs do not use the second payload slot */
	return __ipc_send_job(wfd, dst_proc, type, payload, NULL);
}
/*
 * Pushes a job of the given type, carrying one payload pointer, into the
 * shared IPC pipe, without addressing any particular process.
 *
 * Returns 0 on success, -1 on failure.
 */
int ipc_dispatch_job(ipc_handler_type type, void *payload)
{
	/* -1 stands for "no specific destination process" */
	const int no_dst = -1;

	return __ipc_send_job(ipc_shared_pipe[1], no_dst, type, payload, NULL);
}
/*
 * Asks the process with index dst_proc to run the rpc function with the
 * given param. The request travels as a job of the built-in RPC type,
 * having the function as first payload and the parameter as second one.
 *
 * Returns 0 on success, -1 on failure.
 */
int ipc_send_rpc(int dst_proc, ipc_rpc_f *rpc, void *param)
{
	const int wfd = IPC_FD_WRITE(dst_proc);

	return __ipc_send_job(wfd, dst_proc, ipc_rpc_type, rpc, param);
}
/*
 * Pushes an RPC request (the rpc function plus its param) into the shared
 * IPC pipe, without addressing any particular process.
 *
 * Returns 0 on success, -1 on failure.
 */
int ipc_dispatch_rpc( ipc_rpc_f *rpc, void *param)
{
	/* -1 stands for "no specific destination process" */
	const int no_dst = -1;

	return __ipc_send_job(ipc_shared_pipe[1], no_dst, ipc_rpc_type, rpc, param);
}
/*
 * Sends a sync reply to the process with index dst_proc: the value of the
 * param pointer itself (not the data it points to) is written into the
 * sync pipe of that process.
 *
 * A write interrupted by a signal (EINTR) is retried. Returns 0 on
 * success, -1 if the write failed (the failure is logged).
 */
int ipc_send_sync_reply(int dst_proc, void *param)
{
	int n;

	do {
		/* what travels through the pipe is the pointer value, so the
		 * address of the local parameter is the buffer to be written */
		n = write(IPC_FD_SYNC_WRITE(dst_proc), &param, sizeof(param));
	} while (n<0 && errno==EINTR);

	if (n<0) {
		LM_ERR("sending sync rpc %d[%s]\n", errno, strerror(errno));
		return -1;
	}

	return 0;
}
/*
 * Waits for a sync reply on the sync pipe of the current process and
 * stores the received pointer value into *param.
 *
 * A read interrupted by a signal (EINTR) is retried. Returns 0 on success;
 * -1 if the read failed or did not deliver a complete pointer, in which
 * case *param is left untouched.
 */
int ipc_recv_sync_reply(void **param)
{
	void *ret;
	int n;

	do {
		n = read(IPC_FD_SYNC_READ_SELF, &ret, sizeof(ret));
	} while (n<0 && errno==EINTR);

	/* test the sign before any comparison against a sizeof value: mixing
	 * the signed result with size_t would turn -1 into a huge positive
	 * number and hide the failure */
	if (n<0) {
		LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
		return -1;
	}

	/* a full pointer is expected; anything shorter (including 0, when
	 * there is no writer left) cannot be used, and errno is not
	 * meaningful for it */
	if ((size_t)n != sizeof(ret)) {
		LM_ERR("incomplete sync reply: read %d bytes, expected %d\n",
			n, (int)sizeof(ret));
		return -1;
	}

	*param = ret;
	return 0;
}
void ipc_handle_job(int fd)
{
ipc_job job;
int n;
/* read one IPC job from the pipe; even if the read is blocking,
* we are here triggered from the reactor, on a READ event, so
* we shouldn;t ever block */
n = read(fd, &job, sizeof(job) );
if (n==-1) {
if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
return;
LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
return;
}
/* suppress the E_CORE_LOG event for the below log while handling
* the event itself */
suppress_proc_log_event();
LM_DBG("received job type %d[%s] from process %d\n",
job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
reset_proc_log_event();
/* custom handling for RPC type */
if (job.handler_type==ipc_rpc_type) {
((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
} else {
/* generic registered type */
ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
}
return;
}
/*
 * Runs ipc_handle_job() on the given descriptor for as long as a
 * non-blocking peek of one byte succeeds on it.
 *
 * NOTE(review): recv() is a socket call; if fd is a plain pipe the peek
 * presumably fails (ENOTSOCK) and no job gets handled here - to be
 * confirmed against the kind of descriptor the callers pass.
 */
void ipc_handle_all_pending_jobs(int fd)
{
	char probe;

	for (;;) {
		/* only look at the data, the job itself is consumed below */
		if (recv(fd, &probe, 1, MSG_DONTWAIT|MSG_PEEK)!=1)
			break;
		ipc_handle_job(fd);
	}
}