diff --git a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c index e48429c582..2e6d84127b 100644 --- a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c +++ b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c @@ -266,5 +266,39 @@ extern bool aeron_logbuffer_rotate_log( aeron_logbuffer_metadata_t *log_meta_data, int32_t current_term_count, int32_t current_term_id); extern void aeron_logbuffer_fill_default_header( uint8_t *log_meta_data_buffer, int32_t session_id, int32_t stream_id, int32_t initial_term_id); +extern void aeron_logbuffer_metadata_init( + uint8_t *log_meta_data_buffer, + int64_t end_of_stream_position, + int32_t is_connected, + int32_t active_transport_count, + int64_t correlation_id, + int32_t initial_term_id, + int32_t mtu_length, + int32_t term_length, + int32_t page_size, + int32_t publication_window_length, + int32_t receiver_window_length, + int32_t socket_sndbuf_length, + int32_t os_default_socket_sndbuf_length, + int32_t os_max_socket_sndbuf_length, + int32_t socket_rcvbuf_length, + int32_t os_default_socket_rcvbuf_length, + int32_t os_max_socket_rcvbuf_length, + int32_t max_resend, + int32_t session_id, + int32_t stream_id, + int64_t entity_tag, + int64_t response_correlation_id, + int64_t linger_timeout_ns, + int64_t untethered_window_limit_timeout_ns, + int64_t untethered_resting_timeout_ns, + uint8_t group, + uint8_t is_response, + uint8_t rejoin, + uint8_t reliable, + uint8_t sparse, + uint8_t signal_eos, + uint8_t spies_simulate_connection, + uint8_t tether); extern void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer); extern size_t aeron_logbuffer_compute_fragmented_length(size_t length, size_t max_payload_length); diff --git a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h index 92a95a4d6e..65d28697a0 100644 --- a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h +++ b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h @@ -225,6 +225,85 @@ inline void aeron_logbuffer_fill_default_header( data_header->reserved_value = AERON_DATA_HEADER_DEFAULT_RESERVED_VALUE; } +/* + * Does NOT initialize the following fields: + * - term_tail_counters + * - active_term_count + */ +inline void aeron_logbuffer_metadata_init( + uint8_t *log_meta_data_buffer, + int64_t end_of_stream_position, + int32_t is_connected, + int32_t active_transport_count, + int64_t correlation_id, + int32_t initial_term_id, + int32_t mtu_length, + int32_t term_length, + int32_t page_size, + int32_t publication_window_length, + int32_t receiver_window_length, + int32_t socket_sndbuf_length, + int32_t os_default_socket_sndbuf_length, + int32_t os_max_socket_sndbuf_length, + int32_t socket_rcvbuf_length, + int32_t os_default_socket_rcvbuf_length, + int32_t os_max_socket_rcvbuf_length, + int32_t max_resend, + int32_t session_id, + int32_t stream_id, + int64_t entity_tag, + int64_t response_correlation_id, + int64_t linger_timeout_ns, + int64_t untethered_window_limit_timeout_ns, + int64_t untethered_resting_timeout_ns, + uint8_t group, + uint8_t is_response, + uint8_t rejoin, + uint8_t reliable, + uint8_t sparse, + uint8_t signal_eos, + uint8_t spies_simulate_connection, + uint8_t tether) +{ + aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer; + + log_meta_data->end_of_stream_position = end_of_stream_position; + log_meta_data->is_connected = is_connected; + log_meta_data->active_transport_count = active_transport_count; + + log_meta_data->correlation_id = correlation_id; + log_meta_data->initial_term_id = initial_term_id; + log_meta_data->mtu_length = mtu_length; + log_meta_data->term_length = term_length; + log_meta_data->page_size = page_size; + + log_meta_data->publication_window_length = publication_window_length; + log_meta_data->receiver_window_length = receiver_window_length; + log_meta_data->socket_sndbuf_length = socket_sndbuf_length; + log_meta_data->os_default_socket_sndbuf_length = os_default_socket_sndbuf_length; + log_meta_data->os_max_socket_sndbuf_length = os_max_socket_sndbuf_length; + log_meta_data->socket_rcvbuf_length = socket_rcvbuf_length; + log_meta_data->os_default_socket_rcvbuf_length = os_default_socket_rcvbuf_length; + log_meta_data->os_max_socket_rcvbuf_length = os_max_socket_rcvbuf_length; + log_meta_data->max_resend = max_resend; + + aeron_logbuffer_fill_default_header(log_meta_data_buffer, session_id, stream_id, initial_term_id); + + log_meta_data->entity_tag = entity_tag; + log_meta_data->response_correlation_id = response_correlation_id; + log_meta_data->linger_timeout_ns = linger_timeout_ns; + log_meta_data->untethered_window_limit_timeout_ns = untethered_window_limit_timeout_ns; + log_meta_data->untethered_resting_timeout_ns = untethered_resting_timeout_ns; + log_meta_data->group = group; + log_meta_data->is_response = is_response; + log_meta_data->rejoin = rejoin; + log_meta_data->reliable = reliable; + log_meta_data->sparse = sparse; + log_meta_data->signal_eos = signal_eos; + log_meta_data->spies_simulate_connection = spies_simulate_connection; + log_meta_data->tether = tether; +} + inline void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer) { aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer; diff --git a/aeron-driver/src/main/c/aeron_driver_conductor.c b/aeron-driver/src/main/c/aeron_driver_conductor.c index 89c0f8a59d..5cd5dc151d 100644 --- a/aeron-driver/src/main/c/aeron_driver_conductor.c +++ b/aeron-driver/src/main/c/aeron_driver_conductor.c @@ -6009,7 +6009,7 @@ void aeron_driver_conductor_on_create_publication_image(void *clientd, void *ite &image, endpoint, destination, - conductor->context, + conductor, registration_id, command->session_id, command->stream_id, diff --git a/aeron-driver/src/main/c/aeron_ipc_publication.c b/aeron-driver/src/main/c/aeron_ipc_publication.c index 685c83592d..1cbd6744d9 100644 --- a/aeron-driver/src/main/c/aeron_ipc_publication.c +++ b/aeron-driver/src/main/c/aeron_ipc_publication.c @@ -137,16 +137,40 @@ int aeron_ipc_publication_create( int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock); - _pub->log_meta_data->initial_term_id = initial_term_id; - _pub->log_meta_data->mtu_length = (int32_t)params->mtu_length; - _pub->log_meta_data->term_length = (int32_t)params->term_length; - _pub->log_meta_data->page_size = (int32_t)context->file_page_size; - _pub->log_meta_data->correlation_id = registration_id; - _pub->log_meta_data->is_connected = 0; - _pub->log_meta_data->active_transport_count = 0; - _pub->log_meta_data->end_of_stream_position = INT64_MAX; - aeron_logbuffer_fill_default_header( - _pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id); + aeron_logbuffer_metadata_init( + _pub->mapped_raw_log.log_meta_data.addr, + INT64_MAX, + 0, + 0, + registration_id, + initial_term_id, + (int32_t)params->mtu_length, + (int32_t)params->term_length, + (int32_t)context->file_page_size, + (int32_t)params->publication_window_length, + 0, + 0, + (int32_t)context->os_buffer_lengths.default_so_sndbuf, + (int32_t)context->os_buffer_lengths.max_so_sndbuf, + 0, + (int32_t)context->os_buffer_lengths.default_so_rcvbuf, + (int32_t)context->os_buffer_lengths.max_so_rcvbuf, + (int32_t)params->max_resend, + session_id, + stream_id, + (int64_t)params->entity_tag, + (int64_t)params->response_correlation_id, + (int64_t)params->linger_timeout_ns, + (int64_t)params->untethered_window_limit_timeout_ns, + (int64_t)params->untethered_resting_timeout_ns, + (uint8_t)false, + (uint8_t)params->is_response, + (uint8_t)false, + (uint8_t)false, + (uint8_t)params->is_sparse, + (uint8_t)params->signal_eos, + (uint8_t)params->spies_simulate_connection, + (uint8_t)false); _pub->conductor_fields.subscribable.correlation_id = registration_id; _pub->conductor_fields.subscribable.array = NULL; @@ -174,8 +198,7 @@ int aeron_ipc_publication_create( _pub->starting_term_id = params->has_position ? params->term_id : initial_term_id; _pub->starting_term_offset = params->has_position ? params->term_offset : 0; _pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length); - _pub->term_window_length = (int64_t)aeron_producer_window_length( - context->ipc_publication_window_length, params->term_length); + _pub->term_window_length = params->publication_window_length; _pub->trip_gain = _pub->term_window_length / 8; _pub->unblock_timeout_ns = (int64_t)context->publication_unblock_timeout_ns; _pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns; diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index 81aa01a082..01f661f375 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -152,12 +152,14 @@ int aeron_network_publication_create( int64_t *retransmit_overflow_counter = aeron_system_counter_addr( system_counters, AERON_SYSTEM_COUNTER_RETRANSMIT_OVERFLOW); + bool has_group_semantics = aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel); + if (aeron_retransmit_handler_init( &_pub->retransmit_handler, aeron_system_counter_addr(system_counters, AERON_SYSTEM_COUNTER_INVALID_PACKETS), context->retransmit_unicast_delay_ns, context->retransmit_unicast_linger_ns, - aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel), + has_group_semantics, params->has_max_resend ? params->max_resend : context->max_resend, retransmit_overflow_counter) < 0) { @@ -226,16 +228,40 @@ int aeron_network_publication_create( // Called from conductor thread... int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock); - _pub->log_meta_data->initial_term_id = initial_term_id; - _pub->log_meta_data->mtu_length = (int32_t)params->mtu_length; - _pub->log_meta_data->term_length = (int32_t)params->term_length; - _pub->log_meta_data->page_size = (int32_t)context->file_page_size; - _pub->log_meta_data->correlation_id = registration_id; - _pub->log_meta_data->is_connected = 0; - _pub->log_meta_data->active_transport_count = 0; - _pub->log_meta_data->end_of_stream_position = INT64_MAX; - aeron_logbuffer_fill_default_header( - _pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id); + aeron_logbuffer_metadata_init( + _pub->mapped_raw_log.log_meta_data.addr, + INT64_MAX, + 0, + 0, + registration_id, + initial_term_id, + (int32_t)params->mtu_length, + (int32_t)params->term_length, + (int32_t)context->file_page_size, + (int32_t)params->publication_window_length, + 0, + (int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length, + (int32_t)context->os_buffer_lengths.default_so_sndbuf, + (int32_t)context->os_buffer_lengths.max_so_sndbuf, + (int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length, + (int32_t)context->os_buffer_lengths.default_so_rcvbuf, + (int32_t)context->os_buffer_lengths.max_so_rcvbuf, + (int32_t)params->max_resend, + session_id, + stream_id, + (int64_t)params->entity_tag, + (int64_t)params->response_correlation_id, + (int64_t)params->linger_timeout_ns, + (int64_t)params->untethered_window_limit_timeout_ns, + (int64_t)params->untethered_resting_timeout_ns, + (uint8_t)has_group_semantics, + (uint8_t)params->is_response, + (uint8_t)false, + (uint8_t)false, + (uint8_t)params->is_sparse, + (uint8_t)params->signal_eos, + (uint8_t)params->spies_simulate_connection, + (uint8_t)false); _pub->endpoint = endpoint; _pub->flow_control = flow_control_strategy; @@ -279,8 +305,7 @@ int aeron_network_publication_create( _pub->mtu_length = params->mtu_length; _pub->max_messages_per_send = context->network_publication_max_messages_per_send; _pub->current_messages_per_send = _pub->max_messages_per_send; - _pub->term_window_length = (int64_t)aeron_producer_window_length( - context->publication_window_length, params->term_length); + _pub->term_window_length = params->publication_window_length; _pub->linger_timeout_ns = (int64_t)params->linger_timeout_ns; _pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns; _pub->untethered_resting_timeout_ns = (int64_t)params->untethered_resting_timeout_ns; diff --git a/aeron-driver/src/main/c/aeron_publication_image.c b/aeron-driver/src/main/c/aeron_publication_image.c index 37ae8c1fc7..682aa1c5cd 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.c +++ b/aeron-driver/src/main/c/aeron_publication_image.c @@ -110,7 +110,7 @@ int aeron_publication_image_create( aeron_publication_image_t **image, aeron_receive_channel_endpoint_t *endpoint, aeron_receive_destination_t *destination, - aeron_driver_context_t *context, + aeron_driver_conductor_t *conductor, int64_t correlation_id, int32_t session_id, int32_t stream_id, @@ -131,6 +131,8 @@ int aeron_publication_image_create( bool treat_as_multicast, aeron_system_counters_t *system_counters) { + aeron_driver_context_t *context = conductor->context; + aeron_publication_image_t *_image = NULL; const uint64_t log_length = aeron_logbuffer_compute_log_length( (uint64_t)term_buffer_length, context->file_page_size); @@ -216,16 +218,47 @@ int aeron_publication_image_create( _image->log_file_name_length = (size_t)path_length; _image->log_meta_data = (aeron_logbuffer_metadata_t *)(_image->mapped_raw_log.log_meta_data.addr); - _image->log_meta_data->initial_term_id = initial_term_id; - _image->log_meta_data->mtu_length = sender_mtu_length; - _image->log_meta_data->term_length = term_buffer_length; - _image->log_meta_data->page_size = (int32_t)context->file_page_size; - _image->log_meta_data->correlation_id = correlation_id; - _image->log_meta_data->is_connected = 0; - _image->log_meta_data->active_transport_count = 0; - _image->log_meta_data->end_of_stream_position = INT64_MAX; - aeron_logbuffer_fill_default_header( - _image->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id); + aeron_driver_uri_subscription_params_t params; + if (aeron_driver_uri_subscription_params(&endpoint->conductor_fields.udp_channel->uri, ¶ms, conductor) < 0) + { + AERON_APPEND_ERR("%s", ""); + goto error; + } + + aeron_logbuffer_metadata_init( + _image->mapped_raw_log.log_meta_data.addr, + INT64_MAX, + 0, + 0, + correlation_id, + initial_term_id, + sender_mtu_length, + term_buffer_length, + (int32_t)context->file_page_size, + 0, + (int32_t)params.initial_window_length, + (int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length, + (int32_t)context->os_buffer_lengths.default_so_sndbuf, + (int32_t)context->os_buffer_lengths.max_so_sndbuf, + (int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length, + (int32_t)context->os_buffer_lengths.default_so_rcvbuf, + (int32_t)context->os_buffer_lengths.max_so_rcvbuf, + 0, + session_id, + stream_id, + 0, + 0, + 0, + 0, + 0, + (uint8_t)treat_as_multicast, + (uint8_t)params.is_response, + (uint8_t)params.is_rejoin, + (uint8_t)params.is_reliable, + (uint8_t)params.is_sparse, + (uint8_t)false, + (uint8_t)false, + (uint8_t)params.is_tether); _image->endpoint = endpoint; _image->conductor_fields.endpoint = endpoint; diff --git a/aeron-driver/src/main/c/aeron_publication_image.h b/aeron-driver/src/main/c/aeron_publication_image.h index cea99c2974..551ebaf0de 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.h +++ b/aeron-driver/src/main/c/aeron_publication_image.h @@ -160,7 +160,7 @@ int aeron_publication_image_create( aeron_publication_image_t **image, aeron_receive_channel_endpoint_t *endpoint, aeron_receive_destination_t *destination, - aeron_driver_context_t *context, + aeron_driver_conductor_t *conductor, int64_t correlation_id, int32_t session_id, int32_t stream_id, diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.c b/aeron-driver/src/main/c/uri/aeron_driver_uri.c index 3ca9bf7186..10110c78a3 100644 --- a/aeron-driver/src/main/c/uri/aeron_driver_uri.c +++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.c @@ -107,6 +107,51 @@ int aeron_uri_get_mtu_length_param(aeron_uri_params_t *uri_params, aeron_driver_ return 0; } +int aeron_uri_get_publication_window_length_param(aeron_uri_params_t *uri_params, aeron_driver_uri_publication_params_t *params) +{ + const char *value_str; + + if ((value_str = aeron_uri_find_param_value(uri_params, AERON_URI_PUBLICATION_WINDOW_KEY)) != NULL) + { + uint64_t value; + + if (-1 == aeron_parse_size64(value_str, &value)) + { + AERON_SET_ERR(EINVAL, "could not parse %s=%s in URI", AERON_URI_PUBLICATION_WINDOW_KEY, value_str); + return -1; + } + + if (value < params->mtu_length) + { + AERON_SET_ERR( + EINVAL, + "%s=" PRIu64 " cannot be less than the %s=" PRIu64, + AERON_URI_PUBLICATION_WINDOW_KEY, + value, + AERON_URI_MTU_LENGTH_KEY, + params->mtu_length); + return -1; + } + + if (value > (params->term_length >> 1)) + { + AERON_SET_ERR( + EINVAL, + "%s=" PRIu64 " must not exceed half the %s=" PRIu64, + AERON_URI_PUBLICATION_WINDOW_KEY, + value, + AERON_URI_TERM_LENGTH_KEY, + params->term_length); + return -1; + } + + params->publication_window_length = (int32_t)value; + params->has_publication_window_length = true; + } + + return 0; +} + int aeron_uri_linger_timeout_param(aeron_uri_params_t *uri_params, aeron_driver_uri_publication_params_t *params) { return aeron_uri_get_timeout(uri_params, AERON_URI_LINGER_TIMEOUT_KEY, ¶ms->linger_timeout_ns); @@ -196,6 +241,10 @@ int aeron_diver_uri_publication_params( params->response_correlation_id = AERON_NULL_VALUE; params->has_max_resend = false; params->max_resend = 0; + params->is_response = + (AERON_URI_UDP == uri->type && + NULL != uri->params.udp.control_mode && + strcmp(uri->params.udp.control_mode, AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE_VALUE) == 0); aeron_uri_params_t *uri_params = AERON_URI_IPC == uri->type ? &uri->params.ipc.additional_params : &uri->params.udp.additional_params; @@ -240,6 +289,16 @@ int aeron_diver_uri_publication_params( return -1; } + params->publication_window_length = (int32_t)aeron_producer_window_length( + AERON_URI_IPC == uri->type ? context->ipc_publication_window_length : context->publication_window_length, + params->term_length); + params->has_publication_window_length = false; + + if (aeron_uri_get_publication_window_length_param(uri_params, params) < 0) + { + return -1; + } + int count = 0; int32_t initial_term_id; diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.h b/aeron-driver/src/main/c/uri/aeron_driver_uri.h index 70bc168cb3..4c5cb04e01 100644 --- a/aeron-driver/src/main/c/uri/aeron_driver_uri.h +++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.h @@ -43,6 +43,9 @@ typedef struct aeron_driver_uri_publication_params_stct int64_t response_correlation_id; bool has_max_resend; uint32_t max_resend; + bool has_publication_window_length; + int32_t publication_window_length; + bool is_response; } aeron_driver_uri_publication_params_t; diff --git a/aeron-driver/src/test/c/aeron_receiver_test.h b/aeron-driver/src/test/c/aeron_receiver_test.h index e2f8b1cfa6..707b38405a 100644 --- a/aeron-driver/src/test/c/aeron_receiver_test.h +++ b/aeron-driver/src/test/c/aeron_receiver_test.h @@ -222,8 +222,11 @@ class ReceiverTestBase : public testing::Test m_context, &m_counters_manager); + aeron_driver_conductor_t conductor; // the conductor struct is only used for its context + conductor.context = m_context; + if (aeron_publication_image_create( - &image, endpoint, destination, m_context, correlation_id, session_id, stream_id, 0, 0, 0, + &image, endpoint, destination, &conductor, correlation_id, session_id, stream_id, 0, 0, 0, &hwm_position, &pos_position, congestion_control_strategy, &channel->remote_control, &channel->local_data, TERM_BUFFER_SIZE, MTU, UINT8_C(0), nullptr, true, true, false, &m_system_counters) < 0)