Skip to content

Commit

Permalink
Add event callback system
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Silva <[email protected]>
  • Loading branch information
perigoso committed Dec 15, 2021
1 parent ac73367 commit c64c4c5
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 78 deletions.
155 changes: 99 additions & 56 deletions include/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ extern "C" {
*/


/**
/**
* @brief An enumeration of the MQTT control packet types.
* @ingroup unpackers
*
* @see <a href="http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021">
* MQTT v3.1.1: MQTT Control Packet Types
* </a>
*/
enum MQTTControlPacketType {
enum MQTTControlPacketType {
MQTT_CONTROL_CONNECT=1u,
MQTT_CONTROL_CONNACK=2u,
MQTT_CONTROL_PUBLISH=3u,
Expand Down Expand Up @@ -1084,6 +1084,34 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT

/* CLIENT */

/**
* @brief An enumeration of callback events.
* @ingroup details
*/
enum MQTTCallbackEvent {
MQTT_EVENT_RECONNECT,
MQTT_EVENT_CONNECTION_REFUSED,
MQTT_EVENT_CONNECTED,
MQTT_EVENT_DISCONNECTED,
MQTT_EVENT_RECEIVE,
MQTT_EVENT_PUBLISH,
MQTT_EVENT_SUBSCRIBE,
MQTT_EVENT_UNSUBSCRIBE,
MQTT_EVENT_PING,
MQTT_EVENT_PUBLISH_TIMEOUT,
MQTT_EVENT_ERROR
};

/**
* @brief union to serve as proxy to multiple datatypes on one pointer.
* @ingroup details
*/
union MQTTCallbackData {
struct mqtt_response_publish *received_msg;
struct mqtt_queued_message *queued_msg;
enum MQTTErrors *error;
};

/**
* @brief An MQTT client.
* @ingroup details
Expand Down Expand Up @@ -1154,25 +1182,64 @@ struct mqtt_client {
double typical_response_time;

/**
* @brief The callback that is called whenever a publish is received from the broker.
* @brief The callback that is called whenever an event happens
* events happen when:
* - publish is received from the broker.
*
* Any topics that you have subscribed to will be returned from the broker as
* mqtt_response_publish messages. All the publishes received from the broker will
* be passed to this function on a MQTT_EVENT_RECEIVE.
*
* - reconnect is called whenever the client enters an error state
* that requires reinitialization.
*
* The job of the MQTT_EVENT_RECONNECT is to: (1) perform error handling/logging,
* (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the
* client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other
* API calls such as \ref mqtt_subscribe.
*
* - (dis)connect (refused) is called whenever a connection is complete, is refused, or disconnected.
*
* MQTT_EVENT_CONNECTION_REFUSED is called on a connection refused error
* MQTT_EVENT_CONNECTED is called whenever a connection is acknowledged and accepted
* MQTT_EVENT_DISCONNECTED is called whenver a disconnect is sent by the client
*
* Any topics that you have subscribed to will be returned from the broker as
* mqtt_response_publish messages. All the publishes received from the broker will
* be passed to this function.
* - publish is called whenver a message WE published is successful, ie acknowledged
*
* @note A pointer to publish_response_callback_state is always passed to the callback.
* Use publish_response_callback_state to keep track of any state information you
* MQTT_EVENT_PUBLISH is called when
* on QoS == 0: when the message is sent
* on QoS == 1: when the message is acknowledged by the broker
* on QoS == 2: when the message is acknowledged by the broker
*
* MQTT_EVENT_PUBLISH_TIMEOUT is called whenever a message that requires acknowledgement is not so
* for a response_timeout period, the message is requeued automatically
*
* - (un)subscribe is called when a (un)subscription is ackowledged
*
* MQTT_EVENT_SUBSCRIBE on sub
* MQTT_EVENT_UNSUBSCRIBE on unsub
*
* - ping is called when we get a ping response
*
* MQTT_EVENT_PING
*
* - error is called when an error state not handled by any of the other events happens
*
* MQTT_EVENT_ERROR
*
* @note A pointer to user_callback_state is always passed to the callback.
* Use user_callback_state to keep track of any state information you
* need.
*/
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish);
void (*user_callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state);

/**
* @brief A pointer to any publish_response_callback state information you need.
*
* @note A pointer to this pointer will always be publish_response_callback upon
* receiving a publish message from the broker.
*/
void* publish_response_callback_state;
void* user_callback_state;

/**
* @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing
Expand All @@ -1191,21 +1258,6 @@ struct mqtt_client {
*/
enum MQTTErrors (*inspector_callback)(struct mqtt_client*);

/**
* @brief A callback that is called whenever the client is in an error state.
*
* This callback is responsible for: application level error handling, closing
* previous sockets, and reestabilishing the connection to the broker and
* session configurations (i.e. subscriptions).
*/
void (*reconnect_callback)(struct mqtt_client*, void**);

/**
* @brief A pointer to some state. A pointer to this member is passed to
* \ref mqtt_client.reconnect_callback.
*/
void* reconnect_state;

/**
* @brief The buffer where ingress data is temporarily stored.
*/
Expand Down Expand Up @@ -1276,8 +1328,8 @@ ssize_t __mqtt_recv(struct mqtt_client *client);
* being sent to the broker. This function does the actual sending of
* those messages. Additionally this function receives traffic (responses and
* acknowledgements) from the broker and responds to that traffic accordingly.
* Lastly this function also calls the \c publish_response_callback when
* any \c MQTT_CONTROL_PUBLISH messages are received.
* Lastly this function also calls the \c user_callback when
* any \c MQTTCallbackEvent events happen.
*
* @pre mqtt_init must have been called.
*
Expand Down Expand Up @@ -1310,12 +1362,11 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client);
* @param[in] sendbufsz The size of \p sendbuf in bytes.
* @param[in] recvbuf A buffer that will be used for receiving messages from the broker.
* @param[in] recvbufsz The size of \p recvbuf in bytes.
* @param[in] publish_response_callback The callback to call whenever application messages
* are received from the broker.
* @param[in] callback The callback to call whenever events happen.
*
* @post mqtt_connect must be called.
*
* @note \p sockfd is a non-blocking TCP connection.
* @note \p sockfd is a non-blocking socket connection.
* @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL
* error will be set. Similarly if \p recvbuf is ever to small to receive a message from
* the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set.
Expand Down Expand Up @@ -1348,56 +1399,48 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client,
mqtt_pal_socket_handle sockfd,
uint8_t *sendbuf, size_t sendbufsz,
uint8_t *recvbuf, size_t recvbufsz,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state));

/**
* @brief Initializes an MQTT client and enables automatic reconnections.
* @brief Briefly initializes an MQTT client, expecting full init in a reconnect event.
* @ingroup api
*
* An alternative to \ref mqtt_init that allows the client to automatically reconnect to the
* broker after an error occurs (e.g. socket error or internal buffer overflows).
*
* This is accomplished by calling the \p reconnect_callback whenever the client enters an error
* state. The job of the \p reconnect_callback is to: (1) perform error handling/logging,
* (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the
* client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other
* API calls such as \ref mqtt_subscribe.
* An alternative to \ref mqtt_init that expects the client to automatically reconnect to the
* broker in the reconnect event after an error occurs (e.g. socket error or internal buffer overflows)..
*
* The first argument to the \p reconnect_callback is the client (which will be in an error
* state) and the second argument is a pointer to a void pointer where you can store some state
* The first argument to the \p user_callback is the client (which will be in an error
* state) and the second argument is an MQTTCallbackEvent which identifies the event type
* here we care about the MQTT_EVENT_RECONNECT event
* lastly a pointer to a void pointer where you can store some state
* information. Internally, MQTT-C calls the reconnect callback like so:
*
* \code
* client->reconnect_callback(client, &client->reconnect_state)
* client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_state)
* \endcode
*
* Note that the \p reconnect_callback is also called to setup the initial session. After
* Note that the \p user_callback is also called to setup the initial session. After
* calling \ref mqtt_init_reconnect the client will be in the error state
* \c MQTT_ERROR_INITIAL_RECONNECT.
*
* @pre None.
*
* @param[in,out] client The MQTT client that will be initialized.
* @param[in] reconnect_callback The callback that will be called to connect/reconnect the
* client to the broker and perform application level error handling.
* @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback.
* If your \p reconnect_callback does not require any state information set this
* @param[in] callback_state A pointer to some state data for your \p user_callback.
* If your \p user_callback does not require any state information set this
* to NULL. A pointer to the memory address where the client stores a copy of this
* pointer is passed as the second argumnet to \p reconnect_callback.
* @param[in] publish_response_callback The callback to call whenever application messages
* are received from the broker.
* pointer is passed as an argumnet to \p user_callback.
* @param[in] callback The callback that will be called to connect/reconnect the
* client and every other event.
*
* @post Call \p reconnect_callback yourself, or call \ref mqtt_sync
* (which will trigger the call to \p reconnect_callback).
* @post Call \ref mqtt_sync (which will trigger the call to \p user_callback).
*
* @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or
* \ref mqtt_init_reconnect more than once per client).
*
*/
void mqtt_init_reconnect(struct mqtt_client *client,
void (*reconnect_callback)(struct mqtt_client *client, void** state),
void *reconnect_state,
void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish));
void *callback_state,
void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state));

/**
* @brief Safely assign/reassign a socket and buffers to an new/existing client.
Expand Down
Loading

0 comments on commit c64c4c5

Please sign in to comment.