diff --git a/docker-compose.yml b/docker-compose.yml index 9df2e15045..ed5c42c127 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -151,8 +151,6 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_INSTANTIATION_MAX_WAIT_TIME=${CLIENT_INSTANTIATION_MAX_WAIT_TIME} - - CLIENT_INSTANTIATION_RETRY_INTERVAL=${CLIENT_INSTANTIATION_RETRY_INTERVAL} - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - HEARTBEAT_ACK_MAX_WAIT_TIME=${HEARTBEAT_ACK_MAX_WAIT_TIME} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} @@ -227,6 +225,7 @@ services: - JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES=${JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES} - JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES=${JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES} - ESTIMATES_BUFFER_BYTES_LENGTH=${ESTIMATES_BUFFER_BYTES_LENGTH} + - INVOKE_ALLOWED_METHODS=${INVOKE_ALLOWED_METHODS} restart: always blockchain-coordinator: diff --git a/docker/example.env b/docker/example.env index 9bb3acc863..c59a85b5c8 100644 --- a/docker/example.env +++ b/docker/example.env @@ -28,8 +28,6 @@ ## Lisk Service Blockchain Connector # GENESIS_BLOCK_URL='https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz' -# CLIENT_INSTANTIATION_MAX_WAIT_TIME=5000 -# CLIENT_INSTANTIATION_RETRY_INTERVAL=1 # CLIENT_ALIVE_ASSUMPTION_TIME=5000 # HEARTBEAT_ACK_MAX_WAIT_TIME=1000 # ENDPOINT_INVOKE_MAX_RETRIES=3 @@ -67,7 +65,7 @@ LISK_APP_WS=ws://host.docker.internal:7887 # ENABLE_SNAPSHOT_ALLOW_INSECURE_HTTP=false # ACCOUNT_BALANCE_UPDATE_BATCH_SIZE=1000 # INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT=100000 -# INVOKE_ALLOWED_METHODS='dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool' +# INVOKE_ALLOWED_METHODS='dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards' # Moleculer jobs configuration # JOB_INTERVAL_DELETE_SERIALIZED_EVENTS=0 diff --git a/docs/antora/modules/ROOT/pages/configuration/index.adoc b/docs/antora/modules/ROOT/pages/configuration/index.adoc index 8a57e00427..9c26491d56 100644 --- a/docs/antora/modules/ROOT/pages/configuration/index.adoc +++ b/docs/antora/modules/ROOT/pages/configuration/index.adoc @@ -401,18 +401,6 @@ To disable it, set it to `false`. By default, it is set to `12` hours. | 12 -| `CLIENT_INSTANTIATION_MAX_WAIT_TIME` -| number -| Maximum wait time (in milliseconds) for the API client instantiation before forcefully instantiating a new client when getApiClient is invoked. -By default, it is set to `5000`. -| 5000 - -| `CLIENT_INSTANTIATION_RETRY_INTERVAL` -| number -| Retry interval (in milliseconds) to invoke instantiate API client when getApiClient is invoked. -By default, it is set to `1`. -| 1 - | `CLIENT_ALIVE_ASSUMPTION_TIME` | number | Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. diff --git a/ecosystem.config.js b/ecosystem.config.js index 7262ff1706..9bf9da558c 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -127,8 +127,6 @@ module.exports = { // SERVICE_LOG_FILE: false, // DOCKER_HOST: 'local', // GENESIS_BLOCK_URL: 'https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz', - // CLIENT_INSTANTIATION_MAX_WAIT_TIME: 5000, - // CLIENT_INSTANTIATION_RETRY_INTERVAL: 1, // CLIENT_ALIVE_ASSUMPTION_TIME: 5 * 1000, // HEARTBEAT_ACK_MAX_WAIT_TIME: 1000, // ENDPOINT_INVOKE_MAX_RETRIES: 3, @@ -196,7 +194,7 @@ module.exports = { // JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA: '0 2 * * *', // JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES: 0, // JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES: '*/15 * * * *', - // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool', + // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards', }, }, { diff --git a/ecosystem.jenkins.config.js b/ecosystem.jenkins.config.js index cff9011106..5d92f9565d 100644 --- a/ecosystem.jenkins.config.js +++ b/ecosystem.jenkins.config.js @@ -110,7 +110,7 @@ module.exports = { ENABLE_DATA_RETRIEVAL_MODE: true, ENABLE_INDEXING_MODE: true, ENABLE_PERSIST_EVENTS: false, - // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool', + // INVOKE_ALLOWED_METHODS: 'dynamicReward_getExpectedValidatorRewards,token_hasUserAccount,token_getInitializationFees,interoperability_getMinimumMessageFee,txpool_getTransactionsFromPool,pos_getExpectedSharedRewards', }, }, { diff --git a/jenkins/docker-compose.nightly.yml b/jenkins/docker-compose.nightly.yml index 11f8cddb3f..186fb334a7 100644 --- a/jenkins/docker-compose.nightly.yml +++ b/jenkins/docker-compose.nightly.yml @@ -153,8 +153,6 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_INSTANTIATION_MAX_WAIT_TIME=${CLIENT_INSTANTIATION_MAX_WAIT_TIME} - - CLIENT_INSTANTIATION_RETRY_INTERVAL=${CLIENT_INSTANTIATION_RETRY_INTERVAL} - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} diff --git a/services/blockchain-connector/README.md b/services/blockchain-connector/README.md index 81ad85d552..dee0ea333b 100644 --- a/services/blockchain-connector/README.md +++ b/services/blockchain-connector/README.md @@ -34,8 +34,6 @@ A list of the most commonly used environment variables is presented below: - `GEOIP_JSON`: URL of GeoIP server. - `ENABLE_BLOCK_CACHING`: Boolean flag to enable the block caching. Enabled by default. To disable, set it to `false`. - `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to 12. -- `CLIENT_INSTANTIATION_MAX_WAIT_TIME`: Maximum wait time (in milliseconds) for the API client instantiation before forcefully instantiating a new client when getApiClient is invoked. By default, it is set to 5000. -- `CLIENT_INSTANTIATION_RETRY_INTERVAL`: Retry interval (in milliseconds) to invoke instantiate API client when getApiClient is invoked. By default, it is set to 5. - `CLIENT_ALIVE_ASSUMPTION_TIME`: Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. By default, it is set to 5000. - `HEARTBEAT_ACK_MAX_WAIT_TIME`: Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. By default, it is set to 1000. - `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to 3. diff --git a/services/blockchain-connector/config.js b/services/blockchain-connector/config.js index baed64d092..f5b4fe9b77 100644 --- a/services/blockchain-connector/config.js +++ b/services/blockchain-connector/config.js @@ -115,12 +115,7 @@ config.job = { config.apiClient = { heartbeatAckMaxWaitTime: Number(process.env.HEARTBEAT_ACK_MAX_WAIT_TIME) || 1000, // in millisecs aliveAssumptionTime: Number(process.env.CLIENT_ALIVE_ASSUMPTION_TIME) || 5 * 1000, // in millisecs - aliveAssumptionTimeBeforeGenesis: 30 * 1000, - wsConnectionLimit: 10, - instantiation: { - maxWaitTime: Number(process.env.CLIENT_INSTANTIATION_MAX_WAIT_TIME) || 5 * 1000, // in millisecs - retryInterval: Number(process.env.CLIENT_INSTANTIATION_RETRY_INTERVAL) || 1, // in millisecs - }, + connectionLimit: 5, request: { maxRetries: Number(process.env.ENDPOINT_INVOKE_MAX_RETRIES) || 3, retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 10, // in millisecs diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 3059711130..784e02a973 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -13,11 +13,7 @@ * Removal or modification of this copyright notice is prohibited. * */ -const { - Logger, - Signals, - Utils: { waitForIt }, -} = require('lisk-service-framework'); +const { Logger, Signals } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); const config = require('../../config'); @@ -28,127 +24,75 @@ const logger = Logger(); // Constants const timeoutMessage = 'Response not received in'; const liskAddress = config.endpoints.liskWs; -const RETRY_INTERVAL = config.apiClient.instantiation.retryInterval; -const MAX_INSTANTIATION_WAIT_TIME = config.apiClient.instantiation.maxWaitTime; const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime; -const CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS = - config.apiClient.aliveAssumptionTimeBeforeGenesis; const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime; -const WS_CONNECTION_LIMIT = config.apiClient.wsConnectionLimit; - -// Caching and flags -let clientCache; -let instantiationBeginTime; -let lastClientAliveTime; -let heartbeatCheckBeginTime; -let isInstantiating = false; -let isClientAlive = false; -let isGenesisBlockIndexed = false; -let wsConnectionsEstablished = 0; - -const pongListener = res => { - isClientAlive = true; - lastClientAliveTime = Date.now(); - return res(true); -}; +const CONNECTION_LIMIT = config.apiClient.connectionLimit; + +// Pool of cached api clients +const cachedApiClients = []; -const checkIsClientAlive = async () => +const checkIsClientAlive = async clientCache => // eslint-disable-next-line consistent-return new Promise(resolve => { - if (!clientCache || (clientCache._channel && !clientCache._channel.isAlive)) { + if (!clientCache || !clientCache._channel || !clientCache._channel.isAlive) { return resolve(false); } - if ( - config.isUseLiskIPCClient || - Date.now() - lastClientAliveTime < CLIENT_ALIVE_ASSUMPTION_TIME || - // The below condition ensures that no other pings are sent when there's already a ping sent - // after the CLIENT_ALIVE_ASSUMPTION_TIME is exceeded - Date.now() - heartbeatCheckBeginTime < HEARTBEAT_ACK_MAX_WAIT_TIME * 2 - ) { - return resolve(clientCache._channel && clientCache._channel.isAlive); + // Skip heartbeat check for IPC client + if (config.isUseLiskIPCClient) { + return resolve(true); } - heartbeatCheckBeginTime = Date.now(); - const boundPongListener = () => pongListener(resolve); - + const heartbeatCheckBeginTime = Date.now(); const wsInstance = clientCache._channel._ws; + + // eslint-disable-next-line no-use-before-define + const boundPongListener = () => pongListener(resolve); wsInstance.on('pong', boundPongListener); - isClientAlive = false; wsInstance.ping(() => {}); // eslint-disable-next-line consistent-return const timeout = setTimeout(() => { - clearTimeout(timeout); wsInstance.removeListener('pong', boundPongListener); - if (!isClientAlive) return resolve(false); + logger.debug( + `Did not receive API client pong after ${Date.now() - heartbeatCheckBeginTime}ms.`, + ); + return resolve(false); }, HEARTBEAT_ACK_MAX_WAIT_TIME); - }).catch(() => false); - -// eslint-disable-next-line consistent-return -const instantiateClient = async (isForceReInstantiate = false) => { - try { - if (!isInstantiating || isForceReInstantiate) { - const isNodeClientAlive = await checkIsClientAlive(); - - if (!config.isUseLiskIPCClient) { - if (isNodeClientAlive) { - wsConnectionsEstablished = 0; - } else { - let numRetries = NUM_REQUEST_RETRIES; - while (wsConnectionsEstablished >= WS_CONNECTION_LIMIT && numRetries--) { - await delay(MAX_INSTANTIATION_WAIT_TIME); - if (await checkIsClientAlive()) { - wsConnectionsEstablished = 0; - return clientCache; - } - } - } - } - - isInstantiating = true; - if (!isNodeClientAlive || isForceReInstantiate) { - if (!config.isUseLiskIPCClient) wsConnectionsEstablished++; - instantiationBeginTime = Date.now(); - - if (clientCache) { - clientCache.disconnect().catch(err => { - // Ensure failed disconnection doesn't impact the re-instantiation - logger.warn(`Client disconnection failed due to: ${err.message}`); - }); - } - - clientCache = config.isUseLiskIPCClient - ? await createIPCClient(config.liskAppDataPath) - : await createWSClient(`${liskAddress}/rpc-ws`); - - lastClientAliveTime = Date.now(); + const pongListener = res => { + clearTimeout(timeout); + wsInstance.removeListener('pong', boundPongListener); + logger.debug(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); + return res(true); + }; + }).catch(() => false); - if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.'); +const instantiateAndCacheClient = async () => { + if (cachedApiClients.length > CONNECTION_LIMIT) { + logger.debug( + `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CONNECTION_LIMIT(${CONNECTION_LIMIT}).`, + ); + return; + } - // Inform listeners about the newly instantiated ApiClient - Signals.get('newApiClient').dispatch(); - } + try { + const instantiationBeginTime = Date.now(); + const clientCache = config.isUseLiskIPCClient + ? await createIPCClient(config.liskAppDataPath) + : await createWSClient(`${liskAddress}/rpc-ws`); - isInstantiating = false; - return clientCache; - } + cachedApiClients.push(clientCache); - if (Date.now() - instantiationBeginTime > MAX_INSTANTIATION_WAIT_TIME) { - // Waited too long, reset the flag to re-attempt client instantiation - logger.debug( - `MAX_INSTANTIATION_WAIT_TIME of ${MAX_INSTANTIATION_WAIT_TIME}ms has expired. Resetting isInstantiating to false.`, - ); - isInstantiating = false; - } + logger.info( + `Instantiated another API client. Time taken: ${ + Date.now() - instantiationBeginTime + }ms. Cached API client count:${cachedApiClients.length}`, + ); } catch (err) { // Nullify the apiClient cache and unset isInstantiating, so that it can be re-instantiated properly - clientCache = null; - isInstantiating = false; - const errMessage = config.isUseLiskIPCClient ? `Error instantiating IPC client at ${config.liskAppDataPath}.` : `Error instantiating WS client to ${liskAddress}.`; @@ -158,14 +102,14 @@ const instantiateClient = async (isForceReInstantiate = false) => { if (err.message.includes('ECONNREFUSED')) { throw new Error('ECONNREFUSED: Unable to reach a network node.'); } - - return null; } }; const getApiClient = async () => { - const apiClient = await waitForIt(instantiateClient, RETRY_INTERVAL); - return (await checkIsClientAlive()) ? apiClient : getApiClient(); + if (cachedApiClients.length === 0) { + throw new Error(`No api client is alive!`); + } + return cachedApiClients[0]; }; // eslint-disable-next-line consistent-return @@ -186,44 +130,83 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE } while (retries--); }; -// Checks to ensure that the API Client is always alive -if (config.isUseLiskIPCClient) { - const resetApiClientListener = async () => instantiateClient(true).catch(() => {}); - Signals.get('resetApiClient').add(resetApiClientListener); -} else { - let intervalTimeout; - const triggerRegularClientLivelinessChecks = intervalMs => { - intervalTimeout = setInterval(async () => { - const isAlive = await checkIsClientAlive(); - if (!isAlive) instantiateClient(true).catch(() => {}); - }, intervalMs); - }; - - const genesisBlockDownloadedListener = () => { - triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS); - logger.info( - `API client heartbeat checks scheduled every ${CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS}ms. The frequency will be set to ${CLIENT_ALIVE_ASSUMPTION_TIME}ms after successful indexing of the genesis block.`, - ); - }; - - const genesisBlockIndexedListener = indexStatus => { - if ( - !isGenesisBlockIndexed && - indexStatus.data && - indexStatus.data.genesisHeight <= indexStatus.data.lastIndexedBlockHeight - ) { - clearInterval(intervalTimeout); - triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME); - isGenesisBlockIndexed = true; - logger.info( - `API client heartbeat checks re-scheduled to run every ${CLIENT_ALIVE_ASSUMPTION_TIME}ms.`, - ); +const disconnectClient = async cachedClient => { + try { + await cachedClient.disconnect(); + } catch (err) { + logger.warn(`Client disconnection failed due to: ${err.message}`); + } +}; + +const refreshClientsCache = async () => { + // Indicates if an active client was destroyed or no active clients available + let activeClientNotAvailable = cachedApiClients.length === 0; + + // Check liveliness and remove non-responsive clients + let index = 0; + while (index < cachedApiClients.length) { + const cachedClient = cachedApiClients[index]; + try { + if (!(await checkIsClientAlive(cachedClient))) { + cachedApiClients.splice(index, 1); + if (index === 0) { + activeClientNotAvailable = true; + } + await disconnectClient(cachedClient); + } else { + index++; + } + } catch (err) { + logger.info(`Failed to refresh an active API client from cache.\nError:${err.message}`); + } + } + + // Initiate new clients if necessary + let missingClientCount = CONNECTION_LIMIT - cachedApiClients.length; + + while (missingClientCount-- > 0) { + try { + await instantiateAndCacheClient(); + } catch (err) { + logger.info(`Failed to instantiate new api client.\nError:${err.message}`); } - }; + } - Signals.get('genesisBlockDownloaded').add(genesisBlockDownloadedListener); - Signals.get('updateIndexStatus').add(genesisBlockIndexedListener); -} + // Reset event listeners if active api client was destroyed + if (activeClientNotAvailable && cachedApiClients.length > 0) { + Signals.get('newApiClient').dispatch(); + } + + return cachedApiClients.length; +}; + +// Listen to client unresponsive events and try to reinitialize client +const resetApiClientListener = async () => { + logger.info( + `Received API client reset signal. Will drop ${cachedApiClients.length} cached API client(s).`, + ); + // Drop pool of cached clients. Will be instantiated by refresh clients job + while (cachedApiClients.length > 0) { + const cachedClient = cachedApiClients.pop(); + await disconnectClient(cachedClient); + } +}; +Signals.get('resetApiClient').add(resetApiClientListener); + +// Check periodically for client aliveness and refill cached clients pool +(async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + const cacheRefreshStartTime = Date.now(); + await refreshClientsCache(); + logger.debug( + `Refreshed api client cached in ${Date.now() - cacheRefreshStartTime}ms. There are ${ + cachedApiClients.length + } API client(s) in the pool.`, + ); + await delay(CLIENT_ALIVE_ASSUMPTION_TIME); + } +})(); module.exports = { timeoutMessage, diff --git a/services/blockchain-indexer/config.js b/services/blockchain-indexer/config.js index a108c13017..fc57b117fd 100644 --- a/services/blockchain-indexer/config.js +++ b/services/blockchain-indexer/config.js @@ -222,6 +222,7 @@ config.invokeAllowedMethods = process.env.INVOKE_ALLOWED_METHODS 'token_getInitializationFees', 'interoperability_getMinimumMessageFee', 'txpool_getTransactionsFromPool', + 'pos_getExpectedSharedRewards', ]; module.exports = config;