diff --git a/.gitignore b/.gitignore index 498a2697e..378fe7ec6 100644 --- a/.gitignore +++ b/.gitignore @@ -67,4 +67,5 @@ examples/azure/azureiothub examples/aws/awsiot examples/wiot/wiot wolfmqtt/options.h -/IDE/Microchip-Harmony/wolfmqtt_client/firmware/mqtt_client.X/dist/default/ \ No newline at end of file +/IDE/Microchip-Harmony/wolfmqtt_client/firmware/mqtt_client.X/dist/default/ +examples/sn-client/sn-client \ No newline at end of file diff --git a/README.md b/README.md index ce8e89c24..0f6c5e36d 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ This is an implementation of the MQTT Client written in C for embedded use, whic 2. `./configure` (to see a list of build options use `./configure --help`) 3. `make` +If `wolfssl` was recently installed run `sudo ldconfig` to update the linker cache. + ### Windows Visual Studio For building wolfMQTT with TLS support in Visual Studio: @@ -101,23 +103,25 @@ Here are the steps for creating your own implementation. ## Examples ### Client Example -The example MQTT client is located in /examples/mqttclient/. This example exercises many of the exposed API’s and prints any incoming publish messages for subscription topic “wolfMQTT/example/testTopic”. This client contains examples of many MQTTv5 features, including the property callback and server assignment of client ID. The mqqtclient example is a good starting template for your MQTT application. +The example MQTT client is located in `/examples/mqttclient/`. This example exercises many of the exposed API’s and prints any incoming publish messages for subscription topic “wolfMQTT/example/testTopic”. This client contains examples of many MQTTv5 features, including the property callback and server assignment of client ID. The mqqtclient example is a good starting template for your MQTT application. ### Non-Blocking Client Example -The example MQTT client is located in /examples/nbclient/. This example uses non-blocking I/O for message exchange. The wolfMQTT library must be configured with the `--enable-nonblock` option (or built with `WOLFMQTT_NONBLOCK`). +The example MQTT client is located in `/examples/nbclient/`. This example uses non-blocking I/O for message exchange. The wolfMQTT library must be configured with the `--enable-nonblock` option (or built with `WOLFMQTT_NONBLOCK`). ### Firmware Example -The MQTT firmware update is located in /examples/firmware/. This example has two parts. The first is called “fwpush”, which signs and publishes a firmware image. The second is called “fwclient”, which receives the firmware image and verifies the signature. This example publishes message on the topic “wolfMQTT/example/firmware”. The "fwpush" application is an example of using a publish callback to send the payload data. +The MQTT firmware update is located in `/examples/firmware/`. This example has two parts. The first is called “fwpush”, which signs and publishes a firmware image. The second is called “fwclient”, which receives the firmware image and verifies the signature. This example publishes message on the topic “wolfMQTT/example/firmware”. The "fwpush" application is an example of using a publish callback to send the payload data. ### Azure IoT Hub Example We setup a wolfMQTT IoT Hub on the Azure server for testing. We added a device called `demoDevice`, which you can connect and publish to. The example demonstrates creation of a SasToken, which is used as the password for the MQTT connect packet. It also shows the topic names for publishing events and listening to `devicebound` messages. This example only works with `ENABLE_MQTT_TLS` set and the wolfSSL library present because it requires Base64 Encode/Decode and HMAC-SHA256. Note: The wolfSSL library must be built with `./configure --enable-base64encode` or `#define WOLFSSL_BASE64_ENCODE`. The `wc_GetTime` API was added in 3.9.1 and if not present you'll need to implement your own version of this to get current UTC seconds or update your wolfSSL library. ### AWS IoT Example -We setup an AWS IoT endpoint and testing device certificate for testing. The AWS server uses TLS client certificate for authentication. The example is located in /examples/aws/. The example subscribes to `$aws/things/"AWSIOT_DEVICE_ID"/shadow/update/delta` and publishes to `$aws/things/"AWSIOT_DEVICE_ID"/shadow/update`. +We setup an AWS IoT endpoint and testing device certificate for testing. The AWS server uses TLS client certificate for authentication. The example is located in `/examples/aws/`. The example subscribes to `$aws/things/"AWSIOT_DEVICE_ID"/shadow/update/delta` and publishes to `$aws/things/"AWSIOT_DEVICE_ID"/shadow/update`. ### Watson IoT Example -This example enables the wolfMQTT client to connect to the IBM Watson Internet of Things (WIOT) Platform. The WIOT Platform has a limited test broker called "Quickstart" that allows non-secure connections to exercise the component. The example is located in /examples/wiot/. Works with MQTT v5 support enabled. +This example enables the wolfMQTT client to connect to the IBM Watson Internet of Things (WIOT) Platform. The WIOT Platform has a limited test broker called "Quickstart" that allows non-secure connections to exercise the component. The example is located in `/examples/wiot/`. Works with MQTT v5 support enabled. +### Mqtt-SN Example +The Sensor Network client implements the MQTT-SN protocol for low-bandwidth networks. There are several differences from MQTT, including the ability to use a two byte Topic ID instead the full topic during subscribe and publish. The SN client requires an MQTT-SN gateway. The gateway acts as an intermediary between the SN clients and the broker. This client was tested with the Eclipse Paho MQTT-SN Gateway, which connects by default to the public Eclipse broker, much like our wolfMQTT Client example. The address of the gateway must be configured as the host. The example is located in `/examples/sn-client/`. ## v5.0 Specification Support The wolfMQTT client supports connecting to v5 enabled brokers when configured with the `--enable-mqtt5` option. Handling properties received from the server is accomplished via a callback when the `--enable-propcb` option is set. The following v5.0 specification features are supported by the wolfMQTT client: @@ -134,7 +138,7 @@ The wolfMQTT client supports connecting to v5 enabled brokers when configured wi The v5 enabled wolfMQTT client was tested with the following MQTT v5 brokers: * Flespi -** Requires an account tied token that is regnerated hourly. +** Requires an account tied token that is regenerated hourly. ** `./examples/mqttclient/mqttclient -h "mqtt.flespi.io" -u ""` * VerneMQ MQTTv5 preview ** Runs locally. @@ -145,6 +149,19 @@ The v5 enabled wolfMQTT client was tested with the following MQTT v5 brokers: * Watson IoT Quickserver ** `./examples/wiot/wiot` +## Sensor Network Specification Support +The wolfMQTT SN Client implementation is based on the OASIS MQTT-SN v1.2 specification. The SN API is configured with the `--enable-sn` option. There is a separate API for the sensor network API, which all begin with the "SN_" prefix. The wolfMQTT SN Client operates over UDP, which is distinct from the wolfMQTT clients that use TCP. The following features are supported by the wolfMQTT SN Client: +* Register +* Will topic and message set up +* Will topic and message update +* All QoS levels +* Variable-sized packet length field + +Unsupported features: +* Automatic gateway discovery is not implemented +* Multiple gateway handling + +The SN client was tested using the Eclipse Paho MQTT-SN Gateway (https://github.com/eclipse/paho.mqtt-sn.embedded-c) running locally and on a separate network node. Instructions for building and running the gateway are in the project README. ## Release Notes diff --git a/configure.ac b/configure.ac index 7a6ea0cc4..f2519baf5 100644 --- a/configure.ac +++ b/configure.ac @@ -181,6 +181,20 @@ fi AM_CONDITIONAL([BUILD_STDINCAP], [test "x$ENABLED_STDINCAP" = "xyes"]) +# MQTT-SN Sensor Network +AC_ARG_ENABLE([sn], + [AS_HELP_STRING([--enable-sn],[Enable MQTT-SN support (default: disabled)])], + [ ENABLED_SN=$enableval ], + [ ENABLED_SN=no ] + ) + +if test "x$ENABLED_SN" = "xyes" +then + AM_CPPFLAGS="$AM_CPPFLAGS -DWOLFMQTT_SN" +fi + +AM_CONDITIONAL([BUILD_SN], [test "x$ENABLED_SN" = "xyes"]) + # MQTT v5.0 AC_ARG_ENABLE([mqtt5], [AS_HELP_STRING([--enable-mqtt5],[Enable MQTT v5.0 support (default: disabled)])], @@ -337,6 +351,7 @@ echo " * LIB Flags: $LIB" echo " * Disconnect Callback: $ENABLED_DISCB" echo " * Error Strings: $ENABLED_ERROR_STRINGS" +echo " * Enable MQTT-SN: $ENABLED_SN" echo " * Enable MQTT v5.0: $ENABLED_MQTTV50" echo " * Property Callback: $ENABLED_PROPCB" echo " * Examples: $ENABLED_EXAMPLES" diff --git a/examples/include.am b/examples/include.am index a332a0311..bac18d142 100644 --- a/examples/include.am +++ b/examples/include.am @@ -9,6 +9,9 @@ noinst_PROGRAMS += examples/mqttclient/mqttclient \ examples/azure/azureiothub \ examples/aws/awsiot \ examples/wiot/wiot +if BUILD_SN +noinst_PROGRAMS += examples/sn-client/sn-client +endif noinst_HEADERS += examples/mqttclient/mqttclient.h \ examples/nbclient/nbclient.h \ @@ -20,7 +23,9 @@ noinst_HEADERS += examples/mqttclient/mqttclient.h \ examples/wiot/wiot.h \ examples/mqttnet.h \ examples/mqttexample.h - +if BUILD_SN +noinst_HEADERS += examples/sn-client/sn-client.h +endif examples_mqttclient_mqttclient_SOURCES = examples/mqttclient/mqttclient.c \ examples/mqttnet.c \ @@ -77,6 +82,14 @@ examples_wiot_wiot_LDADD = src/libwolfmqtt.la examples_wiot_wiot_DEPENDENCIES = src/libwolfmqtt.la examples_wiot_wiot_CPPFLAGS = -I$(top_srcdir)/examples $(AM_CPPFLAGS) +if BUILD_SN +examples_sn_client_sn_client_SOURCES = examples/sn-client/sn-client.c \ + examples/mqttnet.c \ + examples/mqttexample.c +examples_sn_client_sn_client_LDADD = src/libwolfmqtt.la +examples_sn_client_sn_client_DEPENDENCIES = src/libwolfmqtt.la +examples_sn_client_sn_client_CPPFLAGS = -I$(top_srcdir)/examples $(AM_CPPFLAGS) +endif endif dist_example_DATA+= examples/mqttnet.c \ @@ -88,6 +101,9 @@ dist_example_DATA+= examples/mqttnet.c \ examples/azure/azureiothub.c \ examples/aws/awsiot.c \ examples/wiot/wiot.c +if BUILD_SN +dist_example_DATA+= examples/sn-client/sn-client.c +endif DISTCLEANFILES+= examples/mqttclient/.libs/mqttclient \ examples/nbclient/.libs/nbclient \ @@ -96,6 +112,9 @@ DISTCLEANFILES+= examples/mqttclient/.libs/mqttclient \ examples/azure/.libs/azureiothub \ examples/aws/.libs/awsiot \ examples/wiot/.libs/wiot +if BUILD_SN +DISTCLEANFILES+= examples/sn-client/.libs/sn-client +endif EXTRA_DIST+= examples/mqttuart.c \ examples/mqttclient/mqttclient.vcxproj \ diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 4a1b8ae03..d0a68a6f4 100755 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -213,7 +213,8 @@ static int NetConnect(void *context, const char* host, word16 port, return rc; } -static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) +static int NetRead(void *context, byte* buf, int buf_len, + int timeout_ms) { SocketContext *sock = (SocketContext*)context; int rc = -1, timeout = 0; @@ -523,7 +524,6 @@ static void tcp_set_nonblocking(SOCKET_T* sockfd) #endif /* WOLFMQTT_NONBLOCK */ #endif /* !WOLFMQTT_NO_TIMEOUT */ - static int NetConnect(void *context, const char* host, word16 port, int timeout_ms) { @@ -643,6 +643,88 @@ static int NetConnect(void *context, const char* host, word16 port, return rc; } +#ifdef WOLFMQTT_SN +static int SN_NetConnect(void *context, const char* host, word16 port, + int timeout_ms) +{ + SocketContext *sock = (SocketContext*)context; + int type = SOCK_DGRAM; + int rc; + SOERROR_T so_error = 0; + struct addrinfo *result = NULL; + struct addrinfo hints; + + /* Get address information for host and locate IPv4 */ + XMEMSET(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ + + XMEMSET(&sock->addr, 0, sizeof(sock->addr)); + sock->addr.sin_family = AF_INET; + + rc = getaddrinfo(host, NULL, &hints, &result); + if (rc >= 0 && result != NULL) { + struct addrinfo* res = result; + + /* prefer ip4 addresses */ + while (res) { + if (res->ai_family == AF_INET) { + result = res; + break; + } + res = res->ai_next; + } + + if (result->ai_family == AF_INET) { + sock->addr.sin_port = htons(port); + sock->addr.sin_family = AF_INET; + sock->addr.sin_addr = + ((SOCK_ADDR_IN*)(result->ai_addr))->sin_addr; + } + else { + rc = -1; + } + + freeaddrinfo(result); + } + + if (rc == 0) { + + /* Create the socket */ + sock->fd = socket(sock->addr.sin_family, type, 0); + if (sock->fd == SOCKET_INVALID) { + rc = -1; + } + } + + if (rc == 0) + { + #ifndef WOLFMQTT_NO_TIMEOUT + fd_set fdset; + struct timeval tv; + + /* Setup timeout and FD's */ + setup_timeout(&tv, timeout_ms); + FD_ZERO(&fdset); + FD_SET(sock->fd, &fdset); + #else + (void)timeout_ms; + #endif /* !WOLFMQTT_NO_TIMEOUT */ + + /* Start connect */ + rc = SOCK_CONNECT(sock->fd, (struct sockaddr*)&sock->addr, sizeof(sock->addr)); + } + + /* Show error */ + if (rc != 0) { + close(sock->fd); + PRINTF("NetConnect: Rc=%d, SoErr=%d", rc, so_error); + } + + return rc; +} +#endif + static int NetWrite(void *context, const byte* buf, int buf_len, int timeout_ms) { @@ -687,13 +769,14 @@ static int NetWrite(void *context, const byte* buf, int buf_len, return rc; } -static int NetRead(void *context, byte* buf, int buf_len, - int timeout_ms) +static int NetRead_ex(void *context, byte* buf, int buf_len, + int timeout_ms, byte peek) { SocketContext *sock = (SocketContext*)context; int rc = -1, timeout = 0; SOERROR_T so_error = 0; int bytes = 0; + int flags = 0; #if !defined(WOLFMQTT_NO_TIMEOUT) && !defined(WOLFMQTT_NONBLOCK) fd_set recvfds; fd_set errfds; @@ -704,6 +787,10 @@ static int NetRead(void *context, byte* buf, int buf_len, return MQTT_CODE_ERROR_BAD_ARG; } + if (peek == 1) { + flags |= MSG_PEEK; + } + #if !defined(WOLFMQTT_NO_TIMEOUT) && !defined(WOLFMQTT_NONBLOCK) /* Setup timeout and FD's */ setup_timeout(&tv, timeout_ms); @@ -737,7 +824,7 @@ static int NetRead(void *context, byte* buf, int buf_len, rc = (int)SOCK_RECV(sock->fd, &buf[bytes], buf_len - bytes, - 0); + flags); if (rc <= 0) { rc = -1; goto exit; /* Error */ @@ -798,6 +885,18 @@ static int NetRead(void *context, byte* buf, int buf_len, return rc; } +static int NetRead(void *context, byte* buf, int buf_len, int timeout_ms) +{ + return NetRead_ex(context, buf, buf_len, timeout_ms, 0); +} + +#ifdef WOLFMQTT_SN +static int NetPeek(void *context, byte* buf, int buf_len, int timeout_ms) +{ + return NetRead_ex(context, buf, buf_len, timeout_ms, 1); +} +#endif + static int NetDisconnect(void *context) { SocketContext *sock = (SocketContext*)context; @@ -815,7 +914,6 @@ static int NetDisconnect(void *context) #endif - /* Public Functions */ int MqttClientNet_Init(MqttNet* net) { @@ -872,6 +970,37 @@ int MqttClientNet_Init(MqttNet* net) return MQTT_CODE_SUCCESS; } +#ifdef WOLFMQTT_SN +int SN_ClientNet_Init(MqttNet* net) +{ + if (net) { + XMEMSET(net, 0, sizeof(MqttNet)); + net->connect = SN_NetConnect; + net->read = NetRead; + net->write = NetWrite; + net->peek = NetPeek; + net->disconnect = NetDisconnect; + net->context = (SocketContext *)WOLFMQTT_MALLOC(sizeof(SocketContext)); + if (net->context == NULL) { + return MQTT_CODE_ERROR_MEMORY; + } + XMEMSET(net->context, 0, sizeof(SocketContext)); + ((SocketContext*)(net->context))->stat = SOCK_BEGIN; + + #if 0 //TODO: multicast support + net->multi_ctx = (SocketContext *)WOLFMQTT_MALLOC(sizeof(SocketContext)); + if (net->multi_ctx == NULL) { + return MQTT_CODE_ERROR_MEMORY; + } + XMEMSET(net->multi_ctx, 0, sizeof(SocketContext)); + ((SocketContext*)(net->multi_ctx))->stat = SOCK_BEGIN; + #endif + } + + return MQTT_CODE_SUCCESS; +} +#endif + int MqttClientNet_DeInit(MqttNet* net) { if (net) { diff --git a/examples/mqttnet.h b/examples/mqttnet.h index 4aa7237ff..03a4e2519 100644 --- a/examples/mqttnet.h +++ b/examples/mqttnet.h @@ -33,7 +33,9 @@ /* Functions used to handle the MqttNet structure creation / destruction */ int MqttClientNet_Init(MqttNet* net); int MqttClientNet_DeInit(MqttNet* net); - +#ifdef WOLFMQTT_SN +int SN_ClientNet_Init(MqttNet* net); +#endif #ifdef __cplusplus } /* extern "C" */ diff --git a/examples/sn-client/sn-client.c b/examples/sn-client/sn-client.c new file mode 100644 index 000000000..0ea36fb97 --- /dev/null +++ b/examples/sn-client/sn-client.c @@ -0,0 +1,438 @@ +/* sn-client.c + * + * Copyright (C) 2006-2018 wolfSSL Inc. + * + * This file is part of wolfMQTT. + * + * wolfMQTT is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * wolfMQTT is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA + */ + +/* Include the autoconf generated config.h */ +#ifdef HAVE_CONFIG_H + #include +#endif + +#include "wolfmqtt/mqtt_client.h" + +#include "sn-client.h" +#include "examples/mqttnet.h" + +/* Locals */ +static int mStopRead = 0; + +/* Configuration */ + +/* Maximum size for network read/write callbacks. */ +#define MAX_BUFFER_SIZE 1024 +#define TEST_MESSAGE "test" + +static int sn_message_cb(MqttClient *client, MqttMessage *msg, + byte msg_new, byte msg_done) +{ + byte buf[PRINT_BUFFER_SIZE+1]; + word32 len; + MQTTCtx* mqttCtx = (MQTTCtx*)client->ctx; + + (void)mqttCtx; + + if (msg_new) { + /* Determine min size to dump */ + len = msg->topic_name_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->topic_name, len); + buf[len] = '\0'; /* Make sure its null terminated */ + + /* Print incoming message */ + PRINTF("MQTT-SN Message: Topic %s, Qos %d, Len %u", + buf, msg->qos, msg->total_len); + + /* for test mode: check if TEST_MESSAGE was received */ + if (mqttCtx->test_mode) { + if (XSTRLEN(TEST_MESSAGE) == msg->buffer_len && + XSTRNCMP(TEST_MESSAGE, (char*)msg->buffer, + msg->buffer_len) == 0) + { + mStopRead = 1; + } + } + } + + /* Print message payload */ + len = msg->buffer_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->buffer, len); + buf[len] = '\0'; /* Make sure its null terminated */ + PRINTF("........Payload (%d - %d): %s", + msg->buffer_pos, msg->buffer_pos + len, buf); + + if (msg_done) { + PRINTF("....MQTT-SN Message: Done"); + } + + /* Return negative to terminate publish processing */ + return MQTT_CODE_SUCCESS; +} + +int sn_test(MQTTCtx *mqttCtx) +{ + int rc = MQTT_CODE_SUCCESS; + word16 topicID; + + PRINTF("MQTT-SN Client: QoS %d", mqttCtx->qos); + + /* Initialize Network */ + rc = SN_ClientNet_Init(&mqttCtx->net); + PRINTF("MQTT-SN Net Init: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + goto exit; + } + + /* setup tx/rx buffers */ + mqttCtx->tx_buf = (byte*)WOLFMQTT_MALLOC(MAX_BUFFER_SIZE); + mqttCtx->rx_buf = (byte*)WOLFMQTT_MALLOC(MAX_BUFFER_SIZE); + + /* Initialize MqttClient structure */ + rc = MqttClient_Init(&mqttCtx->client, &mqttCtx->net, + sn_message_cb, + mqttCtx->tx_buf, MAX_BUFFER_SIZE, + mqttCtx->rx_buf, MAX_BUFFER_SIZE, + mqttCtx->cmd_timeout_ms); + + PRINTF("MQTT-SN Init: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + goto exit; + } + + /* The client.ctx will be stored in the cert callback ctx during + MqttSocket_Connect for use by mqtt_tls_verify_cb */ + mqttCtx->client.ctx = mqttCtx; + + /* Setup socket direct to gateway */ + rc = MqttClient_NetConnect(&mqttCtx->client, mqttCtx->host, + mqttCtx->port,DEFAULT_CON_TIMEOUT_MS, + mqttCtx->use_tls, mqtt_tls_cb); + + PRINTF("MQTT-SN Socket Connect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + goto exit; + } + + { + SN_Connect connect_s, *connect = &connect_s; + /* Build connect packet */ + XMEMSET(connect, 0, sizeof(SN_Connect)); + connect_s.keep_alive_sec = mqttCtx->keep_alive_sec; + connect_s.clean_session = mqttCtx->clean_session; + connect_s.client_id = mqttCtx->client_id; + connect->protocol_level = SN_PROTOCOL_ID; + + /* Last will and testament sent by broker to subscribers + of topic when broker connection is lost */ + connect_s.enable_lwt = mqttCtx->enable_lwt; + if (connect_s.enable_lwt) { + /* Send client id in LWT payload */ + connect_s.will.qos = mqttCtx->qos; + connect_s.will.retain = 0; + connect_s.will.willTopic = WOLFMQTT_TOPIC_NAME"lwttopic"; + connect_s.will.willMsg = (byte*)mqttCtx->client_id; + connect_s.will.willMsgLen = + (word16)XSTRLEN(mqttCtx->client_id); + } + + PRINTF("MQTT-SN Broker Connect: broker = %s : %d", + mqttCtx->host, mqttCtx->port); + /* Send Connect and wait for Connect Ack */ + rc = SN_Client_Connect(&mqttCtx->client, connect); + + /* Validate Connect Ack info */ + PRINTF("....MQTT-SN Connect Ack: Return Code %u", + mqttCtx->connect.ack.return_code); + if (rc != MQTT_CODE_SUCCESS) { + goto disconn; + } + } + + /* Either the register or the subscribe block could be used to get the + topic ID. Both are done here as an example of using the API. */ + { + /* Register topic name to get the assigned topic ID */ + SN_Register regist_s, *regist = ®ist_s; + + XMEMSET(regist, 0, sizeof(SN_Register)); + regist->packet_id = mqtt_get_packetid(); + regist->topicName = DEFAULT_TOPIC_NAME; + + PRINTF("MQTT-SN Register: topic = %s", regist->topicName); + rc = SN_Client_Register(&mqttCtx->client, regist); + + if ((rc == 0) && (regist->regack.return_code == SN_RC_ACCEPTED)) { + /* Topic ID is returned in RegAck */ + topicID = regist->regack.topicId; + } + PRINTF("....MQTT-SN Register Ack: rc = %d, topic id = %d", + regist->regack.return_code, regist->regack.topicId); + } + + { + /* Subscribe Topic */ + SN_Subscribe subscribe; + + XMEMSET(&subscribe, 0, sizeof(SN_Subscribe)); + + subscribe.duplicate = 0; + subscribe.qos = MQTT_QOS_0; + subscribe.topic_type = SN_TOPIC_ID_TYPE_NORMAL; + subscribe.topicNameId = DEFAULT_TOPIC_NAME; + subscribe.packet_id = mqtt_get_packetid(); + + PRINTF("MQTT-SN Subscribe: topic name = %s", subscribe.topicNameId); + rc = SN_Client_Subscribe(&mqttCtx->client, &subscribe); + + PRINTF("....MQTT-SN Subscribe Ack: topic id = %d, rc = %d", + subscribe.subAck.topicId, subscribe.subAck.return_code); + + if ((rc == 0) && (subscribe.subAck.return_code == SN_RC_ACCEPTED)) { + /* Topic ID is returned in SubAck */ + topicID = subscribe.subAck.topicId; + } + } + + { + /* Publish Topic */ + XMEMSET(&mqttCtx->publish, 0, sizeof(SN_Publish)); + mqttCtx->publish.retain = 0; + mqttCtx->publish.qos = MQTT_QOS_1;//mqttCtx->qos; + mqttCtx->publish.duplicate = 0; + mqttCtx->publish.topic_type = SN_TOPIC_ID_TYPE_NORMAL; + mqttCtx->publish.topic_name = (char*)&topicID; + if (mqttCtx->publish.qos > MQTT_QOS_0) { + mqttCtx->publish.packet_id = mqtt_get_packetid(); + } + else { + mqttCtx->publish.packet_id = 0x00; + } + + mqttCtx->publish.buffer = (byte*)TEST_MESSAGE; + mqttCtx->publish.total_len = (word16)XSTRLEN(TEST_MESSAGE); + + rc = SN_Client_Publish(&mqttCtx->client, &mqttCtx->publish); + + PRINTF("MQTT-SN Publish: topic id = %d, msg = \"%s\", rc = %d", + (word16)*mqttCtx->publish.topic_name, mqttCtx->publish.buffer, + mqttCtx->publish.return_code); + if (rc != MQTT_CODE_SUCCESS) { + goto disconn; + } + } + + /* Read Loop */ + PRINTF("MQTT Waiting for message..."); + + do { + /* Try and read packet */ + rc = SN_Client_WaitMessage(&mqttCtx->client, + mqttCtx->cmd_timeout_ms); + + /* check for test mode */ + if (mStopRead) { + rc = MQTT_CODE_SUCCESS; + PRINTF("MQTT Exiting..."); + break; + } + + /* check return code */ + #ifdef WOLFMQTT_ENABLE_STDIN_CAP + else if (rc == MQTT_CODE_STDIN_WAKE) { + XMEMSET(mqttCtx->rx_buf, 0, MAX_BUFFER_SIZE); + if (XFGETS((char*)mqttCtx->rx_buf, MAX_BUFFER_SIZE - 1, + stdin) != NULL) + { + rc = (int)XSTRLEN((char*)mqttCtx->rx_buf); + + /* Publish Topic */ + mqttCtx->stat = WMQ_PUB; + XMEMSET(&mqttCtx->publish, 0, sizeof(MqttPublish)); + mqttCtx->publish.retain = 0; + mqttCtx->publish.qos = mqttCtx->qos; + mqttCtx->publish.duplicate = 0; + mqttCtx->publish.topic_type = SN_TOPIC_ID_TYPE_NORMAL; + mqttCtx->publish.topic_name = (char*)&topicID; + mqttCtx->publish.packet_id = mqtt_get_packetid(); + mqttCtx->publish.buffer = mqttCtx->rx_buf; + mqttCtx->publish.total_len = (word16)rc; + rc = SN_Client_Publish(&mqttCtx->client, + &mqttCtx->publish); + PRINTF("MQTT-SN Publish: topic id = %d, msg = \"%s\", rc = %d", + (word16)*mqttCtx->publish.topic_name, mqttCtx->publish.buffer, + mqttCtx->publish.return_code); + } + } + #endif + else if (rc == MQTT_CODE_ERROR_TIMEOUT) { + /* Keep Alive */ + PRINTF("Keep-alive timeout, sending ping"); + + rc = SN_Client_Ping(&mqttCtx->client, NULL); + if (rc != MQTT_CODE_SUCCESS) { + PRINTF("MQTT Ping Keep Alive Error: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + break; + } + } + else if (rc != MQTT_CODE_SUCCESS) { + /* There was an error */ + PRINTF("MQTT-SN Message Wait Error: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + break; + } + } while (1); + + /* Check for error */ + if (rc != MQTT_CODE_SUCCESS) { + goto disconn; + } + + { + /* Unsubscribe Topics */ + SN_Unsubscribe unsubscribe; + + /* Build list of topics */ + XMEMSET(&unsubscribe, 0, sizeof(SN_Subscribe)); + + unsubscribe.topicNameId = mqttCtx->topic_name; + + /* Subscribe Topic */ + unsubscribe.packet_id = mqtt_get_packetid(); + + /* Unsubscribe Topics */ + rc = SN_Client_Unsubscribe(&mqttCtx->client, &unsubscribe); + + PRINTF("MQTT Unsubscribe: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + goto disconn; + } + mqttCtx->return_code = rc; + } + +disconn: + /* Disconnect */ + rc = SN_Client_Disconnect(&mqttCtx->client); + + PRINTF("MQTT Disconnect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + if (rc != MQTT_CODE_SUCCESS) { + goto disconn; + } + + rc = MqttClient_NetDisconnect(&mqttCtx->client); + + PRINTF("MQTT Socket Disconnect: %s (%d)", + MqttClient_ReturnCodeToString(rc), rc); + +exit: + + /* Free resources */ + if (mqttCtx->tx_buf) WOLFMQTT_FREE(mqttCtx->tx_buf); + if (mqttCtx->rx_buf) WOLFMQTT_FREE(mqttCtx->rx_buf); + + /* Cleanup network */ + MqttClientNet_DeInit(&mqttCtx->net); + + return rc; +} + + +/* so overall tests can pull in test function */ +#if !defined(NO_MAIN_DRIVER) && !defined(MICROCHIP_MPLAB_HARMONY) + #ifdef USE_WINDOWS_API + #include /* for ctrl handler */ + + static BOOL CtrlHandler(DWORD fdwCtrlType) + { + if (fdwCtrlType == CTRL_C_EVENT) { + mStopRead = 1; + PRINTF("Received Ctrl+c"); + return TRUE; + } + return FALSE; + } + #elif HAVE_SIGNAL + #include + static void sig_handler(int signo) + { + if (signo == SIGINT) { + mStopRead = 1; + PRINTF("Received SIGINT"); + } + } + #endif + +int main(int argc, char** argv) +{ + int rc; +#ifndef WOLFMQTT_NONBLOCK + MQTTCtx mqttCtx; + + /* init defaults */ + mqtt_init_ctx(&mqttCtx); + mqttCtx.app_name = "sn-client"; + + /* Settings for MQTT-SN gateway */ + mqttCtx.host = "localhost"; + mqttCtx.port = 10000; + + /* parse arguments */ + rc = mqtt_parse_args(&mqttCtx, argc, argv); + if (rc != 0) { + return rc; + } +#endif +#ifdef USE_WINDOWS_API + if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, + TRUE) == FALSE) + { + PRINTF("Error setting Ctrl Handler! Error %d", (int)GetLastError()); + } +#elif HAVE_SIGNAL + if (signal(SIGINT, sig_handler) == SIG_ERR) { + PRINTF("Can't catch SIGINT"); + } +#endif + +#ifndef WOLFMQTT_NONBLOCK + rc = sn_test(&mqttCtx); +#else + (void)argc; + (void)argv; + + /* This example requires non-blocking mode to be disabled + ./configure --disable-nonblock */ + PRINTF("Example not compiled in!"); + rc = EXIT_FAILURE; +#endif + + + return (rc == 0) ? 0 : EXIT_FAILURE; +} + +#endif /* NO_MAIN_DRIVER */ diff --git a/examples/sn-client/sn-client.h b/examples/sn-client/sn-client.h new file mode 100755 index 000000000..f8812fd78 --- /dev/null +++ b/examples/sn-client/sn-client.h @@ -0,0 +1,32 @@ +/* sn-client.h + * + * Copyright (C) 2006-2018 wolfSSL Inc. + * + * This file is part of wolfMQTT. + * + * wolfMQTT is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * wolfMQTT is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA + */ + +#ifndef WOLFMQTT_SNCLIENT_H +#define WOLFMQTT_SNCLIENT_H + +#include "examples/mqttexample.h" + + +/* Exposed functions */ +int sn_test(MQTTCtx *mqttCtx); + + +#endif /* WOLFMQTT_SNCLIENT_H */ diff --git a/src/include.am b/src/include.am index 385b644e7..c3b7bab37 100644 --- a/src/include.am +++ b/src/include.am @@ -7,8 +7,10 @@ lib_LTLIBRARIES+= src/libwolfmqtt.la src_libwolfmqtt_la_SOURCES = src/mqtt_client.c \ src/mqtt_packet.c \ src/mqtt_socket.c + src_libwolfmqtt_la_CFLAGS = -DBUILDING_WOLFMQTT $(AM_CFLAGS) src_libwolfmqtt_la_CPPFLAGS = -DBUILDING_WOLFMQTT $(AM_CPPFLAGS) src_libwolfmqtt_la_LDFLAGS = ${AM_LDFLAGS} -no-undefined -version-info ${WOLFMQTT_LIBRARY_VERSION} src_libwolfmqtt_la_DEPENDENCIES = + EXTRA_DIST += diff --git a/src/mqtt_client.c b/src/mqtt_client.c index a0801c4c6..936176fa2 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1115,3 +1115,725 @@ const char* MqttClient_ReturnCodeToString(int return_code) } #endif /* !WOLFMQTT_NO_ERROR_STRINGS */ +#ifdef WOLFMQTT_SN + +/* Private functions */ +static int SN_Client_HandlePayload(MqttClient* client, MqttMessage* msg, + int timeout, void* p_decode, word16* packet_id) +{ + int rc = MQTT_CODE_SUCCESS; + (void)timeout; + + switch (msg->type) + { + case SN_MSG_TYPE_GWINFO: + { + SN_GwInfo info, *p_info = &info; + rc = SN_Decode_GWInfo(client->rx_buf, client->packet.buf_len, p_info); + if (rc <= 0) { + return rc; + } + break; + } + case SN_MSG_TYPE_CONNACK: + { + /* Decode connect ack */ + SN_ConnectAck connect_ack, *p_connect_ack = &connect_ack; + if (p_decode) { + p_connect_ack = (SN_ConnectAck*)p_decode; + } + p_connect_ack->return_code = client->rx_buf[client->packet.buf_len-1]; + + break; + } + case SN_MSG_TYPE_REGACK: + { + /* Decode register ack */ + SN_RegAck regack_s, *regack = ®ack_s; + if (p_decode) { + regack = (SN_RegAck*)p_decode; + } + + rc = SN_Decode_RegAck(client->rx_buf, client->packet.buf_len, regack); + + if (rc > 0) { + *packet_id = regack->packet_id; + } + + break; + } + case SN_MSG_TYPE_PUBLISH: + { + /* Decode publish message */ + rc = SN_Decode_Publish(client->rx_buf, client->packet.buf_len, + msg); + if (rc <= 0) { + return rc; + } + + /* Issue callback for new message */ + if (client->msg_cb) { + /* if using the temp publish message buffer, + then populate message context with client context */ + if (&client->msg == msg) + msg->ctx = client->ctx; + rc = client->msg_cb(client, msg, 1, 1); + if (rc != MQTT_CODE_SUCCESS) { + return rc; + }; + } + + /* Handle Qos */ + if (msg->qos > MQTT_QOS_0) { + SN_PublishResp publish_resp; + SN_MsgType type; + + *packet_id = msg->packet_id; + + /* Determine packet type to write */ + type = (msg->qos == MQTT_QOS_1) ? + SN_MSG_TYPE_PUBACK : + SN_MSG_TYPE_PUBREC; + publish_resp.packet_id = msg->packet_id; + + /* Encode publish response */ + rc = SN_Encode_PublishResp(client->tx_buf, + client->tx_buf_len, type, &publish_resp); + if (rc <= 0) { + return rc; + } + client->packet.buf_len = rc; + + /* Send packet */ + msg->stat = MQTT_MSG_BEGIN; + rc = MqttPacket_Write(client, client->tx_buf, + client->packet.buf_len); + } + break; + } + case SN_MSG_TYPE_PUBACK: + case SN_MSG_TYPE_PUBCOMP: + case SN_MSG_TYPE_PUBREC: + case SN_MSG_TYPE_PUBREL: + { + SN_PublishResp publish_resp, *p_publish_resp = &publish_resp; + if (p_decode) { + p_publish_resp = (SN_PublishResp*)p_decode; + } + else + { + XMEMSET(p_publish_resp, 0, sizeof(SN_PublishResp)); + } + /* Decode publish response message */ + rc = SN_Decode_PublishResp(client->rx_buf, client->packet.buf_len, + msg->type, p_publish_resp); + if (rc <= 0) { + return rc; + } + *packet_id = p_publish_resp->packet_id; + + /* If Qos then send response */ + if (msg->type == SN_MSG_TYPE_PUBREC || + msg->type == SN_MSG_TYPE_PUBREL) { + + /* Encode publish response */ + publish_resp.packet_id = p_publish_resp->packet_id; + rc = SN_Encode_PublishResp(client->tx_buf, + client->tx_buf_len, msg->type+1, &publish_resp); + if (rc <= 0) { + return rc; + } + client->packet.buf_len = rc; + + /* Send packet */ + msg->stat = MQTT_MSG_BEGIN; + rc = MqttPacket_Write(client, client->tx_buf, + client->packet.buf_len); + } + break; + } + case SN_MSG_TYPE_SUBACK: + { + /* Decode subscribe ack */ + SN_SubAck subscribe_ack, *p_subscribe_ack = &subscribe_ack; + if (p_decode) { + p_subscribe_ack = (SN_SubAck*)p_decode; + } + else { + XMEMSET(p_subscribe_ack, 0, sizeof(SN_SubAck)); + } + + rc = SN_Decode_SubscribeAck(client->rx_buf, client->packet.buf_len, + p_subscribe_ack); + if (rc <= 0) { + return rc; + } + *packet_id = p_subscribe_ack->packet_id; + + break; + } + case SN_MSG_TYPE_UNSUBACK: + { + /* Decode unsubscribe ack */ + SN_UnsubscribeAck unsubscribe_ack; + SN_UnsubscribeAck *p_unsubscribe_ack = &unsubscribe_ack; + + if (p_decode) { + p_unsubscribe_ack = (SN_UnsubscribeAck*)p_decode; + } + rc = SN_Decode_UnsubscribeAck(client->rx_buf, + client->packet.buf_len, p_unsubscribe_ack); + if (rc <= 0) { + return rc; + } + *packet_id = p_unsubscribe_ack->packet_id; + + break; + } + case SN_MSG_TYPE_PING_RESP: + { + /* Decode ping */ + rc = SN_Decode_Ping(client->rx_buf, client->packet.buf_len); + break; + } + default: + { + /* Other types are server side only, ignore */ + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("SN_Client_WaitMessage: Invalid client packet type %u!", + msg->type); + #endif + break; + } + } /* switch (msg->type) */ + + return rc; +} +static int SN_Client_WaitType(MqttClient *client, MqttMessage* msg, + int timeout_ms, byte wait_type, word16 wait_packet_id, void* p_decode) +{ + int rc; + word16 packet_id = 0; + +wait_again: + + switch (msg->stat) + { + case MQTT_MSG_BEGIN: + { + /* reset the packet state */ + client->packet.stat = MQTT_PK_BEGIN; + + FALL_THROUGH; + } + case MQTT_MSG_WAIT: + { + /* Wait for packet */ + rc = SN_Packet_Read(client, client->rx_buf, client->rx_buf_len, + timeout_ms); + if (rc <= 0) { + return rc; + } + + msg->stat = MQTT_MSG_WAIT; + client->packet.buf_len = rc; + + /* Determine packet type */ + if (client->rx_buf[0] == 0x01) { + /* Type is in fourth byte */ + msg->type = client->rx_buf[3]; + } + else { + /* Type is in second byte */ + msg->type = client->rx_buf[1]; + } + + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Read Packet: Len %d, Type %d", + client->packet.buf_len, msg->type); + #endif + + msg->stat = MQTT_MSG_READ; + + FALL_THROUGH; + } + + case MQTT_MSG_READ: + case MQTT_MSG_READ_PAYLOAD: + { + rc = SN_Client_HandlePayload(client, msg, timeout_ms, p_decode, + &packet_id); + if (rc < 0) { + return rc; + } + rc = MQTT_CODE_SUCCESS; + + /* Check for type and packet id */ + if (wait_type == msg->type) { + if (wait_packet_id == 0 || wait_packet_id == packet_id) { + /* We found the packet type and id */ + break; + } + } + + msg->stat = MQTT_MSG_BEGIN; + goto wait_again; + } + + case MQTT_MSG_WRITE: + default: + { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("SN_Client_WaitType: Invalid state %d!", + msg->stat); + #endif + rc = MQTT_CODE_ERROR_STAT; + break; + } + } /* switch (msg->stat) */ + + /* reset state */ + msg->stat = MQTT_MSG_BEGIN; + + return rc; +} + +/* Public Functions */ +int SN_Client_SearchGW(MqttClient *client, SN_SearchGw *search) +{ + int rc, len = 0; + + /* Validate required arguments */ + if (client == NULL || search == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (search->stat == MQTT_MSG_BEGIN) { + + /* Encode the search packet */ + rc = SN_Encode_SearchGW(client->tx_buf, client->tx_buf_len, + search->radius); + if (rc <= 0) { + return rc; + } + len = rc; + + /* Send search for gateway packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { + return rc; + } + search->stat = MQTT_MSG_WAIT; + } + + /* Wait for gateway info packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_GWINFO, 0, &search->gwInfo); + + return rc; +} + +int SN_Client_Connect(MqttClient *client, SN_Connect *connect) +{ + int rc = 0, len = 0; + + /* Validate required arguments */ + if ((client == NULL) || (connect == NULL)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (connect->stat == MQTT_MSG_BEGIN) { + + /* Encode the connect packet */ + rc = SN_Encode_Connect(client->tx_buf, client->tx_buf_len, connect); + if (rc <= 0) { + return rc; + } + len = rc; + + /* Send connect packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc == len) { + rc = 0; + connect->stat = MQTT_MSG_WAIT; + } + else + { + if (rc == 0) { + /* Some other error */ + rc = -1; + } + } + } + + if ((rc == 0) && (connect->enable_lwt != 0)) { + /* If the will is enabled, then the gateway requests the topic and + message in separate packets. */ + rc = SN_Client_Will(client, &connect->will); + } + + if (rc == 0) { + connect->enable_lwt = 0; + + /* Wait for connect ack packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_CONNACK, 0, &connect->ack); + } + + return rc; +} + +int SN_Client_Will(MqttClient *client, SN_Will *will) +{ + int rc, len; + + /* Validate required arguments */ + if (client == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Wait for Will Topic Request packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_WILLTOPICREQ, 0, NULL); + if (rc == 0) { + + /* Encode Will Topic */ + len = rc = SN_Encode_WillTopic(client->tx_buf, client->tx_buf_len, will); + if (rc > 0) { + + /* Send Will Topic packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if ((will != NULL) && (rc == len)) { + + /* Wait for Will Message Request */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_WILLMSGREQ, 0, NULL); + + if (rc == 0) { + + /* Encode Will Message */ + len = rc = SN_Encode_WillMsg(client->tx_buf, client->tx_buf_len, will); + if (rc > 0) { + + /* Send Will Topic packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc == len) + rc = 0; + } + } + } + } + } + + return rc; + +} + +int SN_Client_WillTopicUpdate(MqttClient *client, SN_Will *will) +{ + int rc = 0, len = 0; + + /* Validate required arguments */ + if (client == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Encode Will Topic Update */ + len = rc = SN_Encode_WillTopicUpdate(client->tx_buf, client->tx_buf_len, will); + if (rc > 0) { + + /* Send Will Topic Update packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if ((will != NULL) && (rc == len)) { + + if (will != NULL) { + /* Wait for Will Topic Update Response packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_WILLTOPICREQ, 0, NULL); + } + } + } + + return rc; + +} + +int SN_Client_WillMsgUpdate(MqttClient *client, SN_Will *will) +{ + int rc = 0, len = 0; + + /* Validate required arguments */ + if ((client == NULL) || (will == NULL)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Encode Will Message Update */ + len = rc = SN_Encode_WillMsgUpdate(client->tx_buf, client->tx_buf_len, will); + if (rc > 0) { + + /* Send Will Message Update packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if ((will != NULL) && (rc == len)) { + + if (will != NULL) { + /* Wait for Will Message Update Response packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_WILLMSGRESP, 0, NULL); + } + } + } + + return rc; + +} + +int SN_Client_Subscribe(MqttClient *client, SN_Subscribe *subscribe) +{ + int rc = -1, len; + + /* Validate required arguments */ + if (client == NULL || subscribe == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (subscribe->stat == MQTT_MSG_BEGIN) { + /* Encode the subscribe packet */ + rc = SN_Encode_Subscribe(client->tx_buf, client->tx_buf_len, + subscribe); + if (rc <= 0) { return rc; } + len = rc; + + /* Send subscribe packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { return rc; } + + subscribe->stat = MQTT_MSG_WAIT; + } + + /* Wait for subscribe ack packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_SUBACK, subscribe->packet_id, &subscribe->subAck); + + return rc; +} + +int SN_Client_Publish(MqttClient *client, SN_Publish *publish) +{ + int rc = MQTT_CODE_SUCCESS; + + /* Validate required arguments */ + if (client == NULL || publish == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + switch (publish->stat) + { + case MQTT_MSG_BEGIN: + { + /* Encode the publish packet */ + rc = SN_Encode_Publish(client->tx_buf, client->tx_buf_len, + publish); + if (rc <= 0) { + return rc; + } + + client->write.len = rc; + publish->buffer_pos = 0; + + FALL_THROUGH; + } + case MQTT_MSG_WRITE: + { + publish->stat = MQTT_MSG_WRITE; + + /* Send packet and payload */ + rc = MqttPacket_Write(client, client->tx_buf, + client->write.len); + if (rc < 0) { + return rc; + } + + if (rc == client->write.len) { + rc = MQTT_CODE_SUCCESS; + } + else { + rc = -1; + } + + /* if not expecting a reply, the reset state and exit */ + if (publish->qos == MQTT_QOS_0) { + publish->stat = MQTT_MSG_BEGIN; + break; + } + + FALL_THROUGH; + } + + case MQTT_MSG_WAIT: + { + publish->stat = MQTT_MSG_WAIT; + + /* Handle QoS */ + if (publish->qos > MQTT_QOS_0) { + SN_PublishResp publish_resp; + XMEMSET(&publish_resp, 0, sizeof(SN_PublishResp)); + + /* Determine packet type to wait for */ + SN_MsgType type = (publish->qos == MQTT_QOS_1) ? + SN_MSG_TYPE_PUBACK : + SN_MSG_TYPE_PUBCOMP; + + /* Wait for publish response packet */ + rc = SN_Client_WaitType(client, &client->msg, + client->cmd_timeout_ms, type, publish->packet_id, &publish_resp); + + publish->return_code = publish_resp.return_code; + } + + break; + } + + case MQTT_MSG_READ: + case MQTT_MSG_READ_PAYLOAD: + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("SN_Client_Publish: Invalid state %d!", + publish->stat); + #endif + rc = MQTT_CODE_ERROR_STAT; + break; + } /* switch (publish->stat) */ + + return rc; +} + + +int SN_Client_Unsubscribe(MqttClient *client, SN_Unsubscribe *unsubscribe) +{ + int rc, len; + SN_UnsubscribeAck unsubscribe_ack; + + /* Validate required arguments */ + if (client == NULL || unsubscribe == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (unsubscribe->stat == MQTT_MSG_BEGIN) { + /* Encode the subscribe packet */ + rc = SN_Encode_Unsubscribe(client->tx_buf, client->tx_buf_len, + unsubscribe); + if (rc <= 0) { return rc; } + len = rc; + + /* Send unsubscribe packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { return rc; } + + unsubscribe->stat = MQTT_MSG_WAIT; + } + + /* Wait for unsubscribe ack packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_UNSUBACK, unsubscribe->packet_id, + &unsubscribe_ack); + + return rc; +} + +int SN_Client_Register(MqttClient *client, SN_Register *regist) +{ + int rc, len; + + /* Validate required arguments */ + if (client == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (client->msg.stat == MQTT_MSG_BEGIN) { + /* Encode the register packet */ + rc = SN_Encode_Register(client->tx_buf, client->tx_buf_len, regist); + if (rc <= 0) { + return rc; + } + len = rc; + + /* Send packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { + return rc; + } + + client->msg.stat = MQTT_MSG_WAIT; + } + + /* Wait for register acknowledge packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_REGACK, regist->packet_id, ®ist->regack); + + return rc; +} + +int SN_Client_Ping(MqttClient *client, SN_PingReq *ping) +{ + int rc, len; + + /* Validate required arguments */ + if (client == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (client->msg.stat == MQTT_MSG_BEGIN) { + /* Encode the ping packet */ + rc = SN_Encode_Ping(client->tx_buf, client->tx_buf_len, ping); + if (rc <= 0) { return rc; } + len = rc; + + /* Send ping req packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { return rc; } + + client->msg.stat = MQTT_MSG_WAIT; + } + + /* Wait for ping resp packet */ + rc = SN_Client_WaitType(client, &client->msg, client->cmd_timeout_ms, + SN_MSG_TYPE_PING_RESP, 0, NULL); + + return rc; +} + +int SN_Client_Disconnect(MqttClient *client) +{ + return SN_Client_Disconnect_ex(client, NULL); +} + +int SN_Client_Disconnect_ex(MqttClient *client, SN_Disconnect *disconnect) +{ + int rc, len; + + /* Validate required arguments */ + if (client == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Encode the disconnect packet */ + rc = SN_Encode_Disconnect(client->tx_buf, client->tx_buf_len, disconnect); + if (rc <= 0) { return rc; } + len = rc; + + /* Send disconnect packet */ + rc = MqttPacket_Write(client, client->tx_buf, len); + if (rc != len) { return rc; } + + /* No response for MQTT disconnect packet */ + + return MQTT_CODE_SUCCESS; +} + +int SN_Client_WaitMessage(MqttClient *client, int timeout_ms) +{ + return SN_Client_WaitType(client, &client->msg, timeout_ms, + MQTT_PACKET_TYPE_ANY, 0, NULL); +} + +#endif /* defined WOLFMQTT_SN */ + diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 5877f9b4f..57fefc5c7 100755 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -1692,3 +1692,1225 @@ int MqttPacket_Read(MqttClient *client, byte* rx_buf, int rx_buf_len, /* Return read length */ return client->packet.header_len + remain_read; } + + +#ifdef WOLFMQTT_SN +int SN_Decode_Advertise(byte *rx_buf, int rx_buf_len, SN_Advertise *gw_info) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + + /* Check message type */ + type = *rx_payload++; + if (total_len != 5) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + if (type != SN_MSG_TYPE_ADVERTISE) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Decode gateway info */ + if (gw_info != NULL) { + gw_info->gwId = *rx_payload++; + + rx_payload += MqttDecode_Num(rx_payload, &gw_info->duration); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_SearchGW(byte *tx_buf, int tx_buf_len, byte hops) +{ + int total_len; + byte *tx_payload = tx_buf; + + /* Packet length is not variable */ + total_len = 3; + + /* Validate required arguments */ + if (tx_buf == NULL || tx_buf_len < total_len) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Encode length */ + *tx_payload++ = total_len; + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_SEARCHGW; + + /* Encode radius */ + *tx_payload++ = hops; + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_GWInfo(byte *rx_buf, int rx_buf_len, SN_GwInfo *gw_info) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len == SN_PACKET_LEN_IND) { + /* The length is stored in the next two bytes */ + rx_payload += MqttDecode_Num(rx_payload, (word16*)&total_len); + } + + if (total_len > rx_buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + if (total_len < 3) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + /* Check message type */ + type = *rx_payload++; + if (type != SN_MSG_TYPE_GWINFO) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Decode gateway info */ + if (gw_info != NULL) { + gw_info->gwId = *rx_payload++; + + //TODO: validate size of gwAddr + if (total_len - 3 > 0) { + /* The gateway address is only present if sent by a client */ + XMEMCPY(gw_info->gwAddr, rx_payload, total_len - 3); + } + } + + /* Return total length of packet */ + return total_len; +} + +/* Packet Type Encoders/Decoders */ +int SN_Encode_Connect(byte *tx_buf, int tx_buf_len, SN_Connect *connect) +{ + word16 total_len, id_len; + byte flags = 0; + byte *tx_payload = tx_buf; + + /* Validate required arguments */ + if ((tx_buf == NULL) || (connect == NULL) || + (connect->client_id == NULL) || (connect->protocol_level == 0)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + total_len = 6; /* Len + Message Type + Flags + ProtocolID + Duration(2) */ + + /* Client ID size */ + id_len = (int)XSTRLEN(connect->client_id); + id_len = (id_len <= SN_CLIENTID_MAX_LEN) ? id_len : SN_CLIENTID_MAX_LEN; + + total_len += id_len; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + total_len += 2; /* Store len in three bytes */ + } + + if (total_len > tx_buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_CONNECT; + + /* Encode flags */ + if (connect->clean_session) { + flags |= SN_PACKET_FLAG_CLEANSESSION; + } + if (connect->enable_lwt) { + flags |= SN_PACKET_FLAG_WILL; + } + *tx_payload++ = flags; + + /* Protocol version */ + *tx_payload++ = connect->protocol_level; + + /* Encode duration (keep-alive) */ + tx_payload += MqttEncode_Num(tx_payload, connect->keep_alive_sec); + + /* Encode Client ID */ + XMEMCPY(tx_payload, connect->client_id, id_len); + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_WillTopicReq(byte *rx_buf, int rx_buf_len) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Length and MsgType */ + total_len = *rx_payload++; + if (total_len != 2) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_WILLTOPICREQ) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Return total length of packet */ + return total_len; +} + +/* An empty WILLTOPIC message is a WILLTOPIC message without Flags and + WillTopic field (i.e. it is exactly 2 octets long). It is used by a client + to delete the Will topic and the Will message stored in the server */ +int SN_Encode_WillTopic(byte *tx_buf, int tx_buf_len, SN_Will *willTopic) +{ + int total_len; + byte *tx_payload, flags = 0; + + /* Validate required arguments */ + if (tx_buf == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Length and MsgType */ + total_len = 2; + + /* Determine packet length */ + if (willTopic != NULL) { + /* Will Topic is a string */ + total_len += XSTRLEN(willTopic->willTopic); + + /* Flags */ + total_len++; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_WILLTOPIC; + + if (willTopic != NULL) { + /* Encode flags */ + flags |= ((willTopic->qos << SN_PACKET_FLAG_QOS_SHIFT) & + SN_PACKET_FLAG_QOS_MASK); + flags |= (willTopic->retain != 0) ? SN_PACKET_FLAG_RETAIN : 0; + *tx_payload++ = flags; + + /* Encode Will Topic */ + XMEMCPY(tx_payload, willTopic->willTopic, XSTRLEN(willTopic->willTopic)); + } + + return total_len; +} + +int SN_Decode_WillMsgReq(byte *rx_buf, int rx_buf_len) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + + /* Length and MsgType */ + if (total_len != 2){ + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + /* Message Type */ + type = *rx_payload++; + if (type != SN_MSG_TYPE_WILLMSGREQ) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_WillMsg(byte *tx_buf, int tx_buf_len, SN_Will *willMsg) +{ + int total_len; + byte *tx_payload; + + /* Validate required arguments */ + if ((tx_buf == NULL) || (willMsg == NULL)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Length and MsgType */ + total_len = 2; + + /* Determine packet length */ + /* Add Will Message len */ + total_len += willMsg->willMsgLen; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_WILLMSG; + + /* Encode Will Message */ + XMEMCPY(tx_payload, willMsg->willMsg, willMsg->willMsgLen); + + return total_len; +} + +int SN_Encode_WillTopicUpdate(byte *tx_buf, int tx_buf_len, SN_Will *willTopic) +{ + int total_len; + byte *tx_payload, flags = 0; + + /* Validate required arguments */ + if (tx_buf == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Length and MsgType */ + total_len = 2; + + /* Determine packet length */ + if (willTopic != NULL) { + /* Will Topic is a string */ + total_len += XSTRLEN(willTopic->willTopic); + + /* Flags */ + total_len++; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_WILLTOPICUPD; + + if (willTopic != NULL) { + /* Encode flags */ + flags |= ((willTopic->qos << SN_PACKET_FLAG_QOS_SHIFT) & + SN_PACKET_FLAG_QOS_MASK); + flags |= (willTopic->retain != 0) ? SN_PACKET_FLAG_RETAIN : 0; + *tx_payload++ = flags; + + /* Encode Will Topic */ + XMEMCPY(tx_payload, willTopic->willTopic, XSTRLEN(willTopic->willTopic)); + } + + return total_len; + +} + +int SN_Decode_WillTopicResponse(byte *rx_buf, int rx_buf_len) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 2) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_WILLTOPICRESP) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_WillMsgUpdate(byte *tx_buf, int tx_buf_len, SN_Will *willMsg) +{ + int total_len; + byte *tx_payload; + + /* Validate required arguments */ + if ((tx_buf == NULL) || (willMsg == NULL)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Length and MsgType */ + total_len = 2; + + /* Determine packet length */ + /* Add Will Message len */ + total_len += willMsg->willMsgLen; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_WILLMSGUPD; + + /* Encode Will Message */ + XMEMCPY(tx_payload, willMsg->willMsg, willMsg->willMsgLen); + + return total_len; +} + +int SN_Decode_WillMsgResponse(byte *rx_buf, int rx_buf_len) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 2) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_WILLMSGRESP) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_ConnectAck(byte *rx_buf, int rx_buf_len, + SN_ConnectAck *connect_ack) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 3) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_CONNACK) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Decode variable header */ + if (connect_ack) { + connect_ack->return_code = *rx_payload++; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Register(byte *tx_buf, int tx_buf_len, SN_Register *regist) +{ + int total_len; + byte *tx_payload; + + /* Validate required arguments */ + if (tx_buf == NULL || regist == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + /* Topic name is a string */ + total_len = (int)XSTRLEN(regist->topicName); + + /* Length, MsgType, TopicID (2), and packet_id (2) */ + total_len += 6; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_REGISTER; + + /* Encode Topic ID */ + tx_payload += MqttEncode_Num(tx_payload, regist->topicId); + + /* Encode Packet ID */ + tx_payload += MqttEncode_Num(tx_payload, regist->packet_id); + + /* Encode Topic Name */ + XMEMCPY(tx_payload, regist->topicName, XSTRLEN(regist->topicName)); + + return total_len; +} + +int SN_Decode_RegAck(byte *rx_buf, int rx_buf_len, SN_RegAck *regack) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 7) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_REGACK) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + if (regack != NULL) { + /* Decode Topic ID assigned by GW */ + rx_payload += MqttDecode_Num(rx_payload, ®ack->topicId); + + /* Decode packet ID */ + rx_payload += MqttDecode_Num(rx_payload, ®ack->packet_id); + + /* Decode return code */ + regack->return_code = *rx_payload++; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Subscribe(byte *tx_buf, int tx_buf_len, SN_Subscribe *subscribe) +{ + int total_len; + byte *tx_payload, flags = 0x00; + + /* Validate required arguments */ + if (tx_buf == NULL || subscribe == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + if ((subscribe->topic_type & SN_PACKET_FLAG_TOPICIDTYPE_MASK) == + SN_TOPIC_ID_TYPE_NORMAL) { + /* Topic name is a string */ + total_len = (int)XSTRLEN(subscribe->topicNameId); + } + else { + /* Topic ID or Short name */ + total_len = 2; + } + + /* Length, MsgType, Flags, and MsgID (2) */ + total_len += 5; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode length */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + /* Encode message type */ + *tx_payload++ = SN_MSG_TYPE_SUBSCRIBE; + + /* Set flags */ + if (subscribe->duplicate) + flags |= SN_PACKET_FLAG_DUPLICATE; + flags |= (SN_PACKET_FLAG_QOS_MASK & + (subscribe->qos << SN_PACKET_FLAG_QOS_SHIFT)); + flags |= (SN_PACKET_FLAG_TOPICIDTYPE_MASK & subscribe->topic_type); + + *tx_payload++ = flags; + + /* Encode packet ID */ + tx_payload += MqttEncode_Num(tx_payload, subscribe->packet_id); + + /* Encode topic */ + if ((subscribe->topic_type & SN_PACKET_FLAG_TOPICIDTYPE_MASK) != + SN_TOPIC_ID_TYPE_PREDEF) { + /* Topic name is a string */ + XMEMCPY(tx_payload, subscribe->topicNameId, XSTRLEN(subscribe->topicNameId)); + } + else + { + /* Topic ID */ + tx_payload += MqttEncode_Num(tx_payload, + (word16)subscribe->topicNameId[0]); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_SubscribeAck(byte* rx_buf, int rx_buf_len, + SN_SubAck *subscribe_ack) +{ + word16 total_len; + byte* rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 8) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_SUBACK) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Decode SubAck fields */ + if (subscribe_ack) { + subscribe_ack->flags = *rx_payload++; + rx_payload += MqttDecode_Num(rx_payload, &subscribe_ack->topicId); + rx_payload += MqttDecode_Num(rx_payload, &subscribe_ack->packet_id); + subscribe_ack->return_code = *rx_payload++; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Publish(byte *tx_buf, int tx_buf_len, MqttPublish *publish) +{ + word16 total_len; + byte *tx_payload = tx_buf; + byte flags = 0; + + /* Validate required arguments */ + if (tx_buf == NULL || publish == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + total_len = publish->total_len; + + /* Add length, msgType, flags, topic ID (2), and msgID (2) */ + total_len += 7; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode header */ + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + *tx_payload++ = SN_MSG_TYPE_PUBLISH; + + /* Set flags */ + if (publish->duplicate) + flags |= SN_PACKET_FLAG_DUPLICATE; + flags |= (SN_PACKET_FLAG_QOS_MASK & + (publish->qos << SN_PACKET_FLAG_QOS_SHIFT)); + if (publish->retain) + flags |= SN_PACKET_FLAG_RETAIN; + flags |= (SN_PACKET_FLAG_TOPICIDTYPE_MASK & publish->topic_type); + + *tx_payload++ = flags; + + tx_payload += MqttEncode_Num(tx_payload, (word16)*publish->topic_name); + tx_payload += MqttEncode_Num(tx_payload, publish->packet_id); + + /* Encode payload */ + XMEMCPY(tx_payload, publish->buffer, publish->total_len); + + /* Return length of packet placed into tx_buf */ + return total_len; +} + +int SN_Decode_Publish(byte *rx_buf, int rx_buf_len, MqttPublish *publish) +{ + word16 total_len; + byte *rx_payload = rx_buf; + byte flags = 0, type; + + /* Validate required arguments */ + if (rx_buf == NULL || publish == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len == SN_PACKET_LEN_IND) { + /* The length is stored in the next two bytes */ + rx_payload += MqttDecode_Num(rx_payload, &total_len); + } + + if (total_len > rx_buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + if (total_len < 7) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + /* Message Type */ + type = *rx_payload++; + if (type != SN_MSG_TYPE_PUBLISH) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + flags = *rx_payload++; + + rx_payload += MqttDecode_Num(rx_payload, (word16*)publish->topic_name); + + rx_payload += MqttDecode_Num(rx_payload, &publish->packet_id); + + /* Set flags */ + publish->duplicate = flags & SN_PACKET_FLAG_DUPLICATE; + + publish->qos = (flags >> SN_PACKET_FLAG_QOS_SHIFT) & SN_PACKET_FLAG_QOS_MASK; + + publish->retain = flags & SN_PACKET_FLAG_RETAIN; + + publish->type = flags & SN_PACKET_FLAG_TOPICIDTYPE_MASK; + + /* Decode payload */ + + publish->total_len = total_len - 7; + publish->buffer = rx_payload; + publish->buffer_pos = 0; + publish->buffer_len = publish->total_len; + + /* Return length of packet read from rx_buf */ + return total_len; +} + +int SN_Encode_PublishResp(byte* tx_buf, int tx_buf_len, byte type, + SN_PublishResp *publish_resp) +{ + int total_len; + byte *tx_payload = tx_buf; + + /* Validate required arguments */ + if (tx_buf == NULL || publish_resp == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + total_len = (type == SN_MSG_TYPE_PUBACK) ? 7 : 4; + + if (total_len > tx_buf_len) + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + + /* Encode */ + *tx_payload++ = (byte)total_len; + + *tx_payload++ = type; + + if (type == SN_MSG_TYPE_PUBACK) { + tx_payload += MqttEncode_Num(tx_payload, publish_resp->topicId); + } + + tx_payload += MqttEncode_Num(tx_payload, publish_resp->packet_id); + + if (type == SN_MSG_TYPE_PUBACK) { + *tx_payload++ = publish_resp->return_code; + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_PublishResp(byte* rx_buf, int rx_buf_len, byte type, + SN_PublishResp *publish_resp) +{ + int total_len; + byte rec_type, *rx_payload = rx_buf; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode */ + total_len = *rx_payload++; + + if(total_len > rx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Validate packet type */ + rec_type = *rx_payload++; + if (rec_type != type) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + if (publish_resp) { + if (type == SN_MSG_TYPE_PUBACK) { + rx_payload += MqttDecode_Num(rx_payload, &publish_resp->topicId); + } + + rx_payload += MqttDecode_Num(rx_payload, &publish_resp->packet_id); + + if (type == SN_MSG_TYPE_PUBACK) { + publish_resp->return_code = *rx_payload++; + } + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Unsubscribe(byte *tx_buf, int tx_buf_len, + SN_Unsubscribe *unsubscribe) +{ + int total_len; + byte *tx_payload, flags = 0x00; + + /* Validate required arguments */ + if (tx_buf == NULL || unsubscribe == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Determine packet length */ + if ((unsubscribe->topic_type & SN_PACKET_FLAG_TOPICIDTYPE_MASK) == + SN_TOPIC_ID_TYPE_NORMAL) { + /* Topic name is a string */ + total_len = MqttEncode_String(NULL, unsubscribe->topicNameId); + } + else { + /* Topic ID or Short name */ + total_len = 2; + } + + /* Length, MsgType, Flags, and MsgID (2) */ + total_len += 5; + + if (total_len > SN_PACKET_MAX_SMALL_SIZE) { + /* Length is stored in bytes 1 and 2 */ + total_len += 2; + } + + if (total_len > tx_buf_len) { + /* Buffer too small */ + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode header */ + tx_payload = tx_buf; + + if (total_len <= SN_PACKET_MAX_SMALL_SIZE) { + *tx_payload++ = total_len; + } + else { + *tx_payload++ = SN_PACKET_LEN_IND; + tx_payload += MqttEncode_Num(tx_payload, total_len); + } + + *tx_payload++ = SN_MSG_TYPE_UNSUBSCRIBE; + + /* Set flags */ + if (unsubscribe->duplicate) + flags |= SN_PACKET_FLAG_DUPLICATE; + flags |= (SN_PACKET_FLAG_QOS_MASK & + (unsubscribe->qos << SN_PACKET_FLAG_QOS_SHIFT)); + flags |= (SN_PACKET_FLAG_TOPICIDTYPE_MASK & unsubscribe->topic_type); + + *tx_payload++ = flags; + + tx_payload += MqttEncode_Num(tx_payload, unsubscribe->packet_id); + + /* Encode topic */ + if ((unsubscribe->topic_type & SN_PACKET_FLAG_TOPICIDTYPE_MASK) == + SN_TOPIC_ID_TYPE_NORMAL) { + /* Topic name is a string */ + tx_payload += MqttEncode_String(tx_payload, unsubscribe->topicNameId); + } + else { + /* Topic ID or Short name */ + tx_payload += MqttEncode_Num(tx_payload, + (word16)unsubscribe->topicNameId[0]); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_UnsubscribeAck(byte *rx_buf, int rx_buf_len, + SN_UnsubscribeAck *unsubscribe_ack) +{ + word16 total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + total_len = *rx_payload++; + if (total_len != 4) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_UNSUBACK) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Decode SubAck fields */ + if (unsubscribe_ack) { + rx_payload += MqttDecode_Num(rx_payload, &unsubscribe_ack->packet_id); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Disconnect(byte *tx_buf, int tx_buf_len, + SN_Disconnect* disconnect) +{ + int total_len = 2; /* length and message type */ + byte *tx_payload = tx_buf;; + + /* Validate required arguments */ + if (tx_buf == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if ((disconnect != NULL) && (disconnect->sleepTmr > 0)) { + total_len += 2; /* Sleep duration is set */ + } + + if (total_len > tx_buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + /* Encode message */ + *tx_payload++ = total_len; + + *tx_payload++ = SN_MSG_TYPE_DISCONNECT; + + if ((disconnect != NULL) && (disconnect->sleepTmr > 0)) { + tx_payload += MqttEncode_Num(tx_payload, disconnect->sleepTmr); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Encode_Ping(byte *tx_buf, int tx_buf_len, SN_PingReq *ping) +{ + int total_len = 2, clientId_len = 0; + byte *tx_payload = tx_buf; + + /* Validate required arguments */ + if (tx_buf == NULL) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + if (ping != NULL && ping->clientId != NULL) { + total_len += clientId_len = (int)XSTRLEN(ping->clientId); + } + + if (total_len > tx_buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + *tx_payload++ = (byte)total_len; + + *tx_payload++ = SN_MSG_TYPE_PING_REQ; + + if (clientId_len > 0) { + XMEMCPY(tx_payload, ping->clientId, clientId_len); + } + + /* Return total length of packet */ + return total_len; +} + +int SN_Decode_Ping(byte *rx_buf, int rx_buf_len) +{ + int total_len; + byte *rx_payload = rx_buf, type; + + /* Validate required arguments */ + if (rx_buf == NULL || rx_buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + total_len = *rx_payload++; + if (total_len != 2) { + return MQTT_CODE_ERROR_MALFORMED_DATA; + } + + type = *rx_payload++; + if (type != SN_MSG_TYPE_PING_RESP) { + return MQTT_CODE_ERROR_PACKET_TYPE; + } + + /* Return total length of packet */ + return total_len; +} + +static int SN_Packet_HandleNetError(MqttClient *client, int rc) +{ + (void)client; +#ifdef WOLFMQTT_DISCONNECT_CB + if (rc < 0 && + rc != MQTT_CODE_CONTINUE && + rc != MQTT_CODE_STDIN_WAKE) + { + /* don't use return code for now - future use */ + if (client->disconnect_cb) + client->disconnect_cb(client, rc, client->disconnect_ctx); + } +#endif + return rc; +} + +/* Read return code is length when > 0 */ +int SN_Packet_Read(MqttClient *client, byte* rx_buf, int rx_buf_len, + int timeout_ms) +{ + int rc, len = 0, remain_read = 0; + word16 total_len = 0; + + switch (client->packet.stat) + { + case MQTT_PK_BEGIN: + { + /* Read first 2 bytes using MSG_PEEK */ + rc = MqttSocket_Peek(client, rx_buf, 2, timeout_ms); + if (rc < 0) { + return SN_Packet_HandleNetError(client, rc); + } + else if (rc != 2) { + return SN_Packet_HandleNetError(client, + MQTT_CODE_ERROR_NETWORK); + } + + len = rc; + + if (rx_buf[0] == SN_PACKET_LEN_IND){ + /* Read length stored in first three bytes, type in fourth */ + rc = MqttSocket_Peek(client, rx_buf, 4, timeout_ms); + if (rc < 0) { + return SN_Packet_HandleNetError(client, rc); + } + else if (rc != 4) { + return SN_Packet_HandleNetError(client, + MQTT_CODE_ERROR_NETWORK); + } + + len = rc; + (void)MqttDecode_Num(&rx_buf[1], &total_len); + client->packet.header_len = len; + } + else { + /* Length is stored in first byte, type in second */ + total_len = rx_buf[0]; + client->packet.header_len = len; + } + + FALL_THROUGH; + } + + case MQTT_PK_READ_HEAD: + { + client->packet.stat = MQTT_PK_READ_HEAD; + + FALL_THROUGH; + } + + case MQTT_PK_READ: + { + client->packet.stat = MQTT_PK_READ; + + if (total_len > len) { + client->packet.remain_len = total_len - len; + } + else if ((total_len == 2) || (total_len == 4)) { + /* Handle peek */ + client->packet.remain_len = total_len; + } + else { + client->packet.remain_len = 0; + } + + /* Make sure it does not overflow rx_buf */ + if (client->packet.remain_len > + (rx_buf_len - client->packet.header_len)) { + client->packet.remain_len = rx_buf_len - + client->packet.header_len; + } + + /* Read whole message */ + if (client->packet.remain_len > 0) { + rc = MqttSocket_Read(client, &rx_buf[0], + total_len, timeout_ms); + if (rc <= 0) { + return SN_Packet_HandleNetError(client, rc); + } + remain_read = rc; + } + + break; + } + } /* switch (client->packet.stat) */ + + /* reset state */ + client->packet.stat = MQTT_PK_BEGIN; + + /* Return read length */ + return remain_read; +} +#endif /* defined WOLFMQTT_SN */ diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index fd93140a3..1100133fc 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -46,6 +46,7 @@ static int MqttSocket_TlsSocketReceive(WOLFSSL* ssl, char *buf, int sz, int rc; MqttClient *client = (MqttClient*)ptr; (void)ssl; /* Not used */ + rc = client->net->read(client->net->context, (byte*)buf, sz, client->tls.timeout_ms); @@ -278,6 +279,36 @@ int MqttSocket_Read(MqttClient *client, byte* buf, int buf_len, int timeout_ms) return rc; } +#ifdef WOLFMQTT_SN +int MqttSocket_Peek(MqttClient *client, byte* buf, int buf_len, int timeout_ms) +{ + int rc; + + /* Validate arguments */ + if (client == NULL || client->net == NULL || client->net->peek == NULL || + buf == NULL || buf_len <= 0) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* check for buffer position overflow */ + if (client->read.pos >= buf_len) { + return MQTT_CODE_ERROR_OUT_OF_BUFFER; + } + + rc = client->net->peek(client->net->context, buf, buf_len, timeout_ms); + if (rc > 0) { + #ifdef WOLFMQTT_DEBUG_SOCKET + PRINTF("MqttSocket_Peek: Len=%d, Rc=%d", buf_len, rc); + #endif + + /* return length read and reset position */ + client->read.pos = 0; + } + + return rc; +} +#endif + int MqttSocket_Connect(MqttClient *client, const char* host, word16 port, int timeout_ms, int use_tls, MqttTlsCb cb) { diff --git a/wolfmqtt/include.am b/wolfmqtt/include.am index fe55b41d1..a97690651 100644 --- a/wolfmqtt/include.am +++ b/wolfmqtt/include.am @@ -5,7 +5,7 @@ nobase_include_HEADERS+= \ wolfmqtt/version.h \ wolfmqtt/mqtt_types.h \ - wolfmqtt/mqtt_client.h \ + wolfmqtt/mqtt_client.h \ wolfmqtt/mqtt_packet.h \ wolfmqtt/mqtt_socket.h \ wolfmqtt/visibility.h \ diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index b9b2dee81..d2586eb66 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -417,6 +417,188 @@ WOLFMQTT_API const char* MqttClient_ReturnCodeToString( "no support for error strings built in" #endif /* WOLFMQTT_NO_ERROR_STRINGS */ +#ifdef WOLFMQTT_SN +/*! \brief Encodes and sends the a message to search for a gateway and + waits for the gateway info response message. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param search Pointer to SN_SearchGW structure initialized + with hop radius. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_SearchGW( + MqttClient *client, + SN_SearchGw *search); + +/*! \brief Encodes and sends the Connect packet and waits for the + Connect Acknowledgment packet. If Will is enabled, then gateway + prompts for LWT Topic and Message. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param connect Pointer to SN_Connect structure initialized + with connect parameters + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Connect( + MqttClient *client, + SN_Connect *connect); + +/*! \brief Encodes and sends the MQTT-SN Will Topic packet. If 'will' is + non-NULL, first waits for the WillTopicReq and WillMsgReq packet + before sending WillMsg. Sending a NULL 'will' indicates that + the client wishes to delete the Will topic and the Will message + stored in the server. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param will Pointer to SN_Will structure initialized + with topic and message parameters. NULL is valid. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Will( + MqttClient *client, + SN_Will *will); + +/*! \brief Encodes and sends the MQTT-SN Will Topic Update packet. Sending + a NULL 'will' indicates that the client wishes to delete the + Will topic and the Will message stored in the server. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param will Pointer to SN_Will structure initialized + with topic and message parameters. NULL is valid. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_WillTopicUpdate(MqttClient *client, SN_Will *will); + +/*! \brief Encodes and sends the MQTT-SN Will Message Update packet. + Sending a NULL 'will' indicates that the client wishes to + delete the Will topic and the Will message stored in the server. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param will Pointer to SN_Will structure initialized + with topic and message parameters. NULL is valid. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_WillMsgUpdate(MqttClient *client, SN_Will *will); + +/*! \brief Encodes and sends the MQTT-SN Register packet and waits for the + Register Acknowledge packet. The Register packet is sent by a + client to a GW for requesting a topic id value for the included + topic name. It is also sent by a GW to inform a client about + the topic id value it has assigned to the included topic name. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param regist Pointer to SN_Register structure + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Register( + MqttClient *client, + SN_Register *regist); + +/*! \brief Encodes and sends the MQTT-SN Publish packet and waits for the + Publish response (if QoS > 0). + * \discussion This is a blocking function that will wait for MqttNet.read + * If QoS level = 1 then will wait for PUBLISH_ACK. + * If QoS level = 2 then will wait for PUBLISH_REC then send + PUBLISH_REL and wait for PUBLISH_COMP. + * \param client Pointer to MqttClient structure + * \param publish Pointer to SN_Publish structure initialized + with message data + * Note: SN_Publish and MqttMessage are same + structure. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Publish( + MqttClient *client, + SN_Publish *publish); + +/*! \brief Encodes and sends the MQTT-SN Subscribe packet and waits for the + Subscribe Acknowledgment packet containing the assigned + topic ID. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param subscribe Pointer to SN_Subscribe structure initialized with + subscription topic list and desired QoS. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Subscribe( + MqttClient *client, + SN_Subscribe *subscribe); + +/*! \brief Encodes and sends the MQTT-SN Unsubscribe packet and waits for + the Unsubscribe Acknowledgment packet + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param unsubscribe Pointer to SN_Unsubscribe structure initialized + with topic ID. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Unsubscribe( + MqttClient *client, + SN_Unsubscribe *unsubscribe); + +/*! \brief Encodes and sends the MQTT-SN Disconnect packet. Client may + send the disconnect with a duration to indicate the client is + entering the "asleep" state. + * \discussion This is a non-blocking function that will try and send using + MqttNet.write + * \param client Pointer to MqttClient structure + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Disconnect( + MqttClient *client); + +/*! \brief Encodes and sends the MQTT-SN Disconnect packet. Client may + send the disconnect with a duration to indicate the client is + entering the "asleep" state. + * \discussion This is a non-blocking function that will try and send using + MqttNet.write + * \param client Pointer to MqttClient structure + * \param disconnect Pointer to SN_Disconnect structure. NULL is valid. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Disconnect_ex( + MqttClient *client, + SN_Disconnect *disconnect); + + +/*! \brief Encodes and sends the MQTT-SN Ping Request packet and waits + for the Ping Response packet. If client is in the "asleep" + state and wants to notify the gateway that it is entering the + "awake" state, it should add it's client ID to the ping + request. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param ping Pointer to SN_PingReq structure. NULL is valid. + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_Ping( + MqttClient *client, + SN_PingReq *ping); + +/*! \brief Waits for packets to arrive. Incoming publish messages + will arrive via callback provided in MqttClient_Init. + * \discussion This is a blocking function that will wait for MqttNet.read + * \param client Pointer to MqttClient structure + * \param timeout_ms Milliseconds until read timeout + * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_* + (see enum MqttPacketResponseCodes) + */ +WOLFMQTT_API int SN_Client_WaitMessage( + MqttClient *client, + int timeout_ms); +#endif /* WOLFMQTT_SN */ #ifdef __cplusplus } /* extern "C" */ diff --git a/wolfmqtt/mqtt_packet.h b/wolfmqtt/mqtt_packet.h index 8b4289256..76647027d 100644 --- a/wolfmqtt/mqtt_packet.h +++ b/wolfmqtt/mqtt_packet.h @@ -192,6 +192,10 @@ typedef struct _MqttMessage { MqttQoS qos; byte retain; byte duplicate; +#ifdef WOLFMQTT_SN + byte topic_type; + byte return_code; +#endif const char *topic_name; /* Pointer is valid only when msg_new set in callback */ word16 topic_name_len; @@ -576,6 +580,293 @@ WOLFMQTT_LOCAL MqttProp* MqttProps_FindType(MqttProp *head, MqttPropertyType type); #endif +#ifdef WOLFMQTT_SN + +/* Note that because MQTT-SN does not support message fragmentation and + reassembly, the maximum message length that could be used in a network is + governed by the maximum packet size that is supported by that network, + and not by the maximum length that could be encoded by MQTT-SN. */ +#define WOLFMQTT_SN_MAXPACKET_SIZE 1024 + +/* The SN_GwAddr field has a variable length and contains the address of a GW. + Its depends on the network over which MQTT-SN operates and is indicated in + the first octet of this field. For example, in a ZigBee network the network + address is 2-octet long. */ +typedef word16 SN_GwAddr ; + +/* RETURN CODE values */ +enum SN_ReturnCodes { + SN_RC_ACCEPTED = 0x00, + SN_RC_CONGESTION = 0x01, + SN_RC_INVTOPICNAME = 0x02, + SN_RC_NOTSUPPORTED = 0x03 + /* 0x04 - 0xFF reserved */ +}; + +/* MESSAGE HEADER */ +/* Message types: Located in last byte of header */ +typedef enum _SN_MsgType { + SN_MSG_TYPE_ADVERTISE = 0x00, + SN_MSG_TYPE_SEARCHGW = 0x01, + SN_MSG_TYPE_GWINFO = 0x02, + /* 0x03 reserved */ + SN_MSG_TYPE_CONNECT = 0x04, + SN_MSG_TYPE_CONNACK = 0x05, + SN_MSG_TYPE_WILLTOPICREQ = 0x06, + SN_MSG_TYPE_WILLTOPIC = 0x07, + SN_MSG_TYPE_WILLMSGREQ = 0x08, + SN_MSG_TYPE_WILLMSG = 0x09, + SN_MSG_TYPE_REGISTER = 0x0A, + SN_MSG_TYPE_REGACK = 0x0B, + SN_MSG_TYPE_PUBLISH = 0x0C, + SN_MSG_TYPE_PUBACK = 0x0D, + SN_MSG_TYPE_PUBCOMP = 0x0E, + SN_MSG_TYPE_PUBREC = 0x0F, + SN_MSG_TYPE_PUBREL = 0x10, + /* 0x11 reserved */ + SN_MSG_TYPE_SUBSCRIBE = 0x12, + SN_MSG_TYPE_SUBACK = 0x13, + SN_MSG_TYPE_UNSUBSCRIBE = 0x14, + SN_MSG_TYPE_UNSUBACK = 0x15, + SN_MSG_TYPE_PING_REQ = 0x16, + SN_MSG_TYPE_PING_RESP = 0x17, + SN_MSG_TYPE_DISCONNECT = 0x18, + /* 0x19 reserved */ + SN_MSG_TYPE_WILLTOPICUPD = 0x1A, + SN_MSG_TYPE_WILLTOPICRESP = 0x1B, + SN_MSG_TYPE_WILLMSGUPD = 0x1C, + SN_MSG_TYPE_WILLMSGRESP = 0x1D, + /* 0x1E - 0xFD reserved */ + SN_MSG_TYPE_ENCAPMSG = 0xFE /* Encapsulated message */ + /* 0xFF reserved */ +} SN_MsgType; + +/* Topic ID types */ +enum SN_TopicId_Types { + SN_TOPIC_ID_TYPE_NORMAL = 0x0, + SN_TOPIC_ID_TYPE_PREDEF = 0x1, + SN_TOPIC_ID_TYPE_SHORT = 0x2 +}; + +enum SN_PacketFlags { + SN_PACKET_FLAG_TOPICIDTYPE_MASK = 0x3, + SN_PACKET_FLAG_CLEANSESSION = 0x4, + SN_PACKET_FLAG_WILL = 0x8, + SN_PACKET_FLAG_RETAIN = 0x10, + SN_PACKET_FLAG_QOS_MASK = 0x60, + SN_PACKET_FLAG_QOS_SHIFT = 0x5, + SN_PACKET_FLAG_DUPLICATE = 0x80 +}; + +/* Message Header: Size is variable 2 or 4 bytes */ +#define SN_MSG_MAX_LEN_BYTES 3 + +/* If the first byte of the packet len is 0x01, then the packet size is + greater than 0xFF and is stored in the next two bytes */ +#define SN_PACKET_LEN_IND 0x01 + +#define SN_PACKET_MAX_SMALL_SIZE 0xFF + +/* Gateway (GW) messages */ +/* Advertise message */ +typedef struct _SN_AdvertiseMsg { + byte gwId; /* ID of the gateway that sent this message */ + word16 duration; /* Seconds until next Advertise + is broadcast by this gateway */ +} SN_Advertise; + +typedef struct _SN_GwInfo { + byte gwId; /* ID of the gateway that sent this message */ + SN_GwAddr* gwAddr; /* Address of the indicated gateway */ +} SN_GwInfo; + +typedef struct _SN_SearchGw { + MqttMsgStat stat; + byte radius; /* Broadcast radius (in hops) */ + SN_GwInfo gwInfo; +} SN_SearchGw; + +/* Connect Protocol */ +#define SN_PROTOCOL_ID_1 0x01 +#define SN_PROTOCOL_ID SN_PROTOCOL_ID_1 + +#define SN_CLIENTID_MAX_LEN 23 + +/* Connect Ack message structure */ +typedef struct _SN_ConnectAck { + byte return_code; +} SN_ConnectAck; + +typedef struct _SN_Will { + byte qos; + byte retain; + const char* willTopic; + byte* willMsg; + word16 willMsgLen; +} SN_Will; + +/* Connect */ +typedef struct _SN_Connect { + MqttMsgStat stat; + word16 keep_alive_sec; + byte clean_session; + const char *client_id; + + /* Protocol version: 1=v1.2 (default) */ + byte protocol_level; + + /* Optional Last will and testament */ + byte enable_lwt; + SN_Will will; + + /* Ack data */ + SN_ConnectAck ack; +} SN_Connect; + +/* REGISTER protocol */ +typedef struct _SN_RegAck { + word16 topicId; + word16 packet_id; + byte return_code; +} SN_RegAck; + +typedef struct _SN_Register { + MqttMsgStat stat; + word16 topicId; + word16 packet_id; + const char* topicName; + SN_RegAck regack; +} SN_Register; + +/* PUBLISH protocol */ +typedef MqttMessage SN_Publish; + +/* PUBLISH RESPONSE */ +/* This is the response struct for PUBREC, PUBREL, and PUBCOMP */ +/* If QoS = 0: No response */ +/* If QoS = 1: Expect response packet with type = SN_MSG_TYPE_PUBACK */ +/* If QoS = 2: Expect response packet with type = SN_MSG_TYPE_PUBREC */ +/* Message ID required if QoS is 1 or 2 */ +/* If QoS = 2: Send SN_MSG_TYPE_PUBREL with msgId to complete + QoS2 protocol exchange */ +/* Expect response packet with type = SN_MSG_TYPE_PUBCOMP */ +typedef struct _SN_PublishResp { + word16 packet_id; + word16 topicId; /* PUBACK Only */ + byte return_code; /* PUBACK Only */ +} SN_PublishResp; + +/* SUBSCRIBE ACK */ +typedef struct _SN_SubAck { + byte flags; + word16 topicId; + word16 packet_id; + byte return_code; +} SN_SubAck; + +/* SUBSCRIBE */ +typedef struct _SN_Subscribe { + MqttMsgStat stat; + byte duplicate; + byte qos; + word16 packet_id; + byte topic_type; + const char* topicNameId; /* Contains topic name, ID, + or short name as indicated in topic type */ + SN_SubAck subAck; +} SN_Subscribe; + +/* UNSUBSCRIBE */ +typedef SN_Subscribe SN_Unsubscribe; + +/* UNSUBSCRIBE RESPONSE ACK */ +typedef MqttUnsubscribeAck SN_UnsubscribeAck; + +/* PING / PING RESPONSE */ +typedef struct _SN_PingReq { + /* clientId is optional and is included by a “sleeping” client when it + goes to the “awake” state and is waiting for messages sent by the + server/gateway. */ + char *clientId; +} SN_PingReq; + +/* DISCONNECT */ +typedef struct _SN_Disconnect { + /* sleepTmr is optional and is included by a “sleeping” client + that wants to go the “asleep” state. */ + word16 sleepTmr; +} SN_Disconnect; + +/* WILL TOPIC */ +typedef struct _SN_WillTopicUpd { + byte flags; + char* willTopic; /* contains the Will topic name */ +} SN_WillTopicUpd; + +typedef struct _SN_WillMsgUpd { + char* willMsg; +} SN_WillMsgUpd; + +typedef struct _SN_WillTopicResp { + byte return_code; +} SN_WillTopicResp; + +typedef SN_WillTopicResp SN_WillMsgResp; + + +/* Forward Encapsulation */ +// TODO + + +WOLFMQTT_LOCAL int SN_Decode_Advertise(byte *rx_buf, int rx_buf_len, + SN_Advertise *gw_info); +WOLFMQTT_LOCAL int SN_Encode_SearchGW(byte *tx_buf, int tx_buf_len, byte hops); +WOLFMQTT_LOCAL int SN_Decode_GWInfo(byte *rx_buf, int rx_buf_len, + SN_GwInfo *gw_info); +WOLFMQTT_LOCAL int SN_Encode_Connect(byte *tx_buf, int tx_buf_len, + SN_Connect *connect); +WOLFMQTT_LOCAL int SN_Decode_ConnectAck(byte *rx_buf, int rx_buf_len, + SN_ConnectAck *connect_ack); +WOLFMQTT_LOCAL int SN_Decode_WillTopicReq(byte *rx_buf, int rx_buf_len); +WOLFMQTT_LOCAL int SN_Encode_WillTopic(byte *tx_buf, int tx_buf_len, + SN_Will *willTopic); +WOLFMQTT_LOCAL int SN_Decode_WillMsgReq(byte *rx_buf, int rx_buf_len); +WOLFMQTT_LOCAL int SN_Encode_WillMsg(byte *tx_buf, int tx_buf_len, + SN_Will *willMsg); +WOLFMQTT_LOCAL int SN_Encode_WillTopicUpdate(byte *tx_buf, int tx_buf_len, + SN_Will *willTopic); +WOLFMQTT_LOCAL int SN_Decode_WillTopicResponse(byte *rx_buf, int rx_buf_len); +WOLFMQTT_LOCAL int SN_Encode_WillMsgUpdate(byte *tx_buf, int tx_buf_len, + SN_Will *willMsg); +WOLFMQTT_LOCAL int SN_Decode_WillMsgResponse(byte *rx_buf, int rx_buf_len); +WOLFMQTT_LOCAL int SN_Encode_Register(byte *tx_buf, int tx_buf_len, + SN_Register *regist); +WOLFMQTT_LOCAL int SN_Decode_RegAck(byte *rx_buf, int rx_buf_len, + SN_RegAck *regack); +WOLFMQTT_LOCAL int SN_Encode_Subscribe(byte *tx_buf, int tx_buf_len, + SN_Subscribe *subscribe); +WOLFMQTT_LOCAL int SN_Decode_SubscribeAck(byte* rx_buf, int rx_buf_len, + SN_SubAck *subscribe_ack); +WOLFMQTT_LOCAL int SN_Encode_Publish(byte *tx_buf, int tx_buf_len, + MqttPublish *publish); +WOLFMQTT_LOCAL int SN_Decode_Publish(byte *rx_buf, int rx_buf_len, + MqttPublish *publish); +WOLFMQTT_LOCAL int SN_Encode_PublishResp(byte* tx_buf, int tx_buf_len, + byte type, SN_PublishResp *publish_resp); +WOLFMQTT_LOCAL int SN_Decode_PublishResp(byte* rx_buf, int rx_buf_len, + byte type, SN_PublishResp *publish_resp); +WOLFMQTT_LOCAL int SN_Encode_Unsubscribe(byte *tx_buf, int tx_buf_len, + SN_Unsubscribe *unsubscribe); +WOLFMQTT_LOCAL int SN_Decode_UnsubscribeAck(byte *rx_buf, int rx_buf_len, + SN_UnsubscribeAck *unsubscribe_ack); +WOLFMQTT_LOCAL int SN_Encode_Disconnect(byte *tx_buf, int tx_buf_len, + SN_Disconnect* disconnect); +WOLFMQTT_LOCAL int SN_Encode_Ping(byte *tx_buf, int tx_buf_len, + SN_PingReq *ping); +WOLFMQTT_LOCAL int SN_Decode_Ping(byte *rx_buf, int rx_buf_len); +WOLFMQTT_LOCAL int SN_Packet_Read(struct _MqttClient *client, byte* rx_buf, + int rx_buf_len, int timeout_ms); +#endif #ifdef __cplusplus } /* extern "C" */ diff --git a/wolfmqtt/mqtt_socket.h b/wolfmqtt/mqtt_socket.h index a87f48505..36bcd3782 100755 --- a/wolfmqtt/mqtt_socket.h +++ b/wolfmqtt/mqtt_socket.h @@ -65,9 +65,13 @@ typedef int (*MqttNetWriteCb)(void *context, const byte* buf, int buf_len, int timeout_ms); typedef int (*MqttNetReadCb)(void *context, byte* buf, int buf_len, int timeout_ms); +#ifdef WOLFMQTT_SN +typedef int (*MqttNetPeekCb)(void *context, + byte* buf, int buf_len, int timeout_ms); +#endif typedef int (*MqttNetDisconnectCb)(void *context); -/* Strucutre for Network Security */ +/* Structure for Network Security */ #ifdef ENABLE_MQTT_TLS typedef struct _MqttTls { WOLFSSL_CTX *ctx; @@ -84,18 +88,26 @@ typedef struct _MqttNet { MqttNetReadCb read; MqttNetWriteCb write; MqttNetDisconnectCb disconnect; +#ifdef WOLFMQTT_SN + MqttNetPeekCb peek; + void *multi_ctx; +#endif } MqttNet; /* MQTT SOCKET APPLICATION INTERFACE */ WOLFMQTT_LOCAL int MqttSocket_Init(struct _MqttClient *client, MqttNet* net); -WOLFMQTT_LOCAL int MqttSocket_Write(struct _MqttClient *client, const byte* buf, int buf_len, - int timeout_ms); -WOLFMQTT_LOCAL int MqttSocket_Read(struct _MqttClient *client, byte* buf, int buf_len, - int timeout_ms); - -WOLFMQTT_LOCAL int MqttSocket_Connect(struct _MqttClient *client, const char* host, - word16 port, int timeout_ms, int use_tls, MqttTlsCb cb); +WOLFMQTT_LOCAL int MqttSocket_Write(struct _MqttClient *client, const byte* buf, + int buf_len, int timeout_ms); +WOLFMQTT_LOCAL int MqttSocket_Read(struct _MqttClient *client, byte* buf, + int buf_len, int timeout_ms); +#ifdef WOLFMQTT_SN +WOLFMQTT_LOCAL int MqttSocket_Peek(struct _MqttClient *client, byte* buf, + int buf_len, int timeout_ms); +#endif +WOLFMQTT_LOCAL int MqttSocket_Connect(struct _MqttClient *client, + const char* host, word16 port, int timeout_ms, int use_tls, + MqttTlsCb cb); WOLFMQTT_LOCAL int MqttSocket_Disconnect(struct _MqttClient *client);