diff --git a/packages/jobs/lib/app.ts b/packages/jobs/lib/app.ts index 48eced5639e..0cad188a1f8 100644 --- a/packages/jobs/lib/app.ts +++ b/packages/jobs/lib/app.ts @@ -5,13 +5,13 @@ import { cronAutoIdleDemo } from './crons/autoIdleDemo.js'; import { deleteOldActivityLogs } from './crons/deleteOldActivities.js'; import { deleteSyncsData } from './crons/deleteSyncsData.js'; import { getLogger } from '@nangohq/utils/dist/logger.js'; +import { JOBS_PORT } from './constants.js'; const logger = getLogger('Jobs'); try { - const port = parseInt(process.env['NANGO_JOBS_PORT'] || '') || 3005; - server.listen(port); - logger.info(`🚀 service ready at http://localhost:${port}`); + server.listen(JOBS_PORT); + logger.info(`🚀 service ready at http://localhost:${JOBS_PORT}`); const temporalNs = process.env['TEMPORAL_NAMESPACE'] || 'default'; const temporal = new Temporal(temporalNs); diff --git a/packages/jobs/lib/constants.ts b/packages/jobs/lib/constants.ts new file mode 100644 index 00000000000..81dedef6b15 --- /dev/null +++ b/packages/jobs/lib/constants.ts @@ -0,0 +1 @@ +export const JOBS_PORT = parseInt(process.env['NANGO_JOBS_PORT'] || '') || 3005; diff --git a/packages/jobs/lib/runner/local.runner.ts b/packages/jobs/lib/runner/local.runner.ts index 68464755331..bc74d87419a 100644 --- a/packages/jobs/lib/runner/local.runner.ts +++ b/packages/jobs/lib/runner/local.runner.ts @@ -47,7 +47,12 @@ export class LocalRunner implements Runner { logger.info(`[Runner] Starting runner with command: ${cmd} ${cmdOptions.join(' ')} `); const childProcess = spawn(cmd, cmdOptions, { - stdio: [null, null, null] + stdio: [null, null, null], + env: { + ...process.env, + RUNNER_ID: runnerId, + IDLE_MAX_DURATION_MS: '60000' + } }); if (!childProcess) { diff --git a/packages/jobs/lib/runner/render.runner.ts b/packages/jobs/lib/runner/render.runner.ts index 47b89d4db8a..9eb8b99970f 100644 --- a/packages/jobs/lib/runner/render.runner.ts +++ b/packages/jobs/lib/runner/render.runner.ts @@ -73,7 +73,7 @@ export class RenderRunner implements Runner { { key: 'NANGO_CLOUD', value: process.env['NANGO_CLOUD'] || 'true' }, { key: 'NODE_OPTIONS', value: '--max-old-space-size=384' }, { key: 'RUNNER_ID', value: runnerId }, - { key: 'NOTIFY_IDLE_ENDPOINT', value: `${jobsServiceUrl}/idle` }, + { key: 'JOBS_SERVICE_URL', value: `${jobsServiceUrl}` }, { key: 'IDLE_MAX_DURATION_MS', value: `${25 * 60 * 60 * 1000}` }, // 25 hours { key: 'PERSIST_SERVICE_URL', value: getPersistAPIUrl() }, { key: 'NANGO_TELEMETRY_SDK', value: process.env['NANGO_TELEMETRY_SDK'] || 'false' }, diff --git a/packages/jobs/lib/server.ts b/packages/jobs/lib/server.ts index ca03699cbb2..bfec70510fe 100644 --- a/packages/jobs/lib/server.ts +++ b/packages/jobs/lib/server.ts @@ -35,7 +35,7 @@ function healthProcedure() { function idleProcedure() { return publicProcedure.input(z.object({ runnerId: z.string().nonempty(), idleTimeMs: z.number() })).mutation(async ({ input }) => { const { runnerId, idleTimeMs } = input; - logger.info(`[IDLE]: runner '${runnerId}' has been idle for ${idleTimeMs}ms. Suspending...`); + logger.info(`[runner ${runnerId}]: idle for ${idleTimeMs}ms. Suspending...`); await suspendRunner(runnerId); return { status: 'ok' }; }); diff --git a/packages/runner/lib/monitor.ts b/packages/runner/lib/monitor.ts new file mode 100644 index 00000000000..93da096ee15 --- /dev/null +++ b/packages/runner/lib/monitor.ts @@ -0,0 +1,150 @@ +import os from 'os'; +import fs from 'fs'; +import { stringifyError, type NangoProps } from '@nangohq/shared'; +import * as superjson from 'superjson'; +import { fetch } from 'undici'; +import { getLogger } from '@nangohq/utils/dist/logger.js'; + +const MEMORY_WARNING_PERCENTAGE_THRESHOLD = 75; +const logger = getLogger('Runner'); + +export class RunnerMonitor { + private runnerId: string; + private tracked: Map = new Map(); + private jobsServiceUrl: string = ''; + private persistServiceUrl: string = ''; + private idleMaxDurationMs = parseInt(process.env['IDLE_MAX_DURATION_MS'] || '') || 0; + private lastIdleTrackingDate = Date.now(); + private lastMemoryReportDate: Date | null = null; + private idleInterval: NodeJS.Timeout | null = null; + private memoryInterval: NodeJS.Timeout | null = null; + + constructor({ runnerId, jobsServiceUrl, persistServiceUrl }: { runnerId: string; jobsServiceUrl: string; persistServiceUrl: string }) { + this.runnerId = runnerId; + this.jobsServiceUrl = jobsServiceUrl; + this.persistServiceUrl = persistServiceUrl; + if (this.jobsServiceUrl.length > 0) { + this.memoryInterval = this.checkMemoryUsage(); + this.idleInterval = this.checkIdle(); + } + process.on('SIGTERM', this.onExit.bind(this)); + } + + private onExit(): void { + if (this.idleInterval) { + clearInterval(this.idleInterval); + } + if (this.memoryInterval) { + clearInterval(this.memoryInterval); + } + } + + track(nangoProps: NangoProps): void { + if (nangoProps.syncJobId) { + this.lastIdleTrackingDate = Date.now(); + this.tracked.set(nangoProps.syncJobId, nangoProps); + } + } + + untrack(nangoProps: NangoProps): void { + if (nangoProps.syncJobId) { + this.tracked.delete(nangoProps.syncJobId); + } + } + + private checkMemoryUsage(): NodeJS.Timeout { + // eslint-disable-next-line @typescript-eslint/no-misused-promises + return setInterval(async () => { + const rss = process.memoryUsage().rss; + const total = getTotalMemoryInBytes(); + const memoryUsagePercentage = (rss / total) * 100; + if (memoryUsagePercentage > MEMORY_WARNING_PERCENTAGE_THRESHOLD) { + await this.reportHighMemoryUsage(memoryUsagePercentage); + } + }, 1000); + } + + private async reportHighMemoryUsage(memoryUsagePercentage: number): Promise { + // only report if it has been more than 30 seconds since the last report + if (this.lastMemoryReportDate) { + const now = new Date(); + const diffInSecs = (now.getTime() - this.lastMemoryReportDate.getTime()) / 1000; + if (diffInSecs < 30) { + return; + } + } + this.lastMemoryReportDate = new Date(); + for (const { environmentId, activityLogId } of this.tracked.values()) { + if (!environmentId || !activityLogId) { + continue; + } + await httpSend({ + method: 'post', + url: `${this.persistServiceUrl}/environment/${environmentId}/log`, + data: JSON.stringify({ + activityLogId: activityLogId, + level: 'warn', + msg: `Memory usage of nango scripts is high: ${memoryUsagePercentage.toFixed(2)}% of the total available memory.` + }) + }); + } + } + + private checkIdle(): NodeJS.Timeout { + // eslint-disable-next-line @typescript-eslint/no-misused-promises + return setInterval(async () => { + if (this.tracked.size == 0) { + const idleTimeMs = Date.now() - this.lastIdleTrackingDate; + if (idleTimeMs > this.idleMaxDurationMs) { + logger.info(`Runner '${this.runnerId}' idle for more than ${this.idleMaxDurationMs}ms`); + await httpSend({ + method: 'post', + url: `${this.jobsServiceUrl}/idle`, + data: superjson.stringify({ + runnerId: this.runnerId, + idleTimeMs + }) + }); + this.lastIdleTrackingDate = Date.now(); + } + } + }, 10000); + } +} + +function getRenderTotalMemoryInBytes(): number { + const memoryMaxFile = '/sys/fs/cgroup/memory.max'; + try { + const output = fs.readFileSync(memoryMaxFile, 'utf-8'); + const memoryLimitInBytes = parseInt(output.trim(), 10); + return memoryLimitInBytes; + } catch { + return 0; + } +} + +function getTotalMemoryInBytes(): number { + // when running inside a container, os.totalmem() returns the total memory of the system, not the memory limit of the container + // see: https://github.com/nodejs/node/issues/51095 + // process.constrainedMemory() is supposed to return the memory limit of the container but it doesn't work on Render + // so we need to use a workaround to get the memory limit of the container on Render + return process.constrainedMemory() || getRenderTotalMemoryInBytes() || os.totalmem(); +} + +async function httpSend({ method, url, data }: { method: string; url: string; data: string }): Promise { + try { + const res = await fetch(url, { + method: method, + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json' + }, + body: data + }); + if (res.status !== 200) { + logger.error(`Error sending '${data}' to '${url}': ${JSON.stringify(await res.json())}`); + } + } catch (err) { + logger.error(`Error sending '${data}' to '${url}': ${stringifyError(err)}`); + } +} diff --git a/packages/runner/lib/server.ts b/packages/runner/lib/server.ts index ea628ecd339..24744c70b37 100644 --- a/packages/runner/lib/server.ts +++ b/packages/runner/lib/server.ts @@ -3,14 +3,11 @@ import * as trpcExpress from '@trpc/server/adapters/express'; import express from 'express'; import type { Request, Response, NextFunction } from 'express'; import timeout from 'connect-timeout'; -import type { NangoProps, RunnerOutput } from '@nangohq/shared'; -import { getLogger } from '@nangohq/utils/dist/logger.js'; +import { getJobsUrl, getPersistAPIUrl, type NangoProps, type RunnerOutput } from '@nangohq/shared'; +import { RunnerMonitor } from './monitor.js'; import { exec } from './exec.js'; import { cancel } from './cancel.js'; import superjson from 'superjson'; -import { fetch } from 'undici'; - -const logger = getLogger('Runner'); export const t = initTRPC.create({ transformer: superjson @@ -36,31 +33,27 @@ const appRouter = router({ export type AppRouter = typeof appRouter; function healthProcedure() { - return publicProcedure - .use(async (opts) => { - pendingRequests.add(opts); - const next = opts.next(); - pendingRequests.delete(opts); - lastRequestTime = Date.now(); - return next; - }) - .query(() => { - return { status: 'ok' }; - }); + return publicProcedure.query(() => { + return { status: 'ok' }; + }); } -const idleMaxDurationMs = parseInt(process.env['IDLE_MAX_DURATION_MS'] || '') || 0; const runnerId = process.env['RUNNER_ID'] || ''; -let lastRequestTime = Date.now(); -const pendingRequests = new Set(); -const notifyIdleEndpoint = process.env['NOTIFY_IDLE_ENDPOINT'] || ''; +const jobsServiceUrl = process.env['NOTIFY_IDLE_ENDPOINT']?.replace(/\/idle$/, '') || getJobsUrl(); // TODO: remove legacy NOTIFY_IDLE_ENDPOINT once all runners are updated with JOBS_SERVICE_URL env var +const persistServiceUrl = getPersistAPIUrl(); +const usage = new RunnerMonitor({ runnerId, jobsServiceUrl, persistServiceUrl }); function runProcedure() { return publicProcedure .input((input) => input as RunParams) .mutation(async ({ input }): Promise => { const { nangoProps, code, codeParams } = input; - return await exec(nangoProps, input.isInvokedImmediately, input.isWebhook, code, codeParams); + try { + usage.track(nangoProps); + return await exec(nangoProps, input.isInvokedImmediately, input.isWebhook, code, codeParams); + } finally { + usage.untrack(nangoProps); + } }); } @@ -86,38 +79,3 @@ server.use(haltOnTimedout); function haltOnTimedout(req: Request, _res: Response, next: NextFunction) { if (!req.timedout) next(); } - -if (idleMaxDurationMs > 0) { - setInterval(async () => { - if (pendingRequests.size == 0) { - const idleTimeMs = Date.now() - lastRequestTime; - if (idleTimeMs > idleMaxDurationMs) { - logger.info(`Runner '${runnerId}' idle for more than ${idleMaxDurationMs}ms`); - // calling jobs service to suspend runner - // using fetch instead of jobs trcp client to avoid circular dependency - // TODO: use trpc client once jobs doesn't depend on runner - if (notifyIdleEndpoint.length > 0) { - try { - const res = await fetch(notifyIdleEndpoint, { - method: 'post', - headers: { - Accept: 'application/json', - 'Content-Type': 'application/json' - }, - body: superjson.stringify({ - runnerId, - idleTimeMs - }) - }); - if (res.status !== 200) { - logger.error(`Error calling ${notifyIdleEndpoint}: ${JSON.stringify(await res.json())}`); - } - } catch (err) { - logger.error(`Error calling ${notifyIdleEndpoint}: ${JSON.stringify(err)}`); - } - } - lastRequestTime = Date.now(); // reset last request time - } - } - }, 10000); -} diff --git a/packages/shared/lib/utils/utils.ts b/packages/shared/lib/utils/utils.ts index 7915b90e6a6..c391749a1a2 100644 --- a/packages/shared/lib/utils/utils.ts +++ b/packages/shared/lib/utils/utils.ts @@ -88,6 +88,10 @@ export function getPersistAPIUrl() { return process.env['PERSIST_SERVICE_URL'] || 'http://localhost:3007'; } +export function getJobsUrl() { + return process.env['JOBS_SERVICE_URL'] || 'http://localhost:3005'; +} + function getServerHost() { return process.env['SERVER_HOST'] || process.env['SERVER_RUN_MODE'] === 'DOCKERIZED' ? 'http://nango-server' : 'http://localhost'; } diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index abff8c1dff7..915a0650914 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -37,7 +37,6 @@ export const ENVS = z.object({ RUNNER_OWNER_ID: z.coerce.number().optional(), RUNNER_ID: z.string().optional(), IDLE_MAX_DURATION_MS: z.coerce.number().default(0), - NOTIFY_IDLE_ENDPOINT: z.string().optional(), // Demo DEFAULT_GITHUB_CLIENT_ID: z.string().optional(),