Skip to content

Commit

Permalink
Improvements to multithread example and tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
dgarske committed Nov 1, 2023
1 parent a45ddaf commit 5dbd4ee
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
18 changes: 16 additions & 2 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static int check_response(MQTTCtx* mqttCtx, int rc, word32* startSec,
/* check for test mode */
if (mqtt_stop_get()) {
PRINTF("MQTT Exiting Thread...");
return MQTT_CODE_SUCCESS;
return MQTT_CODE_ERROR_SYSTEM;
}

#ifdef WOLFMQTT_NONBLOCK
Expand Down Expand Up @@ -387,6 +387,11 @@ static int multithread_test_finish(MQTTCtx *mqttCtx)

PRINTF("MQTT Client Done: %d", mqttCtx->return_code);

if (mStopRead && mqttCtx->return_code == MQTT_CODE_ERROR_SYSTEM) {
/* this is okay, we requested termination */
mqttCtx->return_code = MQTT_CODE_SUCCESS;
}

return mqttCtx->return_code;
}

Expand Down Expand Up @@ -507,12 +512,17 @@ static void *waitMessage_task(void *param)
}

/* Try and read packet */
rc = MqttClient_WaitMessage(&mqttCtx->client, cmd_timeout_ms);
rc = MqttClient_WaitMessage_ex(&mqttCtx->client, &mqttCtx->client.msg,
cmd_timeout_ms);
if (mqttCtx->test_mode && rc == MQTT_CODE_ERROR_TIMEOUT) {
rc = 0;
}
rc = check_response(mqttCtx, rc, &startSec, MQTT_PACKET_TYPE_ANY,
cmd_timeout_ms);
if (rc != MQTT_CODE_SUCCESS && rc != MQTT_CODE_CONTINUE) {
MqttClient_CancelMessage(&mqttCtx->client,
(MqttObject*)&mqttCtx->client.msg);
}

/* check return code */
if (rc == MQTT_CODE_CONTINUE) {
Expand Down Expand Up @@ -540,6 +550,10 @@ static void *waitMessage_task(void *param)
rc = MqttClient_Publish(&mqttCtx->client,
&mqttCtx->publish);
} while (rc == MQTT_CODE_CONTINUE);
if (rc != MQTT_CODE_SUCCESS) {
MqttClient_CancelMessage(&mqttCtx->client,
(MqttObject*)&mqttCtx->publish);
}
PRINTF("MQTT Publish: Topic %s, %s (%d)",
mqttCtx->publish.topic_name,
MqttClient_ReturnCodeToString(rc), rc);
Expand Down
14 changes: 10 additions & 4 deletions src/mqtt_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@
#ifdef WOLFMQTT_NO_STDIO
#undef WOLFMQTT_DEBUG_SOCKET
#endif

/* #define WOLFMQTT_TEST_NONBLOCK */
#ifdef WOLFMQTT_TEST_NONBLOCK
#define WOLFMQTT_TEST_NONBLOCK_TIMES 1
#endif

/* lwip */
#ifdef WOLFSSL_LWIP
#undef read
Expand Down Expand Up @@ -125,8 +131,8 @@ static int MqttSocket_WriteDo(MqttClient *client, const byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbWriteAlt = 0;
if (!testNbWriteAlt) {
testNbWriteAlt = 1;
if (testNbWriteAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbWriteAlt++;
return MQTT_CODE_CONTINUE;
}
testNbWriteAlt = 0;
Expand Down Expand Up @@ -247,8 +253,8 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len,

#if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK)
static int testNbReadAlt = 0;
if (!testNbReadAlt) {
testNbReadAlt = 1;
if (testNbReadAlt < WOLFMQTT_TEST_NONBLOCK_TIMES) {
testNbReadAlt++;
return MQTT_CODE_CONTINUE;
}
testNbReadAlt = 0;
Expand Down

0 comments on commit 5dbd4ee

Please sign in to comment.