From 5ad7601c8237ecb2312072ed4170551453e65590 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Wed, 22 Jan 2025 17:42:30 +0100 Subject: [PATCH] [Java] Retry sending recording descriptors upon back-pressure from the client. This requires sending descriptors potentially out of order with already queued but not yet sent responses. Make listing recording by id also a retryable operation. --- .../AbstractListRecordingsSession.java | 6 +- .../io/aeron/archive/ArchiveConductor.java | 9 +- .../java/io/aeron/archive/ControlSession.java | 28 ++--- .../archive/ListRecordingByIdSession.java | 36 ++++++ .../ListRecordingSubscriptionsSession.java | 6 +- .../archive/ListRecordingByIdSessionTest.java | 110 ++++++++++++++++++ .../ListRecordingsForUriSessionTest.java | 4 +- .../archive/ListRecordingsSessionTest.java | 4 +- 8 files changed, 174 insertions(+), 29 deletions(-) create mode 100644 aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java create mode 100644 aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java diff --git a/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java b/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java index 8b0228f109..c4d1230199 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/AbstractListRecordingsSession.java @@ -111,7 +111,11 @@ public int doWork() if (acceptDescriptor(descriptorBuffer)) { - controlSession.sendDescriptor(correlationId, descriptorBuffer); + if (!controlSession.sendDescriptor(correlationId, descriptorBuffer)) + { + isDone = controlSession.isDone(); + break; + } ++sent; } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java index 98607f973b..de2907378d 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java @@ -609,13 +609,12 @@ void listRecording(final long correlationId, final long recordingId, final Contr final String msg = "active listing already in progress"; controlSession.sendErrorResponse(correlationId, ACTIVE_LISTING, msg); } - else if (catalog.wrapDescriptor(recordingId, descriptorBuffer)) - { - controlSession.sendDescriptor(correlationId, descriptorBuffer); - } else { - controlSession.sendRecordingUnknown(correlationId, recordingId); + final ListRecordingByIdSession session = + new ListRecordingByIdSession(correlationId, recordingId, catalog, controlSession, descriptorBuffer); + addSession(session); + controlSession.activeListing(session); } } diff --git a/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java b/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java index ef8f7943ac..a3563a5d27 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ControlSession.java @@ -710,36 +710,28 @@ void asyncSendOkResponse(final long correlationId, final long replaySessionId) } } - void sendDescriptor(final long correlationId, final UnsafeBuffer descriptorBuffer) + boolean sendDescriptor(final long correlationId, final UnsafeBuffer descriptorBuffer) { assertCalledOnConductorThread(); - if (!syncResponseQueue.isEmpty() || - !controlResponseProxy.sendDescriptor(controlSessionId, correlationId, descriptorBuffer, this)) - { - updateActivityDeadline(cachedEpochClock.time()); - syncResponseQueue.offer(() -> controlResponseProxy.sendDescriptor( - controlSessionId, correlationId, descriptorBuffer, this)); - } - else + final boolean sent = + controlResponseProxy.sendDescriptor(controlSessionId, correlationId, descriptorBuffer, this); + if (sent) { activityDeadlineMs = Aeron.NULL_VALUE; } + return sent; } - void sendSubscriptionDescriptor(final long correlationId, final Subscription subscription) + boolean sendSubscriptionDescriptor(final long correlationId, final Subscription subscription) { assertCalledOnConductorThread(); - if (!syncResponseQueue.isEmpty() || - !controlResponseProxy.sendSubscriptionDescriptor(controlSessionId, correlationId, subscription, this)) - { - updateActivityDeadline(cachedEpochClock.time()); - syncResponseQueue.offer(() -> controlResponseProxy.sendSubscriptionDescriptor( - controlSessionId, correlationId, subscription, this)); - } - else + final boolean sent = + controlResponseProxy.sendSubscriptionDescriptor(controlSessionId, correlationId, subscription, this); + if (sent) { activityDeadlineMs = Aeron.NULL_VALUE; } + return sent; } void sendSignal( diff --git a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java new file mode 100644 index 0000000000..67deb4af22 --- /dev/null +++ b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingByIdSession.java @@ -0,0 +1,36 @@ +/* + * Copyright 2014-2025 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.archive; + +import org.agrona.concurrent.UnsafeBuffer; + +class ListRecordingByIdSession extends AbstractListRecordingsSession +{ + ListRecordingByIdSession( + final long correlationId, + final long recordingId, + final Catalog catalog, + final ControlSession controlSession, + final UnsafeBuffer descriptorBuffer) + { + super(correlationId, recordingId, 1, catalog, controlSession, descriptorBuffer); + } + + boolean acceptDescriptor(final UnsafeBuffer descriptorBuffer) + { + return true; + } +} diff --git a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java index 6e2d2904b3..93370021bb 100644 --- a/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java +++ b/aeron-archive/src/main/java/io/aeron/archive/ListRecordingSubscriptionsSession.java @@ -100,7 +100,11 @@ public int doWork() if (!(applyStreamId && subscription.streamId() != streamId) && subscription.channel().contains(channelFragment)) { - controlSession.sendSubscriptionDescriptor(correlationId, subscription); + if (!controlSession.sendSubscriptionDescriptor(correlationId, subscription)) + { + isDone = controlSession.isDone(); + break; + } workCount += 1; if (++sent >= subscriptionCount) diff --git a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java new file mode 100644 index 0000000000..aa911840a4 --- /dev/null +++ b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingByIdSessionTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2014-2025 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.archive; + +import io.aeron.archive.codecs.RecordingDescriptorDecoder; +import io.aeron.archive.codecs.RecordingDescriptorHeaderDecoder; +import org.agrona.CloseHelper; +import org.agrona.IoUtil; +import org.agrona.concurrent.EpochClock; +import org.agrona.concurrent.UnsafeBuffer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; + +import java.io.File; + +import static org.mockito.Mockito.*; + +class ListRecordingByIdSessionTest +{ + private static final long CAPACITY = 1024 * 1024; + private static final int SEGMENT_FILE_SIZE = 128 * 1024 * 1024; + private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder(); + private final long[] recordingIds = new long[3]; + private final File archiveDir = ArchiveTests.makeTestDirectory(); + private final EpochClock clock = mock(EpochClock.class); + + private Catalog catalog; + private final ControlSession controlSession = mock(ControlSession.class); + private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer(); + + @BeforeEach + void before() + { + catalog = new Catalog(archiveDir, null, 0, CAPACITY, clock, null, null); + recordingIds[0] = catalog.addNewRecording( + 0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 6, 1, "channelG", "channelG?tag=f", "sourceA"); + recordingIds[1] = catalog.addNewRecording( + 0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 7, 2, "channelH", "channelH?tag=f", "sourceV"); + recordingIds[2] = catalog.addNewRecording( + 0L, 0L, 0, SEGMENT_FILE_SIZE, 4096, 1024, 8, 3, "channelK", "channelK?tag=f", "sourceB"); + } + + @AfterEach + void after() + { + CloseHelper.close(catalog); + IoUtil.delete(archiveDir, false); + } + + @Test + void shouldSendDescriptor() + { + final long correlationId = -53465834; + final ListRecordingByIdSession session = + new ListRecordingByIdSession(correlationId, recordingIds[1], catalog, controlSession, descriptorBuffer); + + doAnswer(verifySendDescriptor()) + .when(controlSession) + .sendDescriptor(eq(correlationId), any()); + + session.doWork(); + verify(controlSession).sendDescriptor(eq(correlationId), any()); + verifyNoMoreInteractions(controlSession); + } + + @Test + void shouldSendRecordingUnknownOnFirst() + { + final long correlationId = 42; + final long unknownRecordingId = 17777; + final ListRecordingByIdSession session = + new ListRecordingByIdSession(correlationId, unknownRecordingId, catalog, controlSession, descriptorBuffer); + + session.doWork(); + + verify(controlSession).sendRecordingUnknown(eq(correlationId), eq(unknownRecordingId)); + verifyNoMoreInteractions(controlSession); + } + + private Answer verifySendDescriptor() + { + return (invocation) -> + { + final UnsafeBuffer buffer = invocation.getArgument(1); + + recordingDescriptorDecoder.wrap( + buffer, + RecordingDescriptorHeaderDecoder.BLOCK_LENGTH, + RecordingDescriptorDecoder.BLOCK_LENGTH, + RecordingDescriptorDecoder.SCHEMA_VERSION); + + return true; + }; + } +} diff --git a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsForUriSessionTest.java b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsForUriSessionTest.java index eaf2a5456c..d4f3c3637c 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsForUriSessionTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsForUriSessionTest.java @@ -199,7 +199,7 @@ private Answer verifySendDescriptor(final MutableLong counter) assertEquals(matchingRecordingIds[i], recordingDescriptorDecoder.recordingId()); counter.set(i + 1); - return buffer.getInt(0); + return true; }; } -} \ No newline at end of file +} diff --git a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsSessionTest.java b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsSessionTest.java index 240ad1eb4c..7db5001f08 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsSessionTest.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ListRecordingsSessionTest.java @@ -162,7 +162,7 @@ private Answer verifySendDescriptor(final MutableLong counter) assertEquals(recordingIds[i], recordingDescriptorDecoder.recordingId()); counter.set(i + 1); - return buffer.getInt(0); + return true; }; } -} \ No newline at end of file +}