diff --git a/apps/hash-api/src/index.ts b/apps/hash-api/src/index.ts index 79af0468000..bf98282ce0d 100644 --- a/apps/hash-api/src/index.ts +++ b/apps/hash-api/src/index.ts @@ -545,7 +545,7 @@ const main = async () => { const rpcHost = getRequiredEnv("HASH_GRAPH_RPC_HOST"); const rpcPort = parseInt(process.env.HASH_GRAPH_RPC_PORT ?? "4002", 10); - app.get("/rpc/echo", (req, res, next) => { + app.get("/rpc/echo", (req, res) => { // eslint-disable-next-line func-names const effect = Effect.gen(function* () { const textQueryParam = req.query.text; @@ -569,7 +569,7 @@ const main = async () => { runtime.runCallback(effect, { onExit: (exit) => { if (Exit.isFailure(exit)) { - next(exit.cause); + res.status(500).send(exit.cause.toString()); } }, }); diff --git a/libs/@local/harpc/client/typescript/package.json b/libs/@local/harpc/client/typescript/package.json index d6de02e83a3..204922f7ed9 100644 --- a/libs/@local/harpc/client/typescript/package.json +++ b/libs/@local/harpc/client/typescript/package.json @@ -23,6 +23,7 @@ "test:unit": "vitest --run" }, "dependencies": { + "@chainsafe/is-ip": "2.0.2", "@chainsafe/libp2p-noise": "16.0.0", "@chainsafe/libp2p-yamux": "7.0.1", "@libp2p/crypto": "5.0.7", @@ -30,6 +31,7 @@ "@libp2p/interface": "2.2.1", "@libp2p/ping": "2.0.12", "@libp2p/tcp": "10.0.13", + "@multiformats/dns": "1.0.6", "@multiformats/multiaddr": "12.3.4", "effect": "3.11.3", "it-stream-types": "2.0.2", diff --git a/libs/@local/harpc/client/typescript/src/net/Transport.ts b/libs/@local/harpc/client/typescript/src/net/Transport.ts index 7d793673531..2fa6eb1e326 100644 --- a/libs/@local/harpc/client/typescript/src/net/Transport.ts +++ b/libs/@local/harpc/client/typescript/src/net/Transport.ts @@ -1,3 +1,4 @@ +import type { DNS } from "@multiformats/dns"; import { Data } from "effect"; import type { NoiseConfig, TCPConfig, YamuxConfig } from "./Config.js"; @@ -16,7 +17,37 @@ export class InitializationError extends Data.TaggedError( export type Address = 
internal.Address; +export interface DNSConfig { + /** + * The DNS resolver to use when resolving DNS, DNS4, DNS6 and DNSADDR Multiaddrs. + */ + resolver?: DNS; + + /** + * When resolving DNSADDR Multiaddrs that resolve to other DNSADDR Multiaddrs, + * limit how many times we will recursively resolve them. + * + * @default 32 + */ + maxRecursiveDepth?: number; + + /** + * Maximum number of multiaddr lookups (multiaddr to peer ID) to keep cached in memory. + * + * @default 32 + */ + cacheCapacity?: number; + + /** + * Time in milliseconds until a cached multiaddr lookup is considered stale. + * + * @default 300000 (5 minutes) + */ + cacheTimeToLive?: number; +} + export interface TransportConfig { + dns?: DNSConfig; tcp?: TCPConfig; yamux?: YamuxConfig; noise?: NoiseConfig; diff --git a/libs/@local/harpc/client/typescript/src/net/internal/multiaddr.ts b/libs/@local/harpc/client/typescript/src/net/internal/multiaddr.ts new file mode 100644 index 00000000000..9559960271f --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/net/internal/multiaddr.ts @@ -0,0 +1,124 @@ +import { + isMultiaddr, + type Multiaddr, + type MultiaddrInput, + type ResolveOptions, +} from "@multiformats/multiaddr"; +import { Equal, Hash, pipe, Predicate } from "effect"; + +import { createProto, hashUint8Array } from "../../utils.js"; + +const MultiaddrSymbol = Symbol.for("@multiformats/js-multiaddr/multiaddr"); +type MultiaddrSymbol = typeof MultiaddrSymbol; + +const TypeId: unique symbol = Symbol( + "@local/harpc-client/net/internal/HashableMultiaddr", +); +type TypeId = typeof TypeId; + +/** @internal */ +export interface HashableMultiaddr extends Multiaddr, Equal.Equal { + readonly [TypeId]: TypeId; + readonly [MultiaddrSymbol]: true; + readonly inner: Multiaddr; +} + +const HashableMultiaddrProto: Omit = { + [TypeId]: TypeId, + [MultiaddrSymbol]: true, + + get bytes() { + return (this as HashableMultiaddr).inner.bytes; + }, + + toString(this: HashableMultiaddr) { + return this.inner.toString(); + }, + + toJSON(this: HashableMultiaddr) { +
return this.inner.toString(); + }, + + toOptions(this: HashableMultiaddr) { + return this.inner.toOptions(); + }, + + protos(this: HashableMultiaddr) { + return this.inner.protos(); + }, + + protoCodes(this: HashableMultiaddr) { + return this.inner.protoCodes(); + }, + + protoNames(this: HashableMultiaddr) { + return this.inner.protoNames(); + }, + + tuples(this: HashableMultiaddr) { + return this.inner.tuples(); + }, + + stringTuples(this: HashableMultiaddr) { + return this.inner.stringTuples(); + }, + + encapsulate(this: HashableMultiaddr, addr: MultiaddrInput) { + return this.inner.encapsulate(addr); + }, + + decapsulate(this: HashableMultiaddr, addr: Multiaddr | string) { + return this.inner.decapsulate(addr); + }, + + decapsulateCode(this: HashableMultiaddr, code: number) { + return this.inner.decapsulateCode(code); + }, + + getPeerId(this: HashableMultiaddr) { + return this.inner.getPeerId(); + }, + + getPath(this: HashableMultiaddr) { + return this.inner.getPath(); + }, + + equals(this: HashableMultiaddr, other: HashableMultiaddr) { + return this.inner.equals(other.inner); + }, + + resolve(this: HashableMultiaddr, options?: ResolveOptions) { + return this.inner.resolve(options); + }, + + nodeAddress(this: HashableMultiaddr) { + return this.inner.nodeAddress(); + }, + + isThinWaistAddress(this: HashableMultiaddr) { + return this.inner.isThinWaistAddress(); + }, + + [Equal.symbol](this: HashableMultiaddr, other: unknown) { + return isMultiaddr(other) && this.inner.equals(other); + }, + + [Hash.symbol](this: HashableMultiaddr) { + return pipe( + Hash.hash(MultiaddrSymbol), + Hash.combine(hashUint8Array(this.bytes)), + Hash.cached(this), + ); + }, +}; + +const isHashableMultiaddr = (value: unknown): value is HashableMultiaddr => + Predicate.hasProperty(value, TypeId); + +/** @internal */ +export const make = (inner: Multiaddr): HashableMultiaddr => + isHashableMultiaddr(inner) + ? 
inner + : (createProto(HashableMultiaddrProto, { + inner, + }) satisfies HashableMultiaddr); diff --git a/libs/@local/harpc/client/typescript/src/net/internal/transport.ts b/libs/@local/harpc/client/typescript/src/net/internal/transport.ts index 6e79e15d3a4..f88ec3bd63d 100644 --- a/libs/@local/harpc/client/typescript/src/net/internal/transport.ts +++ b/libs/@local/harpc/client/typescript/src/net/internal/transport.ts @@ -1,3 +1,4 @@ +import { isIPv4, isIPv6 } from "@chainsafe/is-ip"; import { noise } from "@chainsafe/libp2p-noise"; import { yamux } from "@chainsafe/libp2p-yamux"; import type { Identify } from "@libp2p/identify"; @@ -6,16 +7,56 @@ import { isPeerId, type PeerId } from "@libp2p/interface"; import type { PingService } from "@libp2p/ping"; import { ping } from "@libp2p/ping"; import { tcp } from "@libp2p/tcp"; -import { Array, Data, Effect, Option, pipe, Predicate } from "effect"; +import { + type Answer, + type DNS, + dns as defaultDns, + RecordType, +} from "@multiformats/dns"; +import { + multiaddr as makeMultiaddr, + protocols as getProtocol, + resolvers, +} from "@multiformats/multiaddr"; +import { + Array, + Cache, + Cause, + Chunk, + Data, + Effect, + Either, + flow, + Function, + Iterable, + Match, + Option, + pipe, + Predicate, + Stream, +} from "effect"; import type { Libp2p } from "libp2p"; import { createLibp2p } from "libp2p"; import * as NetworkLogger from "../NetworkLogger.js"; -import type { Multiaddr, TransportConfig } from "../Transport.js"; +import type { DNSConfig, Multiaddr, TransportConfig } from "../Transport.js"; import { InitializationError } from "../Transport.js"; +import * as HashableMultiaddr from "./multiaddr.js"; /** @internal */ -export type Transport = Libp2p<{ identify: Identify; ping: PingService }>; +export type Transport = Libp2p<{ + identify: Identify; + ping: PingService; + state: { + config: TransportConfig; + dns: DNS; + cache: Cache.Cache< + HashableMultiaddr.HashableMultiaddr, + Option.Option, + 
TransportError + >; + }; +}>; /** @internal */ export type Address = PeerId | Multiaddr; @@ -29,19 +70,221 @@ export class TransportError extends Data.TaggedError("TransportError")<{ } } -const resolvePeer = (transport: Transport, address: Address) => +/** @internal */ +export class DnsError extends Data.TaggedError("DnsError")<{ + cause: unknown; +}> { + get message() { + return "Underlying DNS resolver experienced an error"; + } +} + +/** The package @multiformats/dns typed its own API wrong, so we need to correct it */ +type AnswerExt = Omit & { + type: RecordType | "A" | "AAAA"; +}; + +const DNS_PROTOCOL = getProtocol("dns"); +const DNS4_PROTOCOL = getProtocol("dns4"); +const DNS6_PROTOCOL = getProtocol("dns6"); +const DNS_CODES = [DNS_PROTOCOL.code, DNS4_PROTOCOL.code, DNS6_PROTOCOL.code]; + +const IPV4_PROTOCOL = getProtocol("ip4"); +const IPV6_PROTOCOL = getProtocol("ip6"); + +const resolveDnsMultiaddrSegment = + (dns: DNS) => (code: number, value?: string) => + Effect.gen(function* () { + if (!DNS_CODES.includes(code)) { + return [[code, value] as const]; + } + + if (value === undefined) { + yield* Effect.logWarning( + "domain of dns segment is undefined, skipping", + ).pipe(Effect.annotateLogs({ code, value })); + + return []; + } + + let fqdn = value; + const types: RecordType[] = []; + + if (code === DNS_PROTOCOL.code || code === DNS4_PROTOCOL.code) { + types.push(RecordType.A); + + if (isIPv4(fqdn)) { + return [[IPV4_PROTOCOL.code, fqdn] as const]; + } + } + + if (code === DNS_PROTOCOL.code || code === DNS6_PROTOCOL.code) { + types.push(RecordType.AAAA); + + if (isIPv6(fqdn)) { + return [[IPV6_PROTOCOL.code, fqdn] as const]; + } + } + + if (!fqdn.endsWith(".")) { + fqdn += "."; + } + + // because of a bad implementation of @multiformats/dns we need to dispatch a query for each type + const [errors, responses] = yield* Effect.partition( + types, + (type) => + Effect.tryPromise({ + try: (signal) => dns.query(fqdn, { types: type, signal }), + catch: (cause)
=> new DnsError({ cause }), + }).pipe(Effect.mapError((cause) => new TransportError({ cause }))), + { + concurrency: "unbounded", + }, + ); + + // if we have been successful at least once we can return the resolved addresses, otherwise error out + if (responses.length === 0) { + if (errors.length === 0) { + return yield* Effect.die( + new Error( + "Expected either responses or errors as the types are always non-empty, received neither", + ), + ); + } + + const [head, ...tail] = errors; + + return yield* pipe( + tail, + Array.map(Cause.fail), + Array.reduce(Cause.fail(head!), Cause.parallel), + Effect.failCause, + ); + } + + return pipe( + responses, + Array.flatMap((response) => response.Answer), + Array.filterMap( + Match.type().pipe( + Match.whenOr( + { type: RecordType.A }, + { type: "A" } as const, + ({ data }) => [IPV4_PROTOCOL.code, data] as const, + ), + Match.whenOr( + { type: RecordType.AAAA }, + { type: "AAAA" } as const, + ({ data }) => [IPV6_PROTOCOL.code, data] as const, + ), + Match.option, + ), + ), + ); + }); + +/** + * Resolve DNS addresses in a multiaddr (excluding DNSADDR) + * + * @internal + */ +const resolveDnsMultiaddr = (multiaddr: Multiaddr, dns: DNS) => { + const resolveSegment = resolveDnsMultiaddrSegment(dns); + + return pipe( + Stream.fromIterable(multiaddr.stringTuples()), + Stream.mapEffect(Function.tupled(resolveSegment), { + concurrency: "unbounded", + }), + Stream.flattenIterables, + Stream.runCollect, + Effect.map( + flow( + Chunk.toReadonlyArray, + Array.map(([code, data]) => [getProtocol(code).name, data] as const), + Array.flatten, + Array.filter(Predicate.isNotUndefined), + Array.map((part) => `/${part}`), + Array.join(""), + makeMultiaddr, + ), + ), + Effect.tap((resolved) => + Effect.logDebug("resolved DNS multiaddr").pipe( + Effect.annotateLogs({ + multiaddr: multiaddr.toString(), + resolved: resolved.toString(), + }), + ), + ), + ); +}; + +/** + * Recursively resolve DNSADDR multiaddrs + * + * Adapted from: 
https://github.com/libp2p/js-libp2p/blob/92f9acbc1d2aa7b1bb5a8e460e4e0b5770f4455c/packages/libp2p/src/connection-manager/utils.ts#L9 + */ +const resolveDnsaddrMultiaddr = (multiaddr: Multiaddr, options: DNSConfig) => Effect.gen(function* () { - if (isPeerId(address)) { - return Option.some(address); + // check multiaddr resolvers + const resolvable = Iterable.some(resolvers.keys(), (key) => + multiaddr.protoNames().includes(key), + ); + + // return multiaddr if it is not resolvable + if (!resolvable) { + return [multiaddr]; } + const resolved = yield* Effect.tryPromise({ + try: (signal) => + multiaddr.resolve({ + signal, + dns: options.resolver, + maxRecursiveDepth: options.maxRecursiveDepth, + }), + catch: (cause) => new TransportError({ cause }), + }); + + yield* Effect.logDebug("resolved multiaddr").pipe( + Effect.annotateLogs({ + multiaddr: multiaddr.toString(), + resolved: resolved.map((address) => address.toString()), + }), + ); + + return resolved; + }); + +const lookupPeer = (transport: Transport, address: Multiaddr) => + Effect.gen(function* () { + const resolved = yield* pipe( + resolveDnsaddrMultiaddr(address, { + resolver: transport.services.state.dns, + maxRecursiveDepth: + transport.services.state.config.dns?.maxRecursiveDepth, + }), + Stream.fromIterableEffect, + Stream.flatMap( + (multiaddr) => + resolveDnsMultiaddr(multiaddr, transport.services.state.dns), + { concurrency: "unbounded" }, + ), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + ); + const peers = yield* Effect.tryPromise({ try: () => transport.peerStore.all({ filters: [ (peer) => peer.addresses.some((peerAddress) => - peerAddress.multiaddr.equals(address), + resolved.some((resolvedAddress) => + resolvedAddress.equals(peerAddress.multiaddr), + ), ), ], limit: 1, @@ -52,10 +295,44 @@ const resolvePeer = (transport: Transport, address: Address) => return Array.head(peers).pipe(Option.map((peer) => peer.id)); }); +const resolvePeer = ( + cache: Cache.Cache< + 
HashableMultiaddr.HashableMultiaddr, + Option.Option, + TransportError + >, + address: Address, +) => + Effect.gen(function* () { + if (isPeerId(address)) { + return Option.some(address); + } + + const key = HashableMultiaddr.make(address); + const peerIdEither = yield* cache.getEither(key); + if (Either.isLeft(peerIdEither)) { + yield* Effect.logTrace("resolved peerID from cache"); + } else { + yield* Effect.logTrace("computed peerID"); + } + + const peerId = Either.merge(peerIdEither); + if (Option.isNone(peerId)) { + yield* Effect.logDebug( + "unable to resolve peer to a known peer ID, invalidating cache to retry next time", + ); + + yield* cache.invalidate(key); + } + + return peerId; + }).pipe(Effect.annotateLogs({ address })); + /** @internal */ export const connect = (transport: Transport, address: Address) => Effect.gen(function* () { - const peerId = yield* resolvePeer(transport, address); + const peerId = yield* resolvePeer(transport.services.state.cache, address); + if (Option.isSome(peerId)) { yield* Effect.logTrace( "peer has been dialed before, attempting to reuse connection", @@ -96,6 +373,22 @@ export const make = (config?: TransportConfig) => Effect.gen(function* () { const logger = yield* NetworkLogger.make(); + const clientDns = config?.dns?.resolver ?? defaultDns(); + const cache: Cache.Cache< + HashableMultiaddr.HashableMultiaddr, + Option.Option, + TransportError + > = yield* Cache.make< + HashableMultiaddr.HashableMultiaddr, + Option.Option, + TransportError + >({ + capacity: config?.dns?.cacheCapacity ?? 32, + timeToLive: config?.dns?.cacheTimeToLive ?? 
5 * 60 * 1000, + // eslint-disable-next-line @typescript-eslint/no-use-before-define -- this is fine, because we're using it only after it's defined, as the cache accesses the transport + lookup: (address) => lookupPeer(transport, address), + }); + const acquire = Effect.tryPromise({ try: () => createLibp2p({ @@ -112,7 +405,9 @@ export const make = (config?: TransportConfig) => // (This is due to the fact that the implementation of the ping service has a while true loop, that will keep receiving data, so the timeout is not really a timeout) // see: https://github.com/libp2p/js-libp2p/blob/96654117c449603aed5b3c6668da29bdab44cff9/packages/protocol-ping/src/ping.ts#L66 ping: ping({ timeout: 60 * 1000 }), + state: () => ({ config: config ?? {}, dns: clientDns, cache }), }, + dns: clientDns, }), catch: (cause) => new InitializationError({ cause }), }); diff --git a/libs/@local/harpc/client/typescript/src/utils.ts b/libs/@local/harpc/client/typescript/src/utils.ts index bd8a82f864f..440828c9120 100644 --- a/libs/@local/harpc/client/typescript/src/utils.ts +++ b/libs/@local/harpc/client/typescript/src/utils.ts @@ -1,4 +1,4 @@ -import { Function, Record, Tuple } from "effect"; +import { Function, Hash, Record, Tuple } from "effect"; import type * as Buffer from "./wire-protocol/Buffer.js"; @@ -48,3 +48,34 @@ export const encodeDual: ( (self: U): (buffer: Buffer.WriteBuffer) => Buffer.WriteResult; (buffer: Buffer.WriteBuffer, self: U): Buffer.WriteResult; } = (closure) => Function.dual(2, closure as (...args: unknown[]) => unknown); + +export const hashUint8Array = (array: Uint8Array) => { + // same as array, so initial state is the same + let state = 6151; + + // we take the array in steps of 4, and then just hash the 4 bytes + const remainder = array.length % 4; + + // because they're just numbers and the safe integer range is 2^53 - 1, + // we can just take it in 32 bit chunks, which means we need to do less overall. 
+ for (let i = 0; i < array.length - remainder; i += 4) { + const value = + // eslint-disable-next-line no-bitwise + array[i]! | + // eslint-disable-next-line no-bitwise + (array[i + 1]! << 8) | + // eslint-disable-next-line no-bitwise + (array[i + 2]! << 16) | + // eslint-disable-next-line no-bitwise + (array[i + 3]! << 24); + + state = Hash.combine(value)(state); + } + + // if there are any remaining bytes, we hash them as well + for (let i = array.length - remainder; i < array.length; i++) { + state = Hash.combine(array[i]!)(state); + } + + return Hash.optimize(state); +}; diff --git a/libs/@local/harpc/client/typescript/src/wire-protocol/models/Payload.ts b/libs/@local/harpc/client/typescript/src/wire-protocol/models/Payload.ts index 4ce741ca985..a5d57d1dc0b 100644 --- a/libs/@local/harpc/client/typescript/src/wire-protocol/models/Payload.ts +++ b/libs/@local/harpc/client/typescript/src/wire-protocol/models/Payload.ts @@ -11,7 +11,7 @@ import { } from "effect"; import { U16_MAX, U16_MIN } from "../../constants.js"; -import { createProto, encodeDual } from "../../utils.js"; +import { createProto, encodeDual, hashUint8Array } from "../../utils.js"; import * as Buffer from "../Buffer.js"; const TypeId: unique symbol = Symbol( @@ -51,37 +51,6 @@ const PayloadProto: Omit = { }, [Hash.symbol](this: Payload) { - const hashUint8Array = (array: Uint8Array) => { - // same as array, so initial state is the same - let state = 6151; - - // we take the array in steps of 4, and then just hash the 4 bytes - const remainder = array.length % 4; - - // because they're just numbers and the safe integer range is 2^53 - 1, - // we can just take it in 32 bit chunks, which means we need to do less overall. - for (let i = 0; i < array.length - remainder; i += 4) { - const value = - // eslint-disable-next-line no-bitwise - array[i]! | - // eslint-disable-next-line no-bitwise - (array[i + 1]! << 8) | - // eslint-disable-next-line no-bitwise - (array[i + 2]! 
<< 16) | - // eslint-disable-next-line no-bitwise - (array[i + 3]! << 24); - - state = Hash.combine(value)(state); - } - - // if there are any remaining bytes, we hash them as well - for (let i = array.length - remainder; i < array.length; i++) { - state = Hash.combine(array[i]!)(state); - } - - return Hash.optimize(state); - }; - return pipe( Hash.hash(this[TypeId]), Hash.combine(hashUint8Array(this.buffer)), diff --git a/yarn.lock b/yarn.lock index 8ae88d87871..30e2f1937c6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4825,7 +4825,7 @@ __metadata: languageName: node linkType: hard -"@chainsafe/is-ip@npm:^2.0.1, @chainsafe/is-ip@npm:^2.0.2": +"@chainsafe/is-ip@npm:2.0.2, @chainsafe/is-ip@npm:^2.0.1, @chainsafe/is-ip@npm:^2.0.2": version: 2.0.2 resolution: "@chainsafe/is-ip@npm:2.0.2" checksum: 10c0/0bb8b9d0babe583642d31ffafad603ac5e5dc48884266feae57479d81f4e81ef903628527d81b39d5305657a957bf435bd2ef38b98a4526a7aab366febf793ad @@ -8692,6 +8692,7 @@ __metadata: version: 0.0.0-use.local resolution: "@local/harpc-client@workspace:libs/@local/harpc/client/typescript" dependencies: + "@chainsafe/is-ip": "npm:2.0.2" "@chainsafe/libp2p-noise": "npm:16.0.0" "@chainsafe/libp2p-yamux": "npm:7.0.1" "@effect/platform": "npm:0.70.4" @@ -8704,6 +8705,7 @@ __metadata: "@libp2p/tcp": "npm:10.0.13" "@local/eslint": "npm:0.0.0-private" "@local/tsconfig": "npm:0.0.0-private" + "@multiformats/dns": "npm:1.0.6" "@multiformats/multiaddr": "npm:12.3.4" "@rust/harpc-wire-protocol": "npm:0.0.0-private" "@types/node": "npm:22.10.1" @@ -9433,7 +9435,7 @@ __metadata: languageName: node linkType: hard -"@multiformats/dns@npm:^1.0.3, @multiformats/dns@npm:^1.0.6": +"@multiformats/dns@npm:1.0.6, @multiformats/dns@npm:^1.0.3, @multiformats/dns@npm:^1.0.6": version: 1.0.6 resolution: "@multiformats/dns@npm:1.0.6" dependencies: