/*
 * zeromq.c
 * Forked from netkiller/mysql-zmq-plugin.
 */
/*
Homepage: http://netkiller.github.io/
Author: netkiller<[email protected]>
*/
#include <stdlib.h>
#include <stdio.h>
#include <mysql.h>
#include <string.h>
#include <error.h>
#include <zmq.h>
#include "zeromq.h"
/* ------------------------ zmq_client ----------------------------- */
/*
 * zmq_client_init - validate the arguments of zmq_client('socket','message').
 *
 * initid:  per-invocation UDF state; its ptr member is cleared here.
 * args:    argument descriptor; exactly two arguments are required and both
 *          are requested as strings.
 * message: receives the error text (bounded by MYSQL_ERRMSG_SIZE) when the
 *          argument count is wrong.
 *
 * Returns 0 on success and 1 when the argument count is not two.
 */
my_bool zmq_client_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
{
    if (args->arg_count != 2) {
        /* snprintf always NUL-terminates and never writes past the bound. */
        snprintf(message, MYSQL_ERRMSG_SIZE, "%s",
                 "two arguments must be supplied: zmq_client('socket','message').");
        return 1;
    }

    /*
     * Both the endpoint and the payload are read as character data by
     * zmq_client(), so request string coercion for each of them; otherwise
     * a numeric second argument would be handed over as a raw number.
     */
    args->arg_type[0] = STRING_RESULT;
    args->arg_type[1] = STRING_RESULT;

    /* Start with no UDF-owned buffer attached to this invocation. */
    initid->ptr = NULL;
    return 0;
}
/*
 * zmq_client - send one request over a ZMQ_REQ socket and return the reply.
 *
 * initid: per-invocation UDF state. initid->ptr owns the heap buffer that
 *         holds the most recent reply; the buffer from the previous call is
 *         released at the start of every call.
 *         NOTE(review): this relies on initid->ptr being NULL before the
 *         first call (the server is expected to initialise it) - confirm.
 * args:   args->args[0] is the endpoint to connect to, args->args[1] the
 *         request payload. Their sizes are taken from args->lengths because
 *         the argument buffers are not guaranteed to be NUL-terminated.
 * length: receives the length of the returned text.
 *
 * Returns the reply as NUL-terminated text (cut at the first NUL byte, as
 * before). The literal text "NULL" is returned when either argument is SQL
 * NULL, when memory cannot be allocated, or when any ZeroMQ step (context,
 * socket, connect, send, receive) fails. An empty reply yields an empty
 * string.
 */
char *zmq_client(UDF_INIT *initid, UDF_ARGS *args,
                 __attribute__ ((unused)) char *result,
                 unsigned long *length,
                 __attribute__ ((unused)) char *is_null,
                 __attribute__ ((unused)) char *error)
{
    char *endpoint = NULL;
    char *reply = NULL;
    void *context = NULL;
    void *zmqsocket = NULL;
    zmq_msg_t request;
    zmq_msg_t response;

    /* Drop the reply handed out by the previous call for this statement. */
    free(initid->ptr);
    initid->ptr = NULL;

    /* SQL NULL arguments arrive as NULL pointers; there is nothing to send. */
    if (args->args[0] == NULL || args->args[1] == NULL) {
        goto out;
    }

    /* zmq_connect() needs a C string, so build a terminated copy. */
    endpoint = malloc(args->lengths[0] + 1);
    if (endpoint == NULL) {
        goto out;
    }
    memcpy(endpoint, args->args[0], args->lengths[0]);
    endpoint[args->lengths[0]] = '\0';

    context = zmq_ctx_new();
    if (context == NULL) {
        goto out;
    }
    zmqsocket = zmq_socket(context, ZMQ_REQ);
    if (zmqsocket == NULL) {
        goto out;
    }
    if (zmq_connect(zmqsocket, endpoint) != 0) {
        goto out;
    }

    /* Send the request payload. */
    if (zmq_msg_init_size(&request, args->lengths[1]) != 0) {
        goto out;
    }
    if (args->lengths[1] > 0) {
        memcpy(zmq_msg_data(&request), args->args[1], args->lengths[1]);
    }
    if (zmq_msg_send(&request, zmqsocket, 0) == -1) {
        zmq_msg_close(&request);
        goto out;   /* no reply can follow a request that was never sent */
    }
    zmq_msg_close(&request);

    /* Wait for the reply and copy it into a terminated heap buffer. */
    if (zmq_msg_init(&response) != 0) {
        goto out;
    }
    if (zmq_msg_recv(&response, zmqsocket, 0) != -1) {
        size_t size = zmq_msg_size(&response);

        reply = malloc(size + 1);
        if (reply != NULL) {
            if (size > 0) {
                memcpy(reply, zmq_msg_data(&response), size);
            }
            reply[size] = '\0';
        }
    }
    zmq_msg_close(&response);

out:
    if (zmqsocket != NULL) {
        zmq_close(zmqsocket);
    }
    if (context != NULL) {
        zmq_ctx_destroy(context);
    }
    free(endpoint);

    if (reply == NULL) {
        /* Same failure marker the function has always used. */
        *length = strlen("NULL");
        return "NULL";
    }

    /* Keep ownership in initid->ptr so the buffer can be released later. */
    initid->ptr = reply;
    *length = strlen(reply);
    return reply;
}
/*
 * zmq_client_deinit - release per-invocation state of zmq_client().
 *
 * Frees whatever heap buffer is attached to initid->ptr and clears the
 * pointer. free(NULL) is a no-op, so this is safe when nothing is attached.
 * NOTE(review): assumes initid->ptr is either NULL or malloc-allocated
 * memory owned by this UDF - confirm nothing else stores into it.
 */
void zmq_client_deinit(UDF_INIT *initid)
{
    free(initid->ptr);
    initid->ptr = NULL;
}
/* ------------------------ zmq_publish ----------------------------- */
/*
 * zmq_publish_init - validate the arguments of zmq_publish('socket','message').
 *
 * initid:  per-invocation UDF state (not used here).
 * args:    argument descriptor; exactly two arguments are required and both
 *          are requested as strings.
 * message: receives the error text (bounded by MYSQL_ERRMSG_SIZE) when the
 *          argument count is wrong.
 *
 * Returns 0 on success and 1 when the argument count is not two.
 */
my_bool zmq_publish_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
{
    (void)initid;

    if (args->arg_count != 2) {
        /* snprintf always NUL-terminates and never writes past the bound. */
        snprintf(message, MYSQL_ERRMSG_SIZE, "%s",
                 "two arguments must be supplied: zmq_publish('socket','message').");
        return 1;
    }

    /*
     * The usage text promises two string arguments, so request string
     * coercion for the message as well as for the endpoint.
     */
    args->arg_type[0] = STRING_RESULT;
    args->arg_type[1] = STRING_RESULT;
    return 0;
}
/*
 * zmq_publish - placeholder for publishing a message on a ZMQ_PUB socket.
 *
 * Publishing is not implemented yet: the function binds a PUB socket to the
 * endpoint in args->args[0], closes it again without sending anything, and
 * always returns a fixed placeholder string (Chinese for "to be completed
 * when time permits").
 *
 * initid: per-invocation UDF state (not used here).
 * args:   args->args[0] is the endpoint to bind to; its size is taken from
 *         args->lengths[0] because the buffer is not guaranteed to be
 *         NUL-terminated. A SQL NULL endpoint skips the bind entirely.
 * length: receives the length of the returned placeholder string.
 *
 * Returns the placeholder string regardless of whether the bind succeeded.
 */
char *zmq_publish(UDF_INIT *initid, UDF_ARGS *args,
                  __attribute__ ((unused)) char *result,
                  unsigned long *length,
                  __attribute__ ((unused)) char *is_null,
                  __attribute__ ((unused)) char *error)
{
    char *status;
    char *endpoint = NULL;

    (void)initid;

    /* zmq_bind() needs a C string, so build a terminated copy. */
    if (args->args[0] != NULL) {
        endpoint = malloc(args->lengths[0] + 1);
    }

    if (endpoint != NULL) {
        void *context;

        memcpy(endpoint, args->args[0], args->lengths[0]);
        endpoint[args->lengths[0]] = '\0';

        context = zmq_ctx_new();
        if (context != NULL) {
            void *publisher = zmq_socket(context, ZMQ_PUB);

            if (publisher != NULL) {
                /* Result deliberately ignored: nothing is sent yet. */
                (void)zmq_bind(publisher, endpoint);
                /* TODO: send the message to all subscribers. */
                zmq_close(publisher);
            }
            zmq_ctx_destroy(context);
        }
        free(endpoint);
    }

    /* To be completed when time permits. */
    status = "等有时间再完善";
    *length = strlen(status);
    return status;
}
/*
 * zmq_publish_deinit - teardown hook paired with zmq_publish_init().
 *
 * Intentionally a no-op: it performs no work on initid.
 */
void zmq_publish_deinit(UDF_INIT *initid)
{
    (void)initid;
}