From 153c27fb5b0871333431058f6c4fc1baa9c9e714 Mon Sep 17 00:00:00 2001 From: Carlos V Date: Tue, 25 Jun 2024 20:38:39 +0000 Subject: [PATCH] common: add history for cost models and set view table for latest models. Indexer service and tap agent require a history of cost models for tap cost model checks and accept new and old one for 2-3 mins --- .../db/migrations/14-cost-models-history.ts | 150 ++++++++++++++++++ .../indexer-management/models/cost-model.ts | 7 +- .../resolvers/cost-models.ts | 91 +++++++---- 3 files changed, 215 insertions(+), 33 deletions(-) create mode 100644 packages/indexer-agent/src/db/migrations/14-cost-models-history.ts diff --git a/packages/indexer-agent/src/db/migrations/14-cost-models-history.ts b/packages/indexer-agent/src/db/migrations/14-cost-models-history.ts new file mode 100644 index 000000000..dab5cc23f --- /dev/null +++ b/packages/indexer-agent/src/db/migrations/14-cost-models-history.ts @@ -0,0 +1,150 @@ +import { Logger } from '@graphprotocol/common-ts' +import { utils } from 'ethers' +import { QueryInterface, DataTypes } from 'sequelize' + +interface MigrationContext { + queryInterface: QueryInterface + logger: Logger +} + +interface Context { + context: MigrationContext +} +export const COST_MODEL_GLOBAL = 'global' +export async function up({ context }: Context): Promise { + const { queryInterface, logger } = context + + const tables = await queryInterface.showAllTables() + logger.debug(`Checking if CostModelsHistory table exists`, { tables }) + + // CostModelsHistory: this table will store the history of cost models + // this is necessary since there could be a mismtach between the old table and the info the gateway has + // causing a failed request. Solution is to have a history and allow the "old" model for a limited time frame 2-3 mins + // For indexer-service is also helpful to have the history of the cost models since we want to obtain the minimum cost per receipt + // this will help since the gateway could send an old model and get blocked so we need the indexer to accept one of the 2 latest models + // in the past 30 seconds since the gateway updates its model every 30 seconds + + if (tables.includes('CostModelsHistory')) { + logger.debug(`CostModelsHistory already exist, migration not necessary`) + } else { + logger.info(`Create CostModelsHistory`) + await queryInterface.createTable('CostModelsHistory', { + id: { + type: DataTypes.BIGINT, + primaryKey: true, + autoIncrement: true, + unique: true, + }, + deployment: { + type: DataTypes.STRING, + allowNull: false, + validate: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + isDeploymentID: (value: any) => { + if (typeof value !== 'string') { + throw new Error('Deployment ID must be a string') + } + // "0x..." and "global" is ok + if (utils.isHexString(value, 32) || value === COST_MODEL_GLOBAL) { + return + } + + throw new Error( + `Deployment ID must be a valid subgraph deployment ID or "global"`, + ) + }, + }, + }, + model: { + type: DataTypes.TEXT, + allowNull: true, + }, + variables: { + type: DataTypes.JSONB, + allowNull: true, + }, + createdAt: { + type: DataTypes.DATE, + allowNull: false, + }, + updatedAt: { + type: DataTypes.DATE, + allowNull: false, + }, + }) + if (tables.includes('CostModels')) { + logger.debug(`Copying data from CostModels into CostModelsHistory`) + const copyTableSQL = ` + INSERT INTO "CostModelsHistory" + SELECT * FROM "CostModels"; + ` + await queryInterface.sequelize.query(copyTableSQL) + logger.info(`Drop table "CostModels"`) + await queryInterface.dropTable('CostModels', { cascade: true }) + } + // To avoid creating a breaking change for the indexer-agent or indexer-service we create a view table + // Since now other systems can keep the same query towards "CostModels" and not need to change anything + + logger.info( + `Creating a view for CostModelsHistory to substitute "CostModels" table`, + ) + const viewSQL = ` + CREATE VIEW "CostModels" AS SELECT id, + deployment, + model, + variables, + "createdAt", + "updatedAt" + FROM "CostModelsHistory" t1 + JOIN + ( + SELECT MAX(id) + FROM "CostModelsHistory" + GROUP BY deployment + ) t2 + ON t1.id = t2.MAX; + ` + // We also need to create a trigger to notify indexer-service when a new cost model is added + // instead of it polling the db + await queryInterface.sequelize.query(viewSQL) + + const functionSQL = ` + CREATE FUNCTION cost_models_update_notify() + RETURNS trigger AS + $$ + BEGIN + IF TG_OP = 'DELETE' THEN + PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "DELETE", "deployment": "%s"}', OLD.deployment)); + RETURN OLD; + ELSIF TG_OP = 'INSERT' THEN + PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "INSERT", "deployment": "%s"}', NEW.deployment)); + RETURN NEW; + ELSE -- UPDATE OR TRUNCATE, should never happen + PERFORM pg_notify('cost_models_update_notification', format('{"tg_op": "%s", "deployment": null}', TG_OP, NEW.deployment)); + RETURN NEW; + END IF; + END; + $$ LANGUAGE 'plpgsql'; + ` + const triggerSQL = ` + CREATE TRIGGER cost_models_update AFTER INSERT OR UPDATE OR DELETE + ON "CostModelsHistory" + FOR EACH ROW EXECUTE PROCEDURE cost_models_update_notify(); + ` + await queryInterface.sequelize.query(functionSQL) + await queryInterface.sequelize.query(triggerSQL) + // Need to update sequence value for table else it will be unsynced with actual data + logger.info(`Update sequence for CostModelsHistory`) + const updateIdSeqSQL = `SELECT setval('"CostModelsHistory_id_seq"', (SELECT MAX(id) FROM "CostModelsHistory"));` + await queryInterface.sequelize.query(updateIdSeqSQL) + } +} + +export async function down({ context }: Context): Promise { + const { queryInterface, logger } = context + + logger.info(`Drop view "CostModels"`) + await queryInterface.sequelize.query('DROP VIEW IF EXISTS "CostModels"') + logger.info(`Drop table CostModelsHistory`) + await queryInterface.dropTable('CostModelsHistory') +} diff --git a/packages/indexer-common/src/indexer-management/models/cost-model.ts b/packages/indexer-common/src/indexer-management/models/cost-model.ts index 65cb27bb4..f74eb322c 100644 --- a/packages/indexer-common/src/indexer-management/models/cost-model.ts +++ b/packages/indexer-common/src/indexer-management/models/cost-model.ts @@ -73,14 +73,14 @@ export const defineCostModelModels = (sequelize: Sequelize): CostModelModels => CostModel.init( { id: { - type: DataTypes.INTEGER, + type: DataTypes.BIGINT, autoIncrement: true, + primaryKey: true, unique: true, }, deployment: { type: DataTypes.STRING, allowNull: false, - primaryKey: true, validate: { // eslint-disable-next-line @typescript-eslint/no-explicit-any isDeploymentID: (value: any) => { @@ -122,7 +122,8 @@ export const defineCostModelModels = (sequelize: Sequelize): CostModelModels => }, }, { - modelName: 'CostModel', + modelName: 'CostModelsHistory', + freezeTableName: true, sequelize, }, ) diff --git a/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts b/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts index 8a8e8885d..e33bff707 100644 --- a/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts +++ b/packages/indexer-common/src/indexer-management/resolvers/cost-models.ts @@ -52,6 +52,7 @@ export default { ): Promise => { const model = await models.CostModel.findOne({ where: { deployment }, + order: [['id', 'DESC']], }) if (model) { return model.toGraphQL() @@ -59,6 +60,7 @@ export default { const globalModel = await models.CostModel.findOne({ where: { deployment: COST_MODEL_GLOBAL }, + order: [['id', 'DESC']], }) if (globalModel) { globalModel.setDataValue('deployment', deployment) @@ -73,14 +75,37 @@ export default { { deployments }: { deployments: string[] | null | undefined }, { models }: IndexerManagementResolverContext, ): Promise => { - const costModels = await models.CostModel.findAll({ - where: deployments ? { deployment: deployments } : undefined, - order: [['deployment', 'ASC']], + const sequelize = models.CostModel.sequelize + if (!sequelize) { + throw new Error('No sequelize instance available') + } + const query = ` + SELECT id, + deployment, + model, + variables, + "createdAt", + "updatedAt" + FROM "CostModelsHistory" t1 + JOIN + ( + SELECT MAX(id) + FROM "CostModelsHistory" + ${deployments ? 'WHERE deployment IN (:deployments)' : ''} + GROUP BY deployment + ) t2 + ON t1.id = t2.MAX; + ` + const costModels = await sequelize.query(query, { + replacements: { deployments: deployments ? deployments : [] }, + mapToModel: true, + model: models.CostModel, }) const definedDeployments = new Set(costModels.map((model) => model.deployment)) const undefinedDeployments = deployments?.filter((d) => !definedDeployments.has(d)) const globalModel = await models.CostModel.findOne({ where: { deployment: COST_MODEL_GLOBAL }, + order: [['id', 'DESC']], }) if (globalModel && undefinedDeployments) { const mergedCostModels = undefinedDeployments.map((d) => { @@ -110,7 +135,6 @@ export default { `Can't set cost model: DAI injection enabled but not on Ethereum Mainnet`, ) } - const update = parseGraphQLCostModel(costModel) // Validate cost model @@ -121,37 +145,44 @@ export default { } catch (err) { throw new Error(`Invalid cost model or variables: ${err.message}`) } - const [model] = await models.CostModel.findOrBuild({ + const oldModel = await models.CostModel.findOne({ where: { deployment: update.deployment }, + order: [['id', 'DESC']], }) - // logger.info('Fetched current model', { current: model, update }) - // model.set('deployment', update.deployment || model.deployment) - // // model.set('model', update.model || model.model) - // model.model = update.model || model.model - // logger.info('Merged models', { now: model }) - model.deployment = update.deployment || model.deployment - model.model = update.model || model.model - - // Update the model variables (fall back to current value if unchanged) - let variables = update.variables || model.variables - - if (injectDai) { - const oldDai = getVariable(model.variables, 'DAI') - const newDai = getVariable(update.variables, 'DAI') - - // Inject the latest DAI value if available - if (dai.valueReady) { - variables = setVariable(variables, 'DAI', await dai.value()) - } else if (newDai === undefined && oldDai !== undefined) { - // Otherwise preserve the old DAI value if there is one; - // this ensures it's never dropped - variables = setVariable(variables, 'DAI', oldDai) + const model = models.CostModel.build({ + deployment: update.deployment, + model: update.model || oldModel?.model, + variables: update.variables || oldModel?.variables, + }) + if (oldModel) { + let variables = update.variables || oldModel!.variables + if (injectDai) { + const oldDai = getVariable(oldModel!.variables, 'DAI') + const newDai = getVariable(update.variables, 'DAI') + + // Inject the latest DAI value if available + if (dai.valueReady) { + variables = setVariable(variables, 'DAI', await dai.value()) + } else if (newDai === undefined && oldDai !== undefined) { + // Otherwise preserve the old DAI value if there is one; + // this ensures it's never dropped + variables = setVariable(variables, 'DAI', oldDai) + } + // Apply new variables + model.variables = variables + } + } else { + let variables = update.variables + if (injectDai) { + // Inject the latest DAI value if available + if (dai.valueReady) { + variables = setVariable(variables, 'DAI', await dai.value()) + } + // Apply new variables + model.variables = variables } } - // Apply new variables - model.variables = variables - return (await model.save()).toGraphQL() },