diff --git a/common/transport/amqp/devdoc/amqp_requirements.md b/common/transport/amqp/devdoc/amqp_requirements.md index de4c9b585..67737bb64 100644 --- a/common/transport/amqp/devdoc/amqp_requirements.md +++ b/common/transport/amqp/devdoc/amqp_requirements.md @@ -49,17 +49,17 @@ A string containing the version of the SDK used for telemetry purposes**]** ### connect(done) Establishes a connection using the AMQP protocol with the IoT Hub instance. -**SRS_NODE_COMMON_AMQP_06_001: [** The connect method shall accept 3 parameters: +**SRS_NODE_COMMON_AMQP_06_001: [** The `connect` method shall accept 3 parameters: A uri that will be used for the connection. A possibly undefined sslOptions structure. A done callback. **]** -**SRS_NODE_COMMON_AMQP_06_002: [** The connect method shall throw a ReferenceError if the uri parameter has not been supplied.**]** +**SRS_NODE_COMMON_AMQP_06_002: [** The `connect` method shall throw a ReferenceError if the uri parameter has not been supplied.**]** -**SRS_NODE_COMMON_AMQP_16_002: [**The connect method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.**]** +**SRS_NODE_COMMON_AMQP_16_002: [**The `connect` method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.**]** -**SRS_NODE_COMMON_AMQP_16_003: [**If given as an argument, the connect method shall call the `done` callback with a standard `Error` object if the connection fails.**]** +**SRS_NODE_COMMON_AMQP_16_003: [**If given as an argument, the connect method shall call the `done` callback with a standard `Error` object if the connection or link/listener establishment fails.**]** ### disconnect(done) @@ -117,5 +117,44 @@ Configures an AmqpReceiver object to use the endpoint passed as a parameter and **SRS_NODE_COMMON_AMQP_16_030: [** The `detachReceiverLink` method shall call the `done` callback with no arguments if the link for this endpoint doesn't exist. **]** **SRS_NODE_COMMON_AMQP_16_031: [** The `detachReceiverLink` method shall call the `done` callback with an `Error` object if there was an error while detaching the link. **]** +# putToken(audience, token, putTokenCallback) + +**SRS_NODE_COMMON_AMQP_06_016: [** The `putToken` method shall throw a ReferenceError if the `audience` argument is falsy. **]** +**SRS_NODE_COMMON_AMQP_06_017: [** The `putToken` method shall throw a ReferenceError if the `token` argument is falsy. **]** +**SRS_NODE_COMMON_AMQP_06_018: [** The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the amqp client is not connected when the method is called. **]** +**SRS_NODE_COMMON_AMQP_06_022: [** The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the `initializeCBS` has NOT been invoked. **]** +**SRS_NODE_COMMON_AMQP_06_005: [** The `putToken` method shall construct an amqp message that contains the following application properties: +'operation': 'put-token' +'type': 'servicebus.windows.net:sastoken' +'name': + +and system properties of + +'to': '$cbs' +'messageId': +'reply_to': 'cbs' + +and a body containing . **]** + +**SRS_NODE_COMMON_AMQP_06_015: [** The `putToken` method shall send this message over the `$cbs` sender link. **]** +**SRS_NODE_COMMON_AMQP_06_006: [** The `putToken` method shall call `putTokenCallback` (if supplied) if the `send` generates an error such that no response from the service will be forthcoming. **]** +**SRS_NODE_COMMON_AMQP_06_007: [** The `putToken` method will time out the put token operation if no response is returned within a configurable number of seconds. **]** +**SRS_NODE_COMMON_AMQP_06_008: [** The `putToken` method will invoke the `putTokenCallback` (if supplied) with an error object if the put token operation timed out. **]** + +#initializeCBS(initializeCBSCallback) + +**SRS_NODE_COMMON_AMQP_06_021: [** If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a `NotConnectedError` object if amqp client is not connnected. **]** +**SRS_NODE_COMMON_AMQP_06_009: [** The `initializeCBS` method shall establish a sender link to the `$cbs` endpoint for sending put token operations, utilizing a custom policy `{encoder: function(body) { return body;}}` which forces the amqp layer to send the token as an amqp value in the body. **]** +**SRS_NODE_COMMON_AMQP_06_010: [** The `initializeCBS` method shall establish a receiver link to the cbs endpoint. **]** +**SRS_NODE_COMMON_AMQP_06_011: [** The `initializeCBS` method shall set up a listener for responses to put tokens. **]** +**SRS_NODE_COMMON_AMQP_06_019: [** If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a standard `Error` object if the link/listener establishment fails. **]** +**SRS_NODE_COMMON_AMQP_06_020: [** If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a null error object if successful. **]** + +# $cbs listener + +**SRS_NODE_COMMON_AMQP_06_013: [** A put token response of 200 will invoke `putTokenCallback` with null parameters. **]** +**SRS_NODE_COMMON_AMQP_06_014: [** A put token response not equal to 200 will invoke `putTokenCallback` with an error object of UnauthorizedError. **]** +**SRS_NODE_COMMON_AMQP_06_012: [** All responses shall be completed. **]** + ### All methods **SRS_NODE_COMMON_AMQP_16_011: [** All methods should treat the `done` callback argument as optional and not throw if it is not passed as argument. **]** \ No newline at end of file diff --git a/common/transport/amqp/lib/amqp.js b/common/transport/amqp/lib/amqp.js index b88d1c251..38cf425d1 100644 --- a/common/transport/amqp/lib/amqp.js +++ b/common/transport/amqp/lib/amqp.js @@ -9,8 +9,14 @@ var AmqpMessage = require('./amqp_message.js'); var AmqpReceiver = require('./amqp_receiver.js'); var errors = require('azure-iot-common').errors; var results = require('azure-iot-common').results; + +var uuid = require('uuid'); var debug = require('debug')('amqp-common'); + +var _putTokenSendingEndpoint = '$cbs'; +var _putTokenReceivingEndpoint = '$cbs'; + /** * @class module:azure-iot-amqp-base.Amqp * @classdesc Basic AMQP functionality used by higher-level IoT Hub libraries. @@ -74,6 +80,36 @@ function Amqp(autoSettleMessages, sdkVersionString) { this._receivers = {}; this._senders = {}; + this._putToken = {}; + // + // This array will hold outstanding put token operations. The array has + // a monotonically increasing time ordering. This is effected by the nature + // of inserting new elements at the end of the array. Note that elements may + // be removed from the array at any index. The elements are the following object + // { + // putTokenCallback - The callback to be invoked on termination of the put token operation. + // This could be because the put token operation response was received + // from the service or because the put token operation times out. + // + // expirationTime - The number of seconds from the epoch, by which the put token operation will + // be expected to finish. + // + // correlationId - The put token operation was sent with a message id. The response + // to the put token operation will contain this message id as the + // correlation id. This id is a uuid. + // } + // + this._putToken.outstandingPutTokens = []; + // + // Currently a fixed value. Could have a set option if we want to make this configurable. + // + this._putToken.numberOfSecondsToTimeout = 120; + // + // While there are ANY put token operations outstanding a timer will be invoked every + // 10 seconds to examine the outstandingPutTokens array for any put tokens that may have + // expired. + // + this._putToken.putTokenTimeOutExaminationInterval = 10000; this._connected = false; } @@ -93,7 +129,7 @@ function safeCallback(callback, error, result) { * @param {Function} done Called when the connection is established or if an error happened. */ Amqp.prototype.connect = function connect(uri, sslOptions, done) { - /*Codes_SRS_NODE_COMMON_AMQP_06_002: [The connect method shall throw a ReferenceError if the uri parameter has not been supplied.] */ + /*Codes_SRS_NODE_COMMON_AMQP_06_002: [The `connect` method shall throw a ReferenceError if the uri parameter has not been supplied.] */ if (!uri) throw new ReferenceError('The uri parameter can not be \'' + uri + '\''); if (!this._connected) { this.uri = uri; @@ -111,14 +147,14 @@ Amqp.prototype.connect = function connect(uri, sslOptions, done) { .then(function (result) { debug('AMQP transport connected.'); this._connected = true; - /*Codes_SRS_NODE_COMMON_AMQP_16_002: [The connect method shall establish a connection with the IoT hub instance and call the done() callback if given as argument] */ + /*Codes_SRS_NODE_COMMON_AMQP_16_002: [The `connect` method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.]*/ safeCallback(done, null, new results.Connected(result)); return null; }.bind(this)) .catch(function (err) { this._amqp.removeListener('client:errorReceived', connectErrorHander); this._connected = false; - /*Codes_SRS_NODE_COMMON_AMQP_16_003: [The connect method shall call the done callback if the connection fails.] */ + /*Codes_SRS_NODE_COMMON_AMQP_16_003: [The `connect` method shall call the `done` callback if the connection fails.] */ safeCallback(done, connectError || err); }.bind(this)); } else { @@ -192,7 +228,7 @@ Amqp.prototype.send = function send(message, endpoint, to, done) { if (!this._connected) { safeCallback(done, new errors.NotConnectedError('Cannot send while disconnected.')); } else { - /*Codes_SRS_NODE_COMMON_AMQP_16_006: [The send method shall construct an AMQP message using information supplied by the caller, as follows: + /*Codes_SRS_NODE_COMMON_AMQP_16_006: [The `send` method shall construct an AMQP message using information supplied by the caller, as follows: The ‘to’ field of the message should be set to the ‘to’ argument. The ‘body’ of the message should be built using the message argument.] */ @@ -208,7 +244,7 @@ Amqp.prototype.send = function send(message, endpoint, to, done) { return null; }) .catch(function (err) { - /*Codes_SRS_NODE_IOTHUB_AMQPCOMMON_16_007: [If sendEvent encounters an error before it can send the request, it shall invoke the done callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ + /*Codes_SRS_NODE_IOTHUB_AMQPCOMMON_16_007: [If sendEvent encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ safeCallback(done, err); }); }; @@ -268,7 +304,7 @@ Amqp.prototype.attachReceiverLink = function attachReceiverLink(endpoint, linkOp var clientErrorHandler = function(err) { connectionError = err; }; - /*Codes_SRS_NODE_COMMON_AMQP_16_007: [If send encounters an error before it can send the request, it shall invoke the done callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ + /*Codes_SRS_NODE_COMMON_AMQP_16_007: [If send encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ this._amqp.on('client:errorReceived', clientErrorHandler); /*Codes_SRS_NODE_COMMON_AMQP_06_004: [The `attachReceiverLink` method shall create a policy object that contain link options to be merged if the linkOptions argument is not falsy.]*/ @@ -320,7 +356,7 @@ Amqp.prototype.attachSenderLink = function attachSenderLink(endpoint, linkOption var clientErrorHandler = function(err) { connectionError = err; }; - /*Codes_SRS_NODE_COMMON_AMQP_16_007: [If send encounters an error before it can send the request, it shall invoke the done callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ + /*Codes_SRS_NODE_COMMON_AMQP_16_007: [If send encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ this._amqp.on('client:errorReceived', clientErrorHandler); /*Codes_SRS_NODE_COMMON_AMQP_06_003: [The `attachSenderLink` method shall create a policy object that contain link options to be merged if the linkOptions argument is not falsy.]*/ @@ -341,7 +377,7 @@ Amqp.prototype.attachSenderLink = function attachSenderLink(endpoint, linkOption return null; }) .catch(function (err) { - /*Codes_SRS_NODE_IOTHUB_AMQPCOMMON_16_007: [If sendEvent encounters an error before it can send the request, it shall invoke the done callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ + /*Codes_SRS_NODE_IOTHUB_AMQPCOMMON_16_007: [If sendEvent encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/ var error = new errors.NotConnectedError('AMQP: Could not create sender'); error.amqpError = err; safeCallback(done, error); @@ -411,4 +447,195 @@ Amqp.prototype._detachLink = function _detachLink(link, detachCallback) { } }; +Amqp.prototype._removeExpiredPutTokens = function removeExpiredPutTokens() { + var currentTime = Math.round(Date.now() / 1000); + var expiredPutTokens = []; + while (this._putToken.outstandingPutTokens.length > 0) { + // + // The timeouts in this array by definition are monotonically increasing. We will be done looking if we + // hit one that is not yet expired. + // + /*Codes_SRS_NODE_COMMON_AMQP_06_007: [ The `putToken` method will time out the put token operation if no response is returned within a configurable number of seconds.]*/ + if (this._putToken.outstandingPutTokens[0].expirationTime < currentTime) { + expiredPutTokens.push(this._putToken.outstandingPutTokens[0]); + this._putToken.outstandingPutTokens.splice(0, 1); + } else { + break; + } + } + expiredPutTokens.forEach(function(currentExpiredPut) { + /*Codes_SRS_NODE_COMMON_AMQP_06_008: [ The `putToken` method will invoke the `putTokenCallback` (if supplied) with an error object if the put token operation timed out. .]*/ + safeCallback(currentExpiredPut.putTokenCallback, new errors.TimeoutError('Put Token operation had no response within ' + this._putToken.numberOfSecondsToTimeout)); + }.bind(this)); + // + // If there are any putTokens left keep trying to time them out. + // + if (this._putToken.outstandingPutTokens.length > 0) { + this._putToken.timeoutTimer = setTimeout(this._removeExpiredPutTokens.bind(this), this._putToken.putTokenTimeOutExaminationInterval); + } +}; + +/** + * @method module:azure-iot-amqp-base.Amqp#putToken + * @description Sends a put token operation to the IoT Hub to provide authentication for a device. + * @param audience The path that describes what is being authenticated. An example would be + * hub.azure-devices.net%2Fdevices%2Fmydevice + * @param token The actual sas token being used to authenticate the device. For the most + * part the audience is likely to be the sr field of the token. + * @param {Function} putTokenCallback Called when the put token operation terminates. + */ +Amqp.prototype.putToken = function(audience, token, putTokenCallback) { + + /*Codes_SRS_NODE_COMMON_AMQP_06_016: [The `putToken` method shall throw a ReferenceError if the `audience` argument is falsy.]*/ + if (!audience) { + throw new ReferenceError('audience cannot be \'' + audience + '\''); + } + + /*Codes_SRS_NODE_COMMON_AMQP_06_017: [The `putToken` method shall throw a ReferenceError if the `token` argument is falsy.]*/ + if (!token) { + throw new ReferenceError('token cannot be \'' + token + '\''); + } + + /*Codes_SRS_NODE_COMMON_AMQP_06_018: [The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the amqp client is not connected when the method is called.]*/ + if (!this._connected) { + safeCallback(putTokenCallback, new errors.NotConnectedError('Cannot putToken while disconnected.')); + } else if (!this._senders[_putTokenSendingEndpoint] || !this._receivers[_putTokenReceivingEndpoint]) { + /*Codes_SRS_NODE_COMMON_AMQP_06_022: [ The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the `initializeCBS` has NOT been invoked.]*/ + safeCallback(putTokenCallback, new errors.NotConnectedError('Cannot putToken unless initializeCBS invoked.')); + } else { + /*Codes_SRS_NODE_COMMON_AMQP_06_005: [The `putToken` method shall construct an amqp message that contains the following application properties: + 'operation': 'put-token' + 'type': 'servicebus.windows.net:sastoken' + 'name': + + and system properties of + + 'to': '$cbs' + 'messageId': + 'reply_to': 'cbs'] + + and a body containing . */ + var amqpMessage = new AmqpMessage(); + amqpMessage.applicationProperties = { + operation: 'put-token', + type: 'servicebus.windows.net:sastoken', + name: audience + }; + amqpMessage.body = token; + amqpMessage.properties = { + to: '$cbs', + messageId: uuid.v4(), + reply_to: 'cbs' + }; + var outstandingPutToken = { + putTokenCallback: putTokenCallback, + expirationTime: Math.round(Date.now() / 1000) + this._putToken.numberOfSecondsToTimeout, + correlationId: amqpMessage.properties.messageId + }; + this._putToken.outstandingPutTokens.push(outstandingPutToken); + // + // If this is the first put token then start trying to time it out. + // + if (this._putToken.outstandingPutTokens.length === 1) { + this._putToken.timeoutTimer = setTimeout(this._removeExpiredPutTokens.bind(this), this._putToken.putTokenTimeOutExaminationInterval); + } + /*Codes_SRS_NODE_COMMON_AMQP_06_015: [The `putToken` method shall send this message over the `$cbs` sender link.]*/ + this._senders[_putTokenSendingEndpoint].send(amqpMessage) + .then(function () { + // + // Only here if the message was queued successfully. Yay! We already set up a callback + // to handle the response message of the put token operation. That will finish up anything + // we need to do. + // + return null; + }) + .catch(function (err) { + // + // Sadness. Something went wrong sending the put token. + // + // Find the operation in the outstanding array. Remove it from the array since, well, it's not outstanding anymore. + // Since we may have arrived here asynchronously, we simply can't assume that it is the end of the array. But, + // it's more likely near the end. + // + for (var i = this._putToken.outstandingPutTokens.length - 1;i >= 0; i--) { + if (this._putToken.outstandingPutTokens[i].correlationId === amqpMessage.properties.messageId) { + var outStandingPutTokenInError = this._putToken.outstandingPutTokens[i]; + this._putToken.outstandingPutTokens.splice(i, 1); + // + // This was the last outstanding put token. No point in having a timer around trying to time nothing out. + // + if (this._putToken.outstandingPutTokens.length === 0) { + clearTimeout(this._putToken.timeoutTimer); + } + /*Codes_SRS_NODE_COMMON_AMQP_06_006: [The `putToken` method shall call `putTokenCallback` (if supplied) if the `send` generates an error such that no response from the service will be forthcoming.]*/ + safeCallback(outStandingPutTokenInError.putTokenCallback, err); + break; + } + } + }.bind(this)); + } +}; + +/** + * @method module:azure-iot-amqp-base.Amqp#initializeCBS + * @description If CBS authentication is to be used, set it up. + * @param {Function} initializeCBSCallback Called when the initialization terminates. + */ +Amqp.prototype.initializeCBS = function(initializeCBSCallback) { + if (!this._connected) { + /*Codes_SRS_NODE_COMMON_AMQP_06_021: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a `NotConnectedError` object if amqp client is not connnected. **]*/ + safeCallback(initializeCBSCallback, new errors.NotConnectedError('Initializing CBS must only be done on a connnected client')); + } else { + /*Codes_SRS_NODE_COMMON_AMQP_06_009: [The `initializeCBS` method shall establish a sender link to the `$cbs` endpoint for sending put token operations, utilizing a custom policy `{encoder: function(body) { return body;}}` which forces the amqp layer to send the token as an amqp value in the body.]*/ + this.attachSenderLink(_putTokenSendingEndpoint, {encoder: function(body) { return body;}}, function (err) { + if (err) { + this.disconnect(function () { + /*Codes_SRS_NODE_COMMON_AMQP_06_019: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a standard `Error` object if the link/listener establishment fails.]*/ + safeCallback(initializeCBSCallback, err); + }); + } else { + /*Codes_SRS_NODE_COMMON_AMQP_06_010: [The `initializeCBS` method shall establish a receiver link to the cbs endpoint.]*/ + this.attachReceiverLink(_putTokenReceivingEndpoint, null, function (err) { + if (err) { + this.disconnect( function () { + /*Codes_SRS_NODE_COMMON_AMQP_06_019: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a standard `Error` object if the link/listener establishment fails.]*/ + safeCallback(initializeCBSCallback, err); + }); + } else { + /*Codes_SRS_NODE_COMMON_AMQP_06_011: [The `initializeCBS` method shall set up a listener for responses to put tokens.]*/ + this._receivers[_putTokenReceivingEndpoint].on('message', function (msg) { + for (var i = 0; i < this._putToken.outstandingPutTokens.length; i++) { + if (msg.correlationId === this._putToken.outstandingPutTokens[i].correlationId) { + var completedPutToken = this._putToken.outstandingPutTokens[i]; + this._putToken.outstandingPutTokens.splice(i, 1); + if (completedPutToken.putTokenCallback) { + /*Codes_SRS_NODE_COMMON_AMQP_06_013: [A put token response of 200 will invoke `putTokenCallback` with null parameters.]*/ + var error = null; + if (msg.properties.getValue('status-code') !== 200) { + /*Codes_SRS_NODE_COMMON_AMQP_06_014: [A put token response not equal to 200 will invoke `putTokenCallback` with an error object of UnauthorizedError.]*/ + error = new errors.UnauthorizedError(msg.properties.getValue('status-description')); + } + safeCallback(completedPutToken.putTokenCallback, error); + } + break; + } + } + // + // Regardless of whether we found the put token in the list of outstanding + // operations, accept it. This could be a put token that we previously + // timed out. Be happy. It made it home, just too late to be useful. + // + /*Codes_SRS_NODE_COMMON_AMQP_06_012: [All responses shall be completed.]*/ + this._receivers[_putTokenReceivingEndpoint].complete(msg); + }.bind(this)); + /*Codes_SRS_NODE_COMMON_AMQP_06_020: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a null error object if successful.]*/ + safeCallback(initializeCBSCallback, null); + } + }.bind(this)); + } + }.bind(this)); + } +}; + + module.exports = Amqp; \ No newline at end of file diff --git a/common/transport/amqp/package.json b/common/transport/amqp/package.json index 5efd0ee59..ffb7352e0 100644 --- a/common/transport/amqp/package.json +++ b/common/transport/amqp/package.json @@ -12,10 +12,11 @@ "amqp10-transport-ws": "^0.0.5", "azure-iot-common": "1.1.5", "debug": "^2.6.0", - "sinon": "^1.17.7", - "sinon-as-promised": "^4.0.2" + "uuid": "^3.0.1" }, "devDependencies": { + "sinon": "^1.17.7", + "sinon-as-promised": "^4.0.2", "chai": "^3.5.0", "istanbul": "^0.4.5", "jshint": "^2.9.4", @@ -32,7 +33,7 @@ "alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js", "ci": "npm -s run lint && npm -s run alltest-min && npm -s run check-cover", "test": "npm -s run lint && npm -s run unittest", - "check-cover": "istanbul check-coverage --statements 95 --branches 83 --functions 92 --lines 95" + "check-cover": "istanbul check-coverage --statements 95 --branches 85 --functions 91 --lines 95" }, "engines": { "node": ">= 0.10" diff --git a/common/transport/amqp/test/_amqp_test.js b/common/transport/amqp/test/_amqp_test.js index da5b0894b..705b251e3 100644 --- a/common/transport/amqp/test/_amqp_test.js +++ b/common/transport/amqp/test/_amqp_test.js @@ -13,7 +13,10 @@ var results = require('azure-iot-common').results; var errors = require('azure-iot-common').errors; var Message = require('azure-iot-common').Message; var EventEmitter = require('events').EventEmitter; +var uuid = require('uuid'); +var cbsReceiveEndpoint = '$cbs'; +var cbsSendEndpoint = '$cbs'; describe('Amqp', function () { describe('#connect', function () { @@ -28,6 +31,7 @@ describe('Amqp', function () { }); /*Tests_SRS_NODE_COMMON_AMQP_16_002: [The connect method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.]*/ + /*Tests_SRS_NODE_COMMON_AMQP_06_011: [The `connect` method shall set up a listener for responses to put tokens.]*/ it('Calls the done callback when successfully connected', function(testCallback) { var amqp = new Amqp(); sinon.stub(amqp._amqp, 'connect').resolves('connected'); @@ -329,6 +333,434 @@ describe('Amqp', function () { }); }); + describe('#initializeCBS', function() { + /*Tests_SRS_NODE_COMMON_AMQP_06_019: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a standard `Error` object if the link/listener establishment fails.]*/ + it('Calls initializeCBSCallback with an error if can NOT establish a sender link', function(testCallback) { + var amqp = new Amqp(); + var testError = new Error(); + sinon.stub(amqp, 'attachSenderLink').callsArgWith(2, testError); + amqp._connected = true; + amqp.initializeCBS(function(err) { + assert.strictEqual(err, testError); + testCallback(); + }); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_019: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a standard `Error` object if the link/listener establishment fails.]*/ + it('Calls initializeCBSCallback with an error if can NOT establish a receiver link', function(testCallback) { + var amqp = new Amqp(); + var testError = new Error(); + sinon.stub(amqp, 'attachSenderLink').callsArgWith(2, null); + sinon.stub(amqp, 'attachReceiverLink').callsArgWith(2, testError); + amqp._connected = true; + amqp.initializeCBS(function(err) { + assert.strictEqual(err, testError); + testCallback(); + }); + }); + + it('returns NotConnectedError if client not connnected', function(testCallback) { + var amqp = new Amqp(); + amqp.initializeCBS(function(err) { + assert.instanceOf(err, errors.NotConnectedError); + testCallback(); + }); + + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_020: [If given as an argument, the `initializeCBS` method shall call `initializeCBSCallback` with a null error object if successful.]*/ + it('Calls initializeCBSCallback with a null error object if successful', function(testCallback) { + var amqp = new Amqp(); + sinon.stub(amqp, 'attachSenderLink').callsArgWith(2, null); + sinon.stub(amqp, 'attachReceiverLink').callsArgWith(2, null); + + amqp._receivers[cbsReceiveEndpoint] = new EventEmitter(); + amqp._senders[cbsSendEndpoint] = new EventEmitter(); + amqp._connected = true; + amqp.initializeCBS(function(err) { + assert.strictEqual(err, null); + testCallback(); + }); + }); + + + }); + + describe('#putToken', function() { + [undefined, null, ''].forEach(function (badAudience){ + /*Tests_SRS_NODE_COMMON_AMQP_06_016: [The `putToken` method shall throw a ReferenceError if the `audience` argument is falsy.]*/ + it('throws if audience is \'' + badAudience +'\'', function () { + var newClient = new Amqp(); + assert.throws(function () { + newClient.putToken(badAudience, 'sas', function () {}); + }, ReferenceError, ''); + }); + }); + + [undefined, null, ''].forEach(function (badToken){ + /*Tests_SRS_NODE_COMMON_AMQP_06_017: [The `putToken` method shall throw a ReferenceError if the `token` argument is falsy.]*/ + it('throws if sasToken is \'' + badToken +'\'', function () { + var newClient = new Amqp(); + assert.throws(function () { + newClient.putToken('audience', badToken, function () {}); + }, ReferenceError, ''); + }); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_018: [The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the amqp client is not connected when the method is called.]*/ + it('calls the putTokenCallback with a NotConnectedError if the client is not connected', function(testCallback) { + var amqp = new Amqp(); + amqp.putToken('audience', 'sasToken', function(err) { + assert.instanceOf(err, errors.NotConnectedError); + testCallback(); + }); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_022: [The `putToken` method shall call the `putTokenCallback` callback (if provided) with a `NotConnectedError` object if the `initializeCBS` has NOT been invoked.]*/ + it('calls the putTokenCallback with a NotConnectedError if initializeCBS not invoked', function(testCallback) { + var amqp = new Amqp(); + amqp._connected = true; + amqp.putToken('audience', 'sasToken', function(err) { + assert.instanceOf(err, errors.NotConnectedError); + testCallback(); + }); + }); + + it('creates a timer if this is the first pending put token operation', function(testCallback) { + this.clock = sinon.useFakeTimers(); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + var spyRemoveExpiredPutTokens = sinon.spy(amqp, '_removeExpiredPutTokens'); + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._putToken.numberOfSecondsToTimeout = 120; + amqp._putToken.putTokenTimeOutExaminationInterval = 10000; + amqp.putToken('audience','sasToken', function () {}); + this.clock.tick(9999); + assert.isTrue(spyRemoveExpiredPutTokens.notCalled, ' removeExpiredPutTokens should not have been called.'); + this.clock.tick(2); + assert.isTrue(spyRemoveExpiredPutTokens.calledOnce, ' removeExpiredPutTokens should have been called once.'); + clearTimeout(amqp._putToken.timeoutTimer); + this.clock.restore(); + testCallback(); + }.bind(this)); + }); + + it('Two putTokens in succession still causes only one invocation of the timer callback', function(testCallback) { + this.clock = sinon.useFakeTimers(); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + var spyRemoveExpiredPutTokens = sinon.spy(amqp, '_removeExpiredPutTokens'); + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._putToken.numberOfSecondsToTimeout = 120; + amqp._putToken.putTokenTimeOutExaminationInterval = 10000; + amqp.putToken('audience','sasToken', function () {}); + this.clock.tick(500); + assert.isTrue(spyRemoveExpiredPutTokens.notCalled, ' removeExpiredPutTokens should not have been called.'); + amqp.putToken('audience1','sasToken2', function () {}); + this.clock.tick(500); + assert.isTrue(spyRemoveExpiredPutTokens.notCalled, ' removeExpiredPutTokens should not have been called.'); + this.clock.tick(8999); + assert.isTrue(spyRemoveExpiredPutTokens.notCalled, ' removeExpiredPutTokens should not have been called.'); + this.clock.tick(6000); + assert.isTrue(spyRemoveExpiredPutTokens.calledOnce, ' removeExpiredPutTokens should have been called once.'); + clearTimeout(amqp._putToken.timeoutTimer); + this.clock.restore(); + testCallback(); + }.bind(this)); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_005: [The `putToken` method shall construct an amqp message that contains the following application properties: + 'operation': 'put-token' + 'type': 'servicebus.windows.net:sastoken' + 'name': + + and system properties of + + 'to': '$cbs' + 'messageId': + 'reply_to': 'cbs'] + + and a body containing . */ + /*Tests_SRS_NODE_COMMON_AMQP_06_015: [The `putToken` method shall send this message over the `$cbs` sender link.]*/ + it('sends a put token operation', function(testCallback) { + var fakeUuids = [uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + var amqp = new Amqp(); + uuidStub.onCall(0).returns(fakeUuids[0]); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp.putToken('myaudience', 'my token'); + uuid.v4.restore(); + assert.equal(cbsSender.send.args[0][0].applicationProperties.operation, 'put-token', 'operation application property not equal'); + assert.equal(cbsSender.send.args[0][0].applicationProperties.type, 'servicebus.windows.net:sastoken', 'type application property not equal'); + assert.equal(cbsSender.send.args[0][0].applicationProperties.name, 'myaudience', 'name application property not equal'); + assert.equal(cbsSender.send.args[0][0].properties.to, '$cbs', 'to application property not equal'); + assert.equal(cbsSender.send.args[0][0].properties.messageId, fakeUuids[0], 'messageId n property not equal'); + assert.equal(cbsSender.send.args[0][0].properties.reply_to, 'cbs', 'reply_to property not equal'); + assert.isTrue(cbsSender.send.args[0][0].body === 'my token', 'body of put token not the sas token'); + clearTimeout(amqp._putToken.timeoutTimer); + testCallback(); + }); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_006: [The `putToken` method shall call `putTokenCallback` (if supplied) if the `send` generates an error such that no response from the service will be forthcoming.]*/ + it('sends two put tokens erroring the second , ensuring the first remains', function(testCallback) { + var fakeUuids = [uuid.v4(), uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + uuidStub.onCall(0).returns(fakeUuids[0]); + uuidStub.onCall(1).returns(fakeUuids[1]); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp.putToken('first audience', 'first token', function () { + assert.fail('This callback for the first put token should not have been called'); + }); + cbsSender.send = sinon.stub().rejects('could not send'); + amqp.putToken('second audience', 'second token', function (err) { + assert.instanceOf(err, Error); + // + // Make sure that the first put token is still outstanding. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'outstanding token array length' ); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[0], 'outstanding token correlation id '); + clearTimeout(amqp._putToken.timeoutTimer); + testCallback(); + }); + uuid.v4.restore(); + }); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_007: [ The `putToken` method will time out the put token operation if no response is returned within a configurable number of seconds.]*/ + /*Tests_SRS_NODE_COMMON_AMQP_06_008: [ The `putToken` method will invoke the `putTokenCallback` (if supplied) with an error object if the put token operation timed out. ]*/ + it('Three put tokens, two timeout initially, third later', function(testCallback) { + this.clock = sinon.useFakeTimers(); + var fakeUuids = [uuid.v4(), uuid.v4(), uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + uuidStub.onCall(0).returns(fakeUuids[0]); + uuidStub.onCall(1).returns(fakeUuids[1]); + uuidStub.onCall(2).returns(fakeUuids[2]); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._putToken.numberOfSecondsToTimeout = 120; + amqp._putToken.putTokenTimeOutExaminationInterval = 10000; + amqp.putToken('first audience', 'first token', function (err) { + assert.instanceOf(err, errors.TimeoutError); + // + // There should only be one outstanding put token when this first put token is timed out. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'For the first put token call back invocation outstanding remaining '); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[2], 'For the first put token callback the outstanding correlation id. '); + }); + this.clock.tick(5000); // 5 seconds between first and second + amqp.putToken('second audience', 'second token', function (err) { + assert.instanceOf(err, errors.TimeoutError); + // + // There should only be one outstanding put token when this second put token is timed out. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'For the second put token call back invocation outstanding remaining '); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[2], 'For the second put token callback the outstanding correlation id. '); + }); + this.clock.tick(30000); // 30 seconds more till the third + amqp.putToken('third audience', 'third token', function (err) { + assert.instanceOf(err, errors.TimeoutError); + // + // There should be no outstanding put token when this put token is timed out. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 0, 'For the third put token call back invocation outstanding remaining '); + this.clock.restore(); + testCallback(); + }.bind(this)); + this.clock.tick(100000); // 100 seconds more should have resulted in timeouts for the first two. + process.nextTick(function () { + this.clock.tick(40000); // 40 seconds more should have resulted in timeouts for the third. + }.bind(this)); + uuid.v4.restore(); + }.bind(this)); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_013: [A put token response of 200 will invoke `putTokenCallback` with null parameters.]*/ + it('Three put tokens, first and third timeout eventually, second completes successfully', function(testCallback) { + this.clock = sinon.useFakeTimers(); + var fakeUuids = [uuid.v4(), uuid.v4(), uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + var amqp = new Amqp(); + uuidStub.onCall(0).returns(fakeUuids[0]); + uuidStub.onCall(1).returns(fakeUuids[1]); + uuidStub.onCall(2).returns(fakeUuids[2]); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + var responseMessage = new Message(); + responseMessage.correlationId = fakeUuids[1]; + responseMessage.properties.add('status-code', 200); + amqp._connected = true; + + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._receivers[cbsReceiveEndpoint].complete = sinon.spy(); + amqp._putToken.numberOfSecondsToTimeout = 120; + amqp._putToken.putTokenTimeOutExaminationInterval = 10000; + amqp.putToken('first audience', 'first token', function (err) { + assert.instanceOf(err, errors.TimeoutError); + // + // There should only be one outstanding put token when this first put token is timed out. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'For the first put token call back invocation outstanding remaining '); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[2], 'For the second put token callback the first outstanding correlation id. '); + }); + this.clock.tick(20000); // 20 seconds between first and second + amqp.putToken('second audience', 'second token', function (err, putTokenResult) { + assert.isNotOk(err,'The error object passed'); + assert.isNotOk(putTokenResult,'The result object passed'); + // + // There should be two outstanding put token when this second put token succeeds. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 2, 'For the second put token call back invocation outstanding remaining '); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[0], 'For the second put token callback the first outstanding correlation id. '); + assert.equal(amqp._putToken.outstandingPutTokens[1].correlationId, fakeUuids[2], 'For the second put token callback the second outstanding correlation id. '); + }); + this.clock.tick(30000); // 30 seconds more till the third. We are at 50 seconds from the start of the first put token. + amqp.putToken('third audience', 'third token', function (err) { + assert.instanceOf(err, errors.TimeoutError); + // + // There should be no outstanding put token when this put token is timed out. + // + assert.equal(amqp._putToken.outstandingPutTokens.length, 0, 'For the third put token call back invocation outstanding remaining '); + assert(amqp._receivers[cbsReceiveEndpoint].complete.calledOnce); + this.clock.restore(); + testCallback(); + }.bind(this)); + this.clock.tick(40000); // Let them sit in the outstanding list for a bit. We should be at 1 minute 30 seconds at this point. + process.nextTick(function () { + // + // Emit a put token response which should complete the second put token. + // + amqp._receivers[cbsReceiveEndpoint].emit('message',responseMessage); + this.clock.tick(10000); // Move forward a bit We should be at 1 minute 40 seconds after the start of the first put token. + process.nextTick(function () { + assert.equal(amqp._putToken.outstandingPutTokens.length, 2, 'First time stop after completing the second put token'); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[0], 'First time stop - the first outstanding correlation id. '); + assert.equal(amqp._putToken.outstandingPutTokens[1].correlationId, fakeUuids[2], 'First time stop - the second outstanding correlation id. '); + // + // At this point the second put token is done. Move forward enough time to complete the first put token + // + this.clock.tick(30000); // Move forward 30 seconds. We should be at 2 minutes 20 seconds after the start of the first put token. + process.nextTick(function () { + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'Second time stop after completing the second put token'); + assert.equal(amqp._putToken.outstandingPutTokens[0].correlationId, fakeUuids[2], 'Second time stop - the third put token should be remaining.'); + // + // At this point the first put token is done (timed out). Move forward enough time to complete the third put token. + // + this.clock.tick(60000); // Move forward 60 seconds. We should be at 3 minutes 20 seconds after the start of the first put token. This should time out the third put token + }.bind(this)); + }.bind(this)); + }.bind(this)); + uuid.v4.restore(); + }.bind(this)); + }); + + /*Tests_SRS_NODE_COMMON_AMQP_06_014: [A put token response not equal to 200 will invoke `putTokenCallback` with an error object of UnauthorizedError.]*/ + it('Status result not equal to 200 completes the put token with an error.', function(testCallback) { + var fakeUuids = [uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + uuidStub.onCall(0).returns(fakeUuids[0]); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + var responseMessage = new Message(); + responseMessage.correlationId = fakeUuids[0]; + responseMessage.properties.add('status-code', 201); + responseMessage.properties.add('status-description', 'cryptic message'); + + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._receivers[cbsReceiveEndpoint].complete = sinon.spy(); + amqp.putToken('audience', 'token', function (err, putTokenResult) { + assert.instanceOf(err, errors.UnauthorizedError); + assert.isNotOk(putTokenResult,'The result object passed'); + assert.equal(amqp._putToken.outstandingPutTokens.length, 0, 'For put token call nothing should be outstanding.'); + assert(amqp._receivers[cbsReceiveEndpoint].complete.calledOnce); + uuid.v4.restore(); + testCallback(); + }); + amqp._receivers[cbsReceiveEndpoint].emit('message',responseMessage); + }.bind(this)); + }); + + it('Do a put token with no callback that completes successfully', function(testCallback) { + var fakeUuids = [uuid.v4()]; + var uuidStub = sinon.stub(uuid,'v4'); + uuidStub.onCall(0).returns(fakeUuids[0]); + var amqp = new Amqp(); + var cbsSender = new EventEmitter(); + var cbsReceiver = new EventEmitter(); + cbsSender.send = sinon.stub().resolves('message enqueued'); + sinon.stub(amqp._amqp, 'createReceiver').resolves(cbsReceiver); + sinon.stub(amqp._amqp, 'createSender').resolves(cbsSender); + amqp._connected = true; + var responseMessage = new Message(); + responseMessage.correlationId = fakeUuids[0]; + responseMessage.properties.add('status-code', 200); + + amqp.initializeCBS(function(err) { + assert.isNotOk(err, 'initalization passed'); + amqp._receivers[cbsReceiveEndpoint].complete = sinon.spy(); + amqp.putToken('first audience', 'first token'); + assert.equal(amqp._putToken.outstandingPutTokens.length, 1, 'Should be one put token outstanding.'); + // + // Emit a put token response which should complete the put token. + // + amqp._receivers[cbsReceiveEndpoint].emit('message',responseMessage); + process.nextTick(function () { + assert.equal(amqp._putToken.outstandingPutTokens.length, 0, 'First time stop all should be done'); + assert(amqp._receivers[cbsReceiveEndpoint].complete.calledOnce); + uuid.v4.restore(); + testCallback(); + }.bind(this)); + }.bind(this)); + }); + + }); + describe('Links', function() { var fake_generic_endpoint = 'fake_generic_endpoint'; [ @@ -487,7 +919,7 @@ describe('Amqp', function () { }); }); }); - + describe('#detachReceiverLink', function() { /*Tests_SRS_NODE_COMMON_AMQP_16_027: [The `detachReceiverLink` method shall throw a ReferenceError if the `endpoint` argument is falsy.]*/ [null, undefined, ''].forEach(function(badEndpoint) { diff --git a/common/transport/amqp/yarn.lock b/common/transport/amqp/yarn.lock index 66e7e3399..6fa54dafa 100644 --- a/common/transport/amqp/yarn.lock +++ b/common/transport/amqp/yarn.lock @@ -297,7 +297,7 @@ fs.realpath@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/fs.realpath/-/fs.realpath-1.0.0.tgz#1504ad2523158caa40db4a2787cb01411994ea4f" -glob@7.0.5: +glob@7.0.5, glob@^7.0.3: version "7.0.5" resolved "https://registry.yarnpkg.com/glob/-/glob-7.0.5.tgz#b4202a69099bbb4d292a7c1b95b6682b67ebdc95" dependencies: @@ -318,7 +318,7 @@ glob@^5.0.15, glob@~5.0.0: once "^1.3.0" path-is-absolute "^1.0.0" -glob@^7.0.3, glob@^7.1.1: +glob@^7.1.1: version "7.1.1" resolved "https://registry.yarnpkg.com/glob/-/glob-7.1.1.tgz#805211df04faaf1c63a3600306cdf5ade50b2ec8" dependencies: @@ -521,14 +521,10 @@ longest@^1.0.1: dependencies: brace-expansion "^1.0.0" -minimist@0.0.8: +minimist@0.0.8, minimist@~0.0.1: version "0.0.8" resolved "https://registry.yarnpkg.com/minimist/-/minimist-0.0.8.tgz#857fcabfc3397d2625b8228262e86aa7a011b05d" -minimist@~0.0.1: - version "0.0.10" - resolved "https://registry.yarnpkg.com/minimist/-/minimist-0.0.10.tgz#de3f98543dbf96082be48ad1a0c7cda836301dcf" - mkdirp@0.5.1, mkdirp@0.5.x: version "0.5.1" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-0.5.1.tgz#30057438eac6cf7f8c4767f38648d6697d75c903" @@ -612,10 +608,6 @@ path-is-absolute@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/path-is-absolute/-/path-is-absolute-1.0.1.tgz#174b9268735534ffbc7ace6bf53a5a9e1b5c5f5f" -path-parse@^1.0.5: - version "1.0.5" - resolved "https://registry.yarnpkg.com/path-parse/-/path-parse-1.0.5.tgz#3c1adf871ea9cd6c9431b6ea2bd74a0ff055c4c1" - prelude-ls@~1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54" @@ -649,16 +641,10 @@ repeat-string@^1.5.2: version "1.6.1" resolved "https://registry.yarnpkg.com/repeat-string/-/repeat-string-1.6.1.tgz#8dcae470e1c88abc2d600fff4a776286da75e637" -resolve@1.1.x: +resolve@1.1.x, resolve@^1.1.7: version "1.1.7" resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.1.7.tgz#203114d82ad2c5ed9e8e0411b3932875e889e97b" -resolve@^1.1.7: - version "1.3.2" - resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.3.2.tgz#1f0442c9e0cbb8136e87b9305f932f46c7f28235" - dependencies: - path-parse "^1.0.5" - right-align@^0.1.1: version "0.1.3" resolved "https://registry.yarnpkg.com/right-align/-/right-align-0.1.3.tgz#61339b722fe6a3515689210d24e14c96148613ef" diff --git a/device/core/devdoc/device_client_requirements.md b/device/core/devdoc/device_client_requirements.md index fabcc1d61..2b19d892c 100644 --- a/device/core/devdoc/device_client_requirements.md +++ b/device/core/devdoc/device_client_requirements.md @@ -60,7 +60,7 @@ client.sendEvent(new Message('hello world'), print); **SRS_NODE_DEVICE_CLIENT_16_020: [** The `open` function should start listening for C2D messages if there are listeners on the `message` event **]** -**SRS_NODE_DEVICE_CLIENT_16_064: [** The `open` method shall call the `openCallback` immediately with a null error object and a `results.Connected()` object if called while renewing the shared access signature. **]** +**SRS_NODE_DEVICE_CLIENT_16_064: [** The `open` method shall call the `openCallback` immediately with a null error object and a `results.Connected()` object if called while renewing the shared access signature. **]** **SRS_NODE_DEVICE_CLIENT_16_061: [** The `open` method shall not throw if the `openCallback` callback has not been provided. **]** @@ -75,7 +75,7 @@ client.sendEvent(new Message('hello world'), print); **SRS_NODE_DEVICE_CLIENT_16_055: [** The `close` method shall call the `closeCallback` function when done with either a single Error object if it failed or null and a results.Disconnected object if successful. **]** -**SRS_NODE_DEVICE_CLIENT_16_058: [** The `close` method shall immediately call the `closeCallback` function if provided and the transport is already disconnected. **]** +**SRS_NODE_DEVICE_CLIENT_16_058: [** The `close` method shall immediately call the `closeCallback` function if provided and the transport is already disconnected. **]** #### sendEvent(message, sendEventCallback) The `sendEvent` method sends an event message to the IoT Hub as the device indicated in the constructor argument. @@ -187,11 +187,13 @@ The `sendEventBatch` method sends a list of event messages to the IoT Hub as the **SRS_NODE_DEVICE_CLIENT_16_031: [** The `updateSharedAccessSignature` method shall throw a `ReferenceError` if the sharedAccessSignature parameter is falsy. **]** +**SRS_NODE_DEVICE_CLIENT_06_002: [** The `updateSharedAccessSignature` method shall throw a `ReferenceError` if the client was created using x509. **]** + **SRS_NODE_DEVICE_CLIENT_16_032: [** The `updateSharedAccessSignature` method shall call the `updateSharedAccessSignature` method of the transport currently in use with the sharedAccessSignature parameter. **]** **SRS_NODE_DEVICE_CLIENT_16_033: [** The `updateSharedAccessSignature` method shall reconnect the transport to the IoTHub service if it was connected before before the method is called. **]** -**SRS_NODE_DEVICE_CLIENT_16_034: [** The `updateSharedAccessSignature` method shall not reconnect the transport if the transport was disconnected to begin with. **]** +**SRS_NODE_DEVICE_CLIENT_16_034: [** The `updateSharedAccessSignature` method shall not reconnect when the 'needToReconnect' property of the result argument of the callback is false. **]** **SRS_NODE_DEVICE_CLIENT_16_035: [** The `updateSharedAccessSignature` method shall call the `done` callback with an error object if an error happened while renewing the token. **]** @@ -215,7 +217,7 @@ The `sendEventBatch` method sends a list of event messages to the IoT Hub as the **SRS_NODE_DEVICE_CLIENT_18_002: [** The `getTwin` method shall pass itself as the first parameter to `fromDeviceClient` and it shall pass the `done` method as the second parameter. **]** -**SRS_NODE_DEVICE_CLIENT_18_003: [** The `getTwin` method shall use the second parameter (if it is not falsy) to call `fromDeviceClient` on. **]** +**SRS_NODE_DEVICE_CLIENT_18_003: [** The `getTwin` method shall use the second parameter (if it is not falsy) to call `fromDeviceClient` on. **]** #### onDeviceMethod(methodName, callback) @@ -235,7 +237,7 @@ interface DeviceMethodRequest { interface DeviceMethodResponse { properties: StringMap; write(data: Buffer | string): void; - end(status: number, done?: (err: any): void); + end(status: number, done?: (err: any): void); } interface DeviceMethodEventHandler { diff --git a/device/core/lib/client.js b/device/core/lib/client.js index 2f6a9d67a..ba16fd7a8 100644 --- a/device/core/lib/client.js +++ b/device/core/lib/client.js @@ -43,7 +43,10 @@ var Client = function (transport, connStr, blobUploadClient) { if (this._connectionString && ConnectionString.parse(this._connectionString).SharedAccessKey) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_027: [If a connection string argument is provided and is using SharedAccessKey authentication, the Client shall automatically generate and renew SAS tokens.] */ - this._sasRenewalTimeout = setTimeout(this._renewSharedAccessSignature.bind(this), Client.sasRenewalInterval); + this._useAutomaticRenewal = true; + this._sasRenewalTimeout = setTimeout(this._renewSharedAccessSignature.bind(this), Client.sasRenewalInterval); + } else { + this._useAutomaticRenewal = false; } this.blobUploadClient = blobUploadClient; @@ -61,12 +64,12 @@ var Client = function (transport, connStr, blobUploadClient) { this.on('newListener', function (eventName) { if (eventName === 'message') { - /* Schedules startMessageReceiver() on the next tick because the event handler for the + /* Schedules startMessageReceiver() on the next tick because the event handler for the * 'message' event is only added after this handler (for 'newListener') finishes and - * the state machine depends on having an event handler on 'message' to determine if + * the state machine depends on having an event handler on 'message' to determine if * it should connect the receiver, depending on its state. */ - process.nextTick(function() { + process.nextTick(function() { thisClient._fsm.handle('startMessageReceiver'); }); } @@ -113,7 +116,7 @@ var Client = function (transport, connStr, blobUploadClient) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_045: [If the transport successfully establishes a connection the `open` method shall subscribe to the `disconnect` event of the transport.]*/ thisClient._transport.removeListener('disconnect', thisClient._disconnectHandler); // remove the old one before adding a new -- this can happen when renewing SAS tokens thisClient._transport.on('disconnect', thisClient._disconnectHandler); - transportConnectedCallback(connectErr, connectResult); + transportConnectedCallback(connectErr, connectResult); }); } else { transportConnectedCallback(null, new results.Connected()); @@ -230,6 +233,7 @@ var Client = function (transport, connStr, blobUploadClient) { } thisClient.blobUploadClient.updateSharedAccessSignature(sharedAccessSignature); + if (thisClient._twin) { thisClient._twin.updateSharedAccessSignature(); } @@ -242,21 +246,25 @@ var Client = function (transport, connStr, blobUploadClient) { } else { debug('sas token updated: ' + result.constructor.name + ' needToReconnect: ' + result.needToReconnect); /*Codes_SRS_NODE_DEVICE_CLIENT_16_033: [The updateSharedAccessSignature method shall reconnect the transport to the IoTHub service if it was connected before before the method is clled.]*/ - /*Codes_SRS_NODE_DEVICE_CLIENT_16_034: [The updateSharedAccessSignature method shall not reconnect the transport if the transport was disconnected to begin with.]*/ + /*Codes_SRS_NODE_DEVICE_CLIENT_16_034: [The `updateSharedAccessSignature` method shall not reconnect when the 'needToReconnect' property of the result argument of the callback is false.]*/ if (result.needToReconnect) { thisClient._fsm.transition('connecting'); thisClient._transport.connect(function(connectErr) { if (connectErr) { safeUpdateSasCallback(connectErr); } else { - thisClient._sasRenewalTimeout = setTimeout(thisClient._renewSharedAccessSignature.bind(thisClient), Client.sasRenewalInterval); - /*Codes_SRS_NODE_DEVICE_CLIENT_16_036: [The updateSharedAccessSignature method shall call the `updateSasCallback` callback with a null error object and a result of type SharedAccessSignatureUpdated if the oken was updated successfully.]*/ + if (thisClient._useAutomaticRenewal) { + thisClient._sasRenewalTimeout = setTimeout(thisClient._renewSharedAccessSignature.bind(thisClient), Client.sasRenewalInterval); + /*Codes_SRS_NODE_DEVICE_CLIENT_16_036: [The updateSharedAccessSignature method shall call the `updateSasCallback` callback with a null error object and a result of type SharedAccessSignatureUpdated if the oken was updated successfully.]*/ + } safeUpdateSasCallback(null, new results.SharedAccessSignatureUpdated(false)); } }); } else { - thisClient._sasRenewalTimeout = setTimeout(thisClient._renewSharedAccessSignature.bind(thisClient), Client.sasRenewalInterval); - /*Codes_SRS_NODE_DEVICE_CLIENT_16_036: [The updateSharedAccessSignature method shall call the `updateSasCallback` callback with a null error object and a result of type SharedAccessSignatureUpdated if the oken was updated successfully.]*/ + if (thisClient._useAutomaticRenewal) { + thisClient._sasRenewalTimeout = setTimeout(thisClient._renewSharedAccessSignature.bind(thisClient), Client.sasRenewalInterval); + /*Codes_SRS_NODE_DEVICE_CLIENT_16_036: [The updateSharedAccessSignature method shall call the `updateSasCallback` callback with a null error object and a result of type SharedAccessSignatureUpdated if the oken was updated successfully.]*/ + } safeUpdateSasCallback(null, new results.SharedAccessSignatureUpdated(false)); } } @@ -332,7 +340,7 @@ var Client = function (transport, connStr, blobUploadClient) { } } }); - + this._fsm.on('transition', function (data) { debug('Client state change: ' + data.fromState + ' -> ' + data.toState + ' (action: ' + data.action + ')'); }); @@ -430,7 +438,7 @@ Client.prototype._connectMessageReceiver = function () { self.emit('message', msg); }); }); - + }; Client.prototype._connectMethodReceiver = function () { @@ -475,8 +483,7 @@ Client.prototype._disconnectReceiver = function () { Client.prototype._renewSharedAccessSignature = function () { var cn = ConnectionString.parse(this._connectionString); var sas = SharedAccessSignature.create(cn.HostName, cn.DeviceId, cn.SharedAccessKey, anHourFromNow()); - - this.updateSharedAccessSignature(sas.toString(), function (err) { + this._fsm.handle('updateSharedAccessSignature', sas.toString(), function (err) { if (err) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_006: [The ‘error’ event shall be emitted when an error occurred within the client code.] */ this.emit('error', err); @@ -565,11 +572,20 @@ Client.fromSharedAccessSignature = function (sharedAccessSignature, Transport) { * completes execution. * * @throws {ReferenceError} If the sharedAccessSignature parameter is falsy. + * @throws {ReferenceError} If the client uses x509 authentication. */ Client.prototype.updateSharedAccessSignature = function (sharedAccessSignature, updateSasCallback) { /*Codes_SRS_NODE_DEVICE_CLIENT_16_031: [The updateSharedAccessSignature method shall throw a ReferenceError if the sharedAccessSignature parameter is falsy.]*/ if (!sharedAccessSignature) throw new ReferenceError('sharedAccessSignature is falsy'); - this._fsm.handle('updateSharedAccessSignature', sharedAccessSignature, updateSasCallback); + if (this._useAutomaticRenewal) console.log('calling updateSharedAccessSignature while using automatic sas renewal'); + /*Codes_SRS_NODE_DEVICE_CLIENT_06_002: [The `updateSharedAccessSignature` method shall throw a `ReferenceError` if the client was created using x509.]*/ + if (this._connectionString && ConnectionString.parse(this._connectionString).x509) throw new ReferenceError('client uses x509'); + this._fsm.handle('updateSharedAccessSignature', sharedAccessSignature, function (err, result) { + if (!err) { + this.emit('_sharedAccessSignatureUpdated'); + } + safeCallback(updateSasCallback, err, result); + }.bind(this)); }; /** @@ -667,10 +683,10 @@ Client.prototype.setTransportOptions = function (options, done) { /** * @method module:azure-iot-device.Client#setOptions * @description The `setOptions` method let the user configure the client. - * + * * @param {Object} options The options structure * @param {Function} done The callback that shall be called when setOptions is finished. - * + * * @throws {ReferenceError} If the options structure is falsy */ diff --git a/device/core/package.json b/device/core/package.json index 65b62746f..40a3ec040 100644 --- a/device/core/package.json +++ b/device/core/package.json @@ -33,7 +33,7 @@ "alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec \"test/**/_*_test*.js\"", "ci": "npm -s run lint && npm -s run alltest-min && npm -s run check-cover", "test": "npm -s run lint && npm -s run unittest", - "check-cover": "istanbul check-coverage --statements 98 --branches 94 --functions 99 --lines 98" + "check-cover": "istanbul check-coverage --statements 98 --branches 94 --functions 100 --lines 99" }, "engines": { "node": ">= 0.10" diff --git a/device/core/test/_client_test.js b/device/core/test/_client_test.js index 7b02f20e7..06ea7e6ea 100644 --- a/device/core/test/_client_test.js +++ b/device/core/test/_client_test.js @@ -16,6 +16,7 @@ var errors = require('azure-iot-common').errors; var Message = require('azure-iot-common').Message; describe('Client', function () { + var sharedKeyConnectionString = 'HostName=host;DeviceId=id;SharedAccessKey=key'; describe('#constructor', function () { /*Tests_SRS_NODE_DEVICE_CLIENT_05_001: [The Client constructor shall throw ReferenceError if the transport argument is falsy.]*/ it('throws if transport arg is falsy', function () { @@ -28,7 +29,6 @@ describe('Client', function () { }); describe('#fromConnectionString', function () { - var connectionString = 'HostName=host;DeviceId=id;SharedAccessKey=key'; /*Tests_SRS_NODE_DEVICE_CLIENT_05_003: [The fromConnectionString method shall throw ReferenceError if the connStr argument is falsy.]*/ it('throws if connStr arg is falsy', function () { @@ -42,7 +42,7 @@ describe('Client', function () { /*Tests_SRS_NODE_DEVICE_CLIENT_05_006: [The fromConnectionString method shall return a new instance of the Client object, as by a call to new Client(new Transport(...)).]*/ it('returns an instance of Client', function () { var DummyTransport = function () { }; - var client = Client.fromConnectionString(connectionString, DummyTransport); + var client = Client.fromConnectionString(sharedKeyConnectionString, DummyTransport); assert.instanceOf(client, Client); }); @@ -63,8 +63,9 @@ describe('Client', function () { }; }; - var client = Client.fromConnectionString(connectionString, DummyTransport); + var client = Client.fromConnectionString(sharedKeyConnectionString, DummyTransport); assert.instanceOf(client, Client); + client.sasRenewalInterval = 2700000; tick = firstTick; this.clock.tick(tick); // 30 minutes. shouldn't have updated yet. assert.isFalse(sasUpdated); @@ -429,6 +430,40 @@ describe('Client', function () { client.open(); }); }); + + [true, false].forEach( function(doReconnect) { + it('should invoke the renew path with a reconnect of ' + doReconnect.toString(), function(testCallback){ + this.clock = sinon.useFakeTimers(); + var connectedResults = new results.Connected(); + var dummyTransport = { + connect: function(callback) { + callback(null, connectedResults); + }, + on: function () {}, + updateSharedAccessSignature: sinon.stub().callsArgWith(1, null, new results.SharedAccessSignatureUpdated(doReconnect)), + removeListener: function () {} + }; + + var client = new Client(dummyTransport, sharedKeyConnectionString); + client.blobUploadClient = { updateSharedAccessSignature: function() {} }; + var renewalInterval = Client.sasRenewalInterval; + client.open(function(err, res) { + if (err) { + testCallback(err); + } else { + assert.equal(res, connectedResults); + assert(dummyTransport.updateSharedAccessSignature.notCalled); + this.clock.tick(renewalInterval+1); + process.nextTick(function () { + assert(dummyTransport.updateSharedAccessSignature.calledOnce); + this.clock.restore(); + testCallback(); + }.bind(this)); + } + }.bind(this)); + }); + }); + }); describe('#close', function () { @@ -1385,6 +1420,16 @@ describe('Client', function () { }); }); + /*Tests_SRS_NODE_DEVICE_CLIENT_06_002: [The `updateSharedAccessSignature` method shall throw a `ReferenceError` if the client was created using x509.]*/ + it('throws a ReferenceError if client created using x509', function () { + var DummyTransport = function () { }; + var client = new Client(new DummyTransport(), 'HostName=host;DeviceId=id;x509=true'); + assert.throws(function () { + client.updateSharedAccessSignature('sas', function () { }); + }, ReferenceError); + }); + + /*Tests_SRS_NODE_DEVICE_CLIENT_16_032: [The updateSharedAccessSignature method shall call the updateSharedAccessSignature method of the transport currently in use with the sharedAccessSignature parameter.]*/ it('calls the transport `updateSharedAccessSignature` method with the sharedAccessSignature parameter', function () { var DummyTransport = function () { @@ -1433,7 +1478,7 @@ describe('Client', function () { }); }); - /*Tests_SRS_NODE_DEVICE_CLIENT_16_034: [The updateSharedAccessSignature method shall not reconnect the transport if the transport was disconnected to begin with.]*/ + /*Tests_SRS_NODE_DEVICE_CLIENT_16_034: [The `updateSharedAccessSignature` method shall not reconnect when the 'needToReconnect' property of the result argument of the callback is false.]*/ it('Doesn\'t reconnect the transport if it\'s not necessary', function (done) { var DummyTransport = function () { this.connect = function () { assert.fail(); }; @@ -1560,6 +1605,7 @@ describe('Client', function () { }); }); }); + }); describe('getTwin', function() { diff --git a/device/transport/amqp/devdoc/device_amqp_requirements.md b/device/transport/amqp/devdoc/device_amqp_requirements.md index f166223dc..09a4bef28 100644 --- a/device/transport/amqp/devdoc/device_amqp_requirements.md +++ b/device/transport/amqp/devdoc/device_amqp_requirements.md @@ -51,6 +51,10 @@ The `connect` method establishes a connection with the Azure IoT Hub instance. **SRS_NODE_DEVICE_AMQP_16_008: [**The `done` callback method passed in argument shall be called if the connection is established**]** **SRS_NODE_DEVICE_AMQP_16_009: [**The `done` callback method passed in argument shall be called with an error object if the connection fails**]** +**SRS_NODE_DEVICE_AMQP_06_005: [** If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked. **]** +**SRS_NODE_DEVICE_AMQP_06_008: [** If `initializeCBS` is not successful then the client will remain disconnected and the callback will be called with an error per SRS_NODE_DEVICE_AMQP_16_009. **]** +**SRS_NODE_DEVICE_AMQP_06_006: [**If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback. **]** +**SRS_NODE_DEVICE_AMQP_06_009: [** If `putToken` is not successful then the client will remain disconnected and the callback will be called with an error per SRS_NODE_DEVICE_AMQP_16_009. **]** ### disconnect(done) The `disconnect` method terminates the connection with the Azure IoT Hub instance. @@ -104,11 +108,9 @@ Gets the AmqpReceiver object used to subscribe to messages and errors sent to th **SRS_NODE_DEVICE_AMQP_16_015: [**The `updateSharedAccessSignature` method shall save the new shared access signature given as a parameter to its configuration.**]** -**SRS_NODE_DEVICE_AMQP_16_016: [**The `updateSharedAccessSignature` method shall disconnect the current connection operating with the deprecated token, and re-initialize the transport object with the new connection parameters.**]** +**SRS_NODE_DEVICE_AMQP_06_010: [** The `updateSharedAccessSignature` method shall call the amqp transport `putToken` method with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback. *]** -**SRS_NODE_DEVICE_AMQP_16_017: [**The `updateSharedAccessSignature` method shall call the `done` method with an Error object if updating the configuration or re-initializing the transport object.**]** - -**SRS_NODE_DEVICE_AMQP_16_018: [**The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating that the client needs to reestablish the transport connection when ready.**]** +**SRS_NODE_DEVICE_AMQP_06_011: [** The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating the client does NOT need to reestablish the transport connection. **]** ### sendMethodResponse(methodResponse, callback) diff --git a/device/transport/amqp/lib/amqp.js b/device/transport/amqp/lib/amqp.js index 84d98fdb3..3583232ea 100644 --- a/device/transport/amqp/lib/amqp.js +++ b/device/transport/amqp/lib/amqp.js @@ -7,6 +7,12 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); var Base = require('azure-iot-amqp-base').Amqp; var endpoint = require('azure-iot-common').endpoint; +var SharedAccessSignature = require('azure-iot-common').SharedAccessSignature; + +var UnauthorizedError = require('azure-iot-common').errors.UnauthorizedError; +var DeviceNotFoundError = require('azure-iot-common').errors.DeviceNotFoundError; +var NotConnectedError = require('azure-iot-common').errors.NotConnectedError; + var PackageJson = require('../package.json'); var results = require('azure-iot-common').results; var translateError = require('./amqp_device_errors.js'); @@ -52,6 +58,48 @@ var handleResult = function (errorMessage, done) { }; }; +var getTranslatedError = function(err, message) { + if (err instanceof UnauthorizedError || err instanceof NotConnectedError || err instanceof DeviceNotFoundError) { + return err; + } + return translateError(message, err); +}; + +Amqp.prototype._commonConnect = function _commonConnect(uri, done) { + var sslOptions = this._config.x509; + this._amqp.connect(uri, sslOptions, function (err, connectResult) { + if (err) { + done(translateError('AMQP Transport: Could not connect', err)); + } else { + if (!sslOptions) { + /*Codes_SRS_NODE_DEVICE_AMQP_06_005: [If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked.]*/ + this._amqp.initializeCBS(function (err) { + if (err) { + /*Codes_SRS_NODE_DEVICE_AMQP_06_008: [If `initializeCBS` is not successful then the client will be disconnected.]*/ + this._amqp.disconnect(function() { + done(getTranslatedError(err, 'AMQP Transport: Could not initialize CBS')); + }); + } else { + /*Codes_SRS_NODE_DEVICE_AMQP_06_006: [If `initializeCBS` is successful, `putToken` shall be invoked If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/ + this._amqp.putToken(SharedAccessSignature.parse(this._config.sharedAccessSignature, ['sr', 'sig', 'se']).sr, this._config.sharedAccessSignature, function(err) { + if (err) { + /*Codes_SRS_NODE_DEVICE_AMQP_06_009: [If `putToken` is not successful then the client will be disconnected.]*/ + this._amqp.disconnect(function() { + done(getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken')); + }); + } else { + done(null, connectResult); + } + }.bind(this)); + } + }.bind(this)); + } else { + done(null, connectResult); + } + } + }.bind(this)); +}; + /** * @method module:azure-iot-device-amqp.Amqp#connect * @description Establishes a connection with the IoT Hub instance. @@ -61,19 +109,8 @@ var handleResult = function (errorMessage, done) { /*Codes_SRS_NODE_DEVICE_AMQP_16_008: [The done callback method passed in argument shall be called if the connection is established]*/ /*Codes_SRS_NODE_DEVICE_AMQP_16_009: [The done callback method passed in argument shall be called with an error object if the connecion fails]*/ Amqp.prototype.connect = function connect(done) { - var uri = 'amqps://'; - if (!this._config.x509) { - uri += encodeURIComponent(this._config.deviceId) + - '@sas.' + - this._config.hubName + - ':' + - encodeURIComponent(this._config.sharedAccessSignature) + '@'; - } - uri += this._config.host; - - var sslOptions = this._config.x509; - - this._amqp.connect(uri, sslOptions, handleResult('AMQP Transport: Could not connect', done)); + var uri = 'amqps://' + this._config.host; + this._commonConnect(uri, done); }; /** @@ -164,17 +201,19 @@ Amqp.prototype.abandon = function (message, done) { * @param {Function} done The callback to be invoked when `updateSharedAccessSignature` completes. */ Amqp.prototype.updateSharedAccessSignature = function (sharedAccessSignature, done) { - this._amqp.disconnect(function (err) { + /*Codes_SRS_NODE_DEVICE_AMQP_16_015: [The updateSharedAccessSignature method shall save the new shared access signature given as a parameter to its configuration.] */ + this._config.sharedAccessSignature = sharedAccessSignature; + /*Codes_SRS_NODE_DEVICE_AMQP_06_010: [The `updateSharedAccessSignature` method shall call the amqp transport `putToken` method with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/ + this._amqp.putToken(SharedAccessSignature.parse(this._config.sharedAccessSignature, ['sr', 'sig', 'se']).sr, this._config.sharedAccessSignature, function(err) { if (err) { - /*Codes_SRS_NODE_DEVICE_AMQP_16_017: [The updateSharedAccessSignature method shall call the `done` method with an Error object if updating the configuration or re-initializing the transport object.] */ - if (done) done(err); + this._amqp.disconnect(function() { + if (done) { + done(getTranslatedError(err, 'AMQP Transport: Could not authorize with puttoken')); + } + }); } else { - /*Codes_SRS_NODE_DEVICE_AMQP_16_015: [The updateSharedAccessSignature method shall save the new shared access signature given as a parameter to its configuration.] */ - this._config.sharedAccessSignature = sharedAccessSignature; - /*Codes_SRS_NODE_DEVICE_AMQP_16_016: [The updateSharedAccessSignature method shall disconnect the current connection operating with the deprecated token, and re-initialize the transport object with the new connection parameters.] */ - this._initialize(); - /*Codes_SRS_NODE_DEVICE_AMQP_16_018: [The updateSharedAccessSignature method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating that the client needs to reestablish the transport connection when ready.] */ - done(null, new results.SharedAccessSignatureUpdated(true)); + /*Codes_SRS_NODE_DEVICE_AMQP_06_011: [The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating the client does NOT need to reestablish the transport connection.]*/ + if (done) done(null, new results.SharedAccessSignatureUpdated(false)); } }.bind(this)); }; diff --git a/device/transport/amqp/lib/amqp_ws.js b/device/transport/amqp/lib/amqp_ws.js index e08989ef3..f96e4d9c9 100644 --- a/device/transport/amqp/lib/amqp_ws.js +++ b/device/transport/amqp/lib/amqp_ws.js @@ -5,34 +5,6 @@ var util = require('util'); var Amqp = require('./amqp.js'); -var errors = require('azure-iot-common').errors; -var translateCommonError = require('azure-iot-amqp-base').translateError; - -var translateError = function translateError(message, amqpError) { - var error; - - if (amqpError.constructor.name === 'AMQPError' && amqpError.condition.contents === 'amqp:resource-limit-exceeded') { - /*Codes_SRS_NODE_DEVICE_AMQP_DEVICE_ERRORS_16_001: [`translateError` shall return an `IotHubQuotaExceededError` if the AMQP error condition is `amqp:resource-limit-exceeded`.]*/ - error = new errors.IotHubQuotaExceededError(message); - } else { - error = translateCommonError(message, amqpError); - } - - error.amqpError = amqpError; - - return error; -}; - - -var handleResult = function (errorMessage, done) { - return function (err, result) { - if (err) { - done(translateError(errorMessage, err)); - } else { - done(null, result); - } - }; -}; /** * @class module:azure-iot-device-amqp.AmqpWs @@ -51,20 +23,8 @@ function AmqpWs(config) { util.inherits(AmqpWs, Amqp); AmqpWs.prototype.connect = function connect(done) { - var uri = 'wss://'; - if (!this._config.x509) { - uri += encodeURIComponent(this._config.deviceId) + - '@sas.' + - this._config.hubName + - ':' + - encodeURIComponent(this._config.sharedAccessSignature) + - '@'; - } - uri += this._config.host + ':443/$iothub/websocket'; - - var sslOptions = this._config.x509; - - this._amqp.connect(uri, sslOptions, handleResult('AMQP Transport: Could not connect', done)); + var uri = 'wss://' + this._config.host + ':443/$iothub/websocket'; + this._commonConnect(uri,done); }; diff --git a/device/transport/amqp/package.json b/device/transport/amqp/package.json index 171a809a7..5f3a7a541 100644 --- a/device/transport/amqp/package.json +++ b/device/transport/amqp/package.json @@ -31,7 +31,7 @@ "alltest": "istanbul cover node_modules/mocha/bin/_mocha -- --reporter spec test/_*_test*.js", "ci": "npm -s run lint && npm -s run alltest-min && npm -s run check-cover", "test": "npm -s run lint && npm -s run unittest", - "check-cover": "istanbul check-coverage --statements 95 --branches 86 --functions 96 --lines 96" + "check-cover": "istanbul check-coverage --statements 99 --branches 92 --functions 100 --lines 99" }, "engines": { "node": ">= 0.10" diff --git a/device/transport/amqp/test/_amqp_test.js b/device/transport/amqp/test/_amqp_test.js index 831507d9b..fc37d30b4 100644 --- a/device/transport/amqp/test/_amqp_test.js +++ b/device/transport/amqp/test/_amqp_test.js @@ -8,12 +8,17 @@ var sinon = require('sinon'); var Message = require('azure-iot-common').Message; var Amqp = require('../lib/amqp.js'); +var errors = require('azure-iot-common').errors; + describe('Amqp', function () { var transport = null; var receiver = null; var testMessage = new Message(); var testCallback = function () { }; + var configWithSSLOptions = { host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', x509: 'some SSL options' }; + var simpleSas = 'SharedAccessSignature sr=foo&sig=123&se=123'; + var configWithSAS = { host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', sharedAccessSignature: simpleSas}; beforeEach(function () { var DummyReceiver = function () { @@ -24,7 +29,7 @@ describe('Amqp', function () { receiver = new DummyReceiver(); - transport = new Amqp({ host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', sas: 'sas.key' }); + transport = new Amqp(configWithSAS); transport._receiver = receiver; transport._deviceMethodClient = { sendMethodResponse: sinon.spy() @@ -79,6 +84,93 @@ describe('Amqp', function () { }); }); + describe('#connect', function () { + /*Tests_SRS_NODE_DEVICE_AMQP_16_008: [The `done` callback method passed in argument shall be called if the connection is established]*/ + it('calls done if connection established using SSL', function () { + var transport = new Amqp(configWithSSLOptions); + sinon.stub(transport._amqp,'connect').callsArgWith(2,null); + transport.connect(function(err) { + assert.isNotOk(err); + }); + }); + + /*Tests_SRS_NODE_DEVICE_AMQP_16_009: [The `done` callback method passed in argument shall be called with an error object if the connection fails]*/ + it('calls done with an error if connection failed', function () { + var transport = new Amqp(configWithSSLOptions); + sinon.stub(transport._amqp,'connect').callsArgWith(2,new errors.UnauthorizedError('cryptic')); + transport.connect(function(err) { + assert.isOk(err); + }); + }); + + /*Tests_SRS_NODE_DEVICE_AMQP_06_005: [If x509 authentication is NOT being utilized then `initializeCBS` shall be invoked.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_06_008: [If `initializeCBS` is not successful then the client will remain disconnected and the callback will be called with an error per SRS_NODE_DEVICE_AMQP_16_009.]*/ + it('Invokes initializeCBS if NOT using x509 - initialize fails and disconnects', function () { + var testError = new errors.NotConnectedError('fake error'); + var transport = new Amqp(configWithSAS); + sinon.stub(transport._amqp,'connect').callsArgWith(2, null); + sinon.stub(transport._amqp,'initializeCBS').callsArgWith(0, testError); + sinon.stub(transport._amqp,'disconnect').callsArgWith(0, null); + transport.connect(function(err) { + assert.instanceOf(err, Error); + }); + }); + + /*Tests_SRS_NODE_DEVICE_AMQP_06_006: [If `initializeCBS` is successful, `putToken` shall be invoked If `initializeCBS` is successful, `putToken` shall be invoked with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_06_009: [If `putToken` is not successful then the client will remain disconnected and the callback will be called with an error per SRS_NODE_DEVICE_AMQP_16_009.]*/ + it('Invokes putToken - puttoken fails and disconnects', function () { + var testError = new errors.NotConnectedError('fake error'); + var transport = new Amqp(configWithSAS); + sinon.stub(transport._amqp,'connect').callsArgWith(2, null); + sinon.stub(transport._amqp,'initializeCBS').callsArgWith(0, null); + sinon.stub(transport._amqp,'putToken').callsArgWith(2, testError); + sinon.stub(transport._amqp,'disconnect').callsArgWith(0, null); + transport.connect(function(err) { + assert.instanceOf(err, Error); + }); + }); + + /*Tests_SRS_NODE_DEVICE_AMQP_16_008: [The `done` callback method passed in argument shall be called if the connection is established]*/ + it('Connect calls done when using sas', function () { + var testError = new Error('fake error'); + var transport = new Amqp(configWithSAS); + sinon.stub(transport._amqp,'connect').callsArgWith(2, null, testError); + sinon.stub(transport._amqp,'initializeCBS').callsArgWith(0, null); + sinon.stub(transport._amqp,'putToken').callsArgWith(2, null); + transport.connect(function(err, result) { + assert.isNotOk(err); + assert.strictEqual(result, testError); + }); + }); + + }); + + describe('#updateSharedAccessSignature', function() { + + /*Tests_SRS_NODE_DEVICE_AMQP_16_015: [The `updateSharedAccessSignature` method shall save the new shared access signature given as a parameter to its configuration.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_06_011: [The `updateSharedAccessSignature` method shall call the `done` callback with a null error object and a SharedAccessSignatureUpdated object as a result, indicating the client does NOT need to reestablish the transport connection.]*/ + /*Tests_SRS_NODE_DEVICE_AMQP_06_010: [The `updateSharedAccessSignature` method shall call the amqp transport `putToken` method with the first parameter audience, created from the sr of the sas signature, the next parameter of the actual sas, and a callback.]*/ + it('saves sharedAccessSignature - invokes puttoken and passes back result indicating NOT needing to reconnect', function () { + var transport = new Amqp(configWithSAS); + sinon.stub(transport._amqp,'putToken').callsArgWith(2, null); + transport.updateSharedAccessSignature(simpleSas, function(err, result) { + assert.equal(transport._config.sharedAccessSignature, simpleSas); + assert.isNotOk(err); + assert.isFalse(result.needToReconnect); + }); + }); + + it('invokes puttoken and an error results', function () { + var testError = new Error('fake error'); + var transport = new Amqp(configWithSAS); + sinon.stub(transport._amqp,'putToken').callsArgWith(2, testError); + transport.updateSharedAccessSignature(simpleSas, function(err) { + assert.strictEqual(err, Error); + }); + }); + + }); + describe('#setOptions', function () { var testOptions = { http: { @@ -88,7 +180,7 @@ describe('Amqp', function () { /*Tests_SRS_NODE_DEVICE_AMQP_06_001: [The `setOptions` method shall throw a ReferenceError if the `options` parameter has not been supplied.]*/ [undefined, null, ''].forEach(function (badOptions){ it('throws if options is \'' + badOptions +'\'', function () { - var transport = new Amqp({ host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', sas: 'sas.key' }); + var transport = new Amqp(configWithSAS); assert.throws(function () { transport.setOptions(badOptions); }, ReferenceError, ''); @@ -97,13 +189,13 @@ describe('Amqp', function () { /*Tests_SRS_NODE_DEVICE_AMQP_06_002: [If `done` has been specified the `setOptions` method shall call the `done` callback with no arguments when successful.]*/ it('calls the done callback with no arguments', function(done) { - var transport = new Amqp({ host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', sas: 'sas.key' }); + var transport = new Amqp(configWithSAS); transport.setOptions(testOptions, done); }); /*Tests_SRS_NODE_DEVICE_AMQP_06_003: [`setOptions` should not throw if `done` has not been specified.]*/ it('does not throw if `done` is not specified', function() { - var transport = new Amqp({ host: 'hub.host.name', hubName: 'hub', deviceId: 'deviceId', sas: 'sas.key' }); + var transport = new Amqp(configWithSAS); assert.doesNotThrow(function() { transport.setOptions({}); }); diff --git a/e2etests/test/device_method.js b/e2etests/test/device_method.js index 005bfd825..d36f42691 100644 --- a/e2etests/test/device_method.js +++ b/e2etests/test/device_method.js @@ -7,6 +7,7 @@ var Registry = require('azure-iothub').Registry; var ServiceClient = require('azure-iothub').Client; var ConnectionString = require('azure-iothub').ConnectionString; var SharedAccessSignature = require('azure-iothub').SharedAccessSignature; +var deviceSas = require('azure-iot-device').SharedAccessSignature; var deviceSdk = require('azure-iot-device'); var anHourFromNow = require('azure-iot-common').anHourFromNow; var util = require('util'); @@ -102,7 +103,7 @@ module.exports = function(hubConnectionString, protocols) { setTimeout(function() { // make the method call via the service var methodParams = { - methodName: methodName, + methodName: methodName, payload: testPayload, timeoutInSeconds: 10 }; @@ -150,7 +151,7 @@ module.exports = function(hubConnectionString, protocols) { sendMethodCall(serviceClient, testPayload, done); }, 1000); }); - deviceClient._renewSharedAccessSignature(); + deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString()); }); }); }); diff --git a/e2etests/test/twin_e2e_tests.js b/e2etests/test/twin_e2e_tests.js index 7c345ca57..2519f1a8b 100644 --- a/e2etests/test/twin_e2e_tests.js +++ b/e2etests/test/twin_e2e_tests.js @@ -7,6 +7,8 @@ var Registry = require('azure-iothub').Registry; var ConnectionString = require('azure-iothub').ConnectionString; var deviceSdk = require('azure-iot-device'); var deviceMqtt = require('azure-iot-device-mqtt').Mqtt; +var deviceSas = require('azure-iot-device').SharedAccessSignature; +var anHourFromNow = require('azure-iot-common').anHourFromNow; var uuid = require('uuid'); var _ = require('lodash'); var assert = require('chai').assert; @@ -18,7 +20,7 @@ var newProps = { baz : 2, tweedle : { dee : 3 - } + } } }; @@ -37,7 +39,7 @@ var nullIndividualProps = { }; var nullMergeResult = JSON.parse(JSON.stringify(newProps)); -delete nullMergeResult.tweedle; +delete nullMergeResult.tweedle; var runTests = function (hubConnectionString) { describe('Twin', function() { @@ -54,7 +56,7 @@ var runTests = function (hubConnectionString) { var host = ConnectionString.parse(hubConnectionString).HostName; var pkey = new Buffer(uuid.v4()).toString('base64'); var deviceId = '0000e2etest-delete-me-twin-e2e-' + uuid.v4(); - + deviceDescription = { deviceId: deviceId, status: 'enabled', @@ -95,7 +97,7 @@ var runTests = function (hubConnectionString) { if (deviceClient) { deviceClient.close(function(err) { if (err) return done(err); - + var registry = Registry.fromConnectionString(hubConnectionString); registry.delete(deviceDescription.deviceId, function(err) { if (err) return done(err); @@ -119,7 +121,7 @@ var runTests = function (hubConnectionString) { var compare = function(left, right) { _.every(_.keys(right), function(key) { if (typeof right[key] !== 'function' && !key.startsWith('$')) { - assert.equal(left[key], right[key], 'key ' + key + ' not matched between service and device'); + assert.equal(left[key], right[key], 'key ' + key + ' not matched between service and device'); } }); }; @@ -199,7 +201,7 @@ var runTests = function (hubConnectionString) { }; it('sends and receives desired properties', sendsAndReceivesDesiredProperties); - + var mergeDesiredProperties = function(first, second, newEtag, result, done) { serviceTwin.update( { properties : { desired : first } }, function(err) { if (err) return done(err); @@ -284,7 +286,7 @@ var runTests = function (hubConnectionString) { sendsAndReceiveReportedProperties(done); }, 1000); }); - deviceClient._renewSharedAccessSignature(); + deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString()); }); it('can receive desired properties from the service after renewing the sas token', function(done) { @@ -294,7 +296,7 @@ var runTests = function (hubConnectionString) { sendsAndReceivesDesiredProperties(done); }, 1000); }); - deviceClient._renewSharedAccessSignature(); + deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString()); }); it.skip('call null out all reported properties', function(done) { @@ -337,7 +339,7 @@ var runTests = function (hubConnectionString) { mergeTags(newProps, nullIndividualProps, "*", nullMergeResult, done); }); - it('can renew SAS 20 times without failure', function(done) + it('can renew SAS 20 times without failure', function(done) { this.timeout(120000); var iteration = 0; @@ -346,7 +348,7 @@ var runTests = function (hubConnectionString) { if (iteration === 20) { done(); } else { - deviceClient._renewSharedAccessSignature(); + deviceClient.updateSharedAccessSignature(deviceSas.create(ConnectionString.parse(hubConnectionString).HostName, deviceDescription.deviceId, deviceDescription.authentication.symmetricKey.primaryKey, anHourFromNow()).toString()); // at this point, signature renewal has begun, but the connection is // not complete. We need a "connection complete" event here, but // we don't have one yet. Instead, sleep for a while -- 3 seconds @@ -385,7 +387,7 @@ var runTests = function (hubConnectionString) { callback(); } ], done); - + }); });