Skip to content

Commit

Permalink
runner: report high memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
TBonnin committed Apr 3, 2024
1 parent 339a802 commit fdaf59c
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 63 deletions.
6 changes: 3 additions & 3 deletions packages/jobs/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import { cronAutoIdleDemo } from './crons/autoIdleDemo.js';
import { deleteOldActivityLogs } from './crons/deleteOldActivities.js';
import { deleteSyncsData } from './crons/deleteSyncsData.js';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { JOBS_PORT } from './constants.js';

const logger = getLogger('Jobs');

try {
const port = parseInt(process.env['NANGO_JOBS_PORT'] || '') || 3005;
server.listen(port);
logger.info(`🚀 service ready at http://localhost:${port}`);
server.listen(JOBS_PORT);
logger.info(`🚀 service ready at http://localhost:${JOBS_PORT}`);
const temporalNs = process.env['TEMPORAL_NAMESPACE'] || 'default';
const temporal = new Temporal(temporalNs);

Expand Down
1 change: 1 addition & 0 deletions packages/jobs/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const JOBS_PORT = parseInt(process.env['NANGO_JOBS_PORT'] || '') || 3005;
7 changes: 6 additions & 1 deletion packages/jobs/lib/runner/local.runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ export class LocalRunner implements Runner {
logger.info(`[Runner] Starting runner with command: ${cmd} ${cmdOptions.join(' ')} `);

const childProcess = spawn(cmd, cmdOptions, {
stdio: [null, null, null]
stdio: [null, null, null],
env: {
...process.env,
RUNNER_ID: runnerId,
IDLE_MAX_DURATION_MS: '60000'
}
});

if (!childProcess) {
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/lib/runner/render.runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class RenderRunner implements Runner {
{ key: 'NANGO_CLOUD', value: process.env['NANGO_CLOUD'] || 'true' },
{ key: 'NODE_OPTIONS', value: '--max-old-space-size=384' },
{ key: 'RUNNER_ID', value: runnerId },
{ key: 'NOTIFY_IDLE_ENDPOINT', value: `${jobsServiceUrl}/idle` },
{ key: 'JOBS_SERVICE_URL', value: `${jobsServiceUrl}` },
{ key: 'IDLE_MAX_DURATION_MS', value: `${25 * 60 * 60 * 1000}` }, // 25 hours
{ key: 'PERSIST_SERVICE_URL', value: getPersistAPIUrl() },
{ key: 'NANGO_TELEMETRY_SDK', value: process.env['NANGO_TELEMETRY_SDK'] || 'false' },
Expand Down
2 changes: 1 addition & 1 deletion packages/jobs/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function healthProcedure() {
function idleProcedure() {
return publicProcedure.input(z.object({ runnerId: z.string().nonempty(), idleTimeMs: z.number() })).mutation(async ({ input }) => {
const { runnerId, idleTimeMs } = input;
logger.info(`[IDLE]: runner '${runnerId}' has been idle for ${idleTimeMs}ms. Suspending...`);
logger.info(`[runner ${runnerId}]: idle for ${idleTimeMs}ms. Suspending...`);
await suspendRunner(runnerId);
return { status: 'ok' };
});
Expand Down
150 changes: 150 additions & 0 deletions packages/runner/lib/monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import os from 'os';
import fs from 'fs';
import { stringifyError, type NangoProps } from '@nangohq/shared';
import * as superjson from 'superjson';
import { fetch } from 'undici';
import { getLogger } from '@nangohq/utils/dist/logger.js';

const MEMORY_WARNING_PERCENTAGE_THRESHOLD = 75;
const logger = getLogger('Runner');

export class RunnerMonitor {
private runnerId: string;
private tracked: Map<number, NangoProps> = new Map<number, NangoProps>();
private jobsServiceUrl: string = '';
private persistServiceUrl: string = '';
private idleMaxDurationMs = parseInt(process.env['IDLE_MAX_DURATION_MS'] || '') || 0;
private lastIdleTrackingDate = Date.now();
private lastMemoryReportDate: Date | null = null;
private idleInterval: NodeJS.Timeout | null = null;
private memoryInterval: NodeJS.Timeout | null = null;

constructor({ runnerId, jobsServiceUrl, persistServiceUrl }: { runnerId: string; jobsServiceUrl: string; persistServiceUrl: string }) {
this.runnerId = runnerId;
this.jobsServiceUrl = jobsServiceUrl;
this.persistServiceUrl = persistServiceUrl;
if (this.jobsServiceUrl.length > 0) {
this.memoryInterval = this.checkMemoryUsage();
this.idleInterval = this.checkIdle();
}
process.on('SIGTERM', this.onExit.bind(this));
}

private onExit(): void {
if (this.idleInterval) {
clearInterval(this.idleInterval);
}
if (this.memoryInterval) {
clearInterval(this.memoryInterval);
}
}

track(nangoProps: NangoProps): void {
if (nangoProps.syncJobId) {
this.lastIdleTrackingDate = Date.now();
this.tracked.set(nangoProps.syncJobId, nangoProps);
}
}

untrack(nangoProps: NangoProps): void {
if (nangoProps.syncJobId) {
this.tracked.delete(nangoProps.syncJobId);
}
}

private checkMemoryUsage(): NodeJS.Timeout {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
return setInterval(async () => {
const rss = process.memoryUsage().rss;
const total = getTotalMemoryInBytes();
const memoryUsagePercentage = (rss / total) * 100;
if (memoryUsagePercentage > MEMORY_WARNING_PERCENTAGE_THRESHOLD) {
await this.reportHighMemoryUsage(memoryUsagePercentage);
}
}, 1000);
}

private async reportHighMemoryUsage(memoryUsagePercentage: number): Promise<void> {
// only report if it has been more than 30 seconds since the last report
if (this.lastMemoryReportDate) {
const now = new Date();
const diffInSecs = (now.getTime() - this.lastMemoryReportDate.getTime()) / 1000;
if (diffInSecs < 30) {
return;
}
}
this.lastMemoryReportDate = new Date();
for (const { environmentId, activityLogId } of this.tracked.values()) {
if (!environmentId || !activityLogId) {
continue;
}
await httpSend({
method: 'post',
url: `${this.persistServiceUrl}/environment/${environmentId}/log`,
data: JSON.stringify({
activityLogId: activityLogId,
level: 'warn',
msg: `Memory usage of nango scripts is high: ${memoryUsagePercentage.toFixed(2)}% of the total available memory.`
})
});
}
}

private checkIdle(): NodeJS.Timeout {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
return setInterval(async () => {
if (this.tracked.size == 0) {
const idleTimeMs = Date.now() - this.lastIdleTrackingDate;
if (idleTimeMs > this.idleMaxDurationMs) {
logger.info(`Runner '${this.runnerId}' idle for more than ${this.idleMaxDurationMs}ms`);
await httpSend({
method: 'post',
url: `${this.jobsServiceUrl}/idle`,
data: superjson.stringify({
runnerId: this.runnerId,
idleTimeMs
})
});
this.lastIdleTrackingDate = Date.now();
}
}
}, 10000);
}
}

function getRenderTotalMemoryInBytes(): number {
const memoryMaxFile = '/sys/fs/cgroup/memory.max';
try {
const output = fs.readFileSync(memoryMaxFile, 'utf-8');
const memoryLimitInBytes = parseInt(output.trim(), 10);
return memoryLimitInBytes;
} catch {
return 0;
}
}

function getTotalMemoryInBytes(): number {
// when running inside a container, os.totalmem() returns the total memory of the system, not the memory limit of the container
// see: https://github.com/nodejs/node/issues/51095
// process.constrainedMemory() is supposed to return the memory limit of the container but it doesn't work on Render
// so we need to use a workaround to get the memory limit of the container on Render
return process.constrainedMemory() || getRenderTotalMemoryInBytes() || os.totalmem();
}

async function httpSend({ method, url, data }: { method: string; url: string; data: string }): Promise<void> {
try {
const res = await fetch(url, {
method: method,
headers: {
Accept: 'application/json',
'Content-Type': 'application/json'
},
body: data
});
if (res.status !== 200) {
logger.error(`Error sending '${data}' to '${url}': ${JSON.stringify(await res.json())}`);
}
} catch (err) {
logger.error(`Error sending '${data}' to '${url}': ${stringifyError(err)}`);
}
}
70 changes: 14 additions & 56 deletions packages/runner/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ import * as trpcExpress from '@trpc/server/adapters/express';
import express from 'express';
import type { Request, Response, NextFunction } from 'express';
import timeout from 'connect-timeout';
import type { NangoProps, RunnerOutput } from '@nangohq/shared';
import { getLogger } from '@nangohq/utils/dist/logger.js';
import { getJobsUrl, getPersistAPIUrl, type NangoProps, type RunnerOutput } from '@nangohq/shared';
import { RunnerMonitor } from './monitor.js';
import { exec } from './exec.js';
import { cancel } from './cancel.js';
import superjson from 'superjson';
import { fetch } from 'undici';

const logger = getLogger('Runner');

export const t = initTRPC.create({
transformer: superjson
Expand All @@ -36,31 +33,27 @@ const appRouter = router({
export type AppRouter = typeof appRouter;

function healthProcedure() {
return publicProcedure
.use(async (opts) => {
pendingRequests.add(opts);
const next = opts.next();
pendingRequests.delete(opts);
lastRequestTime = Date.now();
return next;
})
.query(() => {
return { status: 'ok' };
});
return publicProcedure.query(() => {
return { status: 'ok' };
});
}

const idleMaxDurationMs = parseInt(process.env['IDLE_MAX_DURATION_MS'] || '') || 0;
const runnerId = process.env['RUNNER_ID'] || '';
let lastRequestTime = Date.now();
const pendingRequests = new Set();
const notifyIdleEndpoint = process.env['NOTIFY_IDLE_ENDPOINT'] || '';
const jobsServiceUrl = process.env['NOTIFY_IDLE_ENDPOINT']?.replace(/\/idle$/, '') || getJobsUrl(); // TODO: remove legacy NOTIFY_IDLE_ENDPOINT once all runners are updated with JOBS_SERVICE_URL env var
const persistServiceUrl = getPersistAPIUrl();
const usage = new RunnerMonitor({ runnerId, jobsServiceUrl, persistServiceUrl });

function runProcedure() {
return publicProcedure
.input((input) => input as RunParams)
.mutation(async ({ input }): Promise<RunnerOutput> => {
const { nangoProps, code, codeParams } = input;
return await exec(nangoProps, input.isInvokedImmediately, input.isWebhook, code, codeParams);
try {
usage.track(nangoProps);
return await exec(nangoProps, input.isInvokedImmediately, input.isWebhook, code, codeParams);
} finally {
usage.untrack(nangoProps);
}
});
}

Expand All @@ -86,38 +79,3 @@ server.use(haltOnTimedout);
function haltOnTimedout(req: Request, _res: Response, next: NextFunction) {
if (!req.timedout) next();
}

if (idleMaxDurationMs > 0) {
setInterval(async () => {
if (pendingRequests.size == 0) {
const idleTimeMs = Date.now() - lastRequestTime;
if (idleTimeMs > idleMaxDurationMs) {
logger.info(`Runner '${runnerId}' idle for more than ${idleMaxDurationMs}ms`);
// calling jobs service to suspend runner
// using fetch instead of jobs trcp client to avoid circular dependency
// TODO: use trpc client once jobs doesn't depend on runner
if (notifyIdleEndpoint.length > 0) {
try {
const res = await fetch(notifyIdleEndpoint, {
method: 'post',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json'
},
body: superjson.stringify({
runnerId,
idleTimeMs
})
});
if (res.status !== 200) {
logger.error(`Error calling ${notifyIdleEndpoint}: ${JSON.stringify(await res.json())}`);
}
} catch (err) {
logger.error(`Error calling ${notifyIdleEndpoint}: ${JSON.stringify(err)}`);
}
}
lastRequestTime = Date.now(); // reset last request time
}
}
}, 10000);
}
4 changes: 4 additions & 0 deletions packages/shared/lib/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ export function getPersistAPIUrl() {
return process.env['PERSIST_SERVICE_URL'] || 'http://localhost:3007';
}

export function getJobsUrl() {
return process.env['JOBS_SERVICE_URL'] || 'http://localhost:3005';
}

function getServerHost() {
return process.env['SERVER_HOST'] || process.env['SERVER_RUN_MODE'] === 'DOCKERIZED' ? 'http://nango-server' : 'http://localhost';
}
Expand Down
1 change: 0 additions & 1 deletion packages/utils/lib/environment/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export const ENVS = z.object({
RUNNER_OWNER_ID: z.coerce.number().optional(),
RUNNER_ID: z.string().optional(),
IDLE_MAX_DURATION_MS: z.coerce.number().default(0),
NOTIFY_IDLE_ENDPOINT: z.string().optional(),

// Demo
DEFAULT_GITHUB_CLIENT_ID: z.string().optional(),
Expand Down

0 comments on commit fdaf59c

Please sign in to comment.