From 605aa3410ecd8bf96f188ff5ecc3d5de6cf7962f Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Thu, 4 Jan 2024 19:46:44 +0530 Subject: [PATCH 1/6] :racehorse: Optimize WS API client healthcheck logic --- services/blockchain-connector/config.js | 11 +- .../blockchain-connector/shared/sdk/client.js | 236 ++++++------------ 2 files changed, 86 insertions(+), 161 deletions(-) diff --git a/services/blockchain-connector/config.js b/services/blockchain-connector/config.js index f3342dea64..c07b57864b 100644 --- a/services/blockchain-connector/config.js +++ b/services/blockchain-connector/config.js @@ -116,12 +116,15 @@ config.job = { }; config.apiClient = { - heartbeatAckMaxWaitTime: Number(process.env.HEARTBEAT_ACK_MAX_WAIT_TIME) || 1000, // in millisecs - aliveAssumptionTime: Number(process.env.CLIENT_ALIVE_ASSUMPTION_TIME) || 5 * 1000, // in millisecs - connectionLimit: 5, + wsServerPingInterval: 3 * 1000, // in millisecs + pingIntervalBuffer: 1000, // in millisecs + instantiation: { + maxWaitTime: Number(process.env.CLIENT_INSTANTIATION_MAX_WAIT_TIME) || 5 * 1000, // in millisecs + retryInterval: Number(process.env.CLIENT_INSTANTIATION_RETRY_INTERVAL) || 1, // in millisecs + }, request: { maxRetries: Number(process.env.ENDPOINT_INVOKE_MAX_RETRIES) || 3, - retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 10, // in millisecs + retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 50, // in millisecs }, }; diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index f6e90d05e9..ac5d053e6d 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -18,6 +18,7 @@ const { Signals, HTTP, Exceptions: { TimeoutException }, + Utils: { waitForIt }, } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); @@ -26,99 +27,102 @@ const delay = 
require('../utils/delay'); const logger = Logger(); +// Connection strings +const liskAddressWs = config.endpoints.liskWs; +const liskAddressHttp = config.endpoints.liskHttp; + // Constants const HTTP_TIMEOUT_STATUS = 'ETIMEDOUT'; const RPC_TIMEOUT_MESSAGE = 'Response not received in'; const TIMEOUT_REGEX_STR = `(?:${HTTP_TIMEOUT_STATUS}|${RPC_TIMEOUT_MESSAGE})`; const TIMEOUT_REGEX = new RegExp(TIMEOUT_REGEX_STR); -const liskAddressWs = config.endpoints.liskWs; -const liskAddressHttp = config.endpoints.liskHttp; +const RETRY_INTERVAL = config.apiClient.instantiation.retryInterval; +const MAX_INSTANTIATION_WAIT_TIME = config.apiClient.instantiation.maxWaitTime; const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; -const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime; -const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime; -const CONNECTION_LIMIT = config.apiClient.connectionLimit; - -// Pool of cached API clients -const cachedApiClients = []; - -const checkIsClientAlive = async clientCache => - // eslint-disable-next-line consistent-return - new Promise(resolve => { - if (!clientCache || !clientCache._channel || !clientCache._channel.isAlive) { - return resolve(false); +const WS_SERVER_PING_INTERVAL = config.apiClient.wsServerPingInterval; +const WS_SERVER_PING_BUFFER = config.apiClient.pingIntervalBuffer; // In case the server is under stress + +// Caching +let clientCache; +let pingTimeout; +let lastPingAt; +let instantiationBeginTime; +let isInstantiating = false; + +const pingListener = () => { + const now = Date.now(); + clearTimeout(pingTimeout); + logger.trace(`Server ping received at ${now}.`); + + pingTimeout = setTimeout(() => { + const timeSinceLastPing = now - lastPingAt; + // Do not reset if the ping was delayed and just received + if (timeSinceLastPing) { + logger.warn(`No ping from server in ${timeSinceLastPing}ms (last 
ping: ${lastPingAt}).`); + clientCache._channel.isAlive = false; + Signals.get('resetApiClient').dispatch(); } + }, WS_SERVER_PING_INTERVAL + WS_SERVER_PING_BUFFER); - if (config.isUseLiskIPCClient) { - return resolve(clientCache._channel.isAlive); - } - - const heartbeatCheckBeginTime = Date.now(); - const wsClientInstance = clientCache._channel._ws; - - // eslint-disable-next-line no-use-before-define - const boundPongListener = () => pongListener(resolve); - wsClientInstance.on('pong', boundPongListener); - wsClientInstance.ping(() => {}); - - // eslint-disable-next-line consistent-return - const timeout = setTimeout(() => { - wsClientInstance.removeListener('pong', boundPongListener); - logger.debug( - `Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`, - ); - return resolve(false); - }, HEARTBEAT_ACK_MAX_WAIT_TIME); - - const pongListener = res => { - clearTimeout(timeout); - wsClientInstance.removeListener('pong', boundPongListener); - logger.debug(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); - return res(true); - }; - }).catch(() => false); + lastPingAt = now; +}; -const instantiateAndCacheClient = async () => { - if (cachedApiClients.length > CONNECTION_LIMIT) { - logger.debug( - `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CONNECTION_LIMIT(${CONNECTION_LIMIT}).`, - ); - return; - } +const checkIsClientAlive = () => + clientCache && clientCache._channel && clientCache._channel.isAlive; +// eslint-disable-next-line consistent-return +const instantiateClient = async (isForceReInstantiate = false) => { try { - const instantiationBeginTime = Date.now(); - const clientCache = config.isUseLiskIPCClient - ? 
await createIPCClient(config.liskAppDataPath) - : await createWSClient(`${liskAddressWs}/rpc-ws`); - - cachedApiClients.push(clientCache); + if (!isInstantiating || isForceReInstantiate) { + if (!checkIsClientAlive() || isForceReInstantiate) { + isInstantiating = true; + instantiationBeginTime = Date.now(); + if (clientCache) await clientCache.disconnect(); + + clientCache = config.isUseLiskIPCClient + ? await createIPCClient(config.liskAppDataPath) + : await (async () => { + const client = await createWSClient(`${liskAddressWs}/rpc-ws`); + client._channel._ws.on('ping', pingListener); + return client; + })(); + + if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.'); + + // Inform listeners about the newly instantiated ApiClient + Signals.get('newApiClient').dispatch(); + + isInstantiating = false; + } + return clientCache; + } - logger.info( - `Instantiated another API client. Time taken: ${ - Date.now() - instantiationBeginTime - }ms. Cached API client count:${cachedApiClients.length}`, - ); + if (Date.now() - instantiationBeginTime > MAX_INSTANTIATION_WAIT_TIME) { + // Waited too long, reset the flag to re-attempt client instantiation + isInstantiating = false; + } } catch (err) { - // Nullify the apiClient cache and unset isInstantiating, so that it can be re-instantiated properly + // Nullify the apiClient cache, so that it can be re-instantiated properly + clientCache = null; + const errMessage = config.isUseLiskIPCClient ? 
`Error instantiating IPC client at ${config.liskAppDataPath}.` : `Error instantiating WS client to ${liskAddressWs}.`; - logger.error(errMessage); - logger.error(err.message); + logger.error(`${errMessage}: ${err.message}`); if (err.message.includes('ECONNREFUSED')) { throw new Error('ECONNREFUSED: Unable to reach a network node.'); } + + return null; } }; const getApiClient = async () => { - if (cachedApiClients.length === 0) { - await instantiateAndCacheClient(); - } - return cachedApiClients[0]; + const apiClient = await waitForIt(instantiateClient, RETRY_INTERVAL); + return checkIsClientAlive() ? apiClient : getApiClient(); }; const is2XXResponse = response => String(response.status).startsWith('2'); @@ -164,110 +168,28 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE if (TIMEOUT_REGEX.test(err.message)) { if (!retriesLeft) { const exceptionMsg = Object.getOwnPropertyNames(params).length - ? `Request timed out when calling '${endpoint}' with params:\n${JSON.stringify( - params, - null, - '\t', - )}.` - : `Request timed out when calling '${endpoint}'.`; + ? `Invocation timed out for '${endpoint}'.` + : `Invocation timed out for '${endpoint}' with params:\n${JSON.stringify(params)}.`; throw new TimeoutException(exceptionMsg); } await delay(ENDPOINT_INVOKE_RETRY_DELAY); } else { - if (Object.getOwnPropertyNames(params).length) { - logger.warn( - `Error invoking '${endpoint}' with params:\n${JSON.stringify(params, null, ' ')}.\n${ - err.stack - }`, - ); - } + logger.warn( + Object.getOwnPropertyNames(params).length === 0 + ? 
`Error invoking '${endpoint}':\n${err.stack}` + : `Error invoking '${endpoint}' with params:\n${JSON.stringify(params)}.\n${err.stack}`, + ); - logger.warn(`Error occurred when calling '${endpoint}' :\n${err.stack}`); throw err; } } } while (retriesLeft--); }; -const disconnectClient = async cachedClient => { - try { - await cachedClient.disconnect(); - } catch (err) { - logger.warn(`Client disconnection failed due to: ${err.message}`); - } -}; - -const refreshClientsCache = async () => { - // Indicates if an active client was destroyed or no active clients available - let activeClientNotAvailable = cachedApiClients.length === 0; - - // Check liveliness and remove non-responsive clients - let index = 0; - while (index < cachedApiClients.length) { - const cachedClient = cachedApiClients[index]; - try { - if (!(await checkIsClientAlive(cachedClient))) { - cachedApiClients.splice(index, 1); - if (index === 0) { - activeClientNotAvailable = true; - } - await disconnectClient(cachedClient); - } else { - index++; - } - } catch (err) { - logger.info(`Failed to refresh an active API client from cache.\nError:${err.message}`); - } - } - - // Initiate new clients if necessary - let missingClientCount = CONNECTION_LIMIT - cachedApiClients.length; - - while (missingClientCount-- > 0) { - try { - await instantiateAndCacheClient(); - } catch (err) { - logger.warn(`Failed to instantiate new API client.\nError: ${err.message}`); - } - } - - // Reset event listeners if active API client was destroyed - if (activeClientNotAvailable && cachedApiClients.length > 0) { - Signals.get('newApiClient').dispatch(); - } - - return cachedApiClients.length; -}; - -// Listen to client unresponsive events and try to reinitialize client -const resetApiClientListener = async () => { - logger.info( - `Received API client reset signal. Will drop ${cachedApiClients.length} cached API client(s).`, - ); - // Drop pool of cached clients. 
Will be instantiated by refresh clients job - while (cachedApiClients.length > 0) { - const cachedClient = cachedApiClients.pop(); - await disconnectClient(cachedClient); - } -}; +const resetApiClientListener = async () => instantiateClient(true); Signals.get('resetApiClient').add(resetApiClientListener); -if (!config.isUseHttpApi) { - refreshClientsCache(); // Initialize the client cache - - // Periodically ensure API client liveliness - setInterval(async () => { - const cacheRefreshStartTime = Date.now(); - await refreshClientsCache(); - logger.debug( - `Refreshed API client cache in ${Date.now() - cacheRefreshStartTime}ms. There are ${ - cachedApiClients.length - } API client(s) in the pool.`, - ); - }, CLIENT_ALIVE_ASSUMPTION_TIME); -} - module.exports = { TIMEOUT_REGEX, From 805bf9d8b6009a0c9d8fd8a6dedc657ea3b271f6 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Thu, 4 Jan 2024 21:47:40 +0530 Subject: [PATCH 2/6] :zap: Implement apiClient pooling --- docker-compose.yml | 5 +- docker/example.env | 7 +- .../ROOT/pages/configuration/index.adoc | 24 +- ecosystem.config.js | 7 +- jenkins/docker-compose.nightly.yml | 4 +- services/blockchain-connector/README.md | 7 +- services/blockchain-connector/config.js | 9 +- .../blockchain-connector/shared/sdk/client.js | 226 ++++++++++++------ .../blockchain-connector/shared/sdk/events.js | 23 +- 9 files changed, 211 insertions(+), 101 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 984f72386d..89f55a779a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -153,8 +153,9 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - - HEARTBEAT_ACK_MAX_WAIT_TIME=${HEARTBEAT_ACK_MAX_WAIT_TIME} + - CLIENT_POOL_SIZE=${CLIENT_POOL_SIZE} + - WS_SERVER_PING_INTERVAL=${WS_SERVER_PING_INTERVAL} + - 
WS_SERVER_PING_INTERVAL_BUFFER=${WS_SERVER_PING_INTERVAL_BUFFER} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} - CONNECTOR_EXIT_DELAY_IN_HOURS=${CONNECTOR_EXIT_DELAY_IN_HOURS} diff --git a/docker/example.env b/docker/example.env index 68983fc338..12f367ec66 100644 --- a/docker/example.env +++ b/docker/example.env @@ -36,10 +36,11 @@ # ==================================== # # GENESIS_BLOCK_URL='https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz' -# CLIENT_ALIVE_ASSUMPTION_TIME=5000 -# HEARTBEAT_ACK_MAX_WAIT_TIME=1000 +# CLIENT_POOL_SIZE=10 +# WS_SERVER_PING_INTERVAL=3000 +# WS_SERVER_PING_INTERVAL_BUFFER=1000 # ENDPOINT_INVOKE_MAX_RETRIES=3 -# ENDPOINT_INVOKE_RETRY_DELAY=10 +# ENDPOINT_INVOKE_RETRY_DELAY=50 # CONNECTOR_EXIT_DELAY_IN_HOURS=0 # Moleculer jobs configuration diff --git a/docs/antora/modules/ROOT/pages/configuration/index.adoc b/docs/antora/modules/ROOT/pages/configuration/index.adoc index 08f882b511..c6813718ae 100644 --- a/docs/antora/modules/ROOT/pages/configuration/index.adoc +++ b/docs/antora/modules/ROOT/pages/configuration/index.adoc @@ -413,15 +413,23 @@ To disable it, set it to `false`. By default, it is set to `12` hours. | 12 -| `CLIENT_ALIVE_ASSUMPTION_TIME` +| `CLIENT_POOL_SIZE` | number -| Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. -By default, it is set to `5000`. -| 5000 +| Number of active API clients to be maintained in the pool. +Only applicable when using the IPC or WS clients to connect with the Lisk node. +By default, it is set to `10`. +| 10 + +| `WS_SERVER_PING_INTERVAL` +| number +| Interval (in milliseconds) at which the WS server checks for liveliness of all the connected clients. +This should not be modified unless explicitly recommended by the development team. +By default, it is set to `3000`. 
+| 3000 -| `HEARTBEAT_ACK_MAX_WAIT_TIME` +| `WS_SERVER_PING_INTERVAL_BUFFER` | number -| Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. +| A conservative assumption of the latency (in milliseconds) for WS server ping checks to arrive at the client. By default, it is set to `1000`. | 1000 @@ -434,8 +442,8 @@ By default, it is set to `3`. | `ENDPOINT_INVOKE_RETRY_DELAY` | number | Delay (in milliseconds) between each endpoint invocation request retry. -By default, it is set to `10`. -| 10 +By default, it is set to `50`. +| 50 | `CONNECTOR_EXIT_DELAY_IN_HOURS` | number diff --git a/ecosystem.config.js b/ecosystem.config.js index 871a5a30f6..6613bcda43 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -129,10 +129,11 @@ module.exports = { // SERVICE_LOG_FILE: false, // DOCKER_HOST: 'local', // GENESIS_BLOCK_URL: 'https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz', - // CLIENT_ALIVE_ASSUMPTION_TIME: 5 * 1000, - // HEARTBEAT_ACK_MAX_WAIT_TIME: 1000, + // CLIENT_POOL_SIZE: 10, + // WS_SERVER_PING_INTERVAL: 3000, + // WS_SERVER_PING_INTERVAL_BUFFER: 1000, // ENDPOINT_INVOKE_MAX_RETRIES: 3, - // ENDPOINT_INVOKE_RETRY_DELAY: 10, + // ENDPOINT_INVOKE_RETRY_DELAY: 50, // CONNECTOR_EXIT_DELAY_IN_HOURS: 0, // JOB_INTERVAL_CACHE_CLEANUP: 0, // JOB_SCHEDULE_CACHE_CLEANUP: '0 */12 * * *', diff --git a/jenkins/docker-compose.nightly.yml b/jenkins/docker-compose.nightly.yml index 186fb334a7..77822f171b 100644 --- a/jenkins/docker-compose.nightly.yml +++ b/jenkins/docker-compose.nightly.yml @@ -153,7 +153,9 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} + - CLIENT_POOL_SIZE=${CLIENT_POOL_SIZE} + - WS_SERVER_PING_INTERVAL=${WS_SERVER_PING_INTERVAL} + - 
WS_SERVER_PING_INTERVAL_BUFFER=${WS_SERVER_PING_INTERVAL_BUFFER} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} - CONNECTOR_EXIT_DELAY_IN_HOURS=${CONNECTOR_EXIT_DELAY_IN_HOURS} diff --git a/services/blockchain-connector/README.md b/services/blockchain-connector/README.md index dce73ac917..d49d5fc6d0 100644 --- a/services/blockchain-connector/README.md +++ b/services/blockchain-connector/README.md @@ -36,10 +36,11 @@ A list of the most commonly used environment variables is presented below: - `GEOIP_JSON`: URL of GeoIP server. - `ENABLE_BLOCK_CACHING`: Boolean flag to enable the block caching. Enabled by default. To disable, set it to `false`. - `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to 12. -- `CLIENT_ALIVE_ASSUMPTION_TIME`: Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. By default, it is set to 5000. -- `HEARTBEAT_ACK_MAX_WAIT_TIME`: Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. By default, it is set to 1000. +- `CLIENT_POOL_SIZE`: Number of active API clients to be maintained in the pool. Only applicable when using the IPC or WS clients to connect with the Lisk node. By default, it is set to 10. +- `WS_SERVER_PING_INTERVAL`: Interval (in milliseconds) at which the WS server checks for liveliness of all the connected clients. This should not be modified unless explicitly recommended by the development team. By default, it is set to `3000`. +- `WS_SERVER_PING_INTERVAL_BUFFER`: A conservative assumption of the latency (in milliseconds) for WS server ping checks to arrive at the client. By default, it is set to `1000`. - `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to 3. 
-- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to 10. +- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to 50. - `CONNECTOR_EXIT_DELAY_IN_HOURS`: Delay (in hours) after which the blockchain-connector microservice exits. The service should restart automatically if deployed using Docker or PM2. To be removed eventually. To enable it, set it higher than `0`. By default, it is set to `0`. - `JOB_INTERVAL_CACHE_CLEANUP`: Job run interval to cleanup block cache. By default, it is set to 0. - `JOB_SCHEDULE_CACHE_CLEANUP`: Job run cron schedule to cleanup block cache. By default, it is set to run every 12 hours (`0 */12 * * *`). diff --git a/services/blockchain-connector/config.js b/services/blockchain-connector/config.js index c07b57864b..ad8864446f 100644 --- a/services/blockchain-connector/config.js +++ b/services/blockchain-connector/config.js @@ -116,12 +116,9 @@ config.job = { }; config.apiClient = { - wsServerPingInterval: 3 * 1000, // in millisecs - pingIntervalBuffer: 1000, // in millisecs - instantiation: { - maxWaitTime: Number(process.env.CLIENT_INSTANTIATION_MAX_WAIT_TIME) || 5 * 1000, // in millisecs - retryInterval: Number(process.env.CLIENT_INSTANTIATION_RETRY_INTERVAL) || 1, // in millisecs - }, + poolSize: Number(process.env.CLIENT_POOL_SIZE) || 10, + wsServerPingInterval: Number(process.env.WS_SERVER_PING_INTERVAL) || 3 * 1000, // in millisecs + pingIntervalBuffer: Number(process.env.WS_SERVER_PING_INTERVAL_BUFFER) || 1000, // in millisecs request: { maxRetries: Number(process.env.ENDPOINT_INVOKE_MAX_RETRIES) || 3, retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 50, // in millisecs diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index ac5d053e6d..583a7797b9 100644 --- 
a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -18,10 +18,12 @@ const { Signals, HTTP, Exceptions: { TimeoutException }, - Utils: { waitForIt }, + Utils: { isObject, waitForIt }, } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); +const crypto = require('crypto'); + const config = require('../../config'); const delay = require('../utils/delay'); @@ -32,98 +34,181 @@ const liskAddressWs = config.endpoints.liskWs; const liskAddressHttp = config.endpoints.liskHttp; // Constants -const HTTP_TIMEOUT_STATUS = 'ETIMEDOUT'; -const RPC_TIMEOUT_MESSAGE = 'Response not received in'; -const TIMEOUT_REGEX_STR = `(?:${HTTP_TIMEOUT_STATUS}|${RPC_TIMEOUT_MESSAGE})`; +const ERROR_CONN_REFUSED = 'ECONNREFUSED'; +const STATUS_HTTP_TIMEOUT = 'ETIMEDOUT'; +const MESSAGE_RPC_TIMEOUT = 'Response not received in'; +const TIMEOUT_REGEX_STR = `(?:${STATUS_HTTP_TIMEOUT}|${MESSAGE_RPC_TIMEOUT})`; const TIMEOUT_REGEX = new RegExp(TIMEOUT_REGEX_STR); -const RETRY_INTERVAL = config.apiClient.instantiation.retryInterval; -const MAX_INSTANTIATION_WAIT_TIME = config.apiClient.instantiation.maxWaitTime; +const MAX_CLIENT_POOL_SIZE = config.apiClient.poolSize; const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; const WS_SERVER_PING_INTERVAL = config.apiClient.wsServerPingInterval; const WS_SERVER_PING_BUFFER = config.apiClient.pingIntervalBuffer; // In case the server is under stress +const WS_SERVER_PING_THRESHOLD = WS_SERVER_PING_INTERVAL + WS_SERVER_PING_BUFFER; // Caching -let clientCache; -let pingTimeout; -let lastPingAt; -let instantiationBeginTime; -let isInstantiating = false; +const clientPool = []; +const clientInstantiationStats = { + requests: 0, + success: 0, + fail: 0, +}; + +const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; + 
+const pingListener = apiClient => { + if (!isObject(apiClient)) { + logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot register a pingListener.`); + return; + } -const pingListener = () => { const now = Date.now(); - clearTimeout(pingTimeout); - logger.trace(`Server ping received at ${now}.`); + logger.trace(`Client ${apiClient.poolIndex} received server ping at ${now}.`); + clearTimeout(apiClient.pingTimeout); - pingTimeout = setTimeout(() => { - const timeSinceLastPing = now - lastPingAt; + apiClient.pingTimeout = setTimeout(() => { // Do not reset if the ping was delayed and just received + const timeSinceLastPing = now - apiClient.lastPingAt; if (timeSinceLastPing) { - logger.warn(`No ping from server in ${timeSinceLastPing}ms (last ping: ${lastPingAt}).`); - clientCache._channel.isAlive = false; - Signals.get('resetApiClient').dispatch(); + logger.warn( + `No ping for client ${apiClient.poolIndex} from server in ${timeSinceLastPing}ms (last ping: ${apiClient.lastPingAt}).`, + ); + apiClient._channel.isAlive = false; + Signals.get('resetApiClient').dispatch(apiClient); } - }, WS_SERVER_PING_INTERVAL + WS_SERVER_PING_BUFFER); + }, WS_SERVER_PING_THRESHOLD); - lastPingAt = now; + apiClient.lastPingAt = now; }; -const checkIsClientAlive = () => - clientCache && clientCache._channel && clientCache._channel.isAlive; - -// eslint-disable-next-line consistent-return -const instantiateClient = async (isForceReInstantiate = false) => { +const instantiateNewClient = async () => { + clientInstantiationStats.requests++; try { - if (!isInstantiating || isForceReInstantiate) { - if (!checkIsClientAlive() || isForceReInstantiate) { - isInstantiating = true; - instantiationBeginTime = Date.now(); - if (clientCache) await clientCache.disconnect(); - - clientCache = config.isUseLiskIPCClient - ? 
await createIPCClient(config.liskAppDataPath) - : await (async () => { - const client = await createWSClient(`${liskAddressWs}/rpc-ws`); - client._channel._ws.on('ping', pingListener); - return client; - })(); - - if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.'); - - // Inform listeners about the newly instantiated ApiClient - Signals.get('newApiClient').dispatch(); - - isInstantiating = false; - } - return clientCache; - } + const newClient = config.isUseLiskIPCClient + ? await createIPCClient(config.liskAppDataPath) + : await (async () => { + const client = await createWSClient(`${liskAddressWs}/rpc-ws`); + client._channel._ws.on('ping', pingListener.bind(null, client)); + return client; + })(); - if (Date.now() - instantiationBeginTime > MAX_INSTANTIATION_WAIT_TIME) { - // Waited too long, reset the flag to re-attempt client instantiation - isInstantiating = false; - } + clientInstantiationStats.success++; + return newClient; } catch (err) { - // Nullify the apiClient cache, so that it can be re-instantiated properly - clientCache = null; - + clientInstantiationStats.fail++; const errMessage = config.isUseLiskIPCClient - ? `Error instantiating IPC client at ${config.liskAppDataPath}.` - : `Error instantiating WS client to ${liskAddressWs}.`; + ? `Error instantiating IPC client at ${config.liskAppDataPath}` + : `Error instantiating WS client to ${liskAddressWs}`; logger.error(`${errMessage}: ${err.message}`); - if (err.message.includes('ECONNREFUSED')) { - throw new Error('ECONNREFUSED: Unable to reach a network node.'); + if (err.message.includes(ERROR_CONN_REFUSED)) { + throw new Error('Unable to connect to the node.'); } - return null; + throw err; } }; -const getApiClient = async () => { - const apiClient = await waitForIt(instantiateClient, RETRY_INTERVAL); - return checkIsClientAlive() ? 
apiClient : getApiClient(); +const initClientPool = async poolSize => { + // Set the intervals only at application init + if (clientPool.length === 0) { + setInterval(() => { + logger.info(`API client instantiation stats: ${JSON.stringify(clientInstantiationStats)}`); + }, 5 * 60 * 1000); + + setInterval(() => { + clientPool.forEach(async (apiClient, index) => { + if (isObject(apiClient)) return; + + // Re-instantiate when null + clientPool[index] = await instantiateNewClient() + .then(client => { + client.poolIndex = index; + return client; + }) + .catch(() => null); + }); + }, WS_SERVER_PING_INTERVAL); + } + + try { + const startTime = Date.now(); + for (let i = 0; i < poolSize; i++) { + // Do not instantiate new clients if enough clients already cached + if (clientPool.length >= poolSize) break; + + const newApiClient = await instantiateNewClient(); + newApiClient.poolIndex = clientPool.length; + clientPool.push(newApiClient); + } + logger.info( + `Initialized client pool in ${Date.now() - startTime}ms with ${clientPool.length} instances.`, + ); + } catch (err) { + logger.warn( + clientPool.length + ? `API client pool initialization failed due to: ${err.message}\nManaged to initialize the pool with only ${clientPool.length} instead of expected ${poolSize} clients.` + : `API client pool initialization failed due to: ${err.message}`, + ); + throw err; + } +}; + +const getApiClient = async poolIndex => { + if (!clientPool.length) await initClientPool(MAX_CLIENT_POOL_SIZE); + + const index = Number.isNaN(Number(poolIndex)) + ? crypto.randomInt(Math.min(clientPool.length, MAX_CLIENT_POOL_SIZE)) + : poolIndex; + + const apiClient = clientPool[index]; + return checkIsClientAlive(apiClient) + ? 
apiClient + : (() => { + if (apiClient) Signals.get('resetApiClient').dispatch(apiClient); + return waitForIt(getApiClient, 10); + })(); +}; + +const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { + // Replace the dead API client in the pool + if (!isObject(apiClient)) { + logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot reset.`); + return; + } + + // Do not attempt reset if last ping was within the acceptable threshold + if (Date.now() - (apiClient.lastPingAt || 0) < WS_SERVER_PING_THRESHOLD) { + return; + } + + const { poolIndex } = apiClient; + + if (isEventSubscriptionClient) { + logger.info(`Attempting to reset the eventSubscriptionClient: apiClient ${poolIndex}.`); + Signals.get('eventSubscriptionClientReset').dispatch(); + } else { + logger.info(`Attempting to reset apiClient ${poolIndex}.`); + } + + await apiClient + .disconnect() + .catch(err => logger.warn(`Error disconnecting apiClient: ${err.message}. Will proceed.`)); + + const newApiClient = await instantiateNewClient() + .then(client => { + client.poolIndex = poolIndex; + logger.info(`Successfully reset apiClient ${poolIndex}.`); + return client; + }) + .catch(() => null); + + clientPool[poolIndex] = newApiClient; + + if (newApiClient) Signals.get('newApiClient').dispatch(); }; +Signals.get('resetApiClient').add(resetApiClient); const is2XXResponse = response => String(response.status).startsWith('2'); const isSuccessResponse = response => is2XXResponse(response) && response.data.result; @@ -168,17 +253,17 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE if (TIMEOUT_REGEX.test(err.message)) { if (!retriesLeft) { const exceptionMsg = Object.getOwnPropertyNames(params).length - ? `Invocation timed out for '${endpoint}'.` - : `Invocation timed out for '${endpoint}' with params:\n${JSON.stringify(params)}.`; + ? 
`Invocation timed out for '${endpoint}' with params:\n${JSON.stringify(params)}.` + : `Invocation timed out for '${endpoint}'.`; throw new TimeoutException(exceptionMsg); } await delay(ENDPOINT_INVOKE_RETRY_DELAY); } else { logger.warn( - Object.getOwnPropertyNames(params).length === 0 - ? `Error invoking '${endpoint}':\n${err.stack}` - : `Error invoking '${endpoint}' with params:\n${JSON.stringify(params)}.\n${err.stack}`, + Object.getOwnPropertyNames(params).length + ? `Error invoking '${endpoint}' with params:\n${JSON.stringify(params)}.\n${err.stack}` + : `Error invoking '${endpoint}'.\n${err.stack}`, ); throw err; @@ -187,9 +272,6 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE } while (retriesLeft--); }; -const resetApiClientListener = async () => instantiateClient(true); -Signals.get('resetApiClient').add(resetApiClientListener); - module.exports = { TIMEOUT_REGEX, diff --git a/services/blockchain-connector/shared/sdk/events.js b/services/blockchain-connector/shared/sdk/events.js index 654a46fd5b..9be663fc2e 100644 --- a/services/blockchain-connector/shared/sdk/events.js +++ b/services/blockchain-connector/shared/sdk/events.js @@ -47,6 +47,7 @@ const events = [ EVENT_TX_POOL_TRANSACTION_NEW, ]; +let eventSubscribeClientPoolIndex; let eventsCounter; let lastBlockHeightEvent; @@ -89,10 +90,16 @@ const emitEngineEvents = async () => { const subscribeToAllRegisteredEvents = async () => { if (config.isUseHttpApi) return emitEngineEvents(); + // Active client subscription available, skip invocation + if (typeof eventSubscribeClientPoolIndex === 'number') return null; + // Reset eventsCounter first eventsCounter = 0; const apiClient = await getApiClient(); + eventSubscribeClientPoolIndex = apiClient.poolIndex; + logger.info(`Subscribing events with apiClient ${eventSubscribeClientPoolIndex}.`); + const registeredEvents = await getRegisteredEvents(); const allEvents = registeredEvents.concat(events); allEvents.forEach(event => { 
@@ -126,7 +133,7 @@ const ensureAPIClientLiveness = () => { if (config.isUseHttpApi) return; if (isNodeSynced && isGenesisBlockDownloaded) { - setInterval(() => { + setInterval(async () => { if (typeof eventsCounter === 'number' && eventsCounter > 0) { eventsCounter = 0; } else { @@ -141,8 +148,18 @@ const ensureAPIClientLiveness = () => { eventsCounter = 0; } - Signals.get('resetApiClient').dispatch(); - logger.info("Dispatched 'resetApiClient' signal to re-instantiate the API client."); + if (typeof eventSubscribeClientPoolIndex === 'number') { + const subscribedApiClient = await getApiClient(eventSubscribeClientPoolIndex); + Signals.get('resetApiClient').dispatch(subscribedApiClient, true); + logger.info( + `Dispatched 'resetApiClient' signal to re-instantiate the API client ${eventSubscribeClientPoolIndex}.`, + ); + + const eventSubscriptionClientResetListener = () => { + eventSubscribeClientPoolIndex = null; + }; + Signals.get('eventSubscriptionClientReset').add(eventSubscriptionClientResetListener); + } } }, config.clientConnVerifyInterval); } else { From 0b554e3c361aca4c1babcae43673e299c1198974 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Fri, 5 Jan 2024 19:35:44 +0530 Subject: [PATCH 3/6] :books: Sanitize documentation --- .../ROOT/pages/configuration/index.adoc | 42 +++++++++---------- services/blockchain-app-registry/README.md | 4 +- services/blockchain-connector/README.md | 16 +++---- services/blockchain-coordinator/README.md | 2 +- services/blockchain-indexer/README.md | 26 ++++++------ services/export/README.md | 2 +- services/gateway/README.md | 12 +++--- services/market/README.md | 12 +++--- services/transaction-statistics/README.md | 4 +- 9 files changed, 60 insertions(+), 60 deletions(-) diff --git a/docs/antora/modules/ROOT/pages/configuration/index.adoc b/docs/antora/modules/ROOT/pages/configuration/index.adoc index c6813718ae..577d8daae8 100644 --- a/docs/antora/modules/ROOT/pages/configuration/index.adoc +++ 
b/docs/antora/modules/ROOT/pages/configuration/index.adoc @@ -282,13 +282,13 @@ By default, it is set to `0`. | `JOB_INTERVAL_UPDATE_READINESS_STATUS` | number | Job run interval to update the readiness status. -By default, it is set to 0. +By default, it is set to `0`. | 0 | `JOB_SCHEDULE_UPDATE_READINESS_STATUS` | string | Job run cron schedule to update the readiness status. -By default, it is set to run every minute. +By default, it is set to run `every` minute. | * * * * * |=== @@ -429,7 +429,7 @@ By default, it is set to `3000`. | `WS_SERVER_PING_INTERVAL_BUFFER` | number -| A conservative assumption of the latency (in milliseconds) for WS server ping checks to arrive at the client. +| A conservative assumption of the latency (in milliseconds) for WS server pings to arrive at the client. By default, it is set to `1000`. | 1000 @@ -456,13 +456,13 @@ By default, it is set to `0`. | `JOB_INTERVAL_CACHE_CLEANUP` | number | Job run interval to clean up block cache. -By default, it is set to 0. +By default, it is set to `0`. | 0 | `JOB_SCHEDULE_CACHE_CLEANUP` | string | Job run cron schedule to clean up block cache. -By default, it is set to run every 12 hours. +By default, it is set to run every `12` hours. |0 */12 * * * | `JOB_INTERVAL_REFRESH_PEERS` @@ -551,7 +551,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_INDEX_MISSING_BLOCKS` | string | Job run cron schedule to index missing blocks. -By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * |=== @@ -657,7 +657,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_SERIALIZED_EVENTS` | string | Job run cron schedule to delete serialized events. -By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * | `JOB_INTERVAL_REFRESH_VALIDATORS` @@ -669,7 +669,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_VALIDATORS` | string | Job run cron schedule to refresh validators cache. 
-By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * | `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK` @@ -705,7 +705,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_BLOCKCHAIN_APPS_STATS` | string | Job run cron schedule to refresh blockchain application statistics. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE` @@ -717,7 +717,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_ACCOUNT_KNOWLEDGE` | string | Job run cron schedule to refresh account knowledge. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA` @@ -729,7 +729,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA` | string | Job run cron schedule to delete finalized CCU metadata. -By default, it is set to run once a day at 02:00 am. +By default, it is set to run once a day at `02:00 am`. | 0 2 * * * | `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES` @@ -741,7 +741,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES` | string | Job run cron schedule to trigger account updates. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `ESTIMATES_BUFFER_BYTES_LENGTH` @@ -797,7 +797,7 @@ To accelerate the indexing process, the blockchain-indexer microservice also sup |`DURABILITY_VERIFY_FREQUENCY` |number -|Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to 1. +|Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to `1`. |1 |`INDEX_SNAPSHOT_URL` @@ -916,7 +916,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_NON_METADATA_FILES` | string | Job run cron schedule to delete non-metadata files. 
-By default, it is set to run every day at midnight. +By default, it is set to run every day at `midnight`. | 0 0 * * * | `JOB_INTERVAL_UPDATE_METADATA` @@ -1119,7 +1119,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_TRANSACTION_STATS` | string | Job run cron schedule to refresh transaction statistics. -By default, it is set to run every 30 minutes. +By default, it is set to run every `30` minutes. | */30 * * * * | `JOB_INTERVAL_VERIFY_TRANSACTION_STATS` @@ -1131,7 +1131,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_VERIFY_TRANSACTION_STATS` | string |Job run cron schedule to verify if the transaction statistics have been built correctly. -By default, it is set to run every 3rd hour after the first `15` minutes. +By default, it is set to run every `3rd hour` after the first `15` minutes. | 15 */3 * * * |=== @@ -1220,7 +1220,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_BINANCE` | string | Job run cron schedule to refresh prices from Binance. -By default, it is set to run every minute. +By default, it is set to run `every` minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_BITTREX` @@ -1232,7 +1232,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_BITTREX` | string | Job run cron schedule to refresh prices from Bittrex. -By default, it is set to run every minute. +By default, it is set to run `every` minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI` @@ -1244,7 +1244,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_EXCHANGERATESAPI` | string | Job run cron schedule to refresh prices from exchangeratesapi. -By default, it is set to run every minute. +By default, it is set to run `every` minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_KRAKEN` @@ -1256,7 +1256,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_KRAKEN` | string | Job run cron schedule to refresh prices from Kraken. -By default, it is set to run every minute. 
+By default, it is set to run `every` minute. |* * * * * | `JOB_INTERVAL_UPDATE_PRICES` @@ -1407,7 +1407,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_CACHE_PURGE` | string | Job run cron schedule to clean up the cache. -By default, it is set to run daily at 04:45 am. +By default, it is set to run daily at `04:45 am`. | 45 4 * * * |=== diff --git a/services/blockchain-app-registry/README.md b/services/blockchain-app-registry/README.md index 06ee290bbe..b92bcb4264 100644 --- a/services/blockchain-app-registry/README.md +++ b/services/blockchain-app-registry/README.md @@ -30,9 +30,9 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_APP_REGISTRY_MYSQL`: Connection string of the MySQL instance that the microservice connects to. - `ENABLE_REBUILD_INDEX_AT_INIT`: Boolean flag to truncate the index and rebuild at application init. - `DEFAULT_APPS`: Default blockchain applications. By default, it is set to `lisk_mainchain`. -- `JOB_INTERVAL_DELETE_NON_METADATA_FILES`: Job run interval to delete non-metadata files. By default, it is set to 0. +- `JOB_INTERVAL_DELETE_NON_METADATA_FILES`: Job run interval to delete non-metadata files. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_NON_METADATA_FILES`: Job run cron schedule to delete non-metadata files. By default, it is set to run every day at midnight (`0 0 * * *`). -- `JOB_INTERVAL_UPDATE_METADATA`: Job run interval to update off-chain metadata. By default, it is set to 0. +- `JOB_INTERVAL_UPDATE_METADATA`: Job run interval to update off-chain metadata. By default, it is set to `0`. - `JOB_SCHEDULE_UPDATE_METADATA`: Job run cron schedule to update off-chain metadata. By default, it is set to run every 10 minutes (`*/10 * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. 
diff --git a/services/blockchain-connector/README.md b/services/blockchain-connector/README.md index d49d5fc6d0..ce074adf15 100644 --- a/services/blockchain-connector/README.md +++ b/services/blockchain-connector/README.md @@ -35,17 +35,17 @@ A list of the most commonly used environment variables is presented below: - `GENESIS_BLOCK_URL`: URL of the Lisk SDK-based application' genesis block. Only to be used when the genesis block is large enough to be transmitted over API calls within the timeout. - `GEOIP_JSON`: URL of GeoIP server. - `ENABLE_BLOCK_CACHING`: Boolean flag to enable the block caching. Enabled by default. To disable, set it to `false`. -- `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to 12. -- `CLIENT_POOL_SIZE`: Number of active API clients to be maintained in the pool. Only applicable when using the IPC or WS clients to connect with the Lisk node. By default, it is set to 10. +- `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to `12`. +- `CLIENT_POOL_SIZE`: Number of active API clients to be maintained in the pool. Only applicable when using the IPC or WS clients to connect with the Lisk node. By default, it is set to `10`. - `WS_SERVER_PING_INTERVAL`: Interval (in milliseconds) at which the WS server checks for liveliness of all the connected clients. This should not be modified unless explicitly recommended by the development team. By default, it is set to `3000`. -- `WS_SERVER_PING_INTERVAL_BUFFER`: A conservative assumption of the latency (in milliseconds) for WS server ping checks to arrive at the client. By default, it is set to `1000`. -- `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to 3. -- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to 50. 
+- `WS_SERVER_PING_INTERVAL_BUFFER`: A conservative assumption of the latency (in milliseconds) for WS server pings to arrive at the client. By default, it is set to `1000`. +- `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to `3`. +- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to `50`. - `CONNECTOR_EXIT_DELAY_IN_HOURS`: Delay (in hours) after which the blockchain-connector microservice exits. The service should restart automatically if deployed using Docker or PM2. To be removed eventually. To enable it, set it higher than `0`. By default, it is set to `0`. -- `JOB_INTERVAL_CACHE_CLEANUP`: Job run interval to cleanup block cache. By default, it is set to 0. +- `JOB_INTERVAL_CACHE_CLEANUP`: Job run interval to cleanup block cache. By default, it is set to `0`. - `JOB_SCHEDULE_CACHE_CLEANUP`: Job run cron schedule to cleanup block cache. By default, it is set to run every 12 hours (`0 */12 * * *`). -- `JOB_INTERVAL_REFRESH_PEERS`: Job run interval to refresh the peers list. By default, it is set to run every 60 seconds. -- `JOB_SCHEDULE_REFRESH_PEERS`: Job run cron schedule to refresh the peers list. By default, it is set to ''. +- `JOB_INTERVAL_REFRESH_PEERS`: Job run interval to refresh the peers list. By default, it is set to run every `60` seconds. +- `JOB_SCHEDULE_REFRESH_PEERS`: Job run cron schedule to refresh the peers list. By default, it is set to `''`. > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. 
diff --git a/services/blockchain-coordinator/README.md b/services/blockchain-coordinator/README.md index 1649646b23..74ac1a7329 100644 --- a/services/blockchain-coordinator/README.md +++ b/services/blockchain-coordinator/README.md @@ -28,7 +28,7 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_BROKER`: URL of the microservice message broker (NATS or Redis). - `SERVICE_MESSAGE_QUEUE_REDIS`: URL of the job queue to schedule the indexing jobs (Redis). -- `JOB_INTERVAL_INDEX_MISSING_BLOCKS`: Job run interval to index missing blocks. By default, it is set to 0. +- `JOB_INTERVAL_INDEX_MISSING_BLOCKS`: Job run interval to index missing blocks. By default, it is set to `0`. - `JOB_SCHEDULE_INDEX_MISSING_BLOCKS`: Job run cron schedule to index missing blocks. By default, it is set to run every 5 minutes (`*/5 * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/blockchain-indexer/README.md b/services/blockchain-indexer/README.md index 03f6dfd69e..0d2497641d 100644 --- a/services/blockchain-indexer/README.md +++ b/services/blockchain-indexer/README.md @@ -40,30 +40,30 @@ A list of the most commonly used environment variables is presented below: - `ENABLE_INDEXING_MODE`: Boolean flag to enable the Data Indexing mode. - `ENABLE_PERSIST_EVENTS`: Boolean flag to permanently maintain the events in the MySQL database. - `ENABLE_APPLY_SNAPSHOT`: Boolean flag to enable initialization of the index with the Lisk Service DB snapshot. -- `DURABILITY_VERIFY_FREQUENCY`: Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to 1. +- `DURABILITY_VERIFY_FREQUENCY`: Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to `1`. - `INDEX_SNAPSHOT_URL`: URL from where the Lisk Service DB snapshot will be downloaded. 
- `ENABLE_SNAPSHOT_ALLOW_INSECURE_HTTP`: Boolean flag to enable downloading snapshot from an (unsecured) HTTP URL. - `LISK_STATIC`: URL of Lisk static assets. - `SERVICE_INDEXER_CACHE_REDIS`: URL of the cache storage (Redis). -- `ACCOUNT_BALANCE_UPDATE_BATCH_SIZE`: Number of accounts for which the balance index is updated at a time. By default, it is set to 1000. -- `INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT`: Maximum number of jobs (in active and waiting state) allowed in the block indexing queue. By default, it is set to 100000. -- `JOB_INTERVAL_DELETE_SERIALIZED_EVENTS`: Job run interval to delete serialized events. By default, it is set to 0. +- `ACCOUNT_BALANCE_UPDATE_BATCH_SIZE`: Number of accounts for which the balance index is updated at a time. By default, it is set to `1000`. +- `INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT`: Maximum number of jobs (in active and waiting state) allowed in the block indexing queue. By default, it is set to `100000`. +- `JOB_INTERVAL_DELETE_SERIALIZED_EVENTS`: Job run interval to delete serialized events. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_SERIALIZED_EVENTS`: Job run cron schedule to delete serialized events. By default, it is set to run every 5th minute (`*/5 * * * *`). -- `JOB_INTERVAL_REFRESH_VALIDATORS`: Job run interval to refresh validators cache. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_VALIDATORS`: Job run interval to refresh validators cache. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_VALIDATORS`: Job run cron schedule to refresh validators cache. By default, it is set to run every 5th minute (`*/5 * * * *`). -- `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK`: Job run interval to validate the rank for all the validators. By default, it is set to 0. +- `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK`: Job run interval to validate the rank for all the validators. By default, it is set to `0`. 
- `JOB_SCHEDULE_VALIDATE_VALIDATORS_RANK`: Job run cron schedule to validate the rank for all the validators. By default, it is set to run every 15 minutes, and starts at 4 minutes past the hour (`4-59/15 * * * *`). -- `JOB_INTERVAL_REFRESH_INDEX_STATUS`: Job run interval to refresh indexing status. By default, it is set to run every 10 seconds. -- `JOB_SCHEDULE_REFRESH_INDEX_STATUS`: Job run cron schedule to refresh indexing status. By default, it is set to ''. -- `JOB_INTERVAL_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run interval to refresh blockchain application statistics. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_INDEX_STATUS`: Job run interval to refresh indexing status. By default, it is set to run every `10` seconds. +- `JOB_SCHEDULE_REFRESH_INDEX_STATUS`: Job run cron schedule to refresh indexing status. By default, it is set to `''`. +- `JOB_INTERVAL_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run interval to refresh blockchain application statistics. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run cron schedule to refresh blockchain application statistics. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE`: Job run interval to refresh account knowledge. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE`: Job run interval to refresh account knowledge. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_ACCOUNT_KNOWLEDGE`: Job run cron schedule to refresh account knowledge. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA`: Job run interval to delete finalized CCU metadata. By default, it is set to 0. +- `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA`: Job run interval to delete finalized CCU metadata. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA`: Job run cron schedule to delete finalized CCU metadata. 
By default, it is set to run once a day at 02:00 am (`0 2 * * *`). -- `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES`: Job run interval to trigger account updates. By default, it is set to 0. +- `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES`: Job run interval to trigger account updates. By default, it is set to `0`. - `JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES`: Job run cron schedule to trigger account updates. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `ESTIMATES_BUFFER_BYTES_LENGTH`: Transaction buffer bytes to consider when estimating the transaction fees. By default, it is set to 0. +- `ESTIMATES_BUFFER_BYTES_LENGTH`: Transaction buffer bytes to consider when estimating the transaction fees. By default, it is set to `0`. - `MAINCHAIN_SERVICE_URL`: Mainchain service URL for custom deployments. - `INVOKE_ALLOWED_METHODS`: List of allowed methods that can be invoked via the `/invoke` API endpoint. The list can be expressed as a CSV. To allow invocation of all endpoints, set it to `*`. To allow invocation of all the registered methods under the specified namespaces, set it similar to `legacy,chain`. To allow invocation of specific methods, set it similar to `chain_getBlocksByIDs,chain_getBlocksByHeight`. diff --git a/services/export/README.md b/services/export/README.md index 96a3d9090f..26ebc3f162 100644 --- a/services/export/README.md +++ b/services/export/README.md @@ -27,7 +27,7 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_BROKER`: URL of the microservice message broker (NATS or Redis). - `SERVICE_EXPORT_REDIS`: URL of the permanent cache storage (Redis). - `SERVICE_EXPORT_REDIS_VOLATILE`: URL of the volatile cache storage (Redis). -- `JOB_INTERVAL_CACHE_PURGE`: Job run interval to cleanup cache. By default, it is set to 0. +- `JOB_INTERVAL_CACHE_PURGE`: Job run interval to cleanup cache. By default, it is set to `0`. - `JOB_SCHEDULE_CACHE_PURGE`: Job run cron schedule to cleanup cache. 
By default, it is set to run daily at 04:45 am (`45 4 * * *`). - `EXPORT_S3_ENDPOINT`: Amazon S3 bucket endpoint. - `EXPORT_S3_ACCESS_KEY`: Amazon S3 bucket access key for the specified endpoint. diff --git a/services/gateway/README.md b/services/gateway/README.md index cc163c3509..07b91dc314 100644 --- a/services/gateway/README.md +++ b/services/gateway/README.md @@ -33,21 +33,21 @@ A list of the most commonly used environment variables is presented below: - `CORS_ALLOWED_ORIGIN`: Allows request from the comma separated string of origins. By default, it is set to `*` which allows request from all origins. - `SERVICE_GATEWAY_REDIS_VOLATILE`: URL of the volatile cache storage (Redis). - `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS`: Boolean flag to enforce the `headersTimeout` and `keepAliveTimeout` settings on the API server. Enabling this might be helpful when deploying Lisk Service behind a load balancer or a reverse proxy. Check [this FAQ](https://moleculer.services/docs/0.14/faq.html#Why-am-I-getting-502-Bad-Gateway-when-api-gateway-is-behind-ALB-on-AWS) for more information. -- `HTTP_KEEP_ALIVE_TIMEOUT`: Defines the number of microseconds the gateway will wait before closing an idle connection. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. By default, it is set to 65000. -- `HTTP_HEADERS_TIMEOUT`: Defines the maximum number of microseconds for the gateway to send HTTP response headers after the client's request. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. Please ensure that the `HTTP_HEADERS_TIMEOUT` is set higher than the `HTTP_KEEP_ALIVE_TIMEOUT`. By default, it is set to 66000. +- `HTTP_KEEP_ALIVE_TIMEOUT`: Defines the number of milliseconds the gateway will wait before closing an idle connection.
To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. By default, it is set to `65000`. +- `HTTP_HEADERS_TIMEOUT`: Defines the maximum number of milliseconds for the gateway to send HTTP response headers after the client's request. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. Please ensure that the `HTTP_HEADERS_TIMEOUT` is set higher than the `HTTP_KEEP_ALIVE_TIMEOUT`. By default, it is set to `66000`. - `HTTP_RATE_LIMIT_ENABLE`: Boolean flag to enable HTTP rate limiter. Disabled by default. To enable, set it to `true`. -- `HTTP_RATE_LIMIT_WINDOW`: To keep a record of requests in the memory (in seconds). By default, it is set to 10 seconds. +- `HTTP_RATE_LIMIT_WINDOW`: To keep a record of requests in the memory (in seconds). By default, it is set to `10` seconds. - `HTTP_RATE_LIMIT_CONNECTIONS`: Maximum number of allowed requests for a specified window, configured by `HTTP_RATE_LIMIT_WINDOW`. - `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR`: When set to true, the rate limiting algorithm considers the X-Forwarded-For header value to determine the client's IP address for rate limiting purposes. By default, it is set to `false`. -- `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES`: Defines the number of proxies that exist between the gateway and the external client application, enabling accurate identification of the client's IP address for rate limiting. Requires `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR` to be enabled. By default, it is set to 0. +- `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES`: Defines the number of proxies that exist between the gateway and the external client application, enabling accurate identification of the client's IP address for rate limiting. Requires `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR` to be enabled. By default, it is set to `0`.
- `ENABLE_HTTP_CACHE_CONTROL`: Boolean flag to enable HTTP response caching. This includes the `Cache-Control` header within the responses. - `HTTP_CACHE_CONTROL_DIRECTIVES`: HTTP `cache-control` directives. - `WS_RATE_LIMIT_ENABLE`: Boolean flag to enable WS rate limiter. - `WS_RATE_LIMIT_CONNECTIONS`: Maximum number of new connections allowed within a configured window, specified by `WS_RATE_LIMIT_DURATION`. -- `WS_RATE_LIMIT_DURATION`: Rate limit window in seconds. By default, it is set to 1. +- `WS_RATE_LIMIT_DURATION`: Rate limit window in seconds. By default, it is set to `1`. - `ENABLE_REQUEST_CACHING`: Boolean flag to enable RPC response caching. - `GATEWAY_DEPENDENCIES`: Services on which the gateway is dependent (can be expressed as a CSV). -- `JOB_INTERVAL_UPDATE_READINESS_STATUS`: Job run interval to update the readiness status. By default, it is set to 0. +- `JOB_INTERVAL_UPDATE_READINESS_STATUS`: Job run interval to update the readiness status. By default, it is set to `0`. - `JOB_SCHEDULE_UPDATE_READINESS_STATUS`: Job run cron schedule to update the readiness status. By default, it is set to run every minute (`* * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/market/README.md b/services/market/README.md index 6e50bac2de..f5608c1e8c 100644 --- a/services/market/README.md +++ b/services/market/README.md @@ -31,16 +31,16 @@ A list of the most commonly used environment variables is presented below: - `EXCHANGERATESAPI_IO_API_KEY`: Access key to fetch data from the exchangeratesapi.io API. - `SERVICE_MARKET_FIAT_CURRENCIES`: Supported fiat currencies. - `SERVICE_MARKET_TARGET_PAIRS`: Supported target pairs. -- `JOB_INTERVAL_REFRESH_PRICES_BINANCE`: Job run interval to refresh prices from Binance. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_BINANCE`: Job run interval to refresh prices from Binance. 
By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_BINANCE`: Job run cron schedule to refresh prices from Binance. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_BITTREX`: Job run interval to refresh prices from Bittrex. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_BITTREX`: Job run interval to refresh prices from Bittrex. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_BITTREX`: Job run cron schedule to refresh prices from Bittrex. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI`: Job run interval to refresh prices from exchangeratesapi. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI`: Job run interval to refresh prices from exchangeratesapi. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_EXCHANGERATESAPI`: Job run cron schedule to refresh prices from exchangeratesapi. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_KRAKEN`: Job run interval to refresh prices from Kraken. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_KRAKEN`: Job run interval to refresh prices from Kraken. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_KRAKEN`: Job run cron schedule to refresh prices from Kraken. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_UPDATE_PRICES`: Job run interval to update market prices. By default, it is set to run every 5 seconds. -- `JOB_SCHEDULE_UPDATE_PRICES`: Job run cron schedule to update market prices. By default, it is set to ''. +- `JOB_INTERVAL_UPDATE_PRICES`: Job run interval to update market prices. By default, it is set to run every `5` seconds. +- `JOB_SCHEDULE_UPDATE_PRICES`: Job run cron schedule to update market prices. By default, it is set to `''`. 
> **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/transaction-statistics/README.md b/services/transaction-statistics/README.md index 3d355a734f..1287cc5d1e 100644 --- a/services/transaction-statistics/README.md +++ b/services/transaction-statistics/README.md @@ -31,9 +31,9 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_STATISTICS_MYSQL_READ_REPLICA`: Connection string of the (read only) replicated MySQL instance that the microservice connects to. - `SERVICE_STATISTICS_REDIS`: URL of the cache storage (Redis). - `TRANSACTION_STATS_HISTORY_LENGTH_DAYS`: The number of days for which the transaction statistics need to be built in retrospect to the application init. -- `JOB_INTERVAL_REFRESH_TRANSACTION_STATS`: Job run interval to refresh transaction statistics. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_TRANSACTION_STATS`: Job run interval to refresh transaction statistics. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_TRANSACTION_STATS`: Job run cron schedule to refresh transaction statistics. By default, it is set to run every 30th minute (`*/30 * * * *`). -- `JOB_INTERVAL_VERIFY_TRANSACTION_STATS`: Job run interval to verify if the transaction statistics have been built correctly. By default, it is set to 0. +- `JOB_INTERVAL_VERIFY_TRANSACTION_STATS`: Job run interval to verify if the transaction statistics have been built correctly. By default, it is set to `0`. - `JOB_SCHEDULE_VERIFY_TRANSACTION_STATS`: Job run cron schedule to verify if the transaction statistics have been built correctly. By default, it is set to run every 3rd hour after the first 15 minutes (`15 */3 * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. 
From f208bdff2fcd4dcfe65ebb1e2475b01546938043 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Fri, 5 Jan 2024 20:42:28 +0530 Subject: [PATCH 4/6] :zap: Improve log --- services/blockchain-connector/shared/sdk/client.js | 8 +++++++- services/blockchain-connector/shared/utils/download.js | 7 ++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 583a7797b9..2e9b970048 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -55,6 +55,12 @@ const clientInstantiationStats = { fail: 0, }; +const getApiClientStats = () => ({ + ...clientInstantiationStats, + currentPoolSize: clientPool.length, + expectedPoolSize: MAX_CLIENT_POOL_SIZE, +}); + const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; const pingListener = apiClient => { @@ -114,7 +120,7 @@ const initClientPool = async poolSize => { // Set the intervals only at application init if (clientPool.length === 0) { setInterval(() => { - logger.info(`API client instantiation stats: ${JSON.stringify(clientInstantiationStats)}`); + logger.info(`API client instantiation stats: ${JSON.stringify(getApiClientStats())}`); }, 5 * 60 * 1000); setInterval(() => { diff --git a/services/blockchain-connector/shared/utils/download.js b/services/blockchain-connector/shared/utils/download.js index 7ffb6dcc0a..34db98ad89 100644 --- a/services/blockchain-connector/shared/utils/download.js +++ b/services/blockchain-connector/shared/utils/download.js @@ -113,10 +113,10 @@ const downloadFile = (url, dirPath) => }); const verifyFileChecksum = async (filePath, checksumPath) => { - // Validate existance of both files + // Validate the existence of both the files if (!(await exists(filePath)) || !(await exists(checksumPath))) { logger.info( - `Checksum verification failed. 
One of the following files does not exist:\nfilePath:${filePath}\nchecksumPath:${checksumPath}`, + `Checksum verification failed. One of the following files does not exist:\nfilePath: ${filePath}\nchecksumPath: ${checksumPath}`, ); return false; } @@ -141,11 +141,12 @@ const verifyFileChecksum = async (filePath, checksumPath) => { const fileChecksum = fileHash.toString('hex'); if (fileChecksum !== expectedChecksum) { logger.info( - `Checksum verification failed for file:${filePath}\nExpected: ${expectedChecksum}, Actual: ${fileChecksum}`, + `Checksum verification failed for file: ${filePath}\nExpected: ${expectedChecksum}, Actual: ${fileChecksum}`, ); return false; } + logger.info(`Checksum verification successful for file: ${filePath}`); return true; }; From 13918a91cdd3a575c49453e88aa605d874cd56b4 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Sat, 6 Jan 2024 11:46:57 +0530 Subject: [PATCH 5/6] :hammer: Refactor code --- services/blockchain-connector/shared/sdk/client.js | 11 +++++++---- services/blockchain-connector/shared/sdk/events.js | 12 ++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 2e9b970048..7c67e275bf 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -50,15 +50,17 @@ const WS_SERVER_PING_THRESHOLD = WS_SERVER_PING_INTERVAL + WS_SERVER_PING_BUFFER // Caching const clientPool = []; const clientInstantiationStats = { - requests: 0, + attempts: 0, success: 0, fail: 0, }; +let requestCount = 0; const getApiClientStats = () => ({ ...clientInstantiationStats, currentPoolSize: clientPool.length, expectedPoolSize: MAX_CLIENT_POOL_SIZE, + numEndpointInvocations: requestCount, }); const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; @@ -89,7 +91,7 @@ const pingListener = apiClient => { }; const 
instantiateNewClient = async () => { - clientInstantiationStats.requests++; + clientInstantiationStats.attempts++; try { const newClient = config.isUseLiskIPCClient ? await createIPCClient(config.liskAppDataPath) @@ -185,6 +187,7 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { } // Do not attempt reset if last ping was within the acceptable threshold + // This is to avoid unnecessary socket creation if (Date.now() - (apiClient.lastPingAt || 0) < WS_SERVER_PING_THRESHOLD) { return; } @@ -232,17 +235,17 @@ const buildHTTPResponse = (endpoint, params, response) => { throw new Error(errorMessage); }; -let id = 0; // eslint-disable-next-line consistent-return const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RETRIES) => { let retriesLeft = numRetries; do { try { + requestCount++; if (config.isUseHttpApi) { // HTTP API-based communication with the Lisk app node const rpcRequest = { jsonrpc: '2.0', - id: id++, + id: requestCount, method: endpoint, params, }; diff --git a/services/blockchain-connector/shared/sdk/events.js b/services/blockchain-connector/shared/sdk/events.js index 9be663fc2e..2a39131e59 100644 --- a/services/blockchain-connector/shared/sdk/events.js +++ b/services/blockchain-connector/shared/sdk/events.js @@ -152,13 +152,8 @@ const ensureAPIClientLiveness = () => { const subscribedApiClient = await getApiClient(eventSubscribeClientPoolIndex); Signals.get('resetApiClient').dispatch(subscribedApiClient, true); logger.info( - `Dispatched 'resetApiClient' signal to re-instantiate the API client ${eventSubscribeClientPoolIndex}.`, + `Dispatched 'resetApiClient' signal to re-instantiate the event subscription API client ${subscribedApiClient.poolIndex}.`, ); - - const eventSubscriptionClientResetListener = () => { - eventSubscribeClientPoolIndex = null; - }; - Signals.get('eventSubscriptionClientReset').add(eventSubscriptionClientResetListener); } } }, config.clientConnVerifyInterval); @@ -179,8 
+174,13 @@ const genesisBlockDownloadedListener = () => { ensureAPIClientLiveness(); }; +const eventSubscriptionClientResetListener = () => { + eventSubscribeClientPoolIndex = null; +}; + Signals.get('nodeIsSynced').add(nodeIsSyncedListener); Signals.get('genesisBlockDownloaded').add(genesisBlockDownloadedListener); +Signals.get('eventSubscriptionClientReset').add(eventSubscriptionClientResetListener); module.exports = { events, From 0dc5ceb8c08064fe296da242813adc98ac6812f8 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Sat, 6 Jan 2024 23:59:12 +0530 Subject: [PATCH 6/6] :bug: Re-subscribe events when prev subscribed client is reset --- .../blockchain-connector/shared/sdk/client.js | 17 ++- .../blockchain-connector/shared/sdk/events.js | 17 ++- services/fee-estimator/app.js | 1 + services/fee-estimator/shared/dynamicFees.js | 20 ++-- services/fee-estimator/shared/utils/chain.js | 109 ++++++++++++++++++ .../fee-estimator/shared/utils/dynamicFees.js | 12 +- 6 files changed, 149 insertions(+), 27 deletions(-) create mode 100644 services/fee-estimator/shared/utils/chain.js diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 7c67e275bf..ff56665441 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -84,6 +84,9 @@ const pingListener = apiClient => { ); apiClient._channel.isAlive = false; Signals.get('resetApiClient').dispatch(apiClient); + logger.debug( + `Dispatched 'resetApiClient' signal from pingListener for API client ${apiClient.poolIndex}.`, + ); } }, WS_SERVER_PING_THRESHOLD); @@ -174,7 +177,12 @@ const getApiClient = async poolIndex => { return checkIsClientAlive(apiClient) ? 
apiClient : (() => { - if (apiClient) Signals.get('resetApiClient').dispatch(apiClient); + if (apiClient) { + Signals.get('resetApiClient').dispatch(apiClient); + logger.debug( + `Dispatched 'resetApiClient' signal from getApiClient for API client ${apiClient.poolIndex}.`, + ); + } return waitForIt(getApiClient, 10); })(); }; @@ -186,14 +194,15 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { return; } + const { poolIndex } = apiClient; + // Do not attempt reset if last ping was within the acceptable threshold // This is to avoid unnecessary socket creation if (Date.now() - (apiClient.lastPingAt || 0) < WS_SERVER_PING_THRESHOLD) { + logger.debug(`Not resetting apiClient ${poolIndex}. Received a late ping from the server.`); return; } - const { poolIndex } = apiClient; - if (isEventSubscriptionClient) { logger.info(`Attempting to reset the eventSubscriptionClient: apiClient ${poolIndex}.`); Signals.get('eventSubscriptionClientReset').dispatch(); @@ -215,7 +224,7 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { clientPool[poolIndex] = newApiClient; - if (newApiClient) Signals.get('newApiClient').dispatch(); + if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex); }; Signals.get('resetApiClient').add(resetApiClient); diff --git a/services/blockchain-connector/shared/sdk/events.js b/services/blockchain-connector/shared/sdk/events.js index 2a39131e59..1d92131d44 100644 --- a/services/blockchain-connector/shared/sdk/events.js +++ b/services/blockchain-connector/shared/sdk/events.js @@ -87,11 +87,16 @@ const emitEngineEvents = async () => { }; // eslint-disable-next-line consistent-return -const subscribeToAllRegisteredEvents = async () => { +const subscribeToAllRegisteredEvents = async (newClientPoolIndex = null) => { if (config.isUseHttpApi) return emitEngineEvents(); // Active client subscription available, skip invocation - if (typeof eventSubscribeClientPoolIndex === 
'number') return null; + if ( + typeof eventSubscribeClientPoolIndex === 'number' && + eventSubscribeClientPoolIndex !== newClientPoolIndex + ) { + return null; + } // Reset eventsCounter first eventsCounter = 0; @@ -149,10 +154,10 @@ const ensureAPIClientLiveness = () => { } if (typeof eventSubscribeClientPoolIndex === 'number') { - const subscribedApiClient = await getApiClient(eventSubscribeClientPoolIndex); - Signals.get('resetApiClient').dispatch(subscribedApiClient, true); - logger.info( - `Dispatched 'resetApiClient' signal to re-instantiate the event subscription API client ${subscribedApiClient.poolIndex}.`, + const apiClient = await getApiClient(eventSubscribeClientPoolIndex); + Signals.get('resetApiClient').dispatch(apiClient, true); + logger.debug( + `Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`, ); } } diff --git a/services/fee-estimator/app.js b/services/fee-estimator/app.js index 41314cf107..3b8c94c400 100644 --- a/services/fee-estimator/app.js +++ b/services/fee-estimator/app.js @@ -32,6 +32,7 @@ const app = Microservice({ logger: config.log, events: { chainNewBlock: async payload => Signals.get('newBlock').dispatch(payload), + chainDeleteBlock: async payload => Signals.get('deleteBlock').dispatch(payload), }, dependencies: ['connector'], }); diff --git a/services/fee-estimator/shared/dynamicFees.js b/services/fee-estimator/shared/dynamicFees.js index 1c85a7548e..4b6923507f 100644 --- a/services/fee-estimator/shared/dynamicFees.js +++ b/services/fee-estimator/shared/dynamicFees.js @@ -16,20 +16,18 @@ const util = require('util'); const { CacheRedis, Logger, Signals } = require('lisk-service-framework'); -const { requestConnector } = require('./utils/request'); - const config = require('../config'); const { getFeeConstants } = require('./feeConstants'); - +const { getLatestBlock } = require('./utils/chain'); const { checkAndProcessExecution, isFeeCalculationRunningInMode } = 
require('./utils/dynamicFees'); const cacheRedisFees = CacheRedis('fees', config.endpoints.cache); const logger = Logger(); -const calculateEstimateFeePerByteFull = async () => { - const { header: latestBlock } = await requestConnector('getLastBlock'); +const calculateEstimateFeePerByteFull = async newBlock => { + const { header: latestBlock } = newBlock; const fromHeight = config.feeEstimates.defaultStartBlockHeight; const toHeight = latestBlock.height; @@ -50,9 +48,9 @@ const calculateEstimateFeePerByteFull = async () => { return cachedFeeEstPerByteFull; }; -const calculateEstimateFeePerByteQuick = async () => { +const calculateEstimateFeePerByteQuick = async newBlock => { // For the cold start scenario - const { header: latestBlock } = await requestConnector('getLastBlock'); + const { header: latestBlock } = newBlock; const batchSize = config.feeEstimates.coldStartBatchSize; const toHeight = latestBlock.height; const fromHeight = toHeight - batchSize; @@ -81,7 +79,7 @@ const getEstimateFeePerByte = async () => { }; } - const { header: latestBlock } = await requestConnector('getLastBlock'); + const { header: latestBlock } = await getLatestBlock(); const validate = (feeEstPerByte, allowedLag = 0) => feeEstPerByte && ['low', 'med', 'high', 'updated', 'blockHeight', 'blockID'].every(key => @@ -111,15 +109,15 @@ const getEstimateFeePerByte = async () => { }; }; -const newBlockListener = async () => { +const newBlockListener = async newBlock => { try { if (config.feeEstimates.fullAlgorithmEnabled) { logger.debug('Initiate the dynamic fee estimates computation (full computation).'); - calculateEstimateFeePerByteFull(); + calculateEstimateFeePerByteFull(newBlock); } if (config.feeEstimates.quickAlgorithmEnabled) { logger.debug('Initiate the dynamic fee estimates computation (quick algorithm).'); - const feeEstimate = await calculateEstimateFeePerByteQuick(); + const feeEstimate = await calculateEstimateFeePerByteQuick(newBlock); logger.debug( `============== 
'newFeeEstimate' signal: ${Signals.get('newFeeEstimate')} ==============.`, ); diff --git a/services/fee-estimator/shared/utils/chain.js b/services/fee-estimator/shared/utils/chain.js new file mode 100644 index 0000000000..e6ce481101 --- /dev/null +++ b/services/fee-estimator/shared/utils/chain.js @@ -0,0 +1,109 @@ +/* + * LiskHQ/lisk-service + * Copyright © 2024 Lisk Foundation + * + * See the LICENSE file at the top-level directory of this distribution + * for licensing information. + * + * Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation, + * no part of this software, including this file, may be copied, modified, + * propagated, or distributed except according to the terms contained in the + * LICENSE file. + * + * Removal or modification of this copyright notice is prohibited. + * + */ +const { + CacheRedis, + Signals, + Utils: { isObject }, +} = require('lisk-service-framework'); + +const config = require('../../config'); +const { requestConnector } = require('./request'); + +let genesisHeight; +let blockTime; +let latestBlock; + +const { emaBatchSize } = config.feeEstimates; + +const blockByHeightCache = CacheRedis('blockByHeight', config.endpoints.cache); + +const isValidBlock = block => { + if (!isObject(block)) return false; + return 'assets' in block && 'header' in block && 'transactions' in block; +}; + +const setLatestBlock = block => { + if (isValidBlock(block)) latestBlock = block; +}; + +const getLatestBlock = async () => { + if (typeof latestBlock !== 'object') { + latestBlock = await requestConnector('getLastBlock'); + } + return latestBlock; +}; + +const getBlockTime = async () => { + if (typeof blockTime !== 'number') { + const nodeInfo = await requestConnector('getNodeInfo'); + blockTime = nodeInfo.genesis.blockTime; + } + return blockTime; +}; + +const getGenesisHeight = async () => { + if (typeof genesisHeight !== 'number') { + genesisHeight = await requestConnector('getGenesisHeight'); + } + return 
genesisHeight; +}; + +const cacheBlockByHeight = async block => { + try { + if (isValidBlock(block)) { + const numBlocksToKeep = emaBatchSize * 2; + const blockTimeInMs = (await getBlockTime()) * 1000; + + await blockByHeightCache.set( + block.header.height, + JSON.stringify(block), + numBlocksToKeep * blockTimeInMs, + ); + } + } catch (_) { + // No actions to be taken + } +}; + +const emptyCacheBlockByHeight = async block => { + try { + if (isValidBlock(block)) { + await blockByHeightCache.delete(block.header.height); + } + } catch (_) { + // No actions to be taken + } +}; + +const getBlockByHeight = async height => { + const blockStr = await blockByHeightCache.get(height); + if (blockStr) return JSON.parse(blockStr); + + const block = await requestConnector('getBlockByHeight', { height }); + await cacheBlockByHeight(block); + return block; +}; + +Signals.get('newBlock').add(setLatestBlock); +Signals.get('newBlock').add(cacheBlockByHeight); +Signals.get('deleteBlock').add(emptyCacheBlockByHeight); + +module.exports = { + setLatestBlock, + getLatestBlock, + getGenesisHeight, + getBlockByHeight, +}; diff --git a/services/fee-estimator/shared/utils/dynamicFees.js b/services/fee-estimator/shared/utils/dynamicFees.js index 827490d024..aa879418c4 100644 --- a/services/fee-estimator/shared/utils/dynamicFees.js +++ b/services/fee-estimator/shared/utils/dynamicFees.js @@ -17,8 +17,8 @@ const BluebirdPromise = require('bluebird'); const { CacheRedis, Logger } = require('lisk-service-framework'); +const { getGenesisHeight, getBlockByHeight } = require('./chain'); const { getEstimateFeePerByteForBlock } = require('./dynamicFeesLIP'); -const { requestConnector } = require('./request'); const config = require('../../config'); @@ -34,11 +34,11 @@ const cacheRedisFees = CacheRedis('fees', config.endpoints.cache); const logger = Logger(); const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) => { - const genesisHeight = await 
requestConnector('getGenesisHeight'); + const genesisHeight = await getGenesisHeight(); const { defaultStartBlockHeight } = config.feeEstimates; // Check if the starting height is permitted by config or adjust acc. - // Use incrementation to skip the genesis block - it is not needed + // Skip the genesis block for calculation - it is not needed fromHeight = Math.max( ...[defaultStartBlockHeight, genesisHeight + 1, fromHeight].filter(n => !Number.isNaN(n)), ); @@ -70,9 +70,9 @@ const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) => blockBatch.data = await BluebirdPromise.map( range(finalEMABatchSize), async i => { - const { header, transactions } = await requestConnector('getBlockByHeight', { - height: prevFeeEstPerByte.blockHeight + 1 - i, - }); + const { header, transactions } = await getBlockByHeight( + prevFeeEstPerByte.blockHeight + 1 - i, + ); return { ...header, transactions }; }, { concurrency: 50 },