Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge branch 'hotfix/5.0.1' of https://github.com/LiskHQ/lisk-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
shuse2 committed Dec 14, 2020
2 parents eebe4d6 + 2d53924 commit a018d7f
Show file tree
Hide file tree
Showing 39 changed files with 669 additions and 261 deletions.
2 changes: 1 addition & 1 deletion elements/lisk-api-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@liskhq/lisk-api-client",
"version": "5.0.0",
"version": "5.0.1-alpha.0",
"description": "An API client for the Lisk network",
"author": "Lisk Foundation <admin@lisk.io>, lightcurve GmbH <admin@lightcurve.io>",
"license": "Apache-2.0",
Expand Down
35 changes: 23 additions & 12 deletions elements/lisk-api-client/src/ipc_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ import * as axon from 'pm2-axon';
import { PubSocket, PullSocket, PushSocket, SubSocket, ReqSocket } from 'pm2-axon';
import { Client as RPCClient } from 'pm2-axon-rpc';
import { EventEmitter } from 'events';
import { Channel, EventCallback, JSONRPCNotification, JSONRPCResponse } from './types';
import {
Channel,
EventCallback,
JSONRPCNotification,
JSONRPCResponse,
JSONRPCError,
} from './types';
import { convertRPCError } from './utils';

const CONNECTION_TIME_OUT = 2000;

Expand Down Expand Up @@ -139,17 +146,21 @@ export class IPCChannel implements Channel {
params: params ?? {},
};
return new Promise((resolve, reject) => {
this._rpcClient.call('invoke', action, (err: Error | undefined, data: JSONRPCResponse<T>) => {
if (err) {
reject(err);
return;
}
if (data.error) {
reject(err);
return;
}
resolve(data.result);
});
this._rpcClient.call(
'invoke',
action,
(err: JSONRPCError | undefined, data: JSONRPCResponse<T>) => {
if (err) {
reject(convertRPCError(err));
return;
}
if (data.error) {
reject(convertRPCError(data.error));
return;
}
resolve(data.result);
},
);
});
}

Expand Down
8 changes: 7 additions & 1 deletion elements/lisk-api-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ export interface JSONRPCNotification<T> {
readonly params?: T;
}

export interface JSONRPCError {
code: number;
message: string;
data?: string | number | boolean | Record<string, unknown>;
}

export interface JSONRPCResponse<T> {
readonly id: number;
readonly jsonrpc: string;
readonly method: never;
readonly params: never;
readonly error?: { code: number; message: string; data?: string };
readonly error?: JSONRPCError;
readonly result?: T;
}

Expand Down
19 changes: 19 additions & 0 deletions elements/lisk-api-client/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright © 2020 Lisk Foundation
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation,
* no part of this software, including this file, may be copied, modified,
* propagated, or distributed except according to the terms contained in the
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*
*/

import { JSONRPCError } from './types';

export const convertRPCError = (error: JSONRPCError): Error =>
new Error(typeof error.data === 'string' ? error.data : error.message);
106 changes: 62 additions & 44 deletions elements/lisk-api-client/src/ws_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import * as WebSocket from 'isomorphic-ws';
import { EventEmitter } from 'events';
import { JSONRPCMessage, JSONRPCNotification, EventCallback } from './types';
import { convertRPCError } from './utils';

const CONNECTION_TIMEOUT = 2000;
const ACKNOWLEDGMENT_TIMEOUT = 2000;
const RESPONSE_TIMEOUT = 3000;

const timeout = async <T = void>(ms: number, message?: string): Promise<T> =>
Expand Down Expand Up @@ -69,80 +69,90 @@ export class WSChannel {

public async connect(): Promise<void> {
this._ws = new WebSocket(this._url);
this._ws.onclose = this._handleClose.bind(this);
this._ws.onmessage = this._handleMessage.bind(this);
this._ws.addEventListener('ping', this._handlePing.bind(this));

const connect = new Promise<void>(resolve => {
this._ws?.on('open', () => {
const connectHandler = new Promise<void>(resolve => {
const onOpen = () => {
this.isAlive = true;
this._ws?.removeEventListener('open', onOpen);
resolve();
});
});
};

const error = new Promise<void>((_, reject) => {
this._ws?.on('error', err => {
this.isAlive = false;
reject(err);
});
this._ws?.addEventListener('open', onOpen);
});

await Promise.race([
connect,
error,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);
const errorHandler = new Promise<void>((_, reject) => {
const onError = (error: WebSocket.ErrorEvent) => {
this.isAlive = false;
this._ws?.removeEventListener('error', onError);
reject(error.error);
};

this._ws.on('ping', () => {
this.isAlive = true;
this._ws?.addEventListener('error', onError);
});

this._ws.on('message', data => {
this._handleMessage(data as string);
});
try {
await Promise.race([
connectHandler,
errorHandler,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);
} catch (err) {
this._ws.close();

throw err;
}
}

public async disconnect(): Promise<void> {
this._requestCounter = 0;
this._pendingRequests = {};

if (!this._ws) {
return Promise.resolve();
return;
}

if (this._ws.readyState === WebSocket.CLOSED) {
this.isAlive = false;
this._ws = undefined;
return;
}

return new Promise<void>(resolve => {
this._ws?.on('close', () => {
this._ws?.terminate();
const closeHandler = new Promise<void>(resolve => {
const onClose = () => {
this.isAlive = false;
this._ws = undefined;
this._ws?.removeEventListener('close', onClose);
resolve();
});
this._ws?.close();
};

this._ws?.addEventListener('close', onClose);
});

this._ws.close();
await Promise.race([
closeHandler,
timeout(CONNECTION_TIMEOUT, `Could not disconnect in ${CONNECTION_TIMEOUT}ms`),
]);
}

public async invoke<T = Record<string, unknown>>(
actionName: string,
params?: Record<string, unknown>,
): Promise<T> {
if (!this.isAlive) {
throw new Error('Websocket client is not connected.');
}

const request = {
jsonrpc: '2.0',
id: this._requestCounter,
method: actionName,
params: params ?? {},
};

const send = new Promise((resolve, reject) => {
this._ws?.send(JSON.stringify(request), (err): void => {
if (err) {
return reject(err);
}

return resolve();
});
});

await Promise.race([
send,
timeout(ACKNOWLEDGMENT_TIMEOUT, `Request is not acknowledged in ${ACKNOWLEDGMENT_TIMEOUT}ms`),
]);
this._ws?.send(JSON.stringify(request));

const response = defer<T>();
this._pendingRequests[this._requestCounter] = response;
Expand All @@ -159,8 +169,16 @@ export class WSChannel {
this._emitter.on(eventName, cb);
}

private _handleMessage(message: string): void {
const res = JSON.parse(message) as JSONRPCMessage<unknown>;
private _handleClose(): void {
this.isAlive = false;
}

private _handlePing(): void {
this.isAlive = true;
}

private _handleMessage(event: WebSocket.MessageEvent): void {
const res = JSON.parse(event.data as string) as JSONRPCMessage<unknown>;

// Its an event
if (messageIsNotification(res)) {
Expand All @@ -172,7 +190,7 @@ export class WSChannel {

if (this._pendingRequests[id]) {
if (res.error) {
this._pendingRequests[id].reject(new Error(res.error.data ?? res.error.data));
this._pendingRequests[id].reject(convertRPCError(res.error));
} else {
this._pendingRequests[id].resolve(res.result);
}
Expand Down
43 changes: 31 additions & 12 deletions elements/lisk-api-client/test/integration/ws_channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,24 @@
* Removal or modification of this copyright notice is prohibited.
*/

import { createServer, Server } from 'http';
import * as WebSocket from 'isomorphic-ws';
import { WSChannel } from '../../src/ws_channel';

jest.unmock('isomorphic-ws');

const closeServer = async (server: WebSocket.Server | Server): Promise<void> => {
return new Promise((resolve, reject) => {
server.close(error => {
if (error) {
return reject(error);
}

return resolve();
});
});
};

describe('WSChannel', () => {
describe('connect', () => {
it('should be connect to ws server', async () => {
Expand All @@ -28,36 +41,42 @@ describe('WSChannel', () => {
expect(server.clients.size).toEqual(1);
expect([...server.clients][0].readyState).toEqual(WebSocket.OPEN);
} finally {
server.close();
await closeServer(server);
}
expect.assertions(3);
});

it('should timeout if ws server not responding', async () => {
const verifyClient = (_: any, done: (result: boolean) => void) => {
// Take more time to accept connection
const http = createServer();
const server = new WebSocket.Server({ path: '/my-path', noServer: true });

// https://github.com/websockets/ws/issues/377#issuecomment-462152231
http.on('upgrade', (request, socket, head) => {
setTimeout(() => {
done(true);
server.handleUpgrade(request, socket, head, ws => {
server.emit('connection', ws, request);
});
}, 3000);
};
const server = new WebSocket.Server({ path: '/my-path', port: 65535, verifyClient });
});

http.listen(65535);

const channel = new WSChannel('ws://localhost:65535/my-path');

try {
await expect(channel.connect()).rejects.toThrow('Could not connect in 2000ms');
expect(server.clients.size).toEqual(0);
} finally {
// TODO: Found that unless we disconnect channel, sever.close keep open handles.
await channel.disconnect();
server.close();
await closeServer(server);
await closeServer(http);
}
expect.assertions(2);
}, 5000);

it('should throw error if server is not running', async () => {
const channel = new WSChannel('ws://localhost:65535/my-path');
const channel = new WSChannel('ws://localhost:65534/my-path');

await expect(channel.connect()).rejects.toThrow('connect ECONNREFUSED 127.0.0.1:65535');
await expect(channel.connect()).rejects.toThrow('connect ECONNREFUSED 127.0.0.1:65534');
});
});

Expand All @@ -74,7 +93,7 @@ describe('WSChannel', () => {
expect(server.clients.size).toEqual(1);
expect([...server.clients][0].readyState).toEqual(WebSocket.CLOSING);
} finally {
server.close();
await closeServer(server);
}
expect.assertions(3);
});
Expand Down
15 changes: 12 additions & 3 deletions elements/lisk-api-client/test/unit/__mocks__/isomorphic-ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@ const { EventEmitter } = require('events');

class WebSocket extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/explicit-member-accessibility
send(message, cb) {
send(message) {
const data = JSON.parse(message);
cb();
setTimeout(() => {
this.emit('message', JSON.stringify({ ...data, result: message }));
this.onmessage({ data: JSON.stringify({ ...data, result: message }) });
}, 100);
}

// eslint-disable-next-line @typescript-eslint/explicit-member-accessibility
addEventListener(event, cb) {
this.prependListener(event, cb);
}

// eslint-disable-next-line @typescript-eslint/explicit-member-accessibility
removeEventListener(event, cb) {
this.removeListener(event, cb);
}
}

module.exports = WebSocket;
Loading

0 comments on commit a018d7f

Please sign in to comment.