Skip to content

Commit

Permalink
Merge pull request #42 from ably-forks/fix/broadcast-to-others
Browse files Browse the repository at this point in the history
[ECO-4977] Fix broadcast to others
  • Loading branch information
sacOO7 authored Sep 25, 2024
2 parents c50eba1 + ce52bfb commit b88c580
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 40 deletions.
36 changes: 31 additions & 5 deletions src/channel/ably/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ export const parseJwt = (jwtToken: string): { header: any; payload: any } => {
// Get Token Header
const base64HeaderUrl = jwtToken.split('.')[0];
const base64Header = base64HeaderUrl.replace('-', '+').replace('_', '/');
const header = JSON.parse(toText(base64Header));
const header = JSON.parse(fromBase64UrlEncoded(base64Header));
// Get Token payload
const base64Url = jwtToken.split('.')[1];
const base64 = base64Url.replace('-', '+').replace('_', '/');
const payload = JSON.parse(toText(base64));
const payload = JSON.parse(fromBase64UrlEncoded(base64));
return { header, payload };
};

Expand All @@ -33,11 +33,37 @@ export const toTokenDetails = (jwtToken: string): TokenDetails | any => {

const isBrowser = typeof window === 'object';

const toText = (base64: string) => {
/**
* Helper method to decode base64 url encoded string
* https://stackoverflow.com/a/78178053
* @param base64 base64 url encoded string
* @returns decoded text string
*/
export const fromBase64UrlEncoded = (base64: string): string => {
const base64Encoded = base64.replace(/-/g, '+').replace(/_/g, '/');
const padding = base64.length % 4 === 0 ? '' : '='.repeat(4 - (base64.length % 4));
const base64WithPadding = base64Encoded + padding;

if (isBrowser) {
return atob(base64WithPadding);
}
return Buffer.from(base64WithPadding, 'base64').toString();
};

/**
* Helper method to encode text into base64 url encoded string
* https://stackoverflow.com/a/78178053
* @param base64 text
* @returns base64 url encoded string
*/
export const toBase64UrlEncoded = (text: string): string => {
let encoded = ''
if (isBrowser) {
return atob(base64);
encoded = btoa(text);
} else {
encoded = Buffer.from(text).toString('base64');
}
return Buffer.from(base64, 'base64').toString('binary');
return encoded.replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');
};

const isAbsoluteUrl = (url: string) => (url && url.indexOf('http://') === 0) || url.indexOf('https://') === 0;
Expand Down
8 changes: 7 additions & 1 deletion src/connector/ably-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Connector } from './connector';

import { AblyChannel, AblyPrivateChannel, AblyPresenceChannel, AblyAuth } from './../channel';
import { AblyRealtime, TokenDetails } from '../../typings/ably';
import { toBase64UrlEncoded } from '../channel/ably/utils';

/**
* This class creates a connector to Ably.
Expand Down Expand Up @@ -118,9 +119,14 @@ export class AblyConnector extends Connector {

/**
* Get the socket ID for the connection.
* For ably, returns base64 url encoded json with keys {connectionKey, clientId}
*/
socketId(): string {
return this.ably.connection.key;
let socketIdObject = {
connectionKey : this.ably.connection.key,
clientId : this.ably.auth.clientId ?? null,
}
return toBase64UrlEncoded(JSON.stringify(socketIdObject));
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/echo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export default class Echo {

/**
* Get the Socket ID for the connection.
* For ably, returns base64 url encoded json with keys {connectionKey, clientId}
*/
socketId(): string {
return this.connector.socketId();
Expand Down
4 changes: 2 additions & 2 deletions tests/ably/ably-user-login.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('AblyUserLogin', () => {
});
});

test('user logs in without previous (guest) channels', async () => {
test('user logs in without previous (public) channels', async () => {
let connectionStates : Array<any>= []
// Initial clientId is null
expect(mockAuthServer.clientId).toBeNull();
Expand Down Expand Up @@ -70,7 +70,7 @@ describe('AblyUserLogin', () => {
expect(echo.connector.ablyAuth.existingToken().clientId).toBe('sacOO7@github.com');
});

test('user logs in with previous (guest) channels', async () => {
test('user logs in with previous (public) channels', async () => {
let connectionStates : Array<any>= []
let publicChannelStates : Array<any>= []

Expand Down
167 changes: 167 additions & 0 deletions tests/ably/ably-user-rest-publishing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import { setup, tearDown } from './setup/sandbox';
import Echo from '../../src/echo';
import { MockAuthServer } from './setup/mock-auth-server';
import { AblyChannel, AblyPrivateChannel } from '../../src/channel';
import * as Ably from 'ably';
import waitForExpect from 'wait-for-expect';
import { fromBase64UrlEncoded } from '../../src/channel/ably/utils';

jest.setTimeout(30000);
describe('AblyUserRestPublishing', () => {
let testApp: any;
let mockAuthServer: MockAuthServer;
let echoInstances: Array<Echo>;

beforeAll(async () => {
global.Ably = Ably;
testApp = await setup();
mockAuthServer = new MockAuthServer(testApp.keys[0].keyStr);
});

afterAll(async () => {
return await tearDown(testApp);
});

beforeEach(() => {
echoInstances = [];
})

afterEach((done) => {
let promises: Array<Promise<boolean>> = []
for (const echo of echoInstances) {
echo.disconnect();
const promise = new Promise<boolean>(res => {
echo.connector.ably.connection.once('closed', () => {
res(true);
});
})
promises.push(promise);
}
Promise.all(promises).then(_ => {
done();
})
});

async function getGuestUserChannel(channelName: string) {
mockAuthServer.clientId = null;
const guestUser = new Echo({
broadcaster: 'ably',
useTls: true,
environment: 'sandbox',
requestTokenFn: mockAuthServer.getSignedToken
});
echoInstances.push(guestUser);
const publicChannel = guestUser.channel(channelName) as AblyChannel;
await new Promise((resolve) => publicChannel.subscribed(resolve));
expect(guestUser.connector.ably.auth.clientId).toBeFalsy();
expect(guestUser.connector.ablyAuth.existingToken().clientId).toBeNull();
return publicChannel;
}

async function getLoggedInUserChannel(channelName: string) {
mockAuthServer.clientId = 'sacOO7@github.com';
const loggedInUser = new Echo({
broadcaster: 'ably',
useTls: true,
environment: 'sandbox',
requestTokenFn: mockAuthServer.getSignedToken
});
echoInstances.push(loggedInUser);
const privateChannel = loggedInUser.private(channelName) as AblyPrivateChannel;
await new Promise((resolve) => privateChannel.subscribed(resolve));
expect(loggedInUser.connector.ably.auth.clientId).toBe("sacOO7@github.com");
expect(loggedInUser.connector.ablyAuth.existingToken().clientId).toBe("sacOO7@github.com")
mockAuthServer.clientId = null;
return privateChannel;
}

test('Guest user return socketId as base64 encoded connectionkey and null clientId', async () => {
await getGuestUserChannel("dummyChannel");
const guestUser = echoInstances[0];
const socketIdObj = JSON.parse(fromBase64UrlEncoded(guestUser.socketId()));

const expectedConnectionKey = guestUser.connector.ably.connection.key;

expect(socketIdObj.connectionKey).toBe(expectedConnectionKey);
expect(socketIdObj.connectionKey).toBeTruthy();

expect(socketIdObj.clientId).toBeNull();
});

test('Guest user publishes message via rest API', async () => {
let messagesReceived: Array<string> = []
let channelName = "testChannel";

const publicChannel1 = await getGuestUserChannel(channelName);
publicChannel1.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
});

const publicChannel2 = await getGuestUserChannel(channelName);
publicChannel2.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
})

// Publish message to all clients
await mockAuthServer.broadcast(`public:${channelName}`, "testEvent", "mydata")
await waitForExpect(() => {
expect(messagesReceived.length).toBe(2);
expect(messagesReceived.filter(m => m == ".testEvent").length).toBe(2)
});

// Publish message to other client
messagesReceived = []
const firstClientSocketId = echoInstances[0].socketId();
await mockAuthServer.broadcastToOthers(firstClientSocketId,
{ channelName: `public:${channelName}`, eventName: "toOthers", payload: "data" })
await waitForExpect(() => {
expect(messagesReceived.length).toBe(1);
expect(messagesReceived.filter(m => m == ".toOthers").length).toBe(1);
});
});

test('Logged in user return socketId as base64 encoded connectionkey and clientId', async () => {
await getLoggedInUserChannel("dummyChannel");
const loggedInUser = echoInstances[0];
const socketIdObj = JSON.parse(fromBase64UrlEncoded(loggedInUser.socketId()));

const expectedConnectionKey = loggedInUser.connector.ably.connection.key;

expect(socketIdObj.connectionKey).toBe(expectedConnectionKey);
expect(socketIdObj.connectionKey).toBeTruthy();

expect(socketIdObj.clientId).toBe("sacOO7@github.com");
});

test('Logged in user publishes message via rest API', async () => {
let messagesReceived: Array<string> = []
let channelName = "testChannel";

const privateChannel1 = await getLoggedInUserChannel(channelName);
privateChannel1.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
});

const privateChannel2 = await getLoggedInUserChannel(channelName);
privateChannel2.listenToAll((eventName, data) => {
messagesReceived.push(eventName);
})

// Publish message to all clients
await mockAuthServer.broadcast(`private:${channelName}`, "testEvent", "mydata")
await waitForExpect(() => {
expect(messagesReceived.length).toBe(2);
expect(messagesReceived.filter(m => m == ".testEvent").length).toBe(2);
});

// Publish message to other client
messagesReceived = []
const firstClientSocketId = echoInstances[0].socketId();
await mockAuthServer.broadcastToOthers(firstClientSocketId,
{ channelName: `private:${channelName}`, eventName: "toOthers", payload: "data" });
await waitForExpect(() => {
expect(messagesReceived.length).toBe(1);
expect(messagesReceived.filter(m => m == ".toOthers").length).toBe(1);
});
});
});
27 changes: 24 additions & 3 deletions tests/ably/setup/mock-auth-server.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
import { isNullOrUndefinedOrEmpty, parseJwt } from '../../../src/channel/ably/utils';
import { isNullOrUndefinedOrEmpty, parseJwt, fromBase64UrlEncoded } from '../../../src/channel/ably/utils';
import * as Ably from 'ably/promises';
import * as jwt from 'jsonwebtoken';

type channels = Array<string>;

/**
* MockAuthServer mimicks {@link https://github.com/ably/laravel-broadcaster/blob/main/src/AblyBroadcaster.php AblyBroadcaster.php}.
* Aim is to keep implementation and behaviour in sync with AblyBroadcaster.php, so that it can be tested
* without running actual PHP server.
*/
export class MockAuthServer {
keyName: string;
keySecret: string;
ablyClient: Ably.Rest;
clientId: string | null = 'sacOO7@github.com';
userInfo = { id: 'sacOO7@github.com', name: 'sacOO7' };

shortLived: channels;
banned: channels;

userInfo = { id: 'sacOO7@github.com', name: 'sacOO7' }; // Used for presence

constructor(apiKey: string, environment = 'sandbox') {
const keys = apiKey.split(':');
this.keyName = keys[0];
this.keySecret = keys[1];
this.ablyClient = new Ably.Rest({ key: apiKey, environment });
}

/**
* Broadcast to all clients subscribed to given channel.
*/
broadcast = async (channelName: string, eventName: string, message: string) => {
await this.ablyClient.channels.get(channelName).publish(eventName, message);
await this.broadcastToOthers("", {channelName, eventName, payload: message});
};

/**
* Broadcast on behalf of a given realtime client.
*/
broadcastToOthers = async (socketId: string, {channelName, eventName, payload}) => {
let protoMsg = {name: eventName, data: payload};
if (!isNullOrUndefinedOrEmpty(socketId)) {
const socketIdObj = JSON.parse(fromBase64UrlEncoded(socketId));
protoMsg = {...protoMsg, ...socketIdObj}
}
await this.ablyClient.channels.get(channelName).publish(protoMsg);
};

tokenInvalidOrExpired = (serverTime, token) => {
Expand Down
4 changes: 2 additions & 2 deletions tests/ably/setup/sandbox.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { httpDeleteAsync, httpPostAsync, toBase64 } from './utils';
import { httpDeleteAsync, httpPostAsync, toBase64UrlEncoded } from './utils';

let sandboxUrl = 'https://sandbox-rest.ably.io/apps';

Expand All @@ -16,7 +16,7 @@ const creatNewApp = async () => {

const deleteApp = async (app) => {
let authKey = app.keys[0].keyStr;
const headers = { Authorization: 'Basic ' + toBase64(authKey) };
const headers = { Authorization: 'Basic ' + toBase64UrlEncoded(authKey) };
return await httpDeleteAsync(`${sandboxUrl}/${app.appId}`, headers);
};

Expand Down
5 changes: 2 additions & 3 deletions tests/ably/setup/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { httpRequestAsync } from '../../../src/channel/ably/utils';
import { httpRequestAsync} from '../../../src/channel/ably/utils';
export { toBase64UrlEncoded } from '../../../src/channel/ably/utils';

const safeAssert = (assertions: Function, done: Function, finalAssertion = false) => {
try {
Expand All @@ -17,8 +18,6 @@ export const execute = (fn: Function, times: number) => {
}
};

export const toBase64 = (text: string) => Buffer.from(text, 'binary').toString('base64');

export const httpPostAsync = async (url: string, postData: any) => {
postData = JSON.stringify(postData);
let postOptions = {
Expand Down
Loading

0 comments on commit b88c580

Please sign in to comment.