Skip to content

Commit

Permalink
Merge commit 'a529bcdbad2237b093cb079b81f5579a257bb1b5' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
Azure IoT Builder committed Sep 10, 2018
2 parents 6f9e859 + a529bcd commit 38727d4
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/core/src/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export function moduleEventPath(deviceId: string, moduleId: string): string {
}

export function moduleMessagePath(deviceId: string, moduleId: string): string {
return modulePath(deviceId, moduleId) + '/messages/devicebound';
return modulePath(deviceId, moduleId) + '/messages/events';
}

export function moduleMethodPath(deviceId: string, moduleId: string): string {
Expand Down
2 changes: 1 addition & 1 deletion common/core/test/_endpoint_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe('endpoint', function () {
},
{
name: 'moduleMessagePath',
expected: '/devices/mydevice/modules/mymodule/messages/devicebound',
expected: '/devices/mydevice/modules/mymodule/messages/events',
actual: endpoint.moduleMessagePath(deviceId, moduleId)
},
{
Expand Down
7 changes: 6 additions & 1 deletion common/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,11 @@ export class Amqp {
this._fsm.transition('disconnecting', null, err);
},
error: (context: EventContext) => {
this._fsm.transition('disconnecting', null, context.connection.error);
if (context && context.connection) {
this._fsm.transition('disconnecting', null, context.connection.error);
} else {
this._fsm.transition('disconnecting', null, context);
}
},
disconnected: (context: EventContext) => {
this._disconnectionOccurred = true;
Expand Down Expand Up @@ -660,6 +664,7 @@ export class Amqp {
if (config.sslOptions) {
connectionParameters.cert = config.sslOptions.cert;
connectionParameters.key = config.sslOptions.key;
connectionParameters.ca = config.sslOptions.ca;
}
connectionParameters.port = parsedUrl.port ? ( parsedUrl.port ) : (5671);
connectionParameters.transport = 'tls';
Expand Down
2 changes: 2 additions & 0 deletions device/core/devdoc/internal_client_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ The `sendEventBatch` method sends a list of event messages to the IoT Hub as the

**SRS_NODE_INTERNAL_CLIENT_16_044: [** The `done` callback shall be invoked with a standard javascript `Error` object and no result object if the client could not be configured as requested. **]**

**SRS_NODE_INTERNAL_CLIENT_06_001: [** The `setOptions` method shall first test if the `ca` property is the name of an already existent file. If so, it will attempt to read that file as a pem into a string value and pass the string to config object `ca` property. Otherwise, it is assumed to be a pem string. **]**

#### complete(message, completeCallback)

**SRS_NODE_INTERNAL_CLIENT_16_016: [** The `complete` method shall throw a `ReferenceError` if the `message` parameter is falsy. **]**
Expand Down
15 changes: 14 additions & 1 deletion device/core/src/internal_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import { Stream } from 'stream';
import { EventEmitter } from 'events';
import * as fs from 'fs';

import * as dbg from 'debug';
const debug = dbg('azure-iot-device:InternalClient');

Expand Down Expand Up @@ -204,11 +206,22 @@ export abstract class InternalClient extends EventEmitter {
setOptions(options: DeviceClientOptions, done?: (err?: Error, result?: results.TransportConfigured) => void): void {
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_042: [The `setOptions` method shall throw a `ReferenceError` if the options object is falsy.]*/
if (!options) throw new ReferenceError('options cannot be falsy.');
let localOptions: DeviceClientOptions = {};
for (let k in options) {
localOptions[k] = options[k];
}

/*Codes_SRS_NODE_INTERNAL_CLIENT_06_001: [The `setOptions` method shall first test if the `ca` property is the name of an already existent file. If so, it will attempt to read that file as a pem into a string value and pass the string to config object `ca` property. Otherwise, it is assumed to be a pem string.] */
if (localOptions.ca) {
if (fs.existsSync(localOptions.ca)) {
localOptions.ca = fs.readFileSync(localOptions.ca, 'utf8');
}
}

// Making this an operation that can be retried because we cannot assume the transport's behavior (whether it's going to disconnect/reconnect, etc).
const retryOp = new RetryOperation(this._retryPolicy, this._maxOperationTimeout);
retryOp.retry((opCallback) => {
this._transport.setOptions(options, opCallback);
this._transport.setOptions(localOptions, opCallback);
}, (err) => {
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_043: [The `done` callback shall be invoked no parameters when it has successfully finished setting the client and/or transport options.]*/
/*Codes_SRS_NODE_INTERNAL_CLIENT_16_044: [The `done` callback shall be invoked with a standard javascript `Error` object and no result object if the client could not be configured as requested.]*/
Expand Down
37 changes: 37 additions & 0 deletions device/core/test/_internal_client_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

var assert = require('chai').assert;
var sinon = require('sinon');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var SimulatedHttp = require('./http_simulated.js');
var FakeTransport = require('./fake_transport.js');
Expand Down Expand Up @@ -163,6 +164,42 @@ var ModuleClient = require('../lib/module_client').ModuleClient;
});
});

describe('#setOptions', function () {
beforeEach(function() {
fs.writeFileSync('aziotfakepemfile', 'ca cert');
});
afterEach(function() {
fs.unlinkSync('aziotfakepemfile');
});

/*Tests_SRS_NODE_INTERNAL_CLIENT_06_001: [The `setOption` method shall first test if the `ca` property is the name of an already existent file. If so, it will attempt to read that file as a pem into a string value and pass the string to config object `ca` property. Otherwise, it is assumed to be a pem string.] */
it('sets CA cert with contents of file if provided', function (testCallback) {
var fakeBaseClient = new FakeTransport();
fakeBaseClient.setOptions = sinon.stub();
var fakeMethodClient = {}
fakeMethodClient.setOptions = sinon.stub();
var client = new ClientCtor(fakeBaseClient);
client._methodClient = fakeMethodClient;
client.setOptions({ ca: 'aziotfakepemfile' });
assert(fakeBaseClient.setOptions.called);
assert.strictEqual(fakeBaseClient.setOptions.firstCall.args[0].ca, 'ca cert');
testCallback();
});

it('sets CA cert with contents of provided string', function (testCallback) {
var fakeBaseClient = new FakeTransport();
fakeBaseClient.setOptions = sinon.stub();
var fakeMethodClient = {}
fakeMethodClient.setOptions = sinon.stub();
var client = new ClientCtor(fakeBaseClient);
client._methodClient = fakeMethodClient;
client.setOptions({ ca: 'ca cert' });
assert(fakeBaseClient.setOptions.called);
assert.strictEqual(fakeBaseClient.setOptions.firstCall.args[0].ca, 'ca cert');
testCallback();
});
});

describe('#open', function () {
/* Tests_SRS_NODE_INTERNAL_CLIENT_12_001: [The open function shall call the transport’s connect function, if it exists.] */
it('calls connect on the transport if the method exists', function (done) {
Expand Down
6 changes: 3 additions & 3 deletions device/transport/amqp/devdoc/device_amqp_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ The `sendOutputEvent` method sends an event to the IoT Hub as the device indicat

**SRS_NODE_DEVICE_AMQP_18_007: [** The `sendOutputEvent` method shall construct an AMQP request using the message passed in argument as the body of the message. **]**

**SRS_NODE_DEVICE_AMQP_18_012: [** The `sendOutputEvent` method shall set the annotation "x-opt-output-name" on the message to the `outputName`. **]**
**SRS_NODE_DEVICE_AMQP_18_012: [** The `sendOutputEvent` method shall set the application property "iothub-outputname" on the message to the `outputName`. **]**

**SRS_NODE_DEVICE_AMQP_18_008: [** The `sendOutputEvent` method shall call the `done` callback with a null error object and a MessageEnqueued result object when the message has been successfully sent. **]**

Expand Down Expand Up @@ -288,9 +288,9 @@ This method is deprecated. The `AmqpReceiver` object and pattern is going away a

### message events

**SRS_NODE_DEVICE_AMQP_18_013: [** If `amqp` receives a message on the C2D link without an annotation named "x-opt-input-name", it shall emit a "message" event with the message as the event parameter. **]**
**SRS_NODE_DEVICE_AMQP_18_013: [** If `amqp` receives a message on the C2D link, it shall emit a "message" event with the message as the event parameter. **]**

### inputMessage events

**SRS_NODE_DEVICE_AMQP_18_014: [** If `amqp` receives a message on the C2D link with an annotation named "x-opt-input-name", it shall emit an "inputMessage" event with the "x-opt-input-name" annotation as the first parameter and the message as the second parameter. **]**
**SRS_NODE_DEVICE_AMQP_18_014: [** If `amqp` receives a message on the input message link, it shall emit an "inputMessage" event with the value of the annotation property "x-opt-input-name" as the first parameter and the agnostic message as the second parameter. **]**

28 changes: 15 additions & 13 deletions device/transport/amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class Amqp extends EventEmitter implements DeviceTransport {
private _twinClient: AmqpTwinClient;
private _c2dEndpoint: string;
private _d2cEndpoint: string;
private _messageEventName: string;
private _c2dLink: ReceiverLink;
private _d2cLink: SenderLink;
private _options: DeviceClientOptions;
Expand Down Expand Up @@ -127,11 +128,11 @@ export class Amqp extends EventEmitter implements DeviceTransport {
if (msg.message_annotations) {
inputName = msg.message_annotations['x-opt-input-name'];
}
if (inputName) {
/*Codes_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the C2D link with an annotation named "x-opt-input-name", it shall emit an "inputMessage" event with the "x-opt-input-name" annotation as the first parameter and the message as the second parameter.]*/
if (this._messageEventName === 'inputMessage') {
/*Codes_SRS_NODE_DEVICE_AMQP_18_014: [If `amqp` receives a message on the input message link, it shall emit an "inputMessage" event with the value of the annotation property "x-opt-input-name" as the first parameter and the agnostic message as the second parameter.]*/
this.emit('inputMessage', inputName, AmqpMessage.toMessage(msg));
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link without an annotation named "x-opt-input-name", it shall emit a "message" event with the message as the event parameter.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_18_013: [If `amqp` receives a message on the C2D link, it shall emit a "message" event with the message as the event parameter.]*/
this.emit('message', AmqpMessage.toMessage(msg));
}
};
Expand Down Expand Up @@ -269,9 +270,11 @@ export class Amqp extends EventEmitter implements DeviceTransport {
if (credentials.moduleId) {
this._c2dEndpoint = endpoint.moduleMessagePath(credentials.deviceId, credentials.moduleId);
this._d2cEndpoint = endpoint.moduleEventPath(credentials.deviceId, credentials.moduleId);
this._messageEventName = 'inputMessage';
} else {
this._c2dEndpoint = endpoint.deviceMessagePath(credentials.deviceId);
this._d2cEndpoint = endpoint.deviceEventPath(credentials.deviceId);
this._messageEventName = 'message';
}

getUserAgentString((userAgentString) => {
Expand All @@ -283,7 +286,7 @@ export class Amqp extends EventEmitter implements DeviceTransport {
/*Codes_SRS_NODE_DEVICE_AMQP_13_002: [ The connect method shall set the CA cert on the options object when calling the underlying connection object's connect method if it was supplied. ]*/
if (this._options && this._options.ca) {
config.sslOptions = config.sslOptions || {};
config.sslOptions.caFile = this._options.ca;
config.sslOptions.ca = this._options.ca;
}
this._amqp.connect(config, (err, connectResult) => {
if (err) {
Expand Down Expand Up @@ -671,11 +674,6 @@ export class Amqp extends EventEmitter implements DeviceTransport {
if (options.hasOwnProperty('cert')) {
if (this._authenticationProvider.type === AuthenticationType.X509) {
(this._authenticationProvider as X509AuthenticationProvider).setX509Options(options);
/*Codes_SRS_NODE_DEVICE_AMQP_06_002: [If `done` has been specified the `setOptions` method shall call the `done` callback with no arguments.]*/
if (done) {
/*Codes_SRS_NODE_DEVICE_AMQP_06_003: [`setOptions` should not throw if `done` has not been specified.]*/
done();
}
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_16_053: [The `setOptions` method shall throw an `InvalidOperationError` if the method is called while using token-based authentication.]*/
throw new errors.InvalidOperationError('cannot set X509 options when using token-based authentication');
Expand All @@ -684,6 +682,10 @@ export class Amqp extends EventEmitter implements DeviceTransport {

/*Codes_SRS_NODE_DEVICE_AMQP_13_001: [ The setOptions method shall save the options passed in. ]*/
this._options = options;
if (done) {
/*Codes_SRS_NODE_DEVICE_AMQP_06_003: [`setOptions` should not throw if `done` has not been specified.]*/
done();
}
}

/**
Expand Down Expand Up @@ -808,11 +810,11 @@ export class Amqp extends EventEmitter implements DeviceTransport {
/*Codes_SRS_NODE_DEVICE_AMQP_18_009: [If `sendOutputEvent` encounters an error before it can send the request, it shall invoke the `done` callback function and pass the standard JavaScript Error object with a text description of the error (err.message).]*/
sendOutputEvent(outputName: string, message: Message, callback: (err?: Error, result?: results.MessageEnqueued) => void): void {
let amqpMessage = AmqpMessage.fromMessage(message);
if (!amqpMessage.message_annotations) {
amqpMessage.message_annotations = {};
if (!amqpMessage.application_properties) {
amqpMessage.application_properties = {};
}
/*Codes_SRS_NODE_DEVICE_AMQP_18_012: [The `sendOutputEvent` method shall set the annotation "x-opt-output-name" on the message to the `outputName`.]*/
amqpMessage.message_annotations['x-opt-output-name'] = outputName;
/*Codes_SRS_NODE_DEVICE_AMQP_18_012: [The `sendOutputEvent` method shall set the application property "iothub-outputname" on the message to the `outputName`.]*/
amqpMessage.application_properties['iothub-outputname'] = outputName;
this._fsm.handle('sendEvent', amqpMessage, callback);
}

Expand Down
7 changes: 5 additions & 2 deletions device/transport/amqp/src/amqp_twin_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ export class AmqpTwinClient extends EventEmitter {
if (this._pendingTwinRequests[message.correlation_id]) {
const pendingRequestCallback = this._pendingTwinRequests[message.correlation_id];
delete this._pendingTwinRequests[message.correlation_id];
if (message.message_annotations.status >= 200 && message.message_annotations.status <= 300) {
if (!message.message_annotations) {
let result = (message.body && message.body.content.length > 0) ? JSON.parse(message.body.content) : undefined;
pendingRequestCallback(null, result);
} else if (message.message_annotations.status >= 200 && message.message_annotations.status <= 300) {
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_014: [The `getTwin` method shall parse the body of the received message and call its callback with a `null` error object and the parsed object as a result.]*/
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_022: [The `updateTwinReportedProperties` method shall call its callback with no argument when a response is received]*/
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_030: [The `enableTwinDesiredPropertiesUpdates` method shall call its callback with no argument when a response is received]*/
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_035: [The `disableTwinDesiredPropertiesUpdates` method shall call its callback with no argument when a response is received]*/
let result = message.body ? JSON.parse(message.body.content) : undefined;
let result = (message.body && message.body.content.length > 0) ? JSON.parse(message.body.content) : undefined;
pendingRequestCallback(null, result);
} else {
/*Codes_SRS_NODE_DEVICE_AMQP_TWIN_16_038: [The `getTwin` method shall call its callback with a translated error according to the table described in **SRS_NODE_DEVICE_AMQP_TWIN_16_037** if the `status` message annotation is `> 300`.]*/
Expand Down
Loading

0 comments on commit 38727d4

Please sign in to comment.