Skip to content

Commit

Permalink
Med fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
embhorn committed Dec 19, 2023
1 parent 906e574 commit 8634963
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 90 deletions.
10 changes: 6 additions & 4 deletions examples/mqttnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ static int NetConnect(void *context, const char* host, word16 port,
#ifdef WOLFMQTT_NONBLOCK
{
/* Check for error */
GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error);
(void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR,
so_error);
}
if (
#ifndef _WIN32
Expand Down Expand Up @@ -704,14 +705,15 @@ static int NetWrite(void *context, const byte* buf, int buf_len,
#ifndef WOLFMQTT_NO_TIMEOUT
/* Setup timeout */
setup_timeout(&tv, timeout_ms);
setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(tv));
(void)setsockopt(sock->fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
sizeof(tv));
#endif

rc = (int)SOCK_SEND(sock->fd, buf, buf_len, 0);
if (rc == -1) {
{
/* Get error */
GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error);
(void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error);
}
if (so_error == 0) {
#if defined(USE_WINDOWS_API) && defined(WOLFMQTT_NONBLOCK)
Expand Down Expand Up @@ -906,7 +908,7 @@ static int NetRead_ex(void *context, byte* buf, int buf_len,
else if (rc < 0) {
{
/* Get error */
GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error);
(void)GET_SOCK_ERROR(sock->fd, SOL_SOCKET, SO_ERROR, so_error);
}
if (so_error == 0) {
rc = 0; /* Handle signal */
Expand Down
117 changes: 61 additions & 56 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,30 @@ static MQTTCtx gMqttCtx;

static word16 mqtt_get_packetid_threadsafe(void)
{
word16 packet_id;
wm_SemLock(&mtLock);
packet_id = mqtt_get_packetid();
wm_SemUnlock(&mtLock);
word16 packet_id = 0;
if (wm_SemLock(&mtLock) == 0) {
packet_id = mqtt_get_packetid();
wm_SemUnlock(&mtLock);
}
return packet_id;
}

static void mqtt_stop_set(void)
{
wm_SemLock(&mtLock);
PRINTF("MQTT Stopping");
mStopRead = 1;
wm_SemUnlock(&mtLock);
if (wm_SemLock(&mtLock) == 0) {
PRINTF("MQTT Stopping");
mStopRead = 1;
wm_SemUnlock(&mtLock);
}
}

static int mqtt_stop_get(void)
{
int rc;
wm_SemLock(&mtLock);
rc = mStopRead;
wm_SemUnlock(&mtLock);
int rc = 0;
if (wm_SemLock(&mtLock) == 0) {
rc = mStopRead;
wm_SemUnlock(&mtLock);
}
return rc;
}

Expand Down Expand Up @@ -164,47 +167,48 @@ static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
MQTTCtx* mqttCtx = (MQTTCtx*)client->ctx;
(void)mqttCtx;

wm_SemLock(&mtLock);
if (msg_new) {
/* Determine min size to dump */
len = msg->topic_name_len;
if (wm_SemLock(&mtLock) == 0) {
if (msg_new) {
/* Determine min size to dump */
len = msg->topic_name_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->topic_name, len);
buf[len] = '\0'; /* Make sure its null terminated */

/* Print incoming message */
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len,
msg->buffer_pos);
}

/* Print message payload */
len = msg->buffer_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->topic_name, len);
XMEMCPY(buf, msg->buffer, len);
buf[len] = '\0'; /* Make sure its null terminated */
PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s",
msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf);

/* Print incoming message */
PRINTF("MQTT Message: Topic %s, Qos %d, Id %d, Len %u, %u, %u",
buf, msg->qos, msg->packet_id, msg->total_len, msg->buffer_len, msg->buffer_pos);
}

/* Print message payload */
len = msg->buffer_len;
if (len > PRINT_BUFFER_SIZE) {
len = PRINT_BUFFER_SIZE;
}
XMEMCPY(buf, msg->buffer, len);
buf[len] = '\0'; /* Make sure its null terminated */
PRINTF("Payload (%d - %d) printing %d bytes:" LINE_END "%s",
msg->buffer_pos, msg->buffer_pos + msg->buffer_len, len, buf);

if (msg_done) {
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (msg->buffer_pos + msg->buffer_len ==
(word32)sizeof(mTestMessage) &&
XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer,
msg->buffer_len) == 0)
{
mNumMsgsRecvd++;
if (msg_done) {
/* for test mode: count the number of messages received */
if (mqttCtx->test_mode) {
if (msg->buffer_pos + msg->buffer_len ==
(word32)sizeof(mTestMessage) &&
XMEMCMP(&mTestMessage[msg->buffer_pos], msg->buffer,
msg->buffer_len) == 0)
{
mNumMsgsRecvd++;
}
}
}

PRINTF("MQTT Message: Done");
PRINTF("MQTT Message: Done");
}
wm_SemUnlock(&mtLock);
}
wm_SemUnlock(&mtLock);

/* Return negative to terminate publish processing */
return MQTT_CODE_SUCCESS;
}
Expand Down Expand Up @@ -469,19 +473,20 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
{
int isDone = 0;
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
) {
if (wm_SemLock(&mtLock) == 0) {
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == (NUM_PUB_TASKS * NUM_PUB_PER_TASK) &&
mNumMsgsRecvd == (NUM_PUB_TASKS * NUM_PUB_PER_TASK)
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
) {
wm_SemUnlock(&mtLock);
mqtt_stop_set();
isDone = 1; /* done */
}
wm_SemUnlock(&mtLock);
mqtt_stop_set();
isDone = 1; /* done */
}
wm_SemUnlock(&mtLock);
return isDone;
}

Expand Down
5 changes: 0 additions & 5 deletions examples/sn-client/sn-client_qos-1.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ int sn_testQoSn1(MQTTCtx *mqttCtx)
}
}

/* Check for error */
if (rc != MQTT_CODE_SUCCESS) {
goto disconn;
}

disconn:

rc = MqttClient_NetDisconnect(&mqttCtx->client);
Expand Down
13 changes: 8 additions & 5 deletions examples/sn-client/sn-multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ static MQTTCtx gMqttCtx;

static word16 mqtt_get_packetid_threadsafe(void)
{
word16 packet_id;
wm_SemLock(&packetIdLock);
packet_id = mqtt_get_packetid();
wm_SemUnlock(&packetIdLock);
word16 packet_id = 0;
if (wm_SemLock(&packetIdLock) == 0) {
packet_id = mqtt_get_packetid();
wm_SemUnlock(&packetIdLock);
}
return packet_id;
}

Expand Down Expand Up @@ -243,7 +244,9 @@ static int multithread_test_init(MQTTCtx *mqttCtx)
wm_SemFree(&packetIdLock);
client_exit(mqttCtx);
}
wm_SemLock(&pingSignal); /* default to locked */
if (wm_SemLock(&pingSignal) != 0) { /* default to locked */
client_exit(mqttCtx);
}

PRINTF("MQTT-SN Client: QoS %d, Use TLS %d", mqttCtx->qos,
mqttCtx->use_tls);
Expand Down
47 changes: 27 additions & 20 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2844,6 +2844,7 @@ int MqttClient_NetDisconnect(MqttClient *client)
{
#ifdef WOLFMQTT_MULTITHREAD
MqttPendResp *tmpResp;
int rc;
#endif

if (client == NULL) {
Expand All @@ -2852,24 +2853,28 @@ int MqttClient_NetDisconnect(MqttClient *client)

#ifdef WOLFMQTT_MULTITHREAD
/* Get client lock on to ensure no other threads are active */
wm_SemLock(&client->lockClient);

#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Net Disconnect: Removing pending responses");
#endif
for (tmpResp = client->firstPendResp;
tmpResp != NULL;
tmpResp = tmpResp->next) {
rc = wm_SemLock(&client->lockClient);
if (rc == 0) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
tmpResp, tmpResp->packet_obj,
MqttPacket_TypeDesc(tmpResp->packet_type),
tmpResp->packet_type, tmpResp->packet_id,
tmpResp->packetProcessing, tmpResp->packetDone);
PRINTF("Net Disconnect: Removing pending responses");
#endif
MqttClient_RespList_Remove(client, tmpResp);
for (tmpResp = client->firstPendResp;
tmpResp != NULL;
tmpResp = tmpResp->next) {
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("\tPendResp: %p (obj %p), Type %s (%d), ID %d, InProc %d, Done %d",
tmpResp, tmpResp->packet_obj,
MqttPacket_TypeDesc(tmpResp->packet_type),
tmpResp->packet_type, tmpResp->packet_id,
tmpResp->packetProcessing, tmpResp->packetDone);
#endif
MqttClient_RespList_Remove(client, tmpResp);
}
wm_SemUnlock(&client->lockClient);
}
else {
return rc;
}
wm_SemUnlock(&client->lockClient);
#endif

return MqttSocket_Disconnect(client);
Expand Down Expand Up @@ -3029,14 +3034,16 @@ word32 MqttClient_Flags(MqttClient *client, word32 mask, word32 flags)
if (client != NULL) {
#ifdef WOLFMQTT_MULTITHREAD
/* Get client lock on to ensure no other threads are active */
wm_SemLock(&client->lockClient);
if (wm_SemLock(&client->lockClient) == 0)
#endif
client->flags &= ~mask;
client->flags |= flags;
ret = client->flags;
{
client->flags &= ~mask;
client->flags |= flags;
ret = client->flags;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
wm_SemUnlock(&client->lockClient);
#endif
}
}
return ret;
}

0 comments on commit 8634963

Please sign in to comment.