Skip to content

Commit

Permalink
PROTON-2807 Account for zero copy buffers on ingest activity tracking
Browse files Browse the repository at this point in the history
Update data ingest in the engine to account for the zero copy mechanics
in proton buffers resulting in missing the update to the incoming
sequence tracker and early read check idle timeout errors.
  • Loading branch information
tabish121 committed Mar 15, 2024
1 parent 2aa2911 commit 7ded26e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public ProtonEngine tickAuto(ScheduledExecutorService executor) throws IllegalSt
private final ScheduledExecutorService service = executor;

@Override
public boolean isShutdown() {
public boolean isShutdown() {
return service.isShutdown();
}

Expand Down Expand Up @@ -261,14 +261,14 @@ public ProtonEngine ingest(ProtonBuffer input) throws EngineStateException {
throw new EngineNotWritableException("Engine is currently not accepting new input");
}

try {
final int startIndex = input.getReadOffset();
pipeline.fireRead(input);
if (input.getReadOffset() != startIndex) {
if (input.isReadable()) {
try {
pipeline.fireRead(input);
} catch (Exception error) {
throw engineFailed(error);
} finally {
inputSequence++;
}
} catch (Exception error) {
throw engineFailed(error);
}

return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.security.sasl.SaslException;

Expand All @@ -40,6 +41,8 @@
import org.apache.qpid.protonj2.engine.EngineFactory;
import org.apache.qpid.protonj2.engine.EngineState;
import org.apache.qpid.protonj2.engine.HeaderEnvelope;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.Receiver;
import org.apache.qpid.protonj2.engine.SASLEnvelope;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
Expand Down Expand Up @@ -1378,4 +1381,70 @@ public void testEnginePipelineProtectsFromExternalUserMischief() {

peer.waitForScriptToComplete();
}

@Test
public void testSlowFrameCoalesceDoesNotTriggerReadIdleTimeout() throws Exception {
// Frame data for: Transfer
// Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x01, messageFormat=null, settled=true, more=false,
// rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
// payload of size: 169
final byte[] completedTransfer1 = new byte[] {
0, 0, 0, -63, 2, 0, 0, 0, 0, 83, 20, -64, 11, 5, 82, 0, 82, 1, -96, 2, 0, 1, 64, 65, 0, 83, 115,
-48, 0, 0, 0, 28, 0, 0, 0, 3, -104, -107, -75, 19, 123, 103, 50, 77, 43, -73, 93, 29, 105, 64};
final byte[] completedTransfer2 = new byte[] {
-84, 45, 110, 64, -95, 4, 116, 101, 115, 116, 0, 83, 116, -63, 23, 2, -95, 9, 116, 105, 109, 101,
115, 116, 97, 109, 112, -95, 9, 49, 50, 51, 52, 53, 54, 55, 56, 57, 0, 83, 117, -96, 100, 65, 65};
final byte[] completedTransfer3 = new byte[] {
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65};

final Engine engine = EngineFactory.PROTON.createNonSaslEngine();
engine.errorHandler(result -> failure = result.failureCause());
final ProtonTestConnector peer = createTestPeer(engine);

peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().withIdleTimeOut(1000).respond().withIdleTimeOut(0);
peer.expectBegin().respond();
peer.expectAttach().respond();
peer.expectFlow();

final Connection connection = engine.start().setIdleTimeout(1000).open();
final Session session = connection.session().open();
final Receiver receiver = session.receiver("test").open().addCredit(10);
final AtomicReference<IncomingDelivery> receivedDelivery = new AtomicReference<>();
final AtomicBoolean deliveryArrived = new AtomicBoolean();

receiver.deliveryReadHandler(d -> {
deliveryArrived.set(true);
receivedDelivery.set(d);
});

// Initial tick sets first deadline
assertEquals(2000, connection.tick(1000));

peer.remoteBytes().withBytes(completedTransfer1).now();
assertEquals(2500, connection.tick(1500));

peer.remoteBytes().withBytes(completedTransfer2).now();
assertEquals(3000, connection.tick(2000));

peer.remoteBytes().withBytes(completedTransfer3).now();
assertEquals(3500, connection.tick(2500));

peer.waitForScriptToComplete();
peer.expectDetach().respond();
peer.expectEnd().respond();

assertTrue(deliveryArrived.get());
assertNotNull(receivedDelivery.get());

receiver.detach();
session.close();

peer.waitForScriptToComplete();

assertNull(failure);
}
}

0 comments on commit 7ded26e

Please sign in to comment.