fix(rsp): current pod resource usage should be included in resource calculation #21

Open
wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions cmd/controller-manager/app/core.go
@@ -152,6 +152,7 @@ func startGlobalScheduler(
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
controllerCtx.FederatedClientFactory,
controllerCtx.Metrics,
controllerCtx.WorkerCount,
)
5 changes: 3 additions & 2 deletions pkg/controllers/federatedcluster/clusterstatus.go
@@ -39,6 +39,7 @@ import (

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
clusterresource "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/resource"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/federatedclient"
)

@@ -186,12 +187,12 @@ func updateClusterResources(

schedulableNodes := int64(0)
for _, node := range nodes {
if isNodeSchedulable(node) {
if clusterresource.IsNodeSchedulable(node) {
schedulableNodes++
}
}

allocatable, available := aggregateResources(nodes, pods)
allocatable, available := clusterresource.AggregateResources(nodes, pods)
clusterStatus.Resources = fedcorev1a1.Resources{
SchedulableNodes: &schedulableNodes,
Allocatable: allocatable,
136 changes: 136 additions & 0 deletions pkg/controllers/federatedcluster/resource/resource.go
@@ -0,0 +1,136 @@
/*
Copyright 2023 The KubeAdmiral Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package clusterresource

import (
corev1 "k8s.io/api/core/v1"
)

// AggregateResources returns
// - allocatable resources from the given nodes, and
// - available resources after subtracting the requests of the given pods.
func AggregateResources(
nodes []*corev1.Node,
pods []*corev1.Pod,
) (corev1.ResourceList, corev1.ResourceList) {
allocatable := make(corev1.ResourceList)
for _, node := range nodes {
if !IsNodeSchedulable(node) {
continue
}

addResources(node.Status.Allocatable, allocatable)
}

// Don't consider the pod-count resource (corev1.ResourcePods) for now
delete(allocatable, corev1.ResourcePods)

available := allocatable.DeepCopy()
usage := AggregatePodUsage(pods, func(pod *corev1.Pod) *corev1.Pod { return pod })

for name, quantity := range available {
// `quantity` is a copy here; pointer methods do not mutate `available[name]`
quantity.Sub(usage[name])
available[name] = quantity
}

return allocatable, available
}

func AggregatePodUsage[T any](pods []T, podFunc func(T) *corev1.Pod) corev1.ResourceList {
list := make(corev1.ResourceList)

for _, pod := range pods {
pod := podFunc(pod)

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
continue
}

podRequests := getPodResourceRequests(pod)
for name, requestedQuantity := range podRequests {
if q, exists := list[name]; exists {
requestedQuantity.Add(q)
}
list[name] = requestedQuantity
}
}

return list
}

// IsNodeSchedulable returns true if node is ready and schedulable, otherwise false.
func IsNodeSchedulable(node *corev1.Node) bool {
if node.Spec.Unschedulable {
return false
}

for _, taint := range node.Spec.Taints {
if taint.Effect == corev1.TaintEffectNoSchedule ||
taint.Effect == corev1.TaintEffectNoExecute {
return false
}
}

for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status != corev1.ConditionTrue {
return false
}
}
return true
}

func addResources(src, dest corev1.ResourceList) {
for k, v := range src {
if prevVal, ok := dest[k]; ok {
prevVal.Add(v)
dest[k] = prevVal
} else {
dest[k] = v.DeepCopy()
}
}
}

// maxResources sets dst to the greater of dst/src for every resource in src
func maxResources(src, dst corev1.ResourceList) {
for name, srcQuantity := range src {
if dstQuantity, ok := dst[name]; !ok || srcQuantity.Cmp(dstQuantity) > 0 {
dst[name] = srcQuantity.DeepCopy()
}
}
}

// podResourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers...) + overhead
func getPodResourceRequests(pod *corev1.Pod) corev1.ResourceList {
reqs := make(corev1.ResourceList)

for _, container := range pod.Spec.Containers {
addResources(container.Resources.Requests, reqs)
}

for _, container := range pod.Spec.InitContainers {
maxResources(container.Resources.Requests, reqs)
}

// If the pod declares overhead (PodOverhead feature), add it to the total requests:
if pod.Spec.Overhead != nil {
addResources(pod.Spec.Overhead, reqs)
}

return reqs
}
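
A minimal usage sketch of the new clusterresource package (an editorial illustration, not part of this diff); the node and pod values below are made up:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	clusterresource "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/resource"
)

func main() {
	// A single schedulable node with 4 CPUs and 8Gi of memory allocatable.
	node := &corev1.Node{
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("4"),
				corev1.ResourceMemory: resource.MustParse("8Gi"),
			},
			Conditions: []corev1.NodeCondition{
				{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
			},
		},
	}

	// A running pod requesting 1 CPU and 1Gi of memory.
	pod := &corev1.Pod{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceCPU:    resource.MustParse("1"),
						corev1.ResourceMemory: resource.MustParse("1Gi"),
					},
				},
			}},
		},
		Status: corev1.PodStatus{Phase: corev1.PodRunning},
	}

	// allocatable = {cpu: 4, memory: 8Gi}; available = {cpu: 3, memory: 7Gi}.
	allocatable, available := clusterresource.AggregateResources(
		[]*corev1.Node{node}, []*corev1.Pod{pod},
	)
	fmt.Println(allocatable, available)

	// AggregatePodUsage is generic, so callers can pass any slice type plus an
	// extractor function, e.g. a slice of wrapper structs instead of *corev1.Pod.
	usage := clusterresource.AggregatePodUsage(
		[]*corev1.Pod{pod},
		func(p *corev1.Pod) *corev1.Pod { return p },
	)
	fmt.Println(usage)
}

As the comment above getPodResourceRequests notes, a pod's effective request is the maximum of the sum of its containers' requests and each init container's request, plus any declared overhead, matching the accounting convention used by kube-scheduler.
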
@@ -14,14 +14,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package federatedcluster
package clusterresource_test

import (
"testing"

"github.com/davecgh/go-spew/spew"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

clusterresource "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/resource"
)

func Test_aggregateResources(t *testing.T) {
@@ -260,13 +262,21 @@

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocatable, available := aggregateResources(tc.nodes, tc.pods)
allocatable, available := clusterresource.AggregateResources(tc.nodes, tc.pods)
if len(allocatable) != len(tc.expectedAllocatable) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
t.Fatalf(
"expected allocatable %s differs from actual allocatable %s",
spew.Sdump(tc.expectedAllocatable),
spew.Sdump(allocatable),
)
}
for name, actualQuantity := range allocatable {
if expectedQuantity, ok := tc.expectedAllocatable[name]; !ok || !actualQuantity.Equal(expectedQuantity) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
t.Fatalf(
"expected allocatable %s differs from actual allocatable %s",
spew.Sdump(tc.expectedAllocatable),
spew.Sdump(allocatable),
)
}
}

@@ -275,7 +285,11 @@ func Test_aggregateResources(t *testing.T) {
}
for name, actualQuantity := range available {
if expectedQuantity, ok := tc.expectedAvailable[name]; !ok || !actualQuantity.Equal(expectedQuantity) {
t.Fatalf("expected available %s differs from actual available %s", spew.Sdump(tc.expectedAvailable), spew.Sdump(available))
t.Fatalf(
"expected available %s differs from actual available %s",
spew.Sdump(tc.expectedAvailable),
spew.Sdump(available),
)
}
}
})
103 changes: 0 additions & 103 deletions pkg/controllers/federatedcluster/util.go
@@ -112,106 +112,3 @@ func isClusterJoined(status *fedcorev1a1.FederatedClusterStatus) (joined, failed

return false, false
}

// isNodeSchedulable returns true if node is ready and schedulable, otherwise false.
func isNodeSchedulable(node *corev1.Node) bool {
if node.Spec.Unschedulable {
return false
}

for _, taint := range node.Spec.Taints {
if taint.Effect == corev1.TaintEffectNoSchedule ||
taint.Effect == corev1.TaintEffectNoExecute {
return false
}
}

for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status != corev1.ConditionTrue {
return false
}
}
return true
}

func addResources(src, dest corev1.ResourceList) {
for k, v := range src {
if prevVal, ok := dest[k]; ok {
prevVal.Add(v)
dest[k] = prevVal
} else {
dest[k] = v.DeepCopy()
}
}
}

// maxResources sets dst to the greater of dst/src for every resource in src
func maxResources(src, dst corev1.ResourceList) {
for name, srcQuantity := range src {
if dstQuantity, ok := dst[name]; !ok || srcQuantity.Cmp(dstQuantity) > 0 {
dst[name] = srcQuantity.DeepCopy()
}
}
}

// podResourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers...) + overHead
func getPodResourceRequests(pod *corev1.Pod) corev1.ResourceList {
reqs := make(corev1.ResourceList)

for _, container := range pod.Spec.Containers {
addResources(container.Resources.Requests, reqs)
}

for _, container := range pod.Spec.InitContainers {
maxResources(container.Resources.Requests, reqs)
}

// if PodOverhead feature is supported, add overhead for running a pod
// to the sum of requests and to non-zero limits:
if pod.Spec.Overhead != nil {
addResources(pod.Spec.Overhead, reqs)
}

return reqs
}

// aggregateResources returns
// - allocatable resources from the nodes and,
// - available resources after considering allocations to the given pods.
func aggregateResources(
nodes []*corev1.Node,
pods []*corev1.Pod,
) (corev1.ResourceList, corev1.ResourceList) {
allocatable := make(corev1.ResourceList)
for _, node := range nodes {
if !isNodeSchedulable(node) {
continue
}

addResources(node.Status.Allocatable, allocatable)
}

// Don't consider pod resource for now
delete(allocatable, corev1.ResourcePods)

available := make(corev1.ResourceList)
for name, quantity := range allocatable {
available[name] = quantity.DeepCopy()
}

for _, pod := range pods {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
continue
}

podRequests := getPodResourceRequests(pod)
for name, requestedQuantity := range podRequests {
if availableQuantity, ok := available[name]; ok {
availableQuantity.Sub(requestedQuantity)
available[name] = availableQuantity
}
}
}

return allocatable, available
}
7 changes: 6 additions & 1 deletion pkg/controllers/override/util_test.go
@@ -299,7 +299,12 @@ func TestLookForMatchedPolicies(t *testing.T) {
}
}

foundPolicies, needsRecheckOnError, err := lookForMatchedPolicies(obj, isNamespaced, overridePolicyStore, clusterOverridePolicyStore)
foundPolicies, needsRecheckOnError, err := lookForMatchedPolicies(
obj,
isNamespaced,
overridePolicyStore,
clusterOverridePolicyStore,
)
if (err != nil) != testCase.isErrorExpected {
t.Fatalf("err = %v, but isErrorExpected = %v", err, testCase.isErrorExpected)
}
24 changes: 10 additions & 14 deletions pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
@@ -72,7 +72,7 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(

var schedulingWeights map[string]int64
if dynamicSchedulingEnabled {
clusterAvailables := QueryClusterResource(clusters, availableResource)
clusterAvailables := QueryAvailable(clusters, su.CurrentUsage)
if len(clusters) != len(clusterAvailables) {
return clusterReplicasList, framework.NewResult(framework.Error)
}
@@ -180,7 +180,7 @@ func CalcWeightLimit(
clusters []*fedcorev1a1.FederatedCluster,
supplyLimitRatio float64,
) (weightLimit map[string]int64, err error) {
allocatables := QueryClusterResource(clusters, allocatableResource)
allocatables := QueryAllocatable(clusters)
if len(allocatables) != len(clusters) {
err = fmt.Errorf("allocatables are incomplete: %v", allocatables)
return
@@ -267,19 +267,8 @@ func AvailableToPercentage(
return
}

// QueryClusterResource aggregate cluster resources, accept available and allocatable.
func QueryClusterResource(clusters []*fedcorev1a1.FederatedCluster, resource string) map[string]corev1.ResourceList {
switch resource {
case availableResource:
return QueryAvailable(clusters)
case allocatableResource:
return QueryAllocatable(clusters)
}
return nil
}

// QueryAvailable aggregates cluster available resources.
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.ResourceList {
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster, currentUsage map[string]framework.Resource) map[string]corev1.ResourceList {
ret := make(map[string]corev1.ResourceList)
for _, cluster := range clusters {
available := make(corev1.ResourceList)
@@ -294,6 +283,13 @@ func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.
available[resourceName] = cluster.Status.Resources.Available[resourceName]
}
}

usageTmp := currentUsage[cluster.Name]
usage := *usageTmp.Clone()

usage.Add(available)

Member (review comment): Can we add a comment here to explain why this is necessary?

available = usage.ResourceList()

ret[cluster.GetName()] = available
}
return ret
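
A worked sketch (editorial, not part of this diff) of what adding su.CurrentUsage back appears to accomplish, assuming CurrentUsage tracks the requests of this scheduling unit's replicas already running in each cluster: the cluster-reported available capacity has already had those requests subtracted, so adding them back avoids counting the workload's own pods against the capacity it can be rescheduled onto.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Cluster-reported available capacity; every running pod's requests,
	// including those of this workload's own replicas, were already subtracted.
	reportedAvailable := corev1.ResourceList{
		corev1.ResourceCPU: resource.MustParse("6"),
	}

	// Requests of this workload's replicas currently running in the cluster
	// (what su.CurrentUsage is assumed to hold per cluster).
	currentUsage := corev1.ResourceList{
		corev1.ResourceCPU: resource.MustParse("2"),
	}

	// Add the workload's own usage back so its existing replicas are not
	// counted against the capacity it can be rescheduled onto: 6 + 2 = 8.
	effective := reportedAvailable.DeepCopy()
	for name, used := range currentUsage {
		q := effective[name]
		q.Add(used)
		effective[name] = q
	}

	fmt.Println(effective.Cpu().String()) // 8
}
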