Skip to content

Commit

Permalink
fix indexes to yield to event queue
Browse files Browse the repository at this point in the history
  • Loading branch information
MorningLightMountain713 committed Jun 30, 2024
1 parent f9d322c commit a3c6259
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 61 deletions.
6 changes: 5 additions & 1 deletion ZelBack/src/services/utils/daemonrpcClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ const daemonrpc = require('daemonrpc');
const fullnode = require('fullnode');
const config = require('config');

userconfig = require('../../../../config/userconfig');

const { initial: isTestnet } = userconfig;

const fnconfig = new fullnode.Config();
const isTestnet = userconfig.initial.testnet;

const rpcuser = fnconfig.rpcuser() || 'rpcuser';
const rpcpassword = fnconfig.rpcpassword() || 'rpcpassword';
const rpcport = fnconfig.rpcport() || (isTestnet === true ? config.daemon.rpcporttestnet : config.daemon.rpcport);
Expand Down
111 changes: 52 additions & 59 deletions ZelBack/src/services/utils/networkStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class NetworkStateManager extends EventEmitter {

#endpointIndex = new Map();

#indexes = { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex };

#running = false;

#controller = new FluxController();
Expand Down Expand Up @@ -91,6 +89,10 @@ class NetworkStateManager extends EventEmitter {
return this.#running;
}

get #indexes() {
return { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex };
}

#createIndexWorker() {
this.#indexWorker = new Worker('./ZelBack/src/services/utils/networkStateWorker.js');
this.#indexWorker.on('message', (indexes) => {
Expand All @@ -110,68 +112,55 @@ class NetworkStateManager extends EventEmitter {
this.#endpointIndex = endpointIndex;
}

#buildIndexes(nodes, options = {}) {
const filter = options.filter || null;
const type = options.type || 'stateUpdate';

if (type === 'stateUpdate') {
// nodes.forEach((node) => {
// if (!this.#pubkeyIndex.has(node.pubkey)) {
// this.#pubkeyIndex.set(node.pubkey, new Map());
// }
// this.#pubkeyIndex.get(node.pubkey).set(node.ip, node);
// this.#endpointIndex.set(node.ip, node);
// });
nodes.forEach((node) => {
const nodesByPubkey = this.#pubkeyIndex.get(node.pubkey)
|| this.#pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey);
async #buildIndexes(nodes) {
// nodes.forEach((node) => {
// if (!this.#pubkeyIndex.has(node.pubkey)) {
// this.#pubkeyIndex.set(node.pubkey, new Map());
// }
// this.#pubkeyIndex.get(node.pubkey).set(node.ip, node);
// this.#endpointIndex.set(node.ip, node);
// });

const nodeCount = nodes.length;

const pubkeyIndex = new Map();
const endpointIndex = new Map();

function iterIndexes(startIndex, callback) {
const endIndex = startIndex + 1000;
const chunk = nodes.slice(startIndex, endIndex);

chunk.forEach((node) => {
const nodesByPubkey = pubkeyIndex.get(node.pubkey)
|| pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey);

nodesByPubkey.set(node.ip, node);
this.#endpointIndex.set(node.ip, node);
endpointIndex.set(node.ip, node);
});
// nodes.forEach((node) => {
// const pubkeyIndex = this.#pubkeyIndex.get(node.pubkey) || new Map();
// pubkeyIndex.set(node.ip, node);
// this.#endpointIndex.set(node.ip, node);
// });
return;
}

if (!nodes.length) {
if (type === 'pubkey') {
const toDelete = this.#indexes.pubkey.get(filter);
this.#indexes.pubkey.delete(filter);
toDelete.forEach((node) => {
this.#indexes.endpoint.delete(node.ip);
});
} else if (type === 'endpoint') {
this.#indexes.endpoint.delete(filter);
if (endIndex >= nodeCount) {
callback();
return;
}

return;
}

let pubkeyMap = null;

if (type === 'pubkey') {
pubkeyMap = new Map();
this.#pubkeyIndex.set(nodes[0].pubkey, pubkeyMap);
setImmediate(iterIndexes.bind(this, endIndex, callback));
}

nodes.forEach((node) => {
if (type === 'pubkey') pubkeyMap.set(node.ip, node);

this.#endpointIndex.set(node.ip, node);
// Yield to the event queue here, this way we are only ever doing O(1000),
// instead of O(n). With around 13k nodes, this was taking on average 8ms.
// I.e. the event queue was blocked for 8ms. Now we yield. I was using a
// worker here, but overkill for what we are doing.

return new Promise((resolve) => {
iterIndexes(0, () => {
this.#setIndexes(pubkeyIndex, endpointIndex);
console.log('pubkeyIndexSize:', pubkeyIndex.size);
console.log('endpointIndexSize:', endpointIndex.size);
resolve();
});
});
}

#recreateIndexes() {
this.#pubkeyIndex.clear();
this.#endpointIndex.clear();

this.#buildIndexes(this.#state);
}

reset() {
// recreate objects so they can't be mutated externally
this.#pubkeyIndex = new Map();
Expand All @@ -180,18 +169,19 @@ class NetworkStateManager extends EventEmitter {
}

async fetchNetworkState() {
console.log('fetching state');
this.#updatingState = true;
// should use monotonic clock for any elapsed times
const start = process.hrtime.bigint();

let state = null;
let state = [];

while (!state) {
while (!state.length) {
if (this.#controller.aborted) break;

const fetchStart = process.hrtime.bigint();
// eslint-disable-next-line no-await-in-loop
const res = await this.stateFetcher().catch(() => null);
const res = await this.stateFetcher().catch(() => []);
console.log('Fetch finished, elapsed ms:', Number(process.hrtime.bigint() - fetchStart) / 1000000);

// eslint-disable-next-line no-await-in-loop
Expand All @@ -202,16 +192,19 @@ class NetworkStateManager extends EventEmitter {

if (state.length) {
console.log('Nodes found:', state.length);
console.log('Setting state and indexes');
this.#indexStart = process.hrtime.bigint();

this.#state = state;

if (!populated) this.emit('populated');

console.log('Setting state and indexes');
this.#indexStart = process.hrtime.bigint();

if (this.useIndexWorker) {
this.#indexWorker.postMessage(state);
} else {
this.#recreateIndexes();
// this.#recreateIndexes();
await this.#buildIndexes(this.#state);
console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000);
this.#indexStart = null;

Expand Down
2 changes: 1 addition & 1 deletion apiServer.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
global.userconfig = require('./config/userconfig');
globalThis.userconfig = require('./config/userconfig');

if (typeof AbortController === 'undefined') {
// polyfill for nodeJS 14.18.1 - without having to use experimental features
Expand Down

0 comments on commit a3c6259

Please sign in to comment.