Skip to content

Commit

Permalink
Refactor createDataItem and add createDataBundle
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Aug 30, 2023
1 parent 3b14323 commit f653ed1
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 525 deletions.
16 changes: 14 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Files and directories created by pub
# Dart
.dart_tool/
.packages
build/
pubspec.lock

# IntelliJ
*.iml
*.ipr
*.iws
.idea/

# Mac
.DS_Store

# Direnv
.direnv

125 changes: 0 additions & 125 deletions lib/src/streams/bundle.dart

This file was deleted.

146 changes: 146 additions & 0 deletions lib/src/streams/data_bundle.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:arweave/utils.dart';
import 'package:fpdart/fpdart.dart';

import '../models/models.dart' hide DataStreamGenerator;
import './utils.dart';
import 'data_item.dart';
import 'data_models.dart';
import 'errors.dart';

TaskEither<DataItemError, DataItemTaskEither> createBundledDataItemTaskEither({
required final Wallet wallet,
required DataBundleTaskEither dataBundleTaskEither,
final String target = '',
final String anchor = '',
final List<Tag> tags = const [],
}) {
return dataBundleTaskEither.flatMap((dataBundle) {
final dataBundleStream = dataBundle.stream;
final dataBundleSize = dataBundle.size;

final bundledDataItemTags = [
createTag('Bundle-Type', 'data'),
createTag('Bundle-Version', '2.0.0'),
...tags,
];

return TaskEither.of(createDataItemTaskEither(
wallet: wallet,
dataStream: dataBundleStream,
dataSize: dataBundleSize,
target: target,
anchor: anchor,
tags: bundledDataItemTags,
));
});
}

DataBundleTaskEither createDataBundleTaskEither(
final List<DataItemTaskEither> dataItemsTaskEither,
) {
return dataItemsTaskEither.sequenceTaskEitherSeq().flatMap((dataItems) {
final dataItemsLength = dataItems.length;
final headers = Uint8List(dataItemsLength * 64);
int dataItemsSize = 0;

for (var i = 0; i < dataItemsLength; i++) {
final dataItem = dataItems[i];
final id = decodeBase64ToBytes(dataItem.id);
final dataItemLength = dataItem.size;

dataItemsSize += dataItemLength;

final header = Uint8List(64);

// Set offset
header.setAll(0, longTo32ByteArray(dataItemLength));

// Set id
header.setAll(32, id);

// Add header to array of headers
headers.setAll(64 * i, header);
}

final bundleHeaders = [
...longTo32ByteArray(dataItemsLength),
...headers,
];

final bundleGenerator = combineStreamAndFunctionList(
Stream.fromIterable([bundleHeaders]),
dataItems.map((dataItem) => dataItem.stream).toList());

return TaskEither.of(DataBundleResult(
size: bundleHeaders.length + dataItemsSize,
stream: bundleGenerator,
));
});
}

// Will be refactored to use TaskEither

// Future<List<Map<String, dynamic>>> processBundle(
// Future<Stream<List<int>>> Function() streamGenerator,
// ) async {
// print('processBundle');
// final stream = await streamGenerator();
// final reader = ChunkedStreamReader(stream);
//
// final List<Map<String, dynamic>> items;
//
// int byteIndex = 0;
// try {
// // set numberOfDataItems
// final numberOfDataItemsBytes = await reader.readBytes(32);
// final numberOfDataItems = decodeBytesToLong(numberOfDataItemsBytes);
// print('numberOfDataItems: $numberOfDataItemsBytes');
// byteIndex += 32;
//
// // set headers
// final headersBytesLength = numberOfDataItems * 64;
// List<int> headersBytes = await reader.readChunk(headersBytesLength);
// final List<(int, String)> headers = List.filled(numberOfDataItems, (0, ""));
// items = List.filled(numberOfDataItems, {});
//
// for (var i = 0; i < headersBytesLength; i += 64) {
// final id =
// toBase64Url(base64Encode(headersBytes.sublist(i + 32, i + 64)));
//
// headers[i ~/ 64] = (
// decodeBytesToLong(Uint8List.fromList(headersBytes.sublist(i, i + 32))),
// id,
// );
// items[i ~/ 64] = {'id': id};
// }
// byteIndex += headersBytesLength;
//
// for (var i = 0; i < items.length; i++) {
// final item = items[i];
// final itemBytesLength = headers[i].$1;
//
// final dataItemInfo = await processDataItem(
// reader.readStream(itemBytesLength), item['id'], itemBytesLength);
//
// final int dataLength = dataItemInfo['dataLength'];
// final start = byteIndex + itemBytesLength - dataLength;
// final end = start + dataLength;
//
// item['signature'] = dataItemInfo['signature'];
// item['owner'] = dataItemInfo['owner'];
// item['target'] = dataItemInfo['target'];
// item['anchor'] = dataItemInfo['anchor'];
// item['tags'] = dataItemInfo['tags'];
// item['data'] = byteRangeStream(await streamGenerator(), start, end);
//
// byteIndex += itemBytesLength;
// }
// } finally {
// reader.cancel();
// }
//
// return items;
// }
Loading

0 comments on commit f653ed1

Please sign in to comment.