Skip to content

Commit

Permalink
Debug
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 27, 2024
1 parent 53c57f2 commit 21d2378
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 8 deletions.
9 changes: 7 additions & 2 deletions pkg/kwok/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Controller struct {

podOnNodeManageQueue queue.Queue[string]
nodeManageQueue queue.Queue[string]

podOnNodeManageQueueParallelism *queue.AdaptiveQueue[string]
nodeManageQueueParallelism *queue.AdaptiveQueue[string]
}

// Config is the configuration for the controller
Expand Down Expand Up @@ -295,6 +298,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
c.nodeLeases.ReleaseHold(nodeName)
}

c.nodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.nodeManageQueue, c.nodeLeaseSyncWorker)
go c.nodeLeaseSyncWorker(ctx)

err = c.nodeLeases.Start(ctx)
Expand All @@ -307,7 +311,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error {
func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.nodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.nodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down Expand Up @@ -339,6 +343,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi
return fmt.Errorf("failed to init pod controller: %w", err)
}

c.podOnNodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.podOnNodeManageQueue, c.podsOnNodeSyncWorker)
go c.podsOnNodeSyncWorker(ctx)

case nodeRef:
Expand Down Expand Up @@ -559,7 +564,7 @@ func (c *Controller) Start(ctx context.Context) error {
func (c *Controller) podsOnNodeSyncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.podOnNodeManageQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.podOnNodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type NodeController struct {
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Node]]
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Node]]
backoff wait.Backoff
recorder record.EventRecorder
readOnlyFunc func(nodeName string) bool
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) {
// if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed
func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -323,7 +325,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
node, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
node, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/kwok/controllers/node_lease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ type NodeLeaseController struct {
// mutateLeaseFunc allows customizing a lease object
mutateLeaseFunc func(*coordinationv1.Lease) error

delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, bool]
delayQueue queue.WeightDelayingQueue[string]
holdLeaseSet maps.SyncMap[string, bool]
delayQueueParallelism *queue.AdaptiveQueue[string]

holderIdentity string
onNodeManagedFunc func(nodeName string)
Expand Down Expand Up @@ -99,6 +100,7 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle

// Start starts the NodeLeaseController
func (c *NodeLeaseController) Start(ctx context.Context) error {
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.syncWorker)
for i := uint(0); i < c.leaseParallelism; i++ {
go c.syncWorker(ctx)
}
Expand All @@ -108,7 +110,7 @@ func (c *NodeLeaseController) Start(ctx context.Context) error {
func (c *NodeLeaseController) syncWorker(ctx context.Context) {
logger := log.FromContext(ctx)
for ctx.Err() == nil {
nodeName, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
nodeName, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type PodController struct {
playStageParallelism uint
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Pod]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Pod]]
backoff wait.Backoff
delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]]
recorder record.EventRecorder
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
// It will modify the pods' status to what we want
func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -258,7 +260,7 @@ func (c *PodController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
pod, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
pod, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kwok/controllers/stage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type StageController struct {
playStageParallelism uint
lifecycle resources.Getter[lifecycle.Lifecycle]
delayQueue queue.WeightDelayingQueue[resourceStageJob[*unstructured.Unstructured]]
delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*unstructured.Unstructured]]
backoff wait.Backoff
delayQueueMapping maps.SyncMap[string, resourceStageJob[*unstructured.Unstructured]]
recorder record.EventRecorder
Expand Down Expand Up @@ -123,6 +124,7 @@ func NewStageController(conf StageControllerConfig) (*StageController, error) {
// It will modify the resources' status to what we want
func (c *StageController) Start(ctx context.Context, events <-chan informer.Event[*unstructured.Unstructured]) error {
go c.preprocessWorker(ctx)
c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker)
for i := uint(0); i < c.playStageParallelism; i++ {
go c.playStageWorker(ctx)
}
Expand Down Expand Up @@ -236,7 +238,7 @@ func (c *StageController) playStageWorker(ctx context.Context) {
logger := log.FromContext(ctx)

for ctx.Err() == nil {
resource, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done())
resource, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done())
if !ok {
return
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/utils/queue/parallelism.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"context"
"sync"
"time"
)

// Thresholds that drive worker scale-up in AdaptiveQueue.
const (
	// adaptiveBacklogThreshold is the backlog length (measured right after an
	// item has been taken) that must be exceeded before another worker is
	// considered.
	adaptiveBacklogThreshold = 3

	// adaptiveMinStartInterval is the minimum time between two worker starts.
	adaptiveMinStartInterval = time.Second / 10
)

// AdaptiveQueue wraps a Queue and starts additional workers while the wrapped
// queue keeps a backlog.
//
// Items are fetched through GetOrWaitWithDone. Whenever more than
// adaptiveBacklogThreshold items remain after a fetch, and at least
// adaptiveMinStartInterval has passed since the previous start, startFunc is
// launched in a new goroutine with the context given to NewAdaptiveQueue.
//
// NOTE(review): there is no upper bound on the number of workers started and
// nothing here stops them again; they are expected to exit on their own when
// the context is cancelled.
type AdaptiveQueue[T any] struct {
	// ctx is handed to every worker started by the queue.
	ctx context.Context
	// startFunc is the worker body; it is run in its own goroutine.
	startFunc func(ctx context.Context)
	// latestStart is the time of the most recent worker start (or of
	// construction); guarded by mut.
	latestStart time.Time
	mut         sync.Mutex
	// queue is the wrapped queue that items are read from.
	queue Queue[T]
}

// NewAdaptiveQueue returns an AdaptiveQueue reading from q.
//
// startFunc is what the queue launches (with ctx) when it decides another
// worker is needed. A nil startFunc disables scale-up. The construction time
// counts as the first "start", so no worker is launched during the first
// adaptiveMinStartInterval.
func NewAdaptiveQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *AdaptiveQueue[T] {
	return &AdaptiveQueue[T]{
		ctx:         ctx,
		startFunc:   startFunc,
		latestStart: time.Now(),
		queue:       q,
	}
}

// GetOrWaitWithDone fetches the next item from the wrapped queue, forwarding
// done to it, and returns that queue's result unchanged.
//
// After a successful fetch it may additionally start one more worker if the
// remaining backlog exceeds adaptiveBacklogThreshold.
func (p *AdaptiveQueue[T]) GetOrWaitWithDone(done <-chan struct{}) (T, bool) {
	t, ok := p.queue.GetOrWaitWithDone(done)
	if !ok {
		return t, false
	}

	if p.queue.Len() > adaptiveBacklogThreshold {
		p.maybeStartWorker()
	}
	return t, true
}

// maybeStartWorker launches startFunc in a new goroutine unless scale-up is
// disabled, the context is already cancelled, or the previous start was less
// than adaptiveMinStartInterval ago.
func (p *AdaptiveQueue[T]) maybeStartWorker() {
	// A `go` statement on a nil function value crashes the process.
	if p.startFunc == nil {
		return
	}
	// Do not spin up new workers while shutting down.
	if p.ctx != nil && p.ctx.Err() != nil {
		return
	}

	p.mut.Lock()
	defer p.mut.Unlock()

	now := time.Now()
	if now.Sub(p.latestStart) < adaptiveMinStartInterval {
		return
	}
	p.latestStart = now
	go p.startFunc(p.ctx)
}

0 comments on commit 21d2378

Please sign in to comment.