Skip to content

Commit

Permalink
NIFI-7643 Removed absolute.path attribute from UnpackContent
Browse files Browse the repository at this point in the history
- Do not include the absolute.path attribute from Zip/Tar files in UnpackContent; some code cleanup

This closes #7902

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
markap14 authored and exceptionfactory committed Oct 19, 2023
1 parent 8bfb6be commit 015b721
Showing 1 changed file with 47 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
Expand All @@ -68,9 +67,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -187,29 +184,24 @@ public class UnpackContent extends AbstractProcessor {
.description("The original FlowFile is sent to this relationship when it cannot be unpacked for some reason")
.build();

/** Relationships exposed by this processor; an unmodifiable set shared by every instance. */
private static final Set<Relationship> relationships = Set.of(
        REL_SUCCESS,
        REL_FAILURE,
        REL_ORIGINAL
);

/** Property descriptors supported by this processor; an unmodifiable list shared by every instance. */
private static final List<PropertyDescriptor> properties = List.of(
        PACKAGING_FORMAT,
        FILE_FILTER,
        PASSWORD,
        ALLOW_STORED_ENTRIES_WITH_DATA_DESCRIPTOR
);

// The former per-instance `relationships` / `properties` fields and the init(...) override that
// filled them are intentionally gone: they duplicated the constants above, and assigning to
// `this.relationships` / `this.properties` is not possible once those are static final.

private Pattern fileFilter;

private Unpacker tarUnpacker;
private Unpacker zipUnpacker;

@Override
public Set<Relationship> getRelationships() {
Expand Down Expand Up @@ -277,31 +269,31 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final Unpacker unpacker;
final boolean addFragmentAttrs;
switch (packagingFormat) {
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
default:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
case TAR_FORMAT:
case X_TAR_FORMAT:
unpacker = tarUnpacker;
addFragmentAttrs = true;
break;
case ZIP_FORMAT:
unpacker = zipUnpacker;
addFragmentAttrs = true;
break;
case FLOWFILE_STREAM_FORMAT_V2:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
addFragmentAttrs = false;
break;
case FLOWFILE_STREAM_FORMAT_V3:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3());
addFragmentAttrs = false;
break;
case FLOWFILE_TAR_FORMAT:
unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1());
addFragmentAttrs = false;
break;
case AUTO_DETECT_FORMAT:
default:
// The format of the unpacker should be known before initialization
throw new ProcessException(packagingFormat + " is not a valid packaging format");
}

final List<FlowFile> unpacked = new ArrayList<>();
Expand Down Expand Up @@ -368,15 +360,12 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li
final File file = new File(tarEntry.getName());
final Path filePath = file.toPath();
String filePathString = filePath.getParent() == null ? "/" : filePath.getParent() + "/";
final Path absPath = filePath.toAbsolutePath();
final String absPathString = absPath.getParent().toString() + "/";

FlowFile unpackedFile = session.create(source);
try {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), filePathString);
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);

attributes.put(FILE_PERMISSIONS_ATTRIBUTE, FileInfo.permissionToString(tarEntry.getMode()));
Expand Down Expand Up @@ -466,7 +455,6 @@ protected void processEntry(final InputStream zipInputStream, final boolean dire
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), file.getName());
attributes.put(CoreAttributes.PATH.key(), parentDirectory);
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), file.toPath().toAbsolutePath() + PATH_SEPARATOR);
attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, encryptionMethod.toString());

Expand All @@ -484,7 +472,7 @@ protected void processEntry(final InputStream zipInputStream, final boolean dire

private static class CompressedZipInputStreamCallback extends ZipInputStreamCallback {

private boolean allowStoredEntriesWithDataDescriptor;
private final boolean allowStoredEntriesWithDataDescriptor;

private CompressedZipInputStreamCallback(
final Pattern fileFilter,
Expand Down Expand Up @@ -583,11 +571,6 @@ public void unpack(final ProcessSession session, final FlowFile source, final Li
}
}

/**
 * Copies the value stored under {@code oldKey} to {@code newKey}, but only when the map
 * has an entry for {@code oldKey} and does not yet have one for {@code newKey}.
 *
 * @param attributes attribute map that is modified in place
 * @param oldKey     key whose value is copied
 * @param newKey     key that receives the copied value
 */
private static void mapAttributes(final Map<String, String> attributes, final String oldKey, final String newKey) {
    // An existing entry under the new key always wins, even if its value is null.
    if (attributes.containsKey(newKey)) {
        return;
    }
    // Nothing to carry over when the old key is absent.
    if (!attributes.containsKey(oldKey)) {
        return;
    }
    final String carriedValue = attributes.get(oldKey);
    attributes.put(newKey, carriedValue);
}

private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
// first pass verifies all FlowFiles have the FRAGMENT_INDEX attribute and gets the total number of fragments
Expand Down Expand Up @@ -649,21 +632,15 @@ public String getMimeType() {
}

/**
 * Resolves the {@code PackageFormat} constant that corresponds to the supplied format name.
 *
 * @param textValue one of the {@code *_FORMAT_NAME} constants; must not be {@code null},
 *                  because switching on a {@code null} String throws {@code NullPointerException}
 * @return the matching format, or {@code null} when the name is not recognized
 */
public static PackageFormat getFormat(String textValue) {
    // Single switch expression: the earlier statement-form switch followed by `return null;`
    // made this second return unreachable, so only one form is kept.
    return switch (textValue) {
        case AUTO_DETECT_FORMAT_NAME -> AUTO_DETECT_FORMAT;
        case TAR_FORMAT_NAME -> TAR_FORMAT;
        case ZIP_FORMAT_NAME -> ZIP_FORMAT;
        case FLOWFILE_STREAM_FORMAT_V3_NAME -> FLOWFILE_STREAM_FORMAT_V3;
        case FLOWFILE_STREAM_FORMAT_V2_NAME -> FLOWFILE_STREAM_FORMAT_V2;
        case FLOWFILE_TAR_FORMAT_NAME -> FLOWFILE_TAR_FORMAT;
        default -> null;
    };
}
}
}

0 comments on commit 015b721

Please sign in to comment.