From cf27eb35c3aa8c0642345ff5427f089cf3b6a405 Mon Sep 17 00:00:00 2001 From: Akhilesh Thite Date: Fri, 18 Oct 2024 15:48:07 -0700 Subject: [PATCH 1/2] feat: implement hyper chat application and enhance hyper-handler --- package-lock.json | 24 ++- package.json | 3 + src/pages/p2p/chat/app.js | 162 ++++++++++++++++ src/pages/p2p/chat/index.html | 238 +++++++++++++++++++---- src/pages/p2p/chat/lib.js | 91 --------- src/pages/p2p/chat/pubsub.js | 76 -------- src/pages/p2p/chat/style.css | 36 ---- src/protocols/hyper-handler.js | 340 ++++++++++++++++++++++++++++----- 8 files changed, 673 insertions(+), 297 deletions(-) create mode 100644 src/pages/p2p/chat/app.js delete mode 100644 src/pages/p2p/chat/lib.js delete mode 100644 src/pages/p2p/chat/pubsub.js delete mode 100644 src/pages/p2p/chat/style.css diff --git a/package-lock.json b/package-lock.json index 3b81bfd..36f24f8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,13 +21,16 @@ "@libp2p/tcp": "^9.0.26", "@libp2p/webrtc": "^4.0.33", "@libp2p/websockets": "^8.0.24", + "b4a": "^1.6.7", "content-type": "^1.0.5", "electron-updater": "^6.2.1", "find-process": "^1.4.7", "fs-extra": "^11.2.0", "helia": "^4.2.1", "hyper-sdk": "^5.0.0", + "hypercore-crypto": "^3.4.2", "hypercore-fetch": "^9.9.1", + "hyperswarm": "^4.8.4", "jquery": "^3.7.1", "libp2p": "^1.6.0", "libp2p-gossipsub": "^0.13.0", @@ -6505,9 +6508,9 @@ } }, "node_modules/b4a": { - "version": "1.6.6", - "resolved": "https://registry.npmjs.org/b4a/-/b4a-1.6.6.tgz", - "integrity": "sha512-5Tk1HLk6b6ctmjIkAcU/Ujv/1WqiDl0F0JdRCR80VsOcUlHcu7pWeWRlOqQLHfDEsVx9YH/aif5AG4ehoCtTmg==" + "version": "1.6.7", + "resolved": "https://registry.npmjs.org/b4a/-/b4a-1.6.7.tgz", + "integrity": "sha512-OnAYlL5b7LEkALw87fUVafQw5rVR9RjwGd4KUwNQ6DrrNmaVaUCgLipfVlzrPQ4tWOR9P0IXGNOx50jYCCdSJg==" }, "node_modules/babel-core": { "version": "7.0.0-bridge.0", @@ -9692,15 +9695,16 @@ } }, "node_modules/hyperswarm": { - "version": "4.7.15", - "resolved": "https://registry.npmjs.org/hyperswarm/-/hyperswarm-4.7.15.tgz", - "integrity": "sha512-/KYmjz3j/oN2XlC0aHcxxGVXflVK4FCY6aze8AT0jJNjB6p9Y7NXnF1798p6nT1BH4ODieu+p1yV6cfU+zmcOg==", + "version": "4.8.4", + "resolved": "https://registry.npmjs.org/hyperswarm/-/hyperswarm-4.8.4.tgz", + "integrity": "sha512-+ysGkogdCK3LqF9kpJ5yCp38EkJq0Gr9z0iXLAbFWzKhZfyDMy6YzqIOn1uFDwpNvlHK3tMaSeiBsWkUdT4jHQ==", "dependencies": { "b4a": "^1.3.1", "bare-events": "^2.2.0", "hyperdht": "^6.11.0", "safety-catch": "^1.0.2", - "shuffled-priority-queue": "^2.1.0" + "shuffled-priority-queue": "^2.1.0", + "unslab": "^1.3.0" } }, "node_modules/hypertrace": { @@ -15432,9 +15436,9 @@ } }, "node_modules/unslab": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/unslab/-/unslab-1.2.0.tgz", - "integrity": "sha512-VDhUt6dP/pMcisarj64YBmBhiTIcXmpPgRMP7ZoJRCnBdpoPZ+SahMEqwu2VwTuwGEth61F2UGP29ksOidRF6g==", + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/unslab/-/unslab-1.3.0.tgz", + "integrity": "sha512-YATkfKAFj47kTzmiQrWXMyRvaVrHsW6MEALa4bm+FhiA2YG4oira+Z3DXN6LrYOYn2Y8eO94Lwl9DOHjs1FpoQ==", "dependencies": { "b4a": "^1.6.6" } diff --git a/package.json b/package.json index b7be861..4544dee 100644 --- a/package.json +++ b/package.json @@ -112,13 +112,16 @@ "@libp2p/tcp": "^9.0.26", "@libp2p/webrtc": "^4.0.33", "@libp2p/websockets": "^8.0.24", + "b4a": "^1.6.7", "content-type": "^1.0.5", "electron-updater": "^6.2.1", "find-process": "^1.4.7", "fs-extra": "^11.2.0", "helia": "^4.2.1", "hyper-sdk": "^5.0.0", + "hypercore-crypto": "^3.4.2", "hypercore-fetch": "^9.9.1", + "hyperswarm": "^4.8.4", "jquery": "^3.7.1", "libp2p": "^1.6.0", "libp2p-gossipsub": "^0.13.0", diff --git a/src/pages/p2p/chat/app.js b/src/pages/p2p/chat/app.js new file mode 100644 index 0000000..5ba90a7 --- /dev/null +++ b/src/pages/p2p/chat/app.js @@ -0,0 +1,162 @@ +// src/pages/app.js + +// Define the base URL for the chat API using the hyper protocol +const apiBase = 'hyper://chat'; + +// Function to create a new chat room +async function createChatRoom() { + try { + const response = await fetch(`${apiBase}?action=create`, { method: 'POST' }); + if (!response.ok) { + throw new Error(`Failed to create chat room: ${response.statusText}`); + } + const data = await response.json(); + const { roomKey } = data; + console.log(`Chat room created with key: ${roomKey}`); + + await joinChatRoom(roomKey); + startChatRoom(roomKey); + } catch (error) { + console.error('Error creating chat room:', error); + alert(`Error creating chat room: ${error.message}`); + } +} + +// Function to join an existing chat room +async function joinChatRoom(roomKey) { + try { + const response = await fetch(`${apiBase}?action=join&roomKey=${roomKey}`, { + method: 'POST', + }); + if (!response.ok) { + throw new Error(`Failed to join chat room: ${response.statusText}`); + } + const data = await response.json(); + console.log(data.message); + } catch (error) { + console.error('Error joining chat room:', error); + alert(`Error joining chat room: ${error.message}`); + throw error; + } +} + +document.querySelector('#create-chat-room').addEventListener('click', async () => { + await createChatRoom(); +}); + +document.querySelector('#join-form').addEventListener('submit', async (e) => { + e.preventDefault(); + const topic = document.querySelector('#join-chat-room-topic').value.trim(); + if (!topic) { + alert('Please enter a valid chat room topic.'); + return; + } + try { + await joinChatRoom(topic); + startChatRoom(topic); + } catch (error) {} +}); + +function startChatRoom(roomKey) { + document.querySelector('#setup').style.display = 'none'; + document.querySelector('#chat').style.display = 'flex'; + document.querySelector('#chat-room-info').style.display = 'flex'; // Show room info + document.querySelector('#chat-room-topic').textContent = roomKey; + setupMessageReceiver(); +} + +document.querySelector('#message-form').addEventListener('submit', async (e) => { + e.preventDefault(); + const messageInput = document.querySelector('#message'); + const message = messageInput.value.trim(); + if (!message) { + alert('Cannot send an empty message.'); + return; + } + messageInput.value = ''; + sendMessage('You', message); +}); + +async function sendMessage(sender, message) { + try { + onMessageReceived(sender, message); + const response = await fetch(`${apiBase}?action=send`, { + method: 'POST', + headers: { 'Content-Type': 'text/plain' }, + body: message, + }); + if (!response.ok) { + throw new Error(`Failed to send message: ${response.statusText}`); + } + const data = await response.json(); + console.log(data.message); + } catch (error) { + console.error('Error sending message:', error); + alert(`Error sending message: ${error.message}`); + } +} + +function setupMessageReceiver() { + const eventSource = new EventSource(`${apiBase}?action=receive`); + + eventSource.onmessage = function (event) { + const messageData = JSON.parse(event.data); + const sender = messageData.sender; + const message = messageData.message; + onMessageReceived(sender, message); + }; + + eventSource.addEventListener('peersCount', function (event) { + const count = event.data; + updatePeersCount(count); + }); + + eventSource.onerror = function (error) { + console.error('EventSource failed:', error); + alert('Connection to the message stream failed.'); + }; +} + +function onMessageReceived(sender, message) { + const messagesContainer = document.querySelector('#messages'); + const messageDiv = document.createElement('div'); + const messageTextDiv = document.createElement('div'); + const senderAndTimeDiv = document.createElement('div'); // Sender and time in one line + + const time = new Date().toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }); + + messageDiv.classList.add('message'); + messageTextDiv.innerHTML = formatMessageWithLinks(message); // Format message to include links + senderAndTimeDiv.textContent = `${sender} ยท ${time}`; // Combine sender and timestamp + + if (sender === 'You') { + messageDiv.classList.add('message-right'); // Align to the right + messageTextDiv.classList.add('message-text-right'); + senderAndTimeDiv.classList.add('sender-right'); // Right-align sender and time + } else { + messageDiv.classList.add('message-left'); // Align to the left + messageTextDiv.classList.add('message-text-left'); + senderAndTimeDiv.classList.add('sender-left'); // Left-align sender and time + } + + messageDiv.appendChild(messageTextDiv); + messageDiv.appendChild(senderAndTimeDiv); + + messagesContainer.appendChild(messageDiv); + messagesContainer.scrollTop = messagesContainer.scrollHeight; // Scroll to the bottom +} + +/** + * Helper function to find URLs in the message and convert them to anchor tags + * with target="_blank" and rel="noopener noreferrer" + */ +function formatMessageWithLinks(message) { + const urlPattern = /(\b(https?|ftp|file|hyper|ipfs|ipns):\/\/[-A-Z0-9+&@#\/%?=~_|!:,.;]*[-A-Z0-9+&@#\/%=~_|])/gi; + return message.replace(urlPattern, function(url) { + return `${url}`; + }); +} + +function updatePeersCount(count) { + document.querySelector('#peers-count').textContent = count; +} \ No newline at end of file diff --git a/src/pages/p2p/chat/index.html b/src/pages/p2p/chat/index.html index fea1976..6af7b05 100644 --- a/src/pages/p2p/chat/index.html +++ b/src/pages/p2p/chat/index.html @@ -1,36 +1,208 @@ - - - - Chat - - - - - - - -
-

P2P Chat

-
- -

-
-
- - - - - - + + + + + P2P Chat App + + + +
+
+ +
- or -
+
+ + +
+
+ +
+
+
Topic:
+
Peers: 0
+
+
+
+ + +
+
+
+ + + \ No newline at end of file diff --git a/src/pages/p2p/chat/lib.js b/src/pages/p2p/chat/lib.js deleted file mode 100644 index 8306f19..0000000 --- a/src/pages/p2p/chat/lib.js +++ /dev/null @@ -1,91 +0,0 @@ -async function updateSite(filename, content){ - const resp = await fetch(`${window.origin}/${filename}`, {method: 'put', body: content}) - const newLocation = resp.headers.get('location') - window.location = new URL(newLocation).origin -} - -async function publishSite(){ - let resp = await fetch('ipns://localhost/?key=mysite', {method: 'POST'}) - const key = resp.headers.get('location') - resp = await fetch(key, {method: 'POST', body: window.origin}) - window.location = new URL(resp.headers.get('location')).origin -} - -async function loadFile(filename){ - const resp = await fetch(filename) - const content = await resp.text() - document.getElementById('idFilenameInput').value = filename - document.getElementById('idContentInput').value = content -} - -async function listDir(path){ - const resp = await fetch(window.origin + '?noResolve') - const files = await resp.json() - return files -} - -async function showEditor(){ - let editorDiv = document.getElementById("editor") - if (!editorDiv){ - editorDiv = document.createElement('div') - editorDiv.id = 'editor' - } - editorDiv.style = `display: flex; - flex-direction: column; - position: absolute; - top: 0; - left: 0; - width: 100vw; - height: 100vh; - background-color: rgb(233 233 233 / 95%); - ` - editorDiv.innerHTML = `
-

Files

-
-
- - - - - -
-
` - document.body.appendChild(editorDiv) - const form = document.getElementById('idForm') - form.onsubmit = e => { - e.preventDefault() - const filename = document.getElementById('idFilenameInput').value - const content = document.getElementById('idContentInput').value - updateSite(filename, content) - } - const sidebar = document.getElementById('idSidebar') - const files = await listDir(window.origin) - const list = document.createElement('ul') - list.style = "list-style: none; padding-inline-start: 0;" - files.map( file => { - let li = document.createElement('li') - li.innerHTML = `${file}` - li.querySelector('a').onclick = e => loadFile(file) - list.appendChild(li) - }) - sidebar.appendChild(list) - - if (window.origin.startsWith('ipfs://')){ - const button = document.createElement('button') - button.innerHTML = 'Publish site' - button.onclick = e => { - e.preventDefault() - publishSite() - } - sidebar.appendChild(button) - } -} - -window.addEventListener('load', e => { - document.addEventListener('keydown', e => { - if( e.ctrlKey && e.key == 'i' ){ - showEditor().catch(console.error) - } - }) -}) - diff --git a/src/pages/p2p/chat/pubsub.js b/src/pages/p2p/chat/pubsub.js deleted file mode 100644 index deb5d93..0000000 --- a/src/pages/p2p/chat/pubsub.js +++ /dev/null @@ -1,76 +0,0 @@ -class PubSub { - constructor(channelName){ - this.channelName = channelName - this.onopen = this.onopen.bind(this) - this.onmessage = this.onmessage.bind(this) - this.onerror = this.onerror.bind(this) - } - - async listenForMsg() { - let es = new EventSource(`pubsub://${this.channelName}/?format=json`) - es.onmessage = this.onmessage - es.onopen = this.onopen - es.onerror = this.onerror - } - - onopen(e) { - console.log('onOpen', e) - this.myRand = Math.random() - let message = {msg: "hello", rnd: this.myRand} - fetch(`pubsub://${this.channelName}/`, { - method: 'POST', - body: JSON.stringify(message), - }).catch(console.error) - - document.getElementById('setup').classList.add('hidden') - document.getElementById('chat').classList.remove('hidden') - document.getElementById('roomName').innerHTML = this.channelName - - document.querySelector('#chatForm').addEventListener('submit', e => { - e.preventDefault() - let textInput = document.querySelector('#chat input') - fetch(`pubsub://${this.channelName}/`, { - method: 'POST', - body: JSON.stringify({message: textInput.value}), - }).catch(console.error) - textInput.value = '' - }) - } - - onmessage(e) { - console.log('onmessage', e) - try { - let msg = JSON.parse(e.data) - if (!this.whoami && msg.data.rnd && msg.data.rnd == this.myRand){ - console.log('Hello from myself. Yay!') - this.whoami = msg.from - } else if (msg.data.rnd && msg.data.rnd != this.myRand ){ - console.log('Hello from a friend!') - } else { - let textarea = document.querySelector('#chat textarea') - textarea.value = textarea.value + `\n> ${msg.from}: ${msg.data.message}` - } - } catch (error) { - console.log(error) - } - } - - - onerror(e) { - console.log('onmessage', e) - } - - -} - -window.addEventListener('load', (event) => { - const form = document.getElementById('roomNameForm') - form.addEventListener('submit', event => { - event.preventDefault() - const channelName = document.getElementById('channelNameInput').value - console.log('start pubsub', channelName) - window.pubsub = new PubSub(channelName) - window.pubsub.listenForMsg().catch(console.error) - }) -}) - diff --git a/src/pages/p2p/chat/style.css b/src/pages/p2p/chat/style.css deleted file mode 100644 index 2e04669..0000000 --- a/src/pages/p2p/chat/style.css +++ /dev/null @@ -1,36 +0,0 @@ -html, body { - height: 100%; - margin: 0; - background: var(--peersky-p2p-background-color); -} -.container { - display: flex; - color: var(--peersky-text-color); - flex-direction: column; - height: 100%; - align-items: center; - justify-content: center; -} - -.hidden { - display: none; -} - -#chat textarea { - flex-grow: 1; - margin-bottom: 1em; - width: 80%; - border: 1px solid var(--peersky-primary-color); -} - -#chatForm { - display: flex; - width: 80%; - margin-bottom: 1em; -} - -#messageInput { - flex-grow: 1; - line-height: 1.7; -} - diff --git a/src/protocols/hyper-handler.js b/src/protocols/hyper-handler.js index 5e11573..16de3b1 100644 --- a/src/protocols/hyper-handler.js +++ b/src/protocols/hyper-handler.js @@ -1,76 +1,314 @@ import { create as createSDK } from 'hyper-sdk'; import makeHyperFetch from 'hypercore-fetch'; -import { Readable } from 'stream'; -import fs from "fs-extra"; +import { Readable, PassThrough } from 'stream'; +import fs from 'fs-extra'; +import Hyperswarm from 'hyperswarm'; +import crypto from 'hypercore-crypto'; +import b4a from 'b4a'; -// Initialize the SDK and create the fetch function +let sdk, fetch; +let swarm = null; +let peers = []; + +// Store all active SSE client streams +let sseClients = []; + +// Initialize the SDK and fetch async function initializeHyperSDK(options) { - const sdk = await createSDK(options); - const fetch = makeHyperFetch({ + if (sdk && fetch) return fetch; // Return fetch if already initialized + + console.log('Initializing Hyper SDK...'); + sdk = await createSDK(options); // Create SDK + fetch = makeHyperFetch({ sdk: sdk, - writable: true + writable: true, // Enable write capability }); - - return fetch; + console.log('Hyper SDK initialized.'); + return fetch; // Return the fetch function } -async function * readBody (body, session) { - for (const chunk of body) { - if (chunk.bytes) { - yield await Promise.resolve(chunk.bytes) - } else if (chunk.blobUUID) { - yield await session.getBlobData(chunk.blobUUID) - } else if (chunk.file) { - yield * Readable.from(fs.createReadStream(chunk.file)) - } - } -} - -// Create the Hyper protocol handler +// Protocol handler creation export async function createHandler(options, session) { - const fetch = await initializeHyperSDK(options); + await initializeHyperSDK(options); // Initialize SDK and fetch return async function protocolHandler(req, callback) { const { url, method = 'GET', headers = {}, uploadData } = req; + const urlObj = new URL(url); + const pathname = urlObj.pathname; + const protocol = urlObj.protocol.replace(':', ''); - try { - console.log(`Handling request: ${method} ${url}`); - console.log('Headers:', headers); - - const body = uploadData ? Readable.from(readBody(uploadData, session)) : null - - const response = await fetch(url, { - method, - headers, - body, - duplex: 'half' - }); - - // Use a stream to handle the response data - if (response.body) { - const responseBody = Readable.from(response.body); - console.log('Response received:', response.status); + console.log(`Handling request: ${method} ${url}`); - callback({ - statusCode: response.status, - headers: Object.fromEntries(response.headers), - data: responseBody // Return the stream directly - }); + try { + if (protocol === 'hyper' && (urlObj.hostname === 'chat' || pathname.startsWith('/chat'))) { + await handleChatRequest(req, callback, session); // Handle chat-specific requests } else { - console.warn('No response body received.'); - callback({ - statusCode: response.status, - headers: Object.fromEntries(response.headers), - data: Readable.from('') // Return empty data if no body - }); + await handleHyperRequest(req, callback, session); // Handle general hyper requests } } catch (e) { console.error('Failed to handle Hyper request:', e); callback({ statusCode: 500, headers: { 'Content-Type': 'text/plain' }, - data: Readable.from(`Error handling Hyper request: ${e.message}`) + data: Readable.from([`Error handling Hyper request: ${e.message}`]), }); } }; } + +// Function to handle chat requests +async function handleChatRequest(req, callback, session) { + const { url, method, uploadData } = req; // Extract uploadData + const urlObj = new URL(url); + const searchParams = urlObj.searchParams; + const action = searchParams.get('action'); + + console.log(`Chat request: ${method} ${url}`); + + try { + if (method === 'POST' && action === 'create') { + const roomKey = await generateChatRoom(); + console.log(`Created chat room with key: ${roomKey}`); + // Do NOT automatically join the created chat room on the server + callback({ + statusCode: 200, + headers: { 'Content-Type': 'application/json' }, + data: Readable.from([Buffer.from(JSON.stringify({ roomKey }))]), + }); + } else if (method === 'POST' && action === 'join') { + const roomKey = searchParams.get('roomKey'); + if (!roomKey) { + throw new Error('Missing roomKey in join request'); + } + console.log(`Joining chat room with key: ${roomKey}`); + await joinChatRoom(roomKey); + callback({ + statusCode: 200, + headers: { 'Content-Type': 'application/json' }, + data: Readable.from([Buffer.from(JSON.stringify({ message: 'Joined chat room' }))]), + }); + } else if (method === 'POST' && action === 'send') { + const message = await getRequestBody(uploadData, session); // Corrected + console.log(`Sending message: ${message}`); + sendMessageToPeers(message); + callback({ + statusCode: 200, + headers: { 'Content-Type': 'application/json' }, + data: Readable.from([Buffer.from(JSON.stringify({ message: 'Message sent' }))]), + }); + } else if (method === 'GET' && action === 'receive') { + console.log('Setting up message stream for receiving messages'); + const stream = new PassThrough(); + + // Keep a reference to the stream to prevent garbage collection + session.messageStream = stream; + + // Send keep-alive messages every 15 seconds + const keepAliveInterval = setInterval(() => { + stream.write(':\n\n'); // Comment line in SSE to keep the connection alive + }, 15000); + + // Clean up on stream close + stream.on('close', () => { + clearInterval(keepAliveInterval); + sseClients = sseClients.filter(s => s !== stream); + }); + + // Add the stream to the list of SSE clients + sseClients.push(stream); + + callback({ + statusCode: 200, + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + data: stream, + }); + } else { + callback({ + statusCode: 400, + headers: { 'Content-Type': 'text/plain' }, + data: Readable.from([Buffer.from('Invalid chat action')]), + }); + } + } catch (e) { + console.error('Error in handleChatRequest:', e); + callback({ + statusCode: 500, + headers: { 'Content-Type': 'text/plain' }, + data: Readable.from([Buffer.from(`Error in chat request: ${e.message}`)]), + }); + } +} + +// Function to handle general Hyper requests +async function handleHyperRequest(req, callback, session) { + const { url, method = 'GET', headers = {}, uploadData } = req; + const fetch = await initializeHyperSDK(); // Ensure fetch is initialized + + let body = null; + if (uploadData) { + try { + const buffer = await readBody(uploadData, session); + body = Readable.from([buffer]); // Pass buffer within an array + } catch (error) { + console.error('Error reading uploadData:', error); + callback({ + statusCode: 400, + headers: { 'Content-Type': 'text/plain' }, + data: Readable.from([Buffer.from('Invalid upload data')]), + }); + return; + } + } + + try { + const response = await fetch(url, { + method, + headers, + body, + duplex: 'half', // Support half-duplex communication + }); + + if (response.body) { + // Determine if the response is binary or text based on Content-Type + const contentType = response.headers.get('Content-Type') || ''; + let responseData; + + if (contentType.startsWith('text/') || contentType.includes('application/json')) { + // For text or JSON responses + const text = await response.text(); + responseData = Buffer.from(text); + } else { + // For binary responses (e.g., images) + const arrayBuffer = await response.arrayBuffer(); + responseData = Buffer.from(arrayBuffer); + } + + const responseBody = Readable.from([responseData]); + console.log('Response received:', response.status); + + callback({ + statusCode: response.status, + headers: Object.fromEntries(response.headers), + data: responseBody, + }); + } else { + console.warn('No response body received.'); + callback({ + statusCode: response.status, + headers: Object.fromEntries(response.headers), + data: Readable.from([Buffer.from('')]), + }); + } + } catch (e) { + console.error('Failed to fetch from Hyper SDK:', e); + callback({ + statusCode: 500, + headers: { 'Content-Type': 'text/plain' }, + data: Readable.from([Buffer.from(`Error fetching data: ${e.message}`)]), + }); + } +} + +// Helper function to read request body +async function readBody(body, session) { + const buffers = []; + for (const data of body || []) { + if (data.bytes) { + buffers.push(data.bytes); + } else if (data.file) { + const fileBuffer = await fs.promises.readFile(data.file); + buffers.push(fileBuffer); + } else if (data.blobUUID) { + const blobData = await session.getBlobData(data.blobUUID); + buffers.push(blobData); + } + } + return Buffer.concat(buffers); +} + +// Helper function to extract request body +async function getRequestBody(uploadData, session) { + try { + const buffer = await readBody(uploadData, session); + console.log('Request body received:', buffer.toString()); + return buffer.toString(); + } catch (error) { + console.error('Error reading request body:', error); + throw error; + } +} + +// Chat room generation and swarm management functions +async function generateChatRoom() { + const topicBuffer = crypto.randomBytes(32); + const roomKey = b4a.toString(topicBuffer, 'hex'); + // Do NOT join the swarm here; let the client handle joining + return roomKey; +} + +async function joinChatRoom(roomKey) { + const topicBuffer = b4a.from(roomKey, 'hex'); + await joinSwarm(topicBuffer); +} + +async function joinSwarm(topicBuffer) { + if (swarm) { + console.log('Already connected to a swarm. Destroying current swarm.'); + swarm.destroy(); + peers = []; + } + + swarm = new Hyperswarm(); + + swarm.on('connection', (peer) => { + console.log('New peer connected'); + peers.push(peer); + + // Notify clients of updated peer count + updatePeersCount(); + + const peerId = b4a.toString(peer.remotePublicKey, 'hex').substr(0, 6); // Get peer ID + + peer.on('data', (data) => { + const message = data.toString(); + console.log(`Received message from peer (${peerId}): ${message}`); + // Broadcast the message along with the sender (peer) ID to all SSE clients + sseClients.forEach(stream => { + stream.write(`data: ${JSON.stringify({ sender: peerId, message })}\n\n`); + }); + }); + + peer.on('close', () => { + console.log('Peer disconnected'); + peers = peers.filter((p) => p !== peer); + // Notify clients of updated peer count + updatePeersCount(); + }); + }); + + const discovery = swarm.join(topicBuffer, { client: true, server: true }); + await discovery.flushed(); + console.log('Joined swarm with topic:', b4a.toString(topicBuffer, 'hex')); +} + +// Send message to all connected peers +function sendMessageToPeers(message) { + console.log(`Broadcasting message to ${peers.length} peers`); + peers.forEach((peer) => { + peer.write(message); + }); +} + +// Update peers count and notify clients +function updatePeersCount() { + const count = peers.length; + console.log(`Peers connected: ${count}`); + // Broadcast the updated peer count to all SSE clients + sseClients.forEach(stream => { + stream.write(`event: peersCount\ndata: ${count}\n\n`); + }); +} From 74c7aaef2e007138a76782c59a60f6d81b010b39 Mon Sep 17 00:00:00 2001 From: Akhilesh Thite Date: Fri, 18 Oct 2024 15:51:11 -0700 Subject: [PATCH 2/2] perf: stream request body to handle large file uploads efficiently --- src/protocols/hyper-handler.js | 214 ++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 100 deletions(-) diff --git a/src/protocols/hyper-handler.js b/src/protocols/hyper-handler.js index 16de3b1..f576959 100644 --- a/src/protocols/hyper-handler.js +++ b/src/protocols/hyper-handler.js @@ -1,10 +1,10 @@ -import { create as createSDK } from 'hyper-sdk'; -import makeHyperFetch from 'hypercore-fetch'; -import { Readable, PassThrough } from 'stream'; -import fs from 'fs-extra'; -import Hyperswarm from 'hyperswarm'; -import crypto from 'hypercore-crypto'; -import b4a from 'b4a'; +import { create as createSDK } from "hyper-sdk"; +import makeHyperFetch from "hypercore-fetch"; +import { Readable, PassThrough } from "stream"; +import fs from "fs-extra"; +import Hyperswarm from "hyperswarm"; +import crypto from "hypercore-crypto"; +import b4a from "b4a"; let sdk, fetch; let swarm = null; @@ -17,13 +17,13 @@ let sseClients = []; async function initializeHyperSDK(options) { if (sdk && fetch) return fetch; // Return fetch if already initialized - console.log('Initializing Hyper SDK...'); + console.log("Initializing Hyper SDK..."); sdk = await createSDK(options); // Create SDK fetch = makeHyperFetch({ sdk: sdk, writable: true, // Enable write capability }); - console.log('Hyper SDK initialized.'); + console.log("Hyper SDK initialized."); return fetch; // Return the fetch function } @@ -32,24 +32,27 @@ export async function createHandler(options, session) { await initializeHyperSDK(options); // Initialize SDK and fetch return async function protocolHandler(req, callback) { - const { url, method = 'GET', headers = {}, uploadData } = req; + const { url, method = "GET", headers = {}, uploadData } = req; const urlObj = new URL(url); const pathname = urlObj.pathname; - const protocol = urlObj.protocol.replace(':', ''); + const protocol = urlObj.protocol.replace(":", ""); console.log(`Handling request: ${method} ${url}`); try { - if (protocol === 'hyper' && (urlObj.hostname === 'chat' || pathname.startsWith('/chat'))) { + if ( + protocol === "hyper" && + (urlObj.hostname === "chat" || pathname.startsWith("/chat")) + ) { await handleChatRequest(req, callback, session); // Handle chat-specific requests } else { await handleHyperRequest(req, callback, session); // Handle general hyper requests } } catch (e) { - console.error('Failed to handle Hyper request:', e); + console.error("Failed to handle Hyper request:", e); callback({ statusCode: 500, - headers: { 'Content-Type': 'text/plain' }, + headers: { "Content-Type": "text/plain" }, data: Readable.from([`Error handling Hyper request: ${e.message}`]), }); } @@ -61,43 +64,47 @@ async function handleChatRequest(req, callback, session) { const { url, method, uploadData } = req; // Extract uploadData const urlObj = new URL(url); const searchParams = urlObj.searchParams; - const action = searchParams.get('action'); + const action = searchParams.get("action"); console.log(`Chat request: ${method} ${url}`); try { - if (method === 'POST' && action === 'create') { + if (method === "POST" && action === "create") { const roomKey = await generateChatRoom(); console.log(`Created chat room with key: ${roomKey}`); // Do NOT automatically join the created chat room on the server callback({ statusCode: 200, - headers: { 'Content-Type': 'application/json' }, + headers: { "Content-Type": "application/json" }, data: Readable.from([Buffer.from(JSON.stringify({ roomKey }))]), }); - } else if (method === 'POST' && action === 'join') { - const roomKey = searchParams.get('roomKey'); + } else if (method === "POST" && action === "join") { + const roomKey = searchParams.get("roomKey"); if (!roomKey) { - throw new Error('Missing roomKey in join request'); + throw new Error("Missing roomKey in join request"); } console.log(`Joining chat room with key: ${roomKey}`); await joinChatRoom(roomKey); callback({ statusCode: 200, - headers: { 'Content-Type': 'application/json' }, - data: Readable.from([Buffer.from(JSON.stringify({ message: 'Joined chat room' }))]), + headers: { "Content-Type": "application/json" }, + data: Readable.from([ + Buffer.from(JSON.stringify({ message: "Joined chat room" })), + ]), }); - } else if (method === 'POST' && action === 'send') { - const message = await getRequestBody(uploadData, session); // Corrected + } else if (method === "POST" && action === "send") { + const message = await getRequestBody(uploadData, session); console.log(`Sending message: ${message}`); sendMessageToPeers(message); callback({ statusCode: 200, - headers: { 'Content-Type': 'application/json' }, - data: Readable.from([Buffer.from(JSON.stringify({ message: 'Message sent' }))]), + headers: { "Content-Type": "application/json" }, + data: Readable.from([ + Buffer.from(JSON.stringify({ message: "Message sent" })), + ]), }); - } else if (method === 'GET' && action === 'receive') { - console.log('Setting up message stream for receiving messages'); + } else if (method === "GET" && action === "receive") { + console.log("Setting up message stream for receiving messages"); const stream = new PassThrough(); // Keep a reference to the stream to prevent garbage collection @@ -105,13 +112,13 @@ async function handleChatRequest(req, callback, session) { // Send keep-alive messages every 15 seconds const keepAliveInterval = setInterval(() => { - stream.write(':\n\n'); // Comment line in SSE to keep the connection alive + stream.write(":\n\n"); // Comment line in SSE to keep the connection alive }, 15000); // Clean up on stream close - stream.on('close', () => { + stream.on("close", () => { clearInterval(keepAliveInterval); - sseClients = sseClients.filter(s => s !== stream); + sseClients = sseClients.filter((s) => s !== stream); }); // Add the stream to the list of SSE clients @@ -120,24 +127,24 @@ async function handleChatRequest(req, callback, session) { callback({ statusCode: 200, headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", }, data: stream, }); } else { callback({ statusCode: 400, - headers: { 'Content-Type': 'text/plain' }, - data: Readable.from([Buffer.from('Invalid chat action')]), + headers: { "Content-Type": "text/plain" }, + data: Readable.from([Buffer.from("Invalid chat action")]), }); } } catch (e) { - console.error('Error in handleChatRequest:', e); + console.error("Error in handleChatRequest:", e); callback({ statusCode: 500, - headers: { 'Content-Type': 'text/plain' }, + headers: { "Content-Type": "text/plain" }, data: Readable.from([Buffer.from(`Error in chat request: ${e.message}`)]), }); } @@ -145,20 +152,19 @@ async function handleChatRequest(req, callback, session) { // Function to handle general Hyper requests async function handleHyperRequest(req, callback, session) { - const { url, method = 'GET', headers = {}, uploadData } = req; + const { url, method = "GET", headers = {}, uploadData } = req; const fetch = await initializeHyperSDK(); // Ensure fetch is initialized let body = null; if (uploadData) { try { - const buffer = await readBody(uploadData, session); - body = Readable.from([buffer]); // Pass buffer within an array + body = readBody(uploadData, session); // Get the stream directly } catch (error) { - console.error('Error reading uploadData:', error); + console.error("Error reading uploadData:", error); callback({ statusCode: 400, - headers: { 'Content-Type': 'text/plain' }, - data: Readable.from([Buffer.from('Invalid upload data')]), + headers: { "Content-Type": "text/plain" }, + data: Readable.from(["Invalid upload data"]), }); return; } @@ -169,75 +175,81 @@ async function handleHyperRequest(req, callback, session) { method, headers, body, - duplex: 'half', // Support half-duplex communication + duplex: "half", // Ensure that the request supports streaming }); if (response.body) { - // Determine if the response is binary or text based on Content-Type - const contentType = response.headers.get('Content-Type') || ''; - let responseData; - - if (contentType.startsWith('text/') || contentType.includes('application/json')) { - // For text or JSON responses - const text = await response.text(); - responseData = Buffer.from(text); - } else { - // For binary responses (e.g., images) - const arrayBuffer = await response.arrayBuffer(); - responseData = Buffer.from(arrayBuffer); - } - - const responseBody = Readable.from([responseData]); - console.log('Response received:', response.status); + // Stream the response body back to the client + const responseStream = Readable.from(response.body); + console.log("Response received:", response.status); callback({ statusCode: response.status, headers: Object.fromEntries(response.headers), - data: responseBody, + data: responseStream, }); } else { - console.warn('No response body received.'); + console.warn("No response body received."); callback({ statusCode: response.status, headers: Object.fromEntries(response.headers), - data: Readable.from([Buffer.from('')]), + data: Readable.from([""]), }); } } catch (e) { - console.error('Failed to fetch from Hyper SDK:', e); + console.error("Failed to fetch from Hyper SDK:", e); callback({ statusCode: 500, - headers: { 'Content-Type': 'text/plain' }, - data: Readable.from([Buffer.from(`Error fetching data: ${e.message}`)]), + headers: { "Content-Type": "text/plain" }, + data: Readable.from([`Error fetching data: ${e.message}`]), }); } } // Helper function to read request body -async function readBody(body, session) { - const buffers = []; - for (const data of body || []) { - if (data.bytes) { - buffers.push(data.bytes); - } else if (data.file) { - const fileBuffer = await fs.promises.readFile(data.file); - buffers.push(fileBuffer); - } else if (data.blobUUID) { - const blobData = await session.getBlobData(data.blobUUID); - buffers.push(blobData); +function readBody(body, session) { + const stream = new PassThrough(); + + (async () => { + try { + for (const data of body || []) { + if (data.bytes) { + stream.write(data.bytes); + } else if (data.file) { + const fileStream = fs.createReadStream(data.file); + fileStream.pipe(stream, { end: false }); + await new Promise((resolve, reject) => { + fileStream.on("end", resolve); + fileStream.on("error", reject); + }); + } else if (data.blobUUID) { + const blobData = await session.getBlobData(data.blobUUID); + stream.write(blobData); + } + } + stream.end(); + } catch (error) { + console.error("Error reading request body:", error); + stream.emit("error", error); } - } - return Buffer.concat(buffers); + })(); + + return stream; } // Helper function to extract request body async function getRequestBody(uploadData, session) { try { - const buffer = await readBody(uploadData, session); - console.log('Request body received:', buffer.toString()); + const stream = readBody(uploadData, session); + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + const buffer = Buffer.concat(chunks); + console.log("Request body received:", buffer.toString()); return buffer.toString(); } catch (error) { - console.error('Error reading request body:', error); + console.error("Error reading request body:", error); throw error; } } @@ -245,45 +257,47 @@ async function getRequestBody(uploadData, session) { // Chat room generation and swarm management functions async function generateChatRoom() { const topicBuffer = crypto.randomBytes(32); - const roomKey = b4a.toString(topicBuffer, 'hex'); + const roomKey = b4a.toString(topicBuffer, "hex"); // Do NOT join the swarm here; let the client handle joining return roomKey; } async function joinChatRoom(roomKey) { - const topicBuffer = b4a.from(roomKey, 'hex'); + const topicBuffer = b4a.from(roomKey, "hex"); await joinSwarm(topicBuffer); } async function joinSwarm(topicBuffer) { if (swarm) { - console.log('Already connected to a swarm. Destroying current swarm.'); + console.log("Already connected to a swarm. Destroying current swarm."); swarm.destroy(); peers = []; } swarm = new Hyperswarm(); - swarm.on('connection', (peer) => { - console.log('New peer connected'); + swarm.on("connection", (peer) => { + console.log("New peer connected"); peers.push(peer); - + // Notify clients of updated peer count updatePeersCount(); - - const peerId = b4a.toString(peer.remotePublicKey, 'hex').substr(0, 6); // Get peer ID - - peer.on('data', (data) => { + + const peerId = b4a.toString(peer.remotePublicKey, "hex").substr(0, 6); // Get peer ID + + peer.on("data", (data) => { const message = data.toString(); console.log(`Received message from peer (${peerId}): ${message}`); // Broadcast the message along with the sender (peer) ID to all SSE clients - sseClients.forEach(stream => { - stream.write(`data: ${JSON.stringify({ sender: peerId, message })}\n\n`); + sseClients.forEach((stream) => { + stream.write( + `data: ${JSON.stringify({ sender: peerId, message })}\n\n` + ); }); }); - - peer.on('close', () => { - console.log('Peer disconnected'); + + peer.on("close", () => { + console.log("Peer disconnected"); peers = peers.filter((p) => p !== peer); // Notify clients of updated peer count updatePeersCount(); @@ -292,7 +306,7 @@ async function joinSwarm(topicBuffer) { const discovery = swarm.join(topicBuffer, { client: true, server: true }); await discovery.flushed(); - console.log('Joined swarm with topic:', b4a.toString(topicBuffer, 'hex')); + console.log("Joined swarm with topic:", b4a.toString(topicBuffer, "hex")); } // Send message to all connected peers @@ -308,7 +322,7 @@ function updatePeersCount() { const count = peers.length; console.log(`Peers connected: ${count}`); // Broadcast the updated peer count to all SSE clients - sseClients.forEach(stream => { + sseClients.forEach((stream) => { stream.write(`event: peersCount\ndata: ${count}\n\n`); }); }