Skip to content

Commit

Permalink
[C] add a call to init the new fields in the logbuffer metadata (#1717)
Browse files Browse the repository at this point in the history
* [C] add a call to init the new fields in the logbuffer metadata

* [C] implement aeron_uri_get_publication_window_length_param

* [C] implement the last 4 missing bits of metadata
  • Loading branch information
nbradac authored Jan 15, 2025
1 parent 1dc7886 commit b5ccbce
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 39 deletions.
34 changes: 34 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,5 +266,39 @@ extern bool aeron_logbuffer_rotate_log(
aeron_logbuffer_metadata_t *log_meta_data, int32_t current_term_count, int32_t current_term_id);
extern void aeron_logbuffer_fill_default_header(
uint8_t *log_meta_data_buffer, int32_t session_id, int32_t stream_id, int32_t initial_term_id);
extern void aeron_logbuffer_metadata_init(
uint8_t *log_meta_data_buffer,
int64_t end_of_stream_position,
int32_t is_connected,
int32_t active_transport_count,
int64_t correlation_id,
int32_t initial_term_id,
int32_t mtu_length,
int32_t term_length,
int32_t page_size,
int32_t publication_window_length,
int32_t receiver_window_length,
int32_t socket_sndbuf_length,
int32_t os_default_socket_sndbuf_length,
int32_t os_max_socket_sndbuf_length,
int32_t socket_rcvbuf_length,
int32_t os_default_socket_rcvbuf_length,
int32_t os_max_socket_rcvbuf_length,
int32_t max_resend,
int32_t session_id,
int32_t stream_id,
int64_t entity_tag,
int64_t response_correlation_id,
int64_t linger_timeout_ns,
int64_t untethered_window_limit_timeout_ns,
int64_t untethered_resting_timeout_ns,
uint8_t group,
uint8_t is_response,
uint8_t rejoin,
uint8_t reliable,
uint8_t sparse,
uint8_t signal_eos,
uint8_t spies_simulate_connection,
uint8_t tether);
extern void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer);
extern size_t aeron_logbuffer_compute_fragmented_length(size_t length, size_t max_payload_length);
79 changes: 79 additions & 0 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,85 @@ inline void aeron_logbuffer_fill_default_header(
data_header->reserved_value = AERON_DATA_HEADER_DEFAULT_RESERVED_VALUE;
}

/*
* Does NOT initialize the following fields:
* - term_tail_counters
* - active_term_count
*/
inline void aeron_logbuffer_metadata_init(
uint8_t *log_meta_data_buffer,
int64_t end_of_stream_position,
int32_t is_connected,
int32_t active_transport_count,
int64_t correlation_id,
int32_t initial_term_id,
int32_t mtu_length,
int32_t term_length,
int32_t page_size,
int32_t publication_window_length,
int32_t receiver_window_length,
int32_t socket_sndbuf_length,
int32_t os_default_socket_sndbuf_length,
int32_t os_max_socket_sndbuf_length,
int32_t socket_rcvbuf_length,
int32_t os_default_socket_rcvbuf_length,
int32_t os_max_socket_rcvbuf_length,
int32_t max_resend,
int32_t session_id,
int32_t stream_id,
int64_t entity_tag,
int64_t response_correlation_id,
int64_t linger_timeout_ns,
int64_t untethered_window_limit_timeout_ns,
int64_t untethered_resting_timeout_ns,
uint8_t group,
uint8_t is_response,
uint8_t rejoin,
uint8_t reliable,
uint8_t sparse,
uint8_t signal_eos,
uint8_t spies_simulate_connection,
uint8_t tether)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;

log_meta_data->end_of_stream_position = end_of_stream_position;
log_meta_data->is_connected = is_connected;
log_meta_data->active_transport_count = active_transport_count;

log_meta_data->correlation_id = correlation_id;
log_meta_data->initial_term_id = initial_term_id;
log_meta_data->mtu_length = mtu_length;
log_meta_data->term_length = term_length;
log_meta_data->page_size = page_size;

log_meta_data->publication_window_length = publication_window_length;
log_meta_data->receiver_window_length = receiver_window_length;
log_meta_data->socket_sndbuf_length = socket_sndbuf_length;
log_meta_data->os_default_socket_sndbuf_length = os_default_socket_sndbuf_length;
log_meta_data->os_max_socket_sndbuf_length = os_max_socket_sndbuf_length;
log_meta_data->socket_rcvbuf_length = socket_rcvbuf_length;
log_meta_data->os_default_socket_rcvbuf_length = os_default_socket_rcvbuf_length;
log_meta_data->os_max_socket_rcvbuf_length = os_max_socket_rcvbuf_length;
log_meta_data->max_resend = max_resend;

aeron_logbuffer_fill_default_header(log_meta_data_buffer, session_id, stream_id, initial_term_id);

log_meta_data->entity_tag = entity_tag;
log_meta_data->response_correlation_id = response_correlation_id;
log_meta_data->linger_timeout_ns = linger_timeout_ns;
log_meta_data->untethered_window_limit_timeout_ns = untethered_window_limit_timeout_ns;
log_meta_data->untethered_resting_timeout_ns = untethered_resting_timeout_ns;
log_meta_data->group = group;
log_meta_data->is_response = is_response;
log_meta_data->rejoin = rejoin;
log_meta_data->reliable = reliable;
log_meta_data->sparse = sparse;
log_meta_data->signal_eos = signal_eos;
log_meta_data->spies_simulate_connection = spies_simulate_connection;
log_meta_data->tether = tether;
}

inline void aeron_logbuffer_apply_default_header(uint8_t *log_meta_data_buffer, uint8_t *buffer)
{
aeron_logbuffer_metadata_t *log_meta_data = (aeron_logbuffer_metadata_t *)log_meta_data_buffer;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -6009,7 +6009,7 @@ void aeron_driver_conductor_on_create_publication_image(void *clientd, void *ite
&image,
endpoint,
destination,
conductor->context,
conductor,
registration_id,
command->session_id,
command->stream_id,
Expand Down
47 changes: 35 additions & 12 deletions aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,40 @@ int aeron_ipc_publication_create(

int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

_pub->log_meta_data->initial_term_id = initial_term_id;
_pub->log_meta_data->mtu_length = (int32_t)params->mtu_length;
_pub->log_meta_data->term_length = (int32_t)params->term_length;
_pub->log_meta_data->page_size = (int32_t)context->file_page_size;
_pub->log_meta_data->correlation_id = registration_id;
_pub->log_meta_data->is_connected = 0;
_pub->log_meta_data->active_transport_count = 0;
_pub->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_logbuffer_metadata_init(
_pub->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
registration_id,
initial_term_id,
(int32_t)params->mtu_length,
(int32_t)params->term_length,
(int32_t)context->file_page_size,
(int32_t)params->publication_window_length,
0,
0,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
0,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
(int32_t)params->max_resend,
session_id,
stream_id,
(int64_t)params->entity_tag,
(int64_t)params->response_correlation_id,
(int64_t)params->linger_timeout_ns,
(int64_t)params->untethered_window_limit_timeout_ns,
(int64_t)params->untethered_resting_timeout_ns,
(uint8_t)false,
(uint8_t)params->is_response,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params->is_sparse,
(uint8_t)params->signal_eos,
(uint8_t)params->spies_simulate_connection,
(uint8_t)false);

_pub->conductor_fields.subscribable.correlation_id = registration_id;
_pub->conductor_fields.subscribable.array = NULL;
Expand Down Expand Up @@ -174,8 +198,7 @@ int aeron_ipc_publication_create(
_pub->starting_term_id = params->has_position ? params->term_id : initial_term_id;
_pub->starting_term_offset = params->has_position ? params->term_offset : 0;
_pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length);
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->ipc_publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->trip_gain = _pub->term_window_length / 8;
_pub->unblock_timeout_ns = (int64_t)context->publication_unblock_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
Expand Down
51 changes: 38 additions & 13 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,14 @@ int aeron_network_publication_create(
int64_t *retransmit_overflow_counter = aeron_system_counter_addr(
system_counters, AERON_SYSTEM_COUNTER_RETRANSMIT_OVERFLOW);

bool has_group_semantics = aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel);

if (aeron_retransmit_handler_init(
&_pub->retransmit_handler,
aeron_system_counter_addr(system_counters, AERON_SYSTEM_COUNTER_INVALID_PACKETS),
context->retransmit_unicast_delay_ns,
context->retransmit_unicast_linger_ns,
aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel),
has_group_semantics,
params->has_max_resend ? params->max_resend : context->max_resend,
retransmit_overflow_counter) < 0)
{
Expand Down Expand Up @@ -226,16 +228,40 @@ int aeron_network_publication_create(
// Called from conductor thread...
int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

_pub->log_meta_data->initial_term_id = initial_term_id;
_pub->log_meta_data->mtu_length = (int32_t)params->mtu_length;
_pub->log_meta_data->term_length = (int32_t)params->term_length;
_pub->log_meta_data->page_size = (int32_t)context->file_page_size;
_pub->log_meta_data->correlation_id = registration_id;
_pub->log_meta_data->is_connected = 0;
_pub->log_meta_data->active_transport_count = 0;
_pub->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_pub->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_logbuffer_metadata_init(
_pub->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
registration_id,
initial_term_id,
(int32_t)params->mtu_length,
(int32_t)params->term_length,
(int32_t)context->file_page_size,
(int32_t)params->publication_window_length,
0,
(int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
(int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
(int32_t)params->max_resend,
session_id,
stream_id,
(int64_t)params->entity_tag,
(int64_t)params->response_correlation_id,
(int64_t)params->linger_timeout_ns,
(int64_t)params->untethered_window_limit_timeout_ns,
(int64_t)params->untethered_resting_timeout_ns,
(uint8_t)has_group_semantics,
(uint8_t)params->is_response,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params->is_sparse,
(uint8_t)params->signal_eos,
(uint8_t)params->spies_simulate_connection,
(uint8_t)false);

_pub->endpoint = endpoint;
_pub->flow_control = flow_control_strategy;
Expand Down Expand Up @@ -279,8 +305,7 @@ int aeron_network_publication_create(
_pub->mtu_length = params->mtu_length;
_pub->max_messages_per_send = context->network_publication_max_messages_per_send;
_pub->current_messages_per_send = _pub->max_messages_per_send;
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->publication_window_length, params->term_length);
_pub->term_window_length = params->publication_window_length;
_pub->linger_timeout_ns = (int64_t)params->linger_timeout_ns;
_pub->untethered_window_limit_timeout_ns = (int64_t)params->untethered_window_limit_timeout_ns;
_pub->untethered_resting_timeout_ns = (int64_t)params->untethered_resting_timeout_ns;
Expand Down
55 changes: 44 additions & 11 deletions aeron-driver/src/main/c/aeron_publication_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_driver_context_t *context,
aeron_driver_conductor_t *conductor,
int64_t correlation_id,
int32_t session_id,
int32_t stream_id,
Expand All @@ -131,6 +131,8 @@ int aeron_publication_image_create(
bool treat_as_multicast,
aeron_system_counters_t *system_counters)
{
aeron_driver_context_t *context = conductor->context;

aeron_publication_image_t *_image = NULL;
const uint64_t log_length = aeron_logbuffer_compute_log_length(
(uint64_t)term_buffer_length, context->file_page_size);
Expand Down Expand Up @@ -216,16 +218,47 @@ int aeron_publication_image_create(
_image->log_file_name_length = (size_t)path_length;
_image->log_meta_data = (aeron_logbuffer_metadata_t *)(_image->mapped_raw_log.log_meta_data.addr);

_image->log_meta_data->initial_term_id = initial_term_id;
_image->log_meta_data->mtu_length = sender_mtu_length;
_image->log_meta_data->term_length = term_buffer_length;
_image->log_meta_data->page_size = (int32_t)context->file_page_size;
_image->log_meta_data->correlation_id = correlation_id;
_image->log_meta_data->is_connected = 0;
_image->log_meta_data->active_transport_count = 0;
_image->log_meta_data->end_of_stream_position = INT64_MAX;
aeron_logbuffer_fill_default_header(
_image->mapped_raw_log.log_meta_data.addr, session_id, stream_id, initial_term_id);
aeron_driver_uri_subscription_params_t params;
if (aeron_driver_uri_subscription_params(&endpoint->conductor_fields.udp_channel->uri, &params, conductor) < 0)
{
AERON_APPEND_ERR("%s", "");
goto error;
}

aeron_logbuffer_metadata_init(
_image->mapped_raw_log.log_meta_data.addr,
INT64_MAX,
0,
0,
correlation_id,
initial_term_id,
sender_mtu_length,
term_buffer_length,
(int32_t)context->file_page_size,
0,
(int32_t)params.initial_window_length,
(int32_t)endpoint->conductor_fields.udp_channel->socket_sndbuf_length,
(int32_t)context->os_buffer_lengths.default_so_sndbuf,
(int32_t)context->os_buffer_lengths.max_so_sndbuf,
(int32_t)endpoint->conductor_fields.udp_channel->socket_rcvbuf_length,
(int32_t)context->os_buffer_lengths.default_so_rcvbuf,
(int32_t)context->os_buffer_lengths.max_so_rcvbuf,
0,
session_id,
stream_id,
0,
0,
0,
0,
0,
(uint8_t)treat_as_multicast,
(uint8_t)params.is_response,
(uint8_t)params.is_rejoin,
(uint8_t)params.is_reliable,
(uint8_t)params.is_sparse,
(uint8_t)false,
(uint8_t)false,
(uint8_t)params.is_tether);

_image->endpoint = endpoint;
_image->conductor_fields.endpoint = endpoint;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_publication_image.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_driver_context_t *context,
aeron_driver_conductor_t *conductor,
int64_t correlation_id,
int32_t session_id,
int32_t stream_id,
Expand Down
Loading

0 comments on commit b5ccbce

Please sign in to comment.