Skip to content

Commit

Permalink
feat: add fast header support
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 17, 2024
1 parent ab0cbcf commit f1ef894
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 218 deletions.
7 changes: 3 additions & 4 deletions src/event/event-handler.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
const messageBuilder = require('../message/message-builder')
const messageParser = require('../message/message-parser')
const C = require('../constants/constants')
const MulticastListener = require('../utils/multicast-listener')
const UnicastListener = require('../utils/unicast-listener')
const EventEmitter = require('component-emitter2')
const Message = require('../message/message')
const rxjs = require('rxjs')

const EventHandler = function (options, connection, client) {
Expand Down Expand Up @@ -105,7 +104,7 @@ EventHandler.prototype.emit = function (name, data) {
throw new Error('invalid argument name')
}

this._connection.sendMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, [name, messageBuilder.typed(data)])
this._connection.sendMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, [name, Message.encodeTyped(data)])
this._emitter.emit(name, data)
this._stats.emitted += 1
}
Expand Down Expand Up @@ -143,7 +142,7 @@ EventHandler.prototype._$handle = function (message) {

if (message.action === C.ACTIONS.EVENT) {
if (message.data && message.data.length === 2) {
this._emitter.emit(name, messageParser.convertTyped(data, this._client))
this._emitter.emit(name, Message.decodeTyped(data, this._client))
} else {
this._emitter.emit(name)
}
Expand Down
85 changes: 37 additions & 48 deletions src/message/connection.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const BrowserWebSocket = globalThis.WebSocket || globalThis.MozWebSocket
const utils = require('../utils/utils')
const NodeWebSocket = utils.isNode ? require('ws') : null
const messageParser = require('./message-parser')
const messageBuilder = require('./message-builder')
const Message = require('./message')
const C = require('../constants/constants')
const pkg = require('../../package.json')
const xxhash = require('xxhash-wasm')
Expand Down Expand Up @@ -79,15 +78,15 @@ Connection.prototype.authenticate = function (authParams, callback) {
}

Connection.prototype.sendMsg = function (topic, action, data) {
return this.send(messageBuilder.getMsg(topic, action, data))
return this.send(Message.encode(topic, action, data))
}

Connection.prototype.sendMsg1 = function (topic, action, p0) {
return this.send(messageBuilder.getMsg1(topic, action, p0))
return this.send(Message.encode(topic, action, [p0]))
}

Connection.prototype.sendMsg2 = function (topic, action, p0, p1) {
return this.send(messageBuilder.getMsg2(topic, action, p0, p1))
return this.send(Message.encode(topic, action, [p0, p1]))
}

Connection.prototype.close = function () {
Expand All @@ -101,19 +100,22 @@ Connection.prototype.close = function () {
}

Connection.prototype._createEndpoint = function () {
this._endpoint = NodeWebSocket
? new NodeWebSocket(this._url, {
generateMask() {},
})
: new BrowserWebSocket(this._url)
if (NodeWebSocket) {
this._endpoint = new NodeWebSocket(this._url, {
generateMask() {},
})
} else {
this._endpoint = new BrowserWebSocket(this._url)
this._endpoint.binaryType = 'arraybuffer'
}
this._corked = false

this._endpoint.onopen = this._onOpen.bind(this)
this._endpoint.onerror = this._onError.bind(this)
this._endpoint.onclose = this._onClose.bind(this)
this._endpoint.onmessage = BrowserWebSocket
? ({ data }) => this._onMessage(typeof data === 'string' ? data : Buffer.from(data).toString())
: ({ data }) => this._onMessage(typeof data === 'string' ? data : data.toString())
? ({ data }) => this._onMessage(Buffer.from(data))
: ({ data }) => this._onMessage(data)
}

Connection.prototype.send = function (message) {
Expand Down Expand Up @@ -172,14 +174,15 @@ Connection.prototype._submit = function (message) {

Connection.prototype._sendAuthParams = function () {
this._setState(C.CONNECTION_STATE.AUTHENTICATING)
const authMessage = messageBuilder.getMsg(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [
this._authParams,
pkg.version,
utils.isNode
? `Node/${process.version}`
: globalThis.navigator && globalThis.navigator.userAgent,
])
this._submit(authMessage)
this._submit(
Message.encode(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [
this._authParams,
pkg.version,
utils.isNode
? `Node/${process.version}`
: globalThis.navigator && globalThis.navigator.userAgent,
])
)
}

Connection.prototype._onOpen = function () {
Expand Down Expand Up @@ -219,13 +222,11 @@ Connection.prototype._onClose = function () {
}
}

Connection.prototype._onMessage = function (data) {
// Remove MESSAGE_SEPERATOR if exists.
if (data.charCodeAt(data.length - 1) === 30) {
data = data.slice(0, -1)
Connection.prototype._onMessage = function (raw) {
if (raw.length <= 2) {
return
}

this._recvQueue.push(data)
this._recvQueue.push(Message.decode(raw))
if (!this._processingRecv) {
this._processingRecv = true
this._schedule(this._recvMessages)
Expand All @@ -245,24 +246,14 @@ Connection.prototype._recvMessages = function (deadline) {
return
}

if (message.length <= 2) {
continue
}

if (this._logger) {
this._logger.trace(message, 'receive')
}

messageParser.parseMessage(message, this._client, this._message)
this.emit('recv', message)

this.emit('recv', this._message)

if (this._message.topic === C.TOPIC.CONNECTION) {
this._handleConnectionResponse(this._message)
} else if (this._message.topic === C.TOPIC.AUTH) {
this._handleAuthResponse(this._message)
if (message.topic === C.TOPIC.CONNECTION) {
this._handleConnectionResponse(message)
} else if (message.topic === C.TOPIC.AUTH) {
this._handleAuthResponse(message)
} else {
this._client._$onMessage(this._message)
this._client._$onMessage(message)
}
}

Expand All @@ -271,17 +262,15 @@ Connection.prototype._recvMessages = function (deadline) {

Connection.prototype._handleConnectionResponse = function (message) {
if (message.action === C.ACTIONS.PING) {
this._submit(messageBuilder.getMsg(C.TOPIC.CONNECTION, C.ACTIONS.PONG))
this._submit(Message.encode(C.TOPIC.CONNECTION, C.ACTIONS.PONG))
} else if (message.action === C.ACTIONS.ACK) {
this._setState(C.CONNECTION_STATE.AWAITING_AUTHENTICATION)
if (this._authParams) {
this._sendAuthParams()
}
} else if (message.action === C.ACTIONS.CHALLENGE) {
this._setState(C.CONNECTION_STATE.CHALLENGING)
this._submit(
messageBuilder.getMsg(C.TOPIC.CONNECTION, C.ACTIONS.CHALLENGE_RESPONSE, [this._url])
)
this._submit(Message.encode(C.TOPIC.CONNECTION, C.ACTIONS.CHALLENGE_RESPONSE, [this._url]))
} else if (message.action === C.ACTIONS.REJECTION) {
this._challengeDenied = true
this.close()
Expand Down Expand Up @@ -316,10 +305,10 @@ Connection.prototype._handleAuthResponse = function (message) {
}

Connection.prototype._getAuthData = function (data) {
if (data === undefined) {
if (!data) {
return null
} else {
return messageParser.convertTyped(data, this._client)
return Message.decodeTyped(data, this._client)
}
}

Expand Down
65 changes: 0 additions & 65 deletions src/message/message-builder.js

This file was deleted.

91 changes: 0 additions & 91 deletions src/message/message-parser.js

This file was deleted.

Loading

0 comments on commit f1ef894

Please sign in to comment.