Skip to content

Commit

Permalink
Implement hybrid mode for BYOB and default reading
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Jan 18, 2025
1 parent f8f14f7 commit 8b5f206
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 49 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This module is used by [strtok3](https://github.com/Borewit/strtok3)
The `peek-readable` contains one class: `StreamReader`, which reads from a [stream.Readable](https://nodejs.org/api/stream.html#stream_class_stream_readable).

- Class `StreamReader` is used to read from Node.js [stream.Readable](https://nodejs.org/api/stream.html#stream_class_stream_readable).
- Class `WebStreamReader` is used to read from [ReadableStream<Uint8Array>](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream)
- Class `WebStreamByobReader` is used to read from [ReadableStream<Uint8Array>](https://developer.mozilla.org/docs/Web/API/ReadableStream)

## Compatibility

Expand All @@ -34,7 +34,7 @@ npm install --save peek-readable

## API Documentation

Both `StreamReader` and `WebStreamReader` implement the [IStreamReader interface](#istreamreader-interface).
Both `StreamReader` and `WebStreamByobReader` implement the [IStreamReader interface](#istreamreader-interface).

### `IStreamReader` Interface

Expand Down
3 changes: 2 additions & 1 deletion biome.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"style":{
"useConsistentBuiltinInstantiation": "error",
"useThrowNewError": "error",
"useThrowOnlyError": "error"
"useThrowOnlyError": "error",
"noParameterAssign": { "level": "off"}
}
}
},
Expand Down
15 changes: 5 additions & 10 deletions lib/WebStreamReader.ts → lib/WebStreamByobReader.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
import type { ReadableStream as NodeReadableStream, ReadableStreamBYOBReader } from 'node:stream/web';
import type { ReadableStreamBYOBReader } from 'node:stream/web';
import { EndOfStreamError } from './EndOfStreamError.js';
export { EndOfStreamError } from './EndOfStreamError.js';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<Uint8Array>;

/**
* Read from a WebStream
* Read from a WebStream using a BYOB reader
* Reference: https://nodejs.org/api/webstreams.html#class-readablestreambyobreader
*/
export class WebStreamReader extends AbstractStreamReader {

private reader: ReadableStreamBYOBReader;
export class WebStreamByobReader extends AbstractStreamReader {

public constructor(stream: AnyWebByteStream) {
public constructor(private reader: ReadableStreamBYOBReader) {
super();
this.reader = stream.getReader({ mode: 'byob' }) as ReadableStreamBYOBReader;
}

protected async readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number> {
Expand All @@ -42,4 +37,4 @@ export class WebStreamReader extends AbstractStreamReader {
await this.reader.cancel(); // Signals a loss of interest in the stream by a consumer
this.reader.releaseLock();
}
}
}
77 changes: 77 additions & 0 deletions lib/WebStreamDefaultReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { ReadableStreamDefaultReader } from 'node:stream/web';
import { EndOfStreamError } from './EndOfStreamError.js';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

export class WebStreamDefaultReader extends AbstractStreamReader {
private buffer: Uint8Array | null = null; // Internal buffer to store excess data
private bufferOffset = 0; // Current position in the buffer

public constructor(private reader: ReadableStreamDefaultReader) {
super();
}

protected async readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number> {
if (this.endOfStream) {
throw new EndOfStreamError();
}

let totalBytesRead = 0;

// Serve from the internal buffer first
if (this.buffer) {
const remainingInBuffer = this.buffer.byteLength - this.bufferOffset;
const toCopy = Math.min(remainingInBuffer, length);
buffer.set(this.buffer.subarray(this.bufferOffset, this.bufferOffset + toCopy), offset);
this.bufferOffset += toCopy;
totalBytesRead += toCopy;
length -= toCopy;
offset += toCopy;

// If the buffer is exhausted, clear it
if (this.bufferOffset >= this.buffer.byteLength) {
this.buffer = null;
this.bufferOffset = 0;
}
}

// Continue reading from the stream if more data is needed
while (length > 0 && !this.endOfStream) {
const result = await this.reader.read();

if (result.done) {
this.endOfStream = true;
break;
}

if (result.value) {
const chunk = result.value;

// If the chunk is larger than the requested length, store the excess
if (chunk.byteLength > length) {
buffer.set(chunk.subarray(0, length), offset);
this.buffer = chunk;
this.bufferOffset = length; // Keep track of the unconsumed part
totalBytesRead += length;
return totalBytesRead;
}

// Otherwise, consume the entire chunk
buffer.set(chunk, offset);
totalBytesRead += chunk.byteLength;
length -= chunk.byteLength;
offset += chunk.byteLength;
}
}

if (totalBytesRead === 0 && this.endOfStream) {
throw new EndOfStreamError();
}

return totalBytesRead;
}

public async abort(): Promise<void> {
await this.reader.cancel();
this.reader.releaseLock();
}
}
13 changes: 13 additions & 0 deletions lib/WebStreamReaderFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { ReadableStream as NodeReadableStream, ReadableStreamDefaultReader } from 'node:stream/web';
import { WebStreamByobReader } from './WebStreamByobReader.js';
import { WebStreamDefaultReader } from './WebStreamDefaultReader.js';
export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<Uint8Array>;

export function makeWebStreamReader(stream: AnyWebByteStream): WebStreamByobReader | WebStreamDefaultReader {
const reader = stream.getReader({mode: "byob"});
if (reader instanceof ReadableStreamBYOBReader) {
return new WebStreamByobReader(reader);
}
// Fall back on default reader
return new WebStreamDefaultReader(reader as ReadableStreamDefaultReader);
}
8 changes: 7 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@

import { AbstractStreamReader } from './AbstractStreamReader.js';
import type { WebStreamByobReader } from './WebStreamByobReader.js';

export { EndOfStreamError } from './EndOfStreamError.js';
export { StreamReader } from './StreamReader.js';
export { WebStreamReader, type AnyWebByteStream } from './WebStreamReader.js';
export { WebStreamByobReader } from './WebStreamByobReader.js';
export { WebStreamDefaultReader } from './WebStreamDefaultReader.js';
export type { IStreamReader } from './AbstractStreamReader.js';
export { type AnyWebByteStream, makeWebStreamReader } from './WebStreamReaderFactory.js'
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "peek-readable",
"version": "5.3.1",
"version": "5.4.0-beta.1",
"description": "Read and peek from a readable stream",
"author": {
"name": "Borewit",
Expand Down
49 changes: 36 additions & 13 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ import chaiAsPromised from 'chai-as-promised';
import {EventEmitter} from 'node:events';
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
import {EndOfStreamError, type IStreamReader, StreamReader, WebStreamReader} from '../lib/index.js';
import { EndOfStreamError, type IStreamReader, makeWebStreamReader, StreamReader } from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';

use(chaiAsPromised);

interface StreamFactorySuite {
description: string;
isDefaultWebReader?: true;
fromString: (input: string, delay?: number) => IStreamReader;
}

Expand All @@ -21,8 +22,12 @@ describe('Matrix', () => {
description: 'Node.js StreamReader',
fromString: (input, delay) => new StreamReader(new SourceStream(input, delay))
}, {
description: 'WebStream Reader',
fromString: (input, delay) => new WebStreamReader(stringToReadableStream(input, delay))
description: 'WebStream BYOB Reader',
fromString: (input, delay) => makeWebStreamReader(stringToReadableStream(input, false, delay))
}, {
description: 'WebStream Default Reader',
isDefaultWebReader: true,
fromString: (input, delay) => makeWebStreamReader(stringToReadableStream(input, true, delay ))
}];

streamFactories
Expand Down Expand Up @@ -56,7 +61,7 @@ describe('Matrix', () => {
assert.strictEqual(bytesRead, 5, 'Should read 5 bytes');
assert.strictEqual(new TextDecoder('latin1').decode(uint8Array), 'peter');

// should should reject at the end of the stream
// should reject at the end of the stream
uint8Array = new Uint8Array(1);
try {
await streamReader.read(uint8Array, 0, 1);
Expand All @@ -74,7 +79,11 @@ describe('Matrix', () => {
return uint8Array[0];
}

it('should support concurrent reads', () => {
it('should support concurrent reads', async function () {

if (factory.isDefaultWebReader) {
this.skip(); // Default web reader does not support concurrent reads
}

const streamReader = factory.fromString('\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09');

Expand All @@ -84,11 +93,10 @@ describe('Matrix', () => {
prom.push(readByteAsNumber(streamReader));
}

return Promise.all(prom).then(res => {
for (let i = 0; i < 10; ++i) {
assert.strictEqual(res[i], i);
}
});
const res = await Promise.all(prom);
for (let i = 0; i < 10; ++i) {
assert.strictEqual(res[i], i);
}

});
});
Expand Down Expand Up @@ -389,14 +397,29 @@ describe('Node.js StreamReader', () => {

});

describe('WebStreamReader', () => {
describe('BYOB WebStreamReader', () => {

it('abort() should release stream-lock', async () => {

const readableStream = stringToReadableStream('abc', false);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.abort();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
});
});

describe('Default WebStreamReader', () => {

it('abort() should release stream-lock', async () => {

const readableStream = stringToReadableStream('abc');
const readableStream = stringToReadableStream('abc', true);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = new WebStreamReader(readableStream);
const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.abort();
Expand Down
63 changes: 42 additions & 21 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Utilities for testing

import { Readable } from 'node:stream';
import { ReadableStream} from 'node:stream/web';
import { ReadableStream } from 'node:stream/web';

/**
* A mock Node.js readable-stream, using string to read from
*/
export class SourceStream extends Readable {

private buf: Uint8Array;
private readonly buf: Uint8Array;

constructor(private str = '', private delay = 0) {
super();
Expand All @@ -26,7 +26,7 @@ export class SourceStream extends Readable {


// Function to convert a string to a BYOB ReadableStream
function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
function stringReadableStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
// Convert the string to a Uint8Array using TextEncoder
const encoder = new TextEncoder();
const uint8Array = encoder.encode(inputString);
Expand All @@ -39,31 +39,52 @@ function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint
async pull(controller) {
// Check if there is data left to be pushed
if (position < uint8Array.length) {
// Push the chunk to the controller
const remaining = uint8Array.length - position;

if (controller.byobRequest) {
const remaining = uint8Array.length - position;
// @ts-ignore
const v = controller.byobRequest.view;
const bytesRead = Math.min(remaining, v.byteLength);
v.set(uint8Array.subarray(position, position + bytesRead));
// BYOB path
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view as Uint8Array;
const bytesRead = Math.min(remaining, view.byteLength);
view.set(uint8Array.subarray(position, position + bytesRead));
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesRead);
} else {
setTimeout(() => {
controller.enqueue(uint8Array);
position = uint8Array.length;
}, delay);
}
if (position >= uint8Array.length) {
controller.close();
// Non-BYOB path
const chunk = uint8Array.subarray(position, position + remaining);
position += remaining;

if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
controller.enqueue(chunk);
}
}

// Close the stream if all data has been pushed
if (position >= uint8Array.length) {
controller.close();
}
},
cancel() {
// Handle stream cancellation
position = uint8Array.length;
}
});
}

// Function to convert a string to a ReadableStreamBYOBReader
export function stringToReadableStream(inputString: string, delay?: number): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString, delay);
export function stringToReadableStream(inputString: string, forceDefault: boolean, delay?: number): ReadableStream<Uint8Array> {
const stream = stringReadableStream(inputString, delay);
const _getReader = stream.getReader.bind(stream);

// @ts-ignore
stream.getReader = (options?: { mode?: string }) => {
if (forceDefault) {
// Force returning the default reader
return _getReader(); // Call without options for a default reader
}
// @ts-ignore
return _getReader(options); // Pass through other options
};

return stream;
}

0 comments on commit 8b5f206

Please sign in to comment.