Skip to content

Commit

Permalink
added signer
Browse files Browse the repository at this point in the history
  • Loading branch information
JulienR1 committed Nov 23, 2023
1 parent 6b7a852 commit 7a6105f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 31 deletions.
10 changes: 8 additions & 2 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface WebhookRunOptions extends commander.RunOptions {
webhookUrl: string;
secretKey: string;
disablePing: boolean;
expiryTime: number;
}

// Run Webhook Sink
Expand All @@ -25,6 +26,11 @@ command.addOption(
.env("SECRET_KEY"),
);
command.addOption(new Option("--disable-ping", "Disable ping on init").env("DISABLE_PING").default(false));
command.addOption(
new Option("--expiry-time <number>", "Time before a transmission becomes invalid (in seconds)")
.env("EXPIRY_TIME")
.default(40),
);
command.action(action);

program
Expand All @@ -45,9 +51,9 @@ program
.makeOptionMandatory()
.env("SECRET_KEY"),
)
.action(async (options) => {
.action(async (options: WebhookRunOptions) => {
logger.settings.type = "hidden";
const response = await ping(options.webhookUrl, options.secretKey);
const response = await ping(options.webhookUrl, options.secretKey, options.expiryTime);
if (response) console.log("✅ OK");
else console.log("⁉️ ERROR");
});
Expand Down
11 changes: 6 additions & 5 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import PQueue from "p-queue";
import { http, logger, setup } from "substreams-sink";
import { postWebhook } from "./src/postWebhook.js";
import { signMessage } from "./src/signMessage.js";

import type { SessionInit } from "@substreams/core/proto";
import type { WebhookRunOptions } from "./bin/cli.js";
import { banner } from "./src/banner.js";
import { toText } from "./src/http.js";
import { ping } from "./src/ping.js";
import { Signer } from "./src/signer.js";

export async function action(options: WebhookRunOptions) {
// Block Emitter
Expand All @@ -16,9 +16,12 @@ export async function action(options: WebhookRunOptions) {
// Queue
const queue = new PQueue({ concurrency: 1 }); // all messages are sent in block order, no need to parallelize

// Signer
const signer = new Signer(options.secretKey, options.expiryTime);

// Ping URL to check if it's valid
if (!options.disablePing) {
if (!(await ping(options.webhookUrl, options.secretKey))) {
if (!(await ping(options.webhookUrl, options.secretKey, options.expiryTime))) {
logger.error("exiting from invalid PING response");
process.exit(1);
}
Expand Down Expand Up @@ -54,13 +57,11 @@ export async function action(options: WebhookRunOptions) {
},
};
// Sign body
const seconds = Number(clock.timestamp.seconds);
const body = JSON.stringify({ ...metadata, data });
const signature = signMessage(seconds, body, options.secretKey);

// Queue POST
queue.add(async () => {
const response = await postWebhook(options.webhookUrl, body, signature, seconds);
const response = await postWebhook(options.webhookUrl, body, signer);
logger.info("POST", response, metadata);
});
});
Expand Down
18 changes: 9 additions & 9 deletions src/ping.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { postWebhook } from "./postWebhook.js";
import { keyPair, signMessage } from "./signMessage.js";
import { keyPair } from "./signMessage.js";
import { Signer } from "./signer.js";

export async function ping(url: string, secretKey: string) {
export async function ping(url: string, secretKey: string, expirationTime: number) {
const body = JSON.stringify({ message: "PING" });
const timestamp = Math.floor(Date.now().valueOf() / 1000);
const signature = signMessage(timestamp, body, secretKey);
const invalidSecretKey = keyPair().secretKey;
const invalidSignature = signMessage(timestamp, body, invalidSecretKey);

const signer = new Signer(secretKey, expirationTime);
const invalidSigner = new Signer(invalidSecretKey, expirationTime);

// send valid signature (must respond with 200)
try {
await postWebhook(url, body, signature, timestamp, { maximumAttempts: 0 });
await postWebhook(url, body, signer, { maximumAttempts: 0 });
} catch (_e) {
return false;
}

// send invalid signature (must NOT respond with 200)
try {
await postWebhook(url, body, invalidSignature, timestamp, {
maximumAttempts: 0,
});
await postWebhook(url, body, invalidSigner, { maximumAttempts: 0 });
return false;
} catch (_e) {
return true;
Expand Down
24 changes: 9 additions & 15 deletions src/postWebhook.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { logger } from "substreams-sink";
import { Signer } from "./signer.js";

function awaitSetTimeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
Expand All @@ -8,13 +9,7 @@ interface PostWebhookOptions {
maximumAttempts?: number;
}

export async function postWebhook(
url: string,
body: string,
signature: string,
timestamp: number,
options: PostWebhookOptions = {},
) {
export async function postWebhook(url: string, body: string, signer: Signer, options: PostWebhookOptions = {}) {
// Retry Policy
const initialInterval = 1000; // 1s
const maximumAttempts = options.maximumAttempts ?? 100 * initialInterval;
Expand All @@ -27,40 +22,39 @@ export async function postWebhook(
logger.error("invalid response", { url });
throw new Error("invalid response");
}

if (attempts > maximumAttempts) {
logger.error("Maximum attempts exceeded", { url });
throw new Error("Maximum attempts exceeded");
}

if (attempts) {
let milliseconds = initialInterval * backoffCoefficient ** attempts;
if (milliseconds > maximumInterval) milliseconds = maximumInterval;
logger.warn(`delay ${milliseconds}`, { attempts, url });
await awaitSetTimeout(milliseconds);
}

try {
const response = await fetch(url, {
body,
method: "POST",
headers: {
"content-type": "application/json",
"x-signature-ed25519": signature,
"x-signature-timestamp": String(timestamp),
"x-signature-ed25519": signer.signature,
},
});

const status = response.status;
if (status !== 200) {
attempts++;
logger.warn(`Unexpected status code ${status}`, {
url,
timestamp,
body,
});
logger.warn(`Unexpected status code ${status}`, { url, body });
continue;
}
return { url, status };
} catch (e: any) {
const error = e.cause;
logger.error("Unexpected error", { url, timestamp, body, error });
logger.error("Unexpected error", { url, body, error });
attempts++;
}
}
Expand Down
31 changes: 31 additions & 0 deletions src/signer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { makeSignature } from "./signMessage.js";

export class Signer {
private latestSignature!: { signature: string; expiryTime: number };

constructor(public secretKey: string, public expirationTime: number) {
this.refreshSignature();
}

public get signature() {
if (this.latestSignature.expiryTime - this.now() < 0.6 * this.expirationTime * 1000) {
this.refreshSignature();
}

return this.latestSignature.signature;
}

private refreshSignature() {
const expiryTime = this.nextExpiredTime();
const signature = makeSignature(expiryTime, this.secretKey);
this.latestSignature = { expiryTime, signature };
}

private now() {
return new Date().getTime();
}

private nextExpiredTime() {
return this.now() + this.expirationTime * 1000;
}
}

0 comments on commit 7a6105f

Please sign in to comment.