diff --git a/docker-compose.yml b/docker-compose.yml index 984f72386d..89f55a779a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -153,8 +153,9 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} - - HEARTBEAT_ACK_MAX_WAIT_TIME=${HEARTBEAT_ACK_MAX_WAIT_TIME} + - CLIENT_POOL_SIZE=${CLIENT_POOL_SIZE} + - WS_SERVER_PING_INTERVAL=${WS_SERVER_PING_INTERVAL} + - WS_SERVER_PING_INTERVAL_BUFFER=${WS_SERVER_PING_INTERVAL_BUFFER} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} - CONNECTOR_EXIT_DELAY_IN_HOURS=${CONNECTOR_EXIT_DELAY_IN_HOURS} diff --git a/docker/example.env b/docker/example.env index 68983fc338..12f367ec66 100644 --- a/docker/example.env +++ b/docker/example.env @@ -36,10 +36,11 @@ # ==================================== # # GENESIS_BLOCK_URL='https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz' -# CLIENT_ALIVE_ASSUMPTION_TIME=5000 -# HEARTBEAT_ACK_MAX_WAIT_TIME=1000 +# CLIENT_POOL_SIZE=10 +# WS_SERVER_PING_INTERVAL=3000 +# WS_SERVER_PING_INTERVAL_BUFFER=1000 # ENDPOINT_INVOKE_MAX_RETRIES=3 -# ENDPOINT_INVOKE_RETRY_DELAY=10 +# ENDPOINT_INVOKE_RETRY_DELAY=50 # CONNECTOR_EXIT_DELAY_IN_HOURS=0 # Moleculer jobs configuration diff --git a/docs/antora/modules/ROOT/pages/configuration/index.adoc b/docs/antora/modules/ROOT/pages/configuration/index.adoc index 08f882b511..577d8daae8 100644 --- a/docs/antora/modules/ROOT/pages/configuration/index.adoc +++ b/docs/antora/modules/ROOT/pages/configuration/index.adoc @@ -282,13 +282,13 @@ By default, it is set to `0`. | `JOB_INTERVAL_UPDATE_READINESS_STATUS` | number | Job run interval to update the readiness status. -By default, it is set to 0. +By default, it is set to `0`. 
| 0 | `JOB_SCHEDULE_UPDATE_READINESS_STATUS` | string | Job run cron schedule to update the readiness status. -By default, it is set to run every minute. +By default, it is set to run every minute. | * * * * * |=== @@ -413,15 +413,23 @@ To disable it, set it to `false`. By default, it is set to `12` hours. | 12 -| `CLIENT_ALIVE_ASSUMPTION_TIME` +| `CLIENT_POOL_SIZE` +| number +| Number of active API clients to be maintained in the pool. +Only applicable when using the IPC or WS clients to connect with the Lisk node. +By default, it is set to `10`. +| 10 + +| `WS_SERVER_PING_INTERVAL` | number -| Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. -By default, it is set to `5000`. -| 5000 +| Interval (in milliseconds) at which the WS server checks the liveness of all connected clients. +This should not be modified unless explicitly recommended by the development team. +By default, it is set to `3000`. +| 3000 -| `HEARTBEAT_ACK_MAX_WAIT_TIME` +| `WS_SERVER_PING_INTERVAL_BUFFER` | number -| Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. +| A conservative assumption of the latency (in milliseconds) for WS server pings to arrive at the client. By default, it is set to `1000`. | 1000
+By default, it is set to run every `12` hours. |0 */12 * * * | `JOB_INTERVAL_REFRESH_PEERS` @@ -543,7 +551,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_INDEX_MISSING_BLOCKS` | string | Job run cron schedule to index missing blocks. -By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * |=== @@ -649,7 +657,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_SERIALIZED_EVENTS` | string | Job run cron schedule to delete serialized events. -By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * | `JOB_INTERVAL_REFRESH_VALIDATORS` @@ -661,7 +669,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_VALIDATORS` | string | Job run cron schedule to refresh validators cache. -By default, it is set to run every 5 minutes. +By default, it is set to run every `5` minutes. | */5 * * * * | `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK` @@ -697,7 +705,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_BLOCKCHAIN_APPS_STATS` | string | Job run cron schedule to refresh blockchain application statistics. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE` @@ -709,7 +717,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_ACCOUNT_KNOWLEDGE` | string | Job run cron schedule to refresh account knowledge. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA` @@ -721,7 +729,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA` | string | Job run cron schedule to delete finalized CCU metadata. -By default, it is set to run once a day at 02:00 am. +By default, it is set to run once a day at `02:00 am`. | 0 2 * * * | `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES` @@ -733,7 +741,7 @@ By default, it is set to `0`. 
| `JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES` | string | Job run cron schedule to trigger account updates. -By default, it is set to run every 15 minutes. +By default, it is set to run every `15` minutes. | */15 * * * * | `ESTIMATES_BUFFER_BYTES_LENGTH` @@ -789,7 +797,7 @@ To accelerate the indexing process, the blockchain-indexer microservice also sup |`DURABILITY_VERIFY_FREQUENCY` |number -|Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to 1. +|Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to `1`. |1 |`INDEX_SNAPSHOT_URL` @@ -908,7 +916,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_DELETE_NON_METADATA_FILES` | string | Job run cron schedule to delete non-metadata files. -By default, it is set to run every day at midnight. +By default, it is set to run every day at `midnight`. | 0 0 * * * | `JOB_INTERVAL_UPDATE_METADATA` @@ -1111,7 +1119,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_TRANSACTION_STATS` | string | Job run cron schedule to refresh transaction statistics. -By default, it is set to run every 30 minutes. +By default, it is set to run every `30` minutes. | */30 * * * * | `JOB_INTERVAL_VERIFY_TRANSACTION_STATS` @@ -1123,7 +1131,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_VERIFY_TRANSACTION_STATS` | string |Job run cron schedule to verify if the transaction statistics have been built correctly. -By default, it is set to run every 3rd hour after the first `15` minutes. +By default, it is set to run every `3rd hour` after the first `15` minutes. | 15 */3 * * * |=== @@ -1212,7 +1220,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_BINANCE` | string | Job run cron schedule to refresh prices from Binance. -By default, it is set to run every minute. +By default, it is set to run every minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_BITTREX` @@ -1224,7 +1232,7 @@ By default, it is set to `0`.
| `JOB_SCHEDULE_REFRESH_PRICES_BITTREX` | string | Job run cron schedule to refresh prices from Bittrex. -By default, it is set to run every minute. +By default, it is set to run every minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI` @@ -1236,7 +1244,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_EXCHANGERATESAPI` | string | Job run cron schedule to refresh prices from exchangeratesapi. -By default, it is set to run every minute. +By default, it is set to run every minute. |* * * * * | `JOB_INTERVAL_REFRESH_PRICES_KRAKEN` @@ -1248,7 +1256,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_REFRESH_PRICES_KRAKEN` | string | Job run cron schedule to refresh prices from Kraken. -By default, it is set to run every minute. +By default, it is set to run every minute. |* * * * * | `JOB_INTERVAL_UPDATE_PRICES` @@ -1399,7 +1407,7 @@ By default, it is set to `0`. | `JOB_SCHEDULE_CACHE_PURGE` | string | Job run cron schedule to clean up the cache. -By default, it is set to run daily at 04:45 am. +By default, it is set to run daily at `04:45 am`.
| 45 4 * * * |=== diff --git a/ecosystem.config.js b/ecosystem.config.js index 871a5a30f6..6613bcda43 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -129,10 +129,11 @@ module.exports = { // SERVICE_LOG_FILE: false, // DOCKER_HOST: 'local', // GENESIS_BLOCK_URL: 'https://downloads.lisk.com/lisk/mainnet/genesis_block.json.tar.gz', - // CLIENT_ALIVE_ASSUMPTION_TIME: 5 * 1000, - // HEARTBEAT_ACK_MAX_WAIT_TIME: 1000, + // CLIENT_POOL_SIZE: 10, + // WS_SERVER_PING_INTERVAL: 3000, + // WS_SERVER_PING_INTERVAL_BUFFER: 1000, // ENDPOINT_INVOKE_MAX_RETRIES: 3, - // ENDPOINT_INVOKE_RETRY_DELAY: 10, + // ENDPOINT_INVOKE_RETRY_DELAY: 50, // CONNECTOR_EXIT_DELAY_IN_HOURS: 0, // JOB_INTERVAL_CACHE_CLEANUP: 0, // JOB_SCHEDULE_CACHE_CLEANUP: '0 */12 * * *', diff --git a/jenkins/docker-compose.nightly.yml b/jenkins/docker-compose.nightly.yml index 186fb334a7..77822f171b 100644 --- a/jenkins/docker-compose.nightly.yml +++ b/jenkins/docker-compose.nightly.yml @@ -153,7 +153,9 @@ services: - ENABLE_TESTING_MODE=${ENABLE_TESTING_MODE} - ENABLE_BLOCK_CACHING=${ENABLE_BLOCK_CACHING} - EXPIRY_IN_HOURS=${EXPIRY_IN_HOURS} - - CLIENT_ALIVE_ASSUMPTION_TIME=${CLIENT_ALIVE_ASSUMPTION_TIME} + - CLIENT_POOL_SIZE=${CLIENT_POOL_SIZE} + - WS_SERVER_PING_INTERVAL=${WS_SERVER_PING_INTERVAL} + - WS_SERVER_PING_INTERVAL_BUFFER=${WS_SERVER_PING_INTERVAL_BUFFER} - ENDPOINT_INVOKE_MAX_RETRIES=${ENDPOINT_INVOKE_MAX_RETRIES} - ENDPOINT_INVOKE_RETRY_DELAY=${ENDPOINT_INVOKE_RETRY_DELAY} - CONNECTOR_EXIT_DELAY_IN_HOURS=${CONNECTOR_EXIT_DELAY_IN_HOURS} diff --git a/services/blockchain-app-registry/README.md b/services/blockchain-app-registry/README.md index 06ee290bbe..b92bcb4264 100644 --- a/services/blockchain-app-registry/README.md +++ b/services/blockchain-app-registry/README.md @@ -30,9 +30,9 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_APP_REGISTRY_MYSQL`: Connection string of the MySQL instance that the microservice connects to. 
- `ENABLE_REBUILD_INDEX_AT_INIT`: Boolean flag to truncate the index and rebuild at application init. - `DEFAULT_APPS`: Default blockchain applications. By default, it is set to `lisk_mainchain`. -- `JOB_INTERVAL_DELETE_NON_METADATA_FILES`: Job run interval to delete non-metadata files. By default, it is set to 0. +- `JOB_INTERVAL_DELETE_NON_METADATA_FILES`: Job run interval to delete non-metadata files. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_NON_METADATA_FILES`: Job run cron schedule to delete non-metadata files. By default, it is set to run every day at midnight (`0 0 * * *`). -- `JOB_INTERVAL_UPDATE_METADATA`: Job run interval to update off-chain metadata. By default, it is set to 0. +- `JOB_INTERVAL_UPDATE_METADATA`: Job run interval to update off-chain metadata. By default, it is set to `0`. - `JOB_SCHEDULE_UPDATE_METADATA`: Job run cron schedule to update off-chain metadata. By default, it is set to run every 10 minutes (`*/10 * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/blockchain-connector/README.md b/services/blockchain-connector/README.md index dce73ac917..ce074adf15 100644 --- a/services/blockchain-connector/README.md +++ b/services/blockchain-connector/README.md @@ -35,16 +35,17 @@ A list of the most commonly used environment variables is presented below: - `GENESIS_BLOCK_URL`: URL of the Lisk SDK-based application' genesis block. Only to be used when the genesis block is large enough to be transmitted over API calls within the timeout. - `GEOIP_JSON`: URL of GeoIP server. - `ENABLE_BLOCK_CACHING`: Boolean flag to enable the block caching. Enabled by default. To disable, set it to `false`. -- `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to 12. 
-`CLIENT_ALIVE_ASSUMPTION_TIME`: Interval (in milliseconds) for which the WS API Client is assumed to be alive since the last ping/pong success check. By default, it is set to 5000. -- `HEARTBEAT_ACK_MAX_WAIT_TIME`: Maximum time (in milliseconds) within which the checkIsClientAlive's algorithm expects a corresponding `pong` for the `ping` sent to the WS server. By default, it is set to 1000. -- `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to 3. -- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to 10. +- `EXPIRY_IN_HOURS`: Expiry time (in hours) for block cache. By default, it is set to `12`. +- `CLIENT_POOL_SIZE`: Number of active API clients to be maintained in the pool. Only applicable when using the IPC or WS clients to connect with the Lisk node. By default, it is set to `10`. +- `WS_SERVER_PING_INTERVAL`: Interval (in milliseconds) at which the WS server checks the liveness of all connected clients. This should not be modified unless explicitly recommended by the development team. By default, it is set to `3000`. +- `WS_SERVER_PING_INTERVAL_BUFFER`: A conservative assumption of the latency (in milliseconds) for WS server pings to arrive at the client. By default, it is set to `1000`. +- `ENDPOINT_INVOKE_MAX_RETRIES`: Maximum number of endpoint invocation request retries to the node. By default, it is set to `3`. +- `ENDPOINT_INVOKE_RETRY_DELAY`: Delay (in milliseconds) between each endpoint invocation request retry. By default, it is set to `50`. - `CONNECTOR_EXIT_DELAY_IN_HOURS`: Delay (in hours) after which the blockchain-connector microservice exits. The service should restart automatically if deployed using Docker or PM2. To be removed eventually. To enable it, set it higher than `0`. By default, it is set to `0`. -- `JOB_INTERVAL_CACHE_CLEANUP`: Job run interval to cleanup block cache. 
By default, it is set to 0. +- `JOB_INTERVAL_CACHE_CLEANUP`: Job run interval to cleanup block cache. By default, it is set to `0`. - `JOB_SCHEDULE_CACHE_CLEANUP`: Job run cron schedule to cleanup block cache. By default, it is set to run every 12 hours (`0 */12 * * *`). -- `JOB_INTERVAL_REFRESH_PEERS`: Job run interval to refresh the peers list. By default, it is set to run every 60 seconds. -- `JOB_SCHEDULE_REFRESH_PEERS`: Job run cron schedule to refresh the peers list. By default, it is set to ''. +- `JOB_INTERVAL_REFRESH_PEERS`: Job run interval to refresh the peers list. By default, it is set to run every `60` seconds. +- `JOB_SCHEDULE_REFRESH_PEERS`: Job run cron schedule to refresh the peers list. By default, it is set to `''`. > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/blockchain-connector/config.js b/services/blockchain-connector/config.js index f3342dea64..ad8864446f 100644 --- a/services/blockchain-connector/config.js +++ b/services/blockchain-connector/config.js @@ -116,12 +116,12 @@ config.job = { }; config.apiClient = { - heartbeatAckMaxWaitTime: Number(process.env.HEARTBEAT_ACK_MAX_WAIT_TIME) || 1000, // in millisecs - aliveAssumptionTime: Number(process.env.CLIENT_ALIVE_ASSUMPTION_TIME) || 5 * 1000, // in millisecs - connectionLimit: 5, + poolSize: Number(process.env.CLIENT_POOL_SIZE) || 10, + wsServerPingInterval: Number(process.env.WS_SERVER_PING_INTERVAL) || 3 * 1000, // in millisecs + pingIntervalBuffer: Number(process.env.WS_SERVER_PING_INTERVAL_BUFFER) || 1000, // in millisecs request: { maxRetries: Number(process.env.ENDPOINT_INVOKE_MAX_RETRIES) || 3, - retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 10, // in millisecs + retryDelay: Number(process.env.ENDPOINT_INVOKE_RETRY_DELAY) || 50, // in millisecs }, }; diff --git a/services/blockchain-connector/shared/sdk/client.js 
b/services/blockchain-connector/shared/sdk/client.js index f6e90d05e9..ff56665441 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -18,108 +18,215 @@ const { Signals, HTTP, Exceptions: { TimeoutException }, + Utils: { isObject, waitForIt }, } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); +const crypto = require('crypto'); + const config = require('../../config'); const delay = require('../utils/delay'); const logger = Logger(); +// Connection strings +const liskAddressWs = config.endpoints.liskWs; +const liskAddressHttp = config.endpoints.liskHttp; + // Constants -const HTTP_TIMEOUT_STATUS = 'ETIMEDOUT'; -const RPC_TIMEOUT_MESSAGE = 'Response not received in'; -const TIMEOUT_REGEX_STR = `(?:${HTTP_TIMEOUT_STATUS}|${RPC_TIMEOUT_MESSAGE})`; +const ERROR_CONN_REFUSED = 'ECONNREFUSED'; +const STATUS_HTTP_TIMEOUT = 'ETIMEDOUT'; +const MESSAGE_RPC_TIMEOUT = 'Response not received in'; +const TIMEOUT_REGEX_STR = `(?:${STATUS_HTTP_TIMEOUT}|${MESSAGE_RPC_TIMEOUT})`; const TIMEOUT_REGEX = new RegExp(TIMEOUT_REGEX_STR); -const liskAddressWs = config.endpoints.liskWs; -const liskAddressHttp = config.endpoints.liskHttp; +const MAX_CLIENT_POOL_SIZE = config.apiClient.poolSize; const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries; const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay; -const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime; -const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime; -const CONNECTION_LIMIT = config.apiClient.connectionLimit; - -// Pool of cached API clients -const cachedApiClients = []; - -const checkIsClientAlive = async clientCache => - // eslint-disable-next-line consistent-return - new Promise(resolve => { - if (!clientCache || !clientCache._channel || !clientCache._channel.isAlive) { - return resolve(false); - } +const 
WS_SERVER_PING_INTERVAL = config.apiClient.wsServerPingInterval; +const WS_SERVER_PING_BUFFER = config.apiClient.pingIntervalBuffer; // In case the server is under stress +const WS_SERVER_PING_THRESHOLD = WS_SERVER_PING_INTERVAL + WS_SERVER_PING_BUFFER; + +// Caching +const clientPool = []; +const clientInstantiationStats = { + attempts: 0, + success: 0, + fail: 0, +}; +let requestCount = 0; - if (config.isUseLiskIPCClient) { - return resolve(clientCache._channel.isAlive); - } +const getApiClientStats = () => ({ + ...clientInstantiationStats, + currentPoolSize: clientPool.length, + expectedPoolSize: MAX_CLIENT_POOL_SIZE, + numEndpointInvocations: requestCount, +}); + +const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; - const heartbeatCheckBeginTime = Date.now(); - const wsClientInstance = clientCache._channel._ws; +const pingListener = apiClient => { + if (!isObject(apiClient)) { + logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot register a pingListener.`); + return; + } - // eslint-disable-next-line no-use-before-define - const boundPongListener = () => pongListener(resolve); - wsClientInstance.on('pong', boundPongListener); - wsClientInstance.ping(() => {}); + const now = Date.now(); + logger.trace(`Client ${apiClient.poolIndex} received server ping at ${now}.`); + clearTimeout(apiClient.pingTimeout); - // eslint-disable-next-line consistent-return - const timeout = setTimeout(() => { - wsClientInstance.removeListener('pong', boundPongListener); + apiClient.pingTimeout = setTimeout(() => { + // Do not reset if the ping was delayed and just received + const timeSinceLastPing = Date.now() - apiClient.lastPingAt; + if (timeSinceLastPing >= WS_SERVER_PING_THRESHOLD) { + logger.warn( + `No ping for client ${apiClient.poolIndex} from server in ${timeSinceLastPing}ms (last ping: ${apiClient.lastPingAt}).`, + ); + apiClient._channel.isAlive = false; + Signals.get('resetApiClient').dispatch(apiClient); logger.debug(
after ${Date.now() - heartbeatCheckBeginTime}ms.`, + `Dispatched 'resetApiClient' signal from pingListener for API client ${apiClient.poolIndex}.`, ); - return resolve(false); - }, HEARTBEAT_ACK_MAX_WAIT_TIME); - - const pongListener = res => { - clearTimeout(timeout); - wsClientInstance.removeListener('pong', boundPongListener); - logger.debug(`Received API client pong in ${Date.now() - heartbeatCheckBeginTime}ms.`); - return res(true); - }; - }).catch(() => false); - -const instantiateAndCacheClient = async () => { - if (cachedApiClients.length > CONNECTION_LIMIT) { - logger.debug( - `Skipping API client instantiation as cached API client count(${cachedApiClients.length}) is greater than CONNECTION_LIMIT(${CONNECTION_LIMIT}).`, - ); - return; - } + } + }, WS_SERVER_PING_THRESHOLD); + apiClient.lastPingAt = now; +}; + +const instantiateNewClient = async () => { + clientInstantiationStats.attempts++; try { - const instantiationBeginTime = Date.now(); - const clientCache = config.isUseLiskIPCClient + const newClient = config.isUseLiskIPCClient ? await createIPCClient(config.liskAppDataPath) - : await createWSClient(`${liskAddressWs}/rpc-ws`); + : await (async () => { + const client = await createWSClient(`${liskAddressWs}/rpc-ws`); + client._channel._ws.on('ping', pingListener.bind(null, client)); + return client; + })(); + + clientInstantiationStats.success++; + return newClient; + } catch (err) { + clientInstantiationStats.fail++; + const errMessage = config.isUseLiskIPCClient + ? 
`Error instantiating IPC client at ${config.liskAppDataPath}` + : `Error instantiating WS client to ${liskAddressWs}`; + + logger.error(`${errMessage}: ${err.message}`); + if (err.message.includes(ERROR_CONN_REFUSED)) { + throw new Error('Unable to connect to the node.'); + } + + throw err; + } +}; - cachedApiClients.push(clientCache); +const initClientPool = async poolSize => { + // Set the intervals only at application init + if (clientPool.length === 0) { + setInterval(() => { + logger.info(`API client instantiation stats: ${JSON.stringify(getApiClientStats())}`); + }, 5 * 60 * 1000); + + setInterval(() => { + clientPool.forEach(async (apiClient, index) => { + if (isObject(apiClient)) return; + + // Re-instantiate when null + clientPool[index] = await instantiateNewClient() + .then(client => { + client.poolIndex = index; + return client; + }) + .catch(() => null); + }); + }, WS_SERVER_PING_INTERVAL); + } + try { + const startTime = Date.now(); + for (let i = 0; i < poolSize; i++) { + // Do not instantiate new clients if enough clients already cached + if (clientPool.length >= poolSize) break; + + const newApiClient = await instantiateNewClient(); + newApiClient.poolIndex = clientPool.length; + clientPool.push(newApiClient); + } logger.info( - `Instantiated another API client. Time taken: ${ - Date.now() - instantiationBeginTime - }ms. Cached API client count:${cachedApiClients.length}`, + `Initialized client pool in ${Date.now() - startTime}ms with ${clientPool.length} instances.`, ); } catch (err) { - // Nullify the apiClient cache and unset isInstantiating, so that it can be re-instantiated properly - const errMessage = config.isUseLiskIPCClient - ? 
`Error instantiating IPC client at ${config.liskAppDataPath}.` - : `Error instantiating WS client to ${liskAddressWs}.`; - - logger.error(errMessage); - logger.error(err.message); - if (err.message.includes('ECONNREFUSED')) { - throw new Error('ECONNREFUSED: Unable to reach a network node.'); - } + logger.warn( + clientPool.length + ? `API client pool initialization failed due to: ${err.message}\nManaged to initialize the pool with only ${clientPool.length} instead of expected ${poolSize} clients.` + : `API client pool initialization failed due to: ${err.message}`, + ); + throw err; } }; -const getApiClient = async () => { - if (cachedApiClients.length === 0) { - await instantiateAndCacheClient(); +const getApiClient = async poolIndex => { + if (!clientPool.length) await initClientPool(MAX_CLIENT_POOL_SIZE); + + const index = Number.isNaN(Number(poolIndex)) + ? crypto.randomInt(Math.min(clientPool.length, MAX_CLIENT_POOL_SIZE)) + : poolIndex; + + const apiClient = clientPool[index]; + return checkIsClientAlive(apiClient) + ? apiClient + : (() => { + if (apiClient) { + Signals.get('resetApiClient').dispatch(apiClient); + logger.debug( + `Dispatched 'resetApiClient' signal from getApiClient for API client ${apiClient.poolIndex}.`, + ); + } + return waitForIt(getApiClient, 10); + })(); +}; + +const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { + // Replace the dead API client in the pool + if (!isObject(apiClient)) { + logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot reset.`); + return; } - return cachedApiClients[0]; + + const { poolIndex } = apiClient; + + // Do not attempt reset if last ping was within the acceptable threshold + // This is to avoid unnecessary socket creation + if (Date.now() - (apiClient.lastPingAt || 0) < WS_SERVER_PING_THRESHOLD) { + logger.debug(`Not resetting apiClient ${poolIndex}. 
Received a late ping from the server.`); + return; + } + + if (isEventSubscriptionClient) { + logger.info(`Attempting to reset the eventSubscriptionClient: apiClient ${poolIndex}.`); + Signals.get('eventSubscriptionClientReset').dispatch(); + } else { + logger.info(`Attempting to reset apiClient ${poolIndex}.`); + } + + await apiClient + .disconnect() + .catch(err => logger.warn(`Error disconnecting apiClient: ${err.message}. Will proceed.`)); + + const newApiClient = await instantiateNewClient() + .then(client => { + client.poolIndex = poolIndex; + logger.info(`Successfully reset apiClient ${poolIndex}.`); + return client; + }) + .catch(() => null); + + clientPool[poolIndex] = newApiClient; + + if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex); }; +Signals.get('resetApiClient').add(resetApiClient); const is2XXResponse = response => String(response.status).startsWith('2'); const isSuccessResponse = response => is2XXResponse(response) && response.data.result; @@ -137,17 +244,17 @@ const buildHTTPResponse = (endpoint, params, response) => { throw new Error(errorMessage); }; -let id = 0; // eslint-disable-next-line consistent-return const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RETRIES) => { let retriesLeft = numRetries; do { try { + requestCount++; if (config.isUseHttpApi) { // HTTP API-based communication with the Lisk app node const rpcRequest = { jsonrpc: '2.0', - id: id++, + id: requestCount, method: endpoint, params, }; @@ -164,110 +271,25 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE if (TIMEOUT_REGEX.test(err.message)) { if (!retriesLeft) { const exceptionMsg = Object.getOwnPropertyNames(params).length - ? `Request timed out when calling '${endpoint}' with params:\n${JSON.stringify( - params, - null, - '\t', - )}.` - : `Request timed out when calling '${endpoint}'.`; + ? 
`Invocation timed out for '${endpoint}' with params:\n${JSON.stringify(params)}.` + : `Invocation timed out for '${endpoint}'.`; throw new TimeoutException(exceptionMsg); } await delay(ENDPOINT_INVOKE_RETRY_DELAY); } else { - if (Object.getOwnPropertyNames(params).length) { - logger.warn( - `Error invoking '${endpoint}' with params:\n${JSON.stringify(params, null, ' ')}.\n${ - err.stack - }`, - ); - } + logger.warn( + Object.getOwnPropertyNames(params).length + ? `Error invoking '${endpoint}' with params:\n${JSON.stringify(params)}.\n${err.stack}` + : `Error invoking '${endpoint}'.\n${err.stack}`, + ); - logger.warn(`Error occurred when calling '${endpoint}' :\n${err.stack}`); throw err; } } } while (retriesLeft--); }; -const disconnectClient = async cachedClient => { - try { - await cachedClient.disconnect(); - } catch (err) { - logger.warn(`Client disconnection failed due to: ${err.message}`); - } -}; - -const refreshClientsCache = async () => { - // Indicates if an active client was destroyed or no active clients available - let activeClientNotAvailable = cachedApiClients.length === 0; - - // Check liveliness and remove non-responsive clients - let index = 0; - while (index < cachedApiClients.length) { - const cachedClient = cachedApiClients[index]; - try { - if (!(await checkIsClientAlive(cachedClient))) { - cachedApiClients.splice(index, 1); - if (index === 0) { - activeClientNotAvailable = true; - } - await disconnectClient(cachedClient); - } else { - index++; - } - } catch (err) { - logger.info(`Failed to refresh an active API client from cache.\nError:${err.message}`); - } - } - - // Initiate new clients if necessary - let missingClientCount = CONNECTION_LIMIT - cachedApiClients.length; - - while (missingClientCount-- > 0) { - try { - await instantiateAndCacheClient(); - } catch (err) { - logger.warn(`Failed to instantiate new API client.\nError: ${err.message}`); - } - } - - // Reset event listeners if active API client was destroyed - if 
(activeClientNotAvailable && cachedApiClients.length > 0) { - Signals.get('newApiClient').dispatch(); - } - - return cachedApiClients.length; -}; - -// Listen to client unresponsive events and try to reinitialize client -const resetApiClientListener = async () => { - logger.info( - `Received API client reset signal. Will drop ${cachedApiClients.length} cached API client(s).`, - ); - // Drop pool of cached clients. Will be instantiated by refresh clients job - while (cachedApiClients.length > 0) { - const cachedClient = cachedApiClients.pop(); - await disconnectClient(cachedClient); - } -}; -Signals.get('resetApiClient').add(resetApiClientListener); - -if (!config.isUseHttpApi) { - refreshClientsCache(); // Initialize the client cache - - // Periodically ensure API client liveliness - setInterval(async () => { - const cacheRefreshStartTime = Date.now(); - await refreshClientsCache(); - logger.debug( - `Refreshed API client cache in ${Date.now() - cacheRefreshStartTime}ms. There are ${ - cachedApiClients.length - } API client(s) in the pool.`, - ); - }, CLIENT_ALIVE_ASSUMPTION_TIME); -} - module.exports = { TIMEOUT_REGEX, diff --git a/services/blockchain-connector/shared/sdk/events.js b/services/blockchain-connector/shared/sdk/events.js index 654a46fd5b..1d92131d44 100644 --- a/services/blockchain-connector/shared/sdk/events.js +++ b/services/blockchain-connector/shared/sdk/events.js @@ -47,6 +47,7 @@ const events = [ EVENT_TX_POOL_TRANSACTION_NEW, ]; +let eventSubscribeClientPoolIndex; let eventsCounter; let lastBlockHeightEvent; @@ -86,13 +87,24 @@ const emitEngineEvents = async () => { }; // eslint-disable-next-line consistent-return -const subscribeToAllRegisteredEvents = async () => { +const subscribeToAllRegisteredEvents = async (newClientPoolIndex = null) => { if (config.isUseHttpApi) return emitEngineEvents(); + // Active client subscription available, skip invocation + if ( + typeof eventSubscribeClientPoolIndex === 'number' && + 
eventSubscribeClientPoolIndex !== newClientPoolIndex + ) { + return null; + } + // Reset eventsCounter first eventsCounter = 0; const apiClient = await getApiClient(); + eventSubscribeClientPoolIndex = apiClient.poolIndex; + logger.info(`Subscribing events with apiClient ${eventSubscribeClientPoolIndex}.`); + const registeredEvents = await getRegisteredEvents(); const allEvents = registeredEvents.concat(events); allEvents.forEach(event => { @@ -126,7 +138,7 @@ const ensureAPIClientLiveness = () => { if (config.isUseHttpApi) return; if (isNodeSynced && isGenesisBlockDownloaded) { - setInterval(() => { + setInterval(async () => { if (typeof eventsCounter === 'number' && eventsCounter > 0) { eventsCounter = 0; } else { @@ -141,8 +153,13 @@ const ensureAPIClientLiveness = () => { eventsCounter = 0; } - Signals.get('resetApiClient').dispatch(); - logger.info("Dispatched 'resetApiClient' signal to re-instantiate the API client."); + if (typeof eventSubscribeClientPoolIndex === 'number') { + const apiClient = await getApiClient(eventSubscribeClientPoolIndex); + Signals.get('resetApiClient').dispatch(apiClient, true); + logger.debug( + `Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`, + ); + } } }, config.clientConnVerifyInterval); } else { @@ -162,8 +179,13 @@ const genesisBlockDownloadedListener = () => { ensureAPIClientLiveness(); }; +const eventSubscriptionClientResetListener = () => { + eventSubscribeClientPoolIndex = null; +}; + Signals.get('nodeIsSynced').add(nodeIsSyncedListener); Signals.get('genesisBlockDownloaded').add(genesisBlockDownloadedListener); +Signals.get('eventSubscriptionClientReset').add(eventSubscriptionClientResetListener); module.exports = { events, diff --git a/services/blockchain-connector/shared/utils/download.js b/services/blockchain-connector/shared/utils/download.js index 7ffb6dcc0a..34db98ad89 100644 --- a/services/blockchain-connector/shared/utils/download.js +++ 
b/services/blockchain-connector/shared/utils/download.js @@ -113,10 +113,10 @@ const downloadFile = (url, dirPath) => }); const verifyFileChecksum = async (filePath, checksumPath) => { - // Validate existance of both files + // Validate the existence of both the files if (!(await exists(filePath)) || !(await exists(checksumPath))) { logger.info( - `Checksum verification failed. One of the following files does not exist:\nfilePath:${filePath}\nchecksumPath:${checksumPath}`, + `Checksum verification failed. One of the following files does not exist:\nfilePath: ${filePath}\nchecksumPath: ${checksumPath}`, ); return false; } @@ -141,11 +141,12 @@ const verifyFileChecksum = async (filePath, checksumPath) => { const fileChecksum = fileHash.toString('hex'); if (fileChecksum !== expectedChecksum) { logger.info( - `Checksum verification failed for file:${filePath}\nExpected: ${expectedChecksum}, Actual: ${fileChecksum}`, + `Checksum verification failed for file: ${filePath}\nExpected: ${expectedChecksum}, Actual: ${fileChecksum}`, ); return false; } + logger.info(`Checksum verification successful for file: ${filePath}`); return true; }; diff --git a/services/blockchain-coordinator/README.md b/services/blockchain-coordinator/README.md index 1649646b23..74ac1a7329 100644 --- a/services/blockchain-coordinator/README.md +++ b/services/blockchain-coordinator/README.md @@ -28,7 +28,7 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_BROKER`: URL of the microservice message broker (NATS or Redis). - `SERVICE_MESSAGE_QUEUE_REDIS`: URL of the job queue to schedule the indexing jobs (Redis). -- `JOB_INTERVAL_INDEX_MISSING_BLOCKS`: Job run interval to index missing blocks. By default, it is set to 0. +- `JOB_INTERVAL_INDEX_MISSING_BLOCKS`: Job run interval to index missing blocks. By default, it is set to `0`. - `JOB_SCHEDULE_INDEX_MISSING_BLOCKS`: Job run cron schedule to index missing blocks. 
By default, it is set to run every 5 minutes (`*/5 * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/blockchain-indexer/README.md b/services/blockchain-indexer/README.md index 03f6dfd69e..0d2497641d 100644 --- a/services/blockchain-indexer/README.md +++ b/services/blockchain-indexer/README.md @@ -40,30 +40,30 @@ A list of the most commonly used environment variables is presented below: - `ENABLE_INDEXING_MODE`: Boolean flag to enable the Data Indexing mode. - `ENABLE_PERSIST_EVENTS`: Boolean flag to permanently maintain the events in the MySQL database. - `ENABLE_APPLY_SNAPSHOT`: Boolean flag to enable initialization of the index with the Lisk Service DB snapshot. -- `DURABILITY_VERIFY_FREQUENCY`: Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to 1. +- `DURABILITY_VERIFY_FREQUENCY`: Frequency in milliseconds to verify if a block is indexed or rolled-back successfully. By default, it is set to `1`. - `INDEX_SNAPSHOT_URL`: URL from where the Lisk Service DB snapshot will be downloaded. - `ENABLE_SNAPSHOT_ALLOW_INSECURE_HTTP`: Boolean flag to enable downloading snapshot from an (unsecured) HTTP URL. - `LISK_STATIC`: URL of Lisk static assets. - `SERVICE_INDEXER_CACHE_REDIS`: URL of the cache storage (Redis). -- `ACCOUNT_BALANCE_UPDATE_BATCH_SIZE`: Number of accounts for which the balance index is updated at a time. By default, it is set to 1000. -- `INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT`: Maximum number of jobs (in active and waiting state) allowed in the block indexing queue. By default, it is set to 100000. -- `JOB_INTERVAL_DELETE_SERIALIZED_EVENTS`: Job run interval to delete serialized events. By default, it is set to 0. +- `ACCOUNT_BALANCE_UPDATE_BATCH_SIZE`: Number of accounts for which the balance index is updated at a time. By default, it is set to `1000`. 
+- `INDEX_BLOCKS_QUEUE_SCHEDULED_JOB_MAX_COUNT`: Maximum number of jobs (in active and waiting state) allowed in the block indexing queue. By default, it is set to `100000`. +- `JOB_INTERVAL_DELETE_SERIALIZED_EVENTS`: Job run interval to delete serialized events. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_SERIALIZED_EVENTS`: Job run cron schedule to delete serialized events. By default, it is set to run every 5th minute (`*/5 * * * *`). -- `JOB_INTERVAL_REFRESH_VALIDATORS`: Job run interval to refresh validators cache. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_VALIDATORS`: Job run interval to refresh validators cache. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_VALIDATORS`: Job run cron schedule to refresh validators cache. By default, it is set to run every 5th minute (`*/5 * * * *`). -- `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK`: Job run interval to validate the rank for all the validators. By default, it is set to 0. +- `JOB_INTERVAL_VALIDATE_VALIDATORS_RANK`: Job run interval to validate the rank for all the validators. By default, it is set to `0`. - `JOB_SCHEDULE_VALIDATE_VALIDATORS_RANK`: Job run cron schedule to validate the rank for all the validators. By default, it is set to run every 15 minutes, and starts at 4 minutes past the hour (`4-59/15 * * * *`). -- `JOB_INTERVAL_REFRESH_INDEX_STATUS`: Job run interval to refresh indexing status. By default, it is set to run every 10 seconds. -- `JOB_SCHEDULE_REFRESH_INDEX_STATUS`: Job run cron schedule to refresh indexing status. By default, it is set to ''. -- `JOB_INTERVAL_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run interval to refresh blockchain application statistics. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_INDEX_STATUS`: Job run interval to refresh indexing status. By default, it is set to run every `10` seconds. +- `JOB_SCHEDULE_REFRESH_INDEX_STATUS`: Job run cron schedule to refresh indexing status. By default, it is set to `''`. 
+- `JOB_INTERVAL_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run interval to refresh blockchain application statistics. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_BLOCKCHAIN_APPS_STATS`: Job run cron schedule to refresh blockchain application statistics. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE`: Job run interval to refresh account knowledge. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_ACCOUNT_KNOWLEDGE`: Job run interval to refresh account knowledge. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_ACCOUNT_KNOWLEDGE`: Job run cron schedule to refresh account knowledge. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA`: Job run interval to delete finalized CCU metadata. By default, it is set to 0. +- `JOB_INTERVAL_DELETE_FINALIZED_CCU_METADATA`: Job run interval to delete finalized CCU metadata. By default, it is set to `0`. - `JOB_SCHEDULE_DELETE_FINALIZED_CCU_METADATA`: Job run cron schedule to delete finalized CCU metadata. By default, it is set to run once a day at 02:00 am (`0 2 * * *`). -- `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES`: Job run interval to trigger account updates. By default, it is set to 0. +- `JOB_INTERVAL_TRIGGER_ACCOUNT_UPDATES`: Job run interval to trigger account updates. By default, it is set to `0`. - `JOB_SCHEDULE_TRIGGER_ACCOUNT_UPDATES`: Job run cron schedule to trigger account updates. By default, it is set to run every 15th minute (`*/15 * * * *`). -- `ESTIMATES_BUFFER_BYTES_LENGTH`: Transaction buffer bytes to consider when estimating the transaction fees. By default, it is set to 0. +- `ESTIMATES_BUFFER_BYTES_LENGTH`: Transaction buffer bytes to consider when estimating the transaction fees. By default, it is set to `0`. - `MAINCHAIN_SERVICE_URL`: Mainchain service URL for custom deployments. 
- `INVOKE_ALLOWED_METHODS`: List of allowed methods that can be invoked via the `/invoke` API endpoint. The list can be expressed as a CSV. To allow invocation of all endpoints, set it to `*`. To allow invocation of all the registered methods under the specified namespaces, set it similar to `legacy,chain`. To allow invocation of specific methods, set it similar to `chain_getBlocksByIDs,chain_getBlocksByHeight`. diff --git a/services/export/README.md b/services/export/README.md index 96a3d9090f..26ebc3f162 100644 --- a/services/export/README.md +++ b/services/export/README.md @@ -27,7 +27,7 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_BROKER`: URL of the microservice message broker (NATS or Redis). - `SERVICE_EXPORT_REDIS`: URL of the permanent cache storage (Redis). - `SERVICE_EXPORT_REDIS_VOLATILE`: URL of the volatile cache storage (Redis). -- `JOB_INTERVAL_CACHE_PURGE`: Job run interval to cleanup cache. By default, it is set to 0. +- `JOB_INTERVAL_CACHE_PURGE`: Job run interval to cleanup cache. By default, it is set to `0`. - `JOB_SCHEDULE_CACHE_PURGE`: Job run cron schedule to cleanup cache. By default, it is set to run daily at 04:45 am (`45 4 * * *`). - `EXPORT_S3_ENDPOINT`: Amazon S3 bucket endpoint. - `EXPORT_S3_ACCESS_KEY`: Amazon S3 bucket access key for the specified endpoint. 
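The `INVOKE_ALLOWED_METHODS` matching rules described in the indexer README above (wildcard, namespace, or exact method name) can be sketched as follows. This is a minimal illustration only; `isMethodAllowed` is a hypothetical helper, not the service's actual implementation.

```javascript
// Sketch of the INVOKE_ALLOWED_METHODS allow-list semantics (hypothetical helper).
const isMethodAllowed = (allowedMethodsCsv, method) => {
	const allowed = allowedMethodsCsv.split(',').map(entry => entry.trim());
	if (allowed.includes('*')) return true; // '*' permits every registered method
	if (allowed.includes(method)) return true; // exact match, e.g. 'chain_getBlocksByIDs'
	const namespace = method.split('_')[0]; // namespace match, e.g. 'legacy' or 'chain'
	return allowed.includes(namespace);
};
```

For example, with `INVOKE_ALLOWED_METHODS=legacy,chain`, the method `chain_getBlocksByHeight` is permitted via its `chain` namespace even though it is not listed explicitly.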
diff --git a/services/fee-estimator/app.js b/services/fee-estimator/app.js index 41314cf107..3b8c94c400 100644 --- a/services/fee-estimator/app.js +++ b/services/fee-estimator/app.js @@ -32,6 +32,7 @@ const app = Microservice({ logger: config.log, events: { chainNewBlock: async payload => Signals.get('newBlock').dispatch(payload), + chainDeleteBlock: async payload => Signals.get('deleteBlock').dispatch(payload), }, dependencies: ['connector'], }); diff --git a/services/fee-estimator/shared/dynamicFees.js b/services/fee-estimator/shared/dynamicFees.js index 1c85a7548e..4b6923507f 100644 --- a/services/fee-estimator/shared/dynamicFees.js +++ b/services/fee-estimator/shared/dynamicFees.js @@ -16,20 +16,18 @@ const util = require('util'); const { CacheRedis, Logger, Signals } = require('lisk-service-framework'); -const { requestConnector } = require('./utils/request'); - const config = require('../config'); const { getFeeConstants } = require('./feeConstants'); - +const { getLatestBlock } = require('./utils/chain'); const { checkAndProcessExecution, isFeeCalculationRunningInMode } = require('./utils/dynamicFees'); const cacheRedisFees = CacheRedis('fees', config.endpoints.cache); const logger = Logger(); -const calculateEstimateFeePerByteFull = async () => { - const { header: latestBlock } = await requestConnector('getLastBlock'); +const calculateEstimateFeePerByteFull = async newBlock => { + const { header: latestBlock } = newBlock; const fromHeight = config.feeEstimates.defaultStartBlockHeight; const toHeight = latestBlock.height; @@ -50,9 +48,9 @@ const calculateEstimateFeePerByteFull = async () => { return cachedFeeEstPerByteFull; }; -const calculateEstimateFeePerByteQuick = async () => { +const calculateEstimateFeePerByteQuick = async newBlock => { // For the cold start scenario - const { header: latestBlock } = await requestConnector('getLastBlock'); + const { header: latestBlock } = newBlock; const batchSize = config.feeEstimates.coldStartBatchSize; const 
toHeight = latestBlock.height; const fromHeight = toHeight - batchSize; @@ -81,7 +79,7 @@ const getEstimateFeePerByte = async () => { }; } - const { header: latestBlock } = await requestConnector('getLastBlock'); + const { header: latestBlock } = await getLatestBlock(); const validate = (feeEstPerByte, allowedLag = 0) => feeEstPerByte && ['low', 'med', 'high', 'updated', 'blockHeight', 'blockID'].every(key => @@ -111,15 +109,15 @@ const getEstimateFeePerByte = async () => { }; }; -const newBlockListener = async () => { +const newBlockListener = async newBlock => { try { if (config.feeEstimates.fullAlgorithmEnabled) { logger.debug('Initiate the dynamic fee estimates computation (full computation).'); - calculateEstimateFeePerByteFull(); + calculateEstimateFeePerByteFull(newBlock); } if (config.feeEstimates.quickAlgorithmEnabled) { logger.debug('Initiate the dynamic fee estimates computation (quick algorithm).'); - const feeEstimate = await calculateEstimateFeePerByteQuick(); + const feeEstimate = await calculateEstimateFeePerByteQuick(newBlock); logger.debug( `============== 'newFeeEstimate' signal: ${Signals.get('newFeeEstimate')} ==============.`, ); diff --git a/services/fee-estimator/shared/utils/chain.js b/services/fee-estimator/shared/utils/chain.js new file mode 100644 index 0000000000..e6ce481101 --- /dev/null +++ b/services/fee-estimator/shared/utils/chain.js @@ -0,0 +1,109 @@ +/* + * LiskHQ/lisk-service + * Copyright © 2024 Lisk Foundation + * + * See the LICENSE file at the top-level directory of this distribution + * for licensing information. + * + * Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation, + * no part of this software, including this file, may be copied, modified, + * propagated, or distributed except according to the terms contained in the + * LICENSE file. + * + * Removal or modification of this copyright notice is prohibited. 
+ * + */ +const { + CacheRedis, + Signals, + Utils: { isObject }, +} = require('lisk-service-framework'); + +const config = require('../../config'); +const { requestConnector } = require('./request'); + +let genesisHeight; +let blockTime; +let latestBlock; + +const { emaBatchSize } = config.feeEstimates; + +const blockByHeightCache = CacheRedis('blockByHeight', config.endpoints.cache); + +const isValidBlock = block => { + if (!isObject(block)) return false; + return 'assets' in block && 'header' in block && 'transactions' in block; +}; + +const setLatestBlock = block => { + if (isValidBlock(block)) latestBlock = block; +}; + +const getLatestBlock = async () => { + if (typeof latestBlock !== 'object') { + latestBlock = await requestConnector('getLastBlock'); + } + return latestBlock; +}; + +const getBlockTime = async () => { + if (typeof blockTime !== 'number') { + const nodeInfo = await requestConnector('getNodeInfo'); + blockTime = nodeInfo.genesis.blockTime; + } + return blockTime; +}; + +const getGenesisHeight = async () => { + if (typeof genesisHeight !== 'number') { + genesisHeight = await requestConnector('getGenesisHeight'); + } + return genesisHeight; +}; + +const cacheBlockByHeight = async block => { + try { + if (isValidBlock(block)) { + const numBlocksToKeep = emaBatchSize * 2; + const blockTimeInMs = (await getBlockTime()) * 1000; + + await blockByHeightCache.set( + block.header.height, + JSON.stringify(block), + numBlocksToKeep * blockTimeInMs, + ); + } + } catch (_) { + // No actions to be taken + } +}; + +const emptyCacheBlockByHeight = async block => { + try { + if (isValidBlock(block)) { + await blockByHeightCache.delete(block.header.height); + } + } catch (_) { + // No actions to be taken + } +}; + +const getBlockByHeight = async height => { + const blockStr = await blockByHeightCache.get(height); + if (blockStr) return JSON.parse(blockStr); + + const block = await requestConnector('getBlockByHeight', { height }); + await 
cacheBlockByHeight(block); + return block; +}; + +Signals.get('newBlock').add(setLatestBlock); +Signals.get('newBlock').add(cacheBlockByHeight); +Signals.get('deleteBlock').add(emptyCacheBlockByHeight); + +module.exports = { + setLatestBlock, + getLatestBlock, + getGenesisHeight, + getBlockByHeight, +}; diff --git a/services/fee-estimator/shared/utils/dynamicFees.js b/services/fee-estimator/shared/utils/dynamicFees.js index 827490d024..aa879418c4 100644 --- a/services/fee-estimator/shared/utils/dynamicFees.js +++ b/services/fee-estimator/shared/utils/dynamicFees.js @@ -17,8 +17,8 @@ const BluebirdPromise = require('bluebird'); const { CacheRedis, Logger } = require('lisk-service-framework'); +const { getGenesisHeight, getBlockByHeight } = require('./chain'); const { getEstimateFeePerByteForBlock } = require('./dynamicFeesLIP'); -const { requestConnector } = require('./request'); const config = require('../../config'); @@ -34,11 +34,11 @@ const cacheRedisFees = CacheRedis('fees', config.endpoints.cache); const logger = Logger(); const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) => { - const genesisHeight = await requestConnector('getGenesisHeight'); + const genesisHeight = await getGenesisHeight(); const { defaultStartBlockHeight } = config.feeEstimates; // Check if the starting height is permitted by config or adjust acc. 
- // Use incrementation to skip the genesis block - it is not needed + // Skip the genesis block for calculation - it is not needed fromHeight = Math.max( ...[defaultStartBlockHeight, genesisHeight + 1, fromHeight].filter(n => !Number.isNaN(n)), ); @@ -70,9 +70,9 @@ const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) => blockBatch.data = await BluebirdPromise.map( range(finalEMABatchSize), async i => { - const { header, transactions } = await requestConnector('getBlockByHeight', { - height: prevFeeEstPerByte.blockHeight + 1 - i, - }); + const { header, transactions } = await getBlockByHeight( + prevFeeEstPerByte.blockHeight + 1 - i, + ); return { ...header, transactions }; }, { concurrency: 50 }, diff --git a/services/gateway/README.md b/services/gateway/README.md index cc163c3509..07b91dc314 100644 --- a/services/gateway/README.md +++ b/services/gateway/README.md @@ -33,21 +33,21 @@ A list of the most commonly used environment variables is presented below: - `CORS_ALLOWED_ORIGIN`: Allows request from the comma separated string of origins. By default, it is set to `*` which allows request from all origins. - `SERVICE_GATEWAY_REDIS_VOLATILE`: URL of the volatile cache storage (Redis). - `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS`: Boolean flag to enforce the `headersTimeout` and `keepAliveTimeout` settings on the API server. Enabling this might be helpful when deploying Lisk Service behind a load balancer or a reverse proxy. Check [this FAQ](https://moleculer.services/docs/0.14/faq.html#Why-am-I-getting-502-Bad-Gateway-when-api-gateway-is-behind-ALB-on-AWS) for more information. -- `HTTP_KEEP_ALIVE_TIMEOUT`: Defines the number of microseconds the gateway will wait before closing an idle connection. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. By default, it is set to 65000. 
-- `HTTP_HEADERS_TIMEOUT`: Defines the maximum number of microseconds for the gateway to send HTTP response headers after the client's request. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. Please ensure that the `HTTP_HEADERS_TIMEOUT` is set higher than the `HTTP_KEEP_ALIVE_TIMEOUT`. By default, it is set to 66000.
+- `HTTP_KEEP_ALIVE_TIMEOUT`: Defines the number of milliseconds the gateway will wait before closing an idle connection. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. By default, it is set to `65000`.
+- `HTTP_HEADERS_TIMEOUT`: Defines the maximum number of milliseconds for the gateway to send HTTP response headers after the client's request. To enable, ensure `ENABLE_REVERSE_PROXY_TIMEOUT_SETTINGS` is set to true, or `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES` is set to greater than 0. Please ensure that the `HTTP_HEADERS_TIMEOUT` is set higher than the `HTTP_KEEP_ALIVE_TIMEOUT`. By default, it is set to `66000`.
- `HTTP_RATE_LIMIT_ENABLE`: Boolean flag to enable HTTP rate limiter. Disabled by default. To enable, set it to `true`.
-- `HTTP_RATE_LIMIT_WINDOW`: To keep a record of requests in the memory (in seconds). By default, it is set to 10 seconds.
+- `HTTP_RATE_LIMIT_WINDOW`: To keep a record of requests in the memory (in seconds). By default, it is set to `10` seconds.
- `HTTP_RATE_LIMIT_CONNECTIONS`: Maximum number of allowed requests for a specified window, configured by `HTTP_RATE_LIMIT_WINDOW`.
- `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR`: When set to true, the rate limiting algorithm considers the X-Forwarded-For header value to determine the client's IP address for rate limiting purposes. By default, it is set to `false`. 
-- `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES`: Defines the number of proxies that exist between the gateway and the external client application, enabling accurate identification of the client's IP address for rate limiting. Requires `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR` to be enabled. By default, it is set to 0. +- `HTTP_RATE_LIMIT_NUM_KNOWN_PROXIES`: Defines the number of proxies that exist between the gateway and the external client application, enabling accurate identification of the client's IP address for rate limiting. Requires `HTTP_RATE_LIMIT_ENABLE_X_FORWARDED_FOR` to be enabled. By default, it is set to `0`. - `ENABLE_HTTP_CACHE_CONTROL`: Boolean flag to enable HTTP response caching. This includes the `Cache-Control` header within the responses. - `HTTP_CACHE_CONTROL_DIRECTIVES`: HTTP `cache-control` directives. - `WS_RATE_LIMIT_ENABLE`: Boolean flag to enable WS rate limiter. - `WS_RATE_LIMIT_CONNECTIONS`: Maximum number of new connections allowed within a configured window, specified by `WS_RATE_LIMIT_DURATION`. -- `WS_RATE_LIMIT_DURATION`: Rate limit window in seconds. By default, it is set to 1. +- `WS_RATE_LIMIT_DURATION`: Rate limit window in seconds. By default, it is set to `1`. - `ENABLE_REQUEST_CACHING`: Boolean flag to enable RPC response caching. - `GATEWAY_DEPENDENCIES`: Services on which the gateway is dependent (can be expressed as a CSV). -- `JOB_INTERVAL_UPDATE_READINESS_STATUS`: Job run interval to update the readiness status. By default, it is set to 0. +- `JOB_INTERVAL_UPDATE_READINESS_STATUS`: Job run interval to update the readiness status. By default, it is set to `0`. - `JOB_SCHEDULE_UPDATE_READINESS_STATUS`: Job run cron schedule to update the readiness status. By default, it is set to run every minute (`* * * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. 
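The note above about `interval` taking priority over `schedule` for all moleculer job configurations can be sketched as below. `resolveJobTrigger` is a hypothetical helper used purely for illustration, not the framework's actual API.

```javascript
// Sketch of the job-configuration precedence rule: a positive `interval` wins;
// otherwise the cron `schedule` applies. Hypothetical helper for illustration.
const resolveJobTrigger = (interval, schedule) => {
	if (Number.isFinite(interval) && interval > 0) {
		return { type: 'interval', value: interval };
	}
	return { type: 'cron', value: schedule };
};
```

This is why the defaults of `0` for the `JOB_INTERVAL_*` variables effectively defer to the corresponding `JOB_SCHEDULE_*` cron expressions.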
diff --git a/services/market/README.md b/services/market/README.md index 6e50bac2de..f5608c1e8c 100644 --- a/services/market/README.md +++ b/services/market/README.md @@ -31,16 +31,16 @@ A list of the most commonly used environment variables is presented below: - `EXCHANGERATESAPI_IO_API_KEY`: Access key to fetch data from the exchangeratesapi.io API. - `SERVICE_MARKET_FIAT_CURRENCIES`: Supported fiat currencies. - `SERVICE_MARKET_TARGET_PAIRS`: Supported target pairs. -- `JOB_INTERVAL_REFRESH_PRICES_BINANCE`: Job run interval to refresh prices from Binance. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_BINANCE`: Job run interval to refresh prices from Binance. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_BINANCE`: Job run cron schedule to refresh prices from Binance. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_BITTREX`: Job run interval to refresh prices from Bittrex. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_BITTREX`: Job run interval to refresh prices from Bittrex. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_BITTREX`: Job run cron schedule to refresh prices from Bittrex. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI`: Job run interval to refresh prices from exchangeratesapi. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_EXCHANGERATESAPI`: Job run interval to refresh prices from exchangeratesapi. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_PRICES_EXCHANGERATESAPI`: Job run cron schedule to refresh prices from exchangeratesapi. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_REFRESH_PRICES_KRAKEN`: Job run interval to refresh prices from Kraken. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_PRICES_KRAKEN`: Job run interval to refresh prices from Kraken. By default, it is set to `0`. 
- `JOB_SCHEDULE_REFRESH_PRICES_KRAKEN`: Job run cron schedule to refresh prices from Kraken. By default, it is set to run every minute (`* * * * *`). -- `JOB_INTERVAL_UPDATE_PRICES`: Job run interval to update market prices. By default, it is set to run every 5 seconds. -- `JOB_SCHEDULE_UPDATE_PRICES`: Job run cron schedule to update market prices. By default, it is set to ''. +- `JOB_INTERVAL_UPDATE_PRICES`: Job run interval to update market prices. By default, it is set to run every `5` seconds. +- `JOB_SCHEDULE_UPDATE_PRICES`: Job run cron schedule to update market prices. By default, it is set to `''`. > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations. diff --git a/services/transaction-statistics/README.md b/services/transaction-statistics/README.md index 3d355a734f..1287cc5d1e 100644 --- a/services/transaction-statistics/README.md +++ b/services/transaction-statistics/README.md @@ -31,9 +31,9 @@ A list of the most commonly used environment variables is presented below: - `SERVICE_STATISTICS_MYSQL_READ_REPLICA`: Connection string of the (read only) replicated MySQL instance that the microservice connects to. - `SERVICE_STATISTICS_REDIS`: URL of the cache storage (Redis). - `TRANSACTION_STATS_HISTORY_LENGTH_DAYS`: The number of days for which the transaction statistics need to be built in retrospect to the application init. -- `JOB_INTERVAL_REFRESH_TRANSACTION_STATS`: Job run interval to refresh transaction statistics. By default, it is set to 0. +- `JOB_INTERVAL_REFRESH_TRANSACTION_STATS`: Job run interval to refresh transaction statistics. By default, it is set to `0`. - `JOB_SCHEDULE_REFRESH_TRANSACTION_STATS`: Job run cron schedule to refresh transaction statistics. By default, it is set to run every 30th minute (`*/30 * * * *`). -- `JOB_INTERVAL_VERIFY_TRANSACTION_STATS`: Job run interval to verify if the transaction statistics have been built correctly. 
By default, it is set to 0. +- `JOB_INTERVAL_VERIFY_TRANSACTION_STATS`: Job run interval to verify if the transaction statistics have been built correctly. By default, it is set to `0`. - `JOB_SCHEDULE_VERIFY_TRANSACTION_STATS`: Job run cron schedule to verify if the transaction statistics have been built correctly. By default, it is set to run every 3rd hour after the first 15 minutes (`15 */3 * * *`). > **Note**: `interval` takes priority over `schedule` and must be greater than 0 to be valid for all the moleculer job configurations.
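The `WS_SERVER_PING_INTERVAL` and `WS_SERVER_PING_INTERVAL_BUFFER` settings introduced by this change combine as sketched below: a client may assume its connection is stale once no server ping has arrived within one ping interval plus the latency buffer. This is a hedged illustration of the documented semantics; `isConnectionStale` is a hypothetical helper, not the connector's actual code.

```javascript
// Hypothetical staleness check built from the two WS ping settings (defaults shown).
const WS_SERVER_PING_INTERVAL = 3000; // ms between server liveliness pings
const WS_SERVER_PING_INTERVAL_BUFFER = 1000; // ms of assumed ping delivery latency

const isConnectionStale = (lastPingAt, now) =>
	now - lastPingAt > WS_SERVER_PING_INTERVAL + WS_SERVER_PING_INTERVAL_BUFFER;
```

With the defaults, a connection is treated as stale only after more than 4000 ms without a ping, which is why the buffer is described as a conservative latency assumption rather than a second timeout.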