diff --git a/samples/net/nats/Makefile b/samples/net/nats/Makefile new file mode 100644 index 00000000000000..9b2b0e47ee375a --- /dev/null +++ b/samples/net/nats/Makefile @@ -0,0 +1,14 @@ +# Makefile - NATS client sample + +# +# Copyright (c) 2017 Intel Corporation +# +# SPDX-License-Identifier: Apache-2.0 +# + +BOARD ?= qemu_x86 +CONF_FILE ?= prj_$(BOARD).conf + +include $(ZEPHYR_BASE)/Makefile.inc + +include $(ZEPHYR_BASE)/samples/net/common/Makefile.ipstack diff --git a/samples/net/nats/README.rst b/samples/net/nats/README.rst new file mode 100644 index 00000000000000..7c67ebf01e7e66 --- /dev/null +++ b/samples/net/nats/README.rst @@ -0,0 +1,159 @@ +.. _NATS_Client_Sample: + + +NATS Client Implementation Sample +################################# + + +Overview +******** + +`NATS `__ is a +publisher/subscriber protocol implemented on top of TCP. It is specified in +`NATS Protocol documentation `__, +and this is a sample implementation for Zephyr using the new IP stack. +The API is loosely based off of the `Golang API +`__. + +With this sample, it's possible to subscribe/unsubscribe to a given subject, +and be notified of changes asynchronously. In order to conserve resources, +the implementation does not keep track of subscribed subjects; that +must be performed by the application itself, so it can ignore unknown/undesired +subjects. + +TLS is not supported yet, although basic authentication is. The client will indicate +if it supports username/password if a certain callback is set in the ``struct +nats``. This callback will then be called, and the user must copy the +username/password to the supplied user/pass buffers. + +Content might be also published for a given subject. + +The sample application lets one observe the subject "led0", and turn it +"on", "off", or "toggle" its value. Changing the value will, if supported, +act on a status LED on the development board. The new status will be +published. + +Also worth noting is that most of the networking and GPIO boilerplate has +been shamelessly copied from the IRC bot example. (Curiously, both +protocols are similar.) + +Requirements +************ + +To test the sample, build the Zephyr application for your platform. This +has only been tested with the QEMU emulator as provided by the Zephyr SDK, +but it should work with other supported hardware as long as they have enough +memory, the network stack has TCP enabled, and the connectivity hardware is +supported. + +As far as the software goes, this has been tested with the official `gnatsd +`__ for the server, and the official +`go-nats `__ client library. Both the +server and clients were set up as per instructions found in their respective +``README.md`` files. + +The client was a one-off test that is basically the same code provided in +the `Basic Usage +`__ +section as found in the ``go-nats`` README file, however, subscribing to the +topic used in this sample: ``led0``, and publishing values as described +above (``on``, ``off``, and ``toggle``). + +Library Usage +************* + +Allocate enough space for a ``struct nats``, setting a few callbacks so +that you're notified as events happen: + +:: + + struct nats nats_ctx = { + .on_auth_required = on_auth_required, + .on_message = on_message + }; + +The ``on_auth_required()`` and ``on_message()`` functions are part of +your application, and each must have these signatures: + +:: + + int on_auth_required(struct nats *nats, char **user, char **pass); + int on_message(struct nats *nats, struct nats_msg *msg); + +Both functions should return 0 to signal that they could successfully +handle their role, and a negative value, if they couldn't for any +reason. It's recommended to use a negative integer as provided by +errno.h in order to ease debugging. + +The first function, ``on_auth_required()``, is called if the server +notifies that it requires authentication. It's not going to be called if +that's not the case, so it is optional. However, if the server asks for +credentials and this function is not provided, the connection will be +closed and an error will be returned by ``nats_connect()``. + +The second function, ``on_message()``, will be called whenever the +server has been notified of a value change. The ``struct nats_msg`` has the +following fields: + +:: + + struct nats_msg { + const char *subject; + const char *sid; + const char *reply_to; + }; + +The field ``reply_to`` may be passed directly to ``nats_publish()``, +in order to publish a reply to this message. If it's ``NULL`` (no +reply-to field in the message from the server), the +``nats_publish()`` function will not reply to a specific mailbox and +will just update the topic value. + +In order to manage topic subscription, these functions can be used: + +:: + + int nats_subscribe(struct nats *nats, const char *subject, + const char *queue_group, const char *sid); + +``subject`` and ``sid`` are validated so that they're actually valid +per the protocol rules. ``-EINVAL`` is returned if they're not. + +If ``queue_group`` is NULL, it's not sent to the server. + +:: + + int nats_unsubscribe(struct nats *nats, const char *sid, + size_t max_msgs); + +``sid`` is validated so it's actually valid per the protocol rules. +-EINVAL is returned if it's not. + +``max_msgs`` specifies the number of messages that the server will +send before actually unsubscribing the message. Can be 0 to +immediately unsubscribe. + +Both of these functions will return ``-ENOMEM`` if they couldn't build +the message to transmit to the server. They can also return any error +that ``net_context_send()`` can return. + +In order to conserve resources, the Zephyr implementation will not make +not of subscribed topics. This is left as a task for the user of the API +to handle, for instance, when the ``on_message()`` callback is called. + +Topics can be published by using the following function: + +:: + + int nats_publish(struct nats *nats, const char *subject, + const char *reply_to, const char *payload, + size_t payload_len); + +As usual, ``subject`` is validated and ``-EINVAL`` will be returned if +it's in an invalid format. The ``reply_to`` field can be ``NULL``, in +which case, subscribers to this topic won't receive this information as +well. + +As ``net_subscribe()`` and ``net_unsubscribe()``, this function can +return ``ENOMEM`` -or any other errors that ``net_context_send()`` +returns. diff --git a/samples/net/nats/prj_qemu_x86.conf b/samples/net/nats/prj_qemu_x86.conf new file mode 100644 index 00000000000000..ea4ce1747ebf0a --- /dev/null +++ b/samples/net/nats/prj_qemu_x86.conf @@ -0,0 +1,30 @@ +CONFIG_INIT_STACKS=y + +CONFIG_NET_LOG_ENABLED=y +CONFIG_SYS_LOG_NET_LEVEL=2 +CONFIG_NET_DHCPV4=y +CONFIG_NET_IF_UNICAST_IPV4_ADDR_COUNT=3 +CONFIG_NET_IPV6=y +CONFIG_NET_IF_UNICAST_IPV6_ADDR_COUNT=3 +CONFIG_NET_LOG=y +CONFIG_NET_MAX_CONTEXTS=10 +CONFIG_NET_NBUF_DATA_COUNT=30 +CONFIG_NET_NBUF_RX_COUNT=14 +CONFIG_NET_NBUF_TX_COUNT=14 +CONFIG_NET_SHELL=y +CONFIG_NET_SLIP_TAP=y +CONFIG_NET_STATISTICS=y +CONFIG_NET_TCP=y +CONFIG_NETWORKING=y +CONFIG_DNS_RESOLVER=n + +CONFIG_PRINTK=y +CONFIG_SYS_LOG_SHOW_COLOR=y + +CONFIG_NET_SAMPLES_IP_ADDRESSES=y +CONFIG_NET_SAMPLES_MY_IPV6_ADDR="2001:db8::1" +CONFIG_NET_SAMPLES_PEER_IPV6_ADDR="2001:db8::2" + +CONFIG_TEST_RANDOM_GENERATOR=y + +CONFIG_JSON_LIBRARY=y diff --git a/samples/net/nats/src/Makefile b/samples/net/nats/src/Makefile new file mode 100644 index 00000000000000..2fb414d02c9333 --- /dev/null +++ b/samples/net/nats/src/Makefile @@ -0,0 +1,3 @@ +obj-y = main.o nats.o + +ccflags-y += -I${ZEPHYR_BASE}/lib/json/ diff --git a/samples/net/nats/src/main.c b/samples/net/nats/src/main.c new file mode 100644 index 00000000000000..73af3e67a3c567 --- /dev/null +++ b/samples/net/nats/src/main.c @@ -0,0 +1,324 @@ +/* + * Copyright (c) 2017 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "nats.h" + +/* LED */ +#if defined(LED0_GPIO_PORT) +#define LED_GPIO_NAME LED0_GPIO_PORT +#define LED_PIN LED0_GPIO_PIN +#else +#define LED_GPIO_NAME "(fail)" +#define LED_PIN 0 +#endif + +static struct device *led0; +static bool fake_led; + +/* Network Config */ + +#if defined(CONFIG_NET_IPV6) + +#define NATS_AF_INET AF_INET6 +#define NATS_SOCKADDR_IN sockaddr_in6 + +#if defined(CONFIG_NET_SAMPLES_MY_IPV6_ADDR) +#define NATS_LOCAL_IP_ADDR CONFIG_NET_SAMPLES_MY_IPV6_ADDR +#else +#define NATS_LOCAL_IP_ADDR "2001:db8::1" +#endif /* CONFIG_NET_SAMPLES_MY_IPV6_ADDR */ + +#if defined(CONFIG_NET_SAMPLES_PEER_IPV6_ADDR) +#define NATS_PEER_IP_ADDR CONFIG_NET_SAMPLES_PEER_IPV6_ADDR +#else +#define NATS_PEER_IP_ADDR "2001:db8::2" +#endif /* CONFIG_NET_SAMPLES_PEER_IPV6_ADDR */ + +#else /* CONFIG_NET_IPV4 */ + +#define NATS_AF_INET AF_INET +#define NATS_SOCKADDR_IN sockaddr_in + +#if defined(CONFIG_NET_SAMPLES_MY_IPV4_ADDR) +#define NATS_LOCAL_IP_ADDR CONFIG_NET_SAMPLES_MY_IPV4_ADDR +#else +#define NATS_LOCAL_IP_ADDR "192.168.0.1" +#endif /* CONFIG_NET_SAMPLES_MY_IPV4_ADDR */ + +#if defined(CONFIG_NET_SAMPLES_PEER_IPV4_ADDR) +#define NATS_PEER_IP_ADDR CONFIG_NET_SAMPLES_PEER_IPV4_ADDR +#else +#define NATS_PEER_IP_ADDR "192.168.0.2" +#endif /* CONFIG_NET_SAMPLES_PEER_IPV4_ADDR */ + +#endif + +/* DNS API */ +#define DNS_PORT 53 +#define DNS_SLEEP_MSECS 400 + +/* Default server */ +#define DEFAULT_PORT 4222 + +static uint8_t stack[2048]; + +static void panic(const char *msg) +{ + NET_ERR("Panic: %s", msg); + for (;;) { + k_sleep(K_FOREVER); + } +} + +static int in_addr_set(sa_family_t family, + const char *ip_addr, + int port, + struct sockaddr *_sockaddr) +{ + int rc = 0; + + _sockaddr->family = family; + + if (ip_addr) { + if (family == AF_INET6) { + rc = net_addr_pton(family, + ip_addr, + &net_sin6(_sockaddr)->sin6_addr); + } else { + rc = net_addr_pton(family, + ip_addr, + &net_sin(_sockaddr)->sin_addr); + } + + if (rc < 0) { + NET_ERR("Invalid IP address: %s", ip_addr); + return -EINVAL; + } + } + + if (port >= 0) { + if (family == AF_INET6) { + net_sin6(_sockaddr)->sin6_port = htons(port); + } else { + net_sin(_sockaddr)->sin_port = htons(port); + } + } + + return rc; +} + +static void initialize_network(void) +{ + struct net_if *iface; + + NET_INFO("Initializing network"); + + iface = net_if_get_default(); + if (!iface) { + panic("No default network interface"); + } + +#if defined(CONFIG_NET_IPV6) && defined(CONFIG_NET_DHCPV6) + /* TODO: IPV6 DHCP */ +#elif defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4) + net_dhcpv4_start(iface); + + /* delay so DHCPv4 can assign IP */ + /* TODO: add a timeout/retry */ + NET_INFO("Waiting for DHCP ..."); + do { + k_sleep(K_SECONDS(1)); + } while (net_is_ipv4_addr_unspecified(&iface->dhcpv4.requested_ip)); + + NET_INFO("Done!"); + + /* TODO: add a timeout */ + NET_INFO("Waiting for IP assginment ..."); + do { + k_sleep(K_SECONDS(1)); + } while (!net_is_my_ipv4_addr(&iface->dhcpv4.requested_ip)); + + NET_INFO("Done!"); +#else + struct sockaddr addr; + + if (in_addr_set(NATS_AF_INET, NATS_LOCAL_IP_ADDR, 0, + &addr) < 0) { + NET_ERR("Invalid IP address: %s", + NATS_LOCAL_IP_ADDR); + } + +#if defined(CONFIG_NET_IPV6) + net_if_ipv6_addr_add(iface, + &net_sin6(&addr)->sin6_addr, + NET_ADDR_MANUAL, 0); +#else + net_if_ipv4_addr_add(iface, + &net_sin(&addr)->sin_addr, + NET_ADDR_MANUAL, 0); +#endif +#endif /* CONFIG_NET_IPV6 && CONFIG_NET_DHCPV6 */ +} + +static bool read_led(void) +{ + uint32_t led = 0; + int r; + + if (!led0) { + return fake_led; + } + + r = gpio_pin_read(led0, LED_PIN, &led); + if (r < 0) { + return false; + } + + return !led; +} + +static void write_led(const struct nats *nats, + const struct nats_msg *msg, + bool state) +{ + char *pubstate; + + if (!led0) { + fake_led = state; + } else { + gpio_pin_write(led0, LED_PIN, !state); + } + + pubstate = state ? "on" : "off"; + nats_publish(nats, "led0", 0, msg->reply_to, 0, + pubstate, strlen(pubstate)); +} + +static int on_msg_received(const struct nats *nats, + const struct nats_msg *msg) +{ + if (!strcmp(msg->subject, "led0")) { + if (msg->payload_len == 2 && !strcmp(msg->payload, "on")) { + write_led(nats, msg, true); + return 0; + } + + if (msg->payload_len == 3 && !strcmp(msg->payload, "off")) { + write_led(nats, msg, false); + return 0; + } + + if (msg->payload_len == 6 && !strcmp(msg->payload, "toggle")) { + write_led(nats, msg, !read_led()); + return 0; + } + + return -EINVAL; + } + + return -ENOENT; +} + +static void initialize_hardware(void) +{ + NET_INFO("Initializing hardware"); + + led0 = device_get_binding(LED_GPIO_NAME); + if (led0) { + gpio_pin_configure(led0, LED_PIN, GPIO_DIR_OUT); + } +} + +static int connect(struct nats *nats, uint16_t port) +{ +#if defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4) + struct net_if *iface; +#endif + struct sockaddr dst_addr, src_addr; + int ret; + + NET_INFO("Connecting..."); + + ret = net_context_get(NATS_AF_INET, SOCK_STREAM, IPPROTO_TCP, + &nats->conn); + if (ret < 0) { + NET_DBG("Could not get new context: %d", ret); + return ret; + } + +#if defined(CONFIG_NET_IPV6) && defined(CONFIG_NET_DHCPV6) + /* TODO: IPV6 DHCP */ +#elif defined(CONFIG_NET_IPV4) && defined(CONFIG_NET_DHCPV4) + iface = net_if_get_default(); + + net_ipaddr_copy(&net_sin(&src_addr)->sin_addr, + &iface->dhcpv4.requested_ip); + ret = in_addr_set(NATS_AF_INET, NULL, 0, &src_addr); +#else + ret = in_addr_set(NATS_AF_INET, NATS_LOCAL_IP_ADDR, + 0, &src_addr); + if (ret < 0) { + goto connect_exit; + } +#endif + + ret = in_addr_set(NATS_AF_INET, NATS_PEER_IP_ADDR, + port, &dst_addr); + if (ret < 0) { + goto connect_exit; + } + + ret = net_context_bind(nats->conn, &src_addr, + sizeof(struct NATS_SOCKADDR_IN)); + if (ret < 0) { + NET_DBG("Could not bind to local address: %d", -ret); + goto connect_exit; + } + + ret = nats_connect(nats, &dst_addr, sizeof(struct NATS_SOCKADDR_IN)); + if (!ret) { + return 0; + } + +connect_exit: + net_context_put(nats->conn); + return ret; +} + +static void nats_client(void) +{ + struct nats nats = { + .on_message = on_msg_received + }; + + NET_INFO("NATS Client Sample"); + + initialize_network(); + initialize_hardware(); + + if (connect(&nats, DEFAULT_PORT) < 0) { + panic("Could not connect to NATS server"); + } + + if (nats_subscribe(&nats, "led0", 0, NULL, 0, + "sub132984012384098", 0) < 0) { + panic("Could not subscribe to `led0` topic"); + } +} + +void main(void) +{ + k_thread_spawn(stack, sizeof(stack), (k_thread_entry_t)nats_client, + NULL, NULL, NULL, K_PRIO_COOP(7), 0, 0); +} diff --git a/samples/net/nats/src/nats.c b/samples/net/nats/src/nats.c new file mode 100644 index 00000000000000..da5b492b5383ee --- /dev/null +++ b/samples/net/nats/src/nats.c @@ -0,0 +1,570 @@ +/* + * Copyright (c) 2017 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "nats.h" + +struct nats_info { + const char *server_id; + const char *version; + const char *go; + const char *host; + size_t max_payload; + uint16_t port; + bool ssl_required; + bool auth_required; +}; + +struct io_vec { + const void *base; + size_t len; +}; + +static bool is_subject_valid(const char *subject, size_t len) +{ + size_t pos; + char last = '\0'; + + if (!subject) { + return false; + } + + for (pos = 0; pos < len; last = subject[pos++]) { + switch (subject[pos]) { + case '>': + if (subject[pos + 1] != '\0') { + return false; + } + + break; + case '.': + case '*': + if (last == subject[pos]) { + return false; + } + + break; + default: + if (isalnum(subject[pos])) { + continue; + } + + return false; + } + } + + return true; +} + +static bool is_sid_valid(const char *sid, size_t len) +{ + size_t pos; + + if (!sid) { + return false; + } + + for (pos = 0; pos < len; pos++) { + if (!isalnum(sid[pos])) { + return false; + } + } + + return true; +} + +#define TRANSMITV_LITERAL(lit_) { .base = lit_, .len = sizeof(lit_) - 1 } + +static int transmitv(struct net_context *conn, int iovcnt, + struct io_vec *iov) +{ + struct net_buf *buf; + int i; + + buf = net_nbuf_get_tx(conn, K_FOREVER); + if (!buf) { + return -ENOMEM; + } + + for (i = 0; i < iovcnt; i++) { + if (!net_nbuf_append(buf, iov[i].len, iov[i].base, K_FOREVER)) { + net_nbuf_unref(buf); + + return -ENOMEM; + } + } + + return net_context_send(buf, NULL, K_NO_WAIT, NULL, NULL); +} + +static inline int transmit(struct net_context *conn, const char buffer[], + size_t len) +{ + return transmitv(conn, 1, (struct io_vec[]) { + { .base = buffer, .len = len }, + }); +} + +#define FIELD(struct_, member_, type_) { \ + .field_name = #member_, \ + .field_name_len = sizeof(#member_) - 1, \ + .offset = offsetof(struct_, member_), \ + .type = type_ \ +} +static int handle_server_info(struct nats *nats, char *payload, size_t len) +{ + static const struct json_obj_descr descr[] = { + FIELD(struct nats_info, server_id, JSON_TOK_STRING), + FIELD(struct nats_info, version, JSON_TOK_STRING), + FIELD(struct nats_info, go, JSON_TOK_STRING), + FIELD(struct nats_info, host, JSON_TOK_STRING), + FIELD(struct nats_info, port, JSON_TOK_NUMBER), + FIELD(struct nats_info, auth_required, JSON_TOK_TRUE), + FIELD(struct nats_info, ssl_required, JSON_TOK_TRUE), + FIELD(struct nats_info, max_payload, JSON_TOK_NUMBER), + }; + struct nats_info info = {}; + char user[32], pass[64]; + size_t user_len = sizeof(user), pass_len = sizeof(pass); + int ret; + + ret = json_obj_parse(payload, len, descr, ARRAY_SIZE(descr), &info); + if (ret < 0) { + return -EINVAL; + } + + if (info.ssl_required) { + return -ENOTSUP; + } + + if (!info.auth_required) { + return 0; + } + + if (!nats->on_auth_required) { + return -EPERM; + } + + ret = nats->on_auth_required(nats, user, &user_len, pass, &pass_len); + if (ret < 0) { + return ret; + } + + ret = json_escape(user, &user_len, sizeof(user)); + if (ret < 0) { + return ret; + } + + ret = json_escape(pass, &pass_len, sizeof(pass)); + if (ret < 0) { + return ret; + } + + return transmitv(nats->conn, 5, (struct io_vec[]) { + TRANSMITV_LITERAL("CONNECT {\"user\":\""), + { .base = user, .len = user_len }, + TRANSMITV_LITERAL("\",\"pass\":\""), + { .base = pass, .len = pass_len }, + TRANSMITV_LITERAL("\"}\r\n"), + }); +} +#undef FIELD + +static bool char_in_set(char chr, const char *set) +{ + const char *ptr; + + for (ptr = set; *ptr; ptr++) { + if (*ptr == chr) { + return true; + } + } + + return false; +} + +static char *strsep(char *strp, const char *delims) +{ + const char *delim; + char *ptr; + + if (!strp) { + return NULL; + } + + for (delim = delims; *delim; delim++) { + ptr = strchr(strp, *delim); + if (ptr) { + *ptr = '\0'; + + for (ptr++; *ptr; ptr++) { + if (!char_in_set(*ptr, delims)) { + break; + } + } + + return ptr; + } + } + + return NULL; +} + +static int handle_server_msg(struct nats *nats, char *payload, size_t len) +{ + char *subject, *sid, *reply_to, *bytes; + char *end_ptr; + char prev_end = payload[len]; + size_t payload_size; + + /* strsep() uses strchr(), ensure payload is NUL-terminated */ + payload[len] = '\0'; + + /* Slice the tokens */ + subject = payload; + sid = strsep(subject, " \t"); + reply_to = strsep(sid, " \t"); + if (!reply_to) { + bytes = strsep(sid, "\r"); + } else { + bytes = strsep(reply_to, "\r"); + } + + payload[len] = prev_end; + + if (!bytes) { + return -EINVAL; + } + + /* Parse the payload size */ + errno = 0; + payload_size = strtoul(bytes, &end_ptr, 10); + if (errno != 0) { + return -errno; + } + + if (!end_ptr) { + return -EINVAL; + } + + if (payload_size >= payload + len - end_ptr) { + return -EINVAL; + } + + payload = end_ptr + 2; + + return nats->on_message(nats, &(struct nats_msg) { + .subject = subject, + .sid = sid, + .reply_to = reply_to, + .payload = payload, + .payload_len = payload_size, + }); +} + +static int handle_server_ping(struct nats *nats, char *payload, size_t len) +{ + static const char pong[] = "PONG\r\n"; + + return transmit(nats->conn, pong, sizeof(pong) - 1); +} + +static int ignore(struct nats *nats, char *payload, size_t len) +{ + /* FIXME: Notify user of success/errors. This would require + * maintaining information of what was the last sent command in + * order to provide the best error information for the user. + * Without VERBOSE set, these won't be sent -- but be cautious and + * ignore them just in case. + */ + return 0; +} + +#define CMD(cmd_, handler_) { \ + .op = cmd_, \ + .len = sizeof(cmd_) - 1, \ + .handle = handler_ \ +} +static int handle_server_cmd(struct nats *nats, char *cmd, size_t len) +{ + static const struct { + const char *op; + size_t len; + int (*handle)(struct nats *nats, char *payload, size_t len); + } cmds[] = { + CMD("INFO", handle_server_info), + CMD("MSG", handle_server_msg), + CMD("PING", handle_server_ping), + CMD("+OK", ignore), + CMD("-ERR", ignore), + }; + size_t i; + char *payload; + size_t payload_len; + + payload = strsep(cmd, " \t"); + if (!payload) { + payload = strsep(cmd, "\r"); + if (!payload) { + return -EINVAL; + } + } + payload_len = len - (size_t)(payload - cmd); + len = (size_t)(payload - cmd); + + for (i = 0; i < ARRAY_SIZE(cmds); i++) { + if (len != cmds[i].len) { + continue; + } + + if (!strncmp(cmds[i].op, cmd, len)) { + return cmds[i].handle(nats, payload, payload_len); + } + } + + return -ENOENT; +} +#undef CMD + +int nats_subscribe(const struct nats *nats, + const char *subject, size_t subject_len, + const char *queue_group, size_t queue_group_len, + const char *sid, size_t sid_len) +{ + if (!is_subject_valid(subject, subject_len)) { + return -EINVAL; + } + + if (!is_sid_valid(sid, sid_len)) { + return -EINVAL; + } + + if (queue_group) { + return transmitv(nats->conn, 7, (struct io_vec[]) { + TRANSMITV_LITERAL("SUB "), + { + .base = subject, + .len = subject_len ? + subject_len : strlen(subject) + }, + TRANSMITV_LITERAL(" "), + { + .base = queue_group, + .len = queue_group_len ? + queue_group_len : strlen(queue_group) + }, + TRANSMITV_LITERAL(" "), + { + .base = sid, + .len = sid_len ? sid_len : strlen(sid) + }, + TRANSMITV_LITERAL("\r\n") + }); + } + + return transmitv(nats->conn, 5, (struct io_vec[]) { + TRANSMITV_LITERAL("SUB "), + { + .base = subject, + .len = subject_len ? subject_len : strlen(subject) + }, + TRANSMITV_LITERAL(" "), + { + .base = sid, + .len = sid_len ? sid_len : strlen(sid) + }, + TRANSMITV_LITERAL("\r\n") + }); +} + +int nats_unsubscribe(const struct nats *nats, + const char *sid, size_t sid_len, size_t max_msgs) +{ + if (!is_sid_valid(sid, sid_len)) { + return -EINVAL; + } + + if (max_msgs) { + char max_msgs_str[3 * sizeof(size_t)]; + int ret; + + ret = snprintk(max_msgs_str, sizeof(max_msgs_str), + "%zu", max_msgs); + if (ret < 0 || ret >= (int)sizeof(max_msgs_str)) { + return -ENOMEM; + } + + return transmitv(nats->conn, 5, (struct io_vec[]) { + TRANSMITV_LITERAL("UNSUB "), + { + .base = sid, + .len = sid_len ? sid_len : strlen(sid) + }, + TRANSMITV_LITERAL(" "), + { .base = max_msgs_str, .len = ret }, + TRANSMITV_LITERAL("\r\n"), + }); + } + + return transmitv(nats->conn, 3, (struct io_vec[]) { + TRANSMITV_LITERAL("UNSUB "), + { + .base = sid, + .len = sid_len ? sid_len : strlen(sid) + }, + TRANSMITV_LITERAL("\r\n") + }); +} + +int nats_publish(const struct nats *nats, + const char *subject, size_t subject_len, + const char *reply_to, size_t reply_to_len, + const char *payload, size_t payload_len) +{ + char payload_len_str[3 * sizeof(size_t)]; + int ret; + + if (!is_subject_valid(subject, subject_len)) { + return -EINVAL; + } + + ret = snprintk(payload_len_str, sizeof(payload_len_str), "%zu", + payload_len); + if (ret < 0 || ret >= (int)sizeof(payload_len_str)) { + return -ENOMEM; + } + + if (reply_to) { + return transmitv(nats->conn, 7, (struct io_vec[]) { + TRANSMITV_LITERAL("PUB "), + { + .base = subject, + .len = subject_len ? + subject_len : strlen(subject) + }, + TRANSMITV_LITERAL(" "), + { + .base = reply_to, + .len = reply_to_len ? + reply_to_len : strlen(reply_to) + }, + TRANSMITV_LITERAL(" "), + { .base = payload_len_str, .len = ret }, + TRANSMITV_LITERAL("\r\n"), + }); + } + + return transmitv(nats->conn, 5, (struct io_vec[]) { + TRANSMITV_LITERAL("PUB "), + { + .base = subject, + .len = subject_len ? subject_len : strlen(subject) + }, + TRANSMITV_LITERAL(" "), + { + .base = reply_to, + .len = reply_to_len ? reply_to_len : strlen(reply_to) + }, + TRANSMITV_LITERAL("\r\n"), + }); +} + +static void receive_cb(struct net_context *ctx, struct net_buf *buf, int status, + void *user_data) +{ + struct nats *nats = user_data; + char cmd_buf[256]; + struct net_buf *tmp; + uint16_t pos = 0, cmd_len = 0; + size_t len; + uint8_t *end_of_line; + + if (!buf) { + /* FIXME: How to handle disconnection? */ + return; + } + + if (status) { + /* FIXME: How to handle connectio error? */ + net_buf_unref(buf); + return; + } + + tmp = buf->frags; + pos = net_nbuf_appdata(buf) - tmp->data; + + while (tmp) { + len = tmp->len - pos; + + end_of_line = memchr((uint8_t *)tmp->data + pos, '\r', len); + if (end_of_line) { + len = end_of_line - ((uint8_t *)tmp->data + pos); + } + + if (cmd_len + len > sizeof(cmd_buf)) { + break; + } + + tmp = net_nbuf_read(tmp, pos, &pos, len, cmd_buf + cmd_len); + cmd_len += len; + + if (end_of_line) { + if (tmp) { + tmp = net_nbuf_read(tmp, pos, &pos, 1, NULL); + } + + cmd_buf[cmd_len] = '\0'; + if (handle_server_cmd(nats, cmd_buf, cmd_len) < 0) { + /* FIXME: What to do with unhandled messages? */ + break; + } + cmd_len = 0; + } + } + + net_nbuf_unref(buf); +} + +int nats_connect(struct nats *nats, struct sockaddr *addr, socklen_t addrlen) +{ + int ret; + + ret = net_context_connect(nats->conn, addr, addrlen, + NULL, K_FOREVER, NULL); + if (ret < 0) { + return ret; + } + + return net_context_recv(nats->conn, receive_cb, K_NO_WAIT, nats); +} + +int nats_disconnect(struct nats *nats) +{ + int ret; + + ret = net_context_put(nats->conn); + if (ret < 0) { + return ret; + } + + nats->conn = NULL; + + return 0; +} diff --git a/samples/net/nats/src/nats.h b/samples/net/nats/src/nats.h new file mode 100644 index 00000000000000..fe373449c47bc8 --- /dev/null +++ b/samples/net/nats/src/nats.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef __NATS_H +#define __NATS_H + +#include +#include + +struct nats_msg { + const char *subject; + const char *sid; + const char *reply_to; + const char *payload; + size_t payload_len; +}; + +struct nats { + struct net_context *conn; + + int (*on_auth_required)(const struct nats *nats, + char *user, size_t *user_len, + char *pass, size_t *pass_len); + int (*on_message)(const struct nats *nats, + const struct nats_msg *msg); +}; + +int nats_connect(struct nats *nats, struct sockaddr *addr, socklen_t addrlen); +int nats_disconnect(struct nats *nats); + +int nats_subscribe(const struct nats *nats, + const char *subject, size_t subject_len, + const char *queue_group, size_t queue_group_len, + const char *sid, size_t sid_len); +int nats_unsubscribe(const struct nats *nats, + const char *sid, size_t sid_len, + size_t max_msgs); + +int nats_publish(const struct nats *nats, + const char *subject, size_t subject_len, + const char *reply_to, size_t reply_to_len, + const char *payload, size_t payload_len); + +#endif /* __NATS_H */