-
-
Notifications
You must be signed in to change notification settings - Fork 33
Fault Tolerant Indexing
Fault tolerance is the property of gracefully recovering from a failure. In the case of search ingestion we primarily care about the case where some or all of the infrastructure suffers an ungraceful shutdown or disconnection. Additionally we will want to handle cases involving errors during processing.
|
this document is not fully implemented until Issue #84 is resolved |
When a document is being processed it is initially read or received from a source. After it is read/received it spends some amount of time progressing through steps that conduct a variety of transformations, and enrichments. At one or more points the document, or information derived from the document is transmitted to one or more external systems. In our case the primary focus is transmission to a search engine, but transmission to multiple search engines, or to secondary systems may also be important. For example a document in a a back end filesystem or CMS might be read, and processed by JesterJ. The plan with which JesterJ is configured might include a step to emit the extracted text from the document to a search engine for indexing, and another step to ftp the original document to a filesystem served by an http server that will be linked in the search results.
A Shutdown is an event in which a node or service/step in the system quits with notification of its departure. The node/step shutting down may wait to drain all, some or none of presently queued work. In the event of a shutdown with incomplete draining of processing tasks, a document from the source flowing through such a pipeline might have been:
-
Undiscovered, and not (yet) processed in any way
-
Discovered and read, but not emitted to either the search engine or the http server
-
Indexed but not written to the http server
-
Written to the http server but not indexed
-
Fully processed and written to both the http server and the search engine
The key aspect that distinguishes shutdown however is that we are notified that there is a need to clean-up, and direct work elsewhere. There is potential for a shutdown to be temporary, but except in the case of very short temporary shutdowns, there is no advantage in treating this case specially.
Disconnection is similar to shutdown except the system that becomes disconnected has no opportunity to notify any other system of its unavailability. Such a situation may be limited to inability to accept or emit documents, but more frequently the system is entirely disconnected. Since latency is normal and in small quantities does not indicate a disconnection, there needs to be a response latency threshold, which when exceeded generates an event indicating the loss of the node. Once this event is generated the response will be similar to a shutdown with zero drain.
Imagine a processing system comprised of multiple nodes cooperating to index various documents. If one node looses network connection, its processing might be delayed indefinitely. After some time, it may be necessary to assume that the node has failed and ensure that documents are not sent to it. The document must be restarted on separate nodes IF there is a complete path to success remaining in the system.
A system that has lost a node has one of three possible states: 1. Incapacitated - the lost node was the only node capable of certain processing feats required for all documents. No documents can be processed. This is a simpler case and can (mostly) be treated as if a full system shut down occurred. 1. Disabled - the lost node was the only node capable of certain processing feats required for some documents but not all documents. Some documents may be processed and others need to be delayed until the node returns. 1. Diminished - the lost node did not contain unique capabilities, and the system may have lost capacity but all documents may still be properly processed. In this case the challenge is to time out the processing of documents that were "in flight" in the lost node and if the lost node re-joins invalidate any processing that has already been restarted.
Additionally, any of the above states might be temporary or "permanent" (here permanent refers to any condition where the system must await manual intervention by a human, i.e. until the missing node can be restarted). In the event that the capabilities are the result of the assignment of steps to particular nodes those steps could be reassigned to remaining nodes, and in the event that an auto-scaling scheme is in use, new nodes may be started to compensate. However if the steps require specific hardware or software that is not available, or for which additional licenses are not available, starting new nodes may not be an option (making the above states permanent).
The third type of fault that may be experienced is an error at the document level. This is distinguished by the fact that the problematic condition is not detectable until an attempt is made to process a document. Such errors may be "universal" or "document specific" meaning that in the former case the error will occur for a set of documents delimited exclusively by a time period in which processing is attempted, or in the latter case a set of documents defined by some other criteria, such as PDF documents that have been encrypted. Additionally, errors may be considered either transient, or permanent. Thus we can draw a set of quadrants like this:
| Transient | Permanent | -------------+-----------+-----------+ Universal | A | B | -------------+-----------+-----------+ Doc Specific | C | D | -------------+-----------+-----------+
Some examples of each quadrent are:
-
A. Timeouts due to overloaded local processor; Failure to write to the search engine due to network outage; External database down or overloaded;
-
B. Incorrect configuration resulting in Exception or inability to connect.
-
C. External source for Geo Coding unavailable (if not all documents require geo-coding);
-
D. Encrypted PDFs for which text cannot be extracted; Documents missing a required field leading to an Exception; Documents with malfomred data i.e. unclosed tags in XML
Quadrants A and D are most common with C usually being essentially the same as A, but for a service only required by a subset of documents. Quadrant B usually represents a programmer or configuration error. Furthermore if the errors are universal and permanent then all documents fail to be written to one or more outputs.
Permanent errors generally require human interaction for resolution. A well designed system should identify such errors when possible, attempt to notify the relevant humans, and not waste resources re-attempting processing. On the other hand a document experiencing a transient error should be re-attempted after a delay.
Finally, to bring the discussion full circle, one can think of a document that entered a processing stage and never exited due to a shutdown or disconnection as having experienced a universal, transient error condition. (Quadrant A) Thus our key tasks in fault tolerant indexing are detecting/categorizing errors, marking/tracking documents for retry or preventing hopeless retries, and of course communicating errors to the humans in charge.
The net result of the foregoing is that from the perspective of the document we only need to handle two cases Transient errors (where the document should be processed again at the next available opportunity) and permanent errors where the document must not be processed again (at least not until a new version of it is provided).
From the perspective of the system and routing of documents we will want to (eventually) detect and adapt to shutdowns and unavailability.
There are perhaps cases where a processing step is unable to complete all it’s prospective tasks, but having the document arrive in the index partially processed is better than having the document not be searchable at all. For the purposes of this discussion we only consider fatal errors where processing was halted. A notification and monitoring infrastructure might be contemplated later to facilitate handling of these cases (i.e. queuing the document again). For the near term handling of non-fatal exceptional cases will be handled by the user’s custom processors, and not the FTI infrastructure.
In the simplest cases, the nature of the error is anticipated and diagnosed in code by the author of a particular Document Processor. In this case the programer should be able to effortlessly identify whether or not the error is transient or permanent. By definition a document with a permanent error must never be re-processed unless it has changed in some fashion. The system might detect this change (by a change in the hash code) or an administrator may wish to signal this change directly with a command specifying that the document should be reprocessed.
In the face of an error that is not specifically anticipated by the programer, the error type may be initially assumed to be transient and the document automatically resubmitted, possibly after a delay. After the same document produces the same error for a certain number of attempts exponential back off may be required, but eventually it may be declared to have an unknown fatal flaw and marked as failed. Subsequent ingestion then requires a human to examine and resubmit the document.
If a document starts processing in a stage, and does not exit the stage in a timely manner it may be declared failed. In this case processing must be interrupted before the status indicating failure is set for the document. Otherwise the document might be omitted just after the status was set creating a duplicate processing of the document.
The notion that a document has a singular status is a reduction from the trivial case of a perfectly linear pipeline for a particular document. In truth, the document itself is entirely unimportant. What is important is the execution of steps that have an possible external side effect.
This is the more common case where the important effect is an output to a system external to the JesterJ system. Common examples would be:
-
Indexing the document into a search engine
-
Placing an event on a queue notifying external listeners of the document’s availability
-
Archiving a copy of the original document to a file system or other store.
-
Initiating an incremental update to corpus statistics every Nth document.
There are 3 possible levels of side effect which we will refer to as SAFE, IDEMPOTENT and POTENT.
-
SAFE steps are ones that have no side effects. These steps do not need to be tracked for fault tolerance.
-
IDEMPOTENT steps are ones that do have a side effect but can be repeated any number of times. The order of a set of IDEMPOTENT steps is important but if the same idempotent steps are performed in the same order the result should also be IDEMPOTENT. In JesterJ step order is only guaranteed within a particular path through the plan’s DAG, so there’s no need to track order of execution for the purposes of fault tolerance. Designing a processing plan with execution order dependencies across disparate paths through the DAG is a failed plan design.
-
POTENT is a term I am coining for this discussion to take the place of the cumbersome and verbally confusing "NON-IDEMPOTENT". This refers to any step which must only be executed once for a particular document, such as a step that invokes an API that incurs a usage fee, or increments a persistent value.
It is important to note that the handling of POTENT steps is very closely tied to the notion of document identity and the built in data derived hash codes that drive our default fault tolerance indexing implementation will consider versions of a document to be distinct documents (i.e. if you update a word doc with new formatting elements, it would change the hash code and therefore be handled as a novel document.
It is impossible to guarantee proper handling of a document in a fault tolerant manner in any case where a non-safe step follows a POTENT step unless the POTENT step can be skipped. Therefore it is also important to track whether or not the step is
-
MUTATING - the document is altered and the output of the step differs from the input, including steps that produce children
-
PRESERVING - the document is unaltered and the output of the step that will be handed to the next step exactly matches the input, drop or otherwise ignore the document.
Fault tolerance is not possible if a step that is both POTENT and MUTATING step is also emitting the document for further processing. The use of any such step might be considered an anti-pattern since there is no point in causing mutations that will not be recorded somewhere, and if the mutations are made in advance of persistence, they could have been a separate step. Good step design will choose between either POTENT or MUTATING, but not all systems need fault tolerance so this will only result in an error if fault tolerance is enabled and a step that is both POTENT and MUTATING is has successor steps.
Another common aspect of document processing is the need to split documents into child documents. A parent document cannot be considered complete unless all children (or children’s children) have completed. It is necessary to track both the child document id (to ensure children are not processed twice by POTENT steps) and the parent ID (to ensure that parent documents are re-ingested if some children have not been processed). This becomes even more complex if children can be further split int sub-children. This type of design could easily crop up if single large XML, CSV or JSON documents need to be represented as a complex set of documents in an index.
To handle arbitrary layers of children it is necessary to to track:
-
id (the current doc)
-
parent id (the most immediate parent document)
-
original id (the source document)
-
The hash of the original document to distinguish children generated by other versions of the same original.
The original ID can be used to fetch the original source for reprocessing, and coordinate the reprocessing of sibling or otherwise related documents. This coordination is important because if a million line CSV is meant to create a million documents in a search index, and processing fails after only 100k docs are in the index, we want to neither reprocess those 100k nor do we want to trigger a reprocessing of the entire CSV for every inflight document that was dropped (we should only reprocess the CSV once, not hundreds or thousands of times!). To avoid wasted reprocessing, upon detecting a failed child document we will need to gather all records for children of the original and ensure that the re-feed of the original marks all related failures as in progress.
This is a really thorny problem involving collecting the statuses across many children (possibly across multiple generations of document splitting). To handle our million line CSV hypothetical above, and imagining one column of the CSV contained a JSON structure that implied multiple levels of sub-children we would need to find the set of successful and unsuccessful children, and the parent record. To succeed we need to consider the following: - We must mark all failed children as processing in a single atomic operation. - We need to ensure that children already processed are skipped. - For correct end results all POTENT steps must identify and exclude previously successful children. - Though not necessary for correctness, to reduce load we also likely want to skip all SAFE and IDEMPOTENT steps for previously successful documents.
So the basic step infrastructure will want to filter previously successful children as soon as they are detected.
The above complexity in knowing if all child documents have been processed is complicated by the possibility that the hypothetical million line csv produces a variety of children that require different processing steps. JesterJ allows for a DAG of processing steps for just this reason, but if there are POTENT steps for which some child docs (or even some parent docs) do not reach by design, then we need to avoid marking the document and the child document incomplete when they have not reached that step. To identify cases where a document had correctly skipped a branch that has a POTENT step, we need to also account for routing decisions.
Conceptually there are two possible attributes of a router, determinism and distribution.
An example of a deterministic router is RouteByStepName which looks at a field value and matches it with the name of a down stream step. An example of a non-deterministic router is RoundRobinRouter which balances load across downstream steps.
A fully distributive router is one that sends an identical copy of a document to every downstream steps. A partially distributive router sends a copy to only a portion of the downstream steps. RouteByStepName is partially distributive and DuplicateToAll is fully distributive
The following image shows an example plan with routers that vary in both dimensions
ℹ️
|
An example plan with complicated, hard to track routing. The blue step is a scanner (document source), Red steps are document outputs (POTENT steps), Green shading indicates a deterministic router and blue shading indicates a non-deterministic router. In a real document processing plan the "ProcessTypeA" and "ProcessTypeB" would normally a path of several steps, but are shown as a single step here for brevity. |
|
Though the above example uses writes to solr as a POTENT step, this would indicate an abnormal (and possibly flawed) design for your Solr index. Typically writes to Solr are IDEMPOTENT. |
Fault tolerance can only be achieved to the extent we are able to predict the destination of the document. When faced with a non-deterministic routing scenario any valid downstream path represents a completion. Thus plan authors have to be very wary of unequal paths downstream of a non-deterministic router, and if the non-deterministic paths include POTENT steps fault tolerance will fail. We should detect and error out any plan with such a configuration that also turns on fault tolerance.
To handle this we need to form a completion expression that identifies child docs that are fully complete, which would be a boolean expression like this for the original document format scanner:
(LineItemIndex && IndexSolrCombined) || (IndexSolrB && IndexSolrCombined && (MessageClientA || MessageClientB || MessageClientC))
❗
|
The expression is the same for all children from any source, and is therefore an attribute of the scanner that can be calculated at plan instantiation. From a persistence perspective we just need to know if a child document has completed each POTENT step. |
The terms can be marked true for an individual child if the child document reached and completed the step and false if it did not. If these substitutions produce an expression that evaluates to true, then the child document does not need to be reprocessed. The parent does not need to be reprocessed if all the child expressions evaluate to true.
We will not attempt to handle merging and Sorting of documents across the DAG. This type of design is a major anti-pattern because it implies a retention of arbitrary numbers of documents. Such tasks must be handled outside of JesterJ in a DBMS or other system designed to hold large sets of data and perform calculations on sets of data efficiently. JesterJ is not a database and will not try to support database operations. Systems like this will likely require custom code to stash intermediate states in a store that can perform these operations, and then initiate a second plan for further processing, or at least be read by a second input/source step (scanner) in the plan.
JesterJ will not endeavor to solve what I will call the "Transient Version" problem. Specifically FTI will not ensure that all versions of a document are sent to all destinations for the following case:
-
Document X is scanned
-
Document X traverses the configured plan
-
Document X is emitted to destinations A and B
-
Document X is updated to v2
-
On a subsequent scan Document X(v2) is scanned.
-
Document X(v2) traverses the configured plan
-
Document X(v2) is emitted to destination A
-
Document X(v2) is NOT emitted to destination B because of a JesterJ shutdown/crash/power issue
-
Document X is updated to v3 before JesterJ is restarted
-
JesterJ is restarted
-
On startup, JesterJ will re-read X and send it for processing again.
-
Document X(v3) is emitted to destinations A and B
|
X(v2) will never reach B |
This is because JesterJ does not store the content of documents in a persistent store while processing. Such a feature requires a lot of extra disk IO or network traffic. Cassandra is used for FTI and massive numbers of deletes of full document content are… "worrisome". If at some time JesterJ does solve this issue it will be as a configurable option, and turning on that option may require configuration of a secondary data store.
ℹ️
|
If such a use case is truly important to you there’s nothing to stop you from writing a processor node that records what was sent into the plan in the first step, and then adding a callback in Solr as a custom update processor that can mark it as "received". Your custom processor would then load the missed version(s) and emit a document for each version, and the current version. Assuming these were all destined for a constant path they should then arrive in correct order (though we don’t have any tests for this right now). Obviously any randomizing routers or versions that take different paths could lead to out of order delivery |
Since JesterJ allows for routing documents through paths that may have a different number of steps, and may perform differing operations that take arbitrary amounts of time, there is no way to guarantee that sending documents A, B, C will result in them being indexed in the same order.
The below code is meant to represent what needs to happen for a single node system that does not need to worry about the possibility of other nodes being up. Additional logic will be required when clustering is introduced for 2.0.
There is no special code to run on system shutdown. Since it is not acceptable to loose anything if we experience a loss of power at the physical hardware level, taking any precautions on shutdown can only mask the flaws we want to fix. Every shutdown should be a kill -9.
Based on the foregoing shutdown goals we must not make assumptions regarding the state when we shut down.
parseArgs()
if (argsIndicateStartProcessing()) {
initializePersistentStore()
plan = loadPlanFromUserSuppliedJar()
if(newPlanVersion(plan)) { // based on serialVersionUID of plan
doPlanMigration() // (future feature)
}
analyzePlanToFindPotentSteps() // identify outputs
analyzePlanToFindScanners() // identify inputs
calculateCompleteChildExpressionForPlan()
incompleteProcessing = queryPersistentStoreForIncompleteDocs() // in progress and transient errors [1]
origDocs<doc,children> = groupByOriginal(incompleteProcessing)
origDocs = sortByOrigDocScanTimestampAsc(origDocs) // process in same order as previously seen.
for(origDocs:originalDoc) {
// NOTE: step/scanner translation logic required in Plan if Plan has changed.
scanner = original.lookupScanner(plan);
potentSteps = scanner.ListPotentSteps()
// all steps skipable for successful children
// This relies on the "Complete Child Expression" described above
successfulChildren = filterForSuccess(originalDoc.getAllDescendants())
for(successfulChildren:child) {
for(potentSteps:step) {
originalDoc.addSkipablePotentStep(child,step)
}
}
// only some step skipable for partially successful children
partialSuccessChildren = filterForPartialSuccess(originalDoc.getAllDescendants());
for(successfulChildren:child) {
for(potentSteps:step) {
if(previouslyCompleted(child,step)) {
originalDoc.addSkipablePotentStep(child,step) // map of child -> list<step> on doc
}
}
}
// Note, above two loops potentially combined, but shown for clarity
markInProgress(original,original.getAllDescendants()) // [2]
original.lookUpScanner().reprocess(original)
}
} else {
// do plan visualization, reporting, whatever other operation
}
doc = queue.takeDoc()
skipableSteps<child,[step]> = doc.getSkipablePotentSteps()
skipableStepsThisDoc = skipableSteps.get(doc.getId())
if (notEmpty(skipableStepsThisDoc) {
reachablePotentSteps = thisStep.downStreamPotentStepsAndSelfIfPotent()
downStreamStepsToProcess = reachablePotentSteps.removeAll(skipableStepsThisDoc)
if (notEmpty(downStreamStepsToProcess)) {
processDoc(doc)
} else {
skipAlreadyProcessedDoc(doc)
}
writeStatus(doc) // [3]
}
The JesterJ system will give authors of Processor implementations the freedom to create arbitrary child documents, but to have those children properly participate in FTI a few steps will be necessary, and a utility method to facilitate these steps should be added.
Document createChild(parent,step) {
child = new Document()
child.setParentId(parent.getId())
child.setParentVersion(parent.getContentHash())
child.setId(calculateChildId(parent))
child.setDownstreamPotentSteps(step.downStreamPotentStepsAndSelfIfPotent())
return child
}
The following factors contribute to the identity of a document and make it unique.
-
The source URI - this URI must locate the document such that the running instance of JesterJ can use the plan to read it without further configuration. This URI must encode a URL that can be used to retrieve the original source document.
-
The scanner that read the document. Note that it is entirely valid for two scanners to read the same source documents.
-
The plan containing the scanner that read the document so that if a new plan is run, old documents will not interfere.
-
The identity of the original parent document (if child).
-
A childDoc identifier (if child) - by default a monotonically increasing integer in processing order, but also possibly a value from the dat if that value can be assured to be unique among all siblings. Note UUIDs created in processing steps are discouraged unless previously generated due to their impact on performance due to the use of globally synchronized methods and the potential for entropy exhaustion.
Finally, it’s important to note that the identification of child documents must be strictly deterministic such that we never create a child doc with a different id on a subsequent re-process.
There are some persistence requirements that are absolutely required for correct processing of data. This section enumerates the bare minimum that can be stored to ensure proper processing.
The above pseudo-code contains one query (at [1]) and one write (used twice at [2] and [3]). The query needs to pull back data that tells us what documents require reprocessing. The selection criteria in the single node case is fairly simple we want to identify the documents that:
-
Relate to this plan, or a prior version of it
-
Are in a status that is non-terminal
For those documents we need to know:
-
Which POTENT steps have or have not been processed.
-
Which documents are children
-
The top level parent of any child doc
Cassandra is best used with a heavy write, occasional read, rarely update/delete pattern. Status changes for documents can be thought of as events which naturally lend them to a time series data design. Thus we hope to write document status events ordered by timestamp, and when we have a question about a document, we will query the most recent events for that document and inspect the result.
To minimize the footprint of the required data we can limit the data we record with the following high level pseudo code
// when the scanner finds the document
doc = scannerStep.findDocument()
doc.setDownstreamPotentSteps(scannerStep.getPendingPotentStepst())
doc.setSkippableSteps()
doc.recordPotentStepStatus()
// After each step
doc.setPendingPotentSteps(step.getPendingPotentStepst())
if(doc.statusChanged) {
doc.recordPotentStepStatus() // resets statusChanged flag
}
scanner.sendToNext(doc)
---------
// detail for recordPotentStepStatus
void recordPotentStepStatus() {
for(potentSteps:step) {
recordPotentStepStatus(step,getStatus())
}
this.statusUpdated = false;
}
Thus we write a record for the intention to produce an output (when we scan), and write a new event we have processed a step in which the status has changed, but only for potent steps that have not yet been completed and the current step we just completed. Documents to be restarted are the set of distinct parent documents, and steps to be skipped are the records that have not been updated with a terminal status.
We will certainly need to filter on status so we need a secondary index for that:
CREATE INDEX idx_step_status ON jj_potent_step_status (status);
We also want to ignore error counts greater than some value so that will also be a filter
CREATE INDEX idx_error_count ON jj_potent_step_status (status);
To satisfy the above operations, we need find a way to store the following data:
Column | DataType | Description |
---|---|---|
docId |
varchar |
URI identifying the document and corresponding to the location at which it can be found. |
parentId |
vachar |
URI of the immediate parent |
origParentId |
vachar |
URI of the ultimate parent document |
potentStepName |
varchar |
The name of the potent step |
status |
varchar |
possible values: see Status class |
message |
varchar |
Any applicable error or explanatory message. |
docHash |
varchar |
hash of the document content for change detection |
created |
timestamp |
The time the record was created |
To heuristically detect errors we want to identify documents which have consistently erred out. Keeping a count of errors either requires updating the status records
CREATE TABLE jj_potent_step_status (
docId varchar,
planName varchar,
planVersion int,
scannerName varchar
potentStepName varchar,
parentId varchar,
origParentId varchar,
status varchar,
errorCount int,
message varchar,
docHash varchar,
created timestamp,
updated timestamp,
PRIMARY KEY (docId,planName,planVersion,scannerName,potentStepName)
);
CREATE INDEX idx_step_status ON jj_potent_step_status (status);