Skip to content

Commit

Permalink
add subscribepayload to hooks and change context and schema arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Jan 15, 2025
1 parent 6ffc3a3 commit 8d30500
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 56 deletions.
67 changes: 53 additions & 14 deletions .changeset/strange-ties-mix.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,50 @@
'graphql-ws': major
---

`onSubscribe`, `onOperation`, `onError`, `onNext` and `onComplete` hooks don't have the full accompanying message anymore, only the ID and the relevant part from the message
`schema`, `context`, `onSubscribe`, `onOperation`, `onError`, `onNext` and `onComplete` hooks don't have the full accompanying message anymore, only the ID and the relevant part from the message

There is really no need to pass the full `SubscribeMessage` to the `onSubscribe` hook. The only relevant parts from the message are the `id` and the `payload`, the `type` is useless since the hook inherently has it (`onNext` is `next` type, `onError` is `error` type, etc).

The actual techincal reason for not having the full message is to avoid serialising results and errors twice. Both `onNext` and `onError` allow the user to augment the result and return it to be used instead. `onNext` originally had the `NextMessage` argument which already has the `FormattedExecutionResult`, and `onError` originally had the `ErrorMessage` argument which already has the `GraphQLFormattedError`, and they both also returned `FormattedExecutionResult` and `GraphQLFormattedError` respectivelly - meaning, if the user serialised the results - the serialisation would happen **twice**.

Additionally, the `onOperation`, `onError`, `onNext` and `onComplete` now have the `payload` which is the `SubscribeMessage.payload` (`SubscribePayload`) for easier access to the original query as well as execution params extensions.

### Migrating from v5 to v6

#### `schema`

```diff
import { ServerOptions, SubscribePayload } from 'graphql-ws';

const opts: ServerOptions = {
- schema(ctx, message) {
- const messageId = message.id;
- const messagePayload: SubscribePayload = message.payload;
- },
+ schema(ctx, id, payload) {
+ const messageId = id;
+ const messagePayload: SubscribePayload = payload;
+ },
};
```

#### `context`

```diff
import { ServerOptions, SubscribePayload } from 'graphql-ws';

const opts: ServerOptions = {
- context(ctx, message) {
- const messageId = message.id;
- const messagePayload: SubscribePayload = message.payload;
- },
+ context(ctx, id, payload) {
+ const messageId = id;
+ const messagePayload: SubscribePayload = payload;
+ },
};
```

#### `onSubscribe`

```diff
Expand Down Expand Up @@ -40,9 +76,9 @@ const opts: ServerOptions = {
- const messageId = message.id;
- const messagePayload: SubscribePayload = message.payload;
- },
+ onOperation(ctx, id, args) {
+ onOperation(ctx, id, payload) {
+ const messageId = id;
+ const executionArgs: ExecutionArgs = args;
+ const messagePayload: SubscribePayload = payload;
+ },
};
```
Expand All @@ -53,18 +89,19 @@ The `ErrorMessage.payload` (`GraphQLFormattedError[]`) is not useful here at all

```diff
import { GraphQLError, GraphQLFormattedError } from 'graphql';
import { ServerOptions } from 'graphql-ws';
import { ServerOptions, SubscribePayload } from 'graphql-ws';

const opts: ServerOptions = {
- onError(ctx, message, errors) {
- const messageId = message.id;
- const graphqlErrors: readonly GraphQLError[] = errors;
- const messagePayload: readonly GraphQLFormattedError[] = message.payload;
- const errorMessagePayload: readonly GraphQLFormattedError[] = message.payload;
- },
+ onError(ctx, id, errors) {
+ onError(ctx, id, payload, errors) {
+ const messageId = id;
+ const graphqlErrors: readonly GraphQLError[] = errors;
+ const messagePayload: readonly GraphQLFormattedError[] = errors.map((e) => e.toJSON());
+ const subscribeMessagePayload: SubscribePayload = payload;
+ const errorMessagePayload: readonly GraphQLFormattedError[] = errors.map((e) => e.toJSON());
+ },
};
```
Expand All @@ -75,33 +112,35 @@ The `NextMessage.payload` (`FormattedExecutionResult`) is not useful here at all

```diff
import { ExecutionResult, FormattedExecutionResult } from 'graphql';
import { ServerOptions } from 'graphql-ws';
import { ServerOptions, SubscribePayload } from 'graphql-ws';

const opts: ServerOptions = {
- onNext(ctx, message, result) {
- onNext(ctx, message, _args, result) {
- const messageId = message.id;
- const graphqlResult: ExecutionResult = result;
- const messagePayload: FormattedExecutionResult = message.payload;
- const nextMessagePayload: FormattedExecutionResult = message.payload;
- },
+ onNext(ctx, id, result) {
+ onNext(ctx, id, payload, _args, result) {
+ const messageId = id;
+ const graphqlResult: ExecutionResult = result;
+ const messagePayload: FormattedExecutionResult = { ...result, errors: result.errors?.map((e) => e.toJSON()) };
+ const subscribeMessagePayload: SubscribePayload = payload;
+ const nextMessagePayload: FormattedExecutionResult = { ...result, errors: result.errors?.map((e) => e.toJSON()) };
+ },
};
```

#### `onComplete`

```diff
import { ServerOptions } from 'graphql-ws';
import { ServerOptions, SubscribePayload } from 'graphql-ws';

const opts: ServerOptions = {
- onComplete(ctx, message) {
- const messageId = message.id;
- },
+ onComplete(ctx, id) {
+ onComplete(ctx, id, payload) {
+ const messageId = id;
+ const subscribeMessagePayload: SubscribePayload = payload;
+ },
};
```
99 changes: 63 additions & 36 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ export interface ServerOptions<
| GraphQLSchema
| ((
ctx: Context<P, E>,
message: SubscribeMessage,
id: string,
payload: SubscribePayload,
args: Omit<ExecutionArgs, 'schema'>,
) => Promise<GraphQLSchema> | GraphQLSchema);
/**
Expand All @@ -123,7 +124,8 @@ export interface ServerOptions<
| GraphQLExecutionContextValue
| ((
ctx: Context<P, E>,
message: SubscribeMessage,
id: string,
payload: SubscribePayload,
args: ExecutionArgs,
) =>
| Promise<GraphQLExecutionContextValue>
Expand Down Expand Up @@ -333,6 +335,7 @@ export interface ServerOptions<
| ((
ctx: Context<P, E>,
id: string,
payload: SubscribePayload,
args: ExecutionArgs,
result: OperationResult,
) => Promise<OperationResult | void> | OperationResult | void);
Expand All @@ -354,6 +357,7 @@ export interface ServerOptions<
| ((
ctx: Context<P, E>,
id: string,
payload: SubscribePayload,
errors: readonly GraphQLError[],
) =>
| Promise<readonly GraphQLFormattedError[] | void>
Expand All @@ -378,6 +382,7 @@ export interface ServerOptions<
| ((
ctx: Context<P, E>,
id: string,
payload: SubscribePayload,
args: ExecutionArgs,
result: ExecutionResult | ExecutionPatchResult,
) =>
Expand All @@ -402,7 +407,11 @@ export interface ServerOptions<
*/
onComplete?:
| undefined
| ((ctx: Context<P, E>, id: string) => Promise<void> | void);
| ((
ctx: Context<P, E>,
id: string,
payload: SubscribePayload,
) => Promise<void> | void);
/**
* An optional override for the JSON.parse function used to hydrate
* incoming messages to this server. Useful for parsing custom datatypes
Expand Down Expand Up @@ -690,10 +699,17 @@ export function makeServer<
const emit = {
next: async (
result: ExecutionResult | ExecutionPatchResult,
{ id, payload }: SubscribeMessage,
args: ExecutionArgs,
) => {
const { errors, ...resultWithoutErrors } = result;
const maybeResult = await onNext?.(ctx, id, args, result);
const maybeResult = await onNext?.(
ctx,
id,
payload,
args,
result,
);
await socket.send(
stringifyMessage<MessageType.Next>(
{
Expand All @@ -711,8 +727,11 @@ export function makeServer<
),
);
},
error: async (errors: readonly GraphQLError[]) => {
const maybeErrors = await onError?.(ctx, id, errors);
error: async (
errors: readonly GraphQLError[],
{ id, payload }: SubscribeMessage,
) => {
const maybeErrors = await onError?.(ctx, id, payload, errors);
await socket.send(
stringifyMessage<MessageType.Error>(
{
Expand All @@ -724,8 +743,11 @@ export function makeServer<
),
);
},
complete: async (notifyClient: boolean) => {
await onComplete?.(ctx, id);
complete: async (
notifyClient: boolean,
{ id, payload }: SubscribeMessage,
) => {
await onComplete?.(ctx, id, payload);
if (notifyClient)
await socket.send(
stringifyMessage<MessageType.Complete>(
Expand All @@ -749,7 +771,7 @@ export function makeServer<
if (maybeExecArgsOrErrors) {
if (areGraphQLErrors(maybeExecArgsOrErrors))
return id in ctx.subscriptions
? await emit.error(maybeExecArgsOrErrors)
? await emit.error(maybeExecArgsOrErrors, message)
: void 0;
else if (Array.isArray(maybeExecArgsOrErrors))
throw new Error(
Expand All @@ -772,7 +794,7 @@ export function makeServer<
...args,
schema:
typeof schema === 'function'
? await schema(ctx, message, args)
? await schema(ctx, id, payload, args)
: schema,
};
const validationErrors = (validate ?? graphqlValidate)(
Expand All @@ -781,7 +803,7 @@ export function makeServer<
);
if (validationErrors.length > 0)
return id in ctx.subscriptions
? await emit.error(validationErrors)
? await emit.error(validationErrors, message)
: void 0;
}

Expand All @@ -791,9 +813,10 @@ export function makeServer<
);
if (!operationAST)
return id in ctx.subscriptions
? await emit.error([
new GraphQLError('Unable to identify operation'),
])
? await emit.error(
[new GraphQLError('Unable to identify operation')],
message,
)
: void 0;

// if `onSubscribe` didn't specify a rootValue, inject one
Expand All @@ -804,7 +827,7 @@ export function makeServer<
if (!('contextValue' in execArgs))
execArgs.contextValue =
typeof context === 'function'
? await context(ctx, message, execArgs)
? await context(ctx, id, payload, execArgs)
: context;

// the execution arguments have been prepared
Expand All @@ -820,7 +843,8 @@ export function makeServer<

const maybeResult = await onOperation?.(
ctx,
message.id,
id,
payload,
execArgs,
operationResult,
);
Expand All @@ -836,41 +860,44 @@ export function makeServer<
ctx.subscriptions[id] = operationResult;
try {
for await (const result of operationResult) {
await emit.next(result, execArgs);
await emit.next(result, message, execArgs);
}
} catch (err) {
const originalError =
err instanceof Error ? err : new Error(String(err));
await emit.error([
versionInfo.major >= 16
? new GraphQLError(
originalError.message,
// @ts-ignore graphql@15 and less dont have the second arg as object (version is ensured by versionInfo.major check above)
{ originalError },
)
: // versionInfo.major <= 15
new GraphQLError(
originalError.message,
null,
null,
null,
null,
originalError,
),
]);
await emit.error(
[
versionInfo.major >= 16
? new GraphQLError(
originalError.message,
// @ts-ignore graphql@15 and less dont have the second arg as object (version is ensured by versionInfo.major check above)
{ originalError },
)
: // versionInfo.major <= 15
new GraphQLError(
originalError.message,
null,
null,
null,
null,
originalError,
),
],
message,
);
}
}
} else {
/** single emitted result */
// if the client completed the subscription before the single result
// became available, he effectively canceled it and no data should be sent
if (id in ctx.subscriptions)
await emit.next(operationResult, execArgs);
await emit.next(operationResult, message, execArgs);
}

// lack of subscription at this point indicates that the client
// completed the subscription, he doesn't need to be reminded
await emit.complete(id in ctx.subscriptions);
await emit.complete(id in ctx.subscriptions, message);
} finally {
// whatever happens to the subscription, we finally want to get rid of the reservation
delete ctx.subscriptions[id];
Expand Down
6 changes: 3 additions & 3 deletions tests/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ it('should use the schema resolved from a promise on subscribe', async ({
const schema = new GraphQLSchema(schemaConfig);

const { url } = await startTServer({
schema: (_, msg) => {
expect(msg.id).toBe('1');
schema: (_, id) => {
expect(id).toBe('1');
return Promise.resolve(schema);
},
execute: (args) => {
Expand Down Expand Up @@ -2138,7 +2138,7 @@ describe.concurrent('Disconnect/close', () => {
waitForComplete,
waitForClientClose,
} = await startTServer({
onOperation(_ctx, _msg, _args, result) {
onOperation(_ctx, _id, _msg, _args, result) {
const origReturn = (result as AsyncGenerator).return;
(result as AsyncGenerator).return = async (...args) => {
if (++i === 1) {
Expand Down
Loading

0 comments on commit 8d30500

Please sign in to comment.