From 9c093c4039478c04c7e55a1825c391dd4742fd61 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 14 Jan 2025 12:59:31 -0800 Subject: [PATCH] Add topic argument to publish callback in request-response stream client (#380) --- bin/elastishadow/main.c | 6 +- .../request_response_client.h | 5 +- .../request_response_client.c | 2 +- .../request_response_client_tests.c | 178 +++++++++++++----- 4 files changed, 143 insertions(+), 48 deletions(-) diff --git a/bin/elastishadow/main.c b/bin/elastishadow/main.c index b9d15e87..5ac9f0d9 100644 --- a/bin/elastishadow/main.c +++ b/bin/elastishadow/main.c @@ -750,7 +750,11 @@ static void s_stream_subscription_status_fn( aws_error_debug_str(error_code)); } -static void s_stream_incoming_publish_fn(struct aws_byte_cursor payload, void *user_data) { +static void s_stream_incoming_publish_fn( + struct aws_byte_cursor payload, + struct aws_byte_cursor topic, + void *user_data) { + (void)topic; struct aws_shadow_streaming_operation *operation = user_data; struct aws_byte_cursor thing_cursor = aws_byte_cursor_from_buf(&operation->thing); diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 3edff934..c064d655 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -121,7 +121,10 @@ typedef void(aws_mqtt_streaming_operation_subscription_status_fn)( /* * Callback signature for when a publish arrives that matches a streaming operation's subscription */ -typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data); +typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)( + struct aws_byte_cursor payload, + struct aws_byte_cursor topic, + void *user_data); /* * Callback signature for when a streaming operation is fully destroyed and no more events will be emitted. diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 52f7fd55..448f7c98 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -897,7 +897,7 @@ static void s_apply_publish_to_streaming_operation_list( } void *user_data = operation->storage.streaming_storage.options.user_data; - (*incoming_publish_callback)(publish_event->payload, user_data); + (*incoming_publish_callback)(publish_event->payload, publish_event->topic, user_data); AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index a1b55f1d..a12e2bf9 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -189,6 +189,16 @@ static int s_rrc_verify_request_completion( return AWS_OP_SUCCESS; } +struct aws_rr_client_fixture_publish_message { + struct aws_byte_buf payload; + struct aws_byte_buf topic; +}; + +struct aws_rr_client_fixture_publish_message_view { + struct aws_byte_cursor payload; + struct aws_byte_cursor topic; +}; + struct aws_rr_client_fixture_streaming_record { struct aws_allocator *allocator; @@ -221,7 +231,8 @@ struct aws_rr_client_fixture_streaming_record *s_aws_rr_client_fixture_streaming aws_byte_buf_init_copy_from_cursor(&record->record_key, allocator, record_key); record->record_key_cursor = aws_byte_cursor_from_buf(&record->record_key); - aws_array_list_init_dynamic(&record->publishes, allocator, 10, sizeof(struct aws_byte_buf)); + aws_array_list_init_dynamic( + &record->publishes, allocator, 10, sizeof(struct aws_rr_client_fixture_publish_message)); aws_array_list_init_dynamic( &record->subscription_events, allocator, @@ -236,10 +247,11 @@ void s_aws_rr_client_fixture_streaming_record_delete(struct aws_rr_client_fixtur size_t publish_count = aws_array_list_length(&record->publishes); for (size_t i = 0; i < publish_count; ++i) { - struct aws_byte_buf publish_payload; - aws_array_list_get_at(&record->publishes, &publish_payload, i); + struct aws_rr_client_fixture_publish_message publish_message; + aws_array_list_get_at(&record->publishes, &publish_message, i); - aws_byte_buf_clean_up(&publish_payload); + aws_byte_buf_clean_up(&publish_message.payload); + aws_byte_buf_clean_up(&publish_message.topic); } aws_array_list_clean_up(&record->publishes); @@ -276,16 +288,18 @@ static void s_rrc_fixture_streaming_operation_subscription_status_callback( static void s_rrc_fixture_streaming_operation_incoming_publish_callback( struct aws_byte_cursor payload, + struct aws_byte_cursor topic, void *user_data) { struct aws_rr_client_fixture_streaming_record *record = user_data; struct aws_rr_client_test_fixture *fixture = record->fixture; aws_mutex_lock(&fixture->lock); - struct aws_byte_buf payload_buffer; - aws_byte_buf_init_copy_from_cursor(&payload_buffer, fixture->allocator, payload); + struct aws_rr_client_fixture_publish_message publish_message; + aws_byte_buf_init_copy_from_cursor(&publish_message.payload, fixture->allocator, payload); + aws_byte_buf_init_copy_from_cursor(&publish_message.topic, fixture->allocator, topic); - aws_array_list_push_back(&record->publishes, &payload_buffer); + aws_array_list_push_back(&record->publishes, &publish_message); aws_mutex_unlock(&fixture->lock); aws_condition_variable_notify_all(&fixture->signal); @@ -379,7 +393,7 @@ static int s_rrc_verify_streaming_publishes( struct aws_rr_client_test_fixture *fixture, struct aws_byte_cursor key, size_t expected_publish_count, - struct aws_byte_cursor *expected_publishes) { + struct aws_rr_client_fixture_publish_message_view *expected_publishes) { aws_mutex_lock(&fixture->lock); @@ -394,13 +408,21 @@ static int s_rrc_verify_streaming_publishes( ASSERT_INT_EQUALS(expected_publish_count, actual_publish_count); for (size_t i = 0; i < actual_publish_count; ++i) { - struct aws_byte_buf actual_payload; - aws_array_list_get_at(&record->publishes, &actual_payload, i); + struct aws_rr_client_fixture_publish_message actual_publish_message; + aws_array_list_get_at(&record->publishes, &actual_publish_message, i); - struct aws_byte_cursor *expected_payload = &expected_publishes[i]; + struct aws_rr_client_fixture_publish_message_view *expected_payload = &expected_publishes[i]; ASSERT_BIN_ARRAYS_EQUALS( - expected_payload->ptr, expected_payload->len, actual_payload.buffer, actual_payload.len); + expected_payload->payload.ptr, + expected_payload->payload.len, + actual_publish_message.payload.buffer, + actual_publish_message.payload.len); + ASSERT_BIN_ARRAYS_EQUALS( + expected_payload->topic.ptr, + expected_payload->topic.len, + actual_publish_message.topic.buffer, + actual_publish_message.topic.len); } aws_mutex_unlock(&fixture->lock); @@ -1249,9 +1271,15 @@ static int s_rrc_streaming_operation_success_single_fn(struct aws_allocator *all s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); @@ -1314,10 +1342,19 @@ static int s_rrc_streaming_operation_success_overlapping_fn(struct aws_allocator s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); s_rrc_wait_for_n_streaming_publishes(&fixture, record_key2, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, - payload3, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, + { + payload3, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key2, 2, expected_publishes)); @@ -1397,9 +1434,15 @@ static int s_rrc_streaming_operation_success_starting_offline_fn(struct aws_allo s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); @@ -1468,9 +1511,15 @@ static int s_rrc_streaming_operation_clean_session_reestablish_subscription_fn( s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes)); @@ -1546,9 +1595,15 @@ static int s_rrc_streaming_operation_resume_session_fn(struct aws_allocator *all s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes)); @@ -1675,9 +1730,15 @@ static int s_rrc_streaming_operation_first_subscribe_times_out_resub_succeeds_fn s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); @@ -1791,9 +1852,15 @@ static int s_rrc_streaming_operation_first_subscribe_retryable_failure_resub_suc s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); @@ -1965,9 +2032,15 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes)); @@ -1998,8 +2071,11 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn( // verify third operation got the new publish s_rrc_wait_for_n_streaming_publishes(&fixture, record_key3, 1); - struct aws_byte_cursor third_expected_publishes[] = { - payload3, + struct aws_rr_client_fixture_publish_message_view third_expected_publishes[] = { + { + payload3, + topic_filter2, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key3, AWS_ARRAY_SIZE(third_expected_publishes), third_expected_publishes)); @@ -2127,9 +2203,15 @@ static int s_rrc_streaming_operation_success_delayed_by_request_operations_fn( s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes)); @@ -2204,9 +2286,15 @@ static int s_rrc_streaming_operation_success_sandwiched_by_request_operations_fn s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2); - struct aws_byte_cursor expected_publishes[] = { - payload1, - payload2, + struct aws_rr_client_fixture_publish_message_view expected_publishes[] = { + { + payload1, + topic_filter1, + }, + { + payload2, + topic_filter1, + }, }; ASSERT_SUCCESS(s_rrc_verify_streaming_publishes( &fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));