Skip to content

Commit

Permalink
Windows support
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Jul 1, 2020
1 parent 1f62843 commit 49acdd1
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 155 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.20.0] - 2020-06-30
### Added
- Windows is somewhat supported now

## [0.19.0] - 2020-06-28
### Changed
- Improving Windows support by removing Hadoop FS dependencies
Expand Down
2 changes: 1 addition & 1 deletion core.coordinator/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<Loggers>
<Logger name="org.apache.hive.jdbc" level="warn"/>
<Logger name="dev.kamu.cli" level="info" additivity="false">
<Logger name="dev.kamu" level="info" additivity="false">
<AppenderRef ref="console-cli"/>
</Logger>
<Root level="info">
Expand Down
6 changes: 5 additions & 1 deletion core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ import dev.kamu.cli.metadata.MetadataRepository
import dev.kamu.cli.output._
import dev.kamu.core.utils.fs._
import dev.kamu.core.utils.{Clock, DockerClient}
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.{Level, LogManager}

class Kamu(
config: KamuConfig,
systemClock: Clock
) {
val logger = LogManager.getLogger(getClass.getName)

val workspaceLayout = WorkspaceLayout(
kamuRootDir = config.kamuRoot,
metadataDir = config.kamuRoot / "datasets",
remotesDir = config.kamuRoot / "remotes",
localVolumeDir = config.localVolume
).toAbsolute

logger.debug("Workspace root: {}", workspaceLayout.kamuRootDir)

val metadataRepository =
new MetadataRepository(workspaceLayout, systemClock)

Expand Down
2 changes: 1 addition & 1 deletion core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object KamuApp extends App {
val cliArgs = new CliArgs(args)

Configurator.setLevel(
getClass.getPackage.getName,
"dev.kamu",
if (cliArgs.debug()) Level.ALL else cliArgs.logLevel()
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ class SQLServerCommand(
// TODO: Avoid thrift ecxeption during testing of the port
val hostPort = livyProcess.waitForHostPort(containerPort, 15 seconds)

logger.info(s"Server is running at: jdbc:hive2://localhost:$hostPort")
logger.info(
s"Server is running at: jdbc:hive2://${dockerClient.getDockerHost}:${hostPort}"
)

try {
livyProcess.join()
} finally {
livyProcess.kill()
livyProcess.stop()
livyProcess.join()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ class SQLShellCommand(
)

try {
val hostPort = livyProcess.waitForHostPort(containerPort, 15 seconds)
val livyUrl = URI.create(s"jdbc:hive2://localhost:$hostPort")
val hostPort = livyProcess.waitForHostPort(containerPort, 60 seconds)
val livyUrl =
URI.create(s"jdbc:hive2://${dockerClient.getDockerHost}:${hostPort}")
logger.debug(s"Resolved Livy URL: $livyUrl")

body(livyUrl)
} finally {
livyProcess.kill()
livyProcess.stop()
livyProcess.join()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package dev.kamu.cli.external

object DockerImages {
val SPARK = "kamudata/engine-spark:0.6.0"
val FLINK = "kamudata/engine-flink:0.4.0"
val SPARK = "kamudata/engine-spark:0.7.0"
val FLINK = "kamudata/engine-flink:0.5.0"

val LIVY = SPARK
val JUPYTER = "kamudata/jupyter-uber:0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import dev.kamu.core.utils.{
DockerClient,
DockerProcess,
DockerProcessBuilder,
DockerRunArgs
DockerRunArgs,
OS
}
import org.apache.logging.log4j.LogManager

Expand Down Expand Up @@ -57,24 +58,25 @@ class JupyterDockerProcessBuilder(

// TODO: avoid this by setting up correct user inside the container
def chown(): Unit = {
logger.debug("Fixing file ownership")

val unix = new com.sun.security.auth.module.UnixSystem()
val shellCommand = Seq(
"chown",
"-R",
s"${unix.getUid}:${unix.getGid}",
"/opt/workdir"
)
if (!OS.isWindows) {
logger.debug("Fixing file ownership")

val shellCommand = Seq(
"chown",
"-R",
s"${OS.uid}:${OS.gid}",
"/opt/workdir"
)

dockerClient.runShell(
DockerRunArgs(
image = runArgs.image,
volumeMap =
Map(Paths.get("").toAbsolutePath -> Paths.get("/opt/workdir"))
),
shellCommand
)
dockerClient.runShell(
DockerRunArgs(
image = runArgs.image,
volumeMap =
Map(Paths.get("").toAbsolutePath -> Paths.get("/opt/workdir"))
),
shellCommand
)
}
}
}

Expand Down Expand Up @@ -139,23 +141,26 @@ class JupyterDockerProcess(
}

def openBrowserWhenReady(): Unit = {
if (Desktop.isDesktopSupported && Desktop.getDesktop.isSupported(
Desktop.Action.BROWSE
)) {
val browserOpenerThread = new Thread {
override def run(): Unit = {
val token = waitForToken()

val hostPort = getHostPort(80).get
val uri = URI.create(s"http://localhost:$hostPort/?token=$token")

logger.info(s"Opening in browser: $uri")
Desktop.getDesktop.browse(uri)
}
}
if (!Desktop.isDesktopSupported ||
!Desktop.getDesktop.isSupported(Desktop.Action.BROWSE))
return

val browserOpenerThread = new Thread {
override def run(): Unit = {
val token = waitForToken()

val hostPort = getHostPort(80).get
val uri = URI.create(
s"http://${dockerClient.getDockerHost}:$hostPort/?token=$token"
)

browserOpenerThread.setDaemon(true)
browserOpenerThread.start()
logger.info(s"Opening in browser: $uri")
Desktop.getDesktop.browse(uri)
}
}

browserOpenerThread.setDaemon(true)
browserOpenerThread.start()

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ class NotebookRunnerDocker(
var jupyterProcess: JupyterDockerProcess = null

def stopAll(): Unit = {
if (livyProcess != null)
livyProcess.kill()
if (jupyterProcess != null)
jupyterProcess.kill()
if (livyProcess != null) {
livyProcess.stop()
livyProcess.join()
}
if (jupyterProcess != null) {
jupyterProcess.stop()
jupyterProcess.join()
}
}

Signal.handle(new Signal("INT"), new SignalHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,16 @@ class IngestService(
prepDataPath: Path,
vocabulary: DatasetVocabulary
): MetadataBlock = {
val layout = metadataRepository.getDatasetLayout(datasetID)

val request = IngestRequest(
datasetID = datasetID,
source = source,
datasetLayout = metadataRepository.getDatasetLayout(datasetID),
dataToIngest = prepDataPath,
ingestPath = prepDataPath.toString,
eventTime = eventTime,
datasetVocab = vocabulary
source = source,
datasetVocab = vocabulary,
dataDir = layout.dataDir.toString,
checkpointsDir = layout.checkpointsDir.toString
)

val engine =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class MetadataRepository(
layout.checkpointsDir,
layout.metadataDir,
workspaceLayout.metadataDir.resolve(id.toString)
).foreach(p => File(p).delete())
).foreach(p => File(p).delete(true))
}

////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2018 kamu.dev
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package dev.kamu.cli.transform

import java.nio.file.{Path, Paths}

import dev.kamu.core.utils.OS

trait EngineUtils {
/////////////////////////////////////////////////////////////////////////////
// Path mappings between host and container
/////////////////////////////////////////////////////////////////////////////

protected val volumeDirInContainer: String = "/opt/engine/volume"
protected val inOutDirInContainer: String = "/opt/engine/in-out"

protected def isSubPathOf(p: Path, parent: Path): Boolean = {
var pp = p.getParent
while (pp != null) {
if (pp == parent)
return true
pp = pp.getParent
}
false
}

protected def toContainerPath(ps: String, volumePath: Path): String = {
val p = Paths.get(ps).normalize().toAbsolutePath
val rel = volumePath.relativize(p)
val x = if (!OS.isWindows) {
Paths.get(volumeDirInContainer).resolve(rel).toString
} else {
volumeDirInContainer + "/" + rel.toString.replace("\\", "/")
}
println(s"Mapped path $ps to $x")
x
}
}
Loading

0 comments on commit 49acdd1

Please sign in to comment.