Skip to content

Commit

Permalink
NIFI-12160 Kafka Connect: Check for NAR unpacking before starting
Browse files Browse the repository at this point in the history
Check that required NAR files are unpacked completely before starting the Kafka Connector

This closes #7832

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
pgyori authored and exceptionfactory committed Oct 13, 2023
1 parent 0a47157 commit b2e3898
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.kafka.connect;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.io.CleanupMode.ALWAYS;

public class WorkingDirectoryUtilsTest {

@Test
public void testDeleteNonexistentFile(@TempDir(cleanup = ALWAYS) File tempDir) {
File nonexistentFile = new File(tempDir, "testFile");

WorkingDirectoryUtils.purgeDirectory(nonexistentFile);

assertFalse(nonexistentFile.exists());
}

@Test
public void testDeleteFlatFile(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
File file = new File(tempDir, "testFile");
file.createNewFile();

WorkingDirectoryUtils.purgeDirectory(file);

assertFalse(file.exists());
}

@Test
public void testDeleteDirectoryWithContents(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
File directory = new File(tempDir, "directory");
File subDirectory = new File(directory, "subDirectory");
File subDirectoryContent = new File(subDirectory, "subDirectoryContent");
File directoryContent = new File(directory, "directoryContent");

directory.mkdir();
subDirectory.mkdir();
subDirectoryContent.createNewFile();
directoryContent.createNewFile();

WorkingDirectoryUtils.purgeDirectory(directory);

assertFalse(directory.exists());
}

@Test
public void testPurgeUnpackedNarsEmptyRootDirectory(@TempDir(cleanup = ALWAYS) File tempDir) {
File rootDirectory = new File(tempDir, "rootDirectory");

rootDirectory.mkdir();

WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

assertTrue(rootDirectory.exists());
}

@Test
public void testPurgeUnpackedNarsRootDirectoryWithFilesOnly(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
File rootDirectory = new File(tempDir, "rootDirectory");
File directoryContent1 = new File(rootDirectory, "file1");
File directoryContent2 = new File(rootDirectory, "file2");

rootDirectory.mkdir();
directoryContent1.createNewFile();
directoryContent2.createNewFile();

WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

assertTrue(rootDirectory.exists() && directoryContent1.exists() && directoryContent2.exists());
}

@Test
public void testPurgeUnpackedNars(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
File rootDirectory = new File(tempDir, "rootDirectory");
rootDirectory.mkdir();
TestDirectoryStructure testDirectoryStructure = new TestDirectoryStructure(rootDirectory);

WorkingDirectoryUtils.purgeIncompleteUnpackedNars(testDirectoryStructure.getRootDirectory());

assertTrue(testDirectoryStructure.isConsistent());
}

@Test
public void testWorkingDirectoryIntegrityRestored(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
/*
workingDirectory
- nar
- extensions
- *TestDirectoryStructure*
- narDirectory
- narFile
- extensions
- *TestDirectoryStructure*
- additionalDirectory
- workingDirectoryFile
*/
File workingDirectory = new File(tempDir, "workingDirectory");
File nar = new File(workingDirectory, "nar");
File narExtensions = new File(nar, "extensions");
File narDirectory = new File(nar, "narDirectory");
File narFile = new File(nar, "narFile");
File extensions = new File(workingDirectory, "extensions");
File additionalDirectory = new File(workingDirectory, "additionalDirectory");
File workingDirectoryFile = new File(workingDirectory, "workingDirectoryFile");

workingDirectory.mkdir();
nar.mkdir();
narExtensions.mkdir();
narDirectory.mkdir();
narFile.createNewFile();
extensions.mkdir();
additionalDirectory.mkdir();
workingDirectoryFile.createNewFile();

TestDirectoryStructure narExtensionsStructure = new TestDirectoryStructure(narExtensions);
TestDirectoryStructure extensionsStructure = new TestDirectoryStructure(extensions);

WorkingDirectoryUtils.reconcileWorkingDirectory(workingDirectory);

assertTrue(workingDirectory.exists()
&& nar.exists()
&& narExtensionsStructure.isConsistent()
&& narDirectory.exists()
&& narFile.exists()
&& extensionsStructure.isConsistent()
&& additionalDirectory.exists()
&& workingDirectoryFile.exists()
);
}

private class TestDirectoryStructure {
/*
rootDirectory
- subDirectory1-nar-unpacked
- subDirectory1File1
- nar-digest
- subDirectory2
- subDirectory2File1
- subDirectory3-nar-unpacked
- subDirectory3Dir1
- subDirectory3Dir1File1
- subDirectory3File1
- fileInRoot
*/
File rootDirectory;
File subDirectory1;
File subDirectory2;
File subDirectory3;
File fileInRoot;
File subDirectory1File1;
File subDirectory1File2;
File subDirectory2File1;
File subDirectory3Dir1;
File subDirectory3File1;
File subDirectory3Dir1File1;

public TestDirectoryStructure(final File rootDirectory) throws IOException {
this.rootDirectory = rootDirectory;
subDirectory1 = new File(rootDirectory, "subDirectory1-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
subDirectory2 = new File(rootDirectory, "subDirector2");
subDirectory3 = new File(rootDirectory, "subDirector3-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
fileInRoot = new File(rootDirectory, "fileInRoot");
subDirectory1File1 = new File(subDirectory1, "subDirectory1File1");
subDirectory1File2 = new File(subDirectory1, WorkingDirectoryUtils.HASH_FILENAME);
subDirectory2File1 = new File(subDirectory2, "subDirectory2File1");
subDirectory3Dir1 = new File(subDirectory3, "subDirectory3Dir1");
subDirectory3File1 = new File(subDirectory3, "subDirectory3File1");
subDirectory3Dir1File1 = new File(subDirectory3Dir1, "subDirectory3Dir1File1");

subDirectory1.mkdir();
subDirectory2.mkdir();
subDirectory3.mkdir();
fileInRoot.createNewFile();
subDirectory1File1.createNewFile();
subDirectory1File2.createNewFile();
subDirectory2File1.createNewFile();
subDirectory3File1.createNewFile();
subDirectory3Dir1.mkdir();
subDirectory3Dir1File1.createNewFile();
}

public File getRootDirectory() {
return rootDirectory;
}

/**
* Checks if all directories ending in 'nar-unpacked' that have a file named 'nar-digest' within still exist,
* and the directory ending in 'nar-unpacked' without 'nar-digest' has been removed with all of its contents.
* @return true if the above is met.
*/
public boolean isConsistent() {
return (rootDirectory.exists()
&& subDirectory1.exists() && subDirectory1File1.exists() && subDirectory1File2.exists()
&& subDirectory2.exists() && subDirectory2File1.exists()
&& !(subDirectory3.exists() || subDirectory3Dir1.exists() || subDirectory3File1.exists() || subDirectory3Dir1File1.exists())
&& fileInRoot.exists());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ public static StatelessDataflow createDataflow(final StatelessNiFiCommonConfig c
config.setFlowDefinition(dataflowDefinitionProperties);
dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_FLOW_NAME, dataflowName);
MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
StatelessDataflow dataflow;

// Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
// We do this because the bootstrap() method will expand all NAR files into the working directory.
// If we have multiple Connector instances, or multiple tasks, we don't want several threads all
// unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
unpackNarLock.lock();
try {
WorkingDirectoryUtils.reconcileWorkingDirectory(engineConfiguration.getWorkingDirectory());

bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());

dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
dataflow = bootstrap.createDataflow(dataflowDefinition);
} finally {
unpackNarLock.unlock();
}

dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
return bootstrap.createDataflow(dataflowDefinition);
return dataflow;
} catch (final Exception e) {
throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.kafka.connect;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;

public class WorkingDirectoryUtils {

protected static final String NAR_UNPACKED_SUFFIX = "nar-unpacked";
protected static final String HASH_FILENAME = "nar-digest";
private static final Logger logger = LoggerFactory.getLogger(WorkingDirectoryUtils.class);

/**
* Goes through the nar/extensions and extensions directories within the working directory
* and deletes every directory whose name ends in "nar-unpacked" and does not have a
* "nar-digest" file in it.
* @param workingDirectory File object pointing to the working directory.
*/
public static void reconcileWorkingDirectory(final File workingDirectory) {
purgeIncompleteUnpackedNars(new File(new File(workingDirectory, "nar"), "extensions"));
purgeIncompleteUnpackedNars(new File(workingDirectory, "extensions"));
}

/**
* Receives a directory as parameter and goes through every directory within it that ends in
* "nar-unpacked". If a directory ending in "nar-unpacked" does not have a file named
* "nar-digest" within it, it gets deleted with all of its contents.
* @param directory A File object pointing to the directory that is supposed to contain
* further directories whose name ends in "nar-unpacked".
*/
public static void purgeIncompleteUnpackedNars(final File directory) {
final File[] unpackedDirs = directory.listFiles(file -> file.isDirectory() && file.getName().endsWith(NAR_UNPACKED_SUFFIX));
if (unpackedDirs == null || unpackedDirs.length == 0) {
logger.debug("Found no unpacked NARs in {}", directory);
if (logger.isDebugEnabled()) {
logger.debug("Directory contains: {}", Arrays.deepToString(directory.listFiles()));
}
return;
}

for (final File unpackedDir : unpackedDirs) {
final File narHashFile = new File(unpackedDir, HASH_FILENAME);
if (narHashFile.exists()) {
logger.debug("Already successfully unpacked {}", unpackedDir);
} else {
purgeDirectory(unpackedDir);
}
}
}

/**
* Delete a directory with all of its contents.
* @param directory The directory to be deleted.
*/
public static void purgeDirectory(final File directory) {
if (directory.exists()) {
deleteRecursively(directory);
logger.debug("Cleaned up {}", directory);
}
}

private static void deleteRecursively(final File fileOrDirectory) {
if (fileOrDirectory.isDirectory()) {
final File[] files = fileOrDirectory.listFiles();
if (files != null) {
for (final File file : files) {
deleteRecursively(file);
}
}
}
deleteQuietly(fileOrDirectory);
}

private static void deleteQuietly(final File file) {
final boolean deleted = file.delete();
if (!deleted) {
logger.debug("Failed to cleanup temporary file {}", file);
}
}

}

0 comments on commit b2e3898

Please sign in to comment.