Skip to content

Commit

Permalink
Support for add and moved appendOrAdd logic to EVCacheClient
Browse files Browse the repository at this point in the history
  • Loading branch information
smadappa committed May 10, 2016
1 parent d1bcc21 commit 71df7b8
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 26 deletions.
25 changes: 25 additions & 0 deletions evcache-client/src/main/java/com/netflix/evcache/EVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,31 @@ <T> Map<String, T> getBulkAndTouch(Collection<String> keys, Transcoder<T> tc, in
*/
<T> Future<Boolean>[] append(String key, T value, int timeToLive) throws EVCacheException;

/**
* Add the given value to EVCache. You cannot add if the key already exist in EVCache.
*
* @param key
* the key which this object should be added to. Ensure the
* key is properly encoded and does not contain whitespace or
* control characters.
* @param T
* the value to be added
* @param tc
* the transcoder the will be used for serialization
* @param timeToLive
* the expiration of this object i.e. less than 30 days in
* seconds or the exact expiry time as UNIX time
*
* @return boolean which indicates if the add was successful or not.
* The operation will fail with a false response if the data already exists in EVCache.
*
* @throws EVCacheException
* in the rare circumstance where queue is too full to accept
* any more requests or issues Serializing the value or any IO
* Related issues
*/
<T> boolean add(String key, T value, Transcoder<T> tc, int timeToLive) throws EVCacheException;

/**
* Touch the given key and reset its expiration time.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ final public class EVCacheImpl implements EVCache {
private EVCacheInMemoryCache<?> cache;

private final EVCacheClientPoolManager _poolManager;
private DistributionSummary setTTLSummary, replaceTTLSummary, touchTTLSummary, setDataSizeSummary, replaceDataSizeSummary, appendDataSizeSummary;
private DistributionSummary setTTLSummary, replaceTTLSummary, addTTLSummary, touchTTLSummary, setDataSizeSummary, replaceDataSizeSummary, appendDataSizeSummary, addDataSizeSummary;
private Counter touchCounter;

EVCacheImpl(String appName, String cacheName, int timeToLive, Transcoder<?> transcoder, boolean enableZoneFallback,
Expand Down Expand Up @@ -1482,4 +1482,95 @@ public <T> Future<Boolean>[] appendOrAdd(String key, T value, Transcoder<T> tc,
}
}

@Override
public <T> boolean add(String key, T value, Transcoder<T> tc, int timeToLive) throws EVCacheException {
if ((null == key) || (null == value)) throw new IllegalArgumentException();

final boolean throwExc = doThrowException();
final EVCacheClient[] clients = _pool.getEVCacheClientForWrite();
if (clients.length == 0) {
increment("NULL_CLIENT");
if (throwExc) throw new EVCacheException("Could not find a client to Add the data");
return false;
}

final EVCacheEvent event = createEVCacheEvent(Arrays.asList(clients), Collections.singletonList(key), Call.ADD);
if (event != null) {
if (shouldThrottle(event)) {
increment("THROTTLED");
if (throwExc) throw new EVCacheException("Request Throttled for app " + _appName + " & key " + key);
return false;
}
startEvent(event);
}

final String canonicalKey = getCanonicalizedKey(key);
final Operation op = EVCacheMetricsFactory.getOperation(_metricName, Call.ADD, stats, Operation.TYPE.MILLI);
try {
final EVCacheFuture[] futures = new EVCacheFuture[clients.length];
CachedData cd = null;
int index = 0;
for (EVCacheClient client : clients) {
if (cd == null) {
if (tc != null) {
cd = tc.encode(value);
} else if ( _transcoder != null) {
cd = ((Transcoder<Object>)_transcoder).encode(value);
} else {
cd = client.getTranscoder().encode(value);
}
}
final Future<Boolean> future = client.add(canonicalKey, timeToLive, cd);
futures[index++] = new EVCacheFuture(future, key, _appName, client.getServerGroup(), client);

if (cd != null) {
if (addDataSizeSummary == null) this.addDataSizeSummary = EVCacheConfig.getInstance().getDistributionSummary(_appName + "-AddData-Size");
if (addDataSizeSummary != null) this.addDataSizeSummary.record(cd.getData().length);
}
}
if (event != null) {
event.setCanonicalKeys(Arrays.asList(canonicalKey));
event.setCachedData(cd);
endEvent(event);
}

if(futures.length == 0) return false;

int successCount = 0, failCount = 0;
for(int i = 0; i < futures.length; i++) {
final EVCacheFuture future = futures[i];
if(future.get() == Boolean.TRUE) {
successCount++;
if(log.isDebugEnabled()) log.debug("ADD Success : APP " + _appName + ", key " + key);
} else {
failCount++;
if(log.isDebugEnabled()) log.debug("ADD Fail : APP " + _appName + ", key " + key);
}
}

if(successCount > 0 && failCount > 0) {
for(int i = 0; i < futures.length; i++) {
final EVCacheFuture future = futures[i];
if(future.get() == Boolean.TRUE) {
final EVCacheClient client = future.getEVCacheClient();
client.delete(canonicalKey);
}
}
return false;
} else {
if(failCount == 0) return true;
else return false;
}

} catch (Exception ex) {
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception adding the data for APP " + _appName + ", key : " + canonicalKey, ex);
if (event != null) eventError(event, ex);
if (!throwExc) return false;
throw new EVCacheException("Exception adding data for APP " + _appName + ", key : " + canonicalKey, ex);
} finally {
op.stop();
if (log.isDebugEnabled() && shouldLog()) log.debug("ADD : APP " + _appName + ", Took " + op.getDuration() + " milliSec for key : " + canonicalKey);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -26,7 +27,11 @@
import com.netflix.evcache.EVCacheException;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.EVCacheReadQueueException;
import com.netflix.evcache.EVCache.Call;
import com.netflix.evcache.event.EVCacheEvent;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.metrics.Operation;
import com.netflix.evcache.operation.EVCacheFuture;
import com.netflix.evcache.operation.EVCacheFutures;
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.pool.observer.EVCacheConnectionObserver;
Expand Down Expand Up @@ -76,6 +81,7 @@ public ConnectionFactory getConnectionFactory() {

private final ChainedDynamicProperty.IntProperty readTimeout;
private final ChainedDynamicProperty.IntProperty bulkReadTimeout;
private final DynamicIntProperty operationTimeout;
private final DynamicIntProperty maxReadQueueSize;
private final ChainedDynamicProperty.BooleanProperty enableChunking;
private final ChainedDynamicProperty.IntProperty chunkSize;
Expand All @@ -99,6 +105,7 @@ public ConnectionFactory getConnectionFactory() {
this.readTimeout = readTimeout;
this.bulkReadTimeout = bulkReadTimeout;
this.maxReadQueueSize = maxReadQueueSize;
this.operationTimeout = operationTimeout;
this.pool = pool;
this.connectionFactory = pool.getEVCacheClientPoolManager().getConnectionFactoryProvider().getConnectionFactory(appName, id, serverGroup, pool.getEVCacheClientPoolManager());
this.enableChunking = EVCacheConfig.getInstance().getChainedBooleanProperty(this.serverGroup.getName()+ ".chunk.data", appName + ".chunk.data", Boolean.FALSE);
Expand Down Expand Up @@ -853,8 +860,7 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive) throws Excep
public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch evcacheLatch) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) {
if (log.isInfoEnabled()) log.info("Node : " + node
+ " is not active. Failing fast and dropping the write event.");
if (log.isInfoEnabled()) log.info("Node : " + node + " is not active. Failing fast and dropping the write event.");
final ListenableFuture<Boolean, OperationCompletionListener> defaultFuture = (ListenableFuture<Boolean, OperationCompletionListener>) getDefaultFuture();
if (evcacheLatch != null && evcacheLatch instanceof EVCacheLatchImpl) ((EVCacheLatchImpl) evcacheLatch)
.addFuture(defaultFuture);
Expand All @@ -877,15 +883,14 @@ public <T> Future<Boolean> set(String key, T value, int timeToLive, EVCacheLatch
final String prefix = (i < 10) ? "0" : "";
futures[i] = evcacheMemcachedClient.set(key + "_" + prefix + i, timeToLive, cd[i], null, null);
}
evcacheMemcachedClient.delete(key);// ensure we are deleting
// the unchunked key if
// it exists. Ignore
// return value since it
// may not exist.
// ensure we are deleting the unchunked key if it exists.
// Ignore return value since it may not exist.
evcacheMemcachedClient.delete(key);
return new EVCacheFutures(futures, key, appName, serverGroup, evcacheLatch);
} else {
delete(key);// delete all the chunks if they exist as the
// data is moving from chunked to unchunked
// delete all the chunks if they exist as the
// data is moving from chunked to unchunked
delete(key);
return evcacheMemcachedClient.set(key, timeToLive, value, null, evcacheLatch);
}
} else {
Expand Down Expand Up @@ -933,33 +938,58 @@ public <T> Future<Boolean> replace(String key, T value, int timeToLive, EVCacheL
throw e;
}
}

public boolean appendOrAdd(String key, CachedData value, int timeToLive) throws EVCacheException {
int i = 0;
try {
do {
final Future<Boolean> future = evcacheMemcachedClient.append(key, value);
try {
if(future.get(operationTimeout.get(), TimeUnit.MILLISECONDS) == Boolean.FALSE) {
final Future<Boolean> f = evcacheMemcachedClient.add(key, timeToLive, value);
if(f.get(operationTimeout.get(), TimeUnit.MILLISECONDS) == Boolean.TRUE) {
return true;
}
} else {
return true;
}
} catch(TimeoutException te) {
return false;
}
} while(i++ < 2);
} catch (Exception ex) {
if (log.isDebugEnabled() ) log.debug("Exception appendOrAdd data for APP " + appName + ", key : " + key, ex);
return false;
}
return false;
}

public <T> Future<Boolean> add(String key, int exp, T value, Transcoder<T> tc) throws Exception {
if (enableChunking.get()) throw new EVCacheException(
"This operation is not supported as chunking is enabled on this EVCacheClient.");

public <T> Future<Boolean> add(String key, int exp, T value) throws Exception {
if (enableChunking.get()) throw new EVCacheException("This operation is not supported as chunking is enabled on this EVCacheClient.");
if (addCounter == null) addCounter = EVCacheMetricsFactory.getCounter(serverGroup.getName() + "-AddCall");

final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) return getDefaultFuture();

ensureWriteQueueSize(node, key);
addCounter.increment();
return evcacheMemcachedClient.add(key, exp, value, tc);
return evcacheMemcachedClient.add(key, exp, value, null);
}

public <T> Future<Boolean> add(String key, int exp, T value) throws Exception {
if (enableChunking.get()) throw new EVCacheException(
"This operation is not supported as chunking is enabled on this EVCacheClient.");
public <T> Future<Boolean> add(String key, int exp, T value, Transcoder<T> tc) throws Exception {
if (enableChunking.get()) throw new EVCacheException("This operation is not supported as chunking is enabled on this EVCacheClient.");
if (addCounter == null) addCounter = EVCacheMetricsFactory.getCounter(serverGroup.getName() + "-AddCall");

final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) return getDefaultFuture();

ensureWriteQueueSize(node, key);
addCounter.increment();
return evcacheMemcachedClient.add(key, exp, value);
return evcacheMemcachedClient.add(key, exp, value, tc);
}


public <T> Future<Boolean> touch(String key, int timeToLive) throws Exception {
final MemcachedNode node = evcacheMemcachedClient.getEVCacheNode(key);
if (!node.isActive()) return getDefaultFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,10 @@ public String toString() {
return appName + "_" + zone + " _" + id;
}

@SuppressWarnings("unchecked")
public <T> OperationFuture<Boolean> add(String key, int exp, T o, final Transcoder<T> tc, EVCacheLatch latch) {
Transcoder<T> t = (Transcoder<T>) ((tc == null) ? transcoder : tc);
return asyncStore(StoreType.add, key, exp, o, t, latch);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ protected boolean appendOrAdd(int i, EVCache gCache, int ttl) throws Exception {
return true;
}

public boolean add(int i, EVCache gCache) throws Exception {
//String val = "This is a very long value that should work well since we are going to use compression on it. blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah val_"+i;
String val = "val_add_"+i;
String key = "key_" + i;
boolean status = gCache.add(key, val, null, 24 * 60 * 60);
if(log.isDebugEnabled()) log.debug("ADD : key : " + key + "; success = " + status);
return status;
}

public boolean insert(int i, EVCache gCache) throws Exception {
//String val = "This is a very long value that should work well since we are going to use compression on it. blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah val_"+i;
String val = "val_"+i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,23 @@ public void testEVCache() {
this.evCache = getNewBuilder().setAppName("EVCACHE").setCachePrefix("cid").enableRetry().build();
assertNotNull(evCache);
}

@Test(dependsOnMethods = { "testEVCache" })
public void testDelete() throws Exception {
for (int i = 0; i < loops; i++) {
delete(i, evCache);
}
}

@Test(dependsOnMethods = { "testDelete" })
public void testAdd() throws Exception {
for (int i = 0; i < loops; i++) {
assertTrue(add(i, evCache));
}
}


@Test(dependsOnMethods = { "" })
public void testInsertBinary() throws Exception {
for (int i = 0; i < loops; i++) {
assertTrue(insertBytes(i, evCache));
Expand Down Expand Up @@ -189,17 +204,13 @@ public void testAppendOrAdd() throws Exception {
}
}

@Test(dependsOnMethods = { "testAppendOrAdd" })
public void testDelete() throws Exception {
for (int i = 0; i < loops; i++) {
delete(i, evCache);
}
}

public void testAll() {
try {
setupEnv();
testEVCache();
testDelete();
testAdd();
Thread.sleep(5000);
testInsertBinary();
testInsert();

Expand Down

0 comments on commit 71df7b8

Please sign in to comment.