Skip to content

Commit

Permalink
network: add dual ipv4/ipv6 support to UDP handler
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Nov 15, 2020
1 parent 83dea71 commit 6700aeb
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 26 deletions.
4 changes: 3 additions & 1 deletion include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
void *async_ctx,
struct flb_upstream_conn *u_conn);

flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port);
flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port,
char *source_addr);

int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port);
flb_sockfd_t flb_net_server(const char *port, const char *listen_addr);
flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr);
Expand Down
87 changes: 62 additions & 25 deletions src/flb_network.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,27 @@ int flb_net_bind_address(int fd, char *source_addr)
return 0;
}

static void set_ip_family(const char *host, struct addrinfo *hints)
{

int ret;
struct in6_addr serveraddr;

/* check if the given 'host' is a network address, adjust ai_flags */
ret = inet_pton(AF_INET, host, &serveraddr);
if (ret == 1) { /* valid IPv4 text address ? */
hints->ai_family = AF_INET;
hints->ai_flags |= AI_NUMERICHOST;
}
else {
ret = inet_pton(AF_INET6, host, &serveraddr);
if (ret == 1) { /* valid IPv6 text address ? */
hints->ai_family = AF_INET6;
hints->ai_flags |= AI_NUMERICHOST;
}
}
}

/* Connect to a TCP socket server and returns the file descriptor */
flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
char *source_addr, int connect_timeout,
Expand All @@ -442,30 +463,18 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
char _port[6];
struct addrinfo hints;
struct addrinfo *res, *rp;
struct in6_addr serveraddr;

if (is_async == FLB_TRUE && !u_conn) {
flb_error("[net] invalid async mode with not set upstream connection");
return -1;
}

memset(&hints, 0, sizeof hints);
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

/* check if the given 'host' is a network address, adjust ai_flags */
ret = inet_pton(AF_INET, host, &serveraddr);
if (ret == 1) { /* valid IPv4 text address ? */
hints.ai_family = AF_INET;
hints.ai_flags |= AI_NUMERICHOST;
}
else {
ret = inet_pton(AF_INET6, host, &serveraddr);
if (ret == 1) { /* valid IPv6 text address ? */
hints.ai_family = AF_INET6;
hints.ai_flags |= AI_NUMERICHOST;
}
}
/* Set hints */
set_ip_family(host, &hints);

/* fomart the TCP port */
snprintf(_port, sizeof(_port), "%lu", port);
Expand Down Expand Up @@ -500,9 +509,9 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
if (ret == -1) {
flb_warn("[net] falling back to random interface");
}
}
else {
flb_trace("[net] client connect bind address: %s", source_addr);
else {
flb_trace("[net] client connect bind address: %s", source_addr);
}
}

/* Disable Nagle's algorithm */
Expand Down Expand Up @@ -542,33 +551,61 @@ flb_sockfd_t flb_net_tcp_connect(const char *host, unsigned long port,
}

/* "Connect" to a UDP socket server and returns the file descriptor */
flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port)
flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port,
char *source_addr)
{
flb_sockfd_t fd = -1;
int ret;
flb_sockfd_t fd = -1;
char _port[6];
struct addrinfo hints;
struct addrinfo *res, *rp;
char _port[6];

memset(&hints, 0, sizeof hints);
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;

/* Set hints */
set_ip_family(host, &hints);

/* Format UDP port */
snprintf(_port, sizeof(_port), "%lu", port);

/* retrieve DNS info */
ret = getaddrinfo(host, _port, &hints, &res);
if (ret != 0) {
flb_warn("net_udp_connect: getaddrinfo(host='%s'): %s",
flb_warn("net]: getaddrinfo(host='%s'): %s",
host, gai_strerror(ret));
return -1;
}

for (rp = res; rp != NULL; rp = rp->ai_next) {
fd = flb_net_socket_create_udp(rp->ai_family, 0);
/* create socket */
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd == -1) {
flb_error("Error creating client socket, retrying");
flb_error("[net] coult not create client socket, retrying");
continue;
}

/* Bind a specific network interface ? */
if (source_addr != NULL) {
ret = flb_net_bind_address(fd, source_addr);
if (ret == -1) {
flb_warn("[net] falling back to random interface");
}
else {
flb_trace("[net] client connect bind address: %s", source_addr);
}
}

/*
* Why do we connect(2) an UDP socket ?, is this useful ?: Yes. Despite
* an UDP socket it's not in a connection state, connecting through the
* API it helps the Kernel to configure the destination address and
* is totally valid, so then you don't need to use sendto(2).
*
* For our use case this is quite helpful, since the caller keeps using
* the same Fluent Bit I/O API to deliver a message.
*/
if (connect(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
flb_error("Cannot connect to %s port %s", host, _port);
flb_socket_close(fd);
Expand Down

0 comments on commit 6700aeb

Please sign in to comment.