Skip to content

Commit

Permalink
Fix mqtt5to3adapter suback failure handling (#723)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfod authored Nov 22, 2023
1 parent 6239f87 commit d27f864
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/native/mqtt_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,10 @@ static void s_on_op_complete(
/********** JNI ENV RELEASE **********/
}

static bool s_is_qos_successful(enum aws_mqtt_qos qos) {
return qos < 128;
}

static void s_on_ack(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
Expand All @@ -767,7 +771,14 @@ static void s_on_ack(
int error_code,
void *user_data) {
(void)topic;
(void)qos;

// Handle a case when the server processed SUBSCRIBE request successfully, but rejected a subscription for some
// reason, i.e. error_code is 0 and qos is 0x80.
// This mostly applies to mqtt5to3adapter, as MQTT3 client will be disconnected on unsuccessful subscribe.
if (error_code == 0 && !s_is_qos_successful(qos)) {
error_code = AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE;
}

s_on_op_complete(connection, packet_id, error_code, user_data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,80 @@ public void TestOperationSubUnsub() {
}
}

@Test
public void TestAnauthorizedSub() {
skipIfNetworkUnavailable();
Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT,
AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
String testUUID = UUID.randomUUID().toString();
String testTopic = "$this/topic/is/unsubscribable";
String clientId = "test/MQTT5TO3Adapter_ClientId" + testUUID;
String testPayload = "PUBLISH ME!";

Consumer<MqttMessage> messageHandler = (message) -> {
byte[] payload = message.getPayload();
try {
assertEquals(testTopic, message.getTopic());
String contents = new String(payload, "UTF-8");
assertEquals("Message is intact", testPayload, contents);
} catch (Exception ex) {
fail("Unable to decode payload: " + ex.getMessage());
}
};

try {
Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
LifecycleEvents_Futured events = new LifecycleEvents_Futured();
builder.withLifecycleEvents(events);

TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
TlsContext tlsContext = new TlsContext(tlsOptions);
tlsOptions.close();
builder.withTlsContext(tlsContext);

PublishEvents_Futured publishEvents = new PublishEvents_Futured();
builder.withPublishEvents(publishEvents);
ConnectPacketBuilder connectBuilder = new ConnectPacketBuilder();
connectBuilder.withClientId(clientId);
builder.withConnectOptions(connectBuilder.build());

try (Mqtt5Client client = new Mqtt5Client(builder.build());
MqttClientConnection connection = new MqttClientConnection(client, null);) {
connection.onMessage(messageHandler);
client.start();
events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

CompletableFuture<MqttMessage> receivedFuture = new CompletableFuture<>();
Consumer<MqttMessage> subscriberMessageHandler = (message) -> {
receivedFuture.complete(message);
};

CompletableFuture<Integer> subscribed = connection.subscribe(testTopic, QualityOfService.AT_LEAST_ONCE,
subscriberMessageHandler);
subscribed.thenApply(unused -> subsAcked++);
boolean subFailed = false;
try {
int packetId = subscribed.get();
} catch (Exception ex) {
System.out.println("Subscribe failed: " + ex);
subFailed = true;
}

assertTrue("The SUBSCRIBE should fail", subFailed);

client.stop(new DisconnectPacketBuilder().build());
}

if (tlsContext != null) {
tlsContext.close();
}

} catch (Exception ex) {
fail(ex.getMessage());
}
}

@Test
public void TestNullPubAck() {
skipIfNetworkUnavailable();
Expand Down

0 comments on commit d27f864

Please sign in to comment.