Skip to content

Commit

Permalink
NIFI-12889: Additional logic fixes per review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Apr 8, 2024
1 parent cd5707f commit 6bb97a5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch;

Expand Down Expand Up @@ -110,9 +109,9 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
logger.warn("Unable to delete path " + file.toString() + " from HDFS. Will likely be picked up over and over...");
}
} catch (Throwable t) {
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
logger.error("Error retrieving file {} from HDFS due to {}", file, t);
}
logger.error("Error retrieving file {} from HDFS due to {}", file, t);
session.rollback();
context.yield();
} finally {
stopWatch.stop();
long totalSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -59,6 +60,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -440,11 +442,13 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
session.transfer(flowFile, REL_SUCCESS);

} catch (final Throwable t) {
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
throw new UncheckedIOException(new IOException(causeOptional.get()));
}
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
}
return null;
});
Expand Down

0 comments on commit 6bb97a5

Please sign in to comment.