Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified to be compatible with more OOP paradigms. #93

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions examples/mqttNonGlobalCallback/mqttNonGlobalCallback.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "MQTT.h"
#include "ISubCallback.h"

/*
This methodology allows the mqtt driver to exist in non-global space (such as belonging to another wrapper class).
Other features here allow default constructor, with post constructed Initialization (but only a single time).
Also, the callback feature is backwards compatible with a global function callback OR the option of registering
other objects (by means of the ISubCallback interface) to a list of callback listeners.
*/

class Foo : ISubCallback{
public:
Foo() {
client.Initialize("some.domain.com", NULL, 1883, MQTT_DEFAULT_KEEPALIVE, MQTT_MAX_PACKET_SIZE, NULL, true);
client.RegisterCallbackListener(this);
}

void Update() {
if (client.isConnected())
client.publish("outTopic/message","hello world");
client.loop();
else {
client.connect("sparkclient");
client.subscribe("inTopic/message");
}
}

void Callback(char* topic, byte* payload, unsigned int length) {
char p[length + 1];
memcpy(p, payload, length);
p[length] = NULL;

if (!strcmp(p, "RED"))
RGB.color(255, 0, 0);
else if (!strcmp(p, "GREEN"))
RGB.color(0, 255, 0);
else if (!strcmp(p, "BLUE"))
RGB.color(0, 0, 255);
else
RGB.color(255, 255, 255);
delay(1000);
}
private:
MQTT client;
}

Foo foo;


void setup() {
RGB.control(true);
}

void loop() {
foo.Update();
}
76 changes: 38 additions & 38 deletions examples/mqtttest/mqtttest.ino
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
#include "MQTT.h"
#include "ISubCallback.h"

void callback(char* topic, byte* payload, unsigned int length);

/**
* if want to use IP address,
* const uint8_t[] = { XXX,XXX,XXX,XXX };
* MQTT client(server, 1883, callback);
* want to use domain name,
* exp) iot.eclipse.org is Eclipse Open MQTT Broker: https://iot.eclipse.org/getting-started
* MQTT client("mqtt.eclipse.org", 1883, callback);
**/
MQTT client("server_name", 1883, callback);

// recieve message
void callback(char* topic, byte* payload, unsigned int length) {
char p[length + 1];
memcpy(p, payload, length);
p[length] = NULL;

if (!strcmp(p, "RED"))
RGB.color(255, 0, 0);
else if (!strcmp(p, "GREEN"))
RGB.color(0, 255, 0);
else if (!strcmp(p, "BLUE"))
RGB.color(0, 0, 255);
else
RGB.color(255, 255, 255);
delay(1000);
class Foo : ISubCallback{
public:
Foo() {
client.Initialize("some.domain.com", NULL, 1883, MQTT_DEFAULT_KEEPALIVE, MQTT_MAX_PACKET_SIZE, NULL, true);
client.RegisterCallbackListener(this);
}

void Update() {
if (client.isConnected())
client.publish("outTopic/message","hello world");
client.loop();
else {
client.connect("sparkclient");
client.subscribe("inTopic/message");
}
}

void Callback(char* topic, byte* payload, unsigned int length) {
char p[length + 1];
memcpy(p, payload, length);
p[length] = NULL;

if (!strcmp(p, "RED"))
RGB.color(255, 0, 0);
else if (!strcmp(p, "GREEN"))
RGB.color(0, 255, 0);
else if (!strcmp(p, "BLUE"))
RGB.color(0, 0, 255);
else
RGB.color(255, 255, 255);
delay(1000);
}
private:
MQTT client;
}

Foo foo;


void setup() {
RGB.control(true);

// connect to the server
client.connect("sparkclient");

// publish/subscribe
if (client.isConnected()) {
client.publish("outTopic/message","hello world");
client.subscribe("inTopic/message");
}
}

void loop() {
if (client.isConnected())
client.loop();
foo.Update();
}
12 changes: 12 additions & 0 deletions src/ISubCallback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

#ifndef __ISUBCALLBACK_
#define __ISUBCALLBACK_

class ISubCallback
{
public:
virtual void Callback(char*, uint8_t*, unsigned int) = 0;
};


#endif
75 changes: 48 additions & 27 deletions src/MQTT.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "MQTT.h"
#include "ISubCallback.h"

#define LOGGING

Expand Down Expand Up @@ -48,6 +49,16 @@ MQTT::~MQTT() {
delete[] buffer;
}

void MQTT::Initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize,
void (*callback)(char*,uint8_t*,unsigned int), bool thread)
{
if (this->initialized)
{
return;
}
this->initialize(domain, ip, port, keepalive, maxpacketsize, callback, thread);
}

void MQTT::initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize,
void (*callback)(char*,uint8_t*,unsigned int), bool thread) {
if (thread) {
Expand All @@ -68,6 +79,7 @@ void MQTT::initialize(const char* domain, const uint8_t *ip, uint16_t port, int
if (buffer != NULL)
delete[] buffer;
buffer = new uint8_t[this->maxpacketsize];
this->initialized = true;
}

void MQTT::setBroker(const char* domain, uint16_t port) {
Expand Down Expand Up @@ -267,42 +279,34 @@ bool MQTT::loop() {
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
uint8_t type = buffer[0] & 0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; // topic length
char topic[tl+1];
for (uint16_t i=0;i<tl;i++) {
topic[i] = buffer[llen+3+i];
if (true) {
uint16_t tl = (buffer[llen + 1] << 8) + buffer[llen + 2]; // topic length
char topic[tl + 1];
for (uint16_t i = 0; i < tl; i++) {
topic[i] = buffer[llen + 3 + i];
}
topic[tl] = 0;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1_HEADER_MASK) { // QoS=1
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);

buffer[0] = MQTTPUBACK; // respond with PUBACK
uint16_t payloadLength;
if ((buffer[0] & 0x06) == MQTTQOS1_HEADER_MASK || (buffer[0] & 0x06) == MQTTQOS2_HEADER_MASK)
{
msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1];
payload = buffer + llen + 3 + tl + 2;
payloadLength = len - llen - 3 - tl - 2;

buffer[0] = (buffer[0] & 0x06) == MQTTQOS1_HEADER_MASK ? MQTTPUBACK : MQTTPUBREC;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client.write(buffer,4);
_client.write(buffer, 4);
lastOutActivity = t;
} else if ((buffer[0] & 0x06) == MQTTQOS2_HEADER_MASK) { // QoS=2
msgId = (buffer[llen + 3 + tl] << 8) + buffer[llen + 3 + tl + 1];
payload = buffer + llen + 3 + tl + 2;
callback(topic, payload, len - llen - 3 - tl - 2);

buffer[0] = MQTTPUBREC; // respond with PUBREC
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
_client.write(buffer, 4);
lastOutActivity = t;
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
} else {
payload = buffer + llen + 3 + tl;
payloadLength = len - llen - 3 - tl;
}
doCallbacks(topic, payload, payloadLength);
}
} else if (type == MQTTPUBREC) {
// check for the situation that QoS2 receive PUBREC, should return PUBREL
Expand Down Expand Up @@ -548,3 +552,20 @@ void MQTT::clear() {
_client.stop();
lastInActivity = lastOutActivity = millis();
}

void MQTT::RegisterCallbackListener(ISubCallback *listener)
{
this->callbackListeners.push_back(listener);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For thread enable, MutexLocker lock(this) is needed before push_back.

}

void MQTT::doCallbacks(char* topic, uint8_t* buffer, unsigned int bufferLength)
{
if (NULL != callback)
{
callback(topic, buffer, bufferLength);
}
for (ISubCallback *listener : callbackListeners)
{
listener->Callback(topic, buffer, bufferLength);
}
}
13 changes: 13 additions & 0 deletions src/MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ bug fixed and features pull requests
#include "spark_wiring_string.h"
#include "spark_wiring_tcpclient.h"
#include "spark_wiring_usbserial.h"
#include <vector>

// MQTT_MAX_PACKET_SIZE : Maximum packet size
// this size is total of [MQTT Header(Max:5byte) + Topic Name Length + Topic Name + Message ID(QoS1|2) + Payload]
Expand Down Expand Up @@ -101,6 +102,9 @@ bug fixed and features pull requests
#define debug_print(fmt, ...) ((void)0)
#endif /* DEBUG_MQTT_SERIAL_OUTPUT */

// Forward declaration
class ISubCallback;


class MQTT {
/** types */
Expand Down Expand Up @@ -145,12 +149,16 @@ typedef enum {
uint16_t maxpacketsize;
os_mutex_t mutex_lock;
bool thread = false;
bool initialized = false;

void initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize,
void (*callback)(char*,uint8_t*,unsigned int), bool thread = false);
bool publishRelease(uint16_t messageid);
bool publishComplete(uint16_t messageid);

void doCallbacks(char*, uint8_t*, unsigned int);
std::vector<ISubCallback*> callbackListeners;

class MutexLocker {
MQTT * mqtt;
public:
Expand Down Expand Up @@ -191,6 +199,11 @@ typedef enum {

~MQTT();

void Initialize(const char* domain, const uint8_t *ip, uint16_t port, int keepalive, int maxpacketsize,
void (*callback)(char*,uint8_t*,unsigned int), bool thread = false);

void RegisterCallbackListener(ISubCallback *listener);

void setBroker(const char* domain, uint16_t port);
void setBroker(const uint8_t *ip, uint16_t port);

Expand Down