Skip to content

Commit

Permalink
got setinterval and settimeout working properly
Browse files Browse the repository at this point in the history
  • Loading branch information
npayne committed Mar 19, 2023
1 parent 7859cc9 commit 5fcdfe1
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 80 deletions.
38 changes: 16 additions & 22 deletions v4/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,26 @@ import (
"go.uber.org/atomic"
)

type SubscriberList struct {
	// channels maps an event type name (the string key) to the list of
	// EventChannels subscribed to that type.
	// Each EventFilter's Predicate will be tested against the events
	// that are published for the matching type (and thus the predicates
	// can safely assert the type of the Data member of the event)
	channels map[string][]*EventChannel
}

// Event is a single published event: a type name used to route it to
// subscribers, plus an arbitrary payload.
type Event struct {
	// Type selects which subscriber channels receive this event.
	Type string
	// Data is the event payload. Subscribers whose filter matched on Type
	// may assert its concrete type (see the predicate note on the bus).
	Data interface{}
}

type EventBus struct {
name string
subscriberList SubscriberList
name string
// channels is a map of lists of EventChannels
// Each EventFilter's Predicate will be tested against the events
// that are published for the matching type (and thus the predicates
// can safely assert the type of the Data member of the event)
channels map[string][]*EventChannel
// number of goroutines spawned to publish events to subscriber channels
// that are full
nHanging atomic.Int32
}

func NewEventBus(name string) *EventBus {
b := &EventBus{name: name}
b.subscriberList.channels = make(map[string][]*EventChannel)
b.channels = make(map[string][]*EventChannel)
return b
}

Expand All @@ -49,19 +43,19 @@ func (b *EventBus) Subscribe(q *EventFilter) *EventChannel {
// Create a channel to return to the user
c := NewEventChannel(q)
// Add the channel to the subscriber list for its type
b.subscriberList.channels[q.eventType] = append(
b.subscriberList.channels[q.eventType], c)
b.channels[q.eventType] = append(
b.channels[q.eventType], c)
// return the channel to the caller
return c
}

// Remove a subscriber
func (b *EventBus) Unsubscribe(c *EventChannel) {
eventType := c.filter.eventType
channels, ok := b.subscriberList.channels[eventType]
channels, ok := b.channels[eventType]
if ok {
channels = removeEventChannelFromSlice(channels, c)
b.subscriberList.channels[eventType] = channels
b.channels[eventType] = channels
}
}

Expand All @@ -76,11 +70,11 @@ func (b *EventBus) notifySubscribers(e Event) {
}

var notifyExtraFull = func() {
logWarning("/!\\ /!\\ /!\\ number of goroutines waiting for event channel (of event type %s) to go below max capacity is now greater than capacity (%d); you're sending too many events", e.Type, EVENT_SUBSCRIBER_CHANNEL_CAPACITY)
logWarning("/!\\ /!\\ /!\\ number of goroutines waiting for an event channel (of event type %s) to go below max capacity is now greater than capacity (%d); you're sending too many events", e.Type, EVENT_SUBSCRIBER_CHANNEL_CAPACITY)
}

logEvents("len(b.subscriberList.channels[e.Type])=%d", len(b.subscriberList.channels[e.Type]))
for _, c := range b.subscriberList.channels[e.Type] {
logEvents("len(b.channels[e.Type])=%d", len(b.channels[e.Type]))
for _, c := range b.channels[e.Type] {
logEvents("| Channel: %p", c)
if !c.IsActive() {
continue
Expand All @@ -99,12 +93,12 @@ func (b *EventBus) notifySubscribers(e Event) {
if b.nHanging.Load() > EVENT_SUBSCRIBER_CHANNEL_CAPACITY {
notifyExtraFull()
}
logEvents("---- event channel put <-")
logEvents("---- event channel put <- %s.%v", e.Type, e.Data)
c.C <- e
b.nHanging.Add(-1)
}()
} else {
logEvents("---- event channel put <-")
logEvents("---- event channel put <- %s.%v", e.Type, e.Data)
c.C <- e
logEvents("---- len(<%p>.C) = %d", c, len(c.C))
}
Expand Down
36 changes: 3 additions & 33 deletions v4/runtime_limit_sharer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ type RuntimeLimitSharer struct {
RunnerMap map[string]*RuntimeLimiter
runnerNames map[*RuntimeLimiter]string
addRemoveChannel chan AddRemoveLogicEvent

// used to keep track of expected worst case loop overhead
innerLoopOverhead_ms float64
}

func NewRuntimeLimitSharer() *RuntimeLimitSharer {
Expand Down Expand Up @@ -54,22 +51,13 @@ func (r *RuntimeLimitSharer) Share(allowance_ms float64) (overunder_ms float64,
starvedMode := false
var lastStarvation float64
logRuntimeLimiter("\n====================\nshare loop\n====================\n")
overheadBail := false
for remaining_ms >= 0 && loop < MAX_LOOPS && !overheadBail {
for remaining_ms >= 0 && loop < MAX_LOOPS {
toShare_ms := remaining_ms
logRuntimeLimiter("\n===\nloop = %d, total share = %f ms\n===\n", loop, toShare_ms)
totalStarvation := 0.0
considered := 0
worstOverheadThisTime := 0.0
var ran int
for ran = 0; remaining_ms >= 0 && considered < len(r.runners); {
if remaining_ms < r.innerLoopOverhead_ms {
logRuntimeLimiter("XXX SHARE() OVERHEAD BAIL XXX")
overheadBail = true
break
}
tLoop := time.Now()
var used float64
considered++
runner := r.runners[r.runIX]
var runnerAllowance float64
Expand All @@ -80,30 +68,22 @@ func (r *RuntimeLimitSharer) Share(allowance_ms float64) (overunder_ms float64,
runnerAllowance = toShare_ms * (runner.starvation / lastStarvation)
}
logRuntimeLimiter("%s.starvation = %f", r.runnerNames[runner], runner.starvation)
runnerOverheadLooksGood := remaining_ms > 3*runner.loopOverhead_ms
logRuntimeLimiter("Run()? starvedMode: %t, runnerOverheadLooksGood: %t, starvedMode: %t, runner.starvation: %f", starvedMode, runnerOverheadLooksGood, starvedMode, runner.starvation)
if !starvedMode || (runnerOverheadLooksGood && starvedMode && runner.starvation != 0) {
logRuntimeLimiter("Run()? starvedMode: %t, starvedMode: %t, runner.starvation: %f", starvedMode, starvedMode, runner.starvation)
if !starvedMode || (starvedMode && runner.starvation != 0) {
logRuntimeLimiter(color.InWhiteOverBlue(fmt.Sprintf("|||||| sharing %f ms to %s", runnerAllowance, r.runnerNames[runner])))
// loop > 0 is the parameter of Run(), bonsuTime (AKA bonusTime)
t0 := time.Now()
runner.Run(runnerAllowance, loop > 0)
totalStarvation += runner.starvation
if runner.starvation != 0 {
logRuntimeLimiter(color.InYellow(fmt.Sprintf("%s.starvation = %f", r.runnerNames[runner], runner.starvation)))
}
used = float64(time.Since(t0).Nanoseconds()) / 1e6
remaining_ms = allowance_ms - float64(time.Since(tStart).Nanoseconds())/1e6
logRuntimeLimiter(color.InWhiteOverBlue(fmt.Sprintf("[remaining_ms: %f]", remaining_ms)))
ran++
}

overhead := float64(time.Since(tLoop).Nanoseconds())/1e6 - used
if overhead > worstOverheadThisTime {
worstOverheadThisTime = overhead
}
r.runIX = (r.runIX + 1) % len(r.runners)
}
r.updateOverhead(worstOverheadThisTime)
if ran == 0 {
break
} else {
Expand Down Expand Up @@ -133,16 +113,6 @@ func (r *RuntimeLimitSharer) Share(allowance_ms float64) (overunder_ms float64,
return remaining_ms, starved
}

func (r *RuntimeLimitSharer) updateOverhead(worstThisTime float64) {
if worstThisTime > r.innerLoopOverhead_ms {
r.innerLoopOverhead_ms = worstThisTime
} else {
// else decay toward better worst overhead
r.innerLoopOverhead_ms = 0.9*r.innerLoopOverhead_ms + 0.1*worstThisTime
}

}

func (r *RuntimeLimitSharer) DumpStats() map[string](map[string]float64) {
stats := make(map[string](map[string]float64))
stats["totals"] = make(map[string]float64)
Expand Down
52 changes: 28 additions & 24 deletions v4/runtime_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type RuntimeLimiter struct {
logicUnits []*LogicUnit
// used to retrieve units by string
logicUnitsMap map[string]*LogicUnit
// track which logics have been removed even during a Run()
removed map[*LogicUnit]bool
// used to queue add/remove events so we don't change the slice while iterating
addRemoveChannel chan AddRemoveLogicEvent
// logicUnits sorted by hotness ascending, which is an int incremented every time
Expand All @@ -49,6 +51,8 @@ type RuntimeLimiter struct {
// used to provide an accurate dt_ms to each logic unit so it can integrate
// time smooth and proper
lastRun map[*LogicUnit]time.Time
// used to keep track of whether the schedule period has elapsed for each logic
lastScheduleTick map[*LogicUnit]time.Time
// we run a logic unit with a gap of at least x ms where it takes x ms
// to run. so a function taking 4ms will have at least 4 ms after it finishes
// til the next time it runs, so we need to keep track of when logicunits end.
Expand Down Expand Up @@ -89,10 +93,12 @@ func NewRuntimeLimiter() *RuntimeLimiter {
return &RuntimeLimiter{
logicUnits: make([]*LogicUnit, 0),
logicUnitsMap: make(map[string]*LogicUnit),
removed: make(map[*LogicUnit]bool),
addRemoveChannel: make(chan (AddRemoveLogicEvent), ADD_REMOVE_LOGIC_CHANNEL_CAPACITY),
ascendingHotness: make([]*LogicUnit, 0),
runtimeEstimates: make(map[*LogicUnit]float64),
lastRun: make(map[*LogicUnit]time.Time),
lastScheduleTick: make(map[*LogicUnit]time.Time),
lastEnd: make(map[*LogicUnit]time.Time),
indexes: make(map[*LogicUnit]int),
}
Expand Down Expand Up @@ -126,7 +132,8 @@ func (r *RuntimeLimiter) Run(allowance_ms float64, bonsuTime bool) (remaining_ms
mode := RoundRobin
worstOverheadThisTime := 0.0
logRuntimeLimiter("Run(); allowance: %f ms", allowance_ms)
for remaining_ms > 0 {
iterated := 0
for remaining_ms > 0 && len(r.logicUnits) > 0 {
if remaining_ms < 3*r.loopOverhead_ms {
logRuntimeLimiter("XXX RUN() OVERHEAD BAIL XXX")
break
Expand Down Expand Up @@ -155,12 +162,12 @@ func (r *RuntimeLimiter) Run(allowance_ms float64, bonsuTime bool) (remaining_ms
case Opportunistic:
logic = r.ascendingHotness[r.oppIx]
}

iterated++
logRuntimeLimiter(color.InWhiteOverBlack(logic.name))
_, removed := r.removed[logic]
logRuntimeLimiter("active: %t, removed: %t", logic.active, removed)
var func_ms float64
if logic.active {

logRuntimeLimiter("--- %s", logic.name)

if logic.active && !removed {
// check whether this logic has ever run
_, hasRunBefore := r.lastRun[logic]
// check its estimate
Expand Down Expand Up @@ -224,28 +231,22 @@ func (r *RuntimeLimiter) Run(allowance_ms float64, bonsuTime bool) (remaining_ms
// every 1ms)
durationHasElapsed := r.tick(logic)

// obviously the logic must be active
isActive := logic.active

// get real time since last run
var dt_ms float64
if hasRunBefore {
dt_ms = float64(time.Since(r.lastRun[logic]).Nanoseconds()) / 1.0e6
} else {
dt_ms = 0
}
dt_ms := float64(time.Since(r.lastRun[logic]).Nanoseconds()) / 1e6

// finally, if it has a runschedule defined, we should also tick that
// amount of time
scheduled := logic.runSchedule == nil || logic.runSchedule.Tick(dt_ms)
// tick schedule
schedule_tick_ms := float64(time.Since(r.lastScheduleTick[logic]).Nanoseconds()) / 1e6
r.lastScheduleTick[logic] = time.Now()
hasSchedule := logic.runSchedule != nil
scheduled := hasSchedule && logic.runSchedule.Tick(schedule_tick_ms)

if DEBUG_RUNTIME_LIMITER {
logRuntimeLimiter("hasRunBefore: %t", hasRunBefore)
logRuntimeLimiter("isActive: %t", isActive)
logRuntimeLimiter("durationHasElapsed: %t", durationHasElapsed)
logRuntimeLimiter("hasSchedule: %t", hasSchedule)
logRuntimeLimiter("scheduled: %t", scheduled)
}
if !hasRunBefore ||
if (!hasRunBefore && !hasSchedule) ||
(durationHasElapsed && scheduled && !oppSkip) {

t0 := time.Now()
Expand Down Expand Up @@ -315,11 +316,12 @@ func (r *RuntimeLimiter) Run(allowance_ms float64, bonsuTime bool) (remaining_ms
r.overrun = true
}
// calculate starved
if r.ranRobin == 0 {
// TODO: we use iterated, but maybe ranrobin/ranopp can be used?
if iterated == 0 {
r.starvation = 1.0
} else if r.ranRobin > 0 && r.ranRobin <= len(r.logicUnits) {
r.starvation = float64(len(r.logicUnits)-r.ranRobin) / float64(len(r.logicUnits))
} else if r.ranRobin > len(r.logicUnits) {
} else if iterated > 0 && iterated <= len(r.logicUnits) {
r.starvation = float64(len(r.logicUnits)-iterated) / float64(len(r.logicUnits))
} else if iterated > len(r.logicUnits) {
r.starvation = 0.0
}
return overunder_ms
Expand Down Expand Up @@ -382,6 +384,7 @@ func (r *RuntimeLimiter) addLogicImmediately(l *LogicUnit) {
}
r.logicUnits = append(r.logicUnits, l)
r.logicUnitsMap[l.name] = l
r.lastScheduleTick[l] = time.Now()
r.indexes[l] = len(r.logicUnits) - 1
r.insertAscendingHotness(l)
}
Expand All @@ -397,6 +400,7 @@ func (r *RuntimeLimiter) removeLogicImmediately(l *LogicUnit) {
return
}
delete(r.logicUnitsMap, l.name)
delete(r.removed, l)
delete(r.runtimeEstimates, l)
delete(r.lastRun, l)
delete(r.lastEnd, l)
Expand Down
20 changes: 19 additions & 1 deletion v4/world_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,5 +395,23 @@ func TestWorldSetTimeout(t *testing.T) {
w.Update(FRAME_MS)
time.Sleep(FRAME_DURATION)
}
Logger.Printf("x after 516 ms approx: %d", x)
if x != 1 {
t.Fatalf("Should've run settimeout func 1 time, ran %d times", x)
}
}

// TestWorldSetInterval verifies that a function registered with
// w.SetInterval fires once per elapsed interval: with a 100 (ms,
// presumably — confirm against SetInterval's signature) period and
// ~516 ms of simulated updates, it should run exactly 5 times.
func TestWorldSetInterval(t *testing.T) {
	w := testingWorld()
	// x counts how many times the interval callback has fired.
	x := 0
	w.SetInterval(func() {
		Logger.Println("run")
		x++
	}, 100)
	// Advance the world in FRAME_MS steps covering ~516 ms total.
	// NOTE(review): this also sleeps real wall-clock time per frame, so the
	// test is timing-sensitive; a loaded CI machine could skew the count.
	// 516/FRAME_MS is integer division — assumes FRAME_MS divides into a
	// count that spans the five 100 ms marks; TODO confirm FRAME_MS value.
	for i := 0; i < 516/FRAME_MS; i++ {
		w.Update(FRAME_MS)
		time.Sleep(FRAME_DURATION)
	}
	if x != 5 {
		t.Fatalf("Should've run setinterval func 5 times, ran %d times", x)
	}
}

0 comments on commit 5fcdfe1

Please sign in to comment.