Skip to content

Commit

Permalink
Update sailor version, build type, logger and fix path to read action (
Browse files Browse the repository at this point in the history
#53)

* Update sailor version, build type, logger and fix path to read action
* Update readme
  • Loading branch information
uaArsen authored and stas-fomenko committed Dec 27, 2019
1 parent a17fd46 commit 81ba614
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 142 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.0.2 (December 24, 2019)

* Update sailor version to 2.5.4
* Update component to use logger
* Update buildType to docker
* Fixed bug with invalid path to read action

## 2.0.1 (October 10, 2019)

* Hotfix Action path
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ attachment. It can also write a CSV file from the incoming events.
## Requirements

## Environment variables

1. `EIO_REQUIRED_RAM_MB` - recommended value of allocated memory is `512` MB
2. `REQUEST_TIMEOUT` - HTTP request timeout in milliseconds, default value 10000
3. `REQUEST_RETRY_DELAY` - delay between retry attempts in milliseconds, default value 7000
4. `REQUEST_MAX_RETRY` - number of HTTP request retry attempts, default value 7
5. `REQUEST_MAX_CONTENT_LENGTH` - max size of http request in bytes, default value: 10485760
6. `TIMEOUT_BETWEEN_EVENTS` - number of milliseconds write action wait before creating separate attachments, default value: 10000
|Name|Mandatory|Description|Values|
|----|---------|-----------|------|
|EIO_REQUIRED_RAM_MB| false | Value of allocated memory to component | Recommended: 512 |
|REQUEST_TIMEOUT| false | HTTP request timeout in milliseconds | Default value: 10000 |
|REQUEST_RETRY_DELAY| false | Delay between retry attempts in milliseconds | Default value: 7000 |
|REQUEST_MAX_RETRY| false | Number of HTTP request retry attempts | Default value: 7 |
|REQUEST_MAX_CONTENT_LENGTH| false | Max size of http request in bytes | Default value: 10485760 |
|TIMEOUT_BETWEEN_EVENTS| false | Number of milliseconds the write action waits before creating separate attachments | Default value: 10000 |
|LOG_LEVEL| false | Level of logger verbosity | trace, debug, info, warning, error. Default: info |

## Credentials

Expand Down
3 changes: 2 additions & 1 deletion component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"title": "CSV",
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
"docsUrl": "https://github.com/elasticio/csv-component",
"buildType" : "docker",
"triggers": {
"read": {
"main": "./lib/triggers/read.js",
Expand All @@ -26,7 +27,7 @@
},
"actions" : {
"read_action": {
"main": "./lib/actions/write.js",
"main": "./lib/triggers/read.js",
"title": "Read CSV attachment",
"fields": {
"emitAll": {
Expand Down
29 changes: 16 additions & 13 deletions lib/actions/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const csv = require('csv');
const _ = require('lodash');
const { messages } = require('elasticio-node');
const client = require('elasticio-rest-node')();
const logger = require('@elastic.io/component-logger')();

const util = require('../util/util');

Expand All @@ -21,10 +22,10 @@ let putUrl;

let readyFlag = false;

exports.init = async function init(cfg) {
async function init(cfg) {
const delimiter = cfg.writer.separator || ',';
const header = cfg.includeHeaders !== 'No';
console.log('Using delimiter: \'%s\'', delimiter);
logger.trace('Using delimiter: \'%s\'', delimiter);
const options = {
header,
delimiter,
Expand All @@ -33,20 +34,19 @@ exports.init = async function init(cfg) {
if (cfg.writer.columns) {
const columns = Object.keys(_.keyBy(cfg.writer.columns, 'property'));

console.log('Configured column names:', columns);
logger.trace('Configured column names:', columns);
options.columns = columns;
}

stringifier = csv.stringify(options);
signedUrl = await client.resources.storage.createSignedUrl();
putUrl = signedUrl.put_url;
console.log('CSV file to be uploaded file to uri=%s', putUrl);
logger.trace('CSV file to be uploaded file to uri=%s', putUrl);
ax = axios.create();
util.addRetryCountInterceptorToAxios(ax);
readyFlag = true;
};

exports.process = async function ProcessAction(msg, cfg) {
}
async function ProcessAction(msg, cfg) {
// eslint-disable-next-line consistent-this
const self = this;

Expand All @@ -62,10 +62,10 @@ exports.process = async function ProcessAction(msg, cfg) {
timeout = setTimeout(async () => {
readyFlag = false;

console.log('Closing the stream due to inactivity');
self.logger.info('Closing the stream due to inactivity');

const finalRowCount = rowCount;
console.log('The resulting CSV file contains %s rows', finalRowCount);
self.logger.info('The resulting CSV file contains %s rows', finalRowCount);
ax.put(putUrl, stringifier, {
method: 'PUT',
timeout: REQUEST_TIMEOUT,
Expand All @@ -85,19 +85,22 @@ exports.process = async function ProcessAction(msg, cfg) {
};
signedUrl = null;
rowCount = 0;
console.log('Emitting message %j', messageToEmit);
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
}, TIMEOUT_BETWEEN_EVENTS);

let row = msg.body.writer;
console.log(`Incoming data: ${JSON.stringify(row)}`);
self.logger.trace(`Incoming data: ${JSON.stringify(row)}`);
if (cfg.writer.columns) {
const columns = Object.keys(_.keyBy(cfg.writer.columns, 'property'));
row = _.pick(row, columns);
}
console.log(`Writing Row: ${JSON.stringify(row)}`);
self.logger.trace(`Writing Row: ${JSON.stringify(row)}`);
stringifier.write(row);
rowCount += 1;

await self.emit('end');
};
}

exports.process = ProcessAction;
exports.init = init;
28 changes: 13 additions & 15 deletions lib/triggers/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ const _ = require('underscore');
const csv = require('csv');
const { messages } = require('elasticio-node');
const moment = require('moment');
const debug = require('debug')('csv');
const axios = require('axios');
const { Writable } = require('stream');
const util = require('../util/util');

const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 10000; // ms
const REQUEST_MAX_RETRY = process.env.REQUEST_MAX_RETRY || 7;
const REQUEST_RETRY_DELAY = process.env.REQUEST_RETRY_DELAY || 7000; // ms

const formatters = {
date: (value, col) => moment(value, col.format).toDate(),
number: (value, col) => {
Expand Down Expand Up @@ -64,16 +62,15 @@ async function ProcessRead(msg, cfg) {
let index = 0;
const separator = cfg.reader ? cfg.reader.separator || ',' : ',';
const startRow = cfg.reader ? cfg.reader.startRow || 0 : 0;

console.log('Incoming message is %j', msg);
that.logger.trace('Incoming message is %j', msg);
if (!csvURL || csvURL.length === 0) {
// Now let's check for the attachment
if (msg && msg.attachments && Object.keys(msg.attachments).length > 0) {
const key = Object.keys(msg.attachments)[0];
console.log('Found attachment key=%s attachment=%j', key, msg.attachments[key]);
that.logger.trace('Found attachment key=%s attachment=%j', key, msg.attachments[key]);
csvURL = msg.attachments[key].url;
} else {
console.error('URL of the CSV is missing');
that.logger.error('URL of the CSV is missing');
that.emit('error', 'URL of the CSV is missing');
return that.emit('end');
}
Expand All @@ -91,18 +88,18 @@ async function ProcessRead(msg, cfg) {
class CsvWriter extends Writable {
async write(chunk, encoding, callback) {
parser.pause();
debug('Processing %d row...', index);
debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);
this.logger.debug('Processing %d row...', index);
this.logger.debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);
if (index >= startRow) {
const msg = createRowMessage(chunk, cfg.reader.columns);
if (cfg.emitAll) {
debug('Row #%s added to result array', index);
this.logger.debug('Row #%s added to result array', index);
outputMsg.result.push(msg.body);
} else {
await that.emit('data', msg);
}
} else {
debug('Row #%s is skipped based on configuration', index);
this.logger.debug('Row #%s is skipped based on configuration', index);
}
index += 1;
parser.resume();
Expand All @@ -112,18 +109,19 @@ async function ProcessRead(msg, cfg) {
if (cfg.emitAll) {
await that.emit('data', messages.newMessageWithBody(outputMsg));
}
debug('Processing csv writer end event...');
debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);
this.logger.debug('Processing csv writer end event...');
this.logger.debug('Memory usage: %d Mb', process.memoryUsage().heapUsed / 1024 / 1024);

debug(`Number of lines: ${index}`);
this.logger.debug(`Number of lines: ${index}`);
await that.emit('end');
ended = true;
}
}

const writer = new CsvWriter();
writer.logger = that.logger;

debug('Sending GET request to url=%s', csvURL);
that.logger.debug('Sending GET request to url=%s', csvURL);
const ax = axios.create();
util.addRetryCountInterceptorToAxios(ax);
const response = await ax({
Expand All @@ -134,7 +132,7 @@ async function ProcessRead(msg, cfg) {
retry: REQUEST_MAX_RETRY,
delay: REQUEST_RETRY_DELAY,
});
debug('Have got response status=%s headers=%j', response.status, response.headers);
that.logger.debug('Have got response status=%s headers=%j', response.status, response.headers);
if (response.status !== 200) {
await that.emit('error', `Unexpected response code code=${response.status}`);
ended = true;
Expand Down
Loading

0 comments on commit 81ba614

Please sign in to comment.