Skip to content

Commit

Permalink
Merge pull request #53 from biothings/fix-896-bte
Browse files Browse the repository at this point in the history
Add r/w lock on SmartAPI specs and fix all metakg endpoints
  • Loading branch information
tokebe authored Nov 13, 2024
2 parents ade5f18 + d9bd79d commit 8ca8bb4
Show file tree
Hide file tree
Showing 20 changed files with 155 additions and 61 deletions.
16 changes: 8 additions & 8 deletions __test__/integration/controllers/association.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@ import assoc from "../../../src/controllers/association";

describe("Test association module", () => {
test("By default, should return all associations", async () => {
const res = assoc();
const res = await assoc();
expect(res.length).toBeGreaterThan(10);
expect(res[0]).toHaveProperty("subject");
expect(res[0]).toHaveProperty("api");
});

test("If sub specified, should only return associations related to the sub", async () => {
const res = assoc("Gene");
const res = await assoc("Gene");
const inputTypes = new Set(res.map(item => item.subject));
expect(Array.from(inputTypes)).toHaveLength(1);
expect(Array.from(inputTypes)).toEqual(["Gene"]);
});

test("If invalid sub specified, should only empty list", async () => {
const res = assoc("Gene1");
const res = await assoc("Gene1");
expect(res).toEqual([]);
});

test("If obj specified, should only return associations related to the obj", async () => {
const res = assoc(undefined, "SmallMolecule");
const res = await assoc(undefined, "SmallMolecule");
const outputTypes = new Set(res.map(item => item.object));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -30,7 +30,7 @@ describe("Test association module", () => {
});

test("If pred specified, should only return associations related to the pred", async () => {
const res = assoc(undefined, undefined, "treats");
const res = await assoc(undefined, undefined, "treats");
const preds = new Set(res.map(item => item.predicate));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -39,7 +39,7 @@ describe("Test association module", () => {
});

test("If api specified, should only return associations related to the api", async () => {
const res = assoc(undefined, undefined, undefined, undefined, "MyGene.info API");
const res = await assoc(undefined, undefined, undefined, undefined, "MyGene.info API");
const apis = new Set(res.map(item => item.api.name));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -48,7 +48,7 @@ describe("Test association module", () => {
});

test("If source specified, should only return associations related to the source", async () => {
const res = assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet");
const res = await assoc(undefined, undefined, undefined, undefined, undefined, "infores:disgenet");
const sources = new Set(res.map(item => item.provided_by));
const inputTypes = new Set(res.map(item => item.subject));
expect(inputTypes.size).toBeGreaterThan(1);
Expand All @@ -57,7 +57,7 @@ describe("Test association module", () => {
});

test("If both sub and obj specified, should only return associations related to both sub and obj", async () => {
const res = assoc("Gene", "SmallMolecule");
const res = await assoc("Gene", "SmallMolecule");
const outputTypes = new Set(res.map(item => item.object));
const inputTypes = new Set(res.map(item => item.subject));
expect(Array.from(inputTypes)).toHaveLength(1);
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
"node-cron": "^2.0.3",
"npm": "^9.9.0",
"piscina": "^3.2.0",
"proper-lockfile": "^4.1.2",
"ps-node": "^0.1.6",
"snake-case": "^3.0.4",
"stream-chunker": "^1.2.8",
Expand Down
8 changes: 4 additions & 4 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ export default class Config {
});
this.app.use("/", fastLimiter);
this.app.use("/v1/query", medLimiter);
this.app.use("/v1/team/:team_name/query", medLimiter);
this.app.use("/v1/team/:smartapiID/query", fastLimiter);
this.app.use("/v1/team/:teamName/query", medLimiter);
this.app.use("/v1/team/:smartAPIID/query", fastLimiter);
this.app.use("/v1/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/team/:teamName/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/smartapi/:smartapiID/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/smartapi/:smartAPIID/meta_knowledge_graph", fastLimiter);
this.app.use("/v1/asyncquery", fastLimiter);
this.app.use("/v1/team/:teamName/asyncquery", fastLimiter);
this.app.use("/v1/smartapi/:smartapiID/asyncquery", fastLimiter);
this.app.use("/v1/smartapi/:smartAPIID/asyncquery", fastLimiter);
this.app.use("/queues", fastLimiter);
}

Expand Down
6 changes: 3 additions & 3 deletions src/controllers/association.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ export interface AssocResult {
};
}

export default function (
export default async function (
sub: string = undefined,
obj: string = undefined,
pred: string = undefined,
component: string = undefined,
api: string = undefined,
source: string = undefined,
): AssocResult[] {
): Promise<AssocResult[]> {
const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
debug(`smartapi specs loaded: ${smartapi_specs}`);
const predicates = path.resolve(__dirname, "../../data/predicates.json");
debug(`predicates endpoints loaded, ${predicates}`);
const kg = new meta_kg(smartapi_specs, predicates);
debug("metakg initialized");
kg.constructMetaKGSync(true, {});
await kg.constructMetaKGWithFileLock(true, {});
debug(`metakg loaded: ${kg.ops.length} ops`);
const associations: AssocResult[] = [];
const filtered_res = kg.filter({
Expand Down
2 changes: 1 addition & 1 deletion src/controllers/async/asyncquery_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export function getQueryQueue(name: string): BullQueue {
workflow: [
{
id:
job.data.route.includes(":smartapi_id") || job.data.route.includes(":team_name")
job.data.route.includes(":smartAPIID") || job.data.route.includes(":teamName")
? "lookup"
: "lookup_and_score",
},
Expand Down
68 changes: 58 additions & 10 deletions src/controllers/cron/update_local_smartapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import SMARTAPI_EXCLUSIONS from "../../config/smartapi_exclusions";
import getSmartApiOverrideConfig from "../../config/smartapi_overrides";
import { SmartApiOverrides } from "../../types";
import apiList from "../../config/api_list";
import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg";
import { redisClient } from "@biothings-explorer/utils";
import MetaKG from "@biothings-explorer/smartapi-kg";
import { lockWithActionAsync, redisClient } from "@biothings-explorer/utils";
import { setTimeout } from "timers/promises";

const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${
process.platform
Expand Down Expand Up @@ -325,17 +326,42 @@ async function updateSmartAPISpecs() {
delete obj._score;
});

await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
await lockWithActionAsync([localFilePath], async () => {
await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
}, debug)

const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits);
await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
await lockWithActionAsync([predicatesFilePath], async () => {
await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
}, debug);

// Create a new metakg
const metakg = new MetaKG();
metakg.constructMetaKGSync(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList });
await metakg.constructMetaKGWithFileLock(true, { predicates: predicatesInfo, smartapiSpecs: { hits: hits as any }, apiList });
global.metakg = metakg;
global.smartapi = { hits };
global.smartapi = { hits }; // hits is an array, but smartapi must be a dict
};

async function loadGlobalMetaKGReadOnly() {
await setTimeout(30000);
const localFilePath = path.resolve(__dirname, "../../../data/smartapi_specs.json");
const predicatesFilePath = path.resolve(__dirname, "../../../data/predicates.json");

const metakg = new MetaKG(localFilePath, predicatesFilePath);
await metakg.constructMetaKGWithFileLock(true, { apiList });
global.metakg = metakg;

global.smartapi = await lockWithActionAsync(
[localFilePath],
async () => {
const file = await fs.readFile(localFilePath, 'utf-8');
const hits = JSON.parse(file);
return hits;
},
debug
);
}

async function getAPIOverrides(data: { total?: number; hits: any }, overrides: SmartApiOverrides) {
// if only_overrides is enabled, only overridden apis are used
if (overrides.config.only_overrides) {
Expand Down Expand Up @@ -422,12 +448,35 @@ export default function manageSmartApi() {
process.env.INSTANCE_ID && process.env.INSTANCE_ID === "0", // Only one PM2 cluster instance should sync
].every(condition => condition);

/*
We schedule 2 cron jobs, one for non-syncing processes and one for the syncing process.
The non-syncing processes will only read from the local copy of the SmartAPI specs
after a 30 second timeout each time.
Whereas the syncing process will update the local copy of the SmartAPI specs.
We also run them once initially.
*/
if (!should_sync) {
debug(`SmartAPI sync disabled, server process ${process.pid} disabling smartapi updates.`);
debug(`Server process ${process.pid} disabling smartapi updates. SmartAPI files will be read from but not written to.`);
cron.schedule("*/10 * * * *", async () => {
debug(`Reading from SmartAPI specs now at ${new Date().toUTCString()}!`);
try {
await loadGlobalMetaKGReadOnly();
debug("Reading local copy of SmartAPI specs successful.");
} catch (err) {
debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
}
});

loadGlobalMetaKGReadOnly()
.then(() => {
debug("Reading local copy of SmartAPI specs successful.");
})
.catch(err => {
debug(`Reading local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
});
return;
}

// Otherwise, schedule sync!
cron.schedule("*/10 * * * *", async () => {
debug(`Updating local copy of SmartAPI specs now at ${new Date().toUTCString()}!`);
try {
Expand All @@ -438,7 +487,6 @@ export default function manageSmartApi() {
}
});

// Run at start once
debug(`Running initial update of SmartAPI specs now at ${new Date().toUTCString()}`);
updateSmartAPISpecs()
.then(() => {
Expand All @@ -447,4 +495,4 @@ export default function manageSmartApi() {
.catch(err => {
debug(`Updating local copy of SmartAPI specs failed! The error message is ${err.toString()}`);
});
}
}
14 changes: 10 additions & 4 deletions src/controllers/meta_knowledge_graph.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import meta_kg, { KGQualifiersObject } from "@biothings-explorer/smartapi-kg";
import { snakeCase } from "snake-case";
import lockfile from "proper-lockfile";
import path from "path";
import PredicatesLoadingError from "../utils/errors/predicates_error";
const debug = require("debug")("bte:biothings-explorer-trapi:metakg");
import apiList from "../config/api_list";
import { supportedLookups } from "@biothings-explorer/query_graph_handler";
import MetaKG from "@biothings-explorer/smartapi-kg";

interface PredicateInfo {
predicate: string;
Expand All @@ -31,16 +33,17 @@ export default class MetaKnowledgeGraphHandler {
const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
const predicates = path.resolve(__dirname, "../../data/predicates.json");
const kg = new meta_kg(smartapi_specs, predicates);

try {
if (smartAPIID !== undefined) {
debug(`Constructing with SmartAPI ID ${smartAPIID}`);
kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
await kg.constructMetaKGWithFileLock(false, { apiList, smartAPIID: smartAPIID });
} else if (teamName !== undefined) {
debug(`Constructing with team ${teamName}`);
kg.constructMetaKGSync(false, { apiList, teamName: teamName });
await kg.constructMetaKGWithFileLock(false, { apiList, teamName: teamName });
} else {
debug(`Constructing with default`);
kg.constructMetaKGSync(true, { apiList });
await kg.constructMetaKGWithFileLock(true, { apiList });
}
if (kg.ops.length === 0) {
debug(`Found 0 operations`);
Expand Down Expand Up @@ -86,10 +89,13 @@ export default class MetaKnowledgeGraphHandler {
}

async getKG(
metakg: MetaKG = undefined,
smartAPIID: string = this.smartAPIID,
teamName: string = this.teamName,
): Promise<{ nodes: {}; edges: any[] }> {
const kg = await this._loadMetaKG(smartAPIID, teamName);
// read metakg from files if not globally defined
const kg = metakg ?? await this._loadMetaKG(smartAPIID, teamName);

let knowledge_graph = {
nodes: {},
edges: [],
Expand Down
4 changes: 2 additions & 2 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export async function runTask(req: Request, res: Response, route: string, useBul
route,
queryGraph: (req.body as TrapiQuery)?.message?.query_graph,
workflow: (req.body as TrapiQuery)?.workflow,
smartAPIID: req.params.smartAPIID,
teamName: req.params.teamName,
options: {
logLevel: (req.body as TrapiQuery).log_level || (req.query.log_level as string),
submitter: (req.body as TrapiQuery).submitter,
smartAPIID: req.params.smartapi_id,
teamName: req.params.team_name,
...req.query,
},
params: req.params,
Expand Down
4 changes: 2 additions & 2 deletions src/routes/bullboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class BullBoardPage {
}
const queues = {
"/v1/asynquery": getQueryQueue("bte_query_queue"),
"/v1/smartapi/{smartapi_id}/asyncquery": getQueryQueue("bte_query_queue_by_api"),
"/v1/team/{team_name}/asyncquery": getQueryQueue("bte_query_queue_by_team"),
"/v1/smartapi/{smartAPIID}/asyncquery": getQueryQueue("bte_query_queue_by_api"),
"/v1/team/{teamName}/asyncquery": getQueryQueue("bte_query_queue_by_team"),
"/v1/query": getQueryQueue("bte_sync_query_queue"),
};

Expand Down
4 changes: 2 additions & 2 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ export const tasks: TaskByRoute = {
asyncquery_v1_by_team: (taskInfo: TaskInfo) => V1AsyncQueryByTeam.task(taskInfo),
// load MetaKG from global
meta_knowledge_graph_v1: (taskInfo: TaskInfo) => V1MetaKG.task(taskInfo),
meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo),
meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo),
// Not threaded due to being lightweight/speed being higher priority
// performance: (taskInfo: TaskInfo) => Performance.task(taskInfo),
// metakg: (taskInfo: TaskInfo) => MetaKG.task(taskInfo),
// meta_knowledge_graph_v1_by_api: (taskInfo: TaskInfo) => V1MetaKGByAPI.task(taskInfo),
// meta_knowledge_graph_v1_by_team: (taskInfo: TaskInfo) => V1MetaKGByTeam.task(taskInfo),
};
2 changes: 1 addition & 1 deletion src/routes/metakg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class MetaKG {
if (req.query.provided_by !== undefined) {
source = utils.removeQuotesFromQuery(req.query.provided_by as string);
}
const assocs = assoc(
const assocs = await assoc(
req.query.subject as string,
req.query.object as string,
req.query.predicate as string,
Expand Down
4 changes: 2 additions & 2 deletions src/routes/v1/asyncquery_v1_by_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import { BteRoute } from "../../types";
class V1AsyncQueryByAPI implements BteRoute {
setRoutes(app: Express) {
app
.route("/v1/smartapi/:smartapi_id/asyncquery")
.route("/v1/smartapi/:smartAPIID/asyncquery")
.post(swaggerValidation.validate, async (req: Request, res: Response, next: NextFunction) => {
const queueData: QueueData = {
route: req.route.path,
queryGraph: req.body?.message.query_graph,
smartAPIID: req.params.smartapi_id,
smartAPIID: req.params.smartAPIID,
workflow: req.body?.workflow,
callback_url: req.body?.callback,
options: {
Expand Down
4 changes: 2 additions & 2 deletions src/routes/v1/asyncquery_v1_by_team.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import { QueueData, TaskInfo } from "@biothings-explorer/types";
class V1AsyncQueryByTeam implements BteRoute {
setRoutes(app: Express) {
app
.route("/v1/team/:team_name/asyncquery")
.route("/v1/team/:teamName/asyncquery")
.post(swaggerValidation.validate, (async (req: Request, res: Response, next: NextFunction) => {
const queueData: QueueData = {
route: req.route.path,
queryGraph: req.body?.message.query_graph,
teamName: req.params.team_name,
teamName: req.params.teamName,
workflow: req.body?.workflow,
callback_url: req.body?.callback,
options: {
Expand Down
17 changes: 8 additions & 9 deletions src/routes/v1/meta_knowledge_graph_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import * as utils from "../../utils/common";
import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler";
import { Express, NextFunction, Request, Response, RequestHandler } from "express";

import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg";

class MetaKG {
setRoutes(app: Express) {
app
Expand All @@ -23,15 +25,12 @@ class MetaKG {

async task(taskInfo: TaskInfo) {
try {
let kg = undefined;

// read metakg from files if not globally defined
if(!taskInfo.data.options.metakg) {
const metaKGHandler = new handler(undefined);
kg = await metaKGHandler.getKG();
} else {
kg = taskInfo.data.options.metakg;
}
const metaKGHandler = new handler(undefined);
let metakg = undefined;
// initialize MetaKG only if ops are provided because handler logic is built upon that
if (taskInfo.data.options.metakg !== undefined)
metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg);
const kg = await metaKGHandler.getKG(metakg);
// response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(kg);
} catch (error) {
Expand Down
Loading

0 comments on commit 8ca8bb4

Please sign in to comment.