diff --git a/plugins/ekuiper/plugin_ekuiper.c b/plugins/ekuiper/plugin_ekuiper.c index c2e29304e..46152be21 100644 --- a/plugins/ekuiper/plugin_ekuiper.c +++ b/plugins/ekuiper/plugin_ekuiper.c @@ -114,6 +114,8 @@ static int ekuiper_plugin_start(neu_plugin_t *plugin) nng_pipe_notify(plugin->sock, NNG_PIPE_EV_ADD_POST, pipe_add_cb, plugin); nng_pipe_notify(plugin->sock, NNG_PIPE_EV_REM_POST, pipe_rm_cb, plugin); + nng_socket_set_int(plugin->sock, NNG_OPT_SENDBUF, 2048); + nng_socket_set_int(plugin->sock, NNG_OPT_RECVBUF, 2048); if ((rv = nng_listen(plugin->sock, EKUIPER_PLUGIN_URL, NULL, 0)) != 0) { plog_error(plugin, "nng_listen: %s", nng_strerror(rv)); diff --git a/plugins/ekuiper/read_write.c b/plugins/ekuiper/read_write.c index d11b612d1..bece71c15 100644 --- a/plugins/ekuiper/read_write.c +++ b/plugins/ekuiper/read_write.c @@ -56,7 +56,7 @@ void send_data(neu_plugin_t *plugin, neu_reqresp_trans_data_t *trans_data) rv = nng_sendmsg(plugin->sock, msg, NNG_FLAG_NONBLOCK); // TODO: use aio to send message if (0 != rv) { - plog_error(plugin, "nng cannot send msg"); + plog_error(plugin, "nng cannot send msg: %s", nng_strerror(rv)); nng_msg_free(msg); } }