diff --git a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java index 053058a24..5339c8e3b 100644 --- a/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java +++ b/src/main/java/com/yelp/nrtsearch/server/grpc/LuceneServer.java @@ -794,10 +794,13 @@ public void onNext(AddDocumentRequest addDocumentRequest) { "indexing addDocumentRequestQueue size: %s, total: %s", addDocumentRequestQueue.size(), getCount(indexName))); try { + DeadlineUtils.checkDeadline("addDocuments: onNext", "INDEXING"); + List addDocRequestList = new ArrayList<>(addDocumentRequestQueue); Future future = globalState.submitIndexingTask( - new DocumentIndexer(globalState, addDocRequestList, indexName)); + Context.current() + .wrap(new DocumentIndexer(globalState, addDocRequestList, indexName))); futures.put(indexName, future); } catch (Exception e) { responseObserver.onError(e); @@ -821,6 +824,8 @@ private String onCompletedForIndex(String indexName) { "onCompleted, addDocumentRequestQueue: %s", addDocumentRequestQueue.size())); long highestGen = -1; try { + DeadlineUtils.checkDeadline("addDocuments: onCompletedForIndex", "INDEXING"); + // index the left over docs if (!addDocumentRequestQueue.isEmpty()) { logger.debug( @@ -874,24 +879,26 @@ private String onCompletedForIndex(String indexName) { public void onCompleted() { try { globalState.submitIndexingTask( - () -> { - try { - // TODO: this should return a map on index to genId in the response - String genId = "-1"; - for (String indexName : addDocumentRequestQueueMap.keySet()) { - genId = onCompletedForIndex(indexName); - } - responseObserver.onNext( - AddDocumentResponse.newBuilder() - .setGenId(genId) - .setPrimaryId(globalState.getEphemeralId()) - .build()); - responseObserver.onCompleted(); - } catch (Throwable t) { - responseObserver.onError(t); - } - return null; - }); + Context.current() + .wrap( + () -> { + try { + // TODO: this should return a map on index to genId in the response + String genId = "-1"; + for (String indexName : addDocumentRequestQueueMap.keySet()) { + genId = onCompletedForIndex(indexName); + } + responseObserver.onNext( + AddDocumentResponse.newBuilder() + .setGenId(genId) + .setPrimaryId(globalState.getEphemeralId()) + .build()); + responseObserver.onCompleted(); + } catch (Throwable t) { + responseObserver.onError(t); + } + return null; + })); } catch (RejectedExecutionException e) { logger.error("Threadpool is full, unable to submit indexing completion job"); responseObserver.onError( diff --git a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddDocumentHandler.java b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddDocumentHandler.java index 2c3adb74d..acca7c168 100644 --- a/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddDocumentHandler.java +++ b/src/main/java/com/yelp/nrtsearch/server/luceneserver/AddDocumentHandler.java @@ -17,6 +17,7 @@ import com.google.protobuf.ProtocolStringList; import com.yelp.nrtsearch.server.grpc.AddDocumentRequest; +import com.yelp.nrtsearch.server.grpc.DeadlineUtils; import com.yelp.nrtsearch.server.grpc.FacetHierarchyPath; import com.yelp.nrtsearch.server.luceneserver.field.FieldDef; import com.yelp.nrtsearch.server.luceneserver.field.IdFieldDef; @@ -181,6 +182,8 @@ public DocumentIndexer( } public long runIndexingJob() throws Exception { + DeadlineUtils.checkDeadline("DocumentIndexer: runIndexingJob", "INDEXING"); + logger.debug( String.format( "running indexing job on threadId: %s",