From 7e9f6ee458a6f4553e4aa57c7e22e1a23bcb258c Mon Sep 17 00:00:00 2001 From: Thomas Muller Date: Wed, 3 Apr 2024 19:40:47 +0200 Subject: [PATCH] Fix param naming issues, improve pipe function with optional buffering --- .../AbstractContainerRuntime.kt | 4 +-- .../easycontainers/GenericContainer.kt | 4 +-- .../easycontainers/kubernetes/K8sRuntime.kt | 14 ++++---- .../acntech/easycontainers/model/Container.kt | 4 +-- .../easycontainers/util/io/FileUtils.kt | 24 ++++++++++---- .../easycontainers/util/io/Functions.kt | 33 +++++++++++++++---- 6 files changed, 58 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/no/acntech/easycontainers/AbstractContainerRuntime.kt b/src/main/kotlin/no/acntech/easycontainers/AbstractContainerRuntime.kt index 305a7837..7159de4d 100644 --- a/src/main/kotlin/no/acntech/easycontainers/AbstractContainerRuntime.kt +++ b/src/main/kotlin/no/acntech/easycontainers/AbstractContainerRuntime.kt @@ -153,12 +153,12 @@ abstract class AbstractContainerRuntime( * Retrieves the directory from the remote Unix system and saves it to the local file system. * * @param remoteDir The remote directory path to retrieve. - * @param localPath The local path where the directory should be saved. Defaults to a temporary directory. + * @param localDir The local path where the directory should be saved. Defaults to a temporary directory. * @return A pair containing the local path where the directory was saved and a list of paths for each file in the directory. */ internal abstract fun getDirectory( remoteDir: UnixDir, - localPath: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(), + localDir: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(), ): Pair> /** diff --git a/src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt b/src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt index 278e7768..5f07b030 100644 --- a/src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt +++ b/src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt @@ -188,8 +188,8 @@ open class GenericContainer( return runtime.putDirectory(localDir, remoteDir) } - override fun getDirectory(remoteDir: UnixDir, localPath: Path): Pair> { - return runtime.getDirectory(remoteDir, localPath) + override fun getDirectory(remoteDir: UnixDir, localDir: Path): Pair> { + return runtime.getDirectory(remoteDir, localDir) } @Synchronized diff --git a/src/main/kotlin/no/acntech/easycontainers/kubernetes/K8sRuntime.kt b/src/main/kotlin/no/acntech/easycontainers/kubernetes/K8sRuntime.kt index f2f0d806..3e9cad6d 100644 --- a/src/main/kotlin/no/acntech/easycontainers/kubernetes/K8sRuntime.kt +++ b/src/main/kotlin/no/acntech/easycontainers/kubernetes/K8sRuntime.kt @@ -285,8 +285,8 @@ abstract class K8sRuntime( return countingInput.bytesRead } - override fun getDirectory(remoteDir: UnixDir, localPath: Path): Pair> { - require(localPath.exists() && Files.isDirectory(localPath)) { "The provided path is not a directory." } + override fun getDirectory(remoteDir: UnixDir, localDir: Path): Pair> { + require(localDir.exists() && Files.isDirectory(localDir)) { "The provided path is not a directory." } // Define the remote directory path val remotePath = remoteDir.unwrap() @@ -317,19 +317,19 @@ abstract class K8sRuntime( if ((exitCode != null && exitCode != 0) || stdErr != null) { val msg = - "Transferring directory '$remotePath' to local path '$localPath' failed with code $exitCode and error msg: $stdErr" + "Transferring directory '$remotePath' to local path '$localDir' failed with code $exitCode and error msg: $stdErr" log.error(msg) throw ContainerException(msg) } // Create the local directory if it doesn't exist - if (!Files.exists(localPath)) { - Files.createDirectories(localPath).also { - log.debug("Untar prep - creating directory: $localPath") + if (!Files.exists(localDir)) { + Files.createDirectories(localDir).also { + log.debug("Untar prep - creating directory: $localDir") } } - return FileUtils.untar(pipedInput, localPath) + return FileUtils.untar(pipedInput, localDir) } override fun getDuration(): Duration? { diff --git a/src/main/kotlin/no/acntech/easycontainers/model/Container.kt b/src/main/kotlin/no/acntech/easycontainers/model/Container.kt index 2c35b9cd..4ea098f8 100644 --- a/src/main/kotlin/no/acntech/easycontainers/model/Container.kt +++ b/src/main/kotlin/no/acntech/easycontainers/model/Container.kt @@ -219,11 +219,11 @@ interface Container { * Downloads a directory from the container. * * @param remoteDir the path of the directory to download - * @param localPath the path where the directory will be downloaded to + * @param localDir the path where the directory will be downloaded to */ fun getDirectory( remoteDir: UnixDir, - localPath: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(), + localDir: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(), ): Pair> /** diff --git a/src/main/kotlin/no/acntech/easycontainers/util/io/FileUtils.kt b/src/main/kotlin/no/acntech/easycontainers/util/io/FileUtils.kt index 9ef6d88e..972562bd 100644 --- a/src/main/kotlin/no/acntech/easycontainers/util/io/FileUtils.kt +++ b/src/main/kotlin/no/acntech/easycontainers/util/io/FileUtils.kt @@ -12,6 +12,7 @@ import java.nio.file.Files import java.nio.file.Path import java.nio.file.StandardCopyOption import kotlin.io.path.exists +import kotlin.io.path.isRegularFile import kotlin.io.path.name /** @@ -54,10 +55,12 @@ object FileUtils { } /** - * Creates a TAR file containing the specified directory. + * Creates a TAR archive of the specified directory. * - * @param dir The directory to be included in the TAR file. - * @return The TAR file as a [File] object. + * @param dir The directory to be included in the TAR archive. + * @param tarball The path of the TAR archive to be created. + * @param includeParentDir Specifies whether to include the parent directory in the TAR archive. Default is false. + * @return The path of the TAR archive. */ @Throws(IOException::class) fun tar( @@ -65,6 +68,8 @@ object FileUtils { tarball: Path = Files.createTempFile("tar-${dir.name}-", TAR_EXTENSION), includeParentDir: Boolean = false, ): Path { + require(dir.exists() && Files.isDirectory(dir)) { "The provided path '$dir' is not a directory." } + require(tarball.exists() && tarball.isRegularFile()) { "The provided tarball '$tarball' is not a valid file." } tar(dir, FileOutputStream(tarball.toFile()), includeParentDir) return tarball } @@ -123,8 +128,8 @@ object FileUtils { */ @Throws(IOException::class) fun tar(dir: Path, includeParentDir: Boolean = false): InputStream { - require(dir.exists() && Files.isDirectory(dir)) { "The provided path is not a directory." } - return pipe { tar(dir, it, includeParentDir) } + require(dir.exists() && Files.isDirectory(dir)) { "The provided path '$dir' is not a directory." } + return pipe { output -> tar(dir, output, includeParentDir) } } /** @@ -137,6 +142,9 @@ object FileUtils { */ @Throws(IOException::class) fun untarFile(tarFile: File, destination: Path = Files.createTempDirectory("untar-").toAbsolutePath()): Path { + require(tarFile.exists() && tarFile.isFile) { "The provided file '$tarFile' is not a valid file." } + require(destination.exists() && Files.isDirectory(destination)) { "The provided path '$destination' is not a directory." } + TarArchiveInputStream(BufferedInputStream(FileInputStream(tarFile))).use { tis -> val entry = tis.nextEntry if (entry != null && !entry.isDirectory) { @@ -167,6 +175,8 @@ object FileUtils { tarball: File, destination: Path = Files.createTempDirectory("untar-${tarball.name}-").toAbsolutePath(), ): Pair> { + require(tarball.exists() && tarball.isFile) { "The provided file '$tarball' is not a valid file." } + require(destination.exists() && Files.isDirectory(destination)) { "The provided path '$destination' is not a directory." } return untar(FileInputStream(tarball), destination) } @@ -184,7 +194,9 @@ object FileUtils { input: InputStream, destination: Path = Files.createTempDirectory("untar-").toAbsolutePath(), ): Pair> { - require(destination.exists() && Files.isDirectory(destination)) { "The provided path is not a directory." } + require(destination.exists() && Files.isDirectory(destination)) { + "The provided destination '$destination' is not a directory." + } log.trace("Untaring input to: $destination") val bufferedInput = if (input is BufferedInputStream) input else BufferedInputStream(input) diff --git a/src/main/kotlin/no/acntech/easycontainers/util/io/Functions.kt b/src/main/kotlin/no/acntech/easycontainers/util/io/Functions.kt index f7d6a044..dc86726d 100644 --- a/src/main/kotlin/no/acntech/easycontainers/util/io/Functions.kt +++ b/src/main/kotlin/no/acntech/easycontainers/util/io/Functions.kt @@ -28,18 +28,39 @@ fun ByteArrayOutputStream.toUtf8String(): String { * Executes the given (write) operation in a separate (virtual) thread and returns an [InputStream] that reads * from the result of the operation - effectively creating a "reactive" pipe between the [OutputStream] and * the [InputStream]. This means that this method is non-blocking and the operation is executed asynchronously. + * Streams are buffered. * - * @param operation The operation to be executed, which takes an [OutputStream] as input and returns a result of type T. + * @param operation The operation to execute in a separate thread. The operation receives an [OutputStream] as a parameter. + * The default operation is a no-op. * @return An [InputStream] that reads from the output of the operation. */ -fun pipe(operation: (OutputStream) -> T): InputStream { - val pipedOut = PipedOutputStream() - val pipedIn = BufferedInputStream(PipedInputStream(pipedOut)) +fun pipe( + operation: (OutputStream) -> T = { _: OutputStream -> null as T }, +): InputStream { + return pipe(operation, true) +} + +/** + * Executes the given (write) operation in a separate (virtual) thread and returns an [InputStream] that reads + * from the result of the operation - effectively creating a "reactive" pipe between the [OutputStream] and + * the [InputStream]. This means that this method is non-blocking and the operation is executed asynchronously. + * + * @param operation The operation to execute in a separate thread. The operation receives an [OutputStream] as a parameter. + * The default operation is a no-op. + * @param buffered Specifies whether the [InputStream] and [OutputStream] should be buffered. Default is true. + * @return An [InputStream] that reads from the output of the operation. + */ +fun pipe( + operation: (OutputStream) -> T = { _: OutputStream -> null as T }, + buffered: Boolean = true, +): InputStream { + val output = PipedOutputStream() + val input = if (buffered) BufferedInputStream(PipedInputStream(output)) else PipedInputStream(output) Thread.startVirtualThread { - operation(pipedOut) + operation(if (buffered) BufferedOutputStream(output) else output) } - return pipedIn + return input }