Skip to content

Commit

Permalink
[qob] Better compiler stage names for QOB (#12592)
Browse files Browse the repository at this point in the history
* [qob] Better compiler stage names for QOB

* fix
  • Loading branch information
tpoterba authored Jan 17, 2023
1 parent 827fce1 commit 4461c47
Show file tree
Hide file tree
Showing 8 changed files with 9 additions and 5 deletions.
1 change: 1 addition & 0 deletions hail/src/main/scala/is/hail/backend/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ abstract class Backend {
backendContext: BackendContext,
fs: FS,
collection: Array[Array[Byte]],
stageIdentifier: String,
dependency: Option[TableStageDependency] = None
)(
f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
Expand Down
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/backend/BackendUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BackendUtils(mods: Array[(String, (HailClassLoader, FS, Int, Region) => Ba
} else {
val globalsBC = backend.broadcast(globals)
val fsConfigBC = backend.broadcast(fs.getConfiguration())
backend.parallelizeAndComputeWithIndex(backendContext, fs, contexts, tsd)({ (ctx, htc, theHailClassLoader, fs) =>
backend.parallelizeAndComputeWithIndex(backendContext, fs, contexts, stageName, tsd)({ (ctx, htc, theHailClassLoader, fs) =>
val fsConfig = fsConfigBC.value
val gs = globalsBC.value
fs.setConfiguration(fsConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class LocalBackend(
backendContext: BackendContext,
fs: FS,
collection: Array[Array[Byte]],
stageIdentifier: String,
dependency: Option[TableStageDependency] = None
)(
f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class ServiceBackend(
_backendContext: BackendContext,
_fs: FS,
collection: Array[Array[Byte]],
stageIdentifier: String,
dependency: Option[TableStageDependency] = None
)(f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
): Array[Array[Byte]] = {
Expand Down Expand Up @@ -180,7 +181,7 @@ class ServiceBackend(
JString(s"$n"))),
"type" -> JString("jvm")),
"attributes" -> JObject(
"name" -> JString(name + "_" + stageCount + "_" + i),
"name" -> JString(s"${ name }_stage${ stageCount }_${ stageIdentifier }_job$i"),
),
"mount_tokens" -> JBool(true),
"resources" -> resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ class SparkBackend(
backendContext: BackendContext,
fs: FS,
collection: Array[Array[Byte]],
stageIdentifier: String,
dependency: Option[TableStageDependency] = None
)(
f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
Expand Down
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/expr/ir/Emit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2448,7 +2448,7 @@ class Emit[C](
emitI(dynamicID).consume(cb,
(),
{ dynamicID =>
cb.assign(stageName, stageName.concat(": ").concat(dynamicID.asString.loadString(cb)))
cb.assign(stageName, stageName.concat("|").concat(dynamicID.asString.loadString(cb)))
})

cb.assign(encRes, spark.invoke[BackendContext, HailClassLoader, FS, String, Array[Array[Byte]], Array[Byte], String, Option[TableStageDependency], Array[Array[Byte]]](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ object LowerTableIR {
FastIndexedSeq(howManyPartsToTryRef.name -> howManyPartsToTry, iteration.name -> 0),
bindIR(loweredChild.mapContexts(_ => StreamTake(ToStream(childContexts), howManyPartsToTryRef)){ ctx: IR => ctx }
.mapCollect(relationalLetsAbove, "table_head_recursive_count",
strConcat(Str("iteration="), invoke("str", TString, iteration), Str(", nParts="), invoke("str", TString, howManyPartsToTryRef))
strConcat(Str("iteration="), invoke("str", TString, iteration), Str(",nParts="), invoke("str", TString, howManyPartsToTryRef))
)(streamLenOrMax)) { counts =>
If((Cast(streamSumIR(ToStream(counts)), TInt64) >= targetNumRows) || (ArrayLen(childContexts) <= ArrayLen(counts)),
counts,
Expand Down
2 changes: 1 addition & 1 deletion hail/src/main/scala/is/hail/io/vcf/LoadVCF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,7 @@ object MatrixVCFReader {
val localFilterAndReplace = params.filterAndReplace

val fsConfigBC = backend.broadcast(fs.getConfiguration())
backend.parallelizeAndComputeWithIndex(ctx.backendContext, fs, files.tail.map(_.getBytes), None) { (bytes, htc, _, fs) =>
backend.parallelizeAndComputeWithIndex(ctx.backendContext, fs, files.tail.map(_.getBytes), "load_vcf_parse_header", None) { (bytes, htc, _, fs) =>
val fsConfig = fsConfigBC.value
fs.setConfiguration(fsConfig)
val file = new String(bytes)
Expand Down

0 comments on commit 4461c47

Please sign in to comment.