diff --git a/pkg/kwok/controllers/controller.go b/pkg/kwok/controllers/controller.go index 1b39c798c..80bbce04e 100644 --- a/pkg/kwok/controllers/controller.go +++ b/pkg/kwok/controllers/controller.go @@ -95,6 +95,9 @@ type Controller struct { podOnNodeManageQueue queue.Queue[string] nodeManageQueue queue.Queue[string] + + podOnNodeManageQueueParallelism *queue.AdaptiveQueue[string] + nodeManageQueueParallelism *queue.AdaptiveQueue[string] } // Config is the configuration for the controller @@ -295,6 +298,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error { c.nodeLeases.ReleaseHold(nodeName) } + c.nodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.nodeManageQueue, c.nodeLeaseSyncWorker) go c.nodeLeaseSyncWorker(ctx) err = c.nodeLeases.Start(ctx) @@ -307,7 +311,7 @@ func (c *Controller) initNodeLeaseController(ctx context.Context) error { func (c *Controller) nodeLeaseSyncWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - nodeName, ok := c.nodeManageQueue.GetOrWaitWithDone(ctx.Done()) + nodeName, ok := c.nodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } @@ -339,6 +343,7 @@ func (c *Controller) startStageController(ctx context.Context, ref internalversi return fmt.Errorf("failed to init pod controller: %w", err) } + c.podOnNodeManageQueueParallelism = queue.NewAdaptiveQueue(ctx, c.podOnNodeManageQueue, c.podsOnNodeSyncWorker) go c.podsOnNodeSyncWorker(ctx) case nodeRef: @@ -559,7 +564,7 @@ func (c *Controller) Start(ctx context.Context) error { func (c *Controller) podsOnNodeSyncWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - nodeName, ok := c.podOnNodeManageQueue.GetOrWaitWithDone(ctx.Done()) + nodeName, ok := c.podOnNodeManageQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index ce050621c..560f35ce5 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -60,6 +60,7 @@ type NodeController struct { lifecycle resources.Getter[lifecycle.Lifecycle] delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Node]] delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Node]] + delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Node]] backoff wait.Backoff recorder record.EventRecorder readOnlyFunc func(nodeName string) bool @@ -143,6 +144,7 @@ func NewNodeController(conf NodeControllerConfig) (*NodeController, error) { // if nodeSelectorFunc is not nil, it will use it to determine if the node should be managed func (c *NodeController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Node]) error { go c.preprocessWorker(ctx) + c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } @@ -323,7 +325,7 @@ func (c *NodeController) playStageWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - node, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done()) + node, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } diff --git a/pkg/kwok/controllers/node_lease_controller.go b/pkg/kwok/controllers/node_lease_controller.go index dbf450753..fbca87986 100644 --- a/pkg/kwok/controllers/node_lease_controller.go +++ b/pkg/kwok/controllers/node_lease_controller.go @@ -49,8 +49,9 @@ type NodeLeaseController struct { // mutateLeaseFunc allows customizing a lease object mutateLeaseFunc func(*coordinationv1.Lease) error - delayQueue queue.WeightDelayingQueue[string] - holdLeaseSet maps.SyncMap[string, bool] + delayQueue queue.WeightDelayingQueue[string] + holdLeaseSet maps.SyncMap[string, bool] + delayQueueParallelism *queue.AdaptiveQueue[string] holderIdentity string onNodeManagedFunc func(nodeName string) @@ -99,6 +100,7 @@ func NewNodeLeaseController(conf NodeLeaseControllerConfig) (*NodeLeaseControlle // Start starts the NodeLeaseController func (c *NodeLeaseController) Start(ctx context.Context) error { + c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.syncWorker) for i := uint(0); i < c.leaseParallelism; i++ { go c.syncWorker(ctx) } @@ -108,7 +110,7 @@ func (c *NodeLeaseController) Start(ctx context.Context) error { func (c *NodeLeaseController) syncWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - nodeName, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done()) + nodeName, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 4c3588d7e..c53f4502f 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -64,6 +64,7 @@ type PodController struct { playStageParallelism uint lifecycle resources.Getter[lifecycle.Lifecycle] delayQueue queue.WeightDelayingQueue[resourceStageJob[*corev1.Pod]] + delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*corev1.Pod]] backoff wait.Backoff delayQueueMapping maps.SyncMap[string, resourceStageJob[*corev1.Pod]] recorder record.EventRecorder @@ -148,6 +149,7 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) { // It will modify the pods status to we want func (c *PodController) Start(ctx context.Context, events <-chan informer.Event[*corev1.Pod]) error { go c.preprocessWorker(ctx) + c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } @@ -258,7 +260,7 @@ func (c *PodController) playStageWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - pod, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done()) + pod, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go index a74847a2b..25c3bbabf 100644 --- a/pkg/kwok/controllers/stage_controller.go +++ b/pkg/kwok/controllers/stage_controller.go @@ -59,6 +59,7 @@ type StageController struct { playStageParallelism uint lifecycle resources.Getter[lifecycle.Lifecycle] delayQueue queue.WeightDelayingQueue[resourceStageJob[*unstructured.Unstructured]] + delayQueueParallelism *queue.AdaptiveQueue[resourceStageJob[*unstructured.Unstructured]] backoff wait.Backoff delayQueueMapping maps.SyncMap[string, resourceStageJob[*unstructured.Unstructured]] recorder record.EventRecorder @@ -123,6 +124,7 @@ func NewStageController(conf StageControllerConfig) (*StageController, error) { // It will modify the resources status to we want func (c *StageController) Start(ctx context.Context, events <-chan informer.Event[*unstructured.Unstructured]) error { go c.preprocessWorker(ctx) + c.delayQueueParallelism = queue.NewAdaptiveQueue(ctx, c.delayQueue, c.playStageWorker) for i := uint(0); i < c.playStageParallelism; i++ { go c.playStageWorker(ctx) } @@ -236,7 +238,7 @@ func (c *StageController) playStageWorker(ctx context.Context) { logger := log.FromContext(ctx) for ctx.Err() == nil { - resource, ok := c.delayQueue.GetOrWaitWithDone(ctx.Done()) + resource, ok := c.delayQueueParallelism.GetOrWaitWithDone(ctx.Done()) if !ok { return } diff --git a/pkg/utils/queue/parallelism.go b/pkg/utils/queue/parallelism.go new file mode 100644 index 000000000..435e3ef35 --- /dev/null +++ b/pkg/utils/queue/parallelism.go @@ -0,0 +1,61 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "context" + "sync" + "time" +) + +type AdaptiveQueue[T any] struct { + ctx context.Context + startFunc func(ctx context.Context) + latestStart time.Time + mut sync.Mutex + queue Queue[T] +} + +func NewAdaptiveQueue[T any](ctx context.Context, q Queue[T], startFunc func(ctx context.Context)) *AdaptiveQueue[T] { + return &AdaptiveQueue[T]{ + ctx: ctx, + startFunc: startFunc, + latestStart: time.Now(), + queue: q, + } +} + +func (p *AdaptiveQueue[T]) GetOrWaitWithDone(done <-chan struct{}) (T, bool) { + t, ok := p.queue.GetOrWaitWithDone(done) + if !ok { + return t, false + } + + length := p.queue.Len() + if length > 3 { + p.mut.Lock() + defer p.mut.Unlock() + now := time.Now() + sub := now.Sub(p.latestStart) + + if sub >= time.Second/10 { + go p.startFunc(p.ctx) + p.latestStart = now + } + } + return t, true +}