Skip to content

Commit

Permalink
Methods for unacked messages metrics
Browse files Browse the repository at this point in the history
This change adds a getNumUnackedMessagesPerDest method to UNICAST3/4/5/NAKACK4 which can be used for e.g. metrics.
  • Loading branch information
cfredri4 committed Jan 17, 2025
1 parent ace28d1 commit db8dd2b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/org/jgroups/protocols/NAKACK4.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntBinaryOperator;
import java.util.function.Predicate;

import java.util.stream.Collectors;
import static org.jgroups.Message.Flag.OOB;
import static org.jgroups.conf.AttributeType.SCALAR;

Expand Down Expand Up @@ -51,6 +51,17 @@ public class NAKACK4 extends ReliableMulticast {
public NAKACK4 ackThreshold(int t) {ack_threshold=t; return this;}
@Override public Options sendOptions() {return SEND_OPTIONS;}

@ManagedAttribute(type = SCALAR)
public long getNumUnackedMessages() {
return seqno.get() - ack_table.min();
}

public Map<Address, Long> getNumUnackedMessagesPerDest() {
long seq = seqno.get();
return ack_table.minPerDest().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> seq - entry.getValue()));
}

@ManagedAttribute(description="Number of times sender threads were blocked on a full send window",type=SCALAR)
public long getNumBlockings() {
FixedBuffer<Message> buf=(FixedBuffer<Message>)sendBuf();
Expand Down
5 changes: 5 additions & 0 deletions src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public int getNumUnackedMessages() {
return accumulate(Buffer::size, send_table.values());
}

public Map<Address, Integer> getNumUnackedMessagesPerDest() {
return send_table.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().buf.size()));
}

@ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=SCALAR)
public int getXmitTableUndeliveredMessages() {
return accumulate(Buffer::size, recv_table.values());
Expand Down
5 changes: 5 additions & 0 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ public int getNumUnackedMessages() {
return accumulate(Table::size, send_table.values());
}

public Map<Address, Integer> getNumUnackedMessagesPerDest() {
return send_table.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().msgs.size()));
}

@ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=SCALAR)
public int getXmitTableUndeliveredMessages() {
return accumulate(Table::size, recv_table.values());
Expand Down
9 changes: 9 additions & 0 deletions src/org/jgroups/util/AckTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public long min() {
}
}

public Map<Address,Long> minPerDest() {
lock.lock();
try {
return new HashMap<>(acks);
} finally {
lock.unlock();
}
}

/** Adds an ACK from a sender to the map. Returns the old and new minimum */
public long[] ack(Address sender, long seqno) {
lock.lock();
Expand Down

0 comments on commit db8dd2b

Please sign in to comment.