Skip to content

Commit

Permalink
[Java] Add symmetry for matching tags to endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Jan 15, 2025
1 parent 7b7a186 commit aa897e9
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2127,8 +2127,7 @@ private SendChannelEndpoint findExistingSendChannelEndpoint(final UdpChannel udp
{
for (final SendChannelEndpoint endpoint : sendChannelEndpointByChannelMap.values())
{
final UdpChannel endpointUdpChannel = endpoint.udpChannel();
if (endpointUdpChannel.matchesTag(udpChannel))
if (endpoint.matchesTag(udpChannel))
{
return endpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,17 @@ private void applyChannelSendTimestamp(final ByteBuffer buffer)
}
}
}

/**
* Does the channel have a matching tag?
*
* @param udpChannel with tag to match against.
* @return true if the channel matches on tag identity.
*/
public boolean matchesTag(final UdpChannel udpChannel)
{
return super.udpChannel.matchesTag(udpChannel);
}
}

abstract class MultiSndDestinationLhsPadding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,54 @@ void shouldReResolveUnicastAddressWhenSendChannelEndpointIsReused()
}
}

@Test
@SlowTest
@InterruptAfter(10)
void shouldHandleTaggedSubscriptionsAddressWithReResolutionToMdcPublications()
{
final String taggedUri = SUBSCRIPTION_DYNAMIC_MDC_URI + "|tags=22701";

subscription = client.addSubscription(taggedUri, STREAM_ID);
assertFalse(subscription.isConnected());

assertTrue(updateNameResolutionStatus(countersReader, CONTROL_NAME, USE_RE_RESOLUTION_HOST));

publication = client.addPublication(SECOND_PUBLICATION_DYNAMIC_MDC_URI, STREAM_ID);

Tests.awaitConnected(subscription);

try (Subscription taggedSub1 = client.addSubscription("aeron:udp?tags=22701", STREAM_ID);
Subscription taggedSub2 = client.addSubscription(taggedUri, STREAM_ID))
{
Tests.awaitConnected(taggedSub1);
Tests.awaitConnected(taggedSub2);
}
}

@Test
@SlowTest
@InterruptAfter(10)
void shouldHandleTaggedPublication()
{
final String taggedUri = PUBLICATION_URI + "|tags=22701";

publication = client.addPublication(taggedUri, STREAM_ID);
assertFalse(publication.isConnected());

assertTrue(updateNameResolutionStatus(countersReader, ENDPOINT_NAME, USE_RE_RESOLUTION_HOST));

subscription = client.addSubscription(SECOND_SUBSCRIPTION_URI, STREAM_ID);

Tests.awaitConnected(publication);

try (Publication taggedPub1 = client.addPublication("aeron:udp?tags=22701", STREAM_ID);
Publication taggedPub2 = client.addPublication(taggedUri, STREAM_ID))
{
Tests.awaitConnected(taggedPub1);
Tests.awaitConnected(taggedPub2);
}
}

private static void assumeBindAddressAvailable(final String address)
{
final String message = NetworkTestingUtil.isBindAddressAvailable(address);
Expand Down

0 comments on commit aa897e9

Please sign in to comment.