diff --git a/pkg/controllers/automigration/controller.go b/pkg/controllers/automigration/controller.go index 146ae4e1..cdcf4053 100644 --- a/pkg/controllers/automigration/controller.go +++ b/pkg/controllers/automigration/controller.go @@ -288,7 +288,7 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi // auto-migration controller sets AutoMigrationAnnotation to // feedback auto-migration information back to the scheduler - var estimatedCapacity map[string]int64 + var estimatedCapacity, scheduledReplicas map[string]int64 var result *worker.Result needsUpdate := false if unschedulableThreshold == nil { @@ -302,8 +302,8 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi } else { // Keep the annotation up-to-date if auto migration is enabled. keyedLogger.V(3).Info("Auto migration is enabled") - estimatedCapacity, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold) - autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity} + estimatedCapacity, scheduledReplicas, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold) + autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity, ScheduledReplicas: scheduledReplicas} // Compare with the existing autoMigration annotation existingAutoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: nil} @@ -366,11 +366,12 @@ func (c *Controller) estimateCapacity( typeConfig *fedcorev1a1.FederatedTypeConfig, clusterObjs []FederatedObject, unschedulableThreshold time.Duration, -) (map[string]int64, *worker.Result) { +) (map[string]int64, map[string]int64, *worker.Result) { needsBackoff := false var retryAfter *time.Duration estimatedCapacity := make(map[string]int64, len(clusterObjs)) + scheduledReplicas := make(map[string]int64, len(clusterObjs)) for _, clusterObj := range clusterObjs { ctx, logger := logging.InjectLoggerValues(ctx, "cluster", clusterObj.ClusterName, "ftc", typeConfig.Name) @@ -378,7 +379,9 @@ func (c *Controller) estimateCapacity( // This is an optimization to skip pod listing when there are no unschedulable pods. 
totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(typeConfig, clusterObj.Object) if err == nil && totalReplicas == readyReplicas { - logger.V(3).Info("No unschedulable pods found, skip estimating capacity") + logger.V(3).Info("No unschedulable pods found, skip estimating capacity", + "readyReplicas", readyReplicas) + scheduledReplicas[clusterObj.ClusterName] = totalReplicas continue } @@ -398,13 +401,16 @@ func (c *Controller) estimateCapacity( continue } - unschedulable, nextCrossIn := countUnschedulablePods(pods, time.Now(), unschedulableThreshold) + scheduled, unschedulable, nextCrossIn := countScheduledAndUnschedulablePods(pods, time.Now(), unschedulableThreshold) logger.V(2).Info("Analyzed pods", "total", len(pods), "desired", desiredReplicas, + "scheduled", scheduled, "unschedulable", unschedulable, ) + scheduledReplicas[clusterObj.ClusterName] = int64(scheduled) + if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) { retryAfter = nextCrossIn } @@ -441,7 +447,7 @@ func (c *Controller) estimateCapacity( Backoff: needsBackoff, } } - return estimatedCapacity, result + return estimatedCapacity, scheduledReplicas, result } func (c *Controller) getTotalAndReadyReplicas( diff --git a/pkg/controllers/automigration/util.go b/pkg/controllers/automigration/util.go index 5eb5970f..2da00dd4 100644 --- a/pkg/controllers/automigration/util.go +++ b/pkg/controllers/automigration/util.go @@ -27,11 +27,11 @@ import ( // unschedulable for more than unschedulableThreshold, // and a time.Duration representing the time from now // when the new unschedulable pod will cross the threshold, if any. -func countUnschedulablePods( +func countScheduledAndUnschedulablePods( podList []*corev1.Pod, currentTime time.Time, unschedulableThreshold time.Duration, -) (unschedulableCount int, nextCrossIn *time.Duration) { +) (scheduledCount, unschedulableCount int, nextCrossIn *time.Duration) { for _, pod := range podList { if pod.GetDeletionTimestamp() != nil { continue @@ -39,6 +39,9 @@ func countUnschedulablePods( scheduledCondition, isUnschedulable := getPodScheduledCondition(pod) if !isUnschedulable { + if scheduledCondition != nil && scheduledCondition.Status == corev1.ConditionTrue { + scheduledCount++ + } continue } @@ -51,8 +54,7 @@ func countUnschedulablePods( nextCrossIn = &crossingThresholdIn } } - - return unschedulableCount, nextCrossIn + return scheduledCount, unschedulableCount, nextCrossIn } func getPodScheduledCondition(pod *corev1.Pod) (scheduledCondition *corev1.PodCondition, isUnschedulable bool) { diff --git a/pkg/controllers/automigration/util_test.go b/pkg/controllers/automigration/util_test.go index 4ee1be6a..9aa48185 100644 --- a/pkg/controllers/automigration/util_test.go +++ b/pkg/controllers/automigration/util_test.go @@ -32,12 +32,14 @@ func doCheck( threshold time.Duration, pods []*corev1.Pod, expectedUnschedulable int, + expectedScheduled int, expectedNextCrossIn *time.Duration, ) { t.Helper() assert := assert.New(t) - unschedulableCount, nextCrossIn := countUnschedulablePods(pods, now, threshold) + scheduledCount, unschedulableCount, nextCrossIn := countScheduledAndUnschedulablePods(pods, now, threshold) + assert.Equal(scheduledCount, expectedScheduled) assert.Equal(expectedUnschedulable, unschedulableCount) assert.Equal(expectedNextCrossIn, nextCrossIn) } @@ -56,26 +58,26 @@ func TestCountUnschedulablePods(t *testing.T) { okPod, okPod, okPod, - }, 0, nil) + }, 0, 3, nil) doCheck(t, now, time.Minute, []*corev1.Pod{ okPod, okPod, unschedulablePod, - }, 1, 
nil) + }, 1, 2, nil) doCheck(t, now, time.Minute, []*corev1.Pod{ okPod, okPod, crossingIn10s, - }, 0, pointer.Duration(10*time.Second)) + }, 0, 2, pointer.Duration(10*time.Second)) doCheck(t, now, time.Minute, []*corev1.Pod{ okPod, okPod, unschedulablePod, crossingIn20s, - }, 1, pointer.Duration(20*time.Second)) + }, 1, 2, pointer.Duration(20*time.Second)) doCheck(t, now, time.Minute, []*corev1.Pod{ okPod, @@ -84,7 +86,7 @@ func TestCountUnschedulablePods(t *testing.T) { unschedulablePod, crossingIn10s, crossingIn20s, - }, 2, pointer.Duration(10*time.Second)) + }, 2, 2, pointer.Duration(10*time.Second)) doCheck(t, now, time.Minute, []*corev1.Pod{ okPod, @@ -93,7 +95,7 @@ func TestCountUnschedulablePods(t *testing.T) { unschedulableTerminatingPod, crossingIn10s, crossingIn20s, - }, 1, pointer.Duration(10*time.Second)) + }, 1, 2, pointer.Duration(10*time.Second)) } func newPod(terminating bool, schedulable bool, lastTransitionTimestamp time.Time) *corev1.Pod { diff --git a/pkg/controllers/scheduler/framework/plugins/names/names.go b/pkg/controllers/scheduler/framework/plugins/names/names.go index 59215364..b0750390 100644 --- a/pkg/controllers/scheduler/framework/plugins/names/names.go +++ b/pkg/controllers/scheduler/framework/plugins/names/names.go @@ -30,4 +30,5 @@ const ( MaxCluster = "MaxCluster" ClusterCapacityWeight = "ClusterCapacityWeight" ClusterEvicted = "ClusterEvicted" + PreferenceBinPack = "PreferenceBinPack" ) diff --git a/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner.go b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner.go new file mode 100644 index 00000000..657aefdf --- /dev/null +++ b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner.go @@ -0,0 +1,340 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This file may have been modified by The KubeAdmiral Authors +("KubeAdmiral Modifications"). All KubeAdmiral Modifications +are Copyright 2023 The KubeAdmiral Authors. +*/ + +package preferencebinpack + +import ( + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" +) + +// ClusterPreferences regarding number of replicas assigned to a cluster workload object (dep, rs, ..) within +// a federated workload object. +type ClusterPreferences struct { + // Minimum number of replicas that should be assigned to this cluster workload object. 0 by default. + MinReplicas int64 + + // Maximum number of replicas that should be assigned to this cluster workload object. + // Unbounded if no value provided (default). 
+ MaxReplicas *int64 +} + +type ReplicaSchedulingPreference struct { + Clusters map[string]ClusterPreferences +} + +type namedClusterPreferences struct { + clusterName string + ClusterPreferences +} + +func Plan( + rsp *ReplicaSchedulingPreference, + totalReplicas int64, + availableClusters []string, + currentReplicaCount map[string]int64, + estimatedCapacity map[string]int64, + limitedCapacity map[string]int64, + avoidDisruption bool, + keepUnschedulableReplicas bool, + scheduledReplicas map[string]int64, // Replicas that have been scheduled in the member clusters +) (map[string]int64, map[string]int64, error) { + if !shouldPlan( + availableClusters, + currentReplicaCount, + limitedCapacity, + estimatedCapacity, + scheduledReplicas, + ) { + return currentReplicaCount, nil, nil + } + + namedPreferences := make([]*namedClusterPreferences, 0, len(availableClusters)) + for _, cluster := range availableClusters { + namedPreferences = append(namedPreferences, &namedClusterPreferences{ + clusterName: cluster, + ClusterPreferences: rsp.Clusters[cluster], + }) + } + + if !avoidDisruption { + keepUnschedulableReplicas = true + } + + desiredPlan, desiredOverflow := getDesiredPlan( + namedPreferences, + estimatedCapacity, + limitedCapacity, + totalReplicas, + ) + + var currentTotalOkReplicas, currentTotalScheduledReplicas int64 + // currentPlan should only contain clusters in availableClusters + currentPlan := make(map[string]int64, len(namedPreferences)) + for _, preference := range namedPreferences { + replicas := currentReplicaCount[preference.clusterName] + + if capacity, exists := estimatedCapacity[preference.clusterName]; exists && capacity < replicas { + replicas = capacity + } + + limitedReplicas, limitedExists := limitedCapacity[preference.clusterName] + if limitedExists && limitedReplicas < replicas { + replicas = limitedReplicas + } + + currentPlan[preference.clusterName] = replicas + + currentTotalOkReplicas += replicas + + if scheduled, exist := scheduledReplicas[preference.clusterName]; exist { + if limitedExists && limitedReplicas < scheduled { + scheduled = limitedReplicas + } + currentTotalScheduledReplicas += scheduled + } + } + + if !keepUnschedulableReplicas && currentTotalScheduledReplicas == totalReplicas { + klog.V(4).Infof("Trim overflow replicas when getting desired number of scheduled replicas") + return scheduledReplicas, nil, nil + } + + // If we don't need to avoid migration, just return the plan computed from preferences + if !avoidDisruption { + return desiredPlan, desiredOverflow, nil + } + + var desiredTotalReplicas int64 + for _, replicas := range desiredPlan { + desiredTotalReplicas += replicas + } + + // Cap the overflow at currently unscheduled replicas. 
+ if !keepUnschedulableReplicas { + newOverflow := make(map[string]int64) + for key, value := range desiredOverflow { + value = framework.MinInt64(value, totalReplicas-currentTotalScheduledReplicas) + if value > 0 { + newOverflow[key] = value + } + } + + desiredOverflow = newOverflow + } + + klog.V(4).Infof("Desired plan: %v and overflow: %v before scale", desiredPlan, desiredOverflow) + + // Try to avoid instance migration between clusters + switch { + case currentTotalOkReplicas == desiredTotalReplicas: + return currentPlan, desiredOverflow, nil + case currentTotalOkReplicas > desiredTotalReplicas: + plan, err := scaleDown( + currentPlan, desiredPlan, + currentTotalOkReplicas-totalReplicas, + availableClusters, + ) + if err != nil { + return nil, nil, err + } + klog.V(4).Infof("ScaleDown plan: %v", plan) + return plan, desiredOverflow, nil + default: + plan, err := scaleUp( + rsp, + currentPlan, desiredPlan, + limitedCapacity, + totalReplicas-currentTotalOkReplicas, + availableClusters, + ) + if err != nil { + return nil, nil, err + } + klog.V(4).Infof("ScaleUp plan: %v", plan) + return plan, desiredOverflow, nil + } +} + +func getDesiredPlan( + preferences []*namedClusterPreferences, + estimatedCapacity map[string]int64, + limitedCapacity map[string]int64, + totalReplicas int64, +) (map[string]int64, map[string]int64) { + remainingReplicas := totalReplicas + plan := make(map[string]int64, len(preferences)) + overflow := make(map[string]int64, len(preferences)) + + // Assign each cluster the minimum number of replicas it requested. + for _, preference := range preferences { + min := framework.MinInt64(preference.MinReplicas, remainingReplicas) + if capacity, hasCapacity := limitedCapacity[preference.clusterName]; hasCapacity && capacity < min { + min = capacity + } + if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && capacity < min { + overflow[preference.clusterName] = min - capacity + min = capacity + } + remainingReplicas -= min + plan[preference.clusterName] = min + } + + for _, preference := range preferences { + start := plan[preference.clusterName] + + // In total there should be the amount that was there at start plus whatever is due + // in this iteration + total := start + remainingReplicas + + if preference.MaxReplicas != nil && total > *preference.MaxReplicas { + total = *preference.MaxReplicas + } + if capacity, hasCapacity := limitedCapacity[preference.clusterName]; hasCapacity && total > capacity { + total = capacity + } + if capacity, hasCapacity := estimatedCapacity[preference.clusterName]; hasCapacity && total > capacity { + overflow[preference.clusterName] += total - capacity + total = capacity + } + + // Only total-start replicas were actually taken. 
+		remainingReplicas -= total - start
+		plan[preference.clusterName] = total
+	}
+
+	return plan, overflow
+}
+
+func scaleUp(
+	rsp *ReplicaSchedulingPreference,
+	currentReplicaCount, desiredReplicaCount map[string]int64,
+	limitedCapacity map[string]int64,
+	scaleUpCount int64,
+	availableClusters []string,
+) (map[string]int64, error) {
+	namedPreferences := make([]*namedClusterPreferences, 0, len(availableClusters))
+	for _, cluster := range availableClusters {
+		current := currentReplicaCount[cluster]
+		desired := desiredReplicaCount[cluster]
+		if desired > current {
+			pref := &namedClusterPreferences{
+				clusterName: cluster,
+			}
+			pref.MinReplicas = rsp.Clusters[cluster].MinReplicas
+			if rsp.Clusters[cluster].MaxReplicas != nil {
+				// note that max is always positive because MaxReplicas >= desired > current
+				max := *rsp.Clusters[cluster].MaxReplicas - current
+				pref.MaxReplicas = &max
+			}
+			namedPreferences = append(namedPreferences, pref)
+		}
+	}
+
+	// no estimatedCapacity and hence no overflow
+	replicasToScaleUp, _ := getDesiredPlan(namedPreferences, nil, limitedCapacity, scaleUpCount)
+	for cluster, count := range replicasToScaleUp {
+		currentReplicaCount[cluster] += count
+	}
+
+	return currentReplicaCount, nil
+}
+
+func scaleDown(
+	currentReplicaCount, desiredReplicaCount map[string]int64,
+	scaleDownCount int64,
+	availableClusters []string,
+) (map[string]int64, error) {
+	namedPreferences := make([]*namedClusterPreferences, 0, len(availableClusters))
+	for _, cluster := range availableClusters {
+		current := currentReplicaCount[cluster]
+		desired := desiredReplicaCount[cluster]
+		if desired < current {
+			namedPreferences = append([]*namedClusterPreferences{{
+				clusterName: cluster,
+				ClusterPreferences: ClusterPreferences{
+					MaxReplicas: &current,
+				},
+			}}, namedPreferences...)
+		}
+	}
+
+	// no estimatedCapacity and hence no overflow
+	replicasToScaleDown, _ := getDesiredPlan(namedPreferences, nil, nil, scaleDownCount)
+	for cluster, count := range replicasToScaleDown {
+		currentReplicaCount[cluster] -= count
+	}
+
+	return currentReplicaCount, nil
+}
+
+func shouldPlan(
+	availableClusters []string,
+	currentReplicaCount, limitedCapacity,
+	estimatedCapacity, scheduledReplicas map[string]int64,
+) bool {
+	if isScheduledClustersRemoved(availableClusters, currentReplicaCount) {
+		return true
+	}
+
+	for cluster, replicas := range currentReplicaCount {
+		if capacity, exists := limitedCapacity[cluster]; exists && replicas > capacity {
+			return true
+		}
+	}
+
+	return isEstimatedCapacityAvailable(currentReplicaCount, estimatedCapacity, scheduledReplicas)
+}
+
+func isScheduledClustersRemoved(
+	availableClusters []string,
+	currentReplicaCount map[string]int64,
+) bool {
+	availableClustersSets := sets.New(availableClusters...)
+	for cluster := range currentReplicaCount {
+		if !availableClustersSets.Has(cluster) {
+			return true
+		}
+	}
+	return false
+}
+
+func isEstimatedCapacityAvailable(
+	currentReplicaCount, estimatedCapacity, scheduledReplicas map[string]int64,
+) (available bool) {
+	defer func() {
+		if !available {
+			klog.V(4).Infof("Current estimated capacity is unavailable")
+		}
+	}()
+
+	// If `capacity != replicas`, some replicas are still being scheduled. We defer planning until the status of all replicas is known.
+ for cluster, replicas := range scheduledReplicas { + if capacity, exist := estimatedCapacity[cluster]; exist && capacity != replicas { + return false + } else if !exist && replicas != currentReplicaCount[cluster] { + return false + } + } + + return true +} diff --git a/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner_test.go b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner_test.go new file mode 100644 index 00000000..d8a5706a --- /dev/null +++ b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/planner_test.go @@ -0,0 +1,849 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This file may have been modified by The KubeAdmiral Authors +("KubeAdmiral Modifications"). All KubeAdmiral Modifications +are Copyright 2023 The KubeAdmiral Authors. +*/ + +package preferencebinpack + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" +) + +func Test_getDesiredPlan(t *testing.T) { + type args struct { + preferences []*namedClusterPreferences + estimatedCapacity map[string]int64 + limitedCapacity map[string]int64 + totalReplicas int64 + } + tests := []struct { + name string + args args + desiredPlan map[string]int64 + desiredOverflow map[string]int64 + }{ + { + name: "all pods scheduled to 1 cluster", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: nil, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 10, + "B": 0, + "C": 0, + }, + desiredOverflow: map[string]int64{}, + }, + { + name: "1 cluster with estimated capacity", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: map[string]int64{ + "A": 5, + }, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 5, + "B": 5, + "C": 0, + }, + desiredOverflow: map[string]int64{"A": 5}, + }, + { + name: "2 cluster with estimated capacity", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: map[string]int64{ + "A": 5, + "B": 3, + }, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 5, + "B": 3, + "C": 2, + }, + desiredOverflow: map[string]int64{ + "A": 5, + "B": 2, + }, + }, + { + name: "all cluster with estimated capacity", + args: args{ + preferences: 
[]*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: map[string]int64{ + "A": 5, + "B": 3, + "C": 1, + }, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 5, + "B": 3, + "C": 1, + }, + desiredOverflow: map[string]int64{ + "A": 5, + "B": 2, + "C": 1, + }, + }, + { + name: "1 cluster with limited capacity", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: map[string]int64{"A": 5}, + limitedCapacity: map[string]int64{ + "A": 3, + }, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 3, + "B": 7, + "C": 0, + }, + desiredOverflow: map[string]int64{}, + }, + { + name: "all cluster with limited capacity", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: map[string]int64{"A": 5}, + limitedCapacity: map[string]int64{ + "A": 3, + "B": 2, + "C": 1, + }, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 3, + "B": 2, + "C": 1, + }, + desiredOverflow: map[string]int64{}, + }, + { + name: "1 cluster with maxReplicas", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{ + MaxReplicas: pointer.Int64(5), + }, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: nil, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 5, + "B": 5, + "C": 0, + }, + desiredOverflow: map[string]int64{}, + }, + { + name: "1 cluster with minReplicas", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{ + MinReplicas: 5, + }, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: nil, + limitedCapacity: nil, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 5, + "B": 5, + "C": 0, + }, + desiredOverflow: map[string]int64{}, + }, + { + name: "1 cluster with minReplicas and limitCapacity", + args: args{ + preferences: []*namedClusterPreferences{ + { + clusterName: "A", + ClusterPreferences: ClusterPreferences{}, + }, + { + clusterName: "B", + ClusterPreferences: ClusterPreferences{ + MinReplicas: 5, + }, + }, + { + clusterName: "C", + ClusterPreferences: ClusterPreferences{}, + }, + }, + estimatedCapacity: nil, + limitedCapacity: map[string]int64{"B": 2}, + totalReplicas: 10, + }, + desiredPlan: map[string]int64{ + "A": 8, + "B": 2, + "C": 0, + }, + desiredOverflow: map[string]int64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDesiredPlan, gotDesiredOverflow := getDesiredPlan( + tt.args.preferences, + tt.args.estimatedCapacity, + tt.args.limitedCapacity, + 
tt.args.totalReplicas, + ) + if !reflect.DeepEqual(gotDesiredPlan, tt.desiredPlan) { + t.Errorf("getDesiredPlan() gotDesiredPlan = %v, desiredPlan %v", gotDesiredPlan, tt.desiredPlan) + } + if !reflect.DeepEqual(gotDesiredOverflow, tt.desiredOverflow) { + t.Errorf("getDesiredPlan() gotDesiredOverflow = %v, desiredOverflow %v", gotDesiredOverflow, tt.desiredOverflow) + } + }) + } +} + +type testCase struct { + rsp map[string]ClusterPreferences + replicas int64 + clusters []string + existing map[string]int64 + capacity map[string]int64 + limitedCapacity map[string]int64 + scheduledReplicas map[string]int64 +} + +type expectedResult struct { + plan map[string]int64 + overflow map[string]int64 +} + +func estimateCapacity(currentReplicas, actualCapacity map[string]int64) map[string]int64 { + estimatedCapacity := make(map[string]int64, len(actualCapacity)) + for cluster, c := range actualCapacity { + if currentReplicas[cluster] > c { + estimatedCapacity[cluster] = c + } + } + + return estimatedCapacity +} + +func doCheck( + t *testing.T, + tc *testCase, + avoidDisruption bool, + keepUnschedulableReplicas bool, + expected *expectedResult, +) { + t.Helper() + assert := assert.New(t) + + existing := tc.existing + var plan, overflow, lastPlan, lastOverflow map[string]int64 + var err error + + converged := false + const maxConvergenceSteps = 3 + for i := 0; i < maxConvergenceSteps; i++ { + estimatedCapacity := estimateCapacity(existing, tc.capacity) + plan, overflow, err = Plan( + &ReplicaSchedulingPreference{ + Clusters: tc.rsp, + }, + tc.replicas, tc.clusters, + existing, estimatedCapacity, tc.limitedCapacity, + avoidDisruption, keepUnschedulableReplicas, tc.scheduledReplicas, + ) + + assert.Nil(err) + t.Logf("Step %v: avoidDisruption=%v keepUnschedulableReplicas=%v pref=%+v existing=%v estimatedCapacity=%v plan=%v overflow=%v\n", + i, avoidDisruption, keepUnschedulableReplicas, tc.rsp, existing, estimatedCapacity, plan, overflow) + + // nil and empty map should be treated as equal + planConverged := len(plan) == 0 && len(lastPlan) == 0 || reflect.DeepEqual(plan, lastPlan) + overflowConverged := len(overflow) == 0 && len(lastOverflow) == 0 || reflect.DeepEqual(overflow, lastOverflow) + converged = planConverged && overflowConverged + if converged { + // Break out of the loop if converged + break + } + + // Not converged yet, do another round + existing = make(map[string]int64, len(plan)) + for cluster, replicas := range plan { + existing[cluster] += replicas + } + for cluster, replicas := range overflow { + existing[cluster] += replicas + } + + lastPlan, lastOverflow = plan, overflow + } + + if !converged { + t.Errorf("did not converge after %v steps", maxConvergenceSteps) + } + + if len(plan) != 0 || len(expected.plan) != 0 { + assert.Equal(expected.plan, plan) + } + if len(overflow) != 0 || len(expected.overflow) != 0 { + assert.Equal(expected.overflow, overflow) + } +} + +func doCheckWithoutExisting( + t *testing.T, + tc *testCase, + expected *expectedResult, +) { + // The replica distribution should be the same regardless of + // avoidDisruption and keepUnschedulableReplicas. 
+ + t.Helper() + doCheck(t, tc, false, false, expected) + doCheck(t, tc, false, true, expected) + doCheck(t, tc, true, false, expected) + doCheck(t, tc, true, true, expected) +} + +func TestWithoutExisting(t *testing.T) { + // all pods scheduled to 1 cluster + doCheckWithoutExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B"}, + }, + &expectedResult{plan: map[string]int64{"A": 50, "B": 0}}, + ) + + // 1 cluster with minReplicas + doCheckWithoutExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "B": {MinReplicas: 10}, + }, + replicas: 50, + clusters: []string{"A", "B"}, + }, + &expectedResult{plan: map[string]int64{"A": 40, "B": 10}}, + ) + + // 2 clusters with maxReplicas + doCheckWithoutExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "A": {MaxReplicas: pointer.Int64(10)}, + "B": {MaxReplicas: pointer.Int64(21)}, + }, + replicas: 50, + clusters: []string{"A", "B", "C"}, + }, + &expectedResult{plan: map[string]int64{"A": 10, "B": 21, "C": 19}}, + ) + + // 1 cluster with maxReplicas and 1 cluster with minReplicas + doCheckWithoutExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "A": {MaxReplicas: pointer.Int64(10)}, + "C": {MinReplicas: 10}, + }, + replicas: 50, + clusters: []string{"A", "B", "C"}, + }, + &expectedResult{plan: map[string]int64{"A": 10, "B": 30, "C": 10}}, + ) + + // 1 cluster with maxReplicas, 1 cluster with minReplicas and 1 cluster with limitCapacity + doCheckWithoutExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "A": {MaxReplicas: pointer.Int64(10)}, + "D": {MinReplicas: 10}, + }, + replicas: 50, + clusters: []string{"A", "B", "C", "D"}, + limitedCapacity: map[string]int64{"B": 10}, + }, + &expectedResult{plan: map[string]int64{"A": 10, "B": 10, "C": 20, "D": 10}}, + ) +} + +func doCheckWithExisting( + t *testing.T, + tc *testCase, + expected [2]*expectedResult, +) { + // With existing, avoidDisruption should affect the distribution + + t.Helper() + doCheck(t, tc, false, false, expected[0]) + doCheck(t, tc, false, true, expected[0]) + doCheck(t, tc, true, false, expected[1]) + doCheck(t, tc, true, true, expected[1]) +} + +func TestWithExisting(t *testing.T) { + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"C": 30}, + scheduledReplicas: map[string]int64{"C": 30}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 50, "B": 0, "C": 0}}, + {plan: map[string]int64{"A": 20, "B": 0, "C": 30}}, + }, + ) + + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 30}, + limitedCapacity: map[string]int64{"A": 0}, + scheduledReplicas: map[string]int64{"A": 30}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 0, "B": 50}}, + {plan: map[string]int64{"A": 0, "B": 50}}, + }, + ) + + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 30}, + limitedCapacity: map[string]int64{"A": 10}, + scheduledReplicas: map[string]int64{"A": 30}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 10, "B": 40}}, + {plan: map[string]int64{"A": 10, "B": 40}}, + }, + ) + + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 15, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 0, "B": 8}, + 
scheduledReplicas: map[string]int64{"B": 8}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 15, "B": 0}}, + {plan: map[string]int64{"A": 7, "B": 8}}, + }, + ) + + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 15, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 15, "B": 8}, + scheduledReplicas: map[string]int64{"A": 15, "B": 8}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 15, "B": 0}}, + {plan: map[string]int64{"A": 15, "B": 0}}, + }, + ) + + // add maxReplicas for existing replicas + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "A": { + MaxReplicas: pointer.Int64(10), + }, + }, + replicas: 15, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 15, "B": 0}, + scheduledReplicas: map[string]int64{"A": 15, "B": 0}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 10, "B": 5}}, + {plan: map[string]int64{"A": 15, "B": 0}}, + }, + ) + + // add minReplicas for existing replicas + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "B": { + MinReplicas: 10, + }, + }, + replicas: 15, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 15, "B": 0}, + scheduledReplicas: map[string]int64{"A": 15, "B": 0}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 5, "B": 10}}, + {plan: map[string]int64{"A": 15, "B": 0}}, + }, + ) + + // add limitCapacity for existing replicas + doCheckWithExisting(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 15, + clusters: []string{"A", "B"}, + existing: map[string]int64{"A": 15, "B": 0}, + scheduledReplicas: map[string]int64{"A": 15, "B": 0}, + limitedCapacity: map[string]int64{"A": 5}, + }, + [2]*expectedResult{ + {plan: map[string]int64{"A": 5, "B": 10}}, + {plan: map[string]int64{"A": 5, "B": 10}}, + }, + ) +} + +func doCheckWithExistingAndCapacity( + t *testing.T, + tc *testCase, + expected [4]*expectedResult, +) { + // With existing and capacity, both avoidDisruption and keepUnschedulableReplicas + // should affect the distribution + + t.Helper() + doCheck(t, tc, false, false, expected[0]) + doCheck(t, tc, false, true, expected[1]) + doCheck(t, tc, true, false, expected[2]) + doCheck(t, tc, true, true, expected[3]) +} + +func TestWithExistingAndCapacity(t *testing.T) { + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"A": 50, "B": 20}, + capacity: map[string]int64{"A": 30, "B": 10}, + scheduledReplicas: map[string]int64{"A": 30, "B": 10}, + }, + + [4]*expectedResult{ + { + plan: map[string]int64{"A": 30, "B": 10, "C": 10}, + overflow: map[string]int64{"A": 20, "B": 10}, + }, + { + plan: map[string]int64{"A": 30, "B": 10, "C": 10}, + overflow: map[string]int64{"A": 20, "B": 10}, + }, + { + plan: map[string]int64{"A": 30, "B": 10, "C": 10}, + overflow: map[string]int64{"A": 10, "B": 10}, + }, + { + plan: map[string]int64{"A": 30, "B": 10, "C": 10}, + overflow: map[string]int64{"A": 20, "B": 10}, + }, + }, + ) + + // scale up + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 50, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"A": 30, "C": 20}, + capacity: map[string]int64{"C": 10}, + scheduledReplicas: map[string]int64{"A": 30, "C": 10}, + }, + [4]*expectedResult{ + { + plan: map[string]int64{"A": 50, "B": 0, "C": 0}, + overflow: map[string]int64{}, + }, + { + plan: 
map[string]int64{"A": 50, "B": 0, "C": 0}, + overflow: map[string]int64{}, + }, + { + plan: map[string]int64{"A": 40, "B": 0, "C": 10}, + }, + { + plan: map[string]int64{"A": 40, "B": 0, "C": 10}, + }, + }, + ) + + // scale down + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 60, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"A": 60, "B": 40}, + capacity: map[string]int64{"A": 40}, + scheduledReplicas: map[string]int64{"A": 40}, + }, + [4]*expectedResult{ + { + plan: map[string]int64{"A": 40, "B": 20, "C": 0}, + overflow: map[string]int64{"A": 20}, + }, + { + plan: map[string]int64{"A": 40, "B": 20, "C": 0}, + overflow: map[string]int64{"A": 20}, + }, + { + plan: map[string]int64{"A": 40, "B": 20, "C": 0}, + overflow: map[string]int64{"A": 20}, + }, + { + plan: map[string]int64{"A": 40, "B": 20, "C": 0}, + overflow: map[string]int64{"A": 20}, + }, + }, + ) + + // total capacity < desired replicas + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{}, + replicas: 60, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"A": 60, "B": 50, "C": 40}, + capacity: map[string]int64{"A": 10, "B": 10, "C": 20}, + }, + [4]*expectedResult{ + { + plan: map[string]int64{"A": 10, "B": 10, "C": 20}, + overflow: map[string]int64{"A": 50, "B": 40, "C": 20}, + }, + { + plan: map[string]int64{"A": 10, "B": 10, "C": 20}, + overflow: map[string]int64{"A": 50, "B": 40, "C": 20}, + }, + { + plan: map[string]int64{"A": 10, "B": 10, "C": 20}, + overflow: map[string]int64{"A": 50, "B": 40, "C": 20}, + }, + { + plan: map[string]int64{"A": 10, "B": 10, "C": 20}, + overflow: map[string]int64{"A": 50, "B": 40, "C": 20}, + }, + }, + ) + + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "A": {MaxReplicas: pointer.Int64(10)}, + "C": {MaxReplicas: pointer.Int64(21)}, + "D": {MaxReplicas: pointer.Int64(10)}, + }, + replicas: 60, + clusters: []string{"A", "B", "C", "D"}, + existing: map[string]int64{"A": 20, "B": 40}, + capacity: map[string]int64{"B": 10}, + }, + [4]*expectedResult{ + { + plan: map[string]int64{"A": 10, "B": 10, "C": 21, "D": 10}, + overflow: map[string]int64{"B": 40}, + }, + { + plan: map[string]int64{"A": 10, "B": 10, "C": 21, "D": 10}, + overflow: map[string]int64{"B": 40}, + }, + { + plan: map[string]int64{"A": 20, "B": 10, "C": 21, "D": 9}, + overflow: map[string]int64{"B": 40}, + }, + { + plan: map[string]int64{"A": 20, "B": 10, "C": 21, "D": 9}, + overflow: map[string]int64{"B": 40}, + }, + }, + ) + + doCheckWithExistingAndCapacity(t, + &testCase{ + rsp: map[string]ClusterPreferences{ + "B": {MinReplicas: 20}, + }, + replicas: 50, + clusters: []string{"A", "B", "C"}, + existing: map[string]int64{"C": 24}, + capacity: map[string]int64{"B": 10}, + }, + [4]*expectedResult{ + { + plan: map[string]int64{"A": 40, "B": 10, "C": 0}, + overflow: map[string]int64{"B": 10}, + }, + { + plan: map[string]int64{"A": 40, "B": 10, "C": 0}, + overflow: map[string]int64{"B": 10}, + }, + { + plan: map[string]int64{"A": 16, "B": 10, "C": 24}, + overflow: map[string]int64{"B": 10}, + }, + { + plan: map[string]int64{"A": 16, "B": 10, "C": 24}, + overflow: map[string]int64{"B": 10}, + }, + }, + ) +} diff --git a/pkg/controllers/scheduler/framework/plugins/preferencebinpack/preference_bin_pack.go b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/preference_bin_pack.go new file mode 100644 index 00000000..97e57cfa --- /dev/null +++ 
b/pkg/controllers/scheduler/framework/plugins/preferencebinpack/preference_bin_pack.go @@ -0,0 +1,152 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package preferencebinpack + +import ( + "context" + + "github.com/davecgh/go-spew/spew" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/names" +) + +type PreferenceBinPack struct{} + +var _ framework.ReplicasPlugin = &PreferenceBinPack{} + +func NewPreferenceBinPack(frameworkHandle framework.Handle) (framework.Plugin, error) { + return &PreferenceBinPack{}, nil +} + +func (pl *PreferenceBinPack) Name() string { + return names.PreferenceBinPack +} + +func (pl *PreferenceBinPack) ReplicaScheduling( + ctx context.Context, + su *framework.SchedulingUnit, + clusters []*fedcorev1a1.FederatedCluster, +) (framework.ClusterReplicasList, *framework.Result) { + clusterReplicasList := make(framework.ClusterReplicasList, 0) + totalReplicas := int64(0) + if su.DesiredReplicas != nil { + totalReplicas = *su.DesiredReplicas + } + + currentReplicas := map[string]int64{} + for cluster, replicas := range su.CurrentClusters { + if replicas != nil { + currentReplicas[cluster] = *replicas + continue + } + currentReplicas[cluster] = totalReplicas + } + + clusterPreferences := map[string]ClusterPreferences{} + for _, cluster := range clusters { + pref := ClusterPreferences{ + MinReplicas: su.MinReplicas[cluster.Name], + MaxReplicas: nil, + } + + if maxReplicas, exists := su.MaxReplicas[cluster.Name]; exists { + pref.MaxReplicas = pointer.Int64(maxReplicas) + } + + // if member cluster has untolerated NoSchedule taint, no new replicas will be scheduled to this cluster + if _, isUntolerated := framework.FindMatchingUntoleratedTaint( + cluster.Spec.Taints, + su.Tolerations, + func(t *corev1.Taint) bool { + return t.Effect == corev1.TaintEffectNoSchedule + }, + ); isUntolerated { + if pref.MaxReplicas == nil || currentReplicas[cluster.Name] < *pref.MaxReplicas { + pref.MaxReplicas = pointer.Int64(currentReplicas[cluster.Name]) + } + } + + clusterPreferences[cluster.Name] = pref + } + + estimatedCapacity := map[string]int64{} + scheduledReplicas := map[string]int64{} + keepUnschedulableReplicas := false + if autoMigration := su.AutoMigration; autoMigration != nil { + keepUnschedulableReplicas = autoMigration.KeepUnschedulableReplicas + if info := autoMigration.Info; info != nil { + for cluster, ec := range info.EstimatedCapacity { + if ec >= 0 { + estimatedCapacity[cluster] = ec + } + } + scheduledReplicas = su.AutoMigration.Info.ScheduledReplicas + } + } + + limitedCapacity := map[string]int64{} + if su.CustomMigration.Info != nil && su.CustomMigration.Info.LimitedCapacity != nil { + limitedCapacity = su.CustomMigration.Info.LimitedCapacity + } + + scheduleResult, overflow, err := Plan( + 
&ReplicaSchedulingPreference{ + Clusters: clusterPreferences, + }, + totalReplicas, + framework.ExtractClusterNames(clusters), + currentReplicas, + estimatedCapacity, + limitedCapacity, + su.AvoidDisruption, + keepUnschedulableReplicas, + scheduledReplicas, + ) + if err != nil { + return clusterReplicasList, framework.NewResult(framework.Error) + } + + klog.V(4).Infof( + "[scheduling] for %q clusterPreferences: %s, estimatedCapacity: %v, scheduledReplicas: %v, currentReplicas: %v, result: %v, overflow: %v", + su.Key(), spew.Sprint(clusterPreferences), estimatedCapacity, scheduledReplicas, currentReplicas, scheduleResult, overflow, + ) + + result := make(map[string]int64) + for clusterName, replicas := range scheduleResult { + result[clusterName] = replicas + } + for clusterName, replicas := range overflow { + result[clusterName] += replicas + } + + for _, cluster := range clusters { + replicas, ok := result[cluster.Name] + if !ok || replicas == 0 { + continue + } + clusterReplicasList = append(clusterReplicasList, framework.ClusterReplicas{ + Cluster: cluster, + Replicas: replicas, + }) + } + return clusterReplicasList, framework.NewResult(framework.Success) +} diff --git a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go index 0f8f2b54..f2619016 100644 --- a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go +++ b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go @@ -163,7 +163,7 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling( Clusters: clusterPreferences, }, totalReplicas, - ExtractClusterNames(clusters), + framework.ExtractClusterNames(clusters), currentReplicas, estimatedCapacity, limitedCapacity, @@ -348,11 +348,3 @@ func QueryAllocatable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev } return ret } - -func ExtractClusterNames(clusters []*fedcorev1a1.FederatedCluster) []string { - ret := make([]string, len(clusters)) - for i := range clusters { - ret[i] = clusters[i].Name - } - return ret -} diff --git a/pkg/controllers/scheduler/framework/plugins/rsp/rsp_test.go b/pkg/controllers/scheduler/framework/plugins/rsp/rsp_test.go index 59a7d8d6..40e15aae 100644 --- a/pkg/controllers/scheduler/framework/plugins/rsp/rsp_test.go +++ b/pkg/controllers/scheduler/framework/plugins/rsp/rsp_test.go @@ -49,19 +49,6 @@ func addTaint(cluster *fedcorev1a1.FederatedCluster, key, value string, effect c return cluster } -func TestExtractClusterNames(t *testing.T) { - clusters := []*fedcorev1a1.FederatedCluster{} - names := []string{"foo", "bar"} - for _, name := range names { - clusters = append(clusters, NewFederatedCluster(name)) - } - ret := ExtractClusterNames(clusters) - assert.Equal(t, len(ret), len(names), "the length should be the same.") - for i := range ret { - assert.Equal(t, ret[i], names[i], "the name should be the same.") - } -} - func makeClusterWithCPU(name string, allocatable, available int) *fedcorev1a1.FederatedCluster { cluster := &fedcorev1a1.FederatedCluster{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controllers/scheduler/framework/types.go b/pkg/controllers/scheduler/framework/types.go index b8beee7f..978ca97d 100644 --- a/pkg/controllers/scheduler/framework/types.go +++ b/pkg/controllers/scheduler/framework/types.go @@ -58,9 +58,10 @@ type SchedulingUnit struct { CustomMigration CustomMigrationSpec // Controls the scheduling behavior - SchedulingMode fedcorev1a1.SchedulingMode - StickyCluster bool - AvoidDisruption bool + SchedulingMode fedcorev1a1.SchedulingMode + ReplicasStrategy 
fedcorev1a1.ReplicasStrategy + StickyCluster bool + AvoidDisruption bool // Used to filter/select clusters ClusterSelector map[string]string @@ -82,6 +83,7 @@ type AutoMigrationSpec struct { type AutoMigrationInfo struct { // Describes the estimated max number of replicas a cluster can accommodate. EstimatedCapacity map[string]int64 `json:"estimatedCapacity,omitempty"` + ScheduledReplicas map[string]int64 `json:"scheduledReplicas,omitempty"` } type MigrationConfig struct { diff --git a/pkg/controllers/scheduler/framework/util.go b/pkg/controllers/scheduler/framework/util.go index 86e042ea..1730219e 100644 --- a/pkg/controllers/scheduler/framework/util.go +++ b/pkg/controllers/scheduler/framework/util.go @@ -498,3 +498,18 @@ func DefaultNormalizeScore(maxPriority int64, reverse bool, scores ClusterScoreL } return nil } + +func MinInt64(a int64, b int64) int64 { + if a < b { + return a + } + return b +} + +func ExtractClusterNames(clusters []*fedcorev1a1.FederatedCluster) []string { + ret := make([]string, len(clusters)) + for i := range clusters { + ret[i] = clusters[i].Name + } + return ret +} diff --git a/pkg/controllers/scheduler/profile.go b/pkg/controllers/scheduler/profile.go index 68f09ae3..18534314 100644 --- a/pkg/controllers/scheduler/profile.go +++ b/pkg/controllers/scheduler/profile.go @@ -33,6 +33,7 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/maxcluster" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/names" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/placement" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/preferencebinpack" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/rsp" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/tainttoleration" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/runtime" @@ -53,6 +54,7 @@ var inTreeRegistry = runtime.Registry{ names.MaxCluster: maxcluster.NewMaxCluster, names.ClusterCapacityWeight: rsp.NewClusterCapacityWeight, names.ClusterEvicted: clusterevicted.NewClusterEvicted, + names.PreferenceBinPack: preferencebinpack.NewPreferenceBinPack, } func applyProfile(base *fedcore.EnabledPlugins, profile *fedcorev1a1.SchedulingProfile) { @@ -90,8 +92,14 @@ func reconcileExtPoint(enabled []string, pluginSet fedcorev1a1.PluginSet) []stri func (s *Scheduler) createFramework( profile *fedcorev1a1.SchedulingProfile, handle framework.Handle, + replicasPlugin []string, ) (framework.Framework, error) { enabledPlugins := fedcorev1a1.GetDefaultEnabledPlugins() + + if len(replicasPlugin) != 0 { + enabledPlugins.ReplicasPlugins = replicasPlugin + } + if profile != nil { applyProfile(enabledPlugins, profile) } diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go index ff6a8d2e..02874b84 100644 --- a/pkg/controllers/scheduler/scheduler.go +++ b/pkg/controllers/scheduler/scheduler.go @@ -46,6 +46,7 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/core" frameworktypes "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/names" "github.com/kubewharf/kubeadmiral/pkg/stats" utilmetrics "github.com/kubewharf/kubeadmiral/pkg/stats/metrics" clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster" @@ -579,7 +580,12 @@ func (s 
*Scheduler) schedule( return nil, &worker.StatusError } - framework, err := s.createFramework(schedulingProfile, s.buildFrameworkHandle()) + var replicasPlugin []string + if schedulingUnit.ReplicasStrategy == fedcorev1a1.ReplicasStrategyBinpack { + replicasPlugin = []string{names.PreferenceBinPack} + } + + framework, err := s.createFramework(schedulingProfile, s.buildFrameworkHandle(), replicasPlugin) if err != nil { logger.Error(err, "Failed to construct scheduling profile") s.eventRecorder.Eventf( diff --git a/pkg/controllers/scheduler/scheduler_test.go b/pkg/controllers/scheduler/scheduler_test.go index f1e59899..bac3cb73 100644 --- a/pkg/controllers/scheduler/scheduler_test.go +++ b/pkg/controllers/scheduler/scheduler_test.go @@ -98,7 +98,8 @@ func TestGetSchedulingUnit(t *testing.T) { policy := fedcorev1a1.PropagationPolicy{ Spec: fedcorev1a1.PropagationPolicySpec{ - SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + ReplicasStrategy: (*fedcorev1a1.ReplicasStrategy)(pointer.String(string(fedcorev1a1.ReplicasStrategySpread))), AutoMigration: &fedcorev1a1.AutoMigration{ KeepUnschedulableReplicas: false, }, @@ -150,9 +151,10 @@ func TestGetSchedulingUnit(t *testing.T) { Info: nil, KeepUnschedulableReplicas: false, }, - SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, - StickyCluster: false, - AvoidDisruption: false, + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, + StickyCluster: false, + AvoidDisruption: false, })) } @@ -167,7 +169,8 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { name: "scheduling mode override", policy: &fedcorev1a1.PropagationPolicy{ Spec: fedcorev1a1.PropagationPolicySpec{ - SchedulingMode: fedcorev1a1.SchedulingModeDivide, + SchedulingMode: fedcorev1a1.SchedulingModeDivide, + ReplicasStrategy: (*fedcorev1a1.ReplicasStrategy)(pointer.String(string(fedcorev1a1.ReplicasStrategySpread))), ClusterSelector: map[string]string{ "label": "value1", }, @@ -177,7 +180,8 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { SchedulingModeAnnotation: string(fedcorev1a1.SchedulingModeDuplicate), }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, ClusterSelector: map[string]string{ "label": "value1", }, @@ -197,8 +201,9 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { StickyClusterAnnotation: "false", }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: DefaultSchedulingMode, - StickyCluster: false, + SchedulingMode: DefaultSchedulingMode, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, + StickyCluster: false, ClusterSelector: map[string]string{ "label": "value1", }, @@ -218,8 +223,9 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { ClusterSelectorAnnotations: "{\"override\": \"label\"}", }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: DefaultSchedulingMode, - StickyCluster: true, + SchedulingMode: DefaultSchedulingMode, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, + StickyCluster: true, ClusterSelector: map[string]string{ "override": "label", }, @@ -229,7 +235,8 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { name: "cluster affinity override", policy: &fedcorev1a1.PropagationPolicy{ Spec: fedcorev1a1.PropagationPolicySpec{ - SchedulingMode: 
fedcorev1a1.SchedulingModeDuplicate, + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + ReplicasStrategy: (*fedcorev1a1.ReplicasStrategy)(pointer.String(string(fedcorev1a1.ReplicasStrategySpread))), ClusterSelector: map[string]string{ "label": "value1", }, @@ -256,8 +263,9 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { }`, }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, - StickyCluster: true, + SchedulingMode: fedcorev1a1.SchedulingModeDuplicate, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, + StickyCluster: true, ClusterSelector: map[string]string{ "label": "value1", }, @@ -301,8 +309,9 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { TolerationsAnnotations: "[{\"key\": \"override\", \"operator\": \"Exists\", \"effect\": \"NoSchedule\"}]", }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: DefaultSchedulingMode, - StickyCluster: true, + SchedulingMode: DefaultSchedulingMode, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, + StickyCluster: true, ClusterSelector: map[string]string{ "label": "value1", }, @@ -329,7 +338,8 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { MaxClustersAnnotations: "10", }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: DefaultSchedulingMode, + SchedulingMode: DefaultSchedulingMode, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, ClusterSelector: map[string]string{ "label": "value1", }, @@ -371,7 +381,8 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) { ]`, }, expectedResult: &framework.SchedulingUnit{ - SchedulingMode: DefaultSchedulingMode, + SchedulingMode: DefaultSchedulingMode, + ReplicasStrategy: fedcorev1a1.ReplicasStrategySpread, ClusterSelector: map[string]string{ "label": "value1", }, diff --git a/pkg/controllers/scheduler/schedulingunit.go b/pkg/controllers/scheduler/schedulingunit.go index 3a07fcab..a25908aa 100644 --- a/pkg/controllers/scheduler/schedulingunit.go +++ b/pkg/controllers/scheduler/schedulingunit.go @@ -46,7 +46,6 @@ func schedulingUnitForFedObject( if exists { schedulingMode = schedulingModeOverride } - var desiredReplicasOption *int64 if schedulingMode == fedcorev1a1.SchedulingModeDivide && typeConfig.Spec.PathDefinition.ReplicasSpec == "" { // TODO remove this check in favor of a DivideIfPossible mode @@ -83,6 +82,7 @@ func schedulingUnitForFedObject( AvoidDisruption: true, } + schedulingUnit.ReplicasStrategy = getReplicasStrategyFromPolicy(policy) if autoMigration := policy.GetSpec().AutoMigration; autoMigration != nil { info, err := getAutoMigrationInfo(fedObject) if err != nil { @@ -219,6 +219,18 @@ func getSchedulingModeFromPolicy(policy fedcorev1a1.GenericPropagationPolicy) fe return DefaultSchedulingMode } +func getReplicasStrategyFromPolicy(policy fedcorev1a1.GenericPropagationPolicy) fedcorev1a1.ReplicasStrategy { + if policy.GetSpec().ReplicasStrategy == nil { + return fedcorev1a1.ReplicasStrategySpread + } + + if *policy.GetSpec().ReplicasStrategy == fedcorev1a1.ReplicasStrategyBinpack { + return fedcorev1a1.ReplicasStrategyBinpack + } + + return fedcorev1a1.ReplicasStrategySpread +} + func getSchedulingModeFromObject(fedObject fedcorev1a1.GenericFederatedObject) (fedcorev1a1.SchedulingMode, bool) { annotations := fedObject.GetAnnotations() if annotations == nil {
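For illustration only (not part of the patch): a minimal sketch of calling the new preferencebinpack.Plan API introduced above. The inputs mirror the first TestWithoutExisting case, and the import path is taken from this diff; cluster names and replica counts are illustrative.

package main

import (
	"fmt"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/preferencebinpack"
)

func main() {
	// Bin-pack 50 replicas onto clusters A and B with no prior placement and no
	// capacity limits: the planner fills clusters in order, so A receives everything.
	plan, overflow, err := preferencebinpack.Plan(
		&preferencebinpack.ReplicaSchedulingPreference{
			Clusters: map[string]preferencebinpack.ClusterPreferences{},
		},
		50,                 // totalReplicas
		[]string{"A", "B"}, // availableClusters
		nil,                // currentReplicaCount
		nil,                // estimatedCapacity
		nil,                // limitedCapacity
		false,              // avoidDisruption
		false,              // keepUnschedulableReplicas
		nil,                // scheduledReplicas
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(plan, overflow) // expected: map[A:50 B:0] map[]
}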