Skip to content

Commit

Permalink
Fix for #25: allow multiple nodes with different connection strings
Browse files Browse the repository at this point in the history
  • Loading branch information
pierreca committed Apr 4, 2017
1 parent 09304e4 commit 10e2ad0
Showing 1 changed file with 118 additions and 121 deletions.
239 changes: 118 additions & 121 deletions device/node-red/azureiothub/azureiothub.js
Original file line number Diff line number Diff line change
@@ -1,127 +1,133 @@
module.exports = function (RED) {

var Client = require('azure-iot-device').Client;

var Protocols = {
amqp: require('azure-iot-device-amqp').Amqp,
mqtt: require('azure-iot-device-mqtt').Mqtt,
http: require('azure-iot-device-http').Http,
amqpWs: require('azure-iot-device-amqp').AmqpWs
};

var Message = require('azure-iot-device').Message;

var client = null;
var clientConnectionString = "";
var clientProtocol = "";
var node = null;
var nodeConfig = null;

var statusEnum = {
disconnected: { color: "red", text: "Disconnected" },
connected: { color: "green", text: "Connected" },
sent: { color: "blue", text: "Sent message" },
received: { color: "yellow", text: "Received" },
error: { color: "grey", text: "Error" }
};

var setStatus = function (status) {
node.status({ fill: status.color, shape: "dot", text: status.text });
}
// Main function called by Node-RED
function AzureIoTHubNode(config) {
// Store node for further use
var node = this;
var nodeConfig = config;

var sendData = function (data) {
node.log('Sending Message to Azure IoT Hub :\n Payload: ' + data.toString());
// Create a message and send it to the IoT Hub every second
var message = new Message(data);
client.sendEvent(message, function (err, res) {
if (err) {
node.error('Error while trying to send message:' + err.toString());
setStatus(statusEnum.error);
} else {
node.log('Message sent.');
setStatus(statusEnum.sent);
}
});
};

var sendMessageToIoTHub = function (message, reconnect) {
if (!client || reconnect) {
node.log('Connection to IoT Hub not established or configuration changed. Reconnecting.');
// Update the connection string
clientConnectionString = node.credentials.connectionstring;
// update the protocol
clientProtocol = nodeConfig.protocol;

// If client was previously connected, disconnect first
if (client)
disconnectFromIoTHub();

// Connect the IoT Hub
connectToIoTHub(message);
} else {
sendData(message);
var Client = require('azure-iot-device').Client;

var Protocols = {
amqp: require('azure-iot-device-amqp').Amqp,
mqtt: require('azure-iot-device-mqtt').Mqtt,
http: require('azure-iot-device-http').Http,
amqpWs: require('azure-iot-device-amqp').AmqpWs
};

var Message = require('azure-iot-device').Message;

var client = null;
var clientConnectionString = "";
var clientProtocol = "";

var statusEnum = {
disconnected: { color: "red", text: "Disconnected" },
connected: { color: "green", text: "Connected" },
sent: { color: "blue", text: "Sent message" },
received: { color: "yellow", text: "Received" },
error: { color: "grey", text: "Error" }
};

// Helper function to print results in the console
function printResultFor(op) {
return function printResult(err, res) {
if (err) node.error(op + ' error: ' + err.toString());
if (res) node.log(op + ' status: ' + res.constructor.name);
};
}
};

var connectToIoTHub = function (pendingMessage) {
node.log('Connecting to Azure IoT Hub:\n Protocol: ' + clientProtocol + '\n Connection string :' + clientConnectionString);
client = Client.fromConnectionString(clientConnectionString, Protocols[clientProtocol]);
client.open(function (err) {
if (err) {
node.error('Could not connect: ' + err.message);
setStatus(statusEnum.disconnected);
} else {
node.log('Connected to Azure IoT Hub.');
setStatus(statusEnum.connected);

// Check if a message is pending and send it
if (pendingMessage) {
node.log('Message is pending. Sending it to Azure IoT Hub.');
// Send the pending message
sendData(pendingMessage);
}

client.on('message', function (msg) {
// We received a message
node.log('Message received from Azure IoT Hub\n Id: ' + msg.messageId + '\n Payload: ' + msg.data);
var outpuMessage = new Message();
outpuMessage.payload = msg.data;
setStatus(statusEnum.received);
node.send(outpuMessage);
client.complete(msg, printResultFor('Completed'));
});
var setStatus = function (status) {
node.status({ fill: status.color, shape: "dot", text: status.text });
}

client.on('error', function (err) {
node.error(err.message);
var sendData = function (data) {
node.log('Sending Message to Azure IoT Hub :\n Payload: ' + data.toString());
// Create a message and send it to the IoT Hub every second
var message = new Message(data);
client.sendEvent(message, function (err, res) {
if (err) {
node.error('Error while trying to send message:' + err.toString());
setStatus(statusEnum.error);
} else {
node.log('Message sent.');
setStatus(statusEnum.sent);
}
});
};

});
var sendMessageToIoTHub = function (message, reconnect) {
if (!client || reconnect) {
node.log('Connection to IoT Hub not established or configuration changed. Reconnecting.');
// Update the connection string
clientConnectionString = node.credentials.connectionstring;
// update the protocol
clientProtocol = nodeConfig.protocol;

client.on('disconnect', function () {
// If client was previously connected, disconnect first
if (client)
disconnectFromIoTHub();
});

// Connect the IoT Hub
connectToIoTHub(message);
} else {
sendData(message);
}
});
};

var disconnectFromIoTHub = function () {
if (client) {
node.log('Disconnecting from Azure IoT Hub');
client.removeAllListeners();
client.close(printResultFor('close'));
client = null;
setStatus(statusEnum.disconnected);
}
};
};

function nodeConfigUpdated(cs, proto) {
return ((clientConnectionString != cs) || (clientProtocol != proto));
}
var connectToIoTHub = function (pendingMessage) {
node.log('Connecting to Azure IoT Hub:\n Protocol: ' + clientProtocol + '\n Connection string :' + clientConnectionString);
client = Client.fromConnectionString(clientConnectionString, Protocols[clientProtocol]);
client.open(function (err) {
if (err) {
node.error('Could not connect: ' + err.message);
setStatus(statusEnum.disconnected);
} else {
node.log('Connected to Azure IoT Hub.');
setStatus(statusEnum.connected);

// Check if a message is pending and send it
if (pendingMessage) {
node.log('Message is pending. Sending it to Azure IoT Hub.');
// Send the pending message
sendData(pendingMessage);
}

client.on('message', function (msg) {
// We received a message
node.log('Message received from Azure IoT Hub\n Id: ' + msg.messageId + '\n Payload: ' + msg.data);
var outpuMessage = new Message();
outpuMessage.payload = msg.data;
setStatus(statusEnum.received);
node.send(outpuMessage);
client.complete(msg, printResultFor('Completed'));
});

client.on('error', function (err) {
node.error(err.message);

});

client.on('disconnect', function () {
disconnectFromIoTHub();
});
}
});
};

// Main function called by Node-RED
function AzureIoTHubNode(config) {
// Store node for further use
node = this;
nodeConfig = config;
var disconnectFromIoTHub = function () {
if (client) {
node.log('Disconnecting from Azure IoT Hub');
client.removeAllListeners();
client.close(printResultFor('close'));
client = null;
setStatus(statusEnum.disconnected);
}
};

function nodeConfigUpdated(cs, proto) {
return ((clientConnectionString != cs) || (clientProtocol != proto));
}

// Create the Node-RED node
RED.nodes.createNode(this, config);
Expand All @@ -146,13 +152,4 @@ module.exports = function (RED) {
protocol: { value: "amqp" }
}
});


// Helper function to print results in the console
function printResultFor(op) {
return function printResult(err, res) {
if (err) node.error(op + ' error: ' + err.toString());
if (res) node.log(op + ' status: ' + res.constructor.name);
};
}
}

0 comments on commit 10e2ad0

Please sign in to comment.