From aa5118a0ef7284455682b4aa550f2f238ca56540 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 14:12:15 +0100 Subject: [PATCH 01/10] :WIP: Decouple api client instantiation by using cached connection pool --- .../blockchain-connector/shared/sdk/client.js | 221 +++++++----------- 1 file changed, 87 insertions(+), 134 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 3059711130..e2ada88231 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -16,7 +16,6 @@ const { Logger, Signals, - Utils: { waitForIt }, } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); @@ -28,127 +27,62 @@ const logger = Logger(); // Constants const timeoutMessage = 'Response not received in'; const liskAddress = config.endpoints.liskWs; -const RETRY_INTERVAL = config.apiClient.instantiation.retryInterval; -const MAX_INSTANTIATION_WAIT_TIME = config.apiClient.instantiation.maxWaitTime; const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime; -const CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS = - config.apiClient.aliveAssumptionTimeBeforeGenesis; const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime; -const WS_CONNECTION_LIMIT = config.apiClient.wsConnectionLimit; - -// Caching and flags -let clientCache; -let instantiationBeginTime; -let lastClientAliveTime; -let heartbeatCheckBeginTime; -let isInstantiating = false; -let isClientAlive = false; -let isGenesisBlockIndexed = false; -let wsConnectionsEstablished = 0; - -const pongListener = res => { - isClientAlive = true; - lastClientAliveTime = Date.now(); - return res(true); -}; +const CACHED_CLIENT_COUNT = 5; + +// Pool of cached api clients +const cachedApiClients = []; -const checkIsClientAlive = async () => +const checkIsClientAlive = async (clientCache) => // eslint-disable-next-line consistent-return new Promise(resolve => { - if (!clientCache || (clientCache._channel && !clientCache._channel.isAlive)) { + if (!clientCache || !clientCache._channel || !clientCache._channel.isAlive) { return resolve(false); } - if ( - config.isUseLiskIPCClient || - Date.now() - lastClientAliveTime < CLIENT_ALIVE_ASSUMPTION_TIME || - // The below condition ensures that no other pings are sent when there's already a ping sent - // after the CLIENT_ALIVE_ASSUMPTION_TIME is exceeded - Date.now() - heartbeatCheckBeginTime < HEARTBEAT_ACK_MAX_WAIT_TIME * 2 - ) { - return resolve(clientCache._channel && clientCache._channel.isAlive); + if (config.isUseLiskIPCClient) { + return resolve(clientCache && clientCache._channel && clientCache._channel.isAlive); } - heartbeatCheckBeginTime = Date.now(); - const boundPongListener = () => pongListener(resolve); - + const heartbeatCheckBeginTime = Date.now(); const wsInstance = clientCache._channel._ws; + + // eslint-disable-next-line no-use-before-define + const boundPongListener = () => pongListener(resolve); wsInstance.on('pong', boundPongListener); - isClientAlive = false; wsInstance.ping(() => {}); // eslint-disable-next-line consistent-return const timeout = setTimeout(() => { - clearTimeout(timeout); wsInstance.removeListener('pong', boundPongListener); - if (!isClientAlive) return resolve(false); + // TODO: Downlevel log to debug + logger.info(`Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`); + return resolve(false); }, HEARTBEAT_ACK_MAX_WAIT_TIME); + + const pongListener = res => { + clearTimeout(timeout); + wsInstance.removeListener('pong', boundPongListener); + // TODO: Downlevel log to debug + logger.info(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); + return res(true); + }; }).catch(() => false); -// eslint-disable-next-line consistent-return -const instantiateClient = async (isForceReInstantiate = false) => { +const instantiateAndCacheClient = async () => { try { - if (!isInstantiating || isForceReInstantiate) { - const isNodeClientAlive = await checkIsClientAlive(); - - if (!config.isUseLiskIPCClient) { - if (isNodeClientAlive) { - wsConnectionsEstablished = 0; - } else { - let numRetries = NUM_REQUEST_RETRIES; - while (wsConnectionsEstablished >= WS_CONNECTION_LIMIT && numRetries--) { - await delay(MAX_INSTANTIATION_WAIT_TIME); - if (await checkIsClientAlive()) { - wsConnectionsEstablished = 0; - return clientCache; - } - } - } - } - - isInstantiating = true; - if (!isNodeClientAlive || isForceReInstantiate) { - if (!config.isUseLiskIPCClient) wsConnectionsEstablished++; - - instantiationBeginTime = Date.now(); - - if (clientCache) { - clientCache.disconnect().catch(err => { - // Ensure failed disconnection doesn't impact the re-instantiation - logger.warn(`Client disconnection failed due to: ${err.message}`); - }); - } - - clientCache = config.isUseLiskIPCClient - ? await createIPCClient(config.liskAppDataPath) - : await createWSClient(`${liskAddress}/rpc-ws`); - - lastClientAliveTime = Date.now(); - - if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.'); - - // Inform listeners about the newly instantiated ApiClient - Signals.get('newApiClient').dispatch(); - } - - isInstantiating = false; - return clientCache; - } + const instantiationBeginTime = Date.now(); + const clientCache = config.isUseLiskIPCClient + ? await createIPCClient(config.liskAppDataPath) + : await createWSClient(`${liskAddress}/rpc-ws`); - if (Date.now() - instantiationBeginTime > MAX_INSTANTIATION_WAIT_TIME) { - // Waited too long, reset the flag to re-attempt client instantiation - logger.debug( - `MAX_INSTANTIATION_WAIT_TIME of ${MAX_INSTANTIATION_WAIT_TIME}ms has expired. Resetting isInstantiating to false.`, - ); - isInstantiating = false; - } + cachedApiClients.push(clientCache); + logger.info(`Instantiated another API client. Time taken: ${Date.now() - instantiationBeginTime}ms. Cached API client count:${cachedApiClients.length}`); } catch (err) { // Nullify the apiClient cache and unset isInstantiating, so that it can be re-instantiated properly - clientCache = null; - isInstantiating = false; - const errMessage = config.isUseLiskIPCClient ? `Error instantiating IPC client at ${config.liskAppDataPath}.` : `Error instantiating WS client to ${liskAddress}.`; @@ -158,14 +92,14 @@ const instantiateClient = async (isForceReInstantiate = false) => { if (err.message.includes('ECONNREFUSED')) { throw new Error('ECONNREFUSED: Unable to reach a network node.'); } - - return null; } }; const getApiClient = async () => { - const apiClient = await waitForIt(instantiateClient, RETRY_INTERVAL); - return (await checkIsClientAlive()) ? apiClient : getApiClient(); + if (cachedApiClients.length === 0) { + throw new Error(`No api client is alive!`); + } + return cachedApiClients[0]; }; // eslint-disable-next-line consistent-return @@ -186,45 +120,64 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE } while (retries--); }; +const refreshClientsCache = async () => { + // Indicates if an active client was destroyed or no active clients available + let activeClientNotAvailable = cachedApiClients.length === 0; + + // Check liveliness and remove non-responsive clients + let index = 0; + while (index < cachedApiClients.length) { + const cachedClient = cachedApiClients[index]; + try { + if (!await checkIsClientAlive(cachedClient)) { + cachedApiClients.splice(index, 1); + if (index === 0) { + activeClientNotAvailable = true; + } + cachedClient.disconnect().catch(err => { + logger.warn(`Client disconnection failed due to: ${err.message}`); + }); + } else { + index++; + } + } catch (err) { + logger.info(`Failed to check client aliveness.\nError:${err.message}`); + } + } + + // Initiate new clients if necessary + let missingClientCount = CACHED_CLIENT_COUNT - cachedApiClients.length; + + while (missingClientCount-- > 0) { + try { + await instantiateAndCacheClient(); + } catch (err) { + logger.info(`Failed to instantiate new api client.\nError:${err.message}`); + } + } + + // Reset event listeners if active api client was destroyed + if (activeClientNotAvailable && cachedApiClients.length > 0) { + Signals.get('newApiClient').dispatch(); + } +}; + // Checks to ensure that the API Client is always alive if (config.isUseLiskIPCClient) { - const resetApiClientListener = async () => instantiateClient(true).catch(() => {}); + const resetApiClientListener = async () => instantiateAndCacheClient().catch(() => {}); Signals.get('resetApiClient').add(resetApiClientListener); -} else { - let intervalTimeout; - const triggerRegularClientLivelinessChecks = intervalMs => { - intervalTimeout = setInterval(async () => { - const isAlive = await checkIsClientAlive(); - if (!isAlive) instantiateClient(true).catch(() => {}); - }, intervalMs); - }; - - const genesisBlockDownloadedListener = () => { - triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS); - logger.info( - `API client heartbeat checks scheduled every ${CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS}ms. The frequency will be set to ${CLIENT_ALIVE_ASSUMPTION_TIME}ms after successful indexing of the genesis block.`, - ); - }; - - const genesisBlockIndexedListener = indexStatus => { - if ( - !isGenesisBlockIndexed && - indexStatus.data && - indexStatus.data.genesisHeight <= indexStatus.data.lastIndexedBlockHeight - ) { - clearInterval(intervalTimeout); - triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME); - isGenesisBlockIndexed = true; - logger.info( - `API client heartbeat checks re-scheduled to run every ${CLIENT_ALIVE_ASSUMPTION_TIME}ms.`, - ); - } - }; - - Signals.get('genesisBlockDownloaded').add(genesisBlockDownloadedListener); - Signals.get('updateIndexStatus').add(genesisBlockIndexedListener); } +// Check periodically for client aliveness and refill cached clients pool +// TODO: Remove all console.time +setInterval(async () => { + console.time('refreshClientsCache'); + await refreshClientsCache(); + console.timeEnd('refreshClientsCache'); +}, CLIENT_ALIVE_ASSUMPTION_TIME); +// Initiate client cache for first time +refreshClientsCache(); + module.exports = { timeoutMessage, From e8e5d7db2cef10e79c8b03ab791fb2416ebbb1eb Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 14:13:28 +0100 Subject: [PATCH 02/10] :art: Lint --- .../blockchain-connector/shared/sdk/client.js | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index e2ada88231..09e0132cee 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -13,10 +13,7 @@ * Removal or modification of this copyright notice is prohibited. * */ -const { - Logger, - Signals, -} = require('lisk-service-framework'); +const { Logger, Signals } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); const config = require('../../config'); @@ -36,7 +33,7 @@ const CACHED_CLIENT_COUNT = 5; // Pool of cached api clients const cachedApiClients = []; -const checkIsClientAlive = async (clientCache) => +const checkIsClientAlive = async clientCache => // eslint-disable-next-line consistent-return new Promise(resolve => { if (!clientCache || !clientCache._channel || !clientCache._channel.isAlive) { @@ -59,7 +56,9 @@ const checkIsClientAlive = async (clientCache) => const timeout = setTimeout(() => { wsInstance.removeListener('pong', boundPongListener); // TODO: Downlevel log to debug - logger.info(`Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`); + logger.info( + `Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`, + ); return resolve(false); }, HEARTBEAT_ACK_MAX_WAIT_TIME); @@ -80,7 +79,11 @@ const instantiateAndCacheClient = async () => { : await createWSClient(`${liskAddress}/rpc-ws`); cachedApiClients.push(clientCache); - logger.info(`Instantiated another API client. Time taken: ${Date.now() - instantiationBeginTime}ms. Cached API client count:${cachedApiClients.length}`); + logger.info( + `Instantiated another API client. Time taken: ${ + Date.now() - instantiationBeginTime + }ms. Cached API client count:${cachedApiClients.length}`, + ); } catch (err) { // Nullify the apiClient cache and unset isInstantiating, so that it can be re-instantiated properly const errMessage = config.isUseLiskIPCClient @@ -129,7 +132,7 @@ const refreshClientsCache = async () => { while (index < cachedApiClients.length) { const cachedClient = cachedApiClients[index]; try { - if (!await checkIsClientAlive(cachedClient)) { + if (!(await checkIsClientAlive(cachedClient))) { cachedApiClients.splice(index, 1); if (index === 0) { activeClientNotAvailable = true; @@ -171,8 +174,10 @@ if (config.isUseLiskIPCClient) { // Check periodically for client aliveness and refill cached clients pool // TODO: Remove all console.time setInterval(async () => { + // eslint-disable-next-line no-console console.time('refreshClientsCache'); await refreshClientsCache(); + // eslint-disable-next-line no-console console.timeEnd('refreshClientsCache'); }, CLIENT_ALIVE_ASSUMPTION_TIME); // Initiate client cache for first time From 50fdd5b05538b752d96ef7fb83150882053e0672 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 15:02:42 +0100 Subject: [PATCH 03/10] Use delay between client cache refresh --- .../blockchain-connector/shared/sdk/client.js | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 09e0132cee..8cb230b765 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -40,8 +40,9 @@ const checkIsClientAlive = async clientCache => return resolve(false); } + // Skip heartbeat check for IPC client if (config.isUseLiskIPCClient) { - return resolve(clientCache && clientCache._channel && clientCache._channel.isAlive); + return resolve(true); } const heartbeatCheckBeginTime = Date.now(); @@ -173,13 +174,17 @@ if (config.isUseLiskIPCClient) { // Check periodically for client aliveness and refill cached clients pool // TODO: Remove all console.time -setInterval(async () => { - // eslint-disable-next-line no-console - console.time('refreshClientsCache'); - await refreshClientsCache(); - // eslint-disable-next-line no-console - console.timeEnd('refreshClientsCache'); -}, CLIENT_ALIVE_ASSUMPTION_TIME); +(async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-console + console.time('refreshClientsCache'); + await refreshClientsCache(); + // eslint-disable-next-line no-console + console.timeEnd('refreshClientsCache'); + await delay(CLIENT_ALIVE_ASSUMPTION_TIME); + } +})(); // Initiate client cache for first time refreshClientsCache(); From 4b1ae84e51300283ffc74c72335c773528528ead Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 15:11:21 +0100 Subject: [PATCH 04/10] Remove redundant first time instantiation --- services/blockchain-connector/shared/sdk/client.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 8cb230b765..852d4a9811 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -73,6 +73,12 @@ const checkIsClientAlive = async clientCache => }).catch(() => false); const instantiateAndCacheClient = async () => { + if (cachedApiClients.length > CACHED_CLIENT_COUNT) { + // TODO: Down level log to debug + logger.info(`Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CACHED_CLIENT_COUNT(${CACHED_CLIENT_COUNT}).`); + return; + } + try { const instantiationBeginTime = Date.now(); const clientCache = config.isUseLiskIPCClient @@ -185,8 +191,6 @@ if (config.isUseLiskIPCClient) { await delay(CLIENT_ALIVE_ASSUMPTION_TIME); } })(); -// Initiate client cache for first time -refreshClientsCache(); module.exports = { timeoutMessage, From f28c6be733bd30eb044a2036d7ff98521abcc407 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 15:12:13 +0100 Subject: [PATCH 05/10] :art: Lint --- services/blockchain-connector/shared/sdk/client.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 852d4a9811..09d75bb352 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -75,7 +75,9 @@ const checkIsClientAlive = async clientCache => const instantiateAndCacheClient = async () => { if (cachedApiClients.length > CACHED_CLIENT_COUNT) { // TODO: Down level log to debug - logger.info(`Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CACHED_CLIENT_COUNT(${CACHED_CLIENT_COUNT}).`); + logger.info( + `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CACHED_CLIENT_COUNT(${CACHED_CLIENT_COUNT}).`, + ); return; } From f72abdd7f4753aa1e60467283c54286dabc7a5d3 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 15:37:47 +0100 Subject: [PATCH 06/10] Update default values for INVOKE_ALLOWED_METHODS --- docker-compose.yml | 1 + docker/example.env | 2 +- ecosystem.config.js | 2 +- ecosystem.jenkins.config.js | 2 +- services/blockchain-indexer/config.js | 1 + 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9df2e15045..8ccae35cde 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -227,6 +227,7 @@ services: - JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES=${JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES} - JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES=${JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES} - ESTIMATES_BUFFER_BYTES_LENGTH=${ESTIMATES_BUFFER_BYTES_LENGTH} + - INVOKE_ALLOWED_METHODS=${INVOKE_ALLOWED_METHODS} restart: always blockchain-coordinator: diff --git a/docker/example.env b/docker/example.env index 9bb3acc863..b212f5b987 100644 --- a/docker/example.env +++ b/docker/example.env @@ -67,7 +67,7 @@ LISK_APP_WS=ws://host.docker.internal:7887 # ENABLE_SNAPSHOT_ALLOW_INSECURE_HTTP=false # ACCOUNT_BALANCE_UPDATE_BATCH_SIZE=1000 # INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT=100000 -# INVOKE_ALLOWED_METHODS='dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool' +# INVOKE_ALLOWED_METHODS='dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards' # Moleculer jobs configuration # JOB_INTERVAL_DELETE_SERIALIZED_EVENTS=0 diff --git a/ecosystem.config.js b/ecosystem.config.js index 7262ff1706..5bffa62759 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -196,7 +196,7 @@ module.exports = { // JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA: '0 2 * * *', // JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES: 0, // JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES: '*/15 * * * *', - // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool', + // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards', }, }, { diff --git a/ecosystem.jenkins.config.js b/ecosystem.jenkins.config.js index cff9011106..5d92f9565d 100644 --- a/ecosystem.jenkins.config.js +++ b/ecosystem.jenkins.config.js @@ -110,7 +110,7 @@ module.exports = { ENABLE_DATA_RETRIEVAL_MODE: true, ENABLE_INDEXING_MODE: true, ENABLE_PERSIST_EVENTS: false, - // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool', + // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards', }, }, { diff --git a/services/blockchain-indexer/config.js b/services/blockchain-indexer/config.js index a108c13017..fc57b117fd 100644 --- a/services/blockchain-indexer/config.js +++ b/services/blockchain-indexer/config.js @@ -222,6 +222,7 @@ config.invokeAllowedMethods = process.env.INVOKE_ALLOWED_METHODS 'token_getInitializationFees', 'interoperability_getMinimumMessageFee', 'txpool_getTransactionsFromPool', + 'pos_getExpectedSharedRewards', ]; module.exports = config; From 8a3370fcc816660ff0aa4e71de41c2cc8403ceaf Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Mon, 18 Dec 2023 23:52:28 +0100 Subject: [PATCH 07/10] :WIP: Add total client instantiation count monitoring --- docker-compose.yml | 1 + .../blockchain-connector/shared/sdk/client.js | 28 +++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8ccae35cde..a9922fb887 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -162,6 +162,7 @@ services: - JOB_SCHEDULE_CACHE_CLEANUP=${JOB_SCHEDULE_CACHE_CLEANUP} - JOB_INTERVAL_REFRESH_PEERS=${JOB_INTERVAL_REFRESH_PEERS} - JOB_SCHEDULE_REFRESH_PEERS=${JOB_SCHEDULE_REFRESH_PEERS} + - SERVICE_CONNECTOR_CACHE_REDIS=redis://lisk:password@redis_persistent:6379/0 restart: always extra_hosts: - 'host.docker.internal:host-gateway' diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 09d75bb352..50033de87e 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -13,7 +13,7 @@ * Removal or modification of this copyright notice is prohibited. * */ -const { Logger, Signals } = require('lisk-service-framework'); +const { Logger, Signals, CacheRedis } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); const config = require('../../config'); @@ -33,6 +33,13 @@ const CACHED_CLIENT_COUNT = 5; // Pool of cached api clients const cachedApiClients = []; +// TODO: Remove this variable and cache usage and docker compose variable. Used to get an idea about the total number of time api client connection is created +const redisCache = CacheRedis( + 'temp', + process.env.SERVICE_CONNECTOR_CACHE_REDIS || 'redis://lisk:password@127.0.0.1:6381/2', +); +const TOTAL_CLIENT_INITIALIZATION_COUNT = 'totalClientInitializationCount'; + const checkIsClientAlive = async clientCache => // eslint-disable-next-line consistent-return new Promise(resolve => { @@ -88,6 +95,12 @@ const instantiateAndCacheClient = async () => { : await createWSClient(`${liskAddress}/rpc-ws`); cachedApiClients.push(clientCache); + + await redisCache.set( + TOTAL_CLIENT_INITIALIZATION_COUNT, + ((await redisCache.get(TOTAL_CLIENT_INITIALIZATION_COUNT)) || 0) + 1, + ); + logger.info( `Instantiated another API client. Time taken: ${ Date.now() - instantiationBeginTime @@ -185,11 +198,16 @@ if (config.isUseLiskIPCClient) { (async () => { // eslint-disable-next-line no-constant-condition while (true) { - // eslint-disable-next-line no-console - console.time('refreshClientsCache'); + const cacheRefreshStartTime = Date.now(); await refreshClientsCache(); - // eslint-disable-next-line no-console - console.timeEnd('refreshClientsCache'); + // TODO: Down level the log to debug + logger.info( + `Refreshed api client cached in ${ + Date.now() - cacheRefreshStartTime + }ms. API client instantiated ${await redisCache.get( + TOTAL_CLIENT_INITIALIZATION_COUNT, + )} time(s) so far.`, + ); await delay(CLIENT_ALIVE_ASSUMPTION_TIME); } })(); From ba319958b0824dcff76a27b9d941ddd41a6a0c15 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Tue, 19 Dec 2023 13:21:48 +0100 Subject: [PATCH 08/10] :art: Remove unused config values and refactor --- docker-compose.yml | 2 -- docker/example.env | 2 -- .../modules/ROOT/pages/configuration/index.adoc | 12 ------------ ecosystem.config.js | 2 -- jenkins/docker-compose.nightly.yml | 2 -- services/blockchain-connector/README.md | 2 -- services/blockchain-connector/config.js | 7 +------ services/blockchain-connector/shared/sdk/client.js | 8 ++++---- 8 files changed, 5 insertions(+), 32 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index a9922fb887..c3fc1e6b80 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -151,8 +151,6 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_INSTANTIATION_MAX_WAIT_TIME=${CLIENT_INSTANTIATION_MAX_WAIT_TIME} - - CLIENT_INSTANTIATION_RETRY_INTERVAL=${CLIENT_INSTANTIATION_RETRY_INTERVAL} - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - HEARTBEAT_ACK_MAX_WAIT_TIME=${HEARTBEAT_ACK_MAX_WAIT_TIME} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} diff --git a/docker/example.env b/docker/example.env index b212f5b987..c59a85b5c8 100644 --- a/docker/example.env +++ b/docker/example.env @@ -28,8 +28,6 @@ ## Lisk Service Blockchain Connector # GENESIS_BLOCK_URL='https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz' -# CLIENT_INSTANTIATION_MAX_WAIT_TIME=5000 -# CLIENT_INSTANTIATION_RETRY_INTERVAL=1 # CLIENT_ALIVE_ASSUMPTION_TIME=5000 # HEARTBEAT_ACK_MAX_WAIT_TIME=1000 # ENDPOINT_INVOKE_MAX_RETRIES=3 diff --git a/docs/antora/modules/ROOT/pages/configuration/index.adoc b/docs/antora/modules/ROOT/pages/configuration/index.adoc index 8a57e00427..9c26491d56 100644 --- a/docs/antora/modules/ROOT/pages/configuration/index.adoc +++ b/docs/antora/modules/ROOT/pages/configuration/index.adoc @@ -401,18 +401,6 @@ To disable it, set it to `false`. By default, it is set to `12` hours. | 12 -| `CLIENT_INSTANTIATION_MAX_WAIT_TIME` -| number -| Maximum wait time (in milliseconds) for the API client instantiation before forcefully instantiating a new client when getApiClient is invoked. -By default, it is set to `5000`. -| 5000 - -| `CLIENT_INSTANTIATION_RETRY_INTERVAL` -| number -| Retry interval (in milliseconds) to invoke instantiate API client when getApiClient is invoked. -By default, it is set to `1`. -| 1 - | `CLIENT_ALIVE_ASSUMPTION_TIME` | number | Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. diff --git a/ecosystem.config.js b/ecosystem.config.js index 5bffa62759..9bf9da558c 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -127,8 +127,6 @@ module.exports = { // SERVICE_LOG_FILE: false, // DOCKER_HOST: 'local', // GENESIS_BLOCK_URL: 'https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz', - // CLIENT_INSTANTIATION_MAX_WAIT_TIME: 5000, - // CLIENT_INSTANTIATION_RETRY_INTERVAL: 1, // CLIENT_ALIVE_ASSUMPTION_TIME: 5 * 1000, // HEARTBEAT_ACK_MAX_WAIT_TIME: 1000, // ENDPOINT_INVOKE_MAX_RETRIES: 3, diff --git a/jenkins/docker-compose.nightly.yml b/jenkins/docker-compose.nightly.yml index 11f8cddb3f..186fb334a7 100644 --- a/jenkins/docker-compose.nightly.yml +++ b/jenkins/docker-compose.nightly.yml @@ -153,8 +153,6 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_INSTANTIATION_MAX_WAIT_TIME=${CLIENT_INSTANTIATION_MAX_WAIT_TIME} - - CLIENT_INSTANTIATION_RETRY_INTERVAL=${CLIENT_INSTANTIATION_RETRY_INTERVAL} - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} diff --git a/services/blockchain-connector/README.md b/services/blockchain-connector/README.md index 81ad85d552..dee0ea333b 100644 --- a/services/blockchain-connector/README.md +++ b/services/blockchain-connector/README.md @@ -34,8 +34,6 @@ A list of the most commonly used environment variables is presented below: - `GEOIP_JSON`: URL of GeoIP server. - `ENABLE_BLOCK_CACHING`: Boolean flag to enable the block caching. Enabled by default. To disable, set it to `false`. - `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to 12. -- `CLIENT_INSTANTIATION_MAX_WAIT_TIME`: Maximum wait time (in milliseconds) for the API client instantiation before forcefully instantiating a new client when getApiClient is invoked. By default, it is set to 5000. -- `CLIENT_INSTANTIATION_RETRY_INTERVAL`: Retry interval (in milliseconds) to invoke instantiate API client when getApiClient is invoked. By default, it is set to 5. - `CLIENT_ALIVE_ASSUMPTION_TIME`: Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. By default, it is set to 5000. - `HEARTBEAT_ACK_MAX_WAIT_TIME`: Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. By default, it is set to 1000. - `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to 3. diff --git a/services/blockchain-connector/config.js b/services/blockchain-connector/config.js index baed64d092..f5b4fe9b77 100644 --- a/services/blockchain-connector/config.js +++ b/services/blockchain-connector/config.js @@ -115,12 +115,7 @@ config.job = { config.apiClient = { heartbeatAckMaxWaitTime: Number(process.env.HEARTBEAT_ACK_MAX_WAIT_TIME) || 1000, // in millisecs aliveAssumptionTime: Number(process.env.CLIENT_ALIVE_ASSUMPTION_TIME) || 5 * 1000, // in millisecs - aliveAssumptionTimeBeforeGenesis: 30 * 1000, - wsConnectionLimit: 10, - instantiation: { - maxWaitTime: Number(process.env.CLIENT_INSTANTIATION_MAX_WAIT_TIME) || 5 * 1000, // in millisecs - retryInterval: Number(process.env.CLIENT_INSTANTIATION_RETRY_INTERVAL) || 1, // in millisecs - }, + connectionLimit: 5, request: { maxRetries: Number(process.env.ENDPOINT_INVOKE_MAX_RETRIES) || 3, retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 10, // in millisecs diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 50033de87e..750f9a1a5e 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -28,7 +28,7 @@ const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime; const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime; -const CACHED_CLIENT_COUNT = 5; +const CONNECTION_LIMIT = config.apiClient.connectionLimit; // Pool of cached api clients const cachedApiClients = []; @@ -80,10 +80,10 @@ const checkIsClientAlive = async clientCache => }).catch(() => false); const instantiateAndCacheClient = async () => { - if (cachedApiClients.length > CACHED_CLIENT_COUNT) { + if (cachedApiClients.length > CONNECTION_LIMIT) { // TODO: Down level log to debug logger.info( - `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CACHED_CLIENT_COUNT(${CACHED_CLIENT_COUNT}).`, + `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CONNECTION_LIMIT(${CONNECTION_LIMIT}).`, ); return; } @@ -171,7 +171,7 @@ const refreshClientsCache = async () => { } // Initiate new clients if necessary - let missingClientCount = CACHED_CLIENT_COUNT - cachedApiClients.length; + let missingClientCount = CONNECTION_LIMIT - cachedApiClients.length; while (missingClientCount-- > 0) { try { From 4582b39f500009ed2d939c1eadfb043c37f2f914 Mon Sep 17 00:00:00 2001 From: Priojeet Das Priyom Date: Tue, 19 Dec 2023 14:26:54 +0100 Subject: [PATCH 09/10] :art: Cleanup --- docker-compose.yml | 1 - .../blockchain-connector/shared/sdk/client.js | 66 +++++++++---------- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c3fc1e6b80..ed5c42c127 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -160,7 +160,6 @@ services: - JOB_SCHEDULE_CACHE_CLEANUP=${JOB_SCHEDULE_CACHE_CLEANUP} - JOB_INTERVAL_REFRESH_PEERS=${JOB_INTERVAL_REFRESH_PEERS} - JOB_SCHEDULE_REFRESH_PEERS=${JOB_SCHEDULE_REFRESH_PEERS} - - SERVICE_CONNECTOR_CACHE_REDIS=redis://lisk:password@redis_persistent:6379/0 restart: always extra_hosts: - 'host.docker.internal:host-gateway' diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 750f9a1a5e..e483d91972 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -13,7 +13,7 @@ * Removal or modification of this copyright notice is prohibited. * */ -const { Logger, Signals, CacheRedis } = require('lisk-service-framework'); +const { Logger, Signals } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); const config = require('../../config'); @@ -33,13 +33,6 @@ const CONNECTION_LIMIT = config.apiClient.connectionLimit; // Pool of cached api clients const cachedApiClients = []; -// TODO: Remove this variable and cache usage and docker compose variable. Used to get an idea about the total number of time api client connection is created -const redisCache = CacheRedis( - 'temp', - process.env.SERVICE_CONNECTOR_CACHE_REDIS || 'redis://lisk:password@127.0.0.1:6381/2', -); -const TOTAL_CLIENT_INITIALIZATION_COUNT = 'totalClientInitializationCount'; - const checkIsClientAlive = async clientCache => // eslint-disable-next-line consistent-return new Promise(resolve => { @@ -63,8 +56,7 @@ const checkIsClientAlive = async clientCache => // eslint-disable-next-line consistent-return const timeout = setTimeout(() => { wsInstance.removeListener('pong', boundPongListener); - // TODO: Downlevel log to debug - logger.info( + logger.debug( `Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`, ); return resolve(false); @@ -73,16 +65,14 @@ const checkIsClientAlive = async clientCache => const pongListener = res => { clearTimeout(timeout); wsInstance.removeListener('pong', boundPongListener); - // TODO: Downlevel log to debug - logger.info(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); + logger.debug(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); return res(true); }; }).catch(() => false); const instantiateAndCacheClient = async () => { if (cachedApiClients.length > CONNECTION_LIMIT) { - // TODO: Down level log to debug - logger.info( + logger.debug( `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CONNECTION_LIMIT(${CONNECTION_LIMIT}).`, ); return; @@ -96,11 +86,6 @@ const instantiateAndCacheClient = async () => { cachedApiClients.push(clientCache); - await redisCache.set( - TOTAL_CLIENT_INITIALIZATION_COUNT, - ((await redisCache.get(TOTAL_CLIENT_INITIALIZATION_COUNT)) || 0) + 1, - ); - logger.info( `Instantiated another API client. Time taken: ${ Date.now() - instantiationBeginTime @@ -145,6 +130,14 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE } while (retries--); }; +const disconnectClient = async cachedClient => { + try { + await cachedClient.disconnect(); + } catch (err) { + logger.warn(`Client disconnection failed due to: ${err.message}`); + } +}; + const refreshClientsCache = async () => { // Indicates if an active client was destroyed or no active clients available let activeClientNotAvailable = cachedApiClients.length === 0; @@ -159,9 +152,7 @@ const refreshClientsCache = async () => { if (index === 0) { activeClientNotAvailable = true; } - cachedClient.disconnect().catch(err => { - logger.warn(`Client disconnection failed due to: ${err.message}`); - }); + await disconnectClient(cachedClient); } else { index++; } @@ -185,28 +176,33 @@ const refreshClientsCache = async () => { if (activeClientNotAvailable && cachedApiClients.length > 0) { Signals.get('newApiClient').dispatch(); } + + return cachedApiClients.length; }; -// Checks to ensure that the API Client is always alive -if (config.isUseLiskIPCClient) { - const resetApiClientListener = async () => instantiateAndCacheClient().catch(() => {}); - Signals.get('resetApiClient').add(resetApiClientListener); -} +// Listen to client unresponsive events and try to reinitialize client +const resetApiClientListener = async () => { + logger.info( + `Received API client reset signal. Will drop ${cachedApiClients.length} cached API client(s).`, + ); + // Drop pool of cached clients. Will be instantiated by refresh clients job + while (cachedApiClients.length > 0) { + const cachedClient = cachedApiClients.pop(); + await disconnectClient(cachedClient); + } +}; +Signals.get('resetApiClient').add(resetApiClientListener); // Check periodically for client aliveness and refill cached clients pool -// TODO: Remove all console.time (async () => { // eslint-disable-next-line no-constant-condition while (true) { const cacheRefreshStartTime = Date.now(); await refreshClientsCache(); - // TODO: Down level the log to debug - logger.info( - `Refreshed api client cached in ${ - Date.now() - cacheRefreshStartTime - }ms. API client instantiated ${await redisCache.get( - TOTAL_CLIENT_INITIALIZATION_COUNT, - )} time(s) so far.`, + logger.debug( + `Refreshed api client cached in ${Date.now() - cacheRefreshStartTime}ms. There are ${ + cachedApiClients.length + } API client(s) in the pool.`, ); await delay(CLIENT_ALIVE_ASSUMPTION_TIME); } From 4c708ce505f04080f48091faa5293a3e0653ae20 Mon Sep 17 00:00:00 2001 From: Priojeet Date: Wed, 20 Dec 2023 14:16:26 +0100 Subject: [PATCH 10/10] Update services/blockchain-connector/shared/sdk/client.js Co-authored-by: Vardan Nadkarni --- services/blockchain-connector/shared/sdk/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index e483d91972..784e02a973 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -157,7 +157,7 @@ const refreshClientsCache = async () => { index++; } } catch (err) { - logger.info(`Failed to check client aliveness.\nError:${err.message}`); + logger.info(`Failed to refresh an active API client from cache.\nError:${err.message}`); } }