Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing request library for callbacks with fetch #585

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 32 additions & 46 deletions lib/hub/action_request.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.ActionRequest = exports.ActionDownloadSettings = exports.ActionVisualizationFormatting = exports.ActionFormatting = exports.ActionFormat = exports.ActionType = void 0;
const node_fetch_1 = require("node-fetch");
jimbert marked this conversation as resolved.
Show resolved Hide resolved
const oboe = require("oboe");
const httpRequest = require("request");
const semver = require("semver");
const stream_1 = require("stream");
const winston = require("winston");
Expand Down Expand Up @@ -122,55 +122,41 @@ class ActionRequest {
return __awaiter(this, void 0, void 0, function* () {
const stream = new stream_1.PassThrough();
const returnPromise = callback(stream);
const timeout = process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT ?
parseInt(process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT, 10)
:
13 * 60 * 1000;
const url = this.scheduledPlan && this.scheduledPlan.downloadUrl;
const streamPromise = new Promise((resolve, reject) => {
if (url) {
winston.info(`[stream] beginning stream via download url`, this.logInfo);
let hasResolved = false;
httpRequest
.get(url, { timeout })
.on("error", (err) => {
if (hasResolved && err.code === "ECONNRESET") {
winston.info(`[stream] ignoring ECONNRESET that occured after streaming finished`, this.logInfo);
}
else {
winston.error(`[stream] request stream error`, Object.assign(Object.assign({}, this.logInfo), { error: err.message, stack: err.stack }));
reject(err);
}
})
.on("finish", () => {
winston.info(`[stream] streaming via download url finished`, this.logInfo);
})
.on("socket", (socket) => {
winston.info(`[stream] setting keepalive on socket`, this.logInfo);
socket.setKeepAlive(true);
})
.on("abort", () => {
winston.info(`[stream] streaming via download url aborted`, this.logInfo);
})
.on("response", () => {
winston.info(`[stream] got response from download url`, this.logInfo);
})
.on("close", () => {
winston.info(`[stream] request stream closed`, this.logInfo);
})
.pipe(stream)
.on("error", (err) => {
winston.error(`[stream] PassThrough stream error`, Object.assign({}, this.logInfo));
reject(err);
})
.on("finish", () => {
winston.info(`[stream] PassThrough stream finished`, this.logInfo);
resolve();
hasResolved = true;
})
.on("close", () => {
winston.info(`[stream] PassThrough stream closed`, this.logInfo);
});
try {
(0, node_fetch_1.default)(url).then((response) => {
if (response.ok) {
response.body.pipe(stream)
.on("error", (err) => {
winston.error(`[stream] PassThrough stream error`, this.logInfo);
reject(err);
})
.on("finish", () => {
winston.info(`[stream] PassThrough stream finished`, this.logInfo);
})
.on("close", () => {
winston.info(`[stream] PassThrough stream closed`, this.logInfo);
jimbert marked this conversation as resolved.
Show resolved Hide resolved
resolve();
});
}
else {
const responseText = `[stream] There was a problem in the callback HTTPS Status Code: ${response.status}`;
winston.warn(responseText, this.logInfo);
reject(responseText);
}
}).catch((err) => {
const responseText = `[stream] There was a problem in the fetch request: ${err.toString()}`;
winston.warn(responseText, this.logInfo);
reject(responseText);
});
}
catch (e) {
winston.warn(`Error connecting to callback url`, this.logInfo);
reject(e);
}
}
else {
if (this.attachment && this.attachment.dataBuffer) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@types/express": "^4.17.17",
"@types/jsforce": "^1.9.33",
"@types/node": "^18.11.11",
"@types/node-fetch": "^2.6.11",
phillipperalez marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR says it's replacing the request library. Can we remove the dependency?

"@types/nodemailer": "^4.6.8",
"@types/normalize-url": "3.3.0",
"@types/oboe": "^2.0.29",
Expand Down
75 changes: 27 additions & 48 deletions src/hub/action_request.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as express from "express"
import fetch from "node-fetch"
import * as oboe from "oboe"
import * as httpRequest from "request"
import * as semver from "semver"
import { PassThrough, Readable } from "stream"
import * as winston from "winston"
Expand Down Expand Up @@ -180,62 +180,41 @@ export class ActionRequest {

const stream = new PassThrough()
const returnPromise = callback(stream)
const timeout = process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT ?
parseInt(process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT, 10)
:
13 * 60 * 1000

const url = this.scheduledPlan && this.scheduledPlan.downloadUrl

const streamPromise = new Promise<void>((resolve, reject) => {
if (url) {
winston.info(`[stream] beginning stream via download url`, this.logInfo)
let hasResolved = false
httpRequest
.get(url, {timeout})
.on("error", (err) => {
if (hasResolved && (err as any).code === "ECONNRESET") {
winston.info(`[stream] ignoring ECONNRESET that occured after streaming finished`, this.logInfo)
try {
fetch(url).then((response: any) => {
if (response.ok) {
response.body.pipe(stream)
.on("error", (err: any) => {
winston.error(`[stream] PassThrough stream error`, this.logInfo)
reject(err)
})
.on("finish", () => {
winston.info(`[stream] PassThrough stream finished`, this.logInfo)
phillipperalez marked this conversation as resolved.
Show resolved Hide resolved
})
.on("close", () => {
winston.info(`[stream] PassThrough stream closed`, this.logInfo)
phillipperalez marked this conversation as resolved.
Show resolved Hide resolved
resolve()
})
} else {
winston.error(`[stream] request stream error`, {
...this.logInfo,
error: err.message,
stack: err.stack,
})
reject(err)
const responseText = `[stream] There was a problem in the callback HTTPS Status Code: ${response.status}`
winston.warn(responseText, this.logInfo)
reject(responseText)
}
}).catch((err: any) => {
const responseText = `[stream] There was a problem in the fetch request: ${err.toString()}`
winston.warn(responseText, this.logInfo)
reject(responseText)
})
.on("finish", () => {
winston.info(`[stream] streaming via download url finished`, this.logInfo)
})
.on("socket", (socket) => {
winston.info(`[stream] setting keepalive on socket`, this.logInfo)
socket.setKeepAlive(true)
})
.on("abort", () => {
winston.info(`[stream] streaming via download url aborted`, this.logInfo)
})
.on("response", () => {
winston.info(`[stream] got response from download url`, this.logInfo)
})
.on("close", () => {
winston.info(`[stream] request stream closed`, this.logInfo)
})
.pipe(stream)
.on("error", (err) => {
winston.error(`[stream] PassThrough stream error`, {
...this.logInfo,
})
reject(err)
})
.on("finish", () => {
winston.info(`[stream] PassThrough stream finished`, this.logInfo)
resolve()
hasResolved = true
})
.on("close", () => {
winston.info(`[stream] PassThrough stream closed`, this.logInfo)
})
} catch (e) {
winston.warn(`Error connecting to callback url`, this.logInfo)
reject(e)
}
} else {
if (this.attachment && this.attachment.dataBuffer) {
winston.info(`Using "fake" streaming because request contained attachment data.`, this.logInfo)
Expand Down
8 changes: 8 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,14 @@
resolved "https://registry.yarnpkg.com/@types/mocha/-/mocha-8.2.3.tgz#bbeb55fbc73f28ea6de601fbfa4613f58d785323"
integrity sha512-ekGvFhFgrc2zYQoX4JeZPmVzZxw6Dtllga7iGHzfbYIYkAMUx/sAFP2GdFpLff+vdHXu5fl7WX9AT+TtqYcsyw==

"@types/node-fetch@^2.6.11":
version "2.6.11"
resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.11.tgz#9b39b78665dae0e82a08f02f4967d62c66f95d24"
integrity sha512-24xFj9R5+rfQJLRyM56qh+wnVSYhyXC2tkoBndtY0U+vubqNsYXGjufB2nn8Q6gt0LrARwL6UBtMCSVCwl4B1g==
dependencies:
"@types/node" "*"
form-data "^4.0.0"

"@types/node@*":
version "17.0.10"
resolved "https://registry.yarnpkg.com/@types/node/-/node-17.0.10.tgz#616f16e9d3a2a3d618136b1be244315d95bd7cab"
Expand Down
Loading