From 863496349bf4e70f620761edceeb14ac83a33364 Mon Sep 17 00:00:00 2001 From: Eric Blankenhorn Date: Tue, 19 Dec 2023 12:55:34 -0600 Subject: [PATCH] Med fixes --- examples/mqttnet.c | 10 ++- examples/multithread/multithread.c | 117 ++++++++++++++------------- examples/sn-client/sn-client_qos-1.c | 5 -- examples/sn-client/sn-multithread.c | 13 +-- src/mqtt_client.c | 47 ++++++----- 5 files changed, 102 insertions(+), 90 deletions(-) diff --git a/examples/mqttnet.c b/examples/mqttnet.c index 3f811b74f..ef86e858a 100644 --- a/examples/mqttnet.c +++ b/examples/mqttnet.c @@ -528,7 +528,8 @@ static int NetConnect(void *context, const char* host, word16 port, #ifdef WOLFMQTT_NONBLOCK { /* Check for error */ - GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error); + (void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, + so_error); } if ( #ifndef _WIN32 @@ -704,14 +705,15 @@ static int NetWrite(void *context, const byte* buf, int buf_len, #ifndef WOLFMQTT_NO_TIMEOUT /* Setup timeout */ setup_timeout(&tv, timeout_ms); - setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv)); + (void)setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, + sizeof(tv)); #endif rc = (int)SOCK_SEND(sock->fd, buf, buf_len, 0); if (rc == -1) { { /* Get error */ - GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error); + (void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error); } if (so_error == 0) { #if defined(USE_WINDOWS_API) && defined(WOLFMQTT_NONBLOCK) @@ -906,7 +908,7 @@ static int NetRead_ex(void *context, byte* buf, int buf_len, else if (rc < 0) { { /* Get error */ - GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error); + (void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error); } if (so_error == 0) { rc = 0; /* Handle signal */ diff --git a/examples/multithread/multithread.c b/examples/multithread/multithread.c index 8a6ee52c9..0623162c2 100644 --- a/examples/multithread/multithread.c +++ b/examples/multithread/multithread.c @@ -82,27 +82,30 @@ static MQTTCtx gMqttCtx; static word16 mqtt_get_packetid_threadsafe(void) { - word16 packet_id; - wm_SemLock(&mtLock); - packet_id = mqtt_get_packetid(); - wm_SemUnlock(&mtLock); + word16 packet_id = 0; + if (wm_SemLock(&mtLock) == 0) { + packet_id = mqtt_get_packetid(); + wm_SemUnlock(&mtLock); + } return packet_id; } static void mqtt_stop_set(void) { - wm_SemLock(&mtLock); - PRINTF("MQTT Stopping"); - mStopRead = 1; - wm_SemUnlock(&mtLock); + if (wm_SemLock(&mtLock) == 0) { + PRINTF("MQTT Stopping"); + mStopRead = 1; + wm_SemUnlock(&mtLock); + } } static int mqtt_stop_get(void) { - int rc; - wm_SemLock(&mtLock); - rc = mStopRead; - wm_SemUnlock(&mtLock); + int rc = 0; + if (wm_SemLock(&mtLock) == 0) { + rc = mStopRead; + wm_SemUnlock(&mtLock); + } return rc; } @@ -164,47 +167,48 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg, MQTTCtx* mqttCtx = (MQTTCtx*)client->ctx; (void)mqttCtx; - wm_SemLock(&mtLock); - if (msg_new) { - /* Determine min size to dump */ - len = msg->topic_name_len; + if (wm_SemLock(&mtLock) == 0) { + if (msg_new) { + /* Determine min size to dump */ + len = msg->topic_name_len; + if (len > PRINT_BUFFER_SIZE) { + len = PRINT_BUFFER_SIZE; + } + XMEMCPY(buf, msg->topic_name, len); + buf[len] = '\0'; /* Make sure its null terminated */ + + /* Print incoming message */ + PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", + buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, + msg->buffer_pos); + } + + /* Print message payload */ + len = msg->buffer_len; if (len > PRINT_BUFFER_SIZE) { len = PRINT_BUFFER_SIZE; } - XMEMCPY(buf, msg->topic_name, len); + XMEMCPY(buf, msg->buffer, len); buf[len] = '\0'; /* Make sure its null terminated */ + PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s", + msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); - /* Print incoming message */ - PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u", - buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos); - } - - /* Print message payload */ - len = msg->buffer_len; - if (len > PRINT_BUFFER_SIZE) { - len = PRINT_BUFFER_SIZE; - } - XMEMCPY(buf, msg->buffer, len); - buf[len] = '\0'; /* Make sure its null terminated */ - PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s", - msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf); - - if (msg_done) { - /* for test mode: count the number of messages received */ - if (mqttCtx->test_mode) { - if (msg->buffer_pos + msg->buffer_len == - (word32)sizeof(mTestMessage) && - XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, - msg->buffer_len) == 0) - { - mNumMsgsRecvd++; + if (msg_done) { + /* for test mode: count the number of messages received */ + if (mqttCtx->test_mode) { + if (msg->buffer_pos + msg->buffer_len == + (word32)sizeof(mTestMessage) && + XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer, + msg->buffer_len) == 0) + { + mNumMsgsRecvd++; + } } - } - PRINTF("MQTT Message: Done"); + PRINTF("MQTT Message: Done"); + } + wm_SemUnlock(&mtLock); } - wm_SemUnlock(&mtLock); - /* Return negative to terminate publish processing */ return MQTT_CODE_SUCCESS; } @@ -469,19 +473,20 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx) { int isDone = 0; /* check if we are in test mode and done */ - wm_SemLock(&mtLock); - if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && - mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && - mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) - #ifdef WOLFMQTT_NONBLOCK - && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) - #endif - ) { + if (wm_SemLock(&mtLock) == 0) { + if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode && + mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) && + mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) + #ifdef WOLFMQTT_NONBLOCK + && !MqttClient_IsMessageActive(&mqttCtx->client, NULL) + #endif + ) { + wm_SemUnlock(&mtLock); + mqtt_stop_set(); + isDone = 1; /* done */ + } wm_SemUnlock(&mtLock); - mqtt_stop_set(); - isDone = 1; /* done */ } - wm_SemUnlock(&mtLock); return isDone; } diff --git a/examples/sn-client/sn-client_qos-1.c b/examples/sn-client/sn-client_qos-1.c index ed57b09b2..cb89db29e 100644 --- a/examples/sn-client/sn-client_qos-1.c +++ b/examples/sn-client/sn-client_qos-1.c @@ -111,11 +111,6 @@ int sn_testQoSn1(MQTTCtx *mqttCtx) } } - /* Check for error */ - if (rc != MQTT_CODE_SUCCESS) { - goto disconn; - } - disconn: rc = MqttClient_NetDisconnect(&mqttCtx->client); diff --git a/examples/sn-client/sn-multithread.c b/examples/sn-client/sn-multithread.c index 7ca310201..bfd18f8de 100644 --- a/examples/sn-client/sn-multithread.c +++ b/examples/sn-client/sn-multithread.c @@ -79,10 +79,11 @@ static MQTTCtx gMqttCtx; static word16 mqtt_get_packetid_threadsafe(void) { - word16 packet_id; - wm_SemLock(&packetIdLock); - packet_id = mqtt_get_packetid(); - wm_SemUnlock(&packetIdLock); + word16 packet_id = 0; + if (wm_SemLock(&packetIdLock) == 0) { + packet_id = mqtt_get_packetid(); + wm_SemUnlock(&packetIdLock); + } return packet_id; } @@ -243,7 +244,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx) wm_SemFree(&packetIdLock); client_exit(mqttCtx); } - wm_SemLock(&pingSignal); /* default to locked */ + if (wm_SemLock(&pingSignal) != 0) { /* default to locked */ + client_exit(mqttCtx); + } PRINTF("MQTT-SN Client: QoS %d, Use TLS %d", mqttCtx->qos, mqttCtx->use_tls); diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 4f1aa7fdd..f20753fdf 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -2844,6 +2844,7 @@ int MqttClient_NetDisconnect(MqttClient *client) { #ifdef WOLFMQTT_MULTITHREAD MqttPendResp *tmpResp; + int rc; #endif if (client == NULL) { @@ -2852,24 +2853,28 @@ int MqttClient_NetDisconnect(MqttClient *client) #ifdef WOLFMQTT_MULTITHREAD /* Get client lock on to ensure no other threads are active */ - wm_SemLock(&client->lockClient); - -#ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("Net Disconnect: Removing pending responses"); -#endif - for (tmpResp = client->firstPendResp; - tmpResp != NULL; - tmpResp = tmpResp->next) { + rc = wm_SemLock(&client->lockClient); + if (rc == 0) { #ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", - tmpResp, tmpResp->packet_obj, - MqttPacket_TypeDesc(tmpResp->packet_type), - tmpResp->packet_type, tmpResp->packet_id, - tmpResp->packetProcessing, tmpResp->packetDone); + PRINTF("Net Disconnect: Removing pending responses"); #endif - MqttClient_RespList_Remove(client, tmpResp); + for (tmpResp = client->firstPendResp; + tmpResp != NULL; + tmpResp = tmpResp->next) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d", + tmpResp, tmpResp->packet_obj, + MqttPacket_TypeDesc(tmpResp->packet_type), + tmpResp->packet_type, tmpResp->packet_id, + tmpResp->packetProcessing, tmpResp->packetDone); + #endif + MqttClient_RespList_Remove(client, tmpResp); + } + wm_SemUnlock(&client->lockClient); + } + else { + return rc; } - wm_SemUnlock(&client->lockClient); #endif return MqttSocket_Disconnect(client); @@ -3029,14 +3034,16 @@ word32 MqttClient_Flags(MqttClient *client, word32 mask, word32 flags) if (client != NULL) { #ifdef WOLFMQTT_MULTITHREAD /* Get client lock on to ensure no other threads are active */ - wm_SemLock(&client->lockClient); + if (wm_SemLock(&client->lockClient) == 0) #endif - client->flags &= ~mask; - client->flags |= flags; - ret = client->flags; + { + client->flags &= ~mask; + client->flags |= flags; + ret = client->flags; #ifdef WOLFMQTT_MULTITHREAD - wm_SemUnlock(&client->lockClient); + wm_SemUnlock(&client->lockClient); #endif + } } return ret; }