diff --git a/examples/mqttNonGlobalCallback/mqttNonGlobalCallback.ino b/examples/mqttNonGlobalCallback/mqttNonGlobalCallback.ino new file mode 100644 index 0000000..37d0a2a --- /dev/null +++ b/examples/mqttNonGlobalCallback/mqttNonGlobalCallback.ino @@ -0,0 +1,56 @@ +#include "MQTT.h" +#include "ISubCallback.h" + +/* + This methodology allows the mqtt driver to exist in non-global space (such as belonging to another wrapper class). + Other features here allow default constructor, with post constructed Initialization (but only a single time). + Also, the callback feature is backwards compatible with a global function callback OR the option of registering + other objects (by means of the ISubCallback interface) to a list of callback listeners. +*/ + +class Foo : ISubCallback{ +public: + Foo() { + client.Initialize("some.domain.com", NULL, 1883, MQTT_DEFAULT_KEEPALIVE, MQTT_MAX_PACKET_SIZE, NULL, true); + client.RegisterCallbackListener(this); + } + + void Update() { + if (client.isConnected()) + client.publish("outTopic/message","hello world"); + client.loop(); + else { + client.connect("sparkclient"); + client.subscribe("inTopic/message"); + } + } + + void Callback(char* topic, byte* payload, unsigned int length) { + char p[length + 1]; + memcpy(p, payload, length); + p[length] = NULL; + + if (!strcmp(p, "RED")) + RGB.color(255, 0, 0); + else if (!strcmp(p, "GREEN")) + RGB.color(0, 255, 0); + else if (!strcmp(p, "BLUE")) + RGB.color(0, 0, 255); + else + RGB.color(255, 255, 255); + delay(1000); + } +private: + MQTT client; +} + +Foo foo; + + +void setup() { + RGB.control(true); +} + +void loop() { + foo.Update(); +} diff --git a/examples/mqtttest/mqtttest.ino b/examples/mqtttest/mqtttest.ino index d3f2ba7..e64dbc9 100644 --- a/examples/mqtttest/mqtttest.ino +++ b/examples/mqtttest/mqtttest.ino @@ -1,49 +1,49 @@ #include "MQTT.h" +#include "ISubCallback.h" -void callback(char* topic, byte* payload, unsigned int length); - -/** - * if want to use IP address, - * const uint8_t[] = { XXX,XXX,XXX,XXX }; - * MQTT client(server, 1883, callback); - * want to use domain name, - * exp) iot.eclipse.org is Eclipse Open MQTT Broker: https://iot.eclipse.org/getting-started - * MQTT client("mqtt.eclipse.org", 1883, callback); - **/ -MQTT client("server_name", 1883, callback); - -// recieve message -void callback(char* topic, byte* payload, unsigned int length) { - char p[length + 1]; - memcpy(p, payload, length); - p[length] = NULL; - - if (!strcmp(p, "RED")) - RGB.color(255, 0, 0); - else if (!strcmp(p, "GREEN")) - RGB.color(0, 255, 0); - else if (!strcmp(p, "BLUE")) - RGB.color(0, 0, 255); - else - RGB.color(255, 255, 255); - delay(1000); +class Foo : ISubCallback{ +public: + Foo() { + client.Initialize("some.domain.com", NULL, 1883, MQTT_DEFAULT_KEEPALIVE, MQTT_MAX_PACKET_SIZE, NULL, true); + client.RegisterCallbackListener(this); + } + + void Update() { + if (client.isConnected()) + client.publish("outTopic/message","hello world"); + client.loop(); + else { + client.connect("sparkclient"); + client.subscribe("inTopic/message"); + } + } + + void Callback(char* topic, byte* payload, unsigned int length) { + char p[length + 1]; + memcpy(p, payload, length); + p[length] = NULL; + + if (!strcmp(p, "RED")) + RGB.color(255, 0, 0); + else if (!strcmp(p, "GREEN")) + RGB.color(0, 255, 0); + else if (!strcmp(p, "BLUE")) + RGB.color(0, 0, 255); + else + RGB.color(255, 255, 255); + delay(1000); + } +private: + MQTT client; } +Foo foo; + void setup() { RGB.control(true); - - // connect to the server - client.connect("sparkclient"); - - // publish/subscribe - if (client.isConnected()) { - client.publish("outTopic/message","hello world"); - client.subscribe("inTopic/message"); - } } void loop() { - if (client.isConnected()) - client.loop(); + foo.Update(); } diff --git a/src/ISubCallback.h b/src/ISubCallback.h new file mode 100644 index 0000000..6c255f7 --- /dev/null +++ b/src/ISubCallback.h @@ -0,0 +1,12 @@ + +#ifndef __ISUBCALLBACK_ +#define __ISUBCALLBACK_ + +class ISubCallback +{ +public: + virtual void Callback(char*, uint8_t*, unsigned int) = 0; +}; + + +#endif diff --git a/src/MQTT.cpp b/src/MQTT.cpp index 232f533..c263976 100644 --- a/src/MQTT.cpp +++ b/src/MQTT.cpp @@ -1,4 +1,5 @@ #include "MQTT.h" +#include "ISubCallback.h" #define LOGGING @@ -48,6 +49,16 @@ MQTT::~MQTT() { delete[] buffer; } +void MQTT::Initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize, + void (*callback)(char*,uint8_t*,unsigned int), bool thread) +{ + if (this->initialized) + { + return; + } + this->initialize(domain, ip, port, keepalive, maxpacketsize, callback, thread); +} + void MQTT::initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize, void (*callback)(char*,uint8_t*,unsigned int), bool thread) { if (thread) { @@ -68,6 +79,7 @@ void MQTT::initialize(const char* domain, const uint8_t *ip, uint16_t port, int if (buffer != NULL) delete[] buffer; buffer = new uint8_t[this->maxpacketsize]; + this->initialized = true; } void MQTT::setBroker(const char* domain, uint16_t port) { @@ -267,42 +279,34 @@ bool MQTT::loop() { uint8_t *payload; if (len > 0) { lastInActivity = t; - uint8_t type = buffer[0]&0xF0; + uint8_t type = buffer[0] & 0xF0; if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; // topic length - char topic[tl+1]; - for (uint16_t i=0;i0 - if ((buffer[0]&0x06) == MQTTQOS1_HEADER_MASK) { // QoS=1 - msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1]; - payload = buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - buffer[0] = MQTTPUBACK; // respond with PUBACK + uint16_t payloadLength; + if ((buffer[0] & 0x06) == MQTTQOS1_HEADER_MASK || (buffer[0] & 0x06) == MQTTQOS2_HEADER_MASK) + { + msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1]; + payload = buffer + llen + 3 + tl + 2; + payloadLength = len - llen - 3 - tl - 2; + + buffer[0] = (buffer[0] & 0x06) == MQTTQOS1_HEADER_MASK ? MQTTPUBACK : MQTTPUBREC; buffer[1] = 2; buffer[2] = (msgId >> 8); buffer[3] = (msgId & 0xFF); - _client.write(buffer,4); + _client.write(buffer, 4); lastOutActivity = t; - } else if ((buffer[0] & 0x06) == MQTTQOS2_HEADER_MASK) { // QoS=2 - msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1]; - payload = buffer + llen + 3 + tl + 2; - callback(topic, payload, len - llen - 3 - tl - 2); - - buffer[0] = MQTTPUBREC; // respond with PUBREC - buffer[1] = 2; - buffer[2] = (msgId >> 8); - buffer[3] = (msgId & 0xFF); - _client.write(buffer, 4); - lastOutActivity = t; - } else { - payload = buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); + } else { + payload = buffer + llen + 3 + tl; + payloadLength = len - llen - 3 - tl; } + doCallbacks(topic, payload, payloadLength); } } else if (type == MQTTPUBREC) { // check for the situation that QoS2 receive PUBREC, should return PUBREL @@ -548,3 +552,20 @@ void MQTT::clear() { _client.stop(); lastInActivity = lastOutActivity = millis(); } + +void MQTT::RegisterCallbackListener(ISubCallback *listener) +{ + this->callbackListeners.push_back(listener); +} + +void MQTT::doCallbacks(char* topic, uint8_t* buffer, unsigned int bufferLength) +{ + if (NULL != callback) + { + callback(topic, buffer, bufferLength); + } + for (ISubCallback *listener : callbackListeners) + { + listener->Callback(topic, buffer, bufferLength); + } +} diff --git a/src/MQTT.h b/src/MQTT.h index 4dfb96e..4f7b173 100644 --- a/src/MQTT.h +++ b/src/MQTT.h @@ -67,6 +67,7 @@ bug fixed and features pull requests #include "spark_wiring_string.h" #include "spark_wiring_tcpclient.h" #include "spark_wiring_usbserial.h" +#include // MQTT_MAX_PACKET_SIZE : Maximum packet size // this size is total of [MQTT Header(Max:5byte) + Topic Name Length + Topic Name + Message ID(QoS1|2) + Payload] @@ -101,6 +102,9 @@ bug fixed and features pull requests #define debug_print(fmt, ...) ((void)0) #endif /* DEBUG_MQTT_SERIAL_OUTPUT */ +// Forward declaration +class ISubCallback; + class MQTT { /** types */ @@ -145,12 +149,16 @@ typedef enum { uint16_t maxpacketsize; os_mutex_t mutex_lock; bool thread = false; + bool initialized = false; void initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize, void (*callback)(char*,uint8_t*,unsigned int), bool thread = false); bool publishRelease(uint16_t messageid); bool publishComplete(uint16_t messageid); + void doCallbacks(char*, uint8_t*, unsigned int); + std::vector callbackListeners; + class MutexLocker { MQTT * mqtt; public: @@ -191,6 +199,11 @@ typedef enum { ~MQTT(); + void Initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize, + void (*callback)(char*,uint8_t*,unsigned int), bool thread = false); + + void RegisterCallbackListener(ISubCallback *listener); + void setBroker(const char* domain, uint16_t port); void setBroker(const uint8_t *ip, uint16_t port);