Skip to content

Commit

Permalink
Bug/fix error with tagged channels reresolution (#1720)
Browse files Browse the repository at this point in the history
* [Java] Add symmetry for matching tags to endpoints.

* [Java] Use endpoint addresses when checking against tagged endpoints instead of UdpChannel address, so it works correctly during re-resolution.

* [Java] Handle null connect/current control addresses. Flip matches tag logic so that the incoming channel is matched against the existing channel/endpoint to simplify the code.  Re-encapsulate the matching logic and pass in the overriding addresses from the endpoints.
  • Loading branch information
mikeb01 authored Jan 16, 2025
1 parent b5ccbce commit ab96d35
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2154,8 +2154,7 @@ private SendChannelEndpoint findExistingSendChannelEndpoint(final UdpChannel udp
{
for (final SendChannelEndpoint endpoint : sendChannelEndpointByChannelMap.values())
{
final UdpChannel endpointUdpChannel = endpoint.udpChannel();
if (endpointUdpChannel.matchesTag(udpChannel))
if (endpoint.matchesTag(udpChannel))
{
return endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public long tag()
*/
public boolean matchesTag(final UdpChannel udpChannel)
{
return super.udpChannel.matchesTag(udpChannel);
return udpChannel.matchesTag(super.udpChannel, currentControlAddress, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,17 @@ private void applyChannelSendTimestamp(final ByteBuffer buffer)
}
}
}

/**
* Does the channel have a matching tag?
*
* @param udpChannel with tag to match against.
* @return true if the channel matches on tag identity.
*/
public boolean matchesTag(final UdpChannel udpChannel)
{
return udpChannel.matchesTag(super.udpChannel, null, connectAddress);
}
}

abstract class MultiSndDestinationLhsPadding
Expand Down
44 changes: 28 additions & 16 deletions aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,22 +775,27 @@ public Long nakDelayNs()
* Does this channel have a tag match to another channel having INADDR_ANY endpoints.
*
* @param udpChannel to match against.
* @param localAddress local address override to use for this channel.
* @param remoteAddress remote address override to use for this channel.
* @return true if there is a match otherwise false.
*/
public boolean matchesTag(final UdpChannel udpChannel)
public boolean matchesTag(
final UdpChannel udpChannel,
final InetSocketAddress localAddress,
final InetSocketAddress remoteAddress)
{
if (!hasTag || !udpChannel.hasTag() || tag != udpChannel.tag())
{
return false;
}

if (!matchesControlMode(udpChannel))
if (!hasMatchingControlMode(udpChannel))
{
throw new IllegalArgumentException(
"matching tag=" + tag + " has mismatched control-mode: " + uriStr + " <> " + udpChannel.uriStr);
}

if (!hasMatchingAddress(udpChannel))
if (!hasMatchingAddress(udpChannel, localAddress, remoteAddress))
{
throw new IllegalArgumentException(
"matching tag=" + tag + " has mismatched endpoint or control: " + uriStr + " <> " + udpChannel.uriStr);
Expand All @@ -799,24 +804,31 @@ public boolean matchesTag(final UdpChannel udpChannel)
return true;
}

private boolean hasMatchingAddress(final UdpChannel udpChannel)
private boolean isWildcard()
{
final boolean otherChannelIsWildcard = udpChannel.remoteData().getAddress().isAnyLocalAddress() &&
udpChannel.remoteData().getPort() == 0 &&
udpChannel.localData().getAddress().isAnyLocalAddress() &&
udpChannel.localData().getPort() == 0;

final boolean otherChannelMatches = udpChannel.remoteData().getAddress().equals(remoteData.getAddress()) &&
udpChannel.remoteData().getPort() == remoteData.getPort() &&
udpChannel.localData().getAddress().equals(localData.getAddress()) &&
udpChannel.localData().getPort() == localData.getPort();
return remoteData.getAddress().isAnyLocalAddress() &&
remoteData.getPort() == 0 &&
localData.getAddress().isAnyLocalAddress() &&
localData.getPort() == 0;
}

return otherChannelIsWildcard || otherChannelMatches;
private boolean hasMatchingControlMode(final UdpChannel udpChannel)
{
return controlMode() == ControlMode.NONE || controlMode() == udpChannel.controlMode();
}

private boolean matchesControlMode(final UdpChannel udpChannel)
private boolean hasMatchingAddress(
final UdpChannel udpChannel,
final InetSocketAddress localAddress,
final InetSocketAddress remoteAddress)
{
return udpChannel.controlMode() == ControlMode.NONE || controlMode() == udpChannel.controlMode();
final InetSocketAddress otherLocalData = localAddress != null ? localAddress : udpChannel.localData();
final InetSocketAddress otherRemoteData = remoteAddress != null ? remoteAddress : udpChannel.remoteData();

return isWildcard() || remoteData().getAddress().equals(otherRemoteData.getAddress()) &&
remoteData().getPort() == otherRemoteData.getPort() &&
localData().getAddress().equals(otherLocalData.getAddress()) &&
localData().getPort() == otherLocalData.getPort();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,54 @@ void shouldReResolveUnicastAddressWhenSendChannelEndpointIsReused()
}
}

@Test
@SlowTest
@InterruptAfter(10)
void shouldHandleTaggedSubscriptionsAddressWithReResolutionToMdcPublications()
{
final String taggedUri = SUBSCRIPTION_DYNAMIC_MDC_URI + "|tags=22701";

subscription = client.addSubscription(taggedUri, STREAM_ID);
assertFalse(subscription.isConnected());

assertTrue(updateNameResolutionStatus(countersReader, CONTROL_NAME, USE_RE_RESOLUTION_HOST));

publication = client.addPublication(SECOND_PUBLICATION_DYNAMIC_MDC_URI, STREAM_ID);

Tests.awaitConnected(subscription);

try (Subscription taggedSub1 = client.addSubscription("aeron:udp?tags=22701", STREAM_ID);
Subscription taggedSub2 = client.addSubscription(taggedUri, STREAM_ID))
{
Tests.awaitConnected(taggedSub1);
Tests.awaitConnected(taggedSub2);
}
}

@Test
@SlowTest
@InterruptAfter(10)
void shouldHandleTaggedPublication()
{
final String taggedUri = PUBLICATION_URI + "|tags=22701";

publication = client.addPublication(taggedUri, STREAM_ID);
assertFalse(publication.isConnected());

assertTrue(updateNameResolutionStatus(countersReader, ENDPOINT_NAME, USE_RE_RESOLUTION_HOST));

subscription = client.addSubscription(SECOND_SUBSCRIPTION_URI, STREAM_ID);

Tests.awaitConnected(publication);

try (Publication taggedPub1 = client.addPublication("aeron:udp?tags=22701", STREAM_ID);
Publication taggedPub2 = client.addPublication(taggedUri, STREAM_ID))
{
Tests.awaitConnected(taggedPub1);
Tests.awaitConnected(taggedPub2);
}
}

private static void assumeBindAddressAvailable(final String address)
{
final String message = NetworkTestingUtil.isBindAddressAvailable(address);
Expand Down

0 comments on commit ab96d35

Please sign in to comment.