Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
📝 Add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Feb 5, 2024
1 parent 6301f61 commit da085fe
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 46 deletions.
8 changes: 4 additions & 4 deletions services/export/shared/helpers/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
const { EVENT_TOPIC_PREFIX, LENGTH_ID } = require('./constants');

const getTransactionIDFromTopic0 = topic0 =>
topic0.length === topic0.startsWith(EVENT_TOPIC_PREFIX.TX_ID) &&
EVENT_TOPIC_PREFIX.TX_ID.length + LENGTH_ID
topic0.startsWith(EVENT_TOPIC_PREFIX.TX_ID) &&
topic0.length === EVENT_TOPIC_PREFIX.TX_ID.length + LENGTH_ID
? topic0.slice(EVENT_TOPIC_PREFIX.TX_ID.length)
: null;

const getCcmIDFromTopic0 = topic0 =>
topic0.length === topic0.startsWith(EVENT_TOPIC_PREFIX.CCM_ID) &&
EVENT_TOPIC_PREFIX.CCM_ID.length + LENGTH_ID
topic0.startsWith(EVENT_TOPIC_PREFIX.CCM_ID) &&
topic0.length === EVENT_TOPIC_PREFIX.CCM_ID.length + LENGTH_ID
? topic0.slice(EVENT_TOPIC_PREFIX.CCM_ID.length)
: null;

Expand Down
100 changes: 58 additions & 42 deletions services/export/shared/transactionsExport.js
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ const getEntriesByChronology = async (params, sortedBlocks, sortedTransactions,

// generatorAmount is the (excessive) transaction fee left after burning the transaction minFee
if (e.module === MODULE.FEE && e.name === EVENT.GENERATOR_FEE_PROCESSED) {
const generatorFeeEntries = getGeneratorFeeEntries(addressFromParams, e, tx, block);
const generatorFeeEntries = await getGeneratorFeeEntries(addressFromParams, e, tx, block);
entries.push(...generatorFeeEntries);
}

Expand Down Expand Up @@ -623,7 +623,7 @@ const getEntriesByChronology = async (params, sortedBlocks, sortedTransactions,
);
})();

const messageFeeEntries = getMessageFeeEntries(
const messageFeeEntries = await getMessageFeeEntries(
addressFromParams,
e,
tx,
Expand All @@ -639,7 +639,7 @@ const getEntriesByChronology = async (params, sortedBlocks, sortedTransactions,
// Shared custodial reward received/sent
if (e.module === MODULE.POS && e.name === EVENT.REWARDS_ASSIGNED) {
const isStaker = addressFromParams === e.data.stakerAddress;
const rewardAssignedEntries = getSharedRewardsAssignedEntries(
const rewardAssignedEntries = await getSharedRewardsAssignedEntries(
addressFromParams,
isStaker,
e,
Expand Down Expand Up @@ -691,12 +691,12 @@ const rescheduleExportOnTimeout = async params => {

const { address } = params;
const requestInterval = await standardizeIntervalFromParams(params);
logger.info(`Original job timed out. Re-scheduling job for ${address} (${requestInterval}).`);
logger.info(`Original job timed out. Rescheduling job for ${address} (${requestInterval}).`);

// eslint-disable-next-line no-use-before-define
await scheduleTransactionExportQueue.add({ params, isRescheduled: true });
} catch (err) {
logger.warn(`History export job Re-scheduling failed due to: ${err.message}`);
logger.warn(`History export job rescheduling failed due to: ${err.message}`);
logger.debug(err.stack);
}
};
Expand All @@ -711,16 +711,18 @@ const exportTransactions = async job => {
const isAccountHasTransactions = await checkIfAccountHasTransactions(params.address);
const isAccountValidator = await checkIfAccountIsValidator(params.address);
if (isAccountHasTransactions || isAccountValidator) {
const interval = await standardizeIntervalFromParams(params);
// Add a timeout to automatically re-schedule, if the current job times out on its last attempt
// Reschedule only once if all the current retries fail. Failsafe to avoid redundant scheduling and memory leaks
if (!isRescheduled && job.attemptsMade === job.opts.attempts - 1) {
timeout = setTimeout(
rescheduleExportOnTimeout.bind(null, params),
config.queue.scheduleTransactionExport.options.defaultJobOptions.timeout,
const rescheduleAfterMs =
config.queue.scheduleTransactionExport.options.defaultJobOptions.timeout;
timeout = setTimeout(rescheduleExportOnTimeout.bind(null, params), rescheduleAfterMs);
logger.info(
`Set timeout to auto-reschedule export for ${params.address} (${interval}) in ${rescheduleAfterMs}ms.`,
);
}

const interval = await standardizeIntervalFromParams(params);
const [from, to] = interval.split(':');
const range = moment.range(moment(from, DATE_FORMAT), moment(to, DATE_FORMAT));
const arrayOfDates = Array.from(range.by('day')).map(d => d.format(DATE_FORMAT));
Expand Down Expand Up @@ -795,6 +797,7 @@ const exportTransactions = async job => {

const excelFilename = await getExcelFilenameFromParams(params, currentChainID);
await workBook.xlsx.writeFile(`${config.cache.exports.dirPath}/${excelFilename}`);
logger.info(`Successfully exported the account transaction history to: ${excelFilename}.`);
await jobScheduledCache.delete(excelFilename); // Remove the entry from cache to free up memory

// Clear the auto re-schedule timeout on successful completion
Expand All @@ -821,49 +824,62 @@ const scheduleTransactionHistoryExport = async params => {
const address = getAddressFromParams(params);
const requestInterval = await standardizeIntervalFromParams(params);

exportResponse.data.address = address;
exportResponse.data.publicKey = publicKey;
exportResponse.data.interval = requestInterval;
try {
exportResponse.data.address = address;
exportResponse.data.publicKey = publicKey;
exportResponse.data.interval = requestInterval;

const currentChainID = await getCurrentChainID();
const excelFilename = await getExcelFilenameFromParams(params, currentChainID);
const currentChainID = await getCurrentChainID();
const excelFilename = await getExcelFilenameFromParams(params, currentChainID);

// Job already scheduled, skip remaining checks
if ((await jobScheduledCache.get(excelFilename)) === true) {
return exportResponse;
}
// Job already scheduled, skip remaining checks
if ((await jobScheduledCache.get(excelFilename)) === true) {
return exportResponse;
}

// Request already processed and the history is ready to be downloaded
if (await staticFiles.fileExists(excelFilename)) {
exportResponse.data.fileName = excelFilename;
exportResponse.data.fileUrl = await getExcelFileUrlFromParams(params, currentChainID);
exportResponse.meta.ready = true;
// Request already processed and the history is ready to be downloaded
if (await staticFiles.fileExists(excelFilename)) {
exportResponse.data.fileName = excelFilename;
exportResponse.data.fileUrl = await getExcelFileUrlFromParams(params, currentChainID);
exportResponse.meta.ready = true;

return exportResponse;
}
return exportResponse;
}

// Validate if account exists
const isAccountExists = await checkIfAccountExists(address);
if (!isAccountExists) throw new NotFoundException(`Account ${address} not found.`);

// Validate if account exists
const isAccountExists = await checkIfAccountExists(address);
if (!isAccountExists) throw new NotFoundException(`Account ${address} not found.`);
// Validate if the index is ready enough to serve the user request
const isBlockchainIndexReady = await checkIfIndexReadyForInterval(requestInterval);
if (!isBlockchainIndexReady) {
throw new ValidationException(
`The blockchain index is not yet ready for the requested interval (${requestInterval}). Please retry later.`,
);
}

// Validate if the index is ready enough to serve the user request
const isBlockchainIndexReady = await checkIfIndexReadyForInterval(requestInterval);
if (!isBlockchainIndexReady) {
throw new ValidationException(
`The blockchain index is not yet ready for the requested interval (${requestInterval}). Please retry later.`,
// Schedule a new job to process the history export
logger.debug(
`Attempting to schedule transaction history export for ${address} (${requestInterval}).`,
);
}
await scheduleTransactionExportQueue.add({ params: { ...params, address } });
logger.info(
`Successfully scheduled transaction history export for ${address} (${requestInterval}).`,
);
exportResponse.status = 'ACCEPTED';

// Schedule a new job to process the history export
await scheduleTransactionExportQueue.add({ params: { ...params, address } });
exportResponse.status = 'ACCEPTED';
const ttl = config.queue.scheduleTransactionExport.options.defaultJobOptions.timeout * 2;
await jobScheduledCache.set(excelFilename, true, ttl);

const ttl =
config.queue.scheduleTransactionExport.options.defaultJobOptions.timeout *
config.queue.scheduleTransactionExport.options.defaultJobOptions.attempts;
await jobScheduledCache.set(excelFilename, true, ttl);
return exportResponse;
} catch (err) {
if (err instanceof ValidationException) throw err;

return exportResponse;
const errMessage = `Unable to schedule transaction history export for ${address} (${requestInterval}) due to: ${err.message}`;
logger.warn(errMessage);
logger.debug(err.stack);
throw new Error(errMessage);
}
};

const downloadTransactionHistory = async ({ filename }) => {
Expand Down

0 comments on commit da085fe

Please sign in to comment.