-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathindex.js
1603 lines (1347 loc) · 62.6 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright (C) 2019-present FreeTrade
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License v3.0
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see
* <https://www.gnu.org/licenses/agpl-3.0.en.html>.
*
*/
'use strict';
var run = async function () {
// Version string and codename reported in the startup banner (5.0.8 "Engelmann").
var version = "5.0.8";
var versionCodename = "Engelmann";
// Fixed typo in the banner: it previously read "Stating Member Server".
console.log("Starting Member Server v" + version + " (" + versionCodename + ")");
{ // Project modules
    // Try the private config that lives one directory above the working
    // directory first; if loading it throws for any reason, fall back to the
    // config.js in the working directory.
    var config;
    try {
        config = require(`${process.cwd()}/../memberprivateconfig.js`);
    } catch (privateConfigError) {
        config = require(`${process.cwd()}/config.js`);
    }
    // Sibling modules of this file, loaded in their original order.
    var sqlforaction = require('./sqlforaction.js');
    var dbqueries = require('./dbqueries.js');
    var dbhandler = require('./dbhandler.js');
    var richlist = require('./balances.js');
    var RpcClient = require('./rpcclient.js');
}
{ // Libraries that are always loaded, regardless of configuration
    var fs = require('fs');
    var bitcoinJs = require('bitcoinjs-lib');
    var request = require('request');
}
{ // Configuration settings
    // Every setting is copied out of `config` into a function-scoped variable.
    // `var` is kept on purpose: the names must stay visible outside this brace
    // block, and some of them (e.g. dbconfig, downloadprofilepics) are
    // reassigned later in the file.
    var {
        secondsToWaitonStart,
        secondsToWaitBetweenProcessingBlocks,
        secondsToWaitBetweenPollingNextBlock,
        secondsToWaitBetweenPollingMemPool,
        rpcconfig,
        dbconfig,
        // The config key is `startBlock`; the rest of the file calls it overrideStartBlock.
        startBlock: overrideStartBlock,
        bchdhost,
        bchdgrpcenabled,
        acceptmaxtrxsize,
        httpserverenabled,
        httpsserverenabled,
        httpport,
        httpsport,
        keypem,
        certpem,
        usesqlite,
        sqldbfile,
        AccessControlAllowOrigin,
        pathtoindex,
        bchdcertpem,
        profilepicpath,
        querymemoformissingpics,
        debug,
        allowpragmajournalmode,
        batchsqlonsynccount,
        rebuildFromDBTransactions,
        actionTypes,
        rebuildPauseBetweenCalls,
        completeDBrebuild,
        downloadprofilepics,
        reimportTransaction,
        keepThreadNotificationsTime,
        vapidPublicKey,
        vapidPrivateKey,
        vapidEmail,
        pushnotificationserver,
        keepNotificationsTime,
        tokenbalanceserver,
        tokenbalanceupdateinterval,
        useServerWallets,
        dbHouseKeepingOperationInterval,
        backuputxoserver,
    } = config;
    // Database name taken from the connection settings.
    var { database: dbname } = dbconfig;
    var secondsToWaitBetweenErrorOnBlocks = 1; //todo make configurable
}
{//Conditionally included libs
//Each library below is only required when the matching config flag is set.
//Because the declarations use `var`, every name is hoisted to the enclosing
//function and is simply `undefined` when its branch did not run.
//Database driver: sqlite-async when usesqlite is set, otherwise mysql (+ util for promisify)
if (usesqlite) {
var sqlite = require('sqlite-async');
}
else {
var mysql = require('mysql');
var util = require('util'); //for promisify
}
//gRPC libraries, only when bchdgrpcenabled is set
if (bchdgrpcenabled) {
var grpc = require('grpc');
var protoLoader = require('@grpc/proto-loader');
}
//Web server libraries: url/querystring for either server, http and https individually
if (httpserverenabled || httpsserverenabled) {
var url = require('url');
var querystring = require('querystring');
if (httpserverenabled) {
var http = require('http');
}
if (httpsserverenabled) {
var https = require('https');
}
}
//Push notifications: load web-push and register the VAPID contact and key pair from config
if (pushnotificationserver) {
var webpush = require('web-push'); //For notifications
const vapidKeys = { publicKey: vapidPublicKey, privateKey: vapidPrivateKey };
webpush.setVapidDetails(vapidEmail, vapidKeys.publicKey, vapidKeys.privateKey);
}
}
//local vars
{
//Shared state for the block and mempool processing functions defined later in this function.
//rpc to get blocks, trxs from Bitcoin
var rpc = new RpcClient(rpcconfig);
//Height of the block currently being fetched/processed
var currentBlock = 0;
//Height of the last block whose SQL was written without error
var lastBlockSuccessfullyProcessed = 0;
//Set to true once the block loop reaches the chain tip and mempool polling is scheduled
var mempoolprocessingstarted = false;
//txids checked by getSQLForTRX to skip memo transactions; reset after every block that is written
var memotxidsalreadyprocessed = [];
//sql
var dbpoolapp;//database connection pool for essential app functions
//var blockConnection; //database connection for processing blocks
//var mempoolConnection; //database connection for processing mempool
//var logConnection; //database connection for writing long query logs
//var rlConnection; //database connection for writing rl
var dbpoolService;//database connection pool for web requests
//Timestamp of the most recently parsed block
var lastblocktimestamp = 0;
//Placeholder; set to 'sqlite' or 'mysql' from usesqlite before the pools are created
var dbtype = 'sqlite';
//Housekeeping on DB
{
//Time (ms since epoch) of the last housekeeping run; 0 means it has not run yet
var lastExpensiveSQLHousekeepingOperation = 0;
//Statements returned as one batch by getHouseKeepingOperation()
var expensiveHousekeepingSQLOperations = [];
//Different timestamp formats for different databases
//The same switch also selects the string escaper, the "insert ignore" spelling and the upsert clause.
if (usesqlite) {
var timestampSQL = "strftime('%s', 'now')";
//SQLite escaper: coerce to string, double every single quote, wrap in single quotes
var escapeFunction = function (s) { s = s + ""; s = s.replace(/'/g, "''"); return "'" + s + "'"; }
var insertignore = "INSERT OR IGNORE ";
var onConflictAddress = " ON CONFLICT(address) DO UPDATE SET ";
} else {
var timestampSQL = "UNIX_TIMESTAMP()";
var escapeFunction = mysql.escape;
var insertignore = "INSERT IGNORE ";
var onConflictAddress = " ON DUPLICATE KEY UPDATE ";
}
//Delete notifications older than keepNotificationsTime, and 'thread' notifications older than keepThreadNotificationsTime
expensiveHousekeepingSQLOperations.push(`DELETE from notifications where notifications.time<` + timestampSQL + `-` + config.keepNotificationsTime + `;`);
expensiveHousekeepingSQLOperations.push(`DELETE from notifications where notifications.time<` + timestampSQL + `-` + config.keepThreadNotificationsTime + ` and notifications.type='thread';`);
//easier to add all notifications and just delete self notifications later
expensiveHousekeepingSQLOperations.push(`DELETE from notifications where origin=address;`);
//It's bad to have too many null roottxids - slows down fixorphan queries
//After 24 hours, messages with a null roottxid and an empty retxid become their own root.
//After 48 hours, every message that still has a null roottxid becomes its own root.
expensiveHousekeepingSQLOperations.push(`UPDATE messages SET roottxid=txid WHERE roottxid IS NULL AND retxid ='' AND ` + timestampSQL + `-firstseen > 60*60*24;`, `UPDATE messages SET roottxid=txid WHERE roottxid IS NULL AND ` + timestampSQL + `-firstseen > 60*60*48;`);
//Recreate topics table - null topic was returning null timestamp, so added a clause to address this
//Need the null topic to be returned to allow moderator functions on a sitewide basis
//The rebuilt table keeps only topics with more than 3 messages and activity in the last 30 days.
expensiveHousekeepingSQLOperations.push(`DROP TABLE IF EXISTS topics;`);
if (usesqlite) {
expensiveHousekeepingSQLOperations.push(`CREATE TABLE topics (topic VARCHAR(220), messagescount int(9), mostrecent int(11), subscount mediumint(9));`);
expensiveHousekeepingSQLOperations.push(`INSERT into topics SELECT * FROM (SELECT IFNULL(messages.topic,''), COUNT(*) as messagescount, IFNULL(recent.latest,` + timestampSQL + `) as mostrecent, IFNULL(subs.subscount,0) FROM messages LEFT JOIN (SELECT messages.topic, MAX(firstseen) as latest FROM messages GROUP BY messages.topic) recent on messages.topic = recent.topic LEFT JOIN (SELECT count(*) as subscount, subs.topic FROM subs GROUP BY subs.topic) subs on subs.topic=messages.topic GROUP BY subs.topic) as fulltable WHERE messagescount>3 AND mostrecent>` + timestampSQL + `-30*24*60*60;`);
} else {
expensiveHousekeepingSQLOperations.push(`CREATE TABLE topics (topic VARCHAR(220) CHARACTER SET utf8mb4, messagescount int(9), mostrecent int(11), subscount mediumint(9)) SELECT * FROM (SELECT messages.topic, COUNT(*) as messagescount, IFNULL(recent.latest,` + timestampSQL + `) as mostrecent, IFNULL(subs.subscount,0) as subscount FROM messages LEFT JOIN (SELECT topic, MAX(firstseen) as latest FROM messages GROUP BY topic) recent on messages.topic = recent.topic LEFT JOIN (SELECT count(*) as subscount, topic FROM subs GROUP BY topic) subs on subs.topic=messages.topic GROUP BY topic) as fulltable WHERE messagescount>3 AND mostrecent>` + timestampSQL + `-30*24*60*60;`);
}
//This is a lighter housekeeping operation, can run frequently, ensure messages are linked back to their root discussion topic
//MySQL form first (UPDATE ... JOIN); replaced below by a correlated-subquery form when usesqlite is set.
//NOTE(review): the MySQL statements match roottxid = '' while the SQLite statements and the
//housekeeping updates above match roottxid IS NULL - confirm which value the schema actually stores.
var fixOrphanMessages = "UPDATE messages JOIN messages parent ON messages.retxid=parent.txid SET messages.roottxid = parent.roottxid, messages.topic = parent.topic WHERE messages.roottxid = '';";
var fixOrphanMessages2 = "UPDATE privatemessages JOIN privatemessages parent ON privatemessages.retxid=parent.txid SET privatemessages.roottxid = parent.roottxid, privatemessages.toaddress = parent.toaddress, privatemessages.stamp = parent.stamp WHERE privatemessages.roottxid = '' AND privatemessages.address=parent.address;";
if (usesqlite) {
fixOrphanMessages = "UPDATE messages SET (roottxid,topic) = (SELECT p.roottxid, p.topic FROM messages p WHERE messages.retxid=p.txid) WHERE messages.roottxid IS NULL;";
fixOrphanMessages2 = "UPDATE privatemessages SET (roottxid,toaddress,stamp) = (SELECT m.roottxid, m.toaddress, m.stamp FROM privatemessages m WHERE privatemessages.retxid=m.txid AND privatemessages.address=m.address) WHERE privatemessages.roottxid IS NULL;";
}
}
//For mempool processing
//Keeps a list of mempool transactions that have already been processed
var mempooltxidsAlreadyProcessed = [];
//Global store of mempool trxs to write
var mempoolSQL = [];
//Global store of txs in the mempoolSQL
var mempoolTXsBeingWritten = [];
//Global store of txs from blocks to be written
var globalsql = [];
//keeping track of time spent writing block transactions
var sqlTime;
//keeping track of time spent writing mempool transactions
var memSqlTime;
}
//Expect service to be started on startup, give the database/rpc some time to come up
console.log("Sleeping for " + secondsToWaitonStart + " seconds to give mysql/rpc time to come up.");
await sleep(secondsToWaitonStart * 1000);
//Pick the database backend. For sqlite the configured dbconfig is replaced by the
//database file path plus a schema file name; for mysql the configured dbconfig is
//extended with a connection limit and the utf8mb4 charset.
if (usesqlite) {
dbtype = 'sqlite';
dbconfig = { sqldbfile: sqldbfile, schemafile: 'memberEmpty.db' }
} else {
dbtype = 'mysql';
dbconfig.connectionLimit = 10; // put this in the config when all other direct connections are removed
dbconfig.charset = "utf8mb4";
}
//Pool used by the block/mempool processing code
dbpoolapp = await dbhandler.createPool(dbtype, dbconfig);
//Trivial query right after pool creation; the result is not used in the code visible here
var result = await dbpoolapp.runQuery("select 1=1;");
//blockConnection = await dbhandler.getConnection(dbtype, dbpoolapp);
//mempoolConnection = await dbhandler.getConnection(dbtype, dbpoolapp);
//logConnection = await dbhandler.getConnection(dbtype, dbpoolapp);
//rlConnection = await dbhandler.getConnection(dbtype, dbpoolapp);
//Second pool, declared above as the pool for web requests
dbpoolService = await dbhandler.createPool(dbtype, dbconfig);
//Despite its name this runs for both backends; it is not awaited, so block
//processing continues in the background while the rest of run() proceeds.
sqliteStartProcessing();
//Processing Blocks Into DB
//SQLite specific
/**
 * Prepares the database for syncing, optionally rebuilds derived tables from
 * the stored transactions, works out the block height to resume from and
 * starts the block-processing loop.
 *
 * Despite the name this runs for both backends; every PRAGMA is SQLite-only
 * and is therefore guarded by `usesqlite`.
 *
 * Fixes compared with the previous version:
 *  - the "PRAGMA JOURNAL_MODE = MEMORY" issued for early blocks was not guarded
 *    by `usesqlite`, so it was also sent to MySQL;
 *  - the stored block height is parsed to an integer (the status value may be
 *    stored as text); a missing or unparsable value keeps the default start.
 *
 * @returns {Promise<void>} resolves once the first block fetch has been issued
 */
async function sqliteStartProcessing() {
    if (usesqlite) {
        // Checkpoint the write-ahead log into the database file and truncate
        // the WAL file to zero bytes.
        await dbpoolapp.runQuery("PRAGMA wal_checkpoint(TRUNCATE)");
        // Number of database pages kept in RAM; a larger cache lets a session
        // make more modifications before it needs an exclusive lock.
        await dbpoolapp.runQuery("PRAGMA cache_size=100000");
        // EXCLUSIVE locking reduces file-system calls. Use only for initial
        // sync; runSafeDBPolicy() switches back to NORMAL.
        await dbpoolapp.runQuery("PRAGMA LOCKING_MODE = EXCLUSIVE");
        // synchronous=OFF is fast, but data can be corrupted on a system crash
        // or power loss. Use only for initial sync; runSafeDBPolicy() restores FULL.
        await dbpoolapp.runQuery("PRAGMA synchronous = OFF");
        if (completeDBrebuild && allowpragmajournalmode) {
            await dbpoolapp.runQuery("PRAGMA JOURNAL_MODE = MEMORY");
        }
    }

    if (completeDBrebuild) {
        await rebuildActionTypes([], rebuildPauseBetweenCalls, true, true);
    } else if (rebuildFromDBTransactions) {
        await rebuildActionTypes(actionTypes, rebuildPauseBetweenCalls, false, false, reimportTransaction);
    }

    // Block with first memo transaction
    currentBlock = 525471;
    var result = await dbpoolapp.runQuery("SELECT * FROM status WHERE name='lastblockprocessed';");
    var storedBlock = Number.parseInt(result && result[0] ? result[0].value : undefined, 10);
    if (!Number.isNaN(storedBlock)) {
        currentBlock = storedBlock;
    }
    // Otherwise: no usable status row, start from the first memo block.

    if (overrideStartBlock) {
        currentBlock = overrideStartBlock;
    }

    if (usesqlite && currentBlock < 600000 && allowpragmajournalmode) {
        // MEMORY keeps the rollback journal in RAM. It is faster, but a failure
        // inside a transaction is likely to corrupt the database.
        // Use only if there are a lot more blocks to process.
        await dbpoolapp.runQuery("PRAGMA JOURNAL_MODE = MEMORY");
    }

    lastBlockSuccessfullyProcessed = currentBlock - 1;
    fetchAndProcessBlocksIntoDB();
}
/**
 * Switches SQLite away from the fast-but-risky settings used during initial
 * sync: normal locking, full synchronous writes and a write-ahead log journal.
 * Does nothing when the sqlite backend is not in use.
 *
 * WAL (available since SQLite 3.7.0) is generally faster than a rollback
 * journal, lets readers and a writer proceed concurrently, keeps disk I/O more
 * sequential and needs far fewer fsync() calls.
 *
 * @returns {Promise<void>}
 */
async function runSafeDBPolicy() {
    if (!usesqlite) {
        return;
    }
    const durablePragmas = [
        "PRAGMA LOCKING_MODE = NORMAL",
        "PRAGMA synchronous = FULL",
        "PRAGMA journal_mode=WAL",
    ];
    // Issued strictly one after another, in this order.
    for (const pragma of durablePragmas) {
        await dbpoolapp.runQuery(pragma);
    }
}
/**
 * Re-derives application tables by replaying raw transactions stored in the
 * `transactions` table through getSQLForTRX().
 *
 * Fixes compared with the previous version:
 *  - the SELECT result was discarded, so `result` stayed undefined and the
 *    function threw as soon as it read `result.length`;
 *  - the txid and action values were spliced into the SQL unquoted; they now go
 *    through escapeFunction like the other statements in this file;
 *  - the MySQL connection is closed even when the rebuild throws.
 *
 * @param {string[]} actionTypes hex action suffixes; rows whose action equals "6a02" + suffix are replayed
 * @param {number} sleepTime pause in milliseconds after each replayed transaction
 * @param {boolean} complete replay every stored transaction (ignores actionTypes)
 * @param {boolean} emptyDBFirst delete the contents of the derived tables before replaying
 * @param {string} [reimportTransaction] when set, replay only the transaction with this txid
 * @returns {Promise<void>}
 */
async function rebuildActionTypes(actionTypes, sleepTime, complete, emptyDBFirst, reimportTransaction) {
    if (completeDBrebuild) {
        // Profile picture downloads are switched off for a complete rebuild.
        downloadprofilepics = false;
    }

    let whereClause = " WHERE 1=0 ";
    for (const actionType of actionTypes) {
        whereClause += " OR action=" + escapeFunction("6a02" + actionType) + " ";
    }
    if (complete) {
        whereClause = " ";
    }
    if (reimportTransaction) {
        whereClause = " WHERE txid=" + escapeFunction(reimportTransaction);
    }
    const fullquery = "SELECT * FROM transactions " + whereClause + " order by time asc;";

    // Legacy MySQL handle; it is still handed to putMultipleSQLStatementsInSQL as its third argument.
    let conn;
    let query;
    if (!usesqlite) {
        conn = mysql.createConnection(dbconfig);
        query = util.promisify(conn.query).bind(conn);
    }

    try {
        if (emptyDBFirst) {
            const tablesToEmpty = [
                "blocks", "follows", "hiddenposts", "hiddenusers", "likesdislikes",
                "messages", "mods", "names", "notifications", "privatemessages",
                "subs", "tips", "topics", "userratings",
            ];
            const deletedb = tablesToEmpty.map(function (table) { return "delete from " + table + ";"; });
            await putMultipleSQLStatementsInSQL(deletedb, dbpoolapp, query);
        }

        console.log(" Transactions: " + whereClause);
        const result = await dbpoolapp.runQuery(fullquery);
        console.log("Reimporting " + result.length + " Transactions");

        const startTime = new Date().getTime();
        let perThousandTime = new Date().getTime();
        // Accumulated time and call count per action code (sqlforaction.lastoc).
        const opTimes = {};
        const opCount = {};

        for (let i = 0; i < result.length; i++) {
            try {
                const tx = bitcoinJs.Transaction.fromHex(result[i].rawtx.replace(/"/g, ''));
                const opStartTime = new Date().getTime();
                const sql = getSQLForTRX(tx, result[i].time, result[i].blockno);
                const oc = sqlforaction.lastoc;
                await putMultipleSQLStatementsInSQL(sql, dbpoolapp, query);
                const opTime = new Date().getTime() - opStartTime;
                if (!opTimes[oc]) {
                    opTimes[oc] = 0;
                    opCount[oc] = 0;
                }
                opTimes[oc] += opTime;
                opCount[oc]++;
                await sleep(sleepTime);

                if (i % 1000 === 0) {
                    // Every thousand transactions: report timings and re-link orphaned messages.
                    const perThousandEndTime = new Date().getTime();
                    console.log(i + " :Time per thousand: " + (perThousandEndTime - perThousandTime));
                    await putMultipleSQLStatementsInSQL([fixOrphanMessages], dbpoolapp, query);
                    await putMultipleSQLStatementsInSQL([fixOrphanMessages2], dbpoolapp, query);
                    perThousandTime = new Date().getTime();
                    for (const op of Object.keys(opCount)) {
                        console.log("op:" + op + ":" + opTimes[op] / opCount[op] + ":" + opTimes[op] + ":" + opCount[op]);
                    }
                }
            } catch (err) {
                // Retry the same transaction after a one second pause.
                // NOTE(review): a transaction that fails permanently (e.g. an
                // unparsable rawtx) is retried forever - consider a retry cap.
                console.log(err);
                await sleep(1000);
                i--;
            }
        }

        const endTime = new Date().getTime();
        console.log("Total time: " + (endTime - startTime) / 1000);
        console.log("Time per transaction: " + (endTime - startTime) / result.length);
    } finally {
        if (conn) {
            conn.end();
        }
    }
}
/**
 * Runs a list of SQL statements one at a time against the application pool.
 *
 * Statements that compare loosely equal to "" are skipped. A statement that
 * takes longer than 100 ms is recorded in the zsqlqueries table and echoed to
 * the console. Errors whose message contains "unrecognized token" are logged
 * and skipped; any other error is logged and rethrown, which abandons the
 * remaining statements.
 *
 * TODO: for mysql it may be better to concatenate statements, for sqlite it
 * may be better to use transactions.
 *
 * @param {string[]} mempoolSQL statements to execute, in order
 * @param {*} dbfconn not used - queries always go through dbpoolapp
 * @param {*} query not used
 * @returns {Promise<void>}
 */
async function putMultipleSQLStatementsInSQL(mempoolSQL, dbfconn, query) {
    for (const statement of mempoolSQL) {
        if (statement == "") {
            continue;
        }
        try {
            const startedAt = Date.now();
            await dbpoolapp.runQuery(statement);
            const elapsedMs = Date.now() - startedAt;
            if (elapsedMs > 100) {
                // Keep a record of slow statements.
                const slowQueryInsert = "insert into zsqlqueries values (" + escapeFunction(statement) + "," + elapsedMs + ");";
                await dbpoolapp.runQuery(slowQueryInsert);
                console.log(statement);
                console.log("Query Time (ms):" + elapsedMs);
            }
        } catch (queryError) {
            console.error(queryError);
            console.error(statement);
            //Skip unrecognized token errors
            const isUnrecognizedToken = queryError.message.indexOf("unrecognized token") !== -1;
            if (!isUnrecognizedToken) {
                throw (queryError);
            }
        }
    }
}
//MYSQL specific
/*
async function getLastBlockProcessedMYSQL() {
dbbc = mysql.createConnection(dbconfig);
if (completeDBrebuild) {
await rebuildActionTypes([], rebuildPauseBetweenCalls, true, true);
} else if (rebuildFromDBTransactions) {
await rebuildActionTypes(actionTypes, rebuildPauseBetweenCalls, false, false);
}
dbbc.connect(function (err) {
if (err) {
console.log(err);
console.log("Waiting 10 Seconds");
return setTimeout(getLastBlockProcessedMYSQL, secondsToWaitBetweenPollingNextBlock * 1000);
}
var sql = "USE " + dbname + ";SELECT * FROM status WHERE name='lastblockprocessed';";
dbbc.query(sql, startProcessing);
}
);
}
function startProcessing(err, result) {
if (err) {
try {
dbbc.end();
} catch (e) { }
console.log(err);
console.log("Waiting 10 Seconds");
return setTimeout(getLastBlockProcessedMYSQL, secondsToWaitBetweenPollingNextBlock * 1000);
return;
}
try {
dbbc.end();
console.log('Read last block processed from DB as:' + result[1][0].value);
currentBlock = parseInt(result[1][0].value) + 1;
} catch (error) {
if (currentBlock < 525471) currentBlock = 525471;
}
if (overrideStartBlock) {
currentBlock = overrideStartBlock;
}
lastBlockSuccessfullyProcessed = currentBlock - 1;
fetchAndProcessBlocksIntoDB();
}
*/
//General SQL
/**
 * Asks the node for the hash of the block at the current height; the reply
 * (or error) is handled by processBlockHashIntoDB.
 */
function fetchAndProcessBlocksIntoDB() {
    const height = currentBlock;
    rpc.getBlockHash(height, processBlockHashIntoDB);
}
/**
 * Callback for rpc.getBlockHash.
 *
 * On success it requests the raw block for the returned hash. On error it
 * polls again later; error codes -8 (from BU) and -1 (from BCHD) are treated
 * as "no such block yet". The first time that happens above block 613419,
 * mempool processing is scheduled, the safe SQLite settings are restored and
 * the previous block is processed again so the queued SQL gets written.
 *
 * Fix: runSafeDBPolicy() is async and was called without any rejection
 * handler, so a failing PRAGMA surfaced as an unhandled promise rejection.
 * The failure is now logged.
 *
 * @param {?{code: number, message: string}} err RPC error, if any
 * @param {{result: string}} ret RPC reply carrying the block hash
 * @returns {*} the timer handle or the result of the re-fetch on error paths
 */
function processBlockHashIntoDB(err, ret) {
    if (err) {
        //-8 code from BU, -1 code from BCHD
        var noSuchBlockYet = err.code == -8 || err.code == -1;
        if (!noSuchBlockYet) {
            console.log(err);
            console.log("Wait " + secondsToWaitBetweenPollingNextBlock + " seconds");
        }
        //We've exhausted current blocks. Start processing into the mempool at this point.
        if (noSuchBlockYet && mempoolprocessingstarted == false && currentBlock > 613419) {
            mempoolprocessingstarted = true;
            //wait 10 seconds so as not to conflict with putting last blocks in db
            console.log("Received " + err.code + " error (" + err.message + "), we're up to date - start processing mempool in 10 seconds");
            setTimeout(putMempoolIntoDB, 10000);
            //Process previous block again immediately to clear all built up trxs into db
            currentBlock--;
            //Switch off fast but risky db PRAGMA calls
            if (usesqlite) {
                runSafeDBPolicy().catch(function (policyError) {
                    console.log("runSafeDBPolicy failed: " + policyError);
                });
            }
            return fetchAndProcessBlocksIntoDB();
        }
        return setTimeout(fetchAndProcessBlocksIntoDB, secondsToWaitBetweenPollingNextBlock * 1000);
    }
    console.log("Processing Block Into SQL:" + currentBlock);
    // currentBlock is read when the reply arrives, not when the request is sent.
    rpc.getBlock(ret.result, false, function (err, ret) { processBlockIntoDB(err, ret, currentBlock) });
}
/**
 * Callback for rpc.getBlock: hands the raw block hex to the parser, or
 * schedules another fetch when the RPC call failed.
 *
 * Fix: the "Wait N seconds" log line reported
 * secondsToWaitBetweenProcessingBlocks although the retry timer uses
 * secondsToWaitBetweenErrorOnBlocks; the message now reports the delay that is
 * actually applied.
 *
 * @param {?{code: number}} err RPC error, if any
 * @param {{result: string}} ret RPC reply carrying the block as hex
 * @param {number} blocknumber height the block was requested for
 * @returns {*} the timer handle on error, otherwise undefined
 */
function processBlockIntoDB(err, ret, blocknumber) {
    if (err) {
        console.log(err);
        if (err.code == -1) {//Pruned block?
            // Skip this height instead of retrying it.
            currentBlock++;
        }
        console.log("Wait " + secondsToWaitBetweenErrorOnBlocks + " seconds");
        return setTimeout(fetchAndProcessBlocksIntoDB, secondsToWaitBetweenErrorOnBlocks * 1000);
    }
    takeBlockHexTransactionsAndPutThemInTheDB(ret.result, blocknumber);
}
/**
 * Parses a raw block, appends the SQL for each of its transactions to the
 * global queue and hands the queue to writeToSQL.
 *
 * The transaction at index 0 is not processed. A transaction whose SQL
 * generation throws is logged and skipped.
 *
 * @param {string} hex raw block as a hex string
 * @param {number} blocknumber height of the block
 */
function takeBlockHexTransactionsAndPutThemInTheDB(hex, blocknumber) {
    const parsedBlock = bitcoinJs.Block.fromHex(hex);
    const blockTransactions = parsedBlock.transactions;
    lastblocktimestamp = parsedBlock.timestamp;
    // Everything except the first transaction of the block.
    for (const transaction of blockTransactions.slice(1)) {
        try {
            const statements = getSQLForTRX(transaction, parsedBlock.timestamp, blocknumber);
            globalsql = globalsql.concat(statements);
        } catch (trxError) {
            //Skip if any problem
            console.error(trxError);
            console.log("error: " + trxError);
        }
    }
    writeToSQL(globalsql);
}
/**
 * Builds the SQL statements for one transaction.
 *
 * When the transaction comes from a block (blocknumber > 0), an insert of the
 * raw transaction is added for every output whose script starts with "6a02" or
 * with the "6a04534c500001010747454e45534953" prefix, provided the raw hex is
 * shorter than 51200 characters and the first 8 script characters do not start
 * with "6a026d3" (token actions). Unless the txid is already listed in
 * mempooltxidsAlreadyProcessed or memotxidsalreadyprocessed, the statements
 * from sqlforaction.getSQLForAction are appended.
 *
 * This assumes a maximum of one memo action per transaction.
 *
 * @param {object} tx parsed transaction (getId(), toHex(), outs[].script)
 * @param {number} time timestamp stored with the transaction
 * @param {number} [blocknumber] block height; omitted for mempool transactions
 * @returns {string[]} SQL statements; an empty array when tx is undefined or anything throws
 */
function getSQLForTRX(tx, time, blocknumber) {
    try {
        if (tx === undefined) {
            return [];
        }
        const txid = tx.getId();
        const statements = [];

        //write the raw trx to db for future use if the tx exists in a block
        if (blocknumber > 0) {
            for (const output of tx.outs) {
                const scriptHex = output.script.toString('hex');
                const isStorable = scriptHex.startsWith("6a02") || scriptHex.startsWith("6a04534c500001010747454e45534953");
                if (!isStorable) {
                    continue;
                }
                const txhex = tx.toHex();
                if (!(txhex.length > 0 && txhex.length < 51200)) {
                    continue;
                }
                const actionPrefix = scriptHex.substr(0, 8);
                if (actionPrefix.startsWith("6a026d3")) { //Ignore token actions
                    continue;
                }
                statements.push(insertignore + " into transactions VALUES (" + escapeFunction(txid) + "," + escapeFunction(actionPrefix) + "," + escapeFunction(txhex) + "," + escapeFunction(time) + "," + escapeFunction(blocknumber) + ");");
            }
        }

        //Don't examine all transactions again that have already been examined for memo trxs
        const seenInMempool = mempooltxidsAlreadyProcessed.indexOf(txid) !== -1;
        if (seenInMempool) {
            return statements;
        }
        //Don't process memo transactions that have already been processed
        const memoAlreadyDone = memotxidsalreadyprocessed.indexOf(txid) !== -1;
        if (memoAlreadyDone) {
            console.log("Skipping - this tx already processed:" + txid);
            return statements;
        }

        const actionStatements = sqlforaction.getSQLForAction(tx, time, usesqlite, escapeFunction, blocknumber, profilepicpath, insertignore, querymemoformissingpics, debug, downloadprofilepics, keepThreadNotificationsTime, keepNotificationsTime, onConflictAddress);
        return statements.concat(actionStatements);
    } catch (buildError) {
        console.log(buildError);
        return [];
    }
}
/**
 * Returns the expensive housekeeping statements when more than
 * dbHouseKeepingOperationInterval milliseconds have passed since they were
 * last handed out, and records the current time as the new "last run".
 *
 * @returns {string[]} the shared expensiveHousekeepingSQLOperations array when
 *   housekeeping is due, otherwise a new empty array
 */
function getHouseKeepingOperation() {
    //These are expensive, so they are rate limited
    const now = Date.now();
    const elapsed = now - lastExpensiveSQLHousekeepingOperation;
    if (!(elapsed > dbHouseKeepingOperationInterval)) {
        return [];
    }
    console.log("Adding SQL Housekeeping operation");
    lastExpensiveSQLHousekeepingOperation = now;
    return expensiveHousekeepingSQLOperations;
}
/**
 * Writes the queued block SQL to the database, or defers the write while the
 * initial sync is still batching.
 *
 * During initial sync (mempool processing not yet started) nothing is written
 * until the queue holds at least batchsqlonsynccount statements; the block
 * counter is advanced and the next block is fetched instead. Otherwise a
 * status update recording the current block is appended and, once mempool
 * processing has started, the housekeeping statements plus (on SQLite) a WAL
 * checkpoint. The outcome is reported through afterBlockProcessing.
 *
 * Fix: the WAL checkpoint was gated on `sqlite`, the optionally-required
 * module variable, instead of the `usesqlite` flag used everywhere else. That
 * only worked because `var` hoisting left `sqlite` undefined on MySQL.
 *
 * @param {string[]} sql queue of statements; the status update is pushed onto this array
 * @returns {Promise<*>} whatever afterBlockProcessing returns, or the timer handle when batching
 */
async function writeToSQL(sql) {
    console.log("SQL processing queue:" + sql.length);
    if (sql.length < batchsqlonsynccount && !mempoolprocessingstarted) {
        //on initial sync, we'll batch sql
        currentBlock++;
        return setTimeout(fetchAndProcessBlocksIntoDB, secondsToWaitBetweenProcessingBlocks * 1000);
    }
    //Keep track of the last block processed
    sql.push("REPLACE INTO `status` (`name`, `value`) VALUES ('lastblockprocessed', '" + currentBlock + "');");
    if (mempoolprocessingstarted) {
        //Don't want to perform housekeeping if still doing initial sync
        sql = sql.concat(getHouseKeepingOperation());
        if (usesqlite) {
            //Try to sync wal log with database if sqlite
            sql.push("PRAGMA wal_checkpoint(RESTART)");
        }
    }
    try {
        sqlTime = new Date().getTime();
        await putMultipleSQLStatementsInSQL(sql, dbpoolapp);
    } catch (e) {
        return afterBlockProcessing(e, null);
    }
    return afterBlockProcessing(null, null);
}
/**
 * Completion handler for a block write.
 *
 * On failure the block counter is rewound to the block after the last one
 * that was written successfully and fetching resumes after 60 seconds. On
 * success the SQL queue and the processed-memo list are cleared, the counters
 * advance and the next block is fetched after the configured pause.
 *
 * @param {*} err error from the write, or null on success
 * @param {*} result unused
 * @returns {*} the handle returned by setTimeout
 */
function afterBlockProcessing(err, result) {
    if (err) {
        console.log("dberror 2;" + err);
        console.log("Wait 60 Seconds");
        currentBlock = lastBlockSuccessfullyProcessed + 1;
        return setTimeout(fetchAndProcessBlocksIntoDB, 60000);
    }

    console.log("Fetched And Processed Upto Block " + currentBlock);
    const writeDurationMs = new Date().getTime() - sqlTime;
    console.log("SQL time:" + writeDurationMs);
    globalsql = [];
    lastBlockSuccessfullyProcessed = currentBlock;
    currentBlock++;
    //These transactions have been included in a block, therefore shouldn't appear in the mempool again
    memotxidsalreadyprocessed = [];
    console.log("Wait " + secondsToWaitBetweenProcessingBlocks + " Seconds");
    const pauseMs = secondsToWaitBetweenProcessingBlocks * 1000;
    return setTimeout(fetchAndProcessBlocksIntoDB, pauseMs);
}
//Processing Mempool Into DB
/**
 * Starts one mempool polling cycle by requesting the list of mempool txids;
 * the reply is handled by processAllRawTransactions. An exception thrown
 * synchronously by the RPC call is logged and otherwise ignored.
 */
function putMempoolIntoDB() {
    try {
        rpc.getRawMemPool(processAllRawTransactions);
    } catch (rpcError) {
        console.log(rpcError);
    }
}
/**
 * Callback for rpc.getRawMemPool. Requests, as one RPC batch, the raw
 * transaction for every mempool txid that has not been processed before and
 * passes the batch reply to processMempoolTX. On error the mempool poll is
 * retried after 10 seconds.
 *
 * @param {*} err RPC error, if any
 * @param {{result: string[]}} ret RPC reply carrying the mempool txids
 * @returns {*} the timer handle on error, otherwise undefined
 */
function processAllRawTransactions(err, ret) {
    if (err) {
        console.log(err);
        console.log("processAllRawTransactions error - starting to process mempool in 10");
        return setTimeout(putMempoolIntoDB, 10000);
    }
    // Calls made inside this function are collected by rpc.batch.
    const queueRawTransactionRequests = function () {
        ret.result.forEach(getRawTransactionIfNotProcessedBefore);
    };
    const onBatchReply = function (batchErr, rawtxs) {
        processMempoolTX(batchErr, rawtxs, ret);
    };
    rpc.batch(queueRawTransactionRequests, onBatchReply);
}
/**
 * Requests the raw transaction for a mempool txid, unless that txid is
 * already listed in mempooltxidsAlreadyProcessed.
 *
 * @param {*} txid Transaction id taken from the mempool listing.
 */
function getRawTransactionIfNotProcessedBefore(txid) {
    const wasProcessedBefore = mempooltxidsAlreadyProcessed.indexOf(txid) !== -1;
    if (wasProcessedBefore) {
        // Already handled in an earlier poll - nothing to fetch.
        return;
    }
    rpc.getRawTransaction(txid);
}
/**
 * Callback for the batched getRawTransaction request.
 *
 * On error the mempool poll is rescheduled after the configured polling
 * interval. Otherwise every batch reply is turned into pending mempool SQL
 * and the database write is started with the txid list of the mempool reply.
 *
 * @param {*} err     Batch error, or a falsy value on success.
 * @param {*} rawtxs  Batch replies, one entry per requested transaction.
 * @param {*} ret     The original mempool reply; ret.result is passed on to the writer.
 * @returns {*} The setTimeout handle on the error path, otherwise undefined.
 */
function processMempoolTX(err, rawtxs, ret) {
    if (err) {
        console.log(err);
        const pollDelayMs = secondsToWaitBetweenPollingMemPool * 1000;
        return setTimeout(putMempoolIntoDB, pollDelayMs);
    }

    // Called only for its side effects: each reply adds to the pending SQL.
    rawtxs.forEach(putSingleTransactionIntoSQLglobalvarsResult);

    const mempoolTxids = ret.result;
    // The async writer is started without being awaited.
    writeMempoolSQLtoDBs(mempoolTxids);
}
/**
 * Adapter for batch replies: unwraps the `result` field of one reply and
 * passes it to putSingleTransactionIntoSQLglobalvars.
 *
 * @param {*} rawtx One batch reply entry; only its `result` field is read.
 */
function putSingleTransactionIntoSQLglobalvarsResult(rawtx) {
    const rawTransaction = rawtx.result;
    putSingleTransactionIntoSQLglobalvars(rawTransaction);
}
/**
 * Parses one raw transaction and appends the SQL generated for it to the
 * pending mempool SQL buffer.
 *
 * A transaction that cannot be parsed is logged together with its raw input
 * and skipped. When SQL was generated for the transaction, its id is also
 * recorded in mempoolTXsBeingWritten.
 *
 * @param {*} rawtx Raw transaction as accepted by bitcoinJs.Transaction.fromHex.
 */
function putSingleTransactionIntoSQLglobalvars(rawtx) {
    // Current time in whole seconds (Date.now() is milliseconds).
    const nowInSeconds = Math.floor(Date.now() / 1000);

    let parsedTx;
    try {
        parsedTx = bitcoinJs.Transaction.fromHex(rawtx);
    } catch (parseError) {
        console.log(parseError);
        console.log(rawtx);
        return;
    }

    const statements = getSQLForTRX(parsedTx, nowInSeconds);
    mempoolSQL = mempoolSQL.concat(statements);

    const producedSql = statements.length > 0;
    if (!producedSql) {
        return;
    }
    mempoolTXsBeingWritten.push(parsedTx.getId());
    console.log("Will process:" + parsedTx.getId());
}
/**
 * Writes the pending mempool SQL to the database and hands the outcome to
 * finishMempoolProcessing.
 *
 * With nothing pending, the next mempool poll is scheduled and nothing is
 * written. Otherwise the write start time is recorded in memSqlTime, the two
 * orphan-fix statements are appended and the whole batch is executed.
 *
 * @param {*} trxidsbeingprocessed Txid list of the mempool snapshot being written;
 *                                 passed through to finishMempoolProcessing.
 * @returns {Promise<*>} Resolves to undefined when nothing was pending, otherwise
 *                       to whatever finishMempoolProcessing returns.
 */
async function writeMempoolSQLtoDBs(trxidsbeingprocessed) {
    if (mempoolSQL.length < 1) {
        const pollDelayMs = secondsToWaitBetweenPollingMemPool * 1000;
        setTimeout(putMempoolIntoDB, pollDelayMs);
        return;
    }

    memSqlTime = new Date().getTime();

    // The orphan fixes probably don't need to run this often and would be
    // better off on their own thread.
    mempoolSQL.push(fixOrphanMessages, fixOrphanMessages2);

    let writeError = null;
    try {
        await putMultipleSQLStatementsInSQL(mempoolSQL, dbpoolapp);
    } catch (caught) {
        writeError = caught;
    }
    return finishMempoolProcessing(writeError, null, trxidsbeingprocessed);
}
/**
 * Final step of one mempool cycle; always schedules the next mempool poll.
 *
 * After a successful write the processed txid list replaces
 * mempooltxidsAlreadyProcessed, the pending SQL buffer is emptied and the ids
 * collected in mempoolTXsBeingWritten are moved into
 * memotxidsalreadyprocessed. After a failed write the error is logged and
 * all of that state is left untouched.
 *
 * @param {*} err                   Database error, or a falsy value on success.
 * @param {*} result                Unused.
 * @param {*} trxidsbeingprocessed  Txid list of the mempool snapshot that was written.
 */
var finishMempoolProcessing = function (err, result, trxidsbeingprocessed) {
    if (!err) {
        mempooltxidsAlreadyProcessed = trxidsbeingprocessed;
        const elapsedMs = new Date().getTime() - memSqlTime;
        console.log(`Mempool Processing time (ms):${elapsedMs}`);
        mempoolSQL = [];
        memotxidsalreadyprocessed = memotxidsalreadyprocessed.concat(mempoolTXsBeingWritten);
        mempoolTXsBeingWritten = [];
    } else {
        console.log("mempool dberror 4;" + err);
    }

    const pollDelayMs = secondsToWaitBetweenPollingMemPool * 1000;
    setTimeout(putMempoolIntoDB, pollDelayMs);
};
//HTTP server to handle providing utxos and putting trxs in the mempool
if (httpserverenabled || httpsserverenabled) {
var indexfile = "";
var indexparts = [];
try {
indexfile = fs.readFileSync(pathtoindex).toString('utf-8');
indexparts = indexfile.split("<!--INSERTMETADATA-->").join('<!--INSERTCONTENT-->').split("<!--INSERTCONTENT-->");
//override header to remove title/description
indexparts[0] = `<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="x-ua-compatible" content="ie=edge">
`;
} catch (err) {
console.log("Failed to load index.html file " + err);
}
try {
//webServer.use(bodyParser.json());
if (httpserverenabled) {
console.log("Try starting httpserver ");
// Create an instance of the http server to handle HTTP requests
var app = http.createServer(webServer).listen(httpport);
console.log('HTTP server running on port ' + httpport);
}
if (httpsserverenabled) {
console.log("Try starting httpsserver ");
const options = {
key: fs.readFileSync(keypem),
cert: fs.readFileSync(certpem)
};
// Create an instance of the https server to handle HTTPS requests
var app = https.createServer(options, webServer).listen(httpsport);
console.log('HTTPS server running on port ' + httpsport);
}
} catch (err) {
console.log(err);
}
async function webServer(req, res) {
console.log("webserver request received:" + req.url);
try {
if (req.url.startsWith("/v2/address/utxo/bitcoincash:")) {
let address = sanitizeAlphanumeric(req.url.substr(29));
res.writeHead(200, { "Access-Control-Allow-Origin": AccessControlAllowOrigin, 'Content-Type': 'application/json; charset=utf-8', 'Cache-Control': 'no-store' });
if (address.length > 120) {
res.end(`{"error":"Address Too Long"}`);
return;
}
if (useServerWallets) {
console.log(req.url);
rpc.listUnspent(0, 9999999, [address], function (err, ret) {
if (ret.result.length == 0) {
console.log('redirecting');
request({ url: backuputxoserver + address, encoding: null }, async function (error, response, body) {
res.end(body);
console.log("importing wallet for " + address);
//Try using importprunedfunds to read utxos into wallet
try {
var theUTXOs = JSON.parse(body).utxos;
} catch (err) {
console.log(err);
}
for (var i = 0; i < theUTXOs.length; i++) {
try {
if(theUTXOs[i].satoshis==546)continue;//don't want SLP trxs
rpc.getRawTransaction(theUTXOs[i].txid, function (err, ret) {
var raw = ret.result;
try {
rpc.getTxOutProof([theUTXOs[i].txid], function (err, ret) {
try {
var proof = ret.result;
rpc.importPrunedFunds(raw, proof, "ipf", function (err, ret) {
console.log("utxo imported " + ret.id);
});
} catch (err) { console.log(err); }
});
} catch (err) { console.log(err); }
});
await sleep(100);
} catch (err) { console.log(err); }
}
return;
});
return;
}
console.log(ret.result.length);
returnUTXOs(err, ret, res);
return;
});
return;
} else if (bchdgrpcenabled) {
client.getAddressUnspentOutputs({ address: address, includemempool: true }, {}, function (err, response) { returnUTXOs(err, response, res); });
return;
} else {
//rpc.listUnspent({ address }, function (err, ret) { returnUTXOs(err, ret, res); });
res.end(`{"error":"Server wallets and BCHD GRPC - Neither Supported"}`);
return;
}
} else if (req.url.startsWith("/v2/reg/")) {
res.writeHead(200, { "Access-Control-Allow-Origin": AccessControlAllowOrigin, 'Content-Type': 'application/json; charset=utf-8', 'Cache-Control': 'no-store' });
if (useServerWallets) {
let key = sanitizeAlphanumeric(req.url.split('?')[0].substr(8));
if (key.length > 120) {
res.end(`{"error":"Address Too Long"}`);
return;
}
rpc.importPubKey(key, key, false, function (err, ret) {
res.end("OK");
return;
});
} else {
res.end(`{"error":"Not Supported"}`);
}
return;
} else if (req.url.startsWith("/v2/rawtransactions/sendRawTransaction/")) {