Skip to content

Commit

Permalink
H-3709, H-3746, H-3747: HaRPC TypeScript implementation (#5836)
Browse files Browse the repository at this point in the history
Co-authored-by: Ciaran Morinan <37743469+CiaranMn@users.noreply.github.com>
  • Loading branch information
indietyp and CiaranMn authored Dec 9, 2024
1 parent 43abc52 commit a0eea11
Show file tree
Hide file tree
Showing 38 changed files with 899 additions and 307 deletions.
2 changes: 2 additions & 0 deletions apps/hash-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"@graphql-tools/schema": "8.5.1",
"@linear/sdk": "6.0.0",
"@local/advanced-types": "0.0.0-private",
"@local/harpc-client": "0.0.0-private",
"@local/hash-backend-utils": "0.0.0-private",
"@local/hash-graph-client": "0.0.0-private",
"@local/hash-graph-sdk": "0.0.0-private",
Expand Down Expand Up @@ -72,6 +73,7 @@
"cors": "2.8.5",
"cross-env": "7.0.3",
"dedent": "0.7.0",
"effect": "3.11.3",
"exponential-backoff": "3.1.1",
"express": "4.21.2",
"express-handlebars": "7.1.3",
Expand Down
55 changes: 55 additions & 0 deletions apps/hash-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ import http from "node:http";
import path from "node:path";
import { promisify } from "node:util";

import { JsonDecoder, JsonEncoder } from "@local/harpc-client/codec";
import { Client as RpcClient, Transport } from "@local/harpc-client/net";
import { RequestIdProducer } from "@local/harpc-client/wire-protocol";
import { getAwsRegion } from "@local/hash-backend-utils/aws-config";
import { createGraphClient } from "@local/hash-backend-utils/create-graph-client";
import { OpenSearch } from "@local/hash-backend-utils/search/opensearch";
import { GracefulShutdown } from "@local/hash-backend-utils/shutdown";
import { createTemporalClient } from "@local/hash-backend-utils/temporal";
import { createVaultClient } from "@local/hash-backend-utils/vault";
import type { EnforcedEntityEditionProvenance } from "@local/hash-graph-sdk/entity";
import { EchoSubsystem } from "@local/hash-graph-sdk/harpc";
import { getHashClientTypeFromRequest } from "@local/hash-isomorphic-utils/http-requests";
import { isSelfHostedInstance } from "@local/hash-isomorphic-utils/instance";
import * as Sentry from "@sentry/node";
import bodyParser from "body-parser";
import cors from "cors";
import { Effect, Exit, Layer, Logger, LogLevel, ManagedRuntime } from "effect";
import { RuntimeException } from "effect/Cause";
import proxy from "express-http-proxy";
import type { Options as RateLimitOptions } from "express-rate-limit";
import { rateLimit } from "express-rate-limit";
Expand Down Expand Up @@ -521,6 +527,55 @@ const main = async () => {
res.send("Hello World");
});

/** RPC */
if (process.env.HASH_RPC_ENABLED === "true") {
const rpcClient = RpcClient.layer();

const runtime = ManagedRuntime.make(
Layer.mergeAll(
rpcClient,
RequestIdProducer.layer,
JsonDecoder.layer,
JsonEncoder.layer,
),
);

shutdown.addCleanup("ManagedRuntime", () => runtime.dispose());

const rpcHost = process.env.HASH_RPC_HOST ?? "127.0.0.1";
const rpcPort = parseInt(process.env.HASH_RPC_PORT ?? "4002", 10);

app.get("/rpc/echo", (req, res, next) => {
// eslint-disable-next-line func-names
const effect = Effect.gen(function* () {
const textQueryParam = req.query.text;
if (typeof textQueryParam !== "string") {
return yield* new RuntimeException(
"text query parameter is required",
);
}

const response = yield* EchoSubsystem.echo(textQueryParam);
res.status(200).send(response);
}).pipe(
Effect.provide(
RpcClient.connectLayer(
Transport.multiaddr(`/ip4/${rpcHost}/tcp/${rpcPort}`),
),
),
Logger.withMinimumLogLevel(LogLevel.Trace),
);

runtime.runCallback(effect, {
onExit: (exit) => {
if (Exit.isFailure(exit)) {
next(exit.cause);
}
},
});
});
}

// Used by AWS Application Load Balancer (ALB) for health checks
app.get("/health-check", (_, res) => res.status(200).send("Hello World!"));

Expand Down
4 changes: 3 additions & 1 deletion libs/@local/graph/sdk/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
"dependencies": {
"@blockprotocol/graph": "0.4.0-canary.0",
"@local/advanced-types": "0.0.0-private",
"@local/harpc-client": "0.0.0-private",
"@local/hash-graph-client": "0.0.0-private",
"@local/hash-graph-types": "0.0.0-private"
"@local/hash-graph-types": "0.0.0-private",
"effect": "3.11.3"
},
"devDependencies": {
"@local/eslint-config": "0.0.0-private",
Expand Down
110 changes: 110 additions & 0 deletions libs/@local/graph/sdk/typescript/src/harpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This code is written in a way that be *easily* auto generated, which is why we use the classes.

import { ClientError } from "@local/harpc-client";
import { Decoder, Encoder } from "@local/harpc-client/codec";
import {
Connection,
Request,
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- otherwise TypeScript will fail on inference, I don't know why
Response,
Transaction,
} from "@local/harpc-client/net";
import {
ProcedureDescriptor,
ProcedureId,
SubsystemDescriptor,
SubsystemId,
Version,
} from "@local/harpc-client/types";
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- otherwise TypeScript will fail on inference, I don't know why
import { RequestIdProducer } from "@local/harpc-client/wire-protocol";
import {
Effect,
Function,
Option,
pipe,
Predicate,
Schema,
Stream,
} from "effect";

const ServerResult = <A, I, R>(ok: Schema.Schema<A, I, R>) =>
Schema.transform(
Schema.Union(
Schema.Struct({
Ok: ok,
}),
Schema.Struct({
Err: Schema.Unknown,
}),
),
Schema.Either({
left: Schema.instanceOf(ClientError.ServerError),
right: Schema.typeSchema(ok),
}),
{
strict: true,
decode: (value) => {
if (Predicate.hasProperty(value, "Ok")) {
return { _tag: "Right", right: value.Ok } as const;
}

return {
_tag: "Left",
left: new ClientError.ServerError({ cause: value.Err }),
} as const;
},
encode: (value) => {
if (value._tag === "Right") {
return { Ok: value.right };
}

return { Err: value.left.cause } as const;
},
},
);

export class EchoSubsystem {
static #subsystemId = 0x00;
static #version = Version.make(0x00, 0x00);

// eslint-disable-next-line func-names
static echo = Effect.fn("echo")(function* (payload: string) {
const procedureId = 0x00;

const connection = yield* Connection.Connection;
const encoder = yield* Encoder.Encoder;
const decoder = yield* Decoder.Decoder;

// buffer the stream, to send any encoding errors straight to the client
// see: https://linear.app/hash/issue/H-3748/request-interruption
const requestStream = yield* pipe(
payload,
Stream.succeed,
encoder.encode(Schema.String),
Stream.runCollect,
Effect.map(Stream.fromChunk),
);

const request = yield* Request.make(
SubsystemDescriptor.make(
yield* SubsystemId.make(EchoSubsystem.#subsystemId),
EchoSubsystem.#version,
),
ProcedureDescriptor.make(yield* ProcedureId.make(procedureId)),
requestStream,
);

const transaction = yield* Connection.send(connection, request);
const response = Transaction.read(transaction);

const items = decoder.decode(response.body, ServerResult(Schema.String));
const item = yield* Stream.runHead(items);

const eitherItem = Option.getOrThrowWith(item, () =>
ClientError.ExpectedItemCountMismatchError.exactly(1, 0),
);

return yield* eitherItem;
}, Effect.map(Function.identity));
}
1 change: 1 addition & 0 deletions libs/@local/harpc/client/typescript/.eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module.exports = {
"@typescript-eslint/no-redeclare": "off",
"unicorn/filename-case": "off",
"func-names": "off",
"canonical/filename-no-index": "off",
},
ignorePatterns: require("@local/eslint-config/generate-ignore-patterns.cjs")(
__dirname,
Expand Down
8 changes: 6 additions & 2 deletions libs/@local/harpc/client/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
"type": "module",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
"import": "./dist/src/index.js",
"types": "./dist/src/index.d.ts"
},
"./*": {
"import": "./dist/src/*/index.js",
"types": "./dist/src/*/index.d.ts"
}
},
"scripts": {
Expand Down
85 changes: 85 additions & 0 deletions libs/@local/harpc/client/typescript/src/ClientError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Data, Inspectable, Match, Option, pipe } from "effect";

export class InvalidUtf8Error extends Data.TaggedError("InvalidUtf8Error")<{
readonly cause: unknown;
}> {
get message() {
return "Invalid UTF-8 encoding";
}
}

export class ServerError extends Data.TaggedError("ServerError")<{
readonly cause: unknown;
}> {
get message() {
return `Server error: ${Inspectable.toStringUnknown(this.cause)}`;
}
}

export class ExpectedItemCountMismatchError extends Data.TaggedError(
"ExpectedItemCountMismatchError",
)<{
min: Option.Option<number>;
max: Option.Option<number>;
received: number;
}> {
static exactly(expected: number, actual: number) {
return new ExpectedItemCountMismatchError({
min: Option.some(expected),
max: Option.some(expected),
received: actual,
});
}

static atLeast(expected: number, actual: number) {
return new ExpectedItemCountMismatchError({
min: Option.some(expected),
max: Option.none(),
received: actual,
});
}

static atMost(expected: number, actual: number) {
return new ExpectedItemCountMismatchError({
min: Option.none(),
max: Option.some(expected),
received: actual,
});
}

static between(min: number, max: number, actual: number) {
return new ExpectedItemCountMismatchError({
min: Option.some(min),
max: Option.some(max),
received: actual,
});
}

get message() {
return pipe(
Match.value({ min: this.min, max: this.max }),
Match.when(
{ min: Option.isSome<number>, max: Option.isSome<number> },
({ min, max }) =>
min.value === max.value
? `Expected exactly ${min.value} items, got ${this.received}`
: `Expected between ${min.value} and ${max.value} items, got ${this.received}`,
),
Match.when(
{ min: Option.isSome<number>, max: Option.isNone<number> },
({ min }) =>
`Expected at least ${min.value} items, got ${this.received}`,
),
Match.when(
{ min: Option.isNone<number>, max: Option.isSome<number> },
({ max }) =>
`Expected at most ${max.value} items, got ${this.received}`,
),
Match.when(
{ min: Option.isNone<number>, max: Option.isNone<number> },
() => `Mismatched amount of items, got ${this.received}`,
),
Match.orElseAbsurd,
);
}
}
Loading

0 comments on commit a0eea11

Please sign in to comment.