Skip to content

Commit

Permalink
Update scala versions
Browse files Browse the repository at this point in the history
  • Loading branch information
mschuwalow committed Oct 9, 2024
1 parent 469a93e commit f47c757
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 93 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
java: ['17']
scala: ['2.12.19', '2.13.13', '3.3.3']
scala: ['2.12.20', '2.13.15', '3.4.1']
steps:
- name: Checkout current branch
uses: actions/checkout@v4.0.0
Expand All @@ -63,7 +63,7 @@ jobs:
fail-fast: false
matrix:
java: ['8', '11', '17']
scala: ['2.12.19', '2.13.13', '3.3.3']
scala: ['2.12.20', '2.13.15', '3.4.1']
steps:
- name: Checkout current branch
uses: actions/checkout@v4.0.0
Expand Down
186 changes: 95 additions & 91 deletions zio-profiling/src/main/scala/zio/profiling/causal/CausalProfiler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,113 +279,117 @@ final case class CausalProfiler(
}
}

runSamplingStep.repeat[Any, Long](Schedule.spaced(samplingPeriod)).race(resultPromise.await).fork.as {
new Supervisor[ProfilingResult] {

// first event in fiber lifecycle. No previous events to consider.
override def onStart[R, E, A](
environment: ZEnvironment[R],
effect: ZIO[R, E, A],
parent: Option[Fiber.Runtime[Any, Any]],
fiber: Fiber.Runtime[E, A]
)(implicit unsafe: zio.Unsafe): Unit = {
val parentDelay =
parent.flatMap(f => Option(fibers.get(f.id)).map(_.localDelay.get())).getOrElse(globalDelay)

fibers.put(fiber.id.id, FiberState.makeFor(fiber, parentDelay))
()
}

// previous events could be {onStart, onResume, onEffect}
override def onEnd[R, E, A](
value: Exit[E, A],
fiber: Fiber.Runtime[E, A]
)(implicit unsafe: zio.Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
state.running = false
// no need to refresh tag as we are shutting down
if (!value.isInterrupted) {
delayFiber(state)
}
fibers.remove(id)
runSamplingStep
.repeat[Any, Long](Schedule.spaced(samplingPeriod))
.race[Any, Throwable, Any](resultPromise.await)
.fork
.as {
new Supervisor[ProfilingResult] {

// first event in fiber lifecycle. No previous events to consider.
override def onStart[R, E, A](
environment: ZEnvironment[R],
effect: ZIO[R, E, A],
parent: Option[Fiber.Runtime[Any, Any]],
fiber: Fiber.Runtime[E, A]
)(implicit unsafe: zio.Unsafe): Unit = {
val parentDelay =
parent.flatMap(f => Option(fibers.get(f.id)).map(_.localDelay.get())).getOrElse(globalDelay)

fibers.put(fiber.id.id, FiberState.makeFor(fiber, parentDelay))
()
}
}

// previous events could be {onStart, onResume, onEffect}
override def onEffect[E, A](fiber: Fiber.Runtime[E, A], effect: ZIO[_, _, _])(implicit
unsafe: zio.Unsafe
): Unit = {
val id = fiber.id.id
var state = fibers.get(id)
var freshState = false

if (state == null) {
state = FiberState.makeFor(fiber, globalDelay)
fibers.put(id, state)
freshState = true
} else if (state.lastEffectWasStateful) {
state.refreshCostCenter(fiber)
state.lastEffectWasStateful = false
}

effect match {
case ZIO.Stateful(_, _) =>
state.lastEffectWasStateful = true
case ZIO.Sync(_, _) =>
if (!freshState) {
state.running = false
// previous events could be {onStart, onResume, onEffect}
override def onEnd[R, E, A](
value: Exit[E, A],
fiber: Fiber.Runtime[E, A]
)(implicit unsafe: zio.Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
state.running = false
// no need to refresh tag as we are shutting down
if (!value.isInterrupted) {
delayFiber(state)
state.running = true
}
case ZIO.Async(_, _, _) =>
state.preAsyncGlobalDelay = globalDelay
state.inAsync = true
case _ =>
fibers.remove(id)
()
}
}
}

// previous events could be {onStart, onResume, onEffect}
override def onSuspend[E, A](fiber: Fiber.Runtime[E, A])(implicit unsafe: Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
if (state.lastEffectWasStateful) {
state.lastEffectWasStateful = false
// previous events could be {onStart, onResume, onEffect}
override def onEffect[E, A](fiber: Fiber.Runtime[E, A], effect: ZIO[_, _, _])(implicit
unsafe: zio.Unsafe
): Unit = {
val id = fiber.id.id
var state = fibers.get(id)
var freshState = false

if (state == null) {
state = FiberState.makeFor(fiber, globalDelay)
fibers.put(id, state)
freshState = true
} else if (state.lastEffectWasStateful) {
state.refreshCostCenter(fiber)
state.lastEffectWasStateful = false
}

effect match {
case ZIO.Stateful(_, _) =>
state.lastEffectWasStateful = true
case ZIO.Sync(_, _) =>
if (!freshState) {
state.running = false
delayFiber(state)
state.running = true
}
case ZIO.Async(_, _, _) =>
state.preAsyncGlobalDelay = globalDelay
state.inAsync = true
case _ =>
()
}
state.running = false
} else {
val newState = FiberState.makeFor(fiber, globalDelay)
newState.running = false
fibers.put(id, newState)
()
}
}

// previous event can only be onSuspend
override def onResume[E, A](fiber: Fiber.Runtime[E, A])(implicit unsafe: Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
state.running = true
if (state.inAsync) {
state.localDelay.addAndGet(globalDelay - state.preAsyncGlobalDelay)
state.preAsyncGlobalDelay = 0
state.inAsync = false
// previous events could be {onStart, onResume, onEffect}
override def onSuspend[E, A](fiber: Fiber.Runtime[E, A])(implicit unsafe: Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
if (state.lastEffectWasStateful) {
state.lastEffectWasStateful = false
state.refreshCostCenter(fiber)
}
state.running = false
} else {
val newState = FiberState.makeFor(fiber, globalDelay)
newState.running = false
fibers.put(id, newState)
()
}
}

// previous event can only be onSuspend
override def onResume[E, A](fiber: Fiber.Runtime[E, A])(implicit unsafe: Unsafe): Unit = {
val id = fiber.id.id
val state = fibers.get(id)
if (state ne null) {
state.running = true
if (state.inAsync) {
state.localDelay.addAndGet(globalDelay - state.preAsyncGlobalDelay)
state.preAsyncGlobalDelay = 0
state.inAsync = false
}
} else {
fibers.put(id, FiberState.makeFor(fiber, globalDelay))
()
}
} else {
fibers.put(id, FiberState.makeFor(fiber, globalDelay))
()
}
}

def value(implicit trace: Trace): UIO[ProfilingResult] = resultPromise.await
def value(implicit trace: Trace): UIO[ProfilingResult] = resultPromise.await
}
}
}
}
}
}
Expand Down

0 comments on commit f47c757

Please sign in to comment.