Skip to content

Commit

Permalink
basic blossom without auth
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-lox committed Jan 10, 2025
1 parent a3b8639 commit b91cee3
Show file tree
Hide file tree
Showing 14 changed files with 684 additions and 19 deletions.
2 changes: 1 addition & 1 deletion packages/amber/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ packages:
path: "../ndk"
relative: true
source: path
version: "0.2.0"
version: "0.2.4"
package_config:
dependency: transitive
description:
Expand Down
2 changes: 1 addition & 1 deletion packages/isar/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ packages:
path: "../ndk"
relative: true
source: path
version: "0.2.0"
version: "0.2.4"
node_preamble:
dependency: transitive
description:
Expand Down
42 changes: 42 additions & 0 deletions packages/ndk/lib/data_layer/data_sources/http_request.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:http/http.dart' as http;

Expand All @@ -21,4 +22,45 @@ class HttpRequestDS {
}
return jsonDecode(response.body);
}

Future<http.Response> put({
required Uri url,
required Uint8List body,
required headers,
}) async {
http.Response response = await _client.put(
url,
body: body,
headers: headers,
);

if (response.statusCode != 200) {
throw Exception(
"error fetching STATUS: ${response.statusCode}, Link: $url");
}

return response;
}

Future<http.Response> get(Uri url) async {
http.Response response = await _client.get(url);

if (response.statusCode != 200) {
throw Exception(
"error fetching STATUS: ${response.statusCode}, Link: $url");
}

return response;
}

Future<http.Response> delete(Uri url) async {
http.Response response = await _client.delete(url);

if (response.statusCode != 200) {
throw Exception(
"error fetching STATUS: ${response.statusCode}, Link: $url");
}

return response;
}
}
207 changes: 207 additions & 0 deletions packages/ndk/lib/data_layer/repositories/blossom/blossom_impl.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import 'dart:convert';
import 'dart:typed_data';

import '../../../domain_layer/repositories/blossom.dart';
import '../../data_sources/http_request.dart';

class BlossomRepositoryImpl implements BlossomRepository {
final HttpRequestDS client;
final List<String> serverUrls;

BlossomRepositoryImpl({
required this.client,
required this.serverUrls,
}) {
if (serverUrls.isEmpty) {
throw ArgumentError('At least one server URL must be provided');
}
}

@override
Future<List<BlobUploadResult>> uploadBlob(
Uint8List data, {
String? contentType,
UploadStrategy strategy = UploadStrategy.mirrorAfterSuccess,
}) async {
switch (strategy) {
case UploadStrategy.mirrorAfterSuccess:
return _uploadWithMirroring(data, contentType);
case UploadStrategy.allSimultaneous:
return _uploadToAllServers(data, contentType);
case UploadStrategy.firstSuccess:
return _uploadToFirstSuccess(data, contentType);
}
}

Future<List<BlobUploadResult>> _uploadWithMirroring(
Uint8List data,
String? contentType,
) async {
final results = <BlobUploadResult>[];

// Try primary upload
final primaryResult = await _uploadToServer(
serverUrls.first,
data,
contentType,
);
results.add(primaryResult);

if (primaryResult.success) {
// Mirror to other servers
final mirrorResults = await Future.wait(serverUrls
.skip(1)
.map((url) => _uploadToServer(url, data, contentType)));
results.addAll(mirrorResults);
}

return results;
}

Future<List<BlobUploadResult>> _uploadToAllServers(
Uint8List data,
String? contentType,
) async {
final results = await Future.wait(
serverUrls.map((url) => _uploadToServer(url, data, contentType)));
return results;
}

Future<List<BlobUploadResult>> _uploadToFirstSuccess(
Uint8List data,
String? contentType,
) async {
for (final url in serverUrls) {
final result = await _uploadToServer(url, data, contentType);
if (result.success) {
return [result];
}
}

// If all servers failed, return all errors
final results = await _uploadToAllServers(data, contentType);
return results;
}

Future<BlobUploadResult> _uploadToServer(
String serverUrl,
Uint8List data,
String? contentType,
) async {
try {
final response = await client.put(
url: Uri.parse('$serverUrl/upload'),
body: data,
headers: {
if (contentType != null) 'Content-Type': contentType,
'Content-Length': '${data.length}',
},
);

if (response.statusCode != 200) {
return BlobUploadResult(
serverUrl: serverUrl,
success: false,
error: 'HTTP ${response.statusCode}',
);
}

return BlobUploadResult(
serverUrl: serverUrl,
success: true,
descriptor: BlobDescriptor.fromJson(jsonDecode(response.body)),
);
} catch (e) {
return BlobUploadResult(
serverUrl: serverUrl,
success: false,
error: e.toString(),
);
}
}

@override
Future<Uint8List> getBlob(String sha256) async {
Exception? lastError;

for (final url in serverUrls) {
try {
final response = await client.get(
Uri.parse('$url/$sha256'),
);

if (response.statusCode == 200) {
return response.bodyBytes;
}
lastError = Exception('HTTP ${response.statusCode}');
} catch (e) {
lastError = e is Exception ? e : Exception(e.toString());
}
}

throw Exception(
'Failed to get blob from all servers. Last error: $lastError');
}

@override
Future<List<BlobDescriptor>> listBlobs(
String pubkey, {
DateTime? since,
DateTime? until,
}) async {
Exception? lastError;

for (final url in serverUrls) {
try {
final queryParams = <String, String>{
if (since != null) 'since': '${since.millisecondsSinceEpoch ~/ 1000}',
if (until != null) 'until': '${until.millisecondsSinceEpoch ~/ 1000}',
};

final response = await client.get(
Uri.parse('$url/list/$pubkey').replace(queryParameters: queryParams),
);

if (response.statusCode == 200) {
final List<dynamic> json = jsonDecode(response.body);
return json.map((j) => BlobDescriptor.fromJson(j)).toList();
}
lastError = Exception('HTTP ${response.statusCode}');
} catch (e) {
lastError = e is Exception ? e : Exception(e.toString());
}
}

throw Exception(
'Failed to list blobs from all servers. Last error: $lastError');
}

@override
Future<List<BlobDeleteResult>> deleteBlob(String sha256) async {
final results = await Future.wait(
serverUrls.map((url) => _deleteFromServer(url, sha256)));
return results;
}

Future<BlobDeleteResult> _deleteFromServer(
String serverUrl, String sha256) async {
try {
final response = await client.delete(
Uri.parse('$serverUrl/$sha256'),
);

return BlobDeleteResult(
serverUrl: serverUrl,
success: response.statusCode == 200,
error:
response.statusCode != 200 ? 'HTTP ${response.statusCode}' : null,
);
} catch (e) {
return BlobDeleteResult(
serverUrl: serverUrl,
success: false,
error: e.toString(),
);
}
}
}
81 changes: 81 additions & 0 deletions packages/ndk/lib/domain_layer/repositories/blossom.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import 'dart:typed_data';

enum UploadStrategy {
/// Upload to first server, then mirror to others
mirrorAfterSuccess,

/// Upload to all servers simultaneously
allSimultaneous,

/// Upload to first successful server only
firstSuccess
}

abstract class BlossomRepository {
/// Uploads a blob using the specified strategy
Future<List<BlobUploadResult>> uploadBlob(
Uint8List data, {
String? contentType,
UploadStrategy strategy = UploadStrategy.mirrorAfterSuccess,
});

/// Gets a blob by trying servers sequentially until success
Future<Uint8List> getBlob(String sha256);

/// Lists blobs from the first successful server
Future<List<BlobDescriptor>> listBlobs(String pubkey,
{DateTime? since, DateTime? until});

/// Attempts to delete blob from all servers
Future<List<BlobDeleteResult>> deleteBlob(String sha256);
}

class BlobDescriptor {
final String url;
final String sha256;
final int size;
final String? type;
final DateTime uploaded;

BlobDescriptor(
{required this.url,
required this.sha256,
required this.size,
this.type,
required this.uploaded});

factory BlobDescriptor.fromJson(Map<String, dynamic> json) {
return BlobDescriptor(
url: json['url'],
sha256: json['sha256'],
size: json['size'],
type: json['type'],
uploaded: DateTime.fromMillisecondsSinceEpoch(json['uploaded'] * 1000));
}
}

class BlobUploadResult {
final String serverUrl;
final bool success;
final BlobDescriptor? descriptor;
final String? error;

BlobUploadResult({
required this.serverUrl,
required this.success,
this.descriptor,
this.error,
});
}

class BlobDeleteResult {
final String serverUrl;
final bool success;
final String? error;

BlobDeleteResult({
required this.serverUrl,
required this.success,
this.error,
});
}
39 changes: 39 additions & 0 deletions packages/ndk/lib/domain_layer/usecases/files/files.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import 'dart:typed_data';

import '../../repositories/blossom.dart';

class Files {
final BlossomRepository repository;

Files(this.repository);

Future<List<BlobUploadResult>> uploadBlob({
required Uint8List data,
String? contentType,
UploadStrategy strategy = UploadStrategy.mirrorAfterSuccess,
}) {
return repository.uploadBlob(
data,
contentType: contentType,
strategy: strategy,
);
}

Future<Uint8List> getBlob(String sha256) {
return repository.getBlob(sha256);
}

Future<List<BlobDescriptor>> listBlobs(
String pubkey, {
DateTime? since,
DateTime? until,
}) {
return repository.listBlobs(pubkey, since: since, until: until);
}

// lib/domain/usecases/delete_blob_usecase.dart

Future<void> delteBlob(String sha256) {
return repository.deleteBlob(sha256);
}
}
Loading

0 comments on commit b91cee3

Please sign in to comment.