Skip to content

Commit

Permalink
New: Write CSV attachment from JSON action (#56)
Browse files Browse the repository at this point in the history
* Add "Write CSV attachment from JSON Array" action
* Add "Write CSV attachment from JSON Object" action
* Update sailor version to 2.6.5
  • Loading branch information
kirill-levitskiy authored Apr 17, 2020
1 parent 81ba614 commit 1e74980
Show file tree
Hide file tree
Showing 9 changed files with 638 additions and 30 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.1.0 (May 7, 2020)

* Add "Write CSV attachment from Array" action
* Add "Write CSV attachment from JSON" action
* Update sailor version to 2.6.5

## 2.0.2 (December 24, 2019)

* Update sailor version to 2.5.4
Expand Down
91 changes: 90 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ a `JSON` object. To configure this action the following fields can be used:

![image](https://user-images.githubusercontent.com/40201204/60706373-fda1a380-9f11-11e9-8b5a-2acd2df33a87.png)


### Write CSV attachment

* `Include Header` - this select configures output behavior of the component. If option is `Yes` or no value chosen than header of csv file will be written to attachment, this is default behavior. If value `No` selected than csv header will be omitted from attachment.
Expand Down Expand Up @@ -100,8 +99,98 @@ The output of the CSV Write component will be a message with an attachment. In
order to access this attachment, the component following the CSV Write must be
able to handle file attachments.

### Write CSV attachment from JSON Object

* `Include Header` - this select configures output behavior of the component. If option is `Yes` or no value chosen than header of csv file will be written to attachment, this is default behavior. If value `No` selected than csv header will be omitted from attachment.
* `Separator` - this select configures type of CSV delimiter in an output file. There are next options: `Comma (,)`, `Semicolon (;)`, `Space ( )`, `Tab (\t)`.

This action will combine multiple incoming events into a CSV file until there is a gap
of more than 10 seconds between events. Afterwards, the CSV file will be closed
and attached to the outgoing message.

This action will convert an incoming array into a CSV file by following approach:

* Header inherits names of keys from the input message;
* Payload will store data from Values of relevant Keys (Columns);
* Undefined values of a JSON Object won't be joined to result set (`{ key: undefined }`);
* False values of a JSON Object will be represented as empty string (`{ key: false }` => `""`).

Requirements:

* The inbound message is an JSON Object;
* This JSON object has plain structure without nested levels (structured types `objects` and `arrays` are not supported as values). Only primitive types are supported: `strings`, `numbers`, `booleans` and `null`. Otherwise, the error message will be thrown: `Inbound message should be a plain Object. At least one of entries is not a primitive type`.

The keys of an input JSON will be published as the header in the first row. For each incoming
event, the value for each header will be `stringified` and written as the value
for that cell. All other properties will be ignored. For example, headers
`foo,bar` along with the following JSON events:

```
{"foo":"myfoo", "bar":"mybar"}
{"foo":"myfoo", "bar":[1,2]}
{"bar":"mybar", "baz":"mybaz"}
```

will produce the following `.csv` file:

```
foo,bar
myfoo,mybar
myfoo,"[1,2]"
,mybar
```

The output of the CSV Write component will be a message with an attachment. In
order to access this attachment, the component following the CSV Write must be
able to handle file attachments.

### Write CSV attachment from JSON Array

* `Include Header` - this select configures output behavior of the component. If option is `Yes` or no value chosen than header of csv file will be written to attachment, this is default behavior. If value `No` selected than csv header will be omitted from attachment.
* `Separator` - this select configures type of CSV delimiter in an output file. There are next options: `Comma (,)`, `Semicolon (;)`, `Space ( )`, `Tab (\t)`.

This action will convert an incoming array into a CSV file by following approach:

* Header inherits names of keys from the input message;
* Payload will store data from Values of relevant Keys (Columns);
* Undefined values of a JSON Object won't be joined to result set (`{ key: undefined }`);
* False values of a JSON Object will be represented as empty string (`{ key: false }` => `""`).

Requirements:

* The inbound message is an JSON Array of Objects with identical structure;
* Each JSON object has plain structure without nested levels (structured types `objects` and `arrays` are not supported as values). Only primitive types are supported: `strings`, `numbers`, `booleans` and `null`. Otherwise, the error message will be thrown: `Inbound message should be a plain Object. At least one of entries is not a primitive type`.

The keys of an input JSON will be published as the header in the first row. For each incoming
event, the value for each header will be `stringified` and written as the value
for that cell. All other properties will be ignored. For example, headers
`foo,bar` along with the following JSON events:

```
[
{"foo":"myfoo", "bar":"mybar"}
{"foo":"myfoo", "bar":[1,2]}
{"bar":"mybar", "baz":"mybaz"}
]
```

will produce the following `.csv` file:

```
foo,bar
myfoo,mybar
myfoo2,[1,2]"
,mybar
```

The output of the CSV Write component will be a message with an attachment. In
order to access this attachment, the component following the CSV Write must be
able to handle file attachments.

### Limitations

#### General

1. You may get `Component run out of memory and terminated.` error during run-time, that means that component needs more memory, please add
`EIO_REQUIRED_RAM_MB` environment variable with an appropriate value (e.g. value `512` means that 512 MB will be allocated) for the component in this case.
2. You may get `Error: write after end` error, as a current workaround try increase value of environment variable: `TIMEOUT_BETWEEN_EVENTS`.
Expand Down
116 changes: 104 additions & 12 deletions component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
"title": "CSV",
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
"docsUrl": "https://github.com/elasticio/csv-component",
"buildType" : "docker",
"buildType": "docker",
"triggers": {
"read": {
"main": "./lib/triggers/read.js",
"title": "Read CSV file from URL",
"help": {
"description": "Fetch a CSV file from a given URL and store it in the attachment storage.",
"link": "/components/csv/index.html#read-csv-file-from-url"
},
"type": "polling",
"fields": {
"url": {
Expand All @@ -25,10 +29,14 @@
}
}
},
"actions" : {
"actions": {
"read_action": {
"main": "./lib/triggers/read.js",
"title": "Read CSV attachment",
"help": {
"description": "Read a CSV attachment of an incoming message.",
"link": "/components/csv/index.html#read-csv-attachment"
},
"fields": {
"emitAll": {
"label": "Emit all messages",
Expand All @@ -44,19 +52,21 @@
}
},
"write_attachment": {
"description":
"Multiple incoming events can be combined into one CSV file with the write CSV action. See https://github.com/elasticio/csv-component/ for additional documentation.",
"main": "./lib/actions/write.js",
"title": "Write CSV attachment",
"help": {
"description": "Multiple incoming events can be combined into one CSV file with the write CSV action.",
"link": "/components/csv/index.html#write-csv-attachment"
},
"fields": {
"includeHeaders": {
"label" : "Include Headers",
"label": "Include Headers",
"required": false,
"viewClass" : "SelectView",
"description" : "Default Yes",
"viewClass": "SelectView",
"description": "Default Yes",
"model": {
"Yes" : "Yes",
"No" : "No"
"Yes": "Yes",
"No": "No"
},
"prompt": "Include headers? Default Yes."
},
Expand All @@ -66,11 +76,93 @@
},
"metadata": {
"in": {
"type": "object",
"properties": {}
"type": "object",
"properties": {}
},
"out": {}
}
},
"write_attachment_from_json": {
"main": "./lib/actions/writeFromJson.js",
"title": "Write CSV attachment from JSON Object",
"help": {
"description": "Multiple incoming events can be combined into one CSV file with the write CSV action.",
"link": "/components/csv/index.html#write-csv-attachment-from-json"
},
"fields": {
"includeHeaders": {
"label": "Include Headers",
"required": true,
"viewClass": "SelectView",
"description": "Default Yes",
"model": {
"Yes": "Yes",
"No": "No"
},
"prompt": "Include headers? Default Yes"
},
"separator": {
"label": "Separators",
"required": true,
"viewClass": "SelectView",
"description": "Default Yes",
"model": {
"comma": "Comma (,)",
"semicolon": "Semicolon (;)",
"space": "Space ( )",
"tab": "Tab (\\t)"
},
"prompt": "Choose required CSV delimiter"
}
},
"metadata": {
"in": {
"type": "object",
"properties": {}
},
"out": {}
}
},
"write_attachment_from_array": {
"main": "./lib/actions/writeFromArray.js",
"title": "Write CSV attachment from JSON Array",
"help": {
"description": "Incoming array can be converted into one CSV file with the write CSV action.",
"link": "/components/csv/index.html#write-csv-attachment-from-array"
},
"fields": {
"includeHeaders": {
"label": "Include Headers",
"required": true,
"viewClass": "SelectView",
"description": "Default Yes",
"model": {
"Yes": "Yes",
"No": "No"
},
"prompt": "Include headers? Default Yes"
},
"separator": {
"label": "Separators",
"required": true,
"viewClass": "SelectView",
"description": "Default Yes",
"model": {
"comma": "Comma (,)",
"semicolon": "Semicolon (;)",
"space": "Space ( )",
"tab": "Tab (\\t)"
},
"prompt": "Choose required CSV delimiter"
}
},
"metadata": {
"in": {
"type": "array",
"properties": {}
},
"out": {}
}
}
}
}
}
119 changes: 119 additions & 0 deletions lib/actions/writeFromArray.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
const axios = require('axios');
const csv = require('csv');
const _ = require('lodash');
const { messages } = require('elasticio-node');
const client = require('elasticio-rest-node')();
const logger = require('@elastic.io/component-logger')();

const util = require('../util/util');

const REQUEST_TIMEOUT = process.env.REQUEST_TIMEOUT || 10000; // 10s
const REQUEST_MAX_RETRY = process.env.REQUEST_MAX_RETRY || 7;
const REQUEST_RETRY_DELAY = process.env.REQUEST_RETRY_DELAY || 7000; // 7s
const REQUEST_MAX_CONTENT_LENGTH = process.env.REQUEST_MAX_CONTENT_LENGTH || 10485760; // 10MB

let stringifier;
let signedUrl;
let rowCount = 0;
let ax;
let putUrl;
let options;

async function init(cfg) {
let delimiter;
switch (cfg.separator) {
case 'comma': {
delimiter = ',';
break;
}
case 'semicolon': {
delimiter = ';';
break;
}
case 'space': {
delimiter = ' ';
break;
}
case 'tab': {
delimiter = '\t';
break;
}
default: {
throw Error(`Unexpected separator type: ${cfg.separator}`);
}
}
const header = cfg.includeHeaders !== 'No';
logger.trace('Using delimiter: \'%s\'', delimiter);
options = {
header,
delimiter,
};

stringifier = csv.stringify(options);
signedUrl = await client.resources.storage.createSignedUrl();
putUrl = signedUrl.put_url;
logger.trace('CSV file to be uploaded file to uri=%s', putUrl);
ax = axios.create();
util.addRetryCountInterceptorToAxios(ax);
}
async function ProcessAction(msg) {
// eslint-disable-next-line consistent-this
const self = this;
let isError = false;
let errorValue = '';

const columns = Object.keys(msg.body[0]);
rowCount = msg.body.length;
logger.trace('Configured column names:', columns);
let row = {};

await _.each(msg.body, async (item) => {
const entries = Object.values(msg.body);
// eslint-disable-next-line no-restricted-syntax
for (const entry of entries) {
if (isError) {
break;
}
const values = Object.values(entry);
// eslint-disable-next-line no-restricted-syntax
for (const value of values) {
if (value !== null && value !== undefined && (typeof value === 'object' || Array.isArray(value))) {
isError = true;
errorValue = value;
break;
}
}
}
row = _.pick(item, columns);
await stringifier.write(row);
});
self.logger.info('The resulting CSV file contains %s rows', rowCount);

if (isError) {
throw Error(`Inbound message should be a plain Object. At least one of entries is not a primitive type: ${JSON.stringify(errorValue)}`);
}

ax.put(putUrl, stringifier, {
method: 'PUT',
timeout: REQUEST_TIMEOUT,
retry: REQUEST_MAX_RETRY,
delay: REQUEST_RETRY_DELAY,
maxContentLength: REQUEST_MAX_CONTENT_LENGTH,
});
stringifier.end();

const messageToEmit = messages.newMessageWithBody({
rowCount,
});
const fileName = `${messageToEmit.id}.csv`;
messageToEmit.attachments[fileName] = {
'content-type': 'text/csv',
url: signedUrl.get_url,
};
self.logger.trace('Emitting message %j', messageToEmit);
await self.emit('data', messageToEmit);
await self.emit('end');
}

exports.process = ProcessAction;
exports.init = init;
Loading

0 comments on commit 1e74980

Please sign in to comment.