Skip to content

Commit

Permalink
Refactor and cleanup API/code
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-muller666 committed Apr 8, 2024
1 parent a4bad6b commit e12217b
Show file tree
Hide file tree
Showing 31 changed files with 1,253 additions and 745 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,10 @@ For issues and bugs please submit an issue on the [GitHub repository](https://gi
TODO

## Roadmap
- [ ] Add support for Kubernetes Jobs as a container runtime.
- [x] Add support for Kubernetes Jobs as a container runtime.
- [ ] Add conditional wait strategies for containers - similar to Testcontainers, see [here](https://java.testcontainers.org/features/startup_and_waits/).
- [ ] Add specific container implementations for popular databases and services.
- [ ] Add support for multi-pod/multi-container deployments in both Docker and Kubernetes.
- [ ] Convert all tests to use [Testcontainers](https://testcontainers.com/) - using either the official [K3s module](https://java.testcontainers.org/modules/k3s/), or the community contributed [KinD module](https://testcontainers.com/modules/kindcontainer/).

## Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ abstract class AbstractContainerRuntime(

override fun start() {
container.builder.maxLifeTime?.let {
terminateFuture = SCHEDULER.schedule(TerminateTask(), it.toSeconds(), TimeUnit.SECONDS)
terminateFuture = SCHEDULER.schedule(TerminateTask(), it.toSeconds(), TimeUnit.SECONDS).also {
log.info("Container '${container.getName()}' will be terminated in ${it.getDelay(TimeUnit.SECONDS)} seconds")
}
}
}

override fun stop() {
terminateFuture?.cancel(false)
container.changeState(ContainerState.STOPPED)
}

internal abstract fun execute(
Expand Down
32 changes: 17 additions & 15 deletions src/main/kotlin/no/acntech/easycontainers/GenericContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ open class GenericContainer(
internal val builder: GenericContainerBuilder,
) : Container {

class GenericContainerBuilder : BaseContainerBuilder<GenericContainerBuilder>() {
open class GenericContainerBuilder : BaseContainerBuilder<GenericContainerBuilder>() {

override fun self(): GenericContainerBuilder {
return this
Expand All @@ -42,13 +42,25 @@ open class GenericContainer(
}

companion object {
private val log: Logger = LoggerFactory.getLogger(GenericContainer::class.java)

private val LEGAL_STATE_TRANSITIONS: Map<ContainerState, Set<ContainerState>> = mapOf(
ContainerState.UNINITIATED to setOf(ContainerState.INITIALIZING),
ContainerState.INITIALIZING to setOf(ContainerState.RUNNING, ContainerState.FAILED),
ContainerState.RUNNING to setOf(ContainerState.TERMINATING, ContainerState.STOPPED, ContainerState.FAILED),
ContainerState.TERMINATING to setOf(ContainerState.STOPPED, ContainerState.DELETED, ContainerState.FAILED),
ContainerState.STOPPED to setOf(ContainerState.DELETED, ContainerState.FAILED),
ContainerState.DELETED to emptySet(),
ContainerState.FAILED to emptySet(),
ContainerState.UNKNOWN to ContainerState.entries.toSet()
)

fun builder(): ContainerBuilder<*> {
return GenericContainerBuilder()
}
}

protected val log: Logger = LoggerFactory.getLogger(javaClass)

private var runtime: AbstractContainerRuntime = when (builder.containerPlatformType) {
ContainerPlatformType.DOCKER -> DockerRuntime(this)
ContainerPlatformType.KUBERNETES -> {
Expand Down Expand Up @@ -253,24 +265,14 @@ open class GenericContainer(
internal fun requireOneOfStates(vararg states: ContainerState) {
if (states.isNotEmpty() && !states.contains(state)) {
throw ContainerException(
"Illegal state: container '${getName()}' is in state '${getState()}'" +
", but required one of '${states.joinToString()}'"
"Illegal state: container '${getName()}' is in state '${getState()}', " +
"but required one of '${states.joinToString()}'"
)
}
}

@Synchronized
internal fun isLegalStateChange(oldState: ContainerState = getState(), newState: ContainerState): Boolean {
return when (oldState) {
ContainerState.UNINITIATED -> newState == ContainerState.INITIALIZING
ContainerState.INITIALIZING -> newState == ContainerState.RUNNING || newState == ContainerState.FAILED
ContainerState.RUNNING -> newState == ContainerState.TERMINATING || newState == ContainerState.FAILED
ContainerState.TERMINATING -> newState == ContainerState.STOPPED || newState == ContainerState.DELETED || newState == ContainerState.FAILED
ContainerState.STOPPED -> newState == ContainerState.DELETED || newState == ContainerState.FAILED
ContainerState.DELETED -> false
ContainerState.FAILED -> false
ContainerState.UNKNOWN -> true
}
return newState in (LEGAL_STATE_TRANSITIONS[oldState] ?: emptySet())
}

}
2 changes: 1 addition & 1 deletion src/main/kotlin/no/acntech/easycontainers/ImageBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import java.time.Instant
* // other properties
* </code></pre>
* <p>
* For kind k8s the shared folder can be anywhere on the host file system, but the kind cluster must be configured
* For KinD k8s the shared folder can be anywhere on the host file system, but the kind cluster must be configured
* using a custom config applied at cluster startup in order for containers to mount the shared folder.
* <p>
* Example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import com.github.dockerjava.api.model.PushResponseItem
import no.acntech.easycontainers.ContainerException
import no.acntech.easycontainers.ImageBuilder
import no.acntech.easycontainers.model.ImageTag
import no.acntech.easycontainers.util.lang.toMap
import no.acntech.easycontainers.util.collections.prettyPrint
import no.acntech.easycontainers.util.lang.asStringMap
import no.acntech.easycontainers.util.lang.prettyPrintMe
import no.acntech.easycontainers.util.text.NEW_LINE
import java.nio.file.Path
import java.time.Duration
Expand Down Expand Up @@ -102,7 +104,7 @@ internal class DockerImageBuilder(
log.info("Build response item (progress): {}", item.progressDetail)

// Stringify the BuildResponseItem with all its properties
log.info("BuildResponseItem: " + item.toMap().toString())
log.info("BuildResponseItem: " + item.prettyPrintMe())

if (item.isErrorIndicated) {
log.error("Error building image: {}", item.errorDetail?.message)
Expand Down
51 changes: 37 additions & 14 deletions src/main/kotlin/no/acntech/easycontainers/docker/DockerRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal class DockerRuntime(
private val dockerClient: DockerClient = DockerClientFactory.createDefaultClient(),
) : AbstractContainerRuntime(container) {

private inner class LogWatcher : Runnable {
private inner class EventSubscriber : Runnable {
override fun run() {
dockerClient.logContainerCmd(containerId.get())
.withStdOut(true)
Expand All @@ -73,10 +73,25 @@ internal class DockerRuntime(

override fun onError(throwable: Throwable) {
log.warn("Container '${getDisplayName()}' output error", throwable)
container.changeState(ContainerState.FAILED)
}

override fun onComplete() {
log.info("Container '${getDisplayName()}' output complete")
container.changeState(ContainerState.STOPPED)

guardedExecution({
val containerInfo = dockerClient.inspectContainerCmd(containerId.get()).exec()
setFinishedTime(containerInfo)
setExitCode(containerInfo)
log.info("Container '${getDisplayName()}' finished at $finishedAt with exit code: $exitCode")
})

if (container.isEphemeral()) {
cleanUpResources()
container.changeState(ContainerState.DELETED)
}

}

}).awaitCompletion()
Expand All @@ -85,7 +100,9 @@ internal class DockerRuntime(

private val containerId: AtomicReference<String> = AtomicReference()

private var exitCode: Int? = null
private val exitCode: AtomicReference<Int> = AtomicReference()

private val finishedAt: AtomicReference<Instant> = AtomicReference()

private var ipAddress: InetAddress? = null

Expand All @@ -95,8 +112,6 @@ internal class DockerRuntime(

private var startedAt: Instant? = null

private var finishedAt: Instant? = null

override fun getType(): ContainerPlatformType {
return ContainerPlatformType.DOCKER
}
Expand All @@ -112,12 +127,16 @@ internal class DockerRuntime(
container.changeState(ContainerState.INITIALIZING, ContainerState.UNINITIATED)
pullImage()
createAndStartContainer()
GENERAL_EXECUTOR_SERVICE.submit(LogWatcher())
GENERAL_EXECUTOR_SERVICE.submit(EventSubscriber())
super.start()
container.changeState(ContainerState.RUNNING, ContainerState.INITIALIZING)
}

override fun stop() {
if (container.getState() == ContainerState.STOPPED || container.getState() == ContainerState.TERMINATING) {
log.debug("Container is already stopped: ${getDisplayName()}")
return
}
container.changeState(ContainerState.TERMINATING, ContainerState.RUNNING)

val callback = object : WaitContainerResultCallback() {
Expand All @@ -130,7 +149,6 @@ internal class DockerRuntime(

guardedExecution(
{

dockerClient.stopContainerCmd(containerId.get()).exec()
dockerClient.waitContainerCmd(containerId.get()).exec(callback).awaitCompletion().also {
log.info("Container successfully stopped: ${getDisplayName()}")
Expand Down Expand Up @@ -182,6 +200,11 @@ internal class DockerRuntime(
}

override fun delete(force: Boolean) {
if (container.getState() == ContainerState.DELETED) {
log.debug("Container is already deleted: ${getDisplayName()}")
return
}

if (container.getState() == ContainerState.RUNNING) {
kill()
}
Expand Down Expand Up @@ -364,20 +387,20 @@ internal class DockerRuntime(

override fun getDuration(): Duration? {
return startedAt?.let { start ->
if (finishedAt == null) {
if (finishedAt.get() == null) {
setFinishedTime(inspectContainer())
}
val end = finishedAt ?: Instant.now()
val end = finishedAt.get() ?: Instant.now()
Duration.between(start, end)
}
}

override fun getExitCode(): Int? {
if (exitCode == null && containerId.get() != null && !container.isEphemeral()) {
if (exitCode.get() == null && containerId.get() != null && !container.isEphemeral()) {
val containerInfo = dockerClient.inspectContainerCmd(containerId.get()).exec()
exitCode = containerInfo.state.exitCodeLong?.toInt()
setExitCode(containerInfo)
}
return exitCode
return exitCode.get()
}

override fun getHost(): Host? {
Expand Down Expand Up @@ -494,7 +517,7 @@ internal class DockerRuntime(
setFinishedTime(info)
setExitCode(info)
} else {
finishedAt = Instant.now()
finishedAt.compareAndSet(null, Instant.now())
}
}

Expand Down Expand Up @@ -546,12 +569,12 @@ internal class DockerRuntime(

private fun setFinishedTime(containerInfo: InspectContainerResponse) {
containerInfo.state.finishedAt?.let {
finishedAt = Instant.parse(it)
finishedAt.set(Instant.parse(it))
}
}

private fun setExitCode(containerInfo: InspectContainerResponse) {
exitCode = containerInfo.state.exitCodeLong?.toInt()
exitCode.set(containerInfo.state.exitCodeLong?.toInt())
}

private fun createContainerCommand(image: String, hostConfig: HostConfig): CreateContainerCmd {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import no.acntech.easycontainers.util.text.SPACE
import no.acntech.easycontainers.util.text.truncate
import no.acntech.easycontainers.util.time.WaitTimeCalculator
import org.apache.commons.io.output.TeeOutputStream
import org.awaitility.Awaitility
import org.awaitility.core.ConditionTimeoutException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
Expand All @@ -27,14 +29,21 @@ import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

internal class CommandSupport(
/**
* Handles executing commands within a Kubernetes pod's container.
*
* @property client the Kubernetes client
* @property pod the pod to execute the command in
* @property container the container to execute the command in
*/
internal class ExecHandler(
private val client: KubernetesClient,
private val pod: Pod,
private val container: Container,
) {

companion object {
private val log: Logger = LoggerFactory.getLogger(CommandSupport::class.java)
private val log: Logger = LoggerFactory.getLogger(ExecHandler::class.java)
}

/**
Expand Down Expand Up @@ -348,4 +357,37 @@ internal class CommandSupport(
// Command
.exec(*command.toTypedArray())
}

private fun execPollWait(
waitTimeValue: Long?,
waitTimeUnit: TimeUnit?,
alive: AtomicBoolean,
stdErr: ByteArrayOutputStream,
stdOut: ByteArrayOutputStream,
) {
log.trace("Poll-waiting for command execution to finish...")
try {
Awaitility.await()
.atMost(waitTimeValue ?: 30, waitTimeUnit ?: TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until {
log.trace("Polling... alive=${alive.get()}")
!alive.get()
}
} catch (e: ConditionTimeoutException) {
val errorOut = stdErr.toString()
if (errorOut.isNotEmpty()) {
log.error("Error executing command: $errorOut")
throw ContainerException("Error executing command: ${errorOut.truncate(32)}")
}

val standardOut = stdOut.toString()
if (standardOut.isNotEmpty()) {
log.info("Command execution timed out, but std output was captured: ${standardOut.truncate(32)}")
} else {
log.warn("Command execution timed out and no std output was captured")
}
}
}

}
Loading

0 comments on commit e12217b

Please sign in to comment.