From b9ce74b784cebc01b7bc2de5bbf24e0a0eb91ef8 Mon Sep 17 00:00:00 2001 From: wirg_mo Date: Fri, 8 Oct 2021 09:19:14 +0200 Subject: [PATCH] Add support for multipart messages --- CHANGELOG.md | 9 +++ lib/src/constants.dart | 1 + lib/src/zeromq.dart | 171 +++++++++++++++++++++++++++++++++++++++-- pubspec.yaml | 6 +- 4 files changed, 178 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac5594b..67ec95f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 1.0.0-dev.2 + +### Add support for multipart messages +- Rename `Message` to `ZFrame` +- Add `ZMessage` as a queue of `ZFrame`'s +- Receive messages as `ZMessage` instead of `Message`(`ZFrame`) +- Reduce minimum SDK version to `2.13.0` + + ## 1.0.0-dev.1 ### Add crude implementation of libzmq diff --git a/lib/src/constants.dart b/lib/src/constants.dart index 03a9a6b..fa28369 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -21,6 +21,7 @@ const int ZMQ_POLLOUT = 2; const int ZMQ_POLLERR = 4; const int ZMQ_POLLPRI = 8; +const int ZMQ_DONTWAIT = 1; const int ZMQ_SNDMORE = 2; const int ZMQ_CURVE_PUBLICKEY = 48; diff --git a/lib/src/zeromq.dart b/lib/src/zeromq.dart index 3cbd123..e532e62 100644 --- a/lib/src/zeromq.dart +++ b/lib/src/zeromq.dart @@ -1,6 +1,7 @@ library dartzmq; import 'dart:async'; +import 'dart:collection'; import 'dart:ffi'; import 'dart:io'; import 'dart:typed_data'; @@ -85,6 +86,7 @@ class ZeroMQ { final socket = _createdSockets[event.socket]!; // Receive multiple message parts + final zMessage = ZMessage(); while (true) { var rc = _bindings.zmq_msg_init(msg); _checkSuccess(rc); @@ -98,9 +100,11 @@ class ZeroMQ { final copyOfData = Uint8List.fromList(data.asTypedList(size)); final hasNext = _bindings.zmq_msg_more(msg) != 0; - socket._controller.add(Message(copyOfData, hasMore: hasNext)); + zMessage.add(ZFrame(copyOfData, hasMore: hasNext)); if (!hasNext) break; } + // TODO need to check if zMessage.isEmpty ? + socket._controller.add(zMessage); _bindings.zmq_msg_close(msg); } @@ -190,11 +194,165 @@ enum SocketMode { stream } -class Message { +/// ZFrame +/// +/// A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code. +/// When you read a frame from a socket, the [hasMore] member indicates +/// if the frame is part of an unfinished multipart message. +class ZFrame { + /// The payload that was received or is to be sent final Uint8List payload; + + /// Is this frame part of an unfinished multipart message? final bool hasMore; - Message(this.payload, {this.hasMore = false}); + ZFrame(this.payload, {this.hasMore = false}); +} + +/// ZMessage +/// +/// This class provides a list-like container interface, +/// with methods to work with the overall container. ZMessage messages are +/// composed of zero or more ZFrame objects. +class ZMessage implements Queue { + final DoubleLinkedQueue _frames = DoubleLinkedQueue(); + + @override + Iterator get iterator => _frames.iterator; + + @override + void add(ZFrame value) => _frames.add; + + @override + void addAll(Iterable iterable) => _frames.addAll(iterable); + + @override + void addFirst(ZFrame value) => _frames.addFirst(value); + + @override + void addLast(ZFrame value) => _frames.addLast(value); + + @override + void clear() => _frames.clear(); + + @override + bool remove(Object? value) => _frames.remove(value); + + @override + ZFrame removeFirst() => _frames.removeFirst(); + + @override + ZFrame removeLast() => _frames.removeLast(); + + @override + void removeWhere(bool Function(ZFrame element) test) => + _frames.removeWhere(test); + + @override + void retainWhere(bool Function(ZFrame element) test) => + _frames.retainWhere(test); + + @override + bool any(bool Function(ZFrame element) test) => _frames.any(test); + + @override + Queue cast() => _frames.cast(); + + @override + bool contains(Object? element) => _frames.contains(element); + + @override + ZFrame elementAt(int index) => _frames.elementAt(index); + + @override + bool every(bool Function(ZFrame element) test) => _frames.every(test); + + @override + Iterable expand(Iterable Function(ZFrame element) toElements) => + _frames.expand(toElements); + + @override + ZFrame get first => _frames.first; + + @override + ZFrame firstWhere(bool Function(ZFrame element) test, + {ZFrame Function()? orElse}) => + _frames.firstWhere(test, orElse: orElse); + + @override + T fold(T initialValue, + T Function(T previousValue, ZFrame element) combine) => + _frames.fold(initialValue, combine); + + @override + Iterable followedBy(Iterable other) => + _frames.followedBy(other); + + @override + void forEach(void Function(ZFrame element) action) => _frames.forEach(action); + + @override + bool get isEmpty => _frames.isEmpty; + + @override + bool get isNotEmpty => _frames.isNotEmpty; + + @override + String join([String separator = ""]) => _frames.join(separator); + + @override + ZFrame get last => _frames.last; + + @override + ZFrame lastWhere(bool Function(ZFrame element) test, + {ZFrame Function()? orElse}) => + _frames.lastWhere(test, orElse: orElse); + + @override + int get length => _frames.length; + + @override + Iterable map(T Function(ZFrame e) toElement) => _frames.map(toElement); + + @override + ZFrame reduce(ZFrame Function(ZFrame value, ZFrame element) combine) => + _frames.reduce(combine); + + @override + ZFrame get single => _frames.single; + + @override + ZFrame singleWhere(bool Function(ZFrame element) test, + {ZFrame Function()? orElse}) => + _frames.singleWhere(test, orElse: orElse); + + @override + Iterable skip(int count) => _frames.skip(count); + + @override + Iterable skipWhile(bool Function(ZFrame value) test) => + _frames.skipWhile(test); + + @override + Iterable take(int count) => _frames.take(count); + + @override + Iterable takeWhile(bool Function(ZFrame value) test) => + _frames.takeWhile(test); + + @override + List toList({bool growable = true}) => + _frames.toList(growable: growable); + + @override + Set toSet() => _frames.toSet(); + + @override + Iterable where(bool Function(ZFrame element) test) => + _frames.where(test); + + @override + Iterable whereType() => _frames.whereType(); } class ZmqSocket { @@ -203,9 +361,10 @@ class ZmqSocket { bool _closed = false; - late final StreamController _controller; - Stream get messages => _controller.stream; - Stream get payloads => messages.map((m) => m.payload); + late final StreamController _controller; + Stream get messages => _controller.stream; + Stream get payloads => + messages.expand((element) => element._frames.map((e) => e.payload)); ZmqSocket(this._handle, this._zmq) { _controller = StreamController(onListen: () { diff --git a/pubspec.yaml b/pubspec.yaml index 18de1bf..2773b79 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,10 +1,10 @@ name: dartzmq -description: A wrapper for libzmq -version: 1.0.0-dev.1 +description: A simple dart zeromq implementation/wrapper around the libzmq C++ library +version: 1.0.0-dev.2 homepage: https://github.com/enwi/dartzmq environment: - sdk: ">=2.15.0-116.0.dev <3.0.0" + sdk: ">=2.13.0 <3.0.0" flutter: ">=1.17.0" dependencies: