Skip to content

Commit

Permalink
Fix file data streaming for long polling (#14659)
Browse files Browse the repository at this point in the history
  • Loading branch information
msujew authored Dec 24, 2024
1 parent ea87531 commit d7bc169
Showing 1 changed file with 32 additions and 34 deletions.
66 changes: 32 additions & 34 deletions packages/filesystem/src/common/remote-file-system-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface RemoteFileSystemServer extends RpcServer<RemoteFileSystemClient
open(resource: string, opts: FileOpenOptions): Promise<number>;
close(fd: number): Promise<void>;
read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>;
readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise<number>;
readFileStream(resource: string, handle: number, opts: FileReadStreamOptions, token: CancellationToken): Promise<void>;
readFile(resource: string): Promise<Uint8Array>;
write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise<number>;
writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise<void>;
Expand Down Expand Up @@ -162,6 +162,8 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
protected readonly readyDeferred = new Deferred<void>();
readonly ready = this.readyDeferred.promise;

protected streamHandleSeq = 0;

/**
* Wrapped remote filesystem.
*/
Expand Down Expand Up @@ -251,36 +253,35 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D

readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents<Uint8Array> {
const capturedError = new Error();
// eslint-disable-next-line @typescript-eslint/no-shadow
const stream = newWriteableStream<Uint8Array>(data => BinaryBuffer.concat(data.map(data => BinaryBuffer.wrap(data))).buffer);
this.server.readFileStream(resource.toString(), opts, token).then(streamHandle => {
const stream = newWriteableStream<Uint8Array>(data => BinaryBuffer.concat(data.map(item => BinaryBuffer.wrap(item))).buffer);
const streamHandle = this.streamHandleSeq++;
const toDispose = new DisposableCollection(
token.onCancellationRequested(() => stream.end(cancelled())),
this.onFileStreamData(([handle, data]) => {
if (streamHandle === handle) {
stream.write(data);
}
}),
this.onFileStreamEnd(([handle, error]) => {
if (streamHandle === handle) {
if (error) {
const code = ('code' in error && error.code) || FileSystemProviderErrorCode.Unknown;
const fileOperationError = new FileSystemProviderError(error.message, code);
fileOperationError.name = error.name;
const capturedStack = capturedError.stack || '';
fileOperationError.stack = `${capturedStack}\nCaused by: ${error.stack}`;
stream.end(fileOperationError);
} else {
stream.end();
}
}
})
);
stream.on('end', () => toDispose.dispose());
this.server.readFileStream(resource.toString(), streamHandle, opts, token).then(() => {
if (token.isCancellationRequested) {
stream.end(cancelled());
return;
}
const toDispose = new DisposableCollection(
token.onCancellationRequested(() => stream.end(cancelled())),
this.onFileStreamData(([handle, data]) => {
if (streamHandle === handle) {
stream.write(data);
}
}),
this.onFileStreamEnd(([handle, error]) => {
if (streamHandle === handle) {
if (error) {
const code = ('code' in error && error.code) || FileSystemProviderErrorCode.Unknown;
const fileOperationError = new FileSystemProviderError(error.message, code);
fileOperationError.name = error.name;
const capturedStack = capturedError.stack || '';
fileOperationError.stack = `${capturedStack}\nCaused by: ${error.stack}`;
stream.end(fileOperationError);
} else {
stream.end();
}
}
})
);
stream.on('end', () => toDispose.dispose());
}, error => stream.end(error));
return stream;
}
Expand Down Expand Up @@ -528,11 +529,8 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
}
}

protected readFileStreamSeq = 0;

async readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise<number> {
async readFileStream(resource: string, handle: number, opts: FileReadStreamOptions, token: CancellationToken): Promise<void> {
if (hasFileReadStreamCapability(this.provider)) {
const handle = this.readFileStreamSeq++;
const stream = this.provider.readFileStream(new URI(resource), opts, token);
stream.on('data', data => this.client?.onFileStreamData(handle, data));
stream.on('error', error => {
Expand All @@ -541,9 +539,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
this.client?.onFileStreamEnd(handle, { code, name, message, stack });
});
stream.on('end', () => this.client?.onFileStreamEnd(handle, undefined));
return handle;
} else {
throw new Error('not supported');
}
throw new Error('not supported');
}

}

0 comments on commit d7bc169

Please sign in to comment.