MqttAsyncClient sometimes blocking indefinitely on publish #620

Open
1 of 2 tasks
tmbull opened this issue Mar 1, 2024 · 3 comments
tmbull commented Mar 1, 2024

Checklist

❓ Question

Hi all, I have a Vert.x application that consumes messages from Kafka and publishes them to MQTT. I frequently see threads stuck in the "WAITING" state while publishing. I looked through prior issues, and it seems this can occur if the client is not connected before publishing, or if the client is disconnected before the publish is acknowledged. I do not believe that is the case here, as I do not see any messages indicating that the client has disconnected.

I do not believe this is a bug, as we have several other applications using the HiveMQ MqttClient to publish messages and we never encounter this issue. I have inspected the code, but to be honest, I am not very familiar with RxJava. I plan to dig into that next, but I was wondering if there are any other obvious cases or race conditions I should look out for. Thank you for your time.

📎 Additional context

I would like to provide a sample project, but this is a proprietary code base and, as of yet, I have been unable to reproduce the issue in a "toy" project. However, here is a sample stack trace:

"vert.x-eventloop-thread-0" #21 prio=5 os_prio=0 cpu=84.25ms elapsed=135.84s tid=0x00007fdac4601000 nid=0x1b in Object.wait()  [0x00007fdaad038000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.16/Native Method)
	- waiting on <0x00000006951d1a00> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
	at java.lang.Object.wait(java.base@11.0.16/Unknown Source)
	at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables.add(MqttPublishFlowables.java:53)
	- waiting to re-lock in wait() <0x00000006951d1a00> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
	at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle.subscribeActual(MqttAckSingle.java:56)
	at io.reactivex.Single.subscribe(Single.java:3666)
	at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
	at io.reactivex.Single.subscribe(Single.java:3666)
	at com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture.<init>(RxFutureConverter.java:113)
	at com.hivemq.client.internal.rx.RxFutureConverter.toFuture(RxFutureConverter.java:43)
	at com.hivemq.client.internal.mqtt.MqttAsyncClient.publish(MqttAsyncClient.java:243)
	at com.my.code.MqttClient.publishMessageWithRetryInternal(MqttClient.java:258)
	at com.avalara.edge.common.mqtt.MqttClient$$Lambda$1608/0x0000000840939440.handle(Unknown Source)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:948)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:919)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
	at io.vertx.core.impl.ContextBase.emit(ContextBase.java:297)
	at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:207)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:937)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@11.0.16/Unknown Source)
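
For anyone who wants to catch this state in a running JVM without taking a manual thread dump, here is a minimal sketch that uses the standard `ThreadMXBean` API to list threads that are in the WAITING state with a given class somewhere on their stack. The class `StuckThreadFinder` and the method `findWaitingIn` are hypothetical helpers, not part of the HiveMQ client; the class-name fragment to look for (for example `MqttPublishFlowables`, taken from the trace above) is supplied by the caller.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper; not part of any library mentioned in this thread. */
final class StuckThreadFinder {
    private StuckThreadFinder() {}

    /**
     * Returns the names of all live threads that are in WAITING state and have
     * at least one stack frame whose class name contains the given fragment.
     */
    static List<String> findWaitingIn(String classNameFragment) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        List<String> names = new ArrayList<>();
        // false, false: skip lock/monitor details, we only need the stack frames
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != Thread.State.WAITING) {
                continue;
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().contains(classNameFragment)) {
                    names.add(info.getThreadName());
                    break; // one matching frame is enough for this thread
                }
            }
        }
        return names;
    }
}
```

One possible use, which is an assumption about the monitoring setup rather than anything prescribed by the client, is to call `StuckThreadFinder.findWaitingIn("MqttPublishFlowables")` periodically from a separate monitoring thread and log or alert when the same thread name shows up repeatedly.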
tmbull added the question label Mar 1, 2024
@Mystery406

Same as #554

@code2life-crypto

@tmbull @Mystery406 Hi, we have also encountered the same problem. Do you have any solution? Thank you so much.

This is the same as #554, but we could not find a solution there.

@code2life-crypto

@antpaw @Mystery406 @tmbull @grawinkel @dajudge Hi, we are facing the same issue, even though we check the client state before each send. Here is the code we use to publish a message:
[Screenshot: ts20241012-1152-2]

In most circumstances this runs fine in our production environment. The issue is intermittent: it has occurred twice in the past six months, and each time the thread blocked and was never woken up.
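
Since the blocking happens inside the publish call itself, one way to at least bound the damage is to run the call on a dedicated worker and give up after a timeout. The following is a minimal sketch of that idea, assuming Java 9+ for `CompletableFuture.orTimeout`. `GuardedCall` and `callWithTimeout` are hypothetical names, and `blockingCall` stands in for whatever code invokes the client's publish; nothing here is HiveMQ API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper: runs a possibly-blocking call on a worker with a deadline. */
final class GuardedCall {
    private GuardedCall() {}

    static <T> CompletableFuture<T> callWithTimeout(
            Supplier<T> blockingCall, ExecutorService worker, long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Run the call on the worker so the caller's thread never blocks in it.
        Future<?> task = worker.submit(() -> {
            try {
                result.complete(blockingCall.get());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        // orTimeout completes 'result' with a TimeoutException if the deadline passes.
        result.orTimeout(timeout, unit).whenComplete((value, error) -> {
            if (error instanceof TimeoutException) {
                // Interrupt the worker; this only helps if the blocked code reacts to interruption.
                task.cancel(true);
            }
        });
        return result;
    }
}
```

This does not fix the underlying hang. It only turns an indefinite block into a visible `TimeoutException` on the returned future. Whether the interrupt actually frees the worker depends on how the blocked code handles `InterruptedException`, which is not confirmed anywhere in this thread, so the worker pool should be sized with the possibility of a permanently lost thread in mind.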

Here is a sample stack trace:
".client.pool-2-2" #157 [111] prio=5 os_prio=0 cpu=3198.07ms elapsed=100922.67s tid=0x00007fd7fbfac520 nid=111 in Object.wait() [0x00007fd77e3f1000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait0(java.base@21.0.2/Native Method)
- waiting on <no object reference available>
at java.lang.Object.wait(java.base@21.0.2/Object.java:366)
at java.lang.Object.wait(java.base@21.0.2/Object.java:339)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables.add(MqttPublishFlowables.java:53)
- locked <0x00000007220c0a30> (a com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle.subscribeActual(MqttAckSingle.java:56)
at io.reactivex.Single.subscribe(Single.java:3666)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:3666)
at com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture.<init>(RxFutureConverter.java:113)
at com.hivemq.client.internal.rx.RxFutureConverter.toFuture(RxFutureConverter.java:43)
at com.hivemq.client.internal.mqtt.MqttAsyncClient.publish(MqttAsyncClient.java:243)

Here are the stack traces of the "com.hivemq.client.mqtt" threads:
`"com.hivemq.client.mqtt-7-1" #176 [121] prio=10 os_prio=0 cpu=998.73ms elapsed=100918.76s tid=0x00007fd65c2a5220 nid=121 runnable [0x00007fd77dceb000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:193)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:304)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:368)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)

"com.hivemq.client.mqtt-7-2" #175 [122] prio=10 os_prio=0 cpu=1038.93ms elapsed=100918.76s tid=0x00007fd674412360 nid=122 runnable [0x00007fd77dbea000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:193)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:304)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:368)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.runWith(java.base@21.0.2/Thread.java:1596)
at java.lang.Thread.run(java.base@21.0.2/Thread.java:1583)`

Details
Affected HiveMQ MQTT Client version(s): 1.3.0
Used JVM version: Oracle JDK 21+
Used OS (name and version): Debian Linux (AArch64), deployed on AWS
Used MQTT version: N/A
Used MQTT broker (name and version): N/A

We can't reproduce the issue in our local environment, even when stepping through with breakpoints. We suspect that, in certain cases, HiveMQ's thread pool never calls notifyAll, but we're unsure of the cause.
[Screenshot: ts20241012-1210-2]
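
To make the notifyAll suspicion concrete, here is a minimal sketch of the general wait/notify pattern that a blocking `add` like the one in the traces would follow. `BoundedBuffer` is a hypothetical class written for illustration only and is not the code of `MqttPublishFlowables`. It shows why an untimed `wait()` hangs forever if the matching `notifyAll()` is never issued, and how a timed wait at least bounds the block.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/** Hypothetical bounded buffer for illustration; NOT HiveMQ's implementation. */
final class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /**
     * Adds an item, waiting up to timeoutMillis for free space.
     * Returns false on timeout or interruption instead of blocking forever.
     */
    synchronized boolean add(T item, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (items.size() >= capacity) { // loop guards against spurious wakeups
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return false; // nobody made room (or notified) in time
            }
            try {
                wait(remainingMillis); // a plain wait() here would depend entirely on notifyAll()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        items.add(item);
        return true;
    }

    /** Removes an item and wakes any producers blocked in add(). */
    synchronized T poll() {
        T item = items.poll();
        if (item != null) {
            notifyAll(); // if a consumer path skips this call, waiting producers never resume
        }
        return item;
    }
}
```

In this sketch, any code path that frees capacity without calling `notifyAll()` would leave a producer parked in `wait()`, which would look like the WAITING frames in the dumps above. Whether such a path exists in the client is exactly the open question here.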

Could you please help investigate this issue?
