From b311dd9ce8a7ed9291938f59ca42560255e495f8 Mon Sep 17 00:00:00 2001 From: Nate Bradac <38217740+nbradac@users.noreply.github.com> Date: Wed, 15 Jan 2025 17:11:36 -0600 Subject: [PATCH] [Java] Add the few missing fields for logbuffer descriptor (#1721) * Adding missing fields to the logbuffer metadata The following fields are missing: int64_t entity_tag; int64_t response_correlation_id; uint8_t group; uint8_t is_response; * [Java] clean up last few logbuffer metadata fields --------- Co-authored-by: Peter Veentjer --- .../aeron/logbuffer/LogBufferDescriptor.java | 89 +++++++++++++++++++ .../java/io/aeron/driver/DriverConductor.java | 49 ++++++++-- 2 files changed, 131 insertions(+), 7 deletions(-) diff --git a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java index a0c9daa717..ae28d40a07 100644 --- a/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java +++ b/aeron-client/src/main/java/io/aeron/logbuffer/LogBufferDescriptor.java @@ -1099,6 +1099,51 @@ public static void tether(final UnsafeBuffer metadataBuffer, final boolean value metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0)); } + /** + * Get whether the log is group from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is group, otherwise false. + */ + public static boolean group(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_GROUP_OFFSET) == 1; + } + + /** + * Set whether the log is group in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is group, otherwise false. + */ + public static void group(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_GROUP_OFFSET, (byte)(value ? 1 : 0)); + } + + /** + * Get whether the log is response from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return true if the log is group, otherwise false. + */ + public static boolean isResponse(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getByte(LOG_IS_RESPONSE_OFFSET) == 1; + } + + /** + * Set whether the log is response in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value true if the log is group, otherwise false. + */ + public static void isResponse(final UnsafeBuffer metadataBuffer, final boolean value) + { + metadataBuffer.putByte(LOG_IS_RESPONSE_OFFSET, (byte)(value ? 1 : 0)); + } + + /** * Get whether the log is rejoining from the metadata. * @@ -1407,6 +1452,50 @@ public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value); } + /** + * Get the entity tag from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the entity tag in nanoseconds. + */ + public static long entityTag(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_ENTITY_TAG_OFFSET); + } + + /** + * Set the entity tag in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the entity tag to set. + */ + public static void entityTag(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_ENTITY_TAG_OFFSET, value); + } + + /** + * Get the response correlation id from the metadata. + * + * @param metadataBuffer containing the meta data. + * @return the entity tag in nanoseconds. + */ + public static long responseCorrelationId(final UnsafeBuffer metadataBuffer) + { + return metadataBuffer.getLong(LOG_RESPONSE_CORRELATION_ID_OFFSET); + } + + /** + * Set the response correlation id in the metadata. + * + * @param metadataBuffer containing the meta data. + * @param value the resonse correlation id to set. + */ + public static void responseCorrelationId(final UnsafeBuffer metadataBuffer, final long value) + { + metadataBuffer.putLong(LOG_RESPONSE_CORRELATION_ID_OFFSET, value); + } + /** * Get whether the signal EOS is enabled from the metadata. * diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 580d15a406..af7a977279 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -313,7 +313,8 @@ void onCreatePublicationImage( channelEndpoint.socketSndbufLength(), termOffset, subscriptionParams, - registrationId); + registrationId, + isMulticastSemantics(subscriptionChannel, subscriptionParams.group, flags)); congestionControl = ctx.congestionControlSupplier().newInstance( registrationId, @@ -1731,7 +1732,8 @@ private NetworkPublication newNetworkPublication( final int termOffset = params.termOffset; final RawLog rawLog = newNetworkPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId, - channelEndpoint.socketRcvbufLength(), channelEndpoint.socketSndbufLength(), termOffset, params); + channelEndpoint.socketRcvbufLength(), channelEndpoint.socketSndbufLength(), termOffset, params, + udpChannel.hasGroupSemantics()); UnsafeBufferPosition publisherPos = null; UnsafeBufferPosition publisherLmt = null; UnsafeBufferPosition senderPos = null; @@ -1814,7 +1816,8 @@ private RawLog newNetworkPublicationLog( final int socketRcvBufLength, final int socketSndBufLength, final int termOffset, - final PublicationParams params) + final PublicationParams params, + final boolean hasGroupSemantics) { final RawLog rawLog = logFactory.newPublication(registrationId, params.termLength, params.isSparse); final int receiverWindowLength = 0; @@ -1835,6 +1838,8 @@ private RawLog newNetworkPublicationLog( rejoin, reliable, params.isSparse, + hasGroupSemantics, + params.isResponse, params.publicationWindowLength, params.untetheredWindowLimitTimeoutNs, params.untetheredRestingTimeoutNs, @@ -1842,6 +1847,8 @@ private RawLog newNetworkPublicationLog( params.lingerTimeoutNs, params.signalEos, params.spiesSimulateConnection, + params.entityTag, + params.responseCorrelationId, rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); @@ -1864,6 +1871,7 @@ private RawLog newIpcPublicationLog( final boolean tether = false; final boolean rejoin = false; final boolean reliable = false; + final boolean group = false; initLogMetadata( sessionId, streamId, @@ -1878,6 +1886,8 @@ private RawLog newIpcPublicationLog( rejoin, reliable, params.isSparse, + group, + params.isResponse, params.publicationWindowLength, params.untetheredWindowLimitTimeoutNs, params.untetheredRestingTimeoutNs, @@ -1885,6 +1895,8 @@ private RawLog newIpcPublicationLog( params.lingerTimeoutNs, params.signalEos, params.spiesSimulateConnection, + params.entityTag, + params.responseCorrelationId, rawLog); initialisePositionCounters(initialTermId, params, rawLog.metaData()); @@ -1905,6 +1917,8 @@ private void initLogMetadata( final boolean rejoin, final boolean reliable, final boolean sparse, + final boolean group, + final boolean isResponse, final int publicationWindowLength, final long untetheredWindowLimitTimeoutNs, final long untetheredRestingTimeoutNs, @@ -1912,6 +1926,8 @@ private void initLogMetadata( final long lingerTimeoutNs, final boolean signalEos, final boolean spiesSimulateConnection, + final long entityTag, + final long responseCorrelationId, final RawLog rawLog) { final UnsafeBuffer logMetaData = rawLog.metaData(); @@ -1945,7 +1961,11 @@ private void initLogMetadata( signalEos(logMetaData, signalEos); spiesSimulateConnection(logMetaData, spiesSimulateConnection); tether(logMetaData, tether); + group(logMetaData, group); + isResponse(logMetaData, isResponse); + entityTag(logMetaData, entityTag); + responseCorrelationId(logMetaData, responseCorrelationId); untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs); untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs); lingerTimeoutNs(logMetaData, lingerTimeoutNs); @@ -1997,7 +2017,8 @@ private RawLog newPublicationImageLog( final int socketSndBufLength, final int termOffset, final SubscriptionParams params, - final long correlationId) + final long correlationId, + final boolean hasGroupSemantics) { final RawLog rawLog = logFactory.newImage(correlationId, termBufferLength, isSparse); @@ -2008,6 +2029,8 @@ private RawLog newPublicationImageLog( final long lingerTimeoutNs = 0; final boolean signalEos = false; final boolean spiesSimulateConnection = false; + final long entityTag = 0; + final long responseCorrelationId = 0; initLogMetadata( sessionId, streamId, @@ -2022,6 +2045,8 @@ private RawLog newPublicationImageLog( params.isRejoin, params.isReliable, params.isSparse, + hasGroupSemantics, + params.isResponse, publicationWindowLength, untetheredWindowLimitTimeoutNs, untetheredRestingTimeoutNs, @@ -2029,6 +2054,8 @@ private RawLog newPublicationImageLog( lingerTimeoutNs, signalEos, spiesSimulateConnection, + entityTag, + responseCorrelationId, rawLog); return rawLog; @@ -2842,12 +2869,20 @@ static FeedbackDelayGenerator resolveDelayGenerator( final UdpChannel channel, final InferableBoolean receiverGroupConsideration, final short flags) + { + return resolveDelayGenerator(ctx, channel, isMulticastSemantics(channel, receiverGroupConsideration, flags)); + } + + static boolean isMulticastSemantics( + final UdpChannel channel, + final InferableBoolean receiverGroupConsideration, + final short flags) { final boolean isGroupFromFlag = (flags & SetupFlyweight.GROUP_FLAG) == SetupFlyweight.GROUP_FLAG; - final boolean isMulticastSemantics = receiverGroupConsideration == INFER ? - channel.isMulticast() || isGroupFromFlag : receiverGroupConsideration == FORCE_TRUE; - return resolveDelayGenerator(ctx, channel, isMulticastSemantics); + return receiverGroupConsideration == INFER ? + channel.isMulticast() || isGroupFromFlag : + receiverGroupConsideration == FORCE_TRUE; } private interface AsyncResult extends Supplier