Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2009 from LiskHQ/2006-add-handling-for-txpool-new…
Browse files Browse the repository at this point in the history
…transaction-event

Add handling for txpool_newTransaction event
  • Loading branch information
sameersubudhi authored Jan 8, 2024
2 parents a4248c1 + 958d3c1 commit b83e921
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 38 deletions.
11 changes: 9 additions & 2 deletions services/blockchain-connector/events/controller/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const { Logger, Signals } = require('lisk-service-framework');
const { MODULE_NAME_POS } = require('../../shared/sdk/constants/names');

const { getBlockByID } = require('../../shared/sdk/endpoints');
const { formatBlock: formatBlockFromFormatter } = require('../../shared/sdk/formatter');
const {
formatBlock: formatBlockFromFormatter,
formatTransaction,
} = require('../../shared/sdk/formatter');

const EMPTY_TREE_ROOT_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
const logger = Logger();
Expand All @@ -43,7 +46,11 @@ const appNetworkEventController = async cb => {
};

const txpoolNewTransactionController = async cb => {
const txpoolNewTransactionListener = async payload => cb(payload);
const txpoolNewTransactionListener = async payload =>
cb({
...payload,
transaction: formatTransaction(payload.transaction),
});
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-coordinator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ const app = Microservice({
logger.debug("Received a 'systemNodeInfo' moleculer event from connecter.");
Signals.get('nodeInfo').dispatch(payload);
},
txpoolNewTransaction: async payload => {
logger.debug("Received a 'txpoolNewTransaction' moleculer event from connecter.");
Signals.get('txpoolNewTransaction').dispatch(payload);
},
},
dependencies: ['connector', 'indexer'],
});
Expand Down
11 changes: 10 additions & 1 deletion services/blockchain-coordinator/shared/eventsScheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ const scheduleDeleteBlock = async payload => {
const scheduleUpdatesOnNewRound = async payload => {
logger.debug('Scheduling updates on new round.');
await eventMessageQueue.add({ ...payload, isNewRound: true });
logger.debug('Finished scheduling updates on new round}.');
logger.debug('Finished scheduling updates on new round.');
};

const scheduleUpdatesOnNewTransaction = async payload => {
logger.debug('Scheduling updates on new transaction in the pool.');
await eventMessageQueue.add({ ...payload, isTxPoolNewTransaction: true });
logger.debug('Finished scheduling updates on new transaction in the pool.');
};

const initEventsScheduler = async () => {
Expand All @@ -53,6 +59,9 @@ const initEventsScheduler = async () => {

const newRoundListener = async payload => scheduleUpdatesOnNewRound(payload);
Signals.get('newRound').add(newRoundListener);

const txpoolNewTransactionListener = async payload => scheduleUpdatesOnNewTransaction(payload);
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

module.exports = {
Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-indexer/events/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ module.exports = [
logger.error(`Error occurred when processing 'transactions.new' event:\n${err.stack}`);
}
};

const txPoolNewTransactionListener = async payload => callback(payload);

Signals.get('newBlock').add(newTransactionsListener);
Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener);
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ const {
formatTransactionsInBlock,
} = require('./transactions');

const { getPendingTransactions, loadAllPendingTransactions } = require('./pendingTransactions');
const {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
} = require('./pendingTransactions');

const {
getBlockchainApps,
Expand Down Expand Up @@ -127,6 +131,7 @@ module.exports = {
normalizeTransaction,
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
dryRunTransactions,
estimateTransactionFees,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const BluebirdPromise = require('bluebird');
const {
Logger,
Exceptions: { ValidationException },
Signals,
} = require('lisk-service-framework');

const logger = Logger();
Expand All @@ -26,52 +27,55 @@ const { normalizeTransaction } = require('./transactions');
const { getIndexedAccountInfo } = require('../utils/account');
const { requestConnector } = require('../../utils/request');
const { getLisk32AddressFromPublicKey } = require('../../utils/account');
const { TRANSACTION_STATUS } = require('../../constants');
const { indexAccountPublicKey } = require('../../indexer/accountIndex');

let pendingTransactionsList = [];

const getPendingTransactionsFromCore = async () => {
const formatPendingTransaction = async transaction => {
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = TRANSACTION_STATUS.PENDING;
return normalizedTransaction;
};

const getPendingTransactionsFromNode = async () => {
const response = await requestConnector('getTransactionsFromPool');
const pendingTx = await BluebirdPromise.map(
response,
async transaction => {
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = 'pending';
return normalizedTransaction;
},
async transaction => formatPendingTransaction(transaction),
{ concurrency: response.length },
);
return pendingTx;
};

const loadAllPendingTransactions = async () => {
try {
pendingTransactionsList = await getPendingTransactionsFromCore();
pendingTransactionsList = await getPendingTransactionsFromNode();
logger.info(
`Updated pending transaction cache with ${pendingTransactionsList.length} transactions.`,
);
Expand Down Expand Up @@ -156,7 +160,7 @@ const getPendingTransactions = async params => {
.slice(offset, offset + limit)
.map(transaction => {
// Set the 'executionStatus'
transaction.executionStatus = 'pending';
transaction.executionStatus = TRANSACTION_STATUS.PENDING;
return transaction;
});

Expand All @@ -169,9 +173,16 @@ const getPendingTransactions = async params => {
return pendingTransactions;
};

const txPoolNewTransactionListener = async payload => {
const [transaction] = payload.data;
pendingTransactionsList.push(transaction);
};
Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener);

module.exports = {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,

// For unit test
validateParams,
Expand Down
2 changes: 2 additions & 0 deletions services/blockchain-indexer/shared/dataService/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
getBlockByHeight,
getBlockByID,
loadAllPendingTransactions,
formatPendingTransaction,
getTransactionIDsByBlockID,
getTransactionsByIDs,
normalizeTransaction,
Expand Down Expand Up @@ -141,6 +142,7 @@ module.exports = {
getTransactions,
getPendingTransactions,
reloadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
getTransactionsByBlockID,
dryRunTransactions,
Expand Down
15 changes: 14 additions & 1 deletion services/blockchain-indexer/shared/messageProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const {
reloadValidatorCache,
getGenerators,
getNumberOfGenerators,
formatPendingTransaction,
} = require('./dataService');
const { accountAddrUpdateQueue } = require('./indexer/accountIndex');

Expand Down Expand Up @@ -117,6 +118,15 @@ const newRoundProcessor = async () => {
logger.info(`Finished performing all updates on new round.`);
};

const txPoolNewTransactionProcessor = async transaction => {
logger.debug(`New transaction (${transaction.id}) received.`);
const formattedTransaction = await formatPendingTransaction(transaction);
Signals.get('txPoolNewTransaction').dispatch({
data: [formattedTransaction],
meta: { count: 1, total: 1, offset: 0 },
});
};

const initMessageProcessors = async () => {
logger.info(`Registering job processor for ${accountMessageQueue.name} message queue.`);
accountMessageQueue.process(async job => {
Expand All @@ -138,7 +148,7 @@ const initMessageProcessors = async () => {

eventMessageQueue.process(async job => {
logger.debug('Subscribed to the events from coordinator.');
const { isNewBlock, isDeleteBlock, isNewRound } = job.data;
const { isNewBlock, isDeleteBlock, isNewRound, isTxPoolNewTransaction } = job.data;

if (isNewBlock) {
const { block } = job.data;
Expand All @@ -154,6 +164,9 @@ const initMessageProcessors = async () => {
}
} else if (isNewRound) {
await newRoundProcessor();
} else if (isTxPoolNewTransaction) {
const { transaction } = job.data;
await txPoolNewTransactionProcessor(transaction);
}
});

Expand Down

0 comments on commit b83e921

Please sign in to comment.