Skip to content

Commit

Permalink
Merge branch 'v2.5' into v2.5-2
Browse files Browse the repository at this point in the history
  • Loading branch information
hxy7yx authored Jun 28, 2023
2 parents 8d57bbb + 3cecf25 commit 413cb76
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
59 changes: 58 additions & 1 deletion plugins/ekuiper/plugin_ekuiper.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include "errcodes.h"
#include "neuron.h"
#include "utils/asprintf.h"
Expand Down Expand Up @@ -99,6 +103,7 @@ static int ekuiper_plugin_uninit(neu_plugin_t *plugin)
{
int rv = 0;

nng_close(plugin->sock);
nng_aio_free(plugin->recv_aio);
nng_mtx_free(plugin->mtx);
free(plugin->host);
Expand Down Expand Up @@ -127,7 +132,14 @@ static int ekuiper_plugin_start(neu_plugin_t *plugin)
if ((rv = nng_listen(plugin->sock, url, NULL, 0)) != 0) {
plog_error(plugin, "nng_listen: %s", nng_strerror(rv));
nng_close(plugin->sock);
return NEU_ERR_EINTERNAL;
if (NNG_EADDRINVAL == rv) {
rv = NEU_ERR_IP_ADDRESS_INVALID;
} else if (NNG_EADDRINUSE == rv) {
rv = NEU_ERR_IP_ADDRESS_IN_USE;
} else {
rv = NEU_ERR_EINTERNAL;
}
return rv;
}

nng_recv_aio(plugin->sock, plugin->recv_aio);
Expand Down Expand Up @@ -163,6 +175,12 @@ static int parse_config(neu_plugin_t *plugin, const char *setting,
goto error;
}

struct in_addr addr;
if (0 == inet_aton(host.v.val_str, &addr)) {
plog_error(plugin, "inet_aton fail: %s", host.v.val_str);
goto error;
}

// port, required
if (0 == port.v.val_int || port.v.val_int > 65535) {
plog_error(plugin, "setting invalid port: %" PRIi64, port.v.val_int);
Expand All @@ -182,6 +200,41 @@ static int parse_config(neu_plugin_t *plugin, const char *setting,
return -1;
}

static inline int check_url_listenable(neu_plugin_t *plugin, const char *url,
const char *host, uint16_t port)
{
if (NULL != plugin->host && // already configured and
port == plugin->port && // port is the same, then if
(0 == strcmp(plugin->host, host) // 1. host is the same
|| 0 == strcmp("0.0.0.0", host) // 2. to bind to any address
|| 0 == strcmp("0.0.0.0", plugin->host) // 3. bound to any address
)) {
// early return, no need check url is listenable
return 0;
}

nng_socket sock = NNG_SOCKET_INITIALIZER;
int rv = nng_pair0_open(&sock);
if (0 != rv) {
plog_error(plugin, "nng_pair0_open: %s", nng_strerror(rv));
return NEU_ERR_EINTERNAL;
}

if (0 != (rv = nng_listen(sock, url, NULL, 0))) {
plog_error(plugin, "nng_listen: %s", nng_strerror(rv));
if (NNG_EADDRINVAL == rv) {
rv = NEU_ERR_IP_ADDRESS_INVALID;
} else if (NNG_EADDRINUSE == rv) {
rv = NEU_ERR_IP_ADDRESS_IN_USE;
} else {
rv = NEU_ERR_EINTERNAL;
}
}

nng_close(sock);
return rv;
}

static int ekuiper_plugin_config(neu_plugin_t *plugin, const char *setting)
{
int rv = 0;
Expand All @@ -201,6 +254,10 @@ static int ekuiper_plugin_config(neu_plugin_t *plugin, const char *setting)
goto error;
}

if (0 != (rv = check_url_listenable(plugin, url, host, port))) {
goto error;
}

plog_notice(plugin, "config success");

free(plugin->host);
Expand Down
2 changes: 2 additions & 0 deletions src/utils/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ int neu_http_response(nng_aio *aio, neu_err_code_e code, char *content)
break;
case NEU_ERR_BODY_IS_WRONG:
case NEU_ERR_PARAM_IS_WRONG:
case NEU_ERR_IP_ADDRESS_INVALID:
case NEU_ERR_IP_ADDRESS_IN_USE:
case NEU_ERR_NODE_SETTING_INVALID:
case NEU_ERR_NODE_NOT_ALLOW_UPDATE:
case NEU_ERR_NODE_NOT_ALLOW_SUBSCRIBE:
Expand Down

0 comments on commit 413cb76

Please sign in to comment.