Skip to content

Commit

Permalink
[Java] Renaming for clarity.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Jan 20, 2025
1 parent 9cf935b commit 8203064
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,8 @@ public void onLoadPendingMessageTracker(
public void onLoadPendingMessage(
final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int serviceTrackerIndex = PendingServiceMessageTracker.serviceId(clusterSessionId);
pendingServiceMessageTrackers[serviceTrackerIndex].appendMessage(buffer, offset, length);
final int serviceId = PendingServiceMessageTracker.serviceIdFromLogMessage(clusterSessionId);
pendingServiceMessageTrackers[serviceId].appendMessage(buffer, offset, length);
}

public void onLoadTimer(
Expand Down Expand Up @@ -1388,8 +1388,8 @@ void onServiceCloseSession(final long clusterSessionId)

void onServiceMessage(final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int i = (int)clusterSessionId;
pendingServiceMessageTrackers[i].enqueueMessage((MutableDirectBuffer)buffer, offset, length);
final int serviceId = PendingServiceMessageTracker.serviceIdFromServiceMessage(clusterSessionId);
pendingServiceMessageTrackers[serviceId].enqueueMessage((MutableDirectBuffer)buffer, offset, length);
}

void onScheduleTimer(final long correlationId, final long deadline)
Expand Down Expand Up @@ -1447,8 +1447,8 @@ void onReplaySessionMessage(final long clusterSessionId, final long timestamp)
}
else if (clusterSessionId < 0)
{
final int i = PendingServiceMessageTracker.serviceId(clusterSessionId);
pendingServiceMessageTrackers[i].sweepFollowerMessages(clusterSessionId);
final int serviceId = PendingServiceMessageTracker.serviceIdFromLogMessage(clusterSessionId);
pendingServiceMessageTrackers[serviceId].sweepFollowerMessages(clusterSessionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package io.aeron.cluster;

import io.aeron.Counter;
import io.aeron.DirectBufferVector;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionMessageHeaderDecoder;
import io.aeron.cluster.codecs.SessionMessageHeaderEncoder;
import io.aeron.cluster.service.ClusterClock;
import io.aeron.exceptions.AeronException;
import io.aeron.logbuffer.BufferClaim;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableRingBuffer;
import org.agrona.MutableDirectBuffer;
Expand Down Expand Up @@ -276,11 +278,23 @@ private boolean followerMessageSweeper(
return buffer.getLong(clusterSessionIdOffset, SessionMessageHeaderDecoder.BYTE_ORDER) <= logServiceSessionId;
}

static int serviceId(final long clusterSessionId)
static int serviceIdFromLogMessage(final long clusterSessionId)
{
return ((int)(clusterSessionId >>> 56)) & 0x7F;
}

/**
* Services use different approach for communicating the serviceId, this method extracts the serviceId from a
* cluster session id sent via an inter-service message.
*
* @param clusterSessionId passed in on an inter-service message.
* @return the associated serviceId.
*/
static int serviceIdFromServiceMessage(final long clusterSessionId)
{
return (int)clusterSessionId;
}

static long serviceSessionId(final int serviceId, final long sessionId)
{
return ((long)serviceId << 56) | sessionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void onLoadConsensusModuleState(
public void onLoadPendingMessage(
final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId);
final int serviceId = PendingServiceMessageTracker.serviceIdFromLogMessage(clusterSessionId);
final PendingMessageTrackerState trackerState = tracker(serviceId);
trackerState.pendingServiceMessageCount++;
trackerState.minClusterSessionId = min(trackerState.minClusterSessionId, clusterSessionId);
Expand Down Expand Up @@ -489,7 +489,8 @@ public void onLoadPendingMessage(
final int offset,
final int length)
{
final TargetState targetState = targetStates[PendingServiceMessageTracker.serviceId(clusterSessionId)];
final int serviceId = PendingServiceMessageTracker.serviceIdFromLogMessage(clusterSessionId);
final TargetState targetState = targetStates[serviceId];

tempBuffer.putBytes(0, buffer, offset, length);
sessionMessageHeaderEncoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void onLoadPendingMessageTracker(
public void onLoadPendingMessage(
final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int serviceId = serviceId(clusterSessionId);
final int serviceId = serviceIdFromLogMessage(clusterSessionId);
pendingMessageOffsets[serviceId].addInt(offset);
assertEquals(
MessageHeaderDecoder.ENCODED_LENGTH + SessionMessageHeaderDecoder.BLOCK_LENGTH + SIZE_OF_INT,
Expand Down Expand Up @@ -445,7 +445,7 @@ public void onLoadPendingMessageTracker(
public void onLoadPendingMessage(
final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int serviceId = serviceId(clusterSessionId);
final int serviceId = serviceIdFromLogMessage(clusterSessionId);
onLoadPendingMessageCounters[serviceId].increment();
assertEquals(
nextClusterSessionIds[serviceId]++, clusterSessionId, "Invalid pending message header!");
Expand Down Expand Up @@ -594,7 +594,7 @@ private static void modifySnapshot(
final long originalClusterSessionId = sessionMessageHeaderDecoder
.wrapAndApplyHeader(snapshotBuffer, offset, messageHeaderDecoder)
.clusterSessionId();
final int serviceId = serviceId(originalClusterSessionId);
final int serviceId = serviceIdFromLogMessage(originalClusterSessionId);
final long newClusterSessionId = serviceSessionId(serviceId, clusterSessionId);

sessionMessageHeaderEncoder
Expand Down

0 comments on commit 8203064

Please sign in to comment.