Skip to content

Commit

Permalink
Add topic argument to publish callback in request-response stream cli…
Browse files Browse the repository at this point in the history
…ent (#380)
  • Loading branch information
sfod authored Jan 14, 2025
1 parent 255bac4 commit 9c093c4
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 48 deletions.
6 changes: 5 additions & 1 deletion bin/elastishadow/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,11 @@ static void s_stream_subscription_status_fn(
aws_error_debug_str(error_code));
}

static void s_stream_incoming_publish_fn(struct aws_byte_cursor payload, void *user_data) {
static void s_stream_incoming_publish_fn(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data) {
(void)topic;
struct aws_shadow_streaming_operation *operation = user_data;

struct aws_byte_cursor thing_cursor = aws_byte_cursor_from_buf(&operation->thing);
Expand Down
5 changes: 4 additions & 1 deletion include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(
/*
* Callback signature for when a publish arrives that matches a streaming operation's subscription
*/
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data);
typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data);

/*
* Callback signature for when a streaming operation is fully destroyed and no more events will be emitted.
Expand Down
2 changes: 1 addition & 1 deletion source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ static void s_apply_publish_to_streaming_operation_list(
}

void *user_data = operation->storage.streaming_storage.options.user_data;
(*incoming_publish_callback)(publish_event->payload, user_data);
(*incoming_publish_callback)(publish_event->payload, publish_event->topic, user_data);

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
Expand Down
178 changes: 133 additions & 45 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ static int s_rrc_verify_request_completion(
return AWS_OP_SUCCESS;
}

struct aws_rr_client_fixture_publish_message {
struct aws_byte_buf payload;
struct aws_byte_buf topic;
};

struct aws_rr_client_fixture_publish_message_view {
struct aws_byte_cursor payload;
struct aws_byte_cursor topic;
};

struct aws_rr_client_fixture_streaming_record {
struct aws_allocator *allocator;

Expand Down Expand Up @@ -221,7 +231,8 @@ struct aws_rr_client_fixture_streaming_record *s_aws_rr_client_fixture_streaming
aws_byte_buf_init_copy_from_cursor(&record->record_key, allocator, record_key);
record->record_key_cursor = aws_byte_cursor_from_buf(&record->record_key);

aws_array_list_init_dynamic(&record->publishes, allocator, 10, sizeof(struct aws_byte_buf));
aws_array_list_init_dynamic(
&record->publishes, allocator, 10, sizeof(struct aws_rr_client_fixture_publish_message));
aws_array_list_init_dynamic(
&record->subscription_events,
allocator,
Expand All @@ -236,10 +247,11 @@ void s_aws_rr_client_fixture_streaming_record_delete(struct aws_rr_client_fixtur

size_t publish_count = aws_array_list_length(&record->publishes);
for (size_t i = 0; i < publish_count; ++i) {
struct aws_byte_buf publish_payload;
aws_array_list_get_at(&record->publishes, &publish_payload, i);
struct aws_rr_client_fixture_publish_message publish_message;
aws_array_list_get_at(&record->publishes, &publish_message, i);

aws_byte_buf_clean_up(&publish_payload);
aws_byte_buf_clean_up(&publish_message.payload);
aws_byte_buf_clean_up(&publish_message.topic);
}

aws_array_list_clean_up(&record->publishes);
Expand Down Expand Up @@ -276,16 +288,18 @@ static void s_rrc_fixture_streaming_operation_subscription_status_callback(

static void s_rrc_fixture_streaming_operation_incoming_publish_callback(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data) {
struct aws_rr_client_fixture_streaming_record *record = user_data;
struct aws_rr_client_test_fixture *fixture = record->fixture;

aws_mutex_lock(&fixture->lock);

struct aws_byte_buf payload_buffer;
aws_byte_buf_init_copy_from_cursor(&payload_buffer, fixture->allocator, payload);
struct aws_rr_client_fixture_publish_message publish_message;
aws_byte_buf_init_copy_from_cursor(&publish_message.payload, fixture->allocator, payload);
aws_byte_buf_init_copy_from_cursor(&publish_message.topic, fixture->allocator, topic);

aws_array_list_push_back(&record->publishes, &payload_buffer);
aws_array_list_push_back(&record->publishes, &publish_message);

aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
Expand Down Expand Up @@ -379,7 +393,7 @@ static int s_rrc_verify_streaming_publishes(
struct aws_rr_client_test_fixture *fixture,
struct aws_byte_cursor key,
size_t expected_publish_count,
struct aws_byte_cursor *expected_publishes) {
struct aws_rr_client_fixture_publish_message_view *expected_publishes) {

aws_mutex_lock(&fixture->lock);

Expand All @@ -394,13 +408,21 @@ static int s_rrc_verify_streaming_publishes(
ASSERT_INT_EQUALS(expected_publish_count, actual_publish_count);

for (size_t i = 0; i < actual_publish_count; ++i) {
struct aws_byte_buf actual_payload;
aws_array_list_get_at(&record->publishes, &actual_payload, i);
struct aws_rr_client_fixture_publish_message actual_publish_message;
aws_array_list_get_at(&record->publishes, &actual_publish_message, i);

struct aws_byte_cursor *expected_payload = &expected_publishes[i];
struct aws_rr_client_fixture_publish_message_view *expected_payload = &expected_publishes[i];

ASSERT_BIN_ARRAYS_EQUALS(
expected_payload->ptr, expected_payload->len, actual_payload.buffer, actual_payload.len);
expected_payload->payload.ptr,
expected_payload->payload.len,
actual_publish_message.payload.buffer,
actual_publish_message.payload.len);
ASSERT_BIN_ARRAYS_EQUALS(
expected_payload->topic.ptr,
expected_payload->topic.len,
actual_publish_message.topic.buffer,
actual_publish_message.topic.len);
}

aws_mutex_unlock(&fixture->lock);
Expand Down Expand Up @@ -1249,9 +1271,15 @@ static int s_rrc_streaming_operation_success_single_fn(struct aws_allocator *all

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1314,10 +1342,19 @@ static int s_rrc_streaming_operation_success_overlapping_fn(struct aws_allocator
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key2, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
payload3,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
{
payload3,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes));
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key2, 2, expected_publishes));
Expand Down Expand Up @@ -1397,9 +1434,15 @@ static int s_rrc_streaming_operation_success_starting_offline_fn(struct aws_allo

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1468,9 +1511,15 @@ static int s_rrc_streaming_operation_clean_session_reestablish_subscription_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes));

Expand Down Expand Up @@ -1546,9 +1595,15 @@ static int s_rrc_streaming_operation_resume_session_fn(struct aws_allocator *all

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 1);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 1, expected_publishes));

Expand Down Expand Up @@ -1675,9 +1730,15 @@ static int s_rrc_streaming_operation_first_subscribe_times_out_resub_succeeds_fn

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1791,9 +1852,15 @@ static int s_rrc_streaming_operation_first_subscribe_retryable_failure_resub_suc

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -1965,9 +2032,15 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(&fixture, record_key1, 2, expected_publishes));

Expand Down Expand Up @@ -1998,8 +2071,11 @@ static int s_rrc_streaming_operation_failure_exceeds_subscription_budget_fn(
// verify third operation got the new publish
s_rrc_wait_for_n_streaming_publishes(&fixture, record_key3, 1);

struct aws_byte_cursor third_expected_publishes[] = {
payload3,
struct aws_rr_client_fixture_publish_message_view third_expected_publishes[] = {
{
payload3,
topic_filter2,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key3, AWS_ARRAY_SIZE(third_expected_publishes), third_expected_publishes));
Expand Down Expand Up @@ -2127,9 +2203,15 @@ static int s_rrc_streaming_operation_success_delayed_by_request_operations_fn(

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down Expand Up @@ -2204,9 +2286,15 @@ static int s_rrc_streaming_operation_success_sandwiched_by_request_operations_fn

s_rrc_wait_for_n_streaming_publishes(&fixture, record_key1, 2);

struct aws_byte_cursor expected_publishes[] = {
payload1,
payload2,
struct aws_rr_client_fixture_publish_message_view expected_publishes[] = {
{
payload1,
topic_filter1,
},
{
payload2,
topic_filter1,
},
};
ASSERT_SUCCESS(s_rrc_verify_streaming_publishes(
&fixture, record_key1, AWS_ARRAY_SIZE(expected_publishes), expected_publishes));
Expand Down

0 comments on commit 9c093c4

Please sign in to comment.