NIFI-12889: Additional logic fixes and refactoring
mattyb149 committed Apr 5, 2024
1 parent 39bc646 commit 0436b85
Showing 10 changed files with 79 additions and 60 deletions.
@@ -65,6 +65,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -719,7 +720,7 @@ protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expe
.findFirst();
}

protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context) {
protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context, BiConsumer<ProcessSession, ProcessContext> sessionHandler) {
Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {

@@ -729,8 +730,7 @@ protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessC
} catch (IOException ioe) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.");
}
session.rollback(false);
context.yield();
sessionHandler.accept(session, context);
return true;
}
return false;
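
For context (not part of the diff itself): the hunk above swaps the hard-coded session.rollback(false) / context.yield() for a caller-supplied BiConsumer<ProcessSession, ProcessContext>. A minimal sketch of such a handler, built only from the NiFi API types visible in this diff; the class name below is illustrative, not the one this commit adds:

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

import java.util.function.BiConsumer;

final class RollbackYieldHandlerSketch {
    // Expresses the removed inline behavior as a BiConsumer that can now be passed
    // into handleAuthErrors(...): roll back without penalizing, then yield the context.
    static final BiConsumer<ProcessSession, ProcessContext> ROLLBACK_AND_YIELD =
            (session, context) -> {
                session.rollback(false);
                context.yield();
            };
}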
@@ -39,6 +39,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;

import java.io.IOException;
import java.security.PrivilegedAction;
@@ -177,7 +178,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
} catch (IOException ioe) {
if (handleAuthErrors(ioe, session, context)) {
if (handleAuthErrors(ioe, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
} else {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
@@ -202,7 +203,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.remove(flowFile);
}
} catch (IOException e) {
if (handleAuthErrors(e, session, context)) {
if (handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
} else {
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
@@ -45,6 +45,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

import java.io.FileNotFoundException;
@@ -155,7 +156,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
codec = getCompressionCodec(context, getConfiguration());
}

FlowFile flowFile1 = finalFlowFile;
FlowFile outgoingFlowFile = finalFlowFile;
final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
@@ -170,27 +171,25 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
outputFilename = originalFilename;
}

flowFile1 = session.importFrom(stream, finalFlowFile);
flowFile1 = session.putAttribute(flowFile1, CoreAttributes.FILENAME.key(), outputFilename);
outgoingFlowFile = session.importFrom(stream, finalFlowFile);
outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, flowFile1, stopWatch.getDuration()});
flowFile1 = session.putAttribute(flowFile1, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(flowFile1, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile1, getSuccessRelationship());
getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(outgoingFlowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, flowFile1, e});
flowFile1 = session.putAttribute(flowFile1, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile1 = session.penalize(flowFile1);
session.transfer(flowFile1, getFailureRelationship());
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, outgoingFlowFile, e});
outgoingFlowFile = session.putAttribute(outgoingFlowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getFailureRelationship());
} catch (final IOException e) {
if (handleAuthErrors(e, session, context)) {
return null;
if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", qualifiedPath, outgoingFlowFile, e);
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getCommsFailureRelationship());
}
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", qualifiedPath, flowFile1, e);
flowFile1 = session.penalize(flowFile1);
session.transfer(flowFile1, getCommsFailureRelationship());

} finally {
IOUtils.closeQuietly(stream);
}
@@ -49,6 +49,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

import java.io.IOException;
@@ -298,7 +299,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
}
} catch (IOException e) {
handleAuthErrors(e, session, context);
handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
getLogger().warn("Error while retrieving list of files due to {}", e.getMessage(), e);
return;
} catch (InterruptedException e) {
@@ -396,12 +397,12 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
session.getProvenanceReporter().receive(flowFile, file.toString());
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate);
} catch (final IOException e) {
handleAuthErrors(e, session, context);
} catch (final Throwable t) {
getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
session.rollback();
context.yield();
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
session.rollback();
context.yield();
}
} finally {
IOUtils.closeQuietly(stream);
stream = null;
@@ -55,6 +55,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;

import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL;
import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR;
@@ -326,17 +327,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
getLogger().error("Interrupted while performing listing of HDFS", e);
ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
session.transfer(ff, REL_FAILURE);
} catch (final IOException e) {
} catch (final Exception e) {
// Catch GSSExceptions and reset the resources
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Interrupted while performing listing of HDFS", e);
if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
session.transfer(ff, REL_FAILURE);
}
} catch (final Exception e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
session.transfer(ff, REL_FAILURE);
}
}

@@ -30,10 +30,10 @@
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -109,12 +109,10 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
if (!keepSourceFiles && !hdfs.delete(file, false)) {
logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
}
} catch (final IOException e) {
handleAuthErrors(e, session, context);
} catch (Throwable t) {
logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
session.rollback();
context.yield();
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
logger.error("Error retrieving file {} from HDFS due to {}", file, t);
}
} finally {
stopWatch.stop();
long totalSize = 0;
@@ -46,10 +46,12 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -255,7 +257,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
throw new IOException("Input Directory or File does not exist in HDFS");
}
} catch (Exception e) {
if (handleAuthErrors(e, session, context)) {
if (handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return;
}
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
Expand Down Expand Up @@ -325,16 +327,19 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
queueLock.unlock();
}

processBatchOfFiles(files, context, session, flowFile);
try {
processBatchOfFiles(files, context, session, flowFile);
session.remove(flowFile);
} catch (UncheckedIOException e) {
handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
}

queueLock.lock();
try {
processing.removeAll(files);
} finally {
queueLock.unlock();
}

session.remove(flowFile);
}

protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
Expand Down Expand Up @@ -435,9 +440,7 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
session.transfer(flowFile, REL_SUCCESS);

} catch (final Throwable t) {
if (handleAuthErrors(t, session, context)) {
return null;
} else {
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
@@ -54,6 +54,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

@@ -451,7 +452,7 @@ public Object run() {
session.transfer(putFlowFile, getSuccessRelationship());

} catch (final Throwable t) {
if (handleAuthErrors(t, session, context)) {
if (handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
}
if (tempDotCopyFile != null) {
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util;

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

import java.util.function.BiConsumer;

public class GSSExceptionRollbackYieldSessionHandler implements BiConsumer<ProcessSession, ProcessContext> {
@Override
public void accept(ProcessSession processSession, ProcessContext processContext) {
processSession.rollback();
processContext.yield();
}
}
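
A brief usage sketch (not part of this commit) showing the new handler applied directly as a BiConsumer; the enclosing class and method here are hypothetical, and in the processors above the same instance is instead passed into handleAuthErrors(...):

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;

import java.util.function.BiConsumer;

final class HandlerUsageSketch {
    // Hypothetical helper: session and context would come from a processor's onTrigger.
    static void onKerberosCredentialFailure(ProcessSession session, ProcessContext context) {
        BiConsumer<ProcessSession, ProcessContext> handler = new GSSExceptionRollbackYieldSessionHandler();
        handler.accept(session, context); // rolls back the session, then yields the context
    }
}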
@@ -28,7 +28,6 @@
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.MockFileSystem;
@@ -56,7 +55,6 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
@@ -352,15 +350,6 @@ public FileStatus getFileStatus(Path path) throws IOException {
// assert no flowfiles transferred to outgoing relationships
runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0);
runner.assertTransferCount(PutHDFS.REL_FAILURE, 0);
// assert the processor's queue is not empty
assertFalse(runner.isQueueEmpty());
assertEquals(1, runner.getQueueSize().getObjectCount());
// assert the input file is back on the queue
ProcessSession session = runner.getProcessSessionFactory().createSession();
FlowFile queuedFlowFile = session.get();
assertNotNull(queuedFlowFile);
assertEquals("randombytes-1", queuedFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
session.rollback();
}

@Test