Skip to content

Commit

Permalink
Implement remote file system
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Oct 22, 2024
1 parent b0513b8 commit ce1be73
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 9 deletions.
61 changes: 61 additions & 0 deletions sqlite3_web/lib/src/client.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'dart:typed_data';

import 'package:sqlite3/common.dart';
import 'package:web/web.dart'
Expand Down Expand Up @@ -142,6 +143,66 @@ final class RemoteDatabase implements Database {
}
}

final class RemoteFileSystem implements FileSystem {
final RemoteDatabase database;

RemoteFileSystem(this.database);

@override
Future<bool> exists(FileType type) async {
final response = await database.connection.sendRequest(
FileSystemExistsQuery(
databaseId: database.databaseId,
fsType: type,
requestId: 0,
),
MessageType.simpleSuccessResponse,
);

return (response.response as JSBoolean).toDart;
}

@override
Future<void> flush() async {
await database.connection.sendRequest(
FileSystemFlushRequest(databaseId: database.databaseId, requestId: 0),
MessageType.simpleSuccessResponse,
);
}

@override
Future<Uint8List> readFile(FileType type) async {
final response = await database.connection.sendRequest(
FileSystemAccess(
databaseId: database.databaseId,
requestId: 0,
buffer: null,
fsType: type,
),
MessageType.simpleSuccessResponse,
);

final buffer = (response.response as JSArrayBuffer);
return buffer.toDart.asUint8List();
}

@override
Future<void> writeFile(FileType type, Uint8List content) async {
// We need to copy since we're about to transfer contents over
final copy = Uint8List(content.length)..setAll(0, content);

await database.connection.sendRequest(
FileSystemAccess(
databaseId: database.databaseId,
requestId: 0,
buffer: copy.buffer.toJS,
fsType: type,
),
MessageType.simpleSuccessResponse,
);
}
}

final class WorkerConnection extends ProtocolChannel {
final StreamController<Notification> notifications =
StreamController.broadcast();
Expand Down
29 changes: 27 additions & 2 deletions sqlite3_web/lib/src/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum MessageType<T extends Message> {
runQuery<RunQuery>(),
fileSystemExists<FileSystemExistsQuery>(),
fileSystemAccess<FileSystemAccess>(),
fileSystemFlush<FileSystemFlushRequest>(),
connect<ConnectRequest>(),
startFileSystemServer<StartFileSystemServer>(),
updateRequest<UpdateStreamRequest>(),
Expand Down Expand Up @@ -86,6 +87,7 @@ sealed class Message {
MessageType.runQuery => RunQuery.deserialize(object),
MessageType.fileSystemExists => FileSystemExistsQuery.deserialize(object),
MessageType.fileSystemAccess => FileSystemAccess.deserialize(object),
MessageType.fileSystemFlush => FileSystemFlushRequest.deserialize(object),
MessageType.connect => ConnectRequest.deserialize(object),
MessageType.closeDatabase => CloseDatabase.deserialize(object),
MessageType.openAdditionalConnection =>
Expand Down Expand Up @@ -348,6 +350,24 @@ final class FileSystemExistsQuery extends Request {
}
}

/// Requests the worker to flush the file system for a database.
final class FileSystemFlushRequest extends Request {
@override
MessageType<Message> get type => MessageType.fileSystemFlush;

FileSystemFlushRequest({
required super.databaseId,
required super.requestId,
});

factory FileSystemFlushRequest.deserialize(JSObject object) {
return FileSystemFlushRequest(
databaseId: object.databaseId,
requestId: object.requestId,
);
}
}

/// Read or write to files of an opened database.
///
/// For reads, other side will respond with a [SimpleSuccessResponse] containing
Expand Down Expand Up @@ -385,8 +405,6 @@ final class FileSystemAccess extends Request {
object[_UniqueFieldNames.buffer] = buffer;
object[_UniqueFieldNames.fileType] = fsType.index.toJS;

// false positive? dart2js seems to emit a null check as it should
// ignore: pattern_never_matches_value_type
if (buffer case final buffer?) {
transferred.add(buffer);
}
Expand Down Expand Up @@ -462,6 +480,9 @@ final class OpenAdditonalConnection extends Request {
MessageType<Message> get type => MessageType.openAdditionalConnection;
}

@JS('ArrayBuffer')
external JSFunction get _arrayBufferConstructor;

final class SimpleSuccessResponse extends Response {
final JSAny? response;

Expand All @@ -481,6 +502,10 @@ final class SimpleSuccessResponse extends Response {
void serialize(JSObject object, List<JSObject> transferred) {
super.serialize(object, transferred);
object[_UniqueFieldNames.responseData] = response;

if (response.instanceof(_arrayBufferConstructor)) {
transferred.add(response as JSObject);
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions sqlite3_web/lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ final class RemoteException implements Exception {
}

abstract class FileSystem {
StorageMode get storage;
String get databaseName;

Future<bool> exists(FileType type);
Future<Uint8List> readFile(FileType type);
Future<void> writeFile(FileType type, Uint8List content);

/// If the file system hosting the database in the worker is not synchronous,
/// flushes pending writes.
Future<void> flush();
}

/// An enumeration of features not supported by the current browsers.
Expand Down
53 changes: 49 additions & 4 deletions sqlite3_web/lib/src/worker.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'dart:typed_data';
import 'package:sqlite3/wasm.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web/web.dart'
Expand All @@ -21,6 +22,7 @@ import 'database.dart';
import 'channel.dart';
import 'protocol.dart';
import 'shared.dart';
import 'types.dart';

sealed class WorkerEnvironment {
WorkerEnvironment._();
Expand Down Expand Up @@ -239,10 +241,46 @@ final class _ClientConnection extends ProtocolChannel
await database.close();
return SimpleSuccessResponse(
response: null, requestId: request.requestId);
case FileSystemExistsQuery():
throw UnimplementedError();
case FileSystemAccess():
throw UnimplementedError();
case FileSystemFlushRequest():
if (database?.database.vfs case IndexedDbFileSystem idb) {
await idb.flush();
}

return SimpleSuccessResponse(
response: null, requestId: request.requestId);
case FileSystemExistsQuery(:final fsType):
await database!.database.opened;
final vfs = database.database.vfs!;
final exists = vfs.xAccess(fsType.pathInVfs, 0) == 1;

return SimpleSuccessResponse(
response: exists.toJS, requestId: request.requestId);
case FileSystemAccess(:final buffer, :final fsType):
await database!.database.opened;
final vfs = database.database.vfs!;
final file = vfs
.xOpen(
Sqlite3Filename(fsType.pathInVfs), SqlFlag.SQLITE_OPEN_CREATE)
.file;

try {
if (buffer != null) {
final asDartBuffer = buffer.toDart;
file.xTruncate(asDartBuffer.lengthInBytes);
file.xWrite(asDartBuffer.asUint8List(), 0);

return SimpleSuccessResponse(
response: null, requestId: request.requestId);
} else {
final buffer = Uint8List(file.xFileSize());
file.xRead(buffer, 0);

return SimpleSuccessResponse(
response: buffer.buffer.toJS, requestId: request.requestId);
}
} finally {
file.xClose();
}
}
}

Expand All @@ -268,6 +306,13 @@ final class _ClientConnection extends ProtocolChannel
}
}

extension on FileType {
String get pathInVfs => switch (this) {
FileType.database => '/database',
FileType.journal => '/database-journal',
};
}

final class DatabaseState {
final WorkerRunner runner;
final int id;
Expand Down

0 comments on commit ce1be73

Please sign in to comment.