Skip to content

Commit

Permalink
Merge branch 'stage-019-esFormat' into stage-091-datasetsRucio
Browse files Browse the repository at this point in the history
Stage 019 transformation

Now it can accept data not only from Stage 016,
but also from 091 (see #79) -- or any other stage,
that produces data in the required format.

---
This merge overrides merge of PR #86
  • Loading branch information
mgolosova committed Nov 29, 2017
2 parents bd97f27 + 60163d0 commit 1871690
Show file tree
Hide file tree
Showing 18 changed files with 1,186 additions and 907 deletions.
472 changes: 236 additions & 236 deletions Utils/Dataflow/016_task2es/output/sample.ndjson

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions Utils/Dataflow/016_task2es/task2es.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)

def add_es_index_info(data):
""" Update data with required for ES indexing info.
Add fields:
_id => taskid
_type => 'task'
Return value:
False -- update failed, skip the record
True -- update successful
"""
if type(data) is not dict:
return False
if not data.get('taskid'):
return False
data['_id'] = data['taskid']
data['_type'] = 'task'
return True

def get_category(row):
"""
Each task can be associated with a number of Physics Categories.
Expand Down Expand Up @@ -97,6 +116,10 @@ def process(stage, message):
hashtags = hashtags.lower().split(',')
data['hashtag_list'] = [x.strip() for x in hashtags]
data['phys_category'] = get_category(data)
if not add_es_index_info(data):
sys.stderr.write("(WARN) Skip message (not enough info"
" for ES indexing.\n")
return True
out_message = JSONMessage(data)
stage.output(out_message)
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@

Description
-----------
Prepare data, taken by Oracle Connector, before uploading to ElasticSearch:
Prepare data before uploading to ElasticSearch:
* turn JSON keys to lower case
* generate JSON with index information

Use ./run.sh to run stage with common configuration from
/Utils/Elasticsearch/config/es

Input
-----
...comes from Oracle Connector: JSON documents, one document -- one line.
Expects data in JSON format with special fields.
Required:
* '_id' (ES record identifier);
* '_type' (ES record type).

Optional:
* '_parent' (ES identifier for parent type record).

Output
------
Expand Down
91 changes: 91 additions & 0 deletions Utils/Dataflow/019_esFormat/esFormat.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env php
<?php
function exception_error_handler($errno, $errstr, $errfile, $errline ) {
throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
}
set_error_handler("exception_error_handler");

$DEFAULT_INDEX = 'prodsys';
$ES_INDEX = NULL;

function check_input($row) {
$required_fields = array('_id', '_type');

if (!is_array($row)) {
fwrite(STDERR, "(WARN) Failed to decode message.\n");
return FALSE;
}

foreach ($required_fields as $field) {
if (!(isset($row[$field]))) {
fwrite(STDERR, "(WARN) Required field \"$field\" is not set or empty.\n");
return FALSE;
}
}

return TRUE;
}

function convertIndexToLowerCase(&$a) {
$result = array();

foreach (array_keys($a) as $i) {
$result[strtolower($i)] = $a[$i];
}

$a = $result;
}

function constructIndexJson(&$row) {
global $ES_INDEX;
$index = Array(
'index' => Array(
'_index' => $ES_INDEX,
'_type' => $row['_type'],
'_id' => $row['_id'],
)
);

if (isset($row['_parent'])) {
$index['index']['_parent'] = $row['_parent'];
}

foreach ($index['index'] as $key => $val) {
unset($row[$key]);
}

return $index;
}

if (isset($argv[1])) {
$h = fopen($argv[1], "r");
} else {
$h = fopen('php://stdin', 'r');
}

$ES_INDEX = getenv('ES_INDEX');
if (!$ES_INDEX) {
$ES_INDEX = $DEFAULT_INDEX;
}

if ($h) {
while (($line = fgets($h)) !== false) {
$row = json_decode($line,true);

if (!check_input($row)) {
fwrite(STDERR, "(WARN) Skipping message (\"".substr($line, 0, 1000)."\").\n");
continue;
}

convertIndexToLowerCase($row);

$index = constructIndexJson($row);

echo json_encode($index)."\n";
echo json_encode($row)."\n";
}
}

fclose($h);

?>
1 change: 1 addition & 0 deletions Utils/Dataflow/019_esFormat/input/sample.016.ndjson
1 change: 1 addition & 0 deletions Utils/Dataflow/019_esFormat/input/sample.091.ndjson
250 changes: 250 additions & 0 deletions Utils/Dataflow/019_esFormat/output/sample.016.ndjson

Large diffs are not rendered by default.

352 changes: 352 additions & 0 deletions Utils/Dataflow/019_esFormat/output/sample.091.ndjson

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions Utils/Dataflow/019_esFormat/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#/usr//bin/env bash

base_dir=$(cd "$(dirname "$(readlink -f "$0")")"; pwd)

ES_CONFIG=$base_dir/../../Elasticsearch/config/es

[ -r "$ES_CONFIG" ] \
|| { echo "Can't access configuration file: $ES_CONFIG" >&2 \
&& exit 1; }

set -a
. "$ES_CONFIG"
set +a

$base_dir/esFormat.php
1 change: 0 additions & 1 deletion Utils/Dataflow/019_oracle2esFormat/input

This file was deleted.

65 changes: 0 additions & 65 deletions Utils/Dataflow/019_oracle2esFormat/oracle2es.php

This file was deleted.

Loading

0 comments on commit 1871690

Please sign in to comment.