Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Add the few missing fields for logbuffer descriptor #1721

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,51 @@ public static void tether(final UnsafeBuffer metadataBuffer, final boolean value
metadataBuffer.putByte(LOG_TETHER_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is group from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean group(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_GROUP_OFFSET) == 1;
}

/**
* Set whether the log is group in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void group(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_GROUP_OFFSET, (byte)(value ? 1 : 0));
}

/**
* Get whether the log is response from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return true if the log is group, otherwise false.
*/
public static boolean isResponse(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getByte(LOG_IS_RESPONSE_OFFSET) == 1;
}

/**
* Set whether the log is response in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value true if the log is group, otherwise false.
*/
public static void isResponse(final UnsafeBuffer metadataBuffer, final boolean value)
{
metadataBuffer.putByte(LOG_IS_RESPONSE_OFFSET, (byte)(value ? 1 : 0));
}


/**
* Get whether the log is rejoining from the metadata.
*
Expand Down Expand Up @@ -1407,6 +1452,50 @@ public static void lingerTimeoutNs(final UnsafeBuffer metadataBuffer, final long
metadataBuffer.putLong(LOG_LINGER_TIMEOUT_NS_OFFSET, value);
}

/**
* Get the entity tag from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long entityTag(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_ENTITY_TAG_OFFSET);
}

/**
* Set the entity tag in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the entity tag to set.
*/
public static void entityTag(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_ENTITY_TAG_OFFSET, value);
}

/**
* Get the response correlation id from the metadata.
*
* @param metadataBuffer containing the meta data.
* @return the entity tag in nanoseconds.
*/
public static long responseCorrelationId(final UnsafeBuffer metadataBuffer)
{
return metadataBuffer.getLong(LOG_RESPONSE_CORRELATION_ID_OFFSET);
}

/**
* Set the response correlation id in the metadata.
*
* @param metadataBuffer containing the meta data.
* @param value the resonse correlation id to set.
*/
public static void responseCorrelationId(final UnsafeBuffer metadataBuffer, final long value)
{
metadataBuffer.putLong(LOG_RESPONSE_CORRELATION_ID_OFFSET, value);
}

/**
* Get whether the signal EOS is enabled from the metadata.
*
Expand Down
49 changes: 42 additions & 7 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ void onCreatePublicationImage(
channelEndpoint.socketSndbufLength(),
termOffset,
subscriptionParams,
registrationId);
registrationId,
isMulticastSemantics(subscriptionChannel, subscriptionParams.group, flags));

congestionControl = ctx.congestionControlSupplier().newInstance(
registrationId,
Expand Down Expand Up @@ -1731,7 +1732,8 @@ private NetworkPublication newNetworkPublication(
final int termOffset = params.termOffset;

final RawLog rawLog = newNetworkPublicationLog(params.sessionId, streamId, params.initialTermId, registrationId,
channelEndpoint.socketRcvbufLength(), channelEndpoint.socketSndbufLength(), termOffset, params);
channelEndpoint.socketRcvbufLength(), channelEndpoint.socketSndbufLength(), termOffset, params,
udpChannel.hasGroupSemantics());
UnsafeBufferPosition publisherPos = null;
UnsafeBufferPosition publisherLmt = null;
UnsafeBufferPosition senderPos = null;
Expand Down Expand Up @@ -1814,7 +1816,8 @@ private RawLog newNetworkPublicationLog(
final int socketRcvBufLength,
final int socketSndBufLength,
final int termOffset,
final PublicationParams params)
final PublicationParams params,
final boolean hasGroupSemantics)
{
final RawLog rawLog = logFactory.newPublication(registrationId, params.termLength, params.isSparse);
final int receiverWindowLength = 0;
Expand All @@ -1835,13 +1838,17 @@ private RawLog newNetworkPublicationLog(
rejoin,
reliable,
params.isSparse,
hasGroupSemantics,
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand All @@ -1864,6 +1871,7 @@ private RawLog newIpcPublicationLog(
final boolean tether = false;
final boolean rejoin = false;
final boolean reliable = false;
final boolean group = false;
initLogMetadata(
sessionId,
streamId,
Expand All @@ -1878,13 +1886,17 @@ private RawLog newIpcPublicationLog(
rejoin,
reliable,
params.isSparse,
group,
params.isResponse,
params.publicationWindowLength,
params.untetheredWindowLimitTimeoutNs,
params.untetheredRestingTimeoutNs,
params.maxResend,
params.lingerTimeoutNs,
params.signalEos,
params.spiesSimulateConnection,
params.entityTag,
params.responseCorrelationId,
rawLog);
initialisePositionCounters(initialTermId, params, rawLog.metaData());

Expand All @@ -1905,13 +1917,17 @@ private void initLogMetadata(
final boolean rejoin,
final boolean reliable,
final boolean sparse,
final boolean group,
final boolean isResponse,
final int publicationWindowLength,
final long untetheredWindowLimitTimeoutNs,
final long untetheredRestingTimeoutNs,
final int maxResend,
final long lingerTimeoutNs,
final boolean signalEos,
final boolean spiesSimulateConnection,
final long entityTag,
final long responseCorrelationId,
final RawLog rawLog)
{
final UnsafeBuffer logMetaData = rawLog.metaData();
Expand Down Expand Up @@ -1945,7 +1961,11 @@ private void initLogMetadata(
signalEos(logMetaData, signalEos);
spiesSimulateConnection(logMetaData, spiesSimulateConnection);
tether(logMetaData, tether);
group(logMetaData, group);
isResponse(logMetaData, isResponse);

entityTag(logMetaData, entityTag);
responseCorrelationId(logMetaData, responseCorrelationId);
untetheredWindowLimitTimeoutNs(logMetaData, untetheredWindowLimitTimeoutNs);
untetheredRestingTimeoutNs(logMetaData, untetheredRestingTimeoutNs);
lingerTimeoutNs(logMetaData, lingerTimeoutNs);
Expand Down Expand Up @@ -1997,7 +2017,8 @@ private RawLog newPublicationImageLog(
final int socketSndBufLength,
final int termOffset,
final SubscriptionParams params,
final long correlationId)
final long correlationId,
final boolean hasGroupSemantics)
{
final RawLog rawLog = logFactory.newImage(correlationId, termBufferLength, isSparse);

Expand All @@ -2008,6 +2029,8 @@ private RawLog newPublicationImageLog(
final long lingerTimeoutNs = 0;
final boolean signalEos = false;
final boolean spiesSimulateConnection = false;
final long entityTag = 0;
final long responseCorrelationId = 0;
initLogMetadata(
sessionId,
streamId,
Expand All @@ -2022,13 +2045,17 @@ private RawLog newPublicationImageLog(
params.isRejoin,
params.isReliable,
params.isSparse,
hasGroupSemantics,
params.isResponse,
publicationWindowLength,
untetheredWindowLimitTimeoutNs,
untetheredRestingTimeoutNs,
maxResend,
lingerTimeoutNs,
signalEos,
spiesSimulateConnection,
entityTag,
responseCorrelationId,
rawLog);

return rawLog;
Expand Down Expand Up @@ -2842,12 +2869,20 @@ static FeedbackDelayGenerator resolveDelayGenerator(
final UdpChannel channel,
final InferableBoolean receiverGroupConsideration,
final short flags)
{
return resolveDelayGenerator(ctx, channel, isMulticastSemantics(channel, receiverGroupConsideration, flags));
}

static boolean isMulticastSemantics(
final UdpChannel channel,
final InferableBoolean receiverGroupConsideration,
final short flags)
{
final boolean isGroupFromFlag = (flags & SetupFlyweight.GROUP_FLAG) == SetupFlyweight.GROUP_FLAG;
final boolean isMulticastSemantics = receiverGroupConsideration == INFER ?
channel.isMulticast() || isGroupFromFlag : receiverGroupConsideration == FORCE_TRUE;

return resolveDelayGenerator(ctx, channel, isMulticastSemantics);
return receiverGroupConsideration == INFER ?
channel.isMulticast() || isGroupFromFlag :
receiverGroupConsideration == FORCE_TRUE;
}

private interface AsyncResult<T> extends Supplier<T>
Expand Down
Loading