Skip to content

Commit

Permalink
Fix param naming issues, improve pipe function with optional buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-muller666 committed Apr 3, 2024
1 parent 3a7c1e9 commit 7e9f6ee
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ abstract class AbstractContainerRuntime(
* Retrieves the directory from the remote Unix system and saves it to the local file system.
*
* @param remoteDir The remote directory path to retrieve.
* @param localPath The local path where the directory should be saved. Defaults to a temporary directory.
* @param localDir The local path where the directory should be saved. Defaults to a temporary directory.
* @return A pair containing the local path where the directory was saved and a list of paths for each file in the directory.
*/
internal abstract fun getDirectory(
remoteDir: UnixDir,
localPath: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(),
localDir: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(),
): Pair<Path, List<Path>>

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ open class GenericContainer(
return runtime.putDirectory(localDir, remoteDir)
}

override fun getDirectory(remoteDir: UnixDir, localPath: Path): Pair<Path, List<Path>> {
return runtime.getDirectory(remoteDir, localPath)
override fun getDirectory(remoteDir: UnixDir, localDir: Path): Pair<Path, List<Path>> {
return runtime.getDirectory(remoteDir, localDir)
}

@Synchronized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ abstract class K8sRuntime(
return countingInput.bytesRead
}

override fun getDirectory(remoteDir: UnixDir, localPath: Path): Pair<Path, List<Path>> {
require(localPath.exists() && Files.isDirectory(localPath)) { "The provided path is not a directory." }
override fun getDirectory(remoteDir: UnixDir, localDir: Path): Pair<Path, List<Path>> {
require(localDir.exists() && Files.isDirectory(localDir)) { "The provided path is not a directory." }

// Define the remote directory path
val remotePath = remoteDir.unwrap()
Expand Down Expand Up @@ -317,19 +317,19 @@ abstract class K8sRuntime(

if ((exitCode != null && exitCode != 0) || stdErr != null) {
val msg =
"Transferring directory '$remotePath' to local path '$localPath' failed with code $exitCode and error msg: $stdErr"
"Transferring directory '$remotePath' to local path '$localDir' failed with code $exitCode and error msg: $stdErr"
log.error(msg)
throw ContainerException(msg)
}

// Create the local directory if it doesn't exist
if (!Files.exists(localPath)) {
Files.createDirectories(localPath).also {
log.debug("Untar prep - creating directory: $localPath")
if (!Files.exists(localDir)) {
Files.createDirectories(localDir).also {
log.debug("Untar prep - creating directory: $localDir")
}
}

return FileUtils.untar(pipedInput, localPath)
return FileUtils.untar(pipedInput, localDir)
}

override fun getDuration(): Duration? {
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/no/acntech/easycontainers/model/Container.kt
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,11 @@ interface Container {
* Downloads a directory from the container.
*
* @param remoteDir the path of the directory to download
* @param localPath the path where the directory will be downloaded to
* @param localDir the path where the directory will be downloaded to
*/
fun getDirectory(
remoteDir: UnixDir,
localPath: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(),
localDir: Path = Files.createTempDirectory("container-download-tar").toAbsolutePath(),
): Pair<Path, List<Path>>

/**
Expand Down
24 changes: 18 additions & 6 deletions src/main/kotlin/no/acntech/easycontainers/util/io/FileUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import kotlin.io.path.exists
import kotlin.io.path.isRegularFile
import kotlin.io.path.name

/**
Expand Down Expand Up @@ -54,17 +55,21 @@ object FileUtils {
}

/**
* Creates a TAR file containing the specified directory.
* Creates a TAR archive of the specified directory.
*
* @param dir The directory to be included in the TAR file.
* @return The TAR file as a [File] object.
* @param dir The directory to be included in the TAR archive.
* @param tarball The path of the TAR archive to be created.
* @param includeParentDir Specifies whether to include the parent directory in the TAR archive. Default is false.
* @return The path of the TAR archive.
*/
@Throws(IOException::class)
fun tar(
dir: Path,
tarball: Path = Files.createTempFile("tar-${dir.name}-", TAR_EXTENSION),
includeParentDir: Boolean = false,
): Path {
require(dir.exists() && Files.isDirectory(dir)) { "The provided path '$dir' is not a directory." }
require(tarball.exists() && tarball.isRegularFile()) { "The provided tarball '$tarball' is not a valid file." }
tar(dir, FileOutputStream(tarball.toFile()), includeParentDir)
return tarball
}
Expand Down Expand Up @@ -123,8 +128,8 @@ object FileUtils {
*/
@Throws(IOException::class)
fun tar(dir: Path, includeParentDir: Boolean = false): InputStream {
require(dir.exists() && Files.isDirectory(dir)) { "The provided path is not a directory." }
return pipe { tar(dir, it, includeParentDir) }
require(dir.exists() && Files.isDirectory(dir)) { "The provided path '$dir' is not a directory." }
return pipe { output -> tar(dir, output, includeParentDir) }
}

/**
Expand All @@ -137,6 +142,9 @@ object FileUtils {
*/
@Throws(IOException::class)
fun untarFile(tarFile: File, destination: Path = Files.createTempDirectory("untar-").toAbsolutePath()): Path {
require(tarFile.exists() && tarFile.isFile) { "The provided file '$tarFile' is not a valid file." }
require(destination.exists() && Files.isDirectory(destination)) { "The provided path '$destination' is not a directory." }

TarArchiveInputStream(BufferedInputStream(FileInputStream(tarFile))).use { tis ->
val entry = tis.nextEntry
if (entry != null && !entry.isDirectory) {
Expand Down Expand Up @@ -167,6 +175,8 @@ object FileUtils {
tarball: File,
destination: Path = Files.createTempDirectory("untar-${tarball.name}-").toAbsolutePath(),
): Pair<Path, List<Path>> {
require(tarball.exists() && tarball.isFile) { "The provided file '$tarball' is not a valid file." }
require(destination.exists() && Files.isDirectory(destination)) { "The provided path '$destination' is not a directory." }
return untar(FileInputStream(tarball), destination)
}

Expand All @@ -184,7 +194,9 @@ object FileUtils {
input: InputStream,
destination: Path = Files.createTempDirectory("untar-").toAbsolutePath(),
): Pair<Path, List<Path>> {
require(destination.exists() && Files.isDirectory(destination)) { "The provided path is not a directory." }
require(destination.exists() && Files.isDirectory(destination)) {
"The provided destination '$destination' is not a directory."
}
log.trace("Untaring input to: $destination")

val bufferedInput = if (input is BufferedInputStream) input else BufferedInputStream(input)
Expand Down
33 changes: 27 additions & 6 deletions src/main/kotlin/no/acntech/easycontainers/util/io/Functions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,39 @@ fun ByteArrayOutputStream.toUtf8String(): String {
* Executes the given (write) operation in a separate (virtual) thread and returns an [InputStream] that reads
* from the result of the operation - effectively creating a "reactive" pipe between the [OutputStream] and
* the [InputStream]. This means that this method is non-blocking and the operation is executed asynchronously.
* Streams are buffered.
*
* @param operation The operation to be executed, which takes an [OutputStream] as input and returns a result of type T.
* @param operation The operation to execute in a separate thread. The operation receives an [OutputStream] as a parameter.
* The default operation is a no-op.
* @return An [InputStream] that reads from the output of the operation.
*/
fun <T> pipe(operation: (OutputStream) -> T): InputStream {
val pipedOut = PipedOutputStream()
val pipedIn = BufferedInputStream(PipedInputStream(pipedOut))
fun <T> pipe(
operation: (OutputStream) -> T = { _: OutputStream -> null as T },
): InputStream {
return pipe(operation, true)
}

/**
* Executes the given (write) operation in a separate (virtual) thread and returns an [InputStream] that reads
* from the result of the operation - effectively creating a "reactive" pipe between the [OutputStream] and
* the [InputStream]. This means that this method is non-blocking and the operation is executed asynchronously.
*
* @param operation The operation to execute in a separate thread. The operation receives an [OutputStream] as a parameter.
* The default operation is a no-op.
* @param buffered Specifies whether the [InputStream] and [OutputStream] should be buffered. Default is true.
* @return An [InputStream] that reads from the output of the operation.
*/
fun <T> pipe(
operation: (OutputStream) -> T = { _: OutputStream -> null as T },
buffered: Boolean = true,
): InputStream {
val output = PipedOutputStream()
val input = if (buffered) BufferedInputStream(PipedInputStream(output)) else PipedInputStream(output)

Thread.startVirtualThread {
operation(pipedOut)
operation(if (buffered) BufferedOutputStream(output) else output)
}

return pipedIn
return input
}

0 comments on commit 7e9f6ee

Please sign in to comment.