Skip to content

Commit

Permalink
Added host while report readQ and invalidNode metrics
Browse files Browse the repository at this point in the history
Added Data size metrcs for read operations
  • Loading branch information
smadappa committed Jun 21, 2016
1 parent b3a6c49 commit c23aa09
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.netflix.evcache;

import static com.netflix.evcache.util.Sneaky.sneakyThrow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -33,15 +35,12 @@
import com.netflix.spectator.api.DistributionSummary;

import net.spy.memcached.CachedData;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.util.StringUtils;
import rx.Observable;
import rx.Scheduler;
import rx.Single;

import static com.netflix.evcache.util.Sneaky.sneakyThrow;

/**
* An implementation of a ephemeral volatile cache.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -27,19 +26,13 @@
import com.netflix.evcache.EVCacheException;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.EVCacheReadQueueException;
import com.netflix.evcache.EVCache.Call;
import com.netflix.evcache.event.EVCacheEvent;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.metrics.Operation;
import com.netflix.evcache.operation.EVCacheFuture;
import com.netflix.evcache.operation.EVCacheFutures;
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.pool.observer.EVCacheConnectionObserver;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.tag.BasicTag;
import com.netflix.servo.tag.Tag;

import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
Expand Down Expand Up @@ -135,10 +128,9 @@ private Collection<String> validateReadQueueSize(Collection<String> canonicalKey
// Size - " + size + " for app " + appName + " & zone " + zone +
// " ; node " + node);
if (!canAddToOpQueue) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-READ_QUEUE_FULL");
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-READ_QUEUE_FULL", evcNode.getBaseTags()).increment();
if (log.isDebugEnabled()) log.debug("Read Queue Full on Bulk Operation for app : " + appName
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : "
+ maxReadQueueSize.get() * 2);
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get() * 2);
} else {
retKeys.add(key);
}
Expand All @@ -151,7 +143,7 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCa
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-INACTIVE_NODE", evcNode.getBaseTags()).increment();
pool.refreshAsync(evcNode);
}

Expand All @@ -162,7 +154,7 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCa
if (log.isDebugEnabled()) log.debug("App : " + appName + "; zone : " + zone + "; key : " + key
+ "; WriteQSize : " + size);
if (canAddToOpQueue) break;
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-WRITE_BLOCK");
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-WRITE_BLOCK", evcNode.getBaseTags()).increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Expand All @@ -171,8 +163,7 @@ private boolean ensureWriteQueueSize(MemcachedNode node, String key) throws EVCa
if(startTime > 0) {
startTime -= 100;
} else {
Tag tag = new BasicTag("HOST", evcNode.getHostName());
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), "EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE", tag).increment();
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-INACTIVE_NODE", evcNode.getBaseTags()).increment();
if (log.isDebugEnabled()) log.debug("Node : " + evcNode + " for app : " + appName + "; zone : "
+ zone + " is not active. Will Fail Fast and the write will be dropped for key : " + key);
return false;
Expand All @@ -188,29 +179,24 @@ private boolean validateNode(String key, boolean _throwException) throws EVCache
if (node instanceof EVCacheNodeImpl) {
final EVCacheNodeImpl evcNode = (EVCacheNodeImpl) node;
if (!evcNode.isAvailable()) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-INACTIVE_NODE");
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-INACTIVE_NODE", evcNode.getBaseTags()).increment();
if (log.isDebugEnabled()) log.debug("Node : " + node + " for app : " + appName + "; zone : " + zone
+ " is not active. Will Fail Fast so that we can fallback to Other Zone if available.");
if (_throwException) throw new EVCacheException("Connection for Node : " + node + " for app : " + appName
+ "; zone : " + zone + " is not active");
return false;
}
}

// now check to see if the read queue is full.
if (node instanceof EVCacheNodeImpl) {
final int size = ((EVCacheNodeImpl) node).getReadQueueSize();
final int size = evcNode.getReadQueueSize();
final boolean canAddToOpQueue = size < maxReadQueueSize.get();
if (log.isDebugEnabled()) log.debug("Current Read Queue Size - " + size + " for app " + appName + " & zone "
+ zone);
if (!canAddToOpQueue) {
EVCacheMetricsFactory.increment("EVCacheClient-" + appName + "-" + zone + "-READ_QUEUE_FULL");
EVCacheMetricsFactory.getCounter("EVCacheClient-" + appName + "-" + zone + "-READ_QUEUE_FULL", evcNode.getBaseTags()).increment();
if (log.isDebugEnabled()) log.debug("Read Queue Full for Node : " + node + "; app : " + appName
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : "
+ maxReadQueueSize.get());
+ "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
if (_throwException) throw new EVCacheReadQueueException("Read Queue Full for Node : " + node + "; app : "
+ appName + "; zone : " + zone + "; Current Size : " + size
+ "; Max Size : " + maxReadQueueSize.get());
+ appName + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get());
return false;
}
}
Expand Down Expand Up @@ -447,8 +433,7 @@ private boolean checkCRCChecksum(byte[] data, final ChunkInfo ci, boolean hasZF)
if (!hasZF) {
if (log.isWarnEnabled()) log.warn("CHECKSUM_ERROR : Chunks : " + ci.getChunks() + " ; "
+ "currentChecksum : " + currentChecksum + "; expectedChecksum : " + expectedChecksum
+ " for key : "
+ ci.getKey());
+ " for key : " + ci.getKey());
EVCacheMetricsFactory.increment(appName + "-CHECK_SUM_ERROR");
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.tag.BasicTag;
import com.netflix.servo.tag.Tag;
import com.netflix.spectator.api.DistributionSummary;

import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
Expand Down Expand Up @@ -55,6 +57,7 @@ public class EVCacheMemcachedClient extends MemcachedClient {
private final String zone;
private final ChainedDynamicProperty.IntProperty readTimeout;
private final ServerGroup serverGroup;
private DistributionSummary getDataSize, bulkDataSize, getAndTouchDataSize;

public EVCacheMemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
ChainedDynamicProperty.IntProperty readTimeout, String appName, String zone, int id,
Expand Down Expand Up @@ -100,6 +103,11 @@ public void receivedStatus(OperationStatus status) {

@SuppressWarnings("unchecked")
public void gotData(String k, int flags, byte[] data) {

if (data != null) {
if(getDataSize == null) getDataSize = EVCacheConfig.getInstance().getDistributionSummary(appName + "-GetData-Size");
if (getDataSize != null) getDataSize.record(data.length);
}
if (!key.equals(k)) log.warn("Wrong key returned. Key - " + key + "; Returned Key " + k);
if (tc == null) {
if (tcService == null) {
Expand Down Expand Up @@ -167,6 +175,11 @@ public void receivedStatus(OperationStatus status) {

@Override
public void gotData(String k, int flags, byte[] data) {
if (data != null) {
if(bulkDataSize == null) bulkDataSize = EVCacheConfig.getInstance().getDistributionSummary(appName + "-BulkData-Size");
if (bulkDataSize != null) bulkDataSize.record(data.length);
}

m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
}

Expand Down Expand Up @@ -211,6 +224,11 @@ public void complete() {

public void gotData(String k, int flags, long cas, byte[] data) {
if (!key.equals(k)) log.warn("Wrong key returned. Key - " + key + "; Returned Key " + k);
if (data != null) {
if(getAndTouchDataSize == null) getAndTouchDataSize = EVCacheConfig.getInstance().getDistributionSummary(appName + "-GATData-Size");
if (getAndTouchDataSize != null) getAndTouchDataSize.record(data.length);
}

val = new CASValue<T>(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize())));
}
});
Expand Down Expand Up @@ -280,7 +298,7 @@ public <T> OperationFuture<Boolean> asyncAppendOrAdd(final String key, int exp,
Operation op = opFact.cat(ConcatenationType.append, 0, key, co.getData(),
new OperationCallback() {
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "LatencyAoA").start();
boolean appendSuccess = true;
boolean appendSuccess = false;
@Override
public void receivedStatus(OperationStatus val) {
if (val.getStatusCode().equals(StatusCode.SUCCESS)) {
Expand All @@ -290,6 +308,7 @@ public void receivedStatus(OperationStatus val) {

EVCacheMetricsFactory.getCounter(appName + "-" + serverGroup.getName() + "-AoA-AppendCall-SUCCESS").increment();
rv.set(val.isSuccess(), val);
appendSuccess = true;
} else {
appendSuccess = false;
}
Expand All @@ -298,8 +317,8 @@ public void receivedStatus(OperationStatus val) {
@Override
public void complete() {
if(appendSuccess) {
rv.signalComplete();
latch.countDown();
rv.signalComplete();
} else {
Operation op = opFact.store(StoreType.add, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class EVCacheNodeImpl extends BinaryMemcachedNodeImpl implements EVCacheN
protected final DynamicBooleanProperty sendMetrics;
protected final MonitorConfig baseConfig;
protected final TagList baseTags;
protected final TagList tags;
protected final TagList tags;

private long timeoutStartTime;

Expand Down Expand Up @@ -245,4 +245,7 @@ public int getId() {
return id;
}

public TagList getBaseTags() {
return baseTags;
}
}

0 comments on commit c23aa09

Please sign in to comment.