Skip to content

Commit

Permalink
add latch support for delete and touch calls. Touch and delete will h…
Browse files Browse the repository at this point in the history
…ave success and failure metrics
  • Loading branch information
smadappa committed Jul 21, 2016
1 parent 98caa2d commit accff80
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 44 deletions.
50 changes: 50 additions & 0 deletions evcache-client/src/main/java/com/netflix/evcache/EVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,29 @@ <T> EVCacheLatch replace(String key, T value, Transcoder<T> tc, int timeToLive,
*/
Future<Boolean>[] delete(String key) throws EVCacheException;

/**
* Remove a current key value relation from the Cache.
*
* @param key
* the non-null key corresponding to the relation to be removed.
* Ensure the key is properly encoded and does not contain
* whitespace or control characters.
* @param policy
* The Latch will be returned based on the Policy. The Latch can
* then be used to await until the count down has reached to 0 or
* the specified time has elapsed.
*
* @return EVCacheLatch which will encompasses the Operation. You can block
* on the Operation based on the policy to ensure the required
* criteria is met. The Latch can also be queried to get details on
* status of the operations
*
* @throws EVCacheException
* in the rare circumstance where queue is too full to accept
* any more requests or any IO Related issues
*/
<T> EVCacheLatch delete(String key, EVCacheLatch.Policy policy) throws EVCacheException;

/**
* Retrieve the value for the given key.
*
Expand Down Expand Up @@ -820,6 +843,33 @@ <T> Map<String, T> getBulkAndTouch(Collection<String> keys, Transcoder<T> tc, in
*/
<T> Future<Boolean>[] touch(String key, int ttl) throws EVCacheException;


/**
* Remove a current key value relation from the Cache.
*
* Touch the given key and reset its expiration time.
*
* @param key
* the key to touch
* @param ttl
* the new expiration time in seconds
*
* @param policy
* The Latch will be returned based on the Policy. The Latch can
* then be used to await until the count down has reached to 0 or
* the specified time has elapsed.
*
* @return EVCacheLatch which will encompasses the Operation. You can block
* on the Operation based on the policy to ensure the required
* criteria is met. The Latch can also be queried to get details on
* status of the operations
*
* @throws EVCacheException
* in the rare circumstance where queue is too full to accept
* any more requests or any IO Related issues
*/
<T> EVCacheLatch touch(String key, int ttl, EVCacheLatch.Policy policy) throws EVCacheException;

/**
* Append the given value to the existing value in EVCache. If the Key does not exist the the key will added.
*
Expand Down
83 changes: 67 additions & 16 deletions evcache-client/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -640,15 +640,36 @@ public <T> T getAndTouch(String key, int timeToLive, Transcoder<T> tc) throws EV
}
}

@Override
public Future<Boolean>[] touch(String key, int timeToLive) throws EVCacheException {
final EVCacheLatch latch = this.touch(key, timeToLive, null);
if (latch == null) return new EVCacheFuture[0];
final List<Future<Boolean>> futures = latch.getAllFutures();
if (futures == null || futures.isEmpty()) return new EVCacheFuture[0];
final EVCacheFuture[] eFutures = new EVCacheFuture[futures.size()];
for (int i = 0; i < futures.size(); i++) {
final Future<Boolean> future = futures.get(i);
if (future instanceof EVCacheFuture) {
eFutures[i] = (EVCacheFuture) future;
} else if (future instanceof EVCacheOperationFuture) {
eFutures[i] = new EVCacheFuture(futures.get(i), key, _appName, ((EVCacheOperationFuture<Boolean>) futures.get(i)).getServerGroup());
} else {
eFutures[i] = new EVCacheFuture(futures.get(i), key, _appName, null);
}
}
return eFutures;
}


public <T> EVCacheLatch touch(String key, int timeToLive, Policy policy) throws EVCacheException {
if (null == key) throw new IllegalArgumentException();

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
if (clients.length == 0) {
increment("NULL_CLIENT");
if (throwExc) throw new EVCacheException("Could not find a client to set the data");
return new EVCacheFuture[0]; // Fast failure
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}

final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Collections.singletonList(key), Call.TOUCH);
Expand All @@ -657,7 +678,7 @@ public Future<Boolean>[] touch(String key, int timeToLive) throws EVCacheExcepti
if (shouldThrottle(event)) {
increment("THROTTLED");
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return new EVCacheFuture[0];
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}
} catch(EVCacheException ex) {
if(throwExc) throw ex;
Expand All @@ -669,7 +690,8 @@ public Future<Boolean>[] touch(String key, int timeToLive) throws EVCacheExcepti

final String canonicalKey = getCanonicalizedKey(key);
try {
final EVCacheFuture[] futures = touchData(canonicalKey, key, timeToLive, clients);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length, _appName);
touchData(canonicalKey, key, timeToLive, clients, latch);

if (touchTTLSummary == null) this.touchTTLSummary = EVCacheConfig.getInstance().getDistributionSummary(_appName + "-TouchData-TTL");
if (touchTTLSummary != null) touchTTLSummary.record(timeToLive);
Expand All @@ -679,13 +701,14 @@ public Future<Boolean>[] touch(String key, int timeToLive) throws EVCacheExcepti
if (event != null) {
event.setCanonicalKeys(Arrays.asList(canonicalKey));
event.setTTL(timeToLive);
event.setLatch(latch);
endEvent(event);
}
return futures;
return latch;
} catch (Exception ex) {
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception touching the data for APP " + _appName + ", key : " + canonicalKey, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return new EVCacheFuture[0];
if (!throwExc) return new EVCacheLatchImpl(policy, 0, _appName);
throw new EVCacheException("Exception setting data for APP " + _appName + ", key : " + canonicalKey, ex);
} finally {
if (log.isDebugEnabled() && shouldLog()) log.debug("TOUCH : APP " + _appName + " for key : " + canonicalKey + " with ttl : " + timeToLive);
Expand All @@ -698,10 +721,14 @@ private EVCacheFuture[] touchData(String canonicalKey, String key, int timeToLiv
}

private EVCacheFuture[] touchData(String canonicalKey, String key, int timeToLive, EVCacheClient[] clients) throws Exception {
return touchData(canonicalKey, key, timeToLive, clients, null);
}

private EVCacheFuture[] touchData(String canonicalKey, String key, int timeToLive, EVCacheClient[] clients, EVCacheLatch latch ) throws Exception {
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
int index = 0;
for (EVCacheClient client : clients) {
final Future<Boolean> future = client.touch(canonicalKey, timeToLive);
final Future<Boolean> future = client.touch(canonicalKey, timeToLive, latch);
futures[index++] = new EVCacheFuture(future, key, _appName, client.getServerGroup());
}
return futures;
Expand Down Expand Up @@ -1018,8 +1045,7 @@ public <T> EVCacheLatch set(String key, T value, Transcoder<T> tc, int timeToLiv

final String canonicalKey = getCanonicalizedKey(key);
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.SET, stats, Operation.TYPE.MILLI);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
clients.length, _appName);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy, clients.length, _appName);
try {
CachedData cd = null;
for (EVCacheClient client : clients) {
Expand Down Expand Up @@ -1152,6 +1178,27 @@ public <T> EVCacheFuture[] set(String key, T value) throws EVCacheException {
}

public EVCacheFuture[] delete(String key) throws EVCacheException {
final EVCacheLatch latch = this.delete(key, null);
if (latch == null) return new EVCacheFuture[0];
final List<Future<Boolean>> futures = latch.getAllFutures();
if (futures == null || futures.isEmpty()) return new EVCacheFuture[0];
final EVCacheFuture[] eFutures = new EVCacheFuture[futures.size()];
for (int i = 0; i < futures.size(); i++) {
final Future<Boolean> future = futures.get(i);
if (future instanceof EVCacheFuture) {
eFutures[i] = (EVCacheFuture) future;
} else if (future instanceof EVCacheOperationFuture) {
eFutures[i] = new EVCacheFuture(futures.get(i), key, _appName, ((EVCacheOperationFuture<Boolean>) futures.get(i)).getServerGroup());
} else {
eFutures[i] = new EVCacheFuture(futures.get(i), key, _appName, null);
}
}
return eFutures;

}

@Override
public <T> EVCacheLatch delete(String key, Policy policy) throws EVCacheException {
if (key == null) throw new IllegalArgumentException("Key cannot be null");

final boolean throwExc = doThrowException();
Expand All @@ -1160,7 +1207,7 @@ public EVCacheFuture[] delete(String key) throws EVCacheException {
increment("NULL_CLIENT");
if (throwExc) throw new EVCacheException("Could not find a client to delete the keyAPP " + _appName
+ ", Key " + key);
return new EVCacheFuture[0]; // Fast failure
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}

final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Collections.singletonList(key), Call.DELETE);
Expand All @@ -1169,7 +1216,7 @@ public EVCacheFuture[] delete(String key) throws EVCacheException {
if (shouldThrottle(event)) {
increment("THROTTLED");
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return new EVCacheFuture[0];
return new EVCacheLatchImpl(policy, 0, _appName); // Fast failure
}
} catch(EVCacheException ex) {
if(throwExc) throw ex;
Expand All @@ -1185,28 +1232,32 @@ public EVCacheFuture[] delete(String key) throws EVCacheException {
}

final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.DELETE, stats);
final EVCacheLatchImpl latch = new EVCacheLatchImpl(policy == null ? Policy.ALL_MINUS_1 : policy,
clients.length, _appName);
try {
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
for (int i = 0; i < clients.length; i++) {
Future<Boolean> future = clients[i].delete(canonicalKey);
futures[i] = new EVCacheFuture(future, key, _appName, clients[i].getServerGroup());
Future<Boolean> future = clients[i].delete(canonicalKey, latch);
if (log.isDebugEnabled() && shouldLog()) log.debug("DELETE : APP " + _appName + ", Future " + future + " for key : " + canonicalKey);
}

if (event != null) {
event.setCanonicalKeys(Arrays.asList(canonicalKey));
event.setLatch(latch);
endEvent(event);
}
return futures;
return latch;
} catch (Exception ex) {
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while deleting the data for APP " + _appName + ", key : " + key, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return new EVCacheFuture[0];
if (!throwExc) return new EVCacheLatchImpl(policy, 0, _appName);
throw new EVCacheException("Exception while deleting the data for APP " + _appName + ", key : " + key, ex);
} finally {
op.stop();
if (log.isDebugEnabled() && shouldLog()) log.debug("DELETE : APP " + _appName + " Took " + op.getDuration() + " milliSec for key : " + key);
}
}
}



public int getDefaultTTL() {
return _timeToLive;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package com.netflix.evcache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.netflix.evcache.pool.EVCacheClientPoolManager;

@Singleton
public class EVCacheModule extends AbstractModule {

private static final Logger log = LoggerFactory.getLogger(EVCacheModule.class);

public EVCacheModule() {
}

Expand All @@ -33,4 +28,4 @@ public boolean equals(Object obj) {
return (obj != null) && (obj.getClass() == getClass());
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1007,27 +1007,35 @@ public <T> Future<Boolean> add(String key, int exp, T value, Transcoder<T> tc) t
return evcacheMemcachedClient.add(key, exp, value, tc);
}


public <T> Future<Boolean> touch(String key, int timeToLive) throws Exception {
return touch(key, timeToLive, null);
}

public <T> Future<Boolean> touch(String key, int timeToLive, EVCacheLatch latch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!ensureWriteQueueSize(node, key)) return getDefaultFuture();
if (!ensureWriteQueueSize(node, key)) {
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (latch != null && latch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) latch).addFuture(defaultFuture);
return defaultFuture;
}

if (enableChunking.get()) {
final ChunkDetails<?> cd = getChunkDetails(key);
if (cd.isChunked()) {
final List<String> keys = cd.getChunkKeys();
OperationFuture<Boolean>[] futures = new OperationFuture[keys.size() + 1];
futures[0] = evcacheMemcachedClient.touch(key + "_00", timeToLive);
futures[0] = evcacheMemcachedClient.touch(key + "_00", timeToLive, latch);
for (int i = 0; i < keys.size(); i++) {
final String prefix = (i < 10) ? "0" : "";
final String _key = key + "_" + prefix + i;
futures[i + 1] = evcacheMemcachedClient.touch(_key, timeToLive);
futures[i + 1] = evcacheMemcachedClient.touch(_key, timeToLive, latch);
}
return new EVCacheFutures(futures, key, appName, serverGroup, null);
return new EVCacheFutures(futures, key, appName, serverGroup, latch);
} else {
return evcacheMemcachedClient.touch(key, timeToLive);
return evcacheMemcachedClient.touch(key, timeToLive, latch);
}
} else {
return evcacheMemcachedClient.touch(key, timeToLive);
return evcacheMemcachedClient.touch(key, timeToLive, latch);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import net.spy.memcached.ops.Operation;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.protocol.binary.EVCacheNodeImpl;
import net.spy.memcached.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.spy.memcached.ops.Operation;
import net.spy.memcached.protocol.binary.EVCacheNodeImpl;

public class EVCacheConnection extends MemcachedConnection {
private static Logger log = LoggerFactory.getLogger(EVCacheConnection.class);

Expand Down
Loading

0 comments on commit accff80

Please sign in to comment.