-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
137 lines (113 loc) · 3.16 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
'use strict';
const CronJob = require('cron').CronJob;
// Supported values for "opts.indexType", each mapped to the cron expression
// (seconds-first, six fields) on which a new database is rotated in.
// A null schedule means no rotation job is started for that type.
const INDEX_TYPES = {
  // One fixed database, never rotated
  single: null,
  // Fires at 00:00:00 every day
  daily: '0 0 0 * * *',
  // Fires at 00:00:00 on day 1 of each month
  monthly: '0 0 0 1 * *',
};
exports.getBucket = function (opts) {
opts = opts || {};
const config = opts.config;
const dbName = opts.dbName;
const tableName = opts.tableName;
const indexType = opts.indexType || 'single';
const bulk_timeout = opts.bulk_timeout || 2000;
const bulk_maxSize = opts.bulk_maxSize || 1000;
const bulk_useInterval = opts.bulk_useInterval || true;
const logger = opts.logger || console.log;
if (!config) {
throw new Error('Must provide an "opts.config" object');
}
if (!dbName) {
throw new Error('Must provide an "opts.dbName" string');
}
if (!tableName) {
throw new Error('Must provide an "opts.tableName" string');
}
if (Object.keys(INDEX_TYPES).indexOf(indexType) === -1) {
throw new Error('Invalid "opts.indexType" string; must be one of ' +
JSON.stringify(Object.keys(INDEX_TYPES)));
}
var client;
try {
const r = require('rethinkdbdash')(config);
r.getPoolMaster().on('healthy', function(healthy) {
if (healthy === true) { logger('RethinkDB healthy'); }
else { logger('RethinkDB unhealthy');}
});
r.connect();
} catch(err){
throw new Error(JSON.stringify(err));
}
// Dynamic Client
client = r.db(getCurrentdbName()).table(tableName);
const indexCronTime = INDEX_TYPES[indexType];
if (indexCronTime) {
// Create a new index every so often
new CronJob({
cronTime: indexCronTime,
onTick: function () {
createIndexDb(client,getCurrentdbName(),tableName);
client = r.db(getCurrentdbName()).table(tableName);
},
start: true
});
}
function send(records) {
return client.insert(records).run();
};
// Gets the name of the index that any record created right now should use
function getCurrentdbName() {
const now = new Date();
const year = now.getFullYear();
const month = pad(now.getMonth() + 1);
const date = pad(now.getDate());
switch (indexType) {
case 'single':
return dbName;
case 'monthly':
return `${dbName}_${year}-${month}`;
case 'daily':
return `${dbName}_${year}-${month}-${date}`;
default:
return dbName;
}
function pad(num) {
return num < 10 ? '0' + num : '' + num;
}
}
// Bucket
const bucket_emitter = require('./bulk-emitter');
const bucket = bucket_emitter.create({
timeout: bulk_timeout,
maxSize: bulk_maxSize,
useInterval: bulk_useInterval
});
bucket.on('data', function(data) {
// Bulk ready to emit!
send(data);
}).on('error', function(err) {
throw new Error(err);
});
process.on('beforeExit', function() {
bucket.close(function(leftData) {
send(leftData);
});
});
return bucket;
};
/**
 * Creates the RethinkDB database `dbName` and, when `tableName` is given,
 * that table inside it. Does nothing when `dbName` is falsy.
 *
 * @param {Object} client - Driver handle exposing `dbCreate(name)` and
 *   `db(name).tableCreate(name)`, each returning a query with `run()`.
 * @param {string} dbName - Database to create.
 * @param {string} [tableName] - Table to create once the database exists.
 * @param {Function} [logger=console.log] - Receives the creation result and
 *   any error; optional because existing callers pass three arguments.
 * @returns {Promise<void>} Settles when the creation attempt has finished.
 *   Asynchronous failures are reported through `logger`, never as a rejection.
 * @throws {Error} When building or starting the `dbCreate` query fails
 *   synchronously; the original error is attached as `cause`.
 */
function createIndexDb(client, dbName, tableName, logger) {
  // `logger` used to be read from a scope where it does not exist
  const log = typeof logger === 'function' ? logger : console.log;

  if (!dbName) {
    return Promise.resolve();
  }

  let creation;
  try {
    creation = client.dbCreate(dbName).run();
  } catch (err) {
    log(err);
    const failure = new Error('Failed initializing DB Tables!');
    failure.cause = err;
    throw failure;
  }

  return Promise.resolve(creation)
    .then(function (result) {
      // dbCreate reports `dbs_created`; `tables_created` belongs to tableCreate
      log(result.dbs_created);
      if (tableName) {
        return client.db(dbName).tableCreate(tableName).run();
      }
      return undefined;
    })
    .then(
      function () {},
      // A try/catch cannot see asynchronous failures (e.g. the database
      // already exists); report them here instead of leaving them unhandled.
      function (err) {
        log(err);
      }
    );
}