From 66c4f97e910cbe661b47edaf4ff0e0e597516aab Mon Sep 17 00:00:00 2001 From: wirg_mo Date: Mon, 11 Oct 2021 15:17:08 +0200 Subject: [PATCH] Fix heap corruption due to wrong usage of malloc --- CHANGELOG.md | 14 ++++ example/pubspec.lock | 12 ++-- example/pubspec.yaml | 2 +- lib/src/bindings.dart | 19 +++++ lib/src/constants.dart | 2 + lib/src/zeromq.dart | 158 ++++++++++++++++++++++------------------- pubspec.yaml | 2 +- 7 files changed, 128 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 481f31e..fcfd10d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +## 1.0.0-dev.4 + +### Fix heap corruption due to wrong usage of `malloc.allocate` +- Use periodic timer to poll sockets every second +- Poll all messages on socket instead of one for each event to not loose messages +- Reuse zeromq message pointer +- Improve return code handling +- Rename `_isActive` of `ZContext` to `_shutdown` +- Rename `_handle` and `_zmq` of `ZSocket` to `_socket` and `_context` +- Add stream for `ZFrames` to `ZSocket` +- Always show error code in `ZeroMQException` +- Fix pubspec of example + + ## 1.0.0-dev.3 ### Add example, subscriptions for `sub` sockets and code cleanup diff --git a/example/pubspec.lock b/example/pubspec.lock index 9441d00..87f6f42 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -7,7 +7,7 @@ packages: name: async url: "https://pub.dartlang.org" source: hosted - version: "2.8.2" + version: "2.8.1" boolean_selector: dependency: transitive description: @@ -21,7 +21,7 @@ packages: name: characters url: "https://pub.dartlang.org" source: hosted - version: "1.2.0" + version: "1.1.0" charcode: dependency: transitive description: @@ -56,7 +56,7 @@ packages: path: ".." relative: true source: path - version: "1.0.0-dev.3" + version: "1.0.0-dev.4" fake_async: dependency: transitive description: @@ -101,7 +101,7 @@ packages: name: matcher url: "https://pub.dartlang.org" source: hosted - version: "0.12.11" + version: "0.12.10" meta: dependency: transitive description: @@ -162,7 +162,7 @@ packages: name: test_api url: "https://pub.dartlang.org" source: hosted - version: "0.4.3" + version: "0.4.2" typed_data: dependency: transitive description: @@ -178,5 +178,5 @@ packages: source: hosted version: "2.1.0" sdks: - dart: ">=2.15.0-116.0.dev <3.0.0" + dart: ">=2.14.0 <3.0.0" flutter: ">=1.17.0" diff --git a/example/pubspec.yaml b/example/pubspec.yaml index 48c39ef..b96fe99 100644 --- a/example/pubspec.yaml +++ b/example/pubspec.yaml @@ -18,7 +18,7 @@ publish_to: 'none' # Remove this line if you wish to publish to pub.dev version: 1.0.0+1 environment: - sdk: ">=2.15.0-116.0.dev <3.0.0" + sdk: '>=2.14.0 <3.0.0' # Dependencies specify other packages that your package needs in order to work. # To automatically upgrade your package dependencies to the latest versions diff --git a/lib/src/bindings.dart b/lib/src/bindings.dart index 6b18a1b..714be09 100644 --- a/lib/src/bindings.dart +++ b/lib/src/bindings.dart @@ -45,6 +45,11 @@ typedef zmq_poller_remove_native = Int32 Function( typedef zmq_poller_remove_dart = int Function( ZMQPoller poller, ZMQSocket socket); +typedef zmq_poll_native = Int32 Function( + Pointer items, Int32 nitems, Int64 timeout); +typedef zmq_poll_dart = int Function( + Pointer items, int nitems, int timeout); + typedef zmq_poller_wait_all_native = Int32 Function(ZMQPoller poller, Pointer events, Int32 count, Int64 timeout); typedef zmq_poller_wait_all_dart = int Function( @@ -109,6 +114,17 @@ class ZMQPollerEvent extends Struct { external int events; } +class ZMQPollItem extends Struct { + external ZMQSocket socket; + @Int32() + external int fd; + + @Int16() + external int events; + @Int16() + external int revents; +} + class ZMQBindings { final DynamicLibrary library; @@ -127,6 +143,7 @@ class ZMQBindings { late final zmq_poller_destroy_dart zmq_poller_destroy; late final zmq_poller_add_dart zmq_poller_add; late final zmq_poller_remove_dart zmq_poller_remove; + late final zmq_poll_dart zmq_poll; late final zmq_poller_wait_all_dart zmq_poller_wait_all; late final zmq_msg_init_dart zmq_msg_init; @@ -167,6 +184,8 @@ class ZMQBindings { 'zmq_poller_add'); zmq_poller_remove = library.lookupFunction('zmq_poller_remove'); + zmq_poll = + library.lookupFunction('zmq_poll'); zmq_poller_wait_all = library.lookupFunction('zmq_poller_wait_all'); diff --git a/lib/src/constants.dart b/lib/src/constants.dart index be63983..0327785 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -91,3 +91,5 @@ const int ENOTCONN = _errorBase + 15; const int ETIMEDOUT = _errorBase + 16; const int EHOSTUNREACH = _errorBase + 17; const int ENETRESET = _errorBase + 18; + +const int EAGAIN = 11; diff --git a/lib/src/zeromq.dart b/lib/src/zeromq.dart index 4cfb19a..a738804 100644 --- a/lib/src/zeromq.dart +++ b/lib/src/zeromq.dart @@ -35,8 +35,8 @@ class ZContext { late final ZMQContext _context; late final ZMQPoller _poller; - bool _isActive = true; - bool _pollingMicrotaskScheduled = false; + bool _shutdown = false; + Timer? _timer; final Map _createdSockets = {}; final List _listening = []; @@ -58,7 +58,7 @@ class ZContext { } Future stop() { - _isActive = false; + _shutdown = true; _stopCompleter = Completer(); return _stopCompleter!.future; } @@ -68,63 +68,71 @@ class ZContext { } void _startPolling() { - if (!_pollingMicrotaskScheduled && _listening.isNotEmpty) { - _pollingMicrotaskScheduled = true; - scheduleMicrotask(_poll); + if (_timer == null && _listening.isNotEmpty) { + _timer = Timer.periodic(const Duration(seconds: 1), (timer) => _poll()); } } void _poll() { - final listeners = _listening.length; - final events = malloc.allocate(listeners); - final readEvents = - _bindings.zmq_poller_wait_all(_poller, events, listeners, 0); - - final msg = _allocateMessage(); - for (var i = 0; i < readEvents; i++) { - final event = events[i]; - final socket = _createdSockets[event.socket]!; - - // Receive multiple message parts - final zMessage = ZMessage(); - while (true) { - var rc = _bindings.zmq_msg_init(msg); - _checkSuccess(rc); - - rc = _bindings.zmq_msg_recv(msg, socket._handle, 0); - _checkSuccess(rc, positiveIsSuccess: true); - - final size = _bindings.zmq_msg_size(msg); - final data = _bindings.zmq_msg_data(msg).cast(); + final socketCount = _listening.length; + + final pollerEvents = + malloc.allocate(sizeOf() * socketCount); + final availableEventCount = + _bindings.zmq_poller_wait_all(_poller, pollerEvents, socketCount, 0); + + if (availableEventCount > 0) { + final msg = _allocateMessage(); + var rc = _bindings.zmq_msg_init(msg); // rc == 0 + _checkReturnCode(rc); + + for (var eventIdx = 0; eventIdx < availableEventCount; ++eventIdx) { + final pollerEvent = pollerEvents[eventIdx]; + final socket = _createdSockets[pollerEvent.socket]!; + + // Receive multiple message parts + ZMessage zMessage = ZMessage(); + bool hasMore = true; + while ((rc = + _bindings.zmq_msg_recv(msg, socket._socket, ZMQ_DONTWAIT)) > + 0) { + // final size = _bindings.zmq_msg_size(msg); + final data = _bindings.zmq_msg_data(msg).cast(); + + final copyOfData = Uint8List.fromList(data.asTypedList(rc)); + hasMore = _bindings.zmq_msg_more(msg) != 0; + + zMessage.add(ZFrame(copyOfData, hasMore: hasMore)); + + if (!hasMore) { + socket._controller.add(zMessage); + zMessage = ZMessage(); + } + } + + _checkReturnCode(rc, ignore: [EAGAIN]); + } - final copyOfData = Uint8List.fromList(data.asTypedList(size)); - final hasNext = _bindings.zmq_msg_more(msg) != 0; + rc = _bindings.zmq_msg_close(msg); // rc == 0 + _checkReturnCode(rc); - zMessage.add(ZFrame(copyOfData, hasMore: hasNext)); - if (!hasNext) break; - } - // TODO need to check if zMessage.isEmpty ? - socket._controller.add(zMessage); - _bindings.zmq_msg_close(msg); + malloc.free(msg); } - malloc.free(msg); - malloc.free(events); + malloc.free(pollerEvents); // After the polling iteration, re-schedule another one if necessary. - if (_isActive) { - if (_listening.isNotEmpty) { - // NOT using scheduleMicrotask because it blocks up the queue - Timer.run(_poll); - return; - } - } else { + if (_shutdown) { _shutdownInternal(); _stopCompleter?.complete(null); + } else if (socketCount > 0) { + return; } + // no polling necessary, reset flag so that the next call to _startPolling // will bring the mechanism back up. - _pollingMicrotaskScheduled = false; + _timer?.cancel(); + _timer = null; } ZSocket createSocket(SocketMode mode) { @@ -135,19 +143,19 @@ class ZContext { } void _listen(ZSocket socket) { - _bindings.zmq_poller_add(_poller, socket._handle, nullptr, ZMQ_POLLIN); + _bindings.zmq_poller_add(_poller, socket._socket, nullptr, ZMQ_POLLIN); _listening.add(socket); _startPolling(); } void _stopListening(ZSocket socket) { - _bindings.zmq_poller_remove(_poller, socket._handle); + _bindings.zmq_poller_remove(_poller, socket._socket); _listening.remove(socket); } void _handleSocketClosed(ZSocket socket) { - if (_isActive) { - _createdSockets.remove(socket._handle); + if (!_shutdown) { + _createdSockets.remove(socket._socket); } if (_listening.contains(socket)) { _stopListening(socket); @@ -169,11 +177,15 @@ class ZContext { malloc.free(_poller); } - void _checkSuccess(int statusCode, {bool positiveIsSuccess = false}) { - final isFailure = positiveIsSuccess ? statusCode < 0 : statusCode != 0; + void _checkReturnCode(int code, {List ignore = const []}) { + if (code < 0) { + _checkErrorCode(ignore: ignore); + } + } - if (isFailure) { - final errorCode = _bindings.zmq_errno(); + void _checkErrorCode({List ignore = const []}) { + final errorCode = _bindings.zmq_errno(); + if (!ignore.contains(errorCode)) { throw ZeroMQException(errorCode); } } @@ -221,7 +233,7 @@ class ZMessage implements Queue { Iterator get iterator => _frames.iterator; @override - void add(ZFrame value) => _frames.add; + void add(ZFrame value) => _frames.add(value); @override void addAll(Iterable iterable) => _frames.addAll(iterable); @@ -356,21 +368,21 @@ class ZMessage implements Queue { } class ZSocket { - final ZMQSocket _handle; - final ZContext _zmq; + final ZMQSocket _socket; + final ZContext _context; bool _closed = false; late final StreamController _controller; Stream get messages => _controller.stream; - Stream get payloads => - messages.expand((element) => element._frames.map((e) => e.payload)); + Stream get frames => messages.expand((element) => element._frames); + Stream get payloads => frames.map((e) => e.payload); - ZSocket(this._handle, this._zmq) { + ZSocket(this._socket, this._context) { _controller = StreamController(onListen: () { - _zmq._listen(this); + _context._listen(this); }, onCancel: () { - _zmq._stopListening(this); + _context._stopListening(this); }); } @@ -385,32 +397,32 @@ class ZSocket { ptr.asTypedList(data.length).setAll(0, data); final sendParams = more ? ZMQ_SNDMORE : 0; - final result = - _zmq._bindings.zmq_send(_handle, ptr.cast(), data.length, sendParams); - _zmq._checkSuccess(result, positiveIsSuccess: true); + final result = _context._bindings + .zmq_send(_socket, ptr.cast(), data.length, sendParams); + _context._checkReturnCode(result); malloc.free(ptr); } void bind(String address) { _checkNotClosed(); final endpointPointer = address.toNativeUtf8(); - final result = _zmq._bindings.zmq_bind(_handle, endpointPointer); - _zmq._checkSuccess(result); + final result = _context._bindings.zmq_bind(_socket, endpointPointer); + _context._checkReturnCode(result); malloc.free(endpointPointer); } void connect(String address) { _checkNotClosed(); final endpointPointer = address.toNativeUtf8(); - final result = _zmq._bindings.zmq_connect(_handle, endpointPointer); - _zmq._checkSuccess(result); + final result = _context._bindings.zmq_connect(_socket, endpointPointer); + _context._checkReturnCode(result); malloc.free(endpointPointer); } void close() { if (!_closed) { - _zmq._handleSocketClosed(this); - _zmq._bindings.zmq_close(_handle); + _context._handleSocketClosed(this); + _context._bindings.zmq_close(_socket); _controller.close(); _closed = true; } @@ -418,8 +430,8 @@ class ZSocket { void setOption(int option, String value) { final ptr = value.toNativeUtf8(); - _zmq._bindings - .zmq_setsockopt(_handle, option, ptr.cast(), ptr.length); + _context._bindings + .zmq_setsockopt(_socket, option, ptr.cast(), ptr.length); malloc.free(ptr); } @@ -474,7 +486,7 @@ class ZeroMQException implements Exception { if (msg == null) { return 'ZeroMQException($errorCode)'; } else { - return 'ZeroMQException: $msg'; + return 'ZeroMQException($errorCode): $msg'; } } diff --git a/pubspec.yaml b/pubspec.yaml index 0f95695..26ae629 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: dartzmq description: A simple dart zeromq implementation/wrapper around the libzmq C++ library -version: 1.0.0-dev.3 +version: 1.0.0-dev.4 homepage: https://github.com/enwi/dartzmq environment: