Skip to content

Commit

Permalink
NIFI-12889: Refactored HDFS processors to use common method for handl…
Browse files Browse the repository at this point in the history
…ing GSS auth exceptions
  • Loading branch information
mattyb149 committed Apr 8, 2024
1 parent fbe6621 commit cccf325
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,12 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.remove(flowFile);
}
} catch (IOException e) {
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
session.transfer(flowFile, getFailureRelationship());
if (handleAuthErrors(e, session, context)) {
return null;
} else {
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
session.transfer(flowFile, getFailureRelationship());
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
if (!directoryExists) {
throw new IOException("Input Directory or File does not exist in HDFS");
}
} catch (final IOException e) {
if(!handleAuthErrors(e, session, context)) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
} catch (Exception e) {
if (handleAuthErrors(e, session, context)) {
return;
}
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", filenameValue, flowFile, e);
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
Expand Down Expand Up @@ -360,95 +355,95 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext

for (final Path file : files) {

ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
FlowFile flowFile = session.create(parentFlowFile);
try {
final String originalFilename = file.getName();
final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
final Path newFile = new Path(outputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
// Remove destination file (newFile) to replace
if (hdfs.delete(newFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{newFile, flowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info(
"transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().warn(
"penalizing {} and routing to failure because file with same name already exists",
new Object[]{flowFile});
return null;
default:
break;
}
ugi.doAs((PrivilegedAction<Object>) () -> {
FlowFile flowFile = session.create(parentFlowFile);
try {
final String originalFilename = file.getName();
final Path outputDirPath = getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
final Path newFile = new Path(outputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
// Remove destination file (newFile) to replace
if (hdfs.delete(newFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{newFile, flowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info(
"transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().warn(
"penalizing {} and routing to failure because file with same name already exists",
new Object[]{flowFile});
return null;
default:
break;
}
}

// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
throw new IOException(outputDirPath.toString()
+ " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(outputDirPath)) {
throw new IOException(outputDirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, outputDirPath);
// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
throw new IOException(outputDirPath + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(outputDirPath)) {
throw new IOException(outputDirPath + " could not be created");
}
changeOwner(context, hdfs, outputDirPath);
}

boolean moved = false;
for (int i = 0; i < 10; i++) { // try to rename multiple
// times.
if (processorConfig.getOperation().equals("move")) {
if (hdfs.rename(file, newFile)) {
moved = true;
break;// rename was successful
}
} else {
if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
moved = true;
break;// copy was successful
}
boolean moved = false;
for (int i = 0; i < 10; i++) { // try to rename multiple
// times.
if (processorConfig.getOperation().equals("move")) {
if (hdfs.rename(file, newFile)) {
moved = true;
break;// rename was successful
}
} else {
if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
moved = true;
break;// copy was successful
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!moved) {
throw new ProcessException("Could not move file " + file + " to its final filename");
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!moved) {
throw new ProcessException("Could not move file " + file + " to its final filename");
}

changeOwner(context, hdfs, newFile);
final String outputPath = newFile.toString();
final String newFilename = newFile.getName();
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);

} catch (final Throwable t) {
changeOwner(context, hdfs, newFile);
final String outputPath = newFile.toString();
final String newFilename = newFile.getName();
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
final String transitUri = hdfs.getUri() + StringUtils.prependIfMissing(outputPath, "/");
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);

} catch (final Throwable t) {
if (handleAuthErrors(t, session, context)) {
return null;
} else {
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
}
return null;
}
return null;
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,6 @@ public Object run() {
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
} catch (IOException e) {
throw new ProcessException(e);
} finally {
try {
if (fos != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public void testGSSException() throws Exception {
attributes.put("hdfs.file", filePath.toString());
runner.enqueue("foo", attributes);
runner.run();
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
// GSS Auth exceptions should cause rollback
runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
}

@Test
Expand Down

0 comments on commit cccf325

Please sign in to comment.