Async get and get bulk (#126)
* WIP async get and get bulk

* WIP Removing formatting for easy code review

* WIP removing format changes

* handle in-memory & retries

* WIP get + get bulk with basic testing

* update to retry method

* WIP adding retry

* nebula-publish.yml: add missing secrets for candidate release

* WIP removing executor service from API for non-blocking calls

* WIP: timeout thread

* split single wait thread

* working version of GET

* still working on the bulk

* refactored version of get

* working version of getBulk

* disabling timeout

* testing memory issue

* add timeout slots to reduce read timeouts

* review comments + adding more debug logs

* adding more logs to debug timeout error

* Bulk Async API with transcoder

* fix for timeout

* capturing timeout metrics for bulk

* refactoring

Co-authored-by: Sriram Rangarajan <srrangarajan@netflix.com>
sriram-rangarajan and srrangarajan authored Oct 19, 2022
1 parent 46ef3f3 commit 047e055
Showing 16 changed files with 1,324 additions and 147 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/nebula-publish.yml
@@ -35,6 +35,8 @@ jobs:
if: contains(github.ref, '-rc.')
run: ./gradlew --info --stacktrace -Prelease.useLastTag=true -PnetflixossPublishCandidatesToMavenCentral=true candidate -x test
env:
NETFLIX_OSS_SONATYPE_USERNAME: ${{ secrets.ORG_SONATYPE_USERNAME }}
NETFLIX_OSS_SONATYPE_PASSWORD: ${{ secrets.ORG_SONATYPE_PASSWORD }}
NETFLIX_OSS_SIGNING_KEY: ${{ secrets.ORG_SIGNING_KEY }}
NETFLIX_OSS_SIGNING_PASSWORD: ${{ secrets.ORG_SIGNING_PASSWORD }}
NETFLIX_OSS_REPO_USERNAME: ${{ secrets.ORG_NETFLIXOSS_USERNAME }}
136 changes: 106 additions & 30 deletions evcache-core/src/main/java/com/netflix/evcache/EVCache.java

Large diffs are not rendered by default.
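Since the EVCache.java diff is not rendered here, the sketch below is a hypothetical reading of the async surface this commit appears to add, inferred from the commit message ("Bulk Async API with transcoder") and the CompletableFuture plumbing shown further down. The interface name, method names, and exact signatures are assumptions for illustration, not the actual API.

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import net.spy.memcached.transcoders.Transcoder;

// Hypothetical sketch only; not the real EVCache interface from this commit.
public interface AsyncEVCacheSketch {

    // Asynchronously fetch a single key; the future completes exceptionally on timeout or error.
    <T> CompletableFuture<T> getAsync(String key);

    // Single-key variant with an explicit transcoder.
    <T> CompletableFuture<T> getAsync(String key, Transcoder<T> tc);

    // Asynchronously fetch many keys; the resulting map contains only the keys that were found.
    <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> keys);

    // Bulk variant with an explicit transcoder ("Bulk Async API with transcoder").
    <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> keys, Transcoder<T> tc);
}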

740 changes: 719 additions & 21 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java

Large diffs are not rendered by default.
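The EVCacheImpl.java diff (719 added lines) is likewise not rendered. As a rough caller-side picture of what the new async path enables, the helper below composes a CompletableFuture-based bulk result with a timeout-tolerant fallback; the method that produces the future and the fallback policy are assumptions for illustration only.

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative caller-side composition of a bulk async result; not part of the commit.
final class BulkAsyncCaller {

    // Blocks for the result, falling back to an empty map if the future
    // completed exceptionally (for example, via a timeout).
    static <T> Map<String, T> fetchOrEmpty(CompletableFuture<Map<String, T>> bulkFuture) {
        return bulkFuture
                .exceptionally(ex -> Collections.emptyMap())
                .join();
    }
}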

17 changes: 17 additions & 0 deletions evcache-core/src/main/java/com/netflix/evcache/dto/EVCacheResponseStatus.java
@@ -0,0 +1,17 @@
package com.netflix.evcache.dto;

public class EVCacheResponseStatus {
private String status;

public EVCacheResponseStatus(String status) {
this.status = status;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}
}
23 changes: 23 additions & 0 deletions evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java
@@ -0,0 +1,23 @@
package com.netflix.evcache.dto;

import com.netflix.evcache.EVCacheKey;

import java.util.Map;

public class KeyMapDto {
Map<String, EVCacheKey> keyMap;
boolean isKeyHashed;

public KeyMapDto(Map<String, EVCacheKey> keyMap, boolean isKeyHashed) {
this.keyMap = keyMap;
this.isKeyHashed = isKeyHashed;
}

public Map<String, EVCacheKey> getKeyMap() {
return keyMap;
}

public boolean isKeyHashed() {
return isKeyHashed;
}
}
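For context, a minimal illustration of how KeyMapDto might be consumed: it simply bundles the resolved key map with a flag saying whether the cache keys were hashed. The helper below is illustrative only and not part of the commit.

import java.util.Map;

import com.netflix.evcache.EVCacheKey;
import com.netflix.evcache.dto.KeyMapDto;

// Illustrative helper only.
final class KeyMapDtoExample {

    static void describe(KeyMapDto dto) {
        Map<String, EVCacheKey> keyMap = dto.getKeyMap();
        // Callers can branch on isKeyHashed() to decide whether the map keys are
        // hashed cache keys or the original client-supplied keys.
        System.out.println("entries=" + keyMap.size() + ", hashed=" + dto.isKeyHashed());
    }
}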
@@ -416,5 +416,4 @@ public String getStatusCode(StatusCode sc) {
public static final String META_GET_OPERATION = "M_GET";
public static final String META_SET_OPERATION = "M_SET";
public static final String META_DEBUG_OPERATION = "M_DEBUG";

}
@@ -12,13 +12,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.netflix.evcache.EVCacheGetOperationListener;
import net.spy.memcached.internal.BulkGetCompletionListener;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -153,6 +151,17 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
}
}

public CompletableFuture<Map<String, T>> getSomeCompletableFuture(long to, TimeUnit unit, boolean throwException, boolean hasZF) {
CompletableFuture<Map<String, T>> completableFuture = new CompletableFuture<>();
try {
Map<String, T> value = getSome(to, unit, throwException, hasZF);
completableFuture.complete(value);
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
return completableFuture;
}

public Single<Map<String, T>> observe() {
return Single.create(subscriber ->
addListener(future -> {
@@ -165,6 +174,58 @@ public Single<Map<String, T>> observe() {
);
}

public <U> CompletableFuture<U> makeFutureWithTimeout(long timeout, TimeUnit units) {
final CompletableFuture<U> future = new CompletableFuture<>();
return EVCacheOperationFuture.withTimeout(future, timeout, units);
}

public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit units) {
CompletableFuture<Map<String, T>> future = makeFutureWithTimeout(timeout, units);
doAsyncGetSome(future);
return future.handle((data, ex) -> {
if (ex != null) {
handleBulkException();
}
return data;
});
}

public void handleBulkException() {
ExecutionException t = null;
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
if (op.isCancelled()) {
throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
}
else if (op.hasErrored()) {
throw new RuntimeException(new ExecutionException(op.getException()));
}
else {
op.timeOut();
MemcachedConnection.opTimedOut(op);
t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", op));
}
} else {
MemcachedConnection.opSucceeded(op);
}
}
throw new RuntimeException(t);
}
public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
this.addListener(future -> {
try {
Map<String, T> m = new HashMap<>();
Map<String, ?> result = future.get();
for (Map.Entry<String, ?> me : result.entrySet()) {
m.put(me.getKey(), (T)me.getValue());
}
promise.complete(m);
} catch (Exception t) {
promise.completeExceptionally(t);
}
});
}

public Single<Map<String, T>> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
return observe().timeout(to, units, Single.create(subscriber -> {
try {
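The getAsyncSome and makeFutureWithTimeout methods above rely on an EVCacheOperationFuture.withTimeout helper that is not shown in this excerpt. The sketch below shows one common way such a helper can be built, with a shared scheduler completing the future exceptionally once the deadline passes; it is an assumption-based illustration, not the commit's implementation. On Java 9+, CompletableFuture.orTimeout provides the same behavior out of the box.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: one common way to attach a timeout to a CompletableFuture.
final class TimeoutFutures {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "future-timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Returns the same future, arranging for it to fail with TimeoutException if it
    // has not completed within the deadline. completeExceptionally is a no-op when
    // the future has already finished, so a normal completion always wins the race.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        SCHEDULER.schedule(
                () -> future.completeExceptionally(
                        new TimeoutException("timed out after " + timeout + " " + unit)),
                timeout, unit);
        return future;
    }
}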
