Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🐛 Re-subscribe events when prev subscribed client is reset
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Jan 7, 2024
1 parent 13918a9 commit 0dc5ceb
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 27 deletions.
17 changes: 13 additions & 4 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const pingListener = apiClient => {
);
apiClient._channel.isAlive = false;
Signals.get('resetApiClient').dispatch(apiClient);
logger.debug(
`Dispatched 'resetApiClient' signal from pingListener for API client ${apiClient.poolIndex}.`,
);
}
}, WS_SERVER_PING_THRESHOLD);

Expand Down Expand Up @@ -174,7 +177,12 @@ const getApiClient = async poolIndex => {
return checkIsClientAlive(apiClient)
? apiClient
: (() => {
if (apiClient) Signals.get('resetApiClient').dispatch(apiClient);
if (apiClient) {
Signals.get('resetApiClient').dispatch(apiClient);
logger.debug(
`Dispatched 'resetApiClient' signal from getApiClient for API client ${apiClient.poolIndex}.`,
);
}
return waitForIt(getApiClient, 10);
})();
};
Expand All @@ -186,14 +194,15 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => {
return;
}

const { poolIndex } = apiClient;

// Do not attempt reset if last ping was within the acceptable threshold
// This is to avoid unnecessary socket creation
if (Date.now() - (apiClient.lastPingAt || 0) < WS_SERVER_PING_THRESHOLD) {
logger.debug(`Not resetting apiClient ${poolIndex}. Received a late ping from the server.`);
return;
}

const { poolIndex } = apiClient;

if (isEventSubscriptionClient) {
logger.info(`Attempting to reset the eventSubscriptionClient: apiClient ${poolIndex}.`);
Signals.get('eventSubscriptionClientReset').dispatch();
Expand All @@ -215,7 +224,7 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => {

clientPool[poolIndex] = newApiClient;

if (newApiClient) Signals.get('newApiClient').dispatch();
if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex);
};
Signals.get('resetApiClient').add(resetApiClient);

Expand Down
17 changes: 11 additions & 6 deletions services/blockchain-connector/shared/sdk/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,16 @@ const emitEngineEvents = async () => {
};

// eslint-disable-next-line consistent-return
const subscribeToAllRegisteredEvents = async () => {
const subscribeToAllRegisteredEvents = async (newClientPoolIndex = null) => {
if (config.isUseHttpApi) return emitEngineEvents();

// Active client subscription available, skip invocation
if (typeof eventSubscribeClientPoolIndex === 'number') return null;
if (
typeof eventSubscribeClientPoolIndex === 'number' &&
eventSubscribeClientPoolIndex !== newClientPoolIndex
) {
return null;
}

// Reset eventsCounter first
eventsCounter = 0;
Expand Down Expand Up @@ -149,10 +154,10 @@ const ensureAPIClientLiveness = () => {
}

if (typeof eventSubscribeClientPoolIndex === 'number') {
const subscribedApiClient = await getApiClient(eventSubscribeClientPoolIndex);
Signals.get('resetApiClient').dispatch(subscribedApiClient, true);
logger.info(
`Dispatched 'resetApiClient' signal to re-instantiate the event subscription API client ${subscribedApiClient.poolIndex}.`,
const apiClient = await getApiClient(eventSubscribeClientPoolIndex);
Signals.get('resetApiClient').dispatch(apiClient, true);
logger.debug(
`Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`,
);
}
}
Expand Down
1 change: 1 addition & 0 deletions services/fee-estimator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const app = Microservice({
logger: config.log,
events: {
chainNewBlock: async payload => Signals.get('newBlock').dispatch(payload),
chainDeleteBlock: async payload => Signals.get('deleteBlock').dispatch(payload),
},
dependencies: ['connector'],
});
Expand Down
20 changes: 9 additions & 11 deletions services/fee-estimator/shared/dynamicFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@
const util = require('util');
const { CacheRedis, Logger, Signals } = require('lisk-service-framework');

const { requestConnector } = require('./utils/request');

const config = require('../config');

const { getFeeConstants } = require('./feeConstants');

const { getLatestBlock } = require('./utils/chain');
const { checkAndProcessExecution, isFeeCalculationRunningInMode } = require('./utils/dynamicFees');

const cacheRedisFees = CacheRedis('fees', config.endpoints.cache);

const logger = Logger();

const calculateEstimateFeePerByteFull = async () => {
const { header: latestBlock } = await requestConnector('getLastBlock');
const calculateEstimateFeePerByteFull = async newBlock => {
const { header: latestBlock } = newBlock;
const fromHeight = config.feeEstimates.defaultStartBlockHeight;
const toHeight = latestBlock.height;

Expand All @@ -50,9 +48,9 @@ const calculateEstimateFeePerByteFull = async () => {
return cachedFeeEstPerByteFull;
};

const calculateEstimateFeePerByteQuick = async () => {
const calculateEstimateFeePerByteQuick = async newBlock => {
// For the cold start scenario
const { header: latestBlock } = await requestConnector('getLastBlock');
const { header: latestBlock } = newBlock;
const batchSize = config.feeEstimates.coldStartBatchSize;
const toHeight = latestBlock.height;
const fromHeight = toHeight - batchSize;
Expand Down Expand Up @@ -81,7 +79,7 @@ const getEstimateFeePerByte = async () => {
};
}

const { header: latestBlock } = await requestConnector('getLastBlock');
const { header: latestBlock } = await getLatestBlock();
const validate = (feeEstPerByte, allowedLag = 0) =>
feeEstPerByte &&
['low', 'med', 'high', 'updated', 'blockHeight', 'blockID'].every(key =>
Expand Down Expand Up @@ -111,15 +109,15 @@ const getEstimateFeePerByte = async () => {
};
};

const newBlockListener = async () => {
const newBlockListener = async newBlock => {
try {
if (config.feeEstimates.fullAlgorithmEnabled) {
logger.debug('Initiate the dynamic fee estimates computation (full computation).');
calculateEstimateFeePerByteFull();
calculateEstimateFeePerByteFull(newBlock);
}
if (config.feeEstimates.quickAlgorithmEnabled) {
logger.debug('Initiate the dynamic fee estimates computation (quick algorithm).');
const feeEstimate = await calculateEstimateFeePerByteQuick();
const feeEstimate = await calculateEstimateFeePerByteQuick(newBlock);
logger.debug(
`============== 'newFeeEstimate' signal: ${Signals.get('newFeeEstimate')} ==============.`,
);
Expand Down
109 changes: 109 additions & 0 deletions services/fee-estimator/shared/utils/chain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* LiskHQ/lisk-service
* Copyright © 2024 Lisk Foundation
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation,
* no part of this software, including this file, may be copied, modified,
* propagated, or distributed except according to the terms contained in the
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*
*/
const {
CacheRedis,
Signals,
Utils: { isObject },
} = require('lisk-service-framework');

const config = require('../../config');
const { requestConnector } = require('./request');

let genesisHeight;
let blockTime;
let latestBlock;

const { emaBatchSize } = config.feeEstimates;

const blockByHeightCache = CacheRedis('blockByHeight', config.endpoints.cache);

const isValidBlock = block => {
if (!isObject(block)) return false;
return 'assets' in block && 'header' in block && 'transactions' in block;
};

const setLatestBlock = block => {
if (isValidBlock(block)) latestBlock = block;
};

const getLatestBlock = async () => {
if (typeof latestBlock !== 'object') {
latestBlock = await requestConnector('getLastBlock');
}
return latestBlock;
};

const getBlockTime = async () => {
if (typeof blockTime !== 'number') {
const nodeInfo = await requestConnector('getNodeInfo');
blockTime = nodeInfo.genesis.blockTime;
}
return blockTime;
};

const getGenesisHeight = async () => {
if (typeof genesisHeight !== 'number') {
genesisHeight = await requestConnector('getGenesisHeight');
}
return genesisHeight;
};

const cacheBlockByHeight = async block => {
try {
if (isValidBlock(block)) {
const numBlocksToKeep = emaBatchSize * 2;
const blockTimeInMs = (await getBlockTime()) * 1000;

await blockByHeightCache.set(
block.header.height,
JSON.stringify(block),
numBlocksToKeep * blockTimeInMs,
);
}
} catch (_) {
// No actions to be taken
}
};

const emptyCacheBlockByHeight = async block => {
try {
if (isValidBlock(block)) {
await blockByHeightCache.delete(block.header.height);
}
} catch (_) {
// No actions to be taken
}
};

const getBlockByHeight = async height => {
const blockStr = await blockByHeightCache.get(height);
if (blockStr) return JSON.parse(blockStr);

const block = await requestConnector('getBlockByHeight', { height });
await cacheBlockByHeight(block);
return block;
};

Signals.get('newBlock').add(setLatestBlock);
Signals.get('newBlock').add(cacheBlockByHeight);
Signals.get('deleteBlock').add(emptyCacheBlockByHeight);

module.exports = {
setLatestBlock,
getLatestBlock,
getGenesisHeight,
getBlockByHeight,
};
12 changes: 6 additions & 6 deletions services/fee-estimator/shared/utils/dynamicFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const BluebirdPromise = require('bluebird');

const { CacheRedis, Logger } = require('lisk-service-framework');

const { getGenesisHeight, getBlockByHeight } = require('./chain');
const { getEstimateFeePerByteForBlock } = require('./dynamicFeesLIP');
const { requestConnector } = require('./request');

const config = require('../../config');

Expand All @@ -34,11 +34,11 @@ const cacheRedisFees = CacheRedis('fees', config.endpoints.cache);
const logger = Logger();

const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) => {
const genesisHeight = await requestConnector('getGenesisHeight');
const genesisHeight = await getGenesisHeight();
const { defaultStartBlockHeight } = config.feeEstimates;

// Check if the starting height is permitted by config or adjust acc.
// Use incrementation to skip the genesis block - it is not needed
// Skip the genesis block for calculation - it is not needed
fromHeight = Math.max(
...[defaultStartBlockHeight, genesisHeight + 1, fromHeight].filter(n => !Number.isNaN(n)),
);
Expand Down Expand Up @@ -70,9 +70,9 @@ const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) =>
blockBatch.data = await BluebirdPromise.map(
range(finalEMABatchSize),
async i => {
const { header, transactions } = await requestConnector('getBlockByHeight', {
height: prevFeeEstPerByte.blockHeight + 1 - i,
});
const { header, transactions } = await getBlockByHeight(
prevFeeEstPerByte.blockHeight + 1 - i,
);
return { ...header, transactions };
},
{ concurrency: 50 },
Expand Down

0 comments on commit 0dc5ceb

Please sign in to comment.