diff --git a/src/mqtt_client.c b/src/mqtt_client.c index fbd7044f1..5938729b1 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -162,7 +162,7 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client, static int MqttWriteStart(MqttClient* client, MqttMsgStat* stat) { - int rc = 0; + int rc = MQTT_CODE_SUCCESS; #ifdef WOLFMQTT_MULTITHREAD #ifdef WOLFMQTT_DEBUG_CLIENT @@ -217,7 +217,7 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) { - int rc = 0; + int rc = MQTT_CODE_SUCCESS; #ifdef WOLFMQTT_MULTITHREAD #ifdef WOLFMQTT_DEBUG_CLIENT @@ -362,6 +362,7 @@ void MqttClient_RespList_Remove(MqttClient *client, MqttPendResp *rmResp) #endif } +/* return codes: 0=not found, 1=found */ int MqttClient_RespList_Find(MqttClient *client, MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp) { @@ -1318,7 +1319,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, } client->write.len = rc; - rc = 0; + rc = MQTT_CODE_SUCCESS; mms_stat->write = MQTT_MSG_HEADER; } @@ -1339,7 +1340,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, #endif MqttWriteStop(client, mms_stat); if (rc == xfer) { - rc = 0; /* success */ + rc = MQTT_CODE_SUCCESS; /* success */ } mms_stat->write = MQTT_MSG_BEGIN; /* reset write state */ @@ -1722,15 +1723,6 @@ static int MqttClient_Publish_ReadPayload(MqttClient* client, /* make sure there is something to read */ if (msg_len > 0) { - #if defined(WOLFMQTT_NONBLOCK) && defined(WOLFMQTT_TEST_NONBLOCK) - static int testNbAlt = 0; - if (!testNbAlt) { - testNbAlt = 1; - return MQTT_CODE_CONTINUE; - } - testNbAlt = 0; - #endif - rc = MqttSocket_Read(client, client->rx_buf, msg_len, timeout_ms); if (rc < 0) { @@ -2036,9 +2028,14 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, #ifdef WOLFMQTT_MULTITHREAD if (writeOnly) { - /* another thread will handle the wait type */ + /* another thread will handle response */ + /* check if response already received from other thread */ rc = MqttClient_CheckPendResp(client, resp_type, publish->packet_id); + if (rc == MQTT_CODE_CONTINUE) { + /* mark success, let other thread handle response */ + rc = MQTT_CODE_SUCCESS; + } } else #endif @@ -2048,17 +2045,18 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, /* Wait for publish response packet */ rc = MqttClient_WaitType(client, &publish->resp, resp_type, publish->packet_id, client->cmd_timeout_ms); + + #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) + if (rc == MQTT_CODE_CONTINUE) + break; + #endif + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) { + MqttClient_RespList_Remove(client, &publish->pendResp); + wm_SemUnlock(&client->lockClient); + } + #endif } - #if defined(WOLFMQTT_NONBLOCK) || defined(WOLFMQTT_MULTITHREAD) - if (rc == MQTT_CODE_CONTINUE) - break; - #endif - #ifdef WOLFMQTT_MULTITHREAD - if (wm_SemLock(&client->lockClient) == 0) { - MqttClient_RespList_Remove(client, &publish->pendResp); - wm_SemUnlock(&client->lockClient); - } - #endif } break; }