Skip to content

Commit

Permalink
[Java] Retry sending recording descriptors upon back-pressure from th…
Browse files Browse the repository at this point in the history
…e client. This requires sending descriptors potentially out of order with already queued but not yet sent responses. Make listing recording by id also a retryable operation.
  • Loading branch information
vyazelenko committed Jan 22, 2025
1 parent 809e5fb commit 5ad7601
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ public int doWork()

if (acceptDescriptor(descriptorBuffer))
{
controlSession.sendDescriptor(correlationId, descriptorBuffer);
if (!controlSession.sendDescriptor(correlationId, descriptorBuffer))
{
isDone = controlSession.isDone();
break;
}
++sent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,12 @@ void listRecording(final long correlationId, final long recordingId, final Contr
final String msg = "active listing already in progress";
controlSession.sendErrorResponse(correlationId, ACTIVE_LISTING, msg);
}
else if (catalog.wrapDescriptor(recordingId, descriptorBuffer))
{
controlSession.sendDescriptor(correlationId, descriptorBuffer);
}
else
{
controlSession.sendRecordingUnknown(correlationId, recordingId);
final ListRecordingByIdSession session =
new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer);
addSession(session);
controlSession.activeListing(session);
}
}

Expand Down
28 changes: 10 additions & 18 deletions aeron-archive/src/main/java/io/aeron/archive/ControlSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -710,36 +710,28 @@ void asyncSendOkResponse(final long correlationId, final long replaySessionId)
}
}

void sendDescriptor(final long correlationId, final UnsafeBuffer descriptorBuffer)
boolean sendDescriptor(final long correlationId, final UnsafeBuffer descriptorBuffer)
{
assertCalledOnConductorThread();
if (!syncResponseQueue.isEmpty() ||
!controlResponseProxy.sendDescriptor(controlSessionId, correlationId, descriptorBuffer, this))
{
updateActivityDeadline(cachedEpochClock.time());
syncResponseQueue.offer(() -> controlResponseProxy.sendDescriptor(
controlSessionId, correlationId, descriptorBuffer, this));
}
else
final boolean sent =
controlResponseProxy.sendDescriptor(controlSessionId, correlationId, descriptorBuffer, this);
if (sent)
{
activityDeadlineMs = Aeron.NULL_VALUE;
}
return sent;
}

void sendSubscriptionDescriptor(final long correlationId, final Subscription subscription)
boolean sendSubscriptionDescriptor(final long correlationId, final Subscription subscription)
{
assertCalledOnConductorThread();
if (!syncResponseQueue.isEmpty() ||
!controlResponseProxy.sendSubscriptionDescriptor(controlSessionId, correlationId, subscription, this))
{
updateActivityDeadline(cachedEpochClock.time());
syncResponseQueue.offer(() -> controlResponseProxy.sendSubscriptionDescriptor(
controlSessionId, correlationId, subscription, this));
}
else
final boolean sent =
controlResponseProxy.sendSubscriptionDescriptor(controlSessionId, correlationId, subscription, this);
if (sent)
{
activityDeadlineMs = Aeron.NULL_VALUE;
}
return sent;
}

void sendSignal(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2014-2025 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.archive;

import org.agrona.concurrent.UnsafeBuffer;

class ListRecordingByIdSession extends AbstractListRecordingsSession
{
ListRecordingByIdSession(
final long correlationId,
final long recordingId,
final Catalog catalog,
final ControlSession controlSession,
final UnsafeBuffer descriptorBuffer)
{
super(correlationId, recordingId, 1, catalog, controlSession, descriptorBuffer);
}

boolean acceptDescriptor(final UnsafeBuffer descriptorBuffer)
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ public int doWork()
if (!(applyStreamId && subscription.streamId() != streamId) &&
subscription.channel().contains(channelFragment))
{
controlSession.sendSubscriptionDescriptor(correlationId, subscription);
if (!controlSession.sendSubscriptionDescriptor(correlationId, subscription))
{
isDone = controlSession.isDone();
break;
}
workCount += 1;

if (++sent >= subscriptionCount)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2014-2025 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.archive;

import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.RecordingDescriptorHeaderDecoder;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import java.io.File;

import static org.mockito.Mockito.*;

class ListRecordingByIdSessionTest
{
private static final long CAPACITY = 1024 * 1024;
private static final int SEGMENT_FILE_SIZE = 128 * 1024 * 1024;
private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder();
private final long[] recordingIds = new long[3];
private final File archiveDir = ArchiveTests.makeTestDirectory();
private final EpochClock clock = mock(EpochClock.class);

private Catalog catalog;
private final ControlSession controlSession = mock(ControlSession.class);
private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer();

@BeforeEach
void before()
{
catalog = new Catalog(archiveDir, null, 0, CAPACITY, clock, null, null);
recordingIds[0] = catalog.addNewRecording(
0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 6, 1, "channelG", "channelG?tag=f", "sourceA");
recordingIds[1] = catalog.addNewRecording(
0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 7, 2, "channelH", "channelH?tag=f", "sourceV");
recordingIds[2] = catalog.addNewRecording(
0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 8, 3, "channelK", "channelK?tag=f", "sourceB");
}

@AfterEach
void after()
{
CloseHelper.close(catalog);
IoUtil.delete(archiveDir, false);
}

@Test
void shouldSendDescriptor()
{
final long correlationId = -53465834;
final ListRecordingByIdSession session =
new ListRecordingByIdSession(correlationId, recordingIds[1], catalog, controlSession, descriptorBuffer);

doAnswer(verifySendDescriptor())
.when(controlSession)
.sendDescriptor(eq(correlationId), any());

session.doWork();
verify(controlSession).sendDescriptor(eq(correlationId), any());
verifyNoMoreInteractions(controlSession);
}

@Test
void shouldSendRecordingUnknownOnFirst()
{
final long correlationId = 42;
final long unknownRecordingId = 17777;
final ListRecordingByIdSession session =
new ListRecordingByIdSession(correlationId, unknownRecordingId, catalog, controlSession, descriptorBuffer);

session.doWork();

verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(unknownRecordingId));
verifyNoMoreInteractions(controlSession);
}

private Answer<Object> verifySendDescriptor()
{
return (invocation) ->
{
final UnsafeBuffer buffer = invocation.getArgument(1);

recordingDescriptorDecoder.wrap(
buffer,
RecordingDescriptorHeaderDecoder.BLOCK_LENGTH,
RecordingDescriptorDecoder.BLOCK_LENGTH,
RecordingDescriptorDecoder.SCHEMA_VERSION);

return true;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private Answer<Object> verifySendDescriptor(final MutableLong counter)
assertEquals(matchingRecordingIds[i], recordingDescriptorDecoder.recordingId());
counter.set(i + 1);

return buffer.getInt(0);
return true;
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private Answer<Object> verifySendDescriptor(final MutableLong counter)
assertEquals(recordingIds[i], recordingDescriptorDecoder.recordingId());
counter.set(i + 1);

return buffer.getInt(0);
return true;
};
}
}
}

0 comments on commit 5ad7601

Please sign in to comment.