fix(scheduler): compensate current resource usage when calculating cluster available
SOF3 committed Apr 17, 2023
1 parent 4fcf830 commit dd872dd
Showing 8 changed files with 110 additions and 36 deletions.
1 change: 1 addition & 0 deletions cmd/controller-manager/app/core.go
@@ -151,6 +151,7 @@ func startGlobalScheduler(
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
controllerCtx.FederatedClientFactory,
controllerCtx.Metrics,
controllerCtx.WorkerCount,
)
2 changes: 1 addition & 1 deletion pkg/controllers/federatedcluster/clusterstatus.go
@@ -184,7 +184,7 @@ func updateClusterResources(
}
}

allocatable, available := aggregateResources(nodes, pods)
allocatable, available := AggregateResources(nodes, pods)
clusterStatus.Resources = fedcorev1a1.Resources{
SchedulableNodes: &schedulableNodes,
Allocatable: allocatable,
30 changes: 21 additions & 9 deletions pkg/controllers/federatedcluster/util.go
@@ -188,10 +188,10 @@ func getPodResourceRequests(pod *corev1.Pod) corev1.ResourceList {
return reqs
}

// aggregateResources returns
// AggregateResources returns
// - allocatable resources from the nodes and,
// - available resources after considering allocations to the given pods.
func aggregateResources(
func AggregateResources(
nodes []*corev1.Node,
pods []*corev1.Pod,
) (corev1.ResourceList, corev1.ResourceList) {
@@ -207,24 +207,36 @@ func aggregateResources(
// Don't consider pod resource for now
delete(allocatable, corev1.ResourcePods)

available := make(corev1.ResourceList)
for name, quantity := range allocatable {
available[name] = quantity.DeepCopy()
available := allocatable.DeepCopy()
usage := AggregatePodUsage(pods, func(pod *corev1.Pod) *corev1.Pod { return pod })

for name, quantity := range available {
// `quantity` is a copy here; pointer methods do not mutate `available[name]`
quantity.Sub(usage[name])
available[name] = quantity
}

return allocatable, available
}

func AggregatePodUsage[T any](pods []T, podFunc func(T) *corev1.Pod) corev1.ResourceList {
list := make(corev1.ResourceList)

for _, pod := range pods {
pod := podFunc(pod)

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
continue
}

podRequests := getPodResourceRequests(pod)
for name, requestedQuantity := range podRequests {
if availableQuantity, ok := available[name]; ok {
availableQuantity.Sub(requestedQuantity)
available[name] = availableQuantity
if q, exists := list[name]; exists {
requestedQuantity.Add(q)
}
list[name] = requestedQuantity
}
}

return allocatable, available
return list
}
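For context, a minimal usage sketch of the new exported helpers (hypothetical; the empty pod slices are placeholders and the import path is the one shown in this diff). AggregatePodUsage sums the resource requests of every pod that is not Succeeded or Failed, and AggregateResources subtracts that sum from the nodes' allocatable capacity to report available resources. The type parameter lets the same helper accept both the []*corev1.Pod slices the cluster-status controller works with and the []corev1.Pod items of a List response used by the scheduler further below.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
)

func main() {
	byPointer := []*corev1.Pod{} // e.g. pods from an informer cache
	byValue := []corev1.Pod{}    // e.g. PodList.Items from a List call

	// Both slice shapes work; podFunc only has to yield a *corev1.Pod.
	u1 := federatedcluster.AggregatePodUsage(byPointer, func(p *corev1.Pod) *corev1.Pod { return p })
	u2 := federatedcluster.AggregatePodUsage(byValue, func(p corev1.Pod) *corev1.Pod { return &p })

	fmt.Println(u1, u2)
}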
2 changes: 1 addition & 1 deletion pkg/controllers/federatedcluster/util_test.go
@@ -260,7 +260,7 @@ func Test_aggregateResources(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocatable, available := aggregateResources(tc.nodes, tc.pods)
allocatable, available := AggregateResources(tc.nodes, tc.pods)
if len(allocatable) != len(tc.expectedAllocatable) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
}
31 changes: 13 additions & 18 deletions pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
@@ -51,9 +51,7 @@ const (
allocatableResource string = "allocatable"
)

var (
ErrNoCPUResource = errors.New("no cpu resource")
)
var ErrNoCPUResource = errors.New("no cpu resource")

type ClusterCapacityWeight struct{}

@@ -77,12 +75,12 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(

var schedulingWeights map[string]int64
if dynamicSchedulingEnabled {
clusterAvailables := QueryClusterResource(clusters, availableResource)
clusterAvailables := QueryAvailable(clusters, su)
if len(clusters) != len(clusterAvailables) {
return clusterReplicasList, framework.NewResult(framework.Error)
}

weightLimit, err := CalcWeightLimit(clusters, supplyLimitProportion)
weightLimit, err := CalcWeightLimit(clusters, supplyLimitProportion, su)
if err != nil {
return clusterReplicasList, framework.NewResult(
framework.Error,
@@ -184,8 +182,9 @@ func CalcWeightLimit(
func CalcWeightLimit(
clusters []*fedcorev1a1.FederatedCluster,
supplyLimitRatio float64,
su *framework.SchedulingUnit,
) (weightLimit map[string]int64, err error) {
allocatables := QueryClusterResource(clusters, allocatableResource)
allocatables := QueryAllocatable(clusters)
if len(allocatables) != len(clusters) {
err = fmt.Errorf("allocatables are incomplete: %v", allocatables)
return
@@ -272,19 +271,8 @@ func AvailableToPercentage(
return
}

// QueryClusterResource aggregate cluster resources, accept available and allocatable.
func QueryClusterResource(clusters []*fedcorev1a1.FederatedCluster, resource string) map[string]corev1.ResourceList {
switch resource {
case availableResource:
return QueryAvailable(clusters)
case allocatableResource:
return QueryAllocatable(clusters)
}
return nil
}

// QueryAvailable aggregate cluster available resource.
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.ResourceList {
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster, schedulingUnit *framework.SchedulingUnit) map[string]corev1.ResourceList {
ret := make(map[string]corev1.ResourceList)
for _, cluster := range clusters {
available := make(corev1.ResourceList)
@@ -299,6 +287,13 @@ func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.
available[resourceName] = cluster.Status.Resources.Available[resourceName]
}
}

usageTmp := schedulingUnit.CurrentUsage[cluster.Name]
usage := *usageTmp.Clone()

usage.Add(available)
available = usage.ResourceList()

ret[cluster.GetName()] = available
}
return ret
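The compensation in QueryAvailable, in numbers (a hedged sketch that uses only the framework.Resource methods appearing in this hunk; the quantities are made up). A cluster's status.resources.available is produced by the cluster-status controller with the requests of all running pods already subtracted, including the pods that belong to the workload currently being scheduled. Adding the scheduling unit's own current usage back means its existing replicas no longer count against the capacity it is asking for, so rescheduling the same workload does not appear to need extra room.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
)

func main() {
	// Reported by the member cluster: 6 CPU free after subtracting ALL pod requests.
	available := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6")}

	// This scheduling unit already runs pods requesting 2 CPU in that cluster.
	current := framework.NewResource(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")})

	usage := *current.Clone()
	usage.Add(available)              // 2 + 6
	fmt.Println(usage.ResourceList()) // the unit is scheduled against 8 CPU, not 6
}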
1 change: 1 addition & 0 deletions pkg/controllers/scheduler/framework/types.go
@@ -51,6 +51,7 @@ type SchedulingUnit struct {
// Describes the current scheduling state
CurrentClusters map[string]*int64
AutoMigration *AutoMigrationSpec
CurrentUsage map[string]Resource

// Controls the scheduling behavior
SchedulingMode fedcorev1a1.SchedulingMode
19 changes: 12 additions & 7 deletions pkg/controllers/scheduler/scheduler.go
@@ -50,6 +50,7 @@ import (
annotationutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/annotation"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/federatedclient"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers"
schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
@@ -86,6 +87,8 @@ type Scheduler struct {
webhookConfigurationSynced cache.InformerSynced
webhookPlugins sync.Map

federatedClient federatedclient.FederatedClientFactory

worker worker.ReconcileWorker
eventRecorder record.EventRecorder

@@ -106,18 +109,20 @@ func NewScheduler(
clusterInformer fedcorev1a1informers.FederatedClusterInformer,
schedulingProfileInformer fedcorev1a1informers.SchedulingProfileInformer,
webhookConfigurationInformer fedcorev1a1informers.SchedulerPluginWebhookConfigurationInformer,
federatedClient federatedclient.FederatedClientFactory,
metrics stats.Metrics,
workerCount int,
) (*Scheduler, error) {
schedulerName := fmt.Sprintf("%s-scheduler", typeConfig.GetFederatedType().Name)

s := &Scheduler{
typeConfig: typeConfig,
name: schedulerName,
fedClient: fedClient,
dynamicClient: dynamicClient,
metrics: metrics,
logger: klog.LoggerWithName(klog.Background(), schedulerName),
typeConfig: typeConfig,
name: schedulerName,
fedClient: fedClient,
dynamicClient: dynamicClient,
federatedClient: federatedClient,
metrics: metrics,
logger: klog.LoggerWithName(klog.Background(), schedulerName),
}

s.worker = worker.NewReconcileWorker(
@@ -353,7 +358,7 @@ func (s *Scheduler) reconcile(qualifiedName common.QualifiedName) (status worker
policyKey.String(),
)

schedulingUnit, err := s.schedulingUnitForFedObject(fedObject, policy)
schedulingUnit, err := s.schedulingUnitForFedObject(context.TODO(), fedObject, policy)
if err != nil {
keyedLogger.Error(err, "Failed to get scheduling unit")
s.eventRecorder.Eventf(
60 changes: 60 additions & 0 deletions pkg/controllers/scheduler/schedulingunit.go
@@ -17,25 +17,29 @@ limitations under the License.
package scheduler

import (
"context"
"encoding/json"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured"
)

func (s *Scheduler) schedulingUnitForFedObject(
ctx context.Context,
fedObject *unstructured.Unstructured,
policy fedcorev1a1.GenericPropagationPolicy,
) (*framework.SchedulingUnit, error) {
@@ -75,6 +79,12 @@ func (s *Scheduler) schedulingUnitForFedObject(
return nil, err
}

var currentUsage map[string]framework.Resource
selectorPath := s.typeConfig.Spec.PathDefinition.LabelSelector
if selectorPath != "" {
currentUsage, err = s.getPodUsage(ctx, fedObject, selectorPath)
}

schedulingUnit := &framework.SchedulingUnit{
GroupVersion: schema.GroupVersion{Group: targetType.Group, Version: targetType.Version},
Kind: targetType.Kind,
@@ -85,6 +95,7 @@
Annotations: objectMeta.GetAnnotations(),
DesiredReplicas: desiredReplicasOption,
CurrentClusters: currentReplicas,
CurrentUsage: currentUsage,
AvoidDisruption: true,
}

@@ -162,6 +173,55 @@ func (s *Scheduler) schedulingUnitForFedObject(
return schedulingUnit, nil
}

func (s *Scheduler) getPodUsage(ctx context.Context, fedObject *unstructured.Unstructured, selectorPath string) (map[string]framework.Resource, error) {
clusters, err := s.clusterLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to get clusters from store: %w", err)
}

selector, err := utilunstructured.GetLabelSelectorFromPath(fedObject, selectorPath, common.TemplatePath)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %w", err)
}

currentUsage := make(map[string]framework.Resource, len(clusters))

// this loop is intentionally not parallelized to reduce memory overhead.
for _, cluster := range clusters {
if !util.IsClusterJoined(&cluster.Status) {
continue
}

currentUsage[cluster.Name], err = s.getClusterPodUsage(ctx, cluster, fedObject, selector)
if err != nil {
return nil, fmt.Errorf("failed to get pod resource usage in cluster %q: %w", cluster.Name, err)
}
}

return currentUsage, nil
}

func (s *Scheduler) getClusterPodUsage(ctx context.Context, cluster *fedcorev1a1.FederatedCluster, fedObject *unstructured.Unstructured, selector *metav1.LabelSelector) (res framework.Resource, err error) {
client, exists, err := s.federatedClient.KubeClientsetForCluster(cluster.Name)
if err != nil {
return res, fmt.Errorf("get clientset: %w", err)
}
if !exists {
return res, fmt.Errorf("clientset does not exist yet") // wait for the clientset to get created
}

pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
ResourceVersion: "0",
LabelSelector: metav1.FormatLabelSelector(selector),
})
if err != nil {
return res, fmt.Errorf("cannot list pods: %w", err)
}

usage := federatedcluster.AggregatePodUsage(pods.Items, func(pod corev1.Pod) *corev1.Pod { return &pod })
return *framework.NewResource(usage), nil
}
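For reference, roughly what getClusterPodUsage amounts to against a single member cluster, written with plain client-go (the kubeconfig path and the app=demo selector are hypothetical). ResourceVersion "0" allows the apiserver to answer the cluster-wide list from its watch cache rather than etcd, which keeps this per-cluster, per-reconcile list relatively cheap; in the scheduler the selector is taken from the federated object's template via the type config's labelSelector path.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
)

func main() {
	// Hypothetical: a client for one member cluster built from a local kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/member-1.kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	selector := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "demo"}}

	// resourceVersion=0 lets the list be served from the apiserver cache.
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
		ResourceVersion: "0",
		LabelSelector:   metav1.FormatLabelSelector(selector),
	})
	if err != nil {
		panic(err)
	}

	// Same aggregation as the scheduler: sum the requests of non-terminal pods.
	usage := federatedcluster.AggregatePodUsage(pods.Items, func(p corev1.Pod) *corev1.Pod { return &p })
	fmt.Println(usage)
}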

func getTemplateObjectMeta(fedObject *unstructured.Unstructured) (*metav1.ObjectMeta, error) {
templateContent, exists, err := unstructured.NestedMap(fedObject.Object, common.TemplatePath...)
if err != nil {
