diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index b00365b95..983f9fc14 100755 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -109,7 +109,7 @@ static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec, /* check for test mode */ if (mqtt_stop_get()) { PRINTF("MQTT Exiting Thread..."); - return MQTT_CODE_SUCCESS; + return MQTT_CODE_ERROR_SYSTEM; } #ifdef WOLFMQTT_NONBLOCK @@ -387,6 +387,11 @@ static int multithread_test_finish(MQTTCtx *mqttCtx) PRINTF("MQTT Client Done: %d", mqttCtx->return_code); + if (mStopRead && mqttCtx->return_code == MQTT_CODE_ERROR_SYSTEM) { + /* this is okay, we requested termination */ + mqttCtx->return_code = MQTT_CODE_SUCCESS; + } + return mqttCtx->return_code; } @@ -507,12 +512,17 @@ static void *waitMessage_task(void *param) } /* Try and read packet */ - rc = MqttClient_WaitMessage(&mqttCtx->client, cmd_timeout_ms); + rc = MqttClient_WaitMessage_ex(&mqttCtx->client, &mqttCtx->client.msg, + cmd_timeout_ms); if (mqttCtx->test_mode && rc == MQTT_CODE_ERROR_TIMEOUT) { rc = 0; } rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY, cmd_timeout_ms); + if (rc != MQTT_CODE_SUCCESS && rc != MQTT_CODE_CONTINUE) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->client.msg); + } /* check return code */ if (rc == MQTT_CODE_CONTINUE) { @@ -540,6 +550,10 @@ static void *waitMessage_task(void *param) rc = MqttClient_Publish(&mqttCtx->client, &mqttCtx->publish); } while (rc == MQTT_CODE_CONTINUE); + if (rc != MQTT_CODE_SUCCESS) { + MqttClient_CancelMessage(&mqttCtx->client, + (MqttObject*)&mqttCtx->publish); + } PRINTF("MQTT Publish: Topic %s, %s (%d)", mqttCtx->publish.topic_name, MqttClient_ReturnCodeToString(rc), rc); diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index f8888d254..86ae1822b 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -41,6 +41,12 @@ #ifdef WOLFMQTT_NO_STDIO #undef WOLFMQTT_DEBUG_SOCKET #endif + +/* #define WOLFMQTT_TEST_NONBLOCK */ +#ifdef WOLFMQTT_TEST_NONBLOCK + #define WOLFMQTT_TEST_NONBLOCK_TIMES 1 +#endif + /* lwip */ #ifdef WOLFSSL_LWIP #undef read @@ -125,8 +131,8 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testNbWriteAlt = 0; - if (!testNbWriteAlt) { - testNbWriteAlt = 1; + if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbWriteAlt++; return MQTT_CODE_CONTINUE; } testNbWriteAlt = 0; @@ -247,8 +253,8 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) static int testNbReadAlt = 0; - if (!testNbReadAlt) { - testNbReadAlt = 1; + if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) { + testNbReadAlt++; return MQTT_CODE_CONTINUE; } testNbReadAlt = 0;