Skip to content

Commit

Permalink
[query] Use SourcePos for implicit timing context
Browse files Browse the repository at this point in the history
  • Loading branch information
ehigham committed Oct 25, 2024
1 parent 70d2769 commit 100c354
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 24 deletions.
3 changes: 1 addition & 2 deletions hail/src/main/scala/is/hail/HailContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.json4s.Extraction
import org.json4s.jackson.JsonMethods
import sourcecode.Enclosing

case class FilePartition(index: Int, file: String) extends Partition

Expand All @@ -42,7 +41,7 @@ object HailContext {

def backend: Backend = get.backend

def sparkBackend(implicit E: Enclosing): SparkBackend = get.backend.asSpark
def sparkBackend(implicit E: SourcePos): SparkBackend = get.backend.asSpark

def configureLogging(logFile: String, quiet: Boolean, append: Boolean): Unit = {
org.apache.log4j.helpers.LogLog.setInternalDebugging(true)
Expand Down
5 changes: 2 additions & 3 deletions hail/src/main/scala/is/hail/backend/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import is.hail.io.fs.FS
import is.hail.types.RTable
import is.hail.types.encoded.EType
import is.hail.types.physical.PTuple
import is.hail.utils.fatal
import is.hail.utils.{fatal, SourcePos}

import scala.reflect.ClassTag

import java.io.{Closeable, OutputStream}

import com.fasterxml.jackson.core.StreamReadConstraints
import sourcecode.Enclosing

object Backend {

Expand Down Expand Up @@ -85,7 +84,7 @@ abstract class Backend extends Closeable {

def close(): Unit

def asSpark(implicit E: Enclosing): SparkBackend =
def asSpark(implicit E: SourcePos): SparkBackend =
fatal(s"${getClass.getSimpleName}: ${E.value} requires SparkBackend")

def lowerDistributedSort(
Expand Down
6 changes: 2 additions & 4 deletions hail/src/main/scala/is/hail/backend/ExecuteContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import scala.collection.mutable
import java.io._
import java.security.SecureRandom

import sourcecode.Enclosing

trait TempFileManager extends AutoCloseable {
def newTmpPath(tmpdir: String, prefix: String, extension: String = null): String
}
Expand Down Expand Up @@ -150,7 +148,7 @@ class ExecuteContext(

def scopedExecution[T](
f: (HailClassLoader, FS, HailTaskContext, Region) => T
)(implicit E: Enclosing
)(implicit E: SourcePos
): T =
using(new LocalTaskContext(0, 0)) { tc =>
time {
Expand All @@ -176,7 +174,7 @@ class ExecuteContext(
taskContext.close()
}

def time[A](block: => A)(implicit E: Enclosing): A =
def time[A](block: => A)(implicit E: SourcePos): A =
timer.time(E.value)(block)

def local[A](
Expand Down
3 changes: 1 addition & 2 deletions hail/src/main/scala/is/hail/backend/api/Py4JBackendApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import org.apache.spark.sql.DataFrame
import org.json4s
import org.json4s._
import org.json4s.jackson.{JsonMethods, Serialization}
import sourcecode.Enclosing

final class Py4JBackendApi(backend: Backend) extends Closeable with ErrorHandling {

Expand Down Expand Up @@ -298,7 +297,7 @@ final class Py4JBackendApi(backend: Backend) extends Closeable with ErrorHandlin
selfContainedExecution: Boolean = true
)(
f: ExecuteContext => T
)(implicit E: Enclosing
)(implicit E: SourcePos
): (T, Timings) =
ExecutionTimer.time { timer =>
ExecuteContext.scoped(
Expand Down
5 changes: 2 additions & 3 deletions hail/src/main/scala/is/hail/backend/spark/SparkBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import sourcecode.Enclosing

class SparkBroadcastValue[T](bc: Broadcast[T]) extends BroadcastValue[T] with Serializable {
def value: T = bc.value
Expand Down Expand Up @@ -71,7 +70,7 @@ object SparkBackend {

private var theSparkBackend: SparkBackend = _

def sparkContext(implicit E: Enclosing): SparkContext = HailContext.sparkBackend.sc
def sparkContext(implicit E: SourcePos): SparkContext = HailContext.sparkBackend.sc

def checkSparkCompatibility(jarVersion: String, sparkVersion: String): Unit = {
def majorMinor(version: String): String = version.split("\\.", 3).take(2).mkString(".")
Expand Down Expand Up @@ -352,7 +351,7 @@ class SparkBackend(val sc: SparkContext) extends Backend {

def defaultParallelism: Int = sc.defaultParallelism

override def asSpark(implicit E: Enclosing): SparkBackend = this
override def asSpark(implicit E: SourcePos): SparkBackend = this

def close(): Unit =
SparkBackend.stop()
Expand Down
6 changes: 2 additions & 4 deletions hail/src/main/scala/is/hail/expr/ir/Compile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import is.hail.utils._

import java.io.PrintWriter

import sourcecode.Enclosing

case class CodeCacheKey(
aggSigs: IndexedSeq[AggStateSig],
args: Seq[(Name, EmitParamType)],
Expand Down Expand Up @@ -89,7 +87,7 @@ object compile {
optimize: Boolean,
print: Option[PrintWriter],
)(implicit
E: Enclosing,
E: SourcePos,
N: sourcecode.Name,
): (Option[SingleCodeType], (HailClassLoader, FS, HailTaskContext, Region) => F with Mixin) =
ctx.time {
Expand Down Expand Up @@ -131,7 +129,7 @@ object compile {
CompiledFunction(rt, fb.resultWithIndex(print))
},
).asInstanceOf[CompiledFunction[F with Mixin]].tuple
}
}(E)
}

object CompileIterator {
Expand Down
5 changes: 3 additions & 2 deletions hail/src/main/scala/is/hail/expr/ir/lowering/IRState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import is.hail.expr.ir.{
BaseIR, IRTraversal, RelationalLet, RelationalRef, TableKeyBy, TableKeyByAndAggregate,
TableOrderBy,
}
import is.hail.utils.SourcePos

abstract class IRState(implicit E: sourcecode.Enclosing) {
abstract class IRState(implicit E: SourcePos) {
protected val rules: Array[Rule]

final def verify(ctx: ExecuteContext, ir: BaseIR): Unit =
Expand All @@ -19,7 +20,7 @@ abstract class IRState(implicit E: sourcecode.Enclosing) {
}
}

def +(other: IRState)(implicit E: sourcecode.Enclosing): IRState = {
def +(other: IRState)(implicit E: SourcePos): IRState = {
val newRules = rules ++ other.rules
new IRState()(E) {
val rules: Array[Rule] = newRules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class IrMetadata() {
}
}

abstract class LoweringPass(implicit E: sourcecode.Enclosing) {
abstract class LoweringPass(implicit E: SourcePos) {
val before: IRState
val after: IRState
val context: String
Expand Down
5 changes: 2 additions & 3 deletions hail/src/main/scala/is/hail/utils/ExecutionTimer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ import scala.collection.mutable

import org.json4s.{JArray, JString, JValue}
import org.json4s.JsonAST.JLong
import sourcecode.Enclosing

object ExecutionTimer {

def time[T](f: ExecutionTimer => T)(implicit E: Enclosing): (T, Timings) = {
def time[T](f: ExecutionTimer => T)(implicit E: SourcePos): (T, Timings) = {
val timer = new ExecutionTimer(E.value)
val result = f(timer)
timer.finish()
timer.logInfo()
(result, timer.result)
}

def logTime[T](f: ExecutionTimer => T)(implicit E: Enclosing): T = {
def logTime[T](f: ExecutionTimer => T)(implicit E: SourcePos): T = {
val (result, _) = time[T](f)
result
}
Expand Down
2 changes: 2 additions & 0 deletions hail/src/main/scala/is/hail/utils/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,8 @@ package object utils

implicit def evalLazy[A](f: Lazy[A]): A =
f()

type SourcePos = sourcecode.FullName
}

class CancellingExecutorService(delegate: ExecutorService) extends AbstractExecutorService {
Expand Down

0 comments on commit 100c354

Please sign in to comment.