Skip to content

Commit

Permalink
* Fix timeout is not a function bug (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
denyshld authored Jul 6, 2020
1 parent faa0cbd commit 2ce8bcf
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.3 (June 5, 2020)

* Fix `timeout is not a function` bug

## 2.1.2 (May 22, 2020)

* Update sailor version to 2.6.7
Expand Down
1 change: 1 addition & 0 deletions lib/actions/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ async function ProcessAction(msg, cfg) {
rowCount = 0;
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
await init(cfg);
}, TIMEOUT_BETWEEN_EVENTS);

let row = msg.body.writer;
Expand Down
5 changes: 4 additions & 1 deletion lib/actions/writeFromJson.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ async function init(cfg) {
util.addRetryCountInterceptorToAxios(ax);
readyFlag = true;
}
async function ProcessAction(msg) {

async function ProcessAction(msg, cfg) {
// eslint-disable-next-line consistent-this
const self = this;
const { inputObject } = msg.body;
Expand All @@ -83,6 +84,7 @@ async function ProcessAction(msg) {
}

if (timeout) {
this.logger.info('Clearing timeout...');
clearTimeout(timeout);
}

Expand Down Expand Up @@ -114,6 +116,7 @@ async function ProcessAction(msg) {
rowCount = 0;
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
await init(cfg);
}, TIMEOUT_BETWEEN_EVENTS);

let row = inputObject;
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "csv-component",
"version": "2.1.2",
"version": "2.1.3",
"description": "CSV Component for elastic.io platform",
"main": "index.js",
"scripts": {
Expand Down
150 changes: 150 additions & 0 deletions spec/writeTimeoutError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/* eslint-disable no-unused-vars */
const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
const fs = require('fs');
const nock = require('nock');
const sinon = require('sinon');
const logger = require('@elastic.io/component-logger')();

chai.use(chaiAsPromised);
const { expect } = require('chai');

if (fs.existsSync('.env')) {
// eslint-disable-next-line global-require
require('dotenv').config();
} else {
process.env.ELASTICIO_API_USERNAME = 'name';
process.env.ELASTICIO_API_KEY = 'key';
}

const write = require('../lib/actions/write.js');
const writeFromJson = require('../lib/actions/writeFromJson.js');

// eslint-disable-next-line func-names
describe('CSV Write Timeout', function () {
this.timeout(180000);

let emit;
let cfg;

before(async () => {
nock('https://api.elastic.io', { encodedQueryParams: true })
.post('/v2/resources/storage/signed-url')
.reply(200,
{ put_url: 'https://examlple.mock/putUrl', get_url: 'https://examlple.mock/getUrl' })
.persist();

nock('https://examlple.mock')
.put('/putUrl')
.reply(200, {})
.persist();
});

beforeEach(() => {
emit = sinon.spy();
});

describe('raw', () => {
before(() => {
cfg = {
writer: {
columns: [
{ property: 'header1' },
{ property: 'header2' },
],
},
};
});

it('should write', async () => {
await write.init.call({
logger,
}, cfg);

const msg1 = {
body: {
inputObject: {
ProductKey: 'text11',
CategoryGroup_1: 'text12',
},
},
};

for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await write.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}
await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(4);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1);
for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await write.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}

await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(8);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2);
});
});

describe('From Object', () => {
before(() => {
cfg = {
includeHeaders: 'Yes',
separator: 'semicolon',
};
});

it('should write from object', async () => {
await writeFromJson.init.call({
logger,
}, cfg);

const msg1 = {
body: {
inputObject: {
ProductKey: 'text11',
CategoryGroup_1: 'text12',
},
},
};

for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await writeFromJson.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}
await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(4);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(1);
for (let i = 0; i < 3; i += 1) {
// eslint-disable-next-line no-await-in-loop
await writeFromJson.process.call({
emit,
logger,
}, msg1, cfg);
// eslint-disable-next-line no-await-in-loop
await new Promise(resolve => setTimeout(resolve, 1000));
}

await new Promise(resolve => setTimeout(resolve, 12000));
expect(emit.getCalls().length).to.equal(8);
expect(emit.getCalls().filter(call => call.args[0] === 'data').length).to.equal(2);
});
});
});

0 comments on commit 2ce8bcf

Please sign in to comment.