diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go
index a98e2508..8099d08c 100644
--- a/cmd/controller-manager/app/core.go
+++ b/cmd/controller-manager/app/core.go
@@ -151,6 +151,7 @@ func startGlobalScheduler(
         controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
         controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
         controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
+        controllerCtx.FederatedClientFactory,
         controllerCtx.Metrics,
         controllerCtx.WorkerCount,
     )
diff --git a/pkg/controllers/federatedcluster/clusterstatus.go b/pkg/controllers/federatedcluster/clusterstatus.go
index aa6da674..742712a7 100644
--- a/pkg/controllers/federatedcluster/clusterstatus.go
+++ b/pkg/controllers/federatedcluster/clusterstatus.go
@@ -184,7 +184,7 @@ func updateClusterResources(
         }
     }
 
-    allocatable, available := aggregateResources(nodes, pods)
+    allocatable, available := AggregateResources(nodes, pods)
     clusterStatus.Resources = fedcorev1a1.Resources{
         SchedulableNodes: &schedulableNodes,
         Allocatable:      allocatable,
diff --git a/pkg/controllers/federatedcluster/util.go b/pkg/controllers/federatedcluster/util.go
index 63319731..dcb5ce72 100644
--- a/pkg/controllers/federatedcluster/util.go
+++ b/pkg/controllers/federatedcluster/util.go
@@ -188,10 +188,10 @@ func getPodResourceRequests(pod *corev1.Pod) corev1.ResourceList {
     return reqs
 }
 
-// aggregateResources returns
+// AggregateResources returns
 // - allocatable resources from the nodes and,
 // - available resources after considering allocations to the given pods.
-func aggregateResources(
+func AggregateResources(
     nodes []*corev1.Node,
     pods []*corev1.Pod,
 ) (corev1.ResourceList, corev1.ResourceList) {
@@ -207,24 +207,36 @@ func aggregateResources(
     // Don't consider pod resource for now
     delete(allocatable, corev1.ResourcePods)
 
-    available := make(corev1.ResourceList)
-    for name, quantity := range allocatable {
-        available[name] = quantity.DeepCopy()
+    available := allocatable.DeepCopy()
+    usage := AggregatePodUsage(pods, func(pod *corev1.Pod) *corev1.Pod { return pod })
+
+    for name, quantity := range available {
+        // `quantity` is a copy here; pointer methods do not mutate `available[name]`
+        quantity.Sub(usage[name])
+        available[name] = quantity
     }
 
+    return allocatable, available
+}
+
+func AggregatePodUsage[T any](pods []T, podFunc func(T) *corev1.Pod) corev1.ResourceList {
+    list := make(corev1.ResourceList)
+
     for _, pod := range pods {
+        pod := podFunc(pod)
+
         if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
             continue
         }
 
         podRequests := getPodResourceRequests(pod)
         for name, requestedQuantity := range podRequests {
-            if availableQuantity, ok := available[name]; ok {
-                availableQuantity.Sub(requestedQuantity)
-                available[name] = availableQuantity
+            if q, exists := list[name]; exists {
+                requestedQuantity.Add(q)
             }
+            list[name] = requestedQuantity
         }
     }
 
-    return allocatable, available
+    return list
 }
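
A rough test-style sketch of what the new AggregatePodUsage helper computes (not part of the patch, and assuming getPodResourceRequests follows the usual convention of summing container requests): requests of Succeeded and Failed pods are skipped, everything else is summed per resource name.

package federatedcluster

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func TestAggregatePodUsageSkipsCompletedPods(t *testing.T) {
	// newPod builds a single-container pod with the given phase and CPU request.
	newPod := func(phase corev1.PodPhase, cpu string) *corev1.Pod {
		return &corev1.Pod{
			Spec: corev1.PodSpec{Containers: []corev1.Container{{
				Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{
					corev1.ResourceCPU: resource.MustParse(cpu),
				}},
			}}},
			Status: corev1.PodStatus{Phase: phase},
		}
	}

	pods := []*corev1.Pod{
		newPod(corev1.PodRunning, "1"),
		newPod(corev1.PodPending, "2"),
		newPod(corev1.PodFailed, "4"), // excluded from the aggregate
	}

	usage := AggregatePodUsage(pods, func(p *corev1.Pod) *corev1.Pod { return p })
	if got := usage[corev1.ResourceCPU]; got.Cmp(resource.MustParse("3")) != 0 {
		t.Fatalf("expected 3 CPU of usage, got %s", got.String())
	}
}

If getPodResourceRequests also folds in init-container requests or pod overhead, the expected quantity would need adjusting; the sketch keeps the pods trivially simple to sidestep that.
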
diff --git a/pkg/controllers/federatedcluster/util_test.go b/pkg/controllers/federatedcluster/util_test.go
index 2639b734..346cee16 100644
--- a/pkg/controllers/federatedcluster/util_test.go
+++ b/pkg/controllers/federatedcluster/util_test.go
@@ -260,7 +260,7 @@ func Test_aggregateResources(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            allocatable, available := aggregateResources(tc.nodes, tc.pods)
+            allocatable, available := AggregateResources(tc.nodes, tc.pods)
             if len(allocatable) != len(tc.expectedAllocatable) {
                 t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
             }
diff --git a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
index a47481b9..f7acf1d3 100644
--- a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
+++ b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
@@ -51,9 +51,7 @@ const (
     allocatableResource string = "allocatable"
 )
 
-var (
-    ErrNoCPUResource = errors.New("no cpu resource")
-)
+var ErrNoCPUResource = errors.New("no cpu resource")
 
 type ClusterCapacityWeight struct{}
 
@@ -77,12 +75,12 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(
 
     var schedulingWeights map[string]int64
     if dynamicSchedulingEnabled {
-        clusterAvailables := QueryClusterResource(clusters, availableResource)
+        clusterAvailables := QueryAvailable(clusters, su)
         if len(clusters) != len(clusterAvailables) {
             return clusterReplicasList, framework.NewResult(framework.Error)
         }
 
-        weightLimit, err := CalcWeightLimit(clusters, supplyLimitProportion)
+        weightLimit, err := CalcWeightLimit(clusters, supplyLimitProportion, su)
         if err != nil {
             return clusterReplicasList, framework.NewResult(
                 framework.Error,
@@ -184,8 +182,9 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(
 func CalcWeightLimit(
     clusters []*fedcorev1a1.FederatedCluster,
     supplyLimitRatio float64,
+    su *framework.SchedulingUnit,
 ) (weightLimit map[string]int64, err error) {
-    allocatables := QueryClusterResource(clusters, allocatableResource)
+    allocatables := QueryAllocatable(clusters)
     if len(allocatables) != len(clusters) {
         err = fmt.Errorf("allocatables are incomplete: %v", allocatables)
         return
@@ -272,19 +271,8 @@ func AvailableToPercentage(
     return
 }
 
-// QueryClusterResource aggregate cluster resources, accept available and allocatable.
-func QueryClusterResource(clusters []*fedcorev1a1.FederatedCluster, resource string) map[string]corev1.ResourceList {
-    switch resource {
-    case availableResource:
-        return QueryAvailable(clusters)
-    case allocatableResource:
-        return QueryAllocatable(clusters)
-    }
-    return nil
-}
-
 // QueryAvailable aggregate cluster available resource.
-func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.ResourceList {
+func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster, schedulingUnit *framework.SchedulingUnit) map[string]corev1.ResourceList {
     ret := make(map[string]corev1.ResourceList)
     for _, cluster := range clusters {
         available := make(corev1.ResourceList)
@@ -299,6 +287,13 @@ func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.
                 available[resourceName] = cluster.Status.Resources.Available[resourceName]
             }
         }
+
+        usageTmp := schedulingUnit.CurrentUsage[cluster.Name]
+        usage := *usageTmp.Clone()
+
+        usage.Add(available)
+        available = usage.ResourceList()
+
         ret[cluster.GetName()] = available
     }
     return ret
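
The behavioral change worth noting in QueryAvailable: available capacity is now computed per scheduling unit, by adding the unit's own current usage in a cluster back onto the cluster's reported Available before weights are derived, presumably so that replicas the unit already runs there do not shrink that cluster's weight when it is rescheduled. A minimal sketch of that arithmetic with invented quantities (not part of the patch):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Reported by the member cluster after subtracting all pod requests,
	// including the 2 CPU this scheduling unit already consumes there.
	reportedAvailable := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("6")}
	// In the real code path this comes from SchedulingUnit.CurrentUsage[clusterName].
	ownUsage := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")}

	// Indexing a ResourceList yields a copy, so Add does not mutate the map.
	effective := ownUsage[corev1.ResourceCPU]
	effective.Add(reportedAvailable[corev1.ResourceCPU])

	fmt.Println(effective.String()) // "8" CPU is what capacity weighting sees
}
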
diff --git a/pkg/controllers/scheduler/framework/types.go b/pkg/controllers/scheduler/framework/types.go
index 74a215f8..bdc2be9e 100644
--- a/pkg/controllers/scheduler/framework/types.go
+++ b/pkg/controllers/scheduler/framework/types.go
@@ -51,6 +51,7 @@ type SchedulingUnit struct {
     // Describes the current scheduling state
     CurrentClusters map[string]*int64
     AutoMigration   *AutoMigrationSpec
+    CurrentUsage    map[string]Resource
 
     // Controls the scheduling behavior
     SchedulingMode fedcorev1a1.SchedulingMode
diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go
index 0747f8b9..c2d8bf6f 100644
--- a/pkg/controllers/scheduler/scheduler.go
+++ b/pkg/controllers/scheduler/scheduler.go
@@ -50,6 +50,7 @@ import (
     annotationutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/annotation"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink"
+    "github.com/kubewharf/kubeadmiral/pkg/controllers/util/federatedclient"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers"
     schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
@@ -86,6 +87,8 @@ type Scheduler struct {
     webhookConfigurationSynced cache.InformerSynced
     webhookPlugins             sync.Map
 
+    federatedClient federatedclient.FederatedClientFactory
+
     worker        worker.ReconcileWorker
     eventRecorder record.EventRecorder
 
@@ -106,18 +109,20 @@ func NewScheduler(
     clusterInformer fedcorev1a1informers.FederatedClusterInformer,
     schedulingProfileInformer fedcorev1a1informers.SchedulingProfileInformer,
     webhookConfigurationInformer fedcorev1a1informers.SchedulerPluginWebhookConfigurationInformer,
+    federatedClient federatedclient.FederatedClientFactory,
     metrics stats.Metrics,
     workerCount int,
 ) (*Scheduler, error) {
     schedulerName := fmt.Sprintf("%s-scheduler", typeConfig.GetFederatedType().Name)
 
     s := &Scheduler{
-        typeConfig:    typeConfig,
-        name:          schedulerName,
-        fedClient:     fedClient,
-        dynamicClient: dynamicClient,
-        metrics:       metrics,
-        logger:        klog.LoggerWithName(klog.Background(), schedulerName),
+        typeConfig:      typeConfig,
+        name:            schedulerName,
+        fedClient:       fedClient,
+        dynamicClient:   dynamicClient,
+        federatedClient: federatedClient,
+        metrics:         metrics,
+        logger:          klog.LoggerWithName(klog.Background(), schedulerName),
     }
 
     s.worker = worker.NewReconcileWorker(
@@ -353,7 +358,7 @@ func (s *Scheduler) reconcile(qualifiedName common.QualifiedName) (status worker
         policyKey.String(),
     )
 
-    schedulingUnit, err := s.schedulingUnitForFedObject(fedObject, policy)
+    schedulingUnit, err := s.schedulingUnitForFedObject(context.TODO(), fedObject, policy)
     if err != nil {
         keyedLogger.Error(err, "Failed to get scheduling unit")
         s.eventRecorder.Eventf(
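
For orientation before the scheduler changes below: CurrentUsage carries the per-member-cluster resource usage of the workload being scheduled, keyed by cluster name like CurrentClusters. A hypothetical value (cluster names and quantities invented for illustration) could look like this:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
)

func main() {
	su := framework.SchedulingUnit{
		// Populated by getPodUsage below from pod listings in each joined cluster;
		// all other SchedulingUnit fields are omitted for brevity.
		CurrentUsage: map[string]framework.Resource{
			"member-1": *framework.NewResource(corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("6"),
				corev1.ResourceMemory: resource.MustParse("12Gi"),
			}),
			"member-2": *framework.NewResource(corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("2"),
			}),
		},
	}
	fmt.Printf("%+v\n", su.CurrentUsage)
}
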
diff --git a/pkg/controllers/scheduler/schedulingunit.go b/pkg/controllers/scheduler/schedulingunit.go
index 8c2868cb..7edba14c 100644
--- a/pkg/controllers/scheduler/schedulingunit.go
+++ b/pkg/controllers/scheduler/schedulingunit.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scheduler
 
 import (
+    "context"
     "encoding/json"
     "fmt"
     "strconv"
@@ -24,18 +25,21 @@ import (
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/klog/v2"
 
     fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/common"
+    "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
     "github.com/kubewharf/kubeadmiral/pkg/controllers/util"
     utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured"
 )
 
 func (s *Scheduler) schedulingUnitForFedObject(
+    ctx context.Context,
     fedObject *unstructured.Unstructured,
     policy fedcorev1a1.GenericPropagationPolicy,
 ) (*framework.SchedulingUnit, error) {
@@ -75,6 +79,15 @@ func (s *Scheduler) schedulingUnitForFedObject(
         return nil, err
     }
 
+    var currentUsage map[string]framework.Resource
+    selectorPath := s.typeConfig.Spec.PathDefinition.LabelSelector
+    if selectorPath != "" {
+        currentUsage, err = s.getPodUsage(ctx, fedObject, selectorPath)
+        if err != nil {
+            return nil, fmt.Errorf("failed to get current pod usage: %w", err)
+        }
+    }
+
     schedulingUnit := &framework.SchedulingUnit{
         GroupVersion: schema.GroupVersion{Group: targetType.Group, Version: targetType.Version},
         Kind:         targetType.Kind,
@@ -85,6 +98,7 @@ func (s *Scheduler) schedulingUnitForFedObject(
         Annotations:      objectMeta.GetAnnotations(),
         DesiredReplicas:  desiredReplicasOption,
         CurrentClusters:  currentReplicas,
+        CurrentUsage:     currentUsage,
         AvoidDisruption:  true,
     }
 
@@ -162,6 +176,55 @@ func (s *Scheduler) schedulingUnitForFedObject(
     return schedulingUnit, nil
 }
 
+func (s *Scheduler) getPodUsage(ctx context.Context, fedObject *unstructured.Unstructured, selectorPath string) (map[string]framework.Resource, error) {
+    clusters, err := s.clusterLister.List(labels.Everything())
+    if err != nil {
+        return nil, fmt.Errorf("failed to get clusters from store: %w", err)
+    }
+
+    selector, err := utilunstructured.GetLabelSelectorFromPath(fedObject, selectorPath, common.TemplatePath)
+    if err != nil {
+        return nil, fmt.Errorf("invalid label selector: %w", err)
+    }
+
+    currentUsage := make(map[string]framework.Resource, len(clusters))
+
+    // this loop is intentionally not parallelized to reduce memory overhead.
+    for _, cluster := range clusters {
+        if !util.IsClusterJoined(&cluster.Status) {
+            continue
+        }
+
+        currentUsage[cluster.Name], err = s.getClusterPodUsage(ctx, cluster, fedObject, selector)
+        if err != nil {
+            return nil, fmt.Errorf("failed to get pod resource usage in cluster %q: %w", cluster.Name, err)
+        }
+    }
+
+    return currentUsage, nil
+}
+
+func (s *Scheduler) getClusterPodUsage(ctx context.Context, cluster *fedcorev1a1.FederatedCluster, fedObject *unstructured.Unstructured, selector *metav1.LabelSelector) (res framework.Resource, err error) {
+    client, exists, err := s.federatedClient.KubeClientsetForCluster(cluster.Name)
+    if err != nil {
+        return res, fmt.Errorf("get clientset: %w", err)
+    }
+    if !exists {
+        return res, fmt.Errorf("clientset does not exist yet") // wait for the clientset to get created
+    }
+
+    pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
+        ResourceVersion: "0",
+        LabelSelector:   metav1.FormatLabelSelector(selector),
+    })
+    if err != nil {
+        return res, fmt.Errorf("cannot list pods: %w", err)
+    }
+
+    usage := federatedcluster.AggregatePodUsage(pods.Items, func(pod corev1.Pod) *corev1.Pod { return &pod })
+    return *framework.NewResource(usage), nil
+}
+
 func getTemplateObjectMeta(fedObject *unstructured.Unstructured) (*metav1.ObjectMeta, error) {
     templateContent, exists, err := unstructured.NestedMap(fedObject.Object, common.TemplatePath...)
     if err != nil {
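
A design note on the generic signature introduced in util.go: because AggregatePodUsage is parameterized over the slice element type, the cluster-status path (which passes []*corev1.Pod) and the scheduler path above (which receives []corev1.Pod inside a PodList) share one aggregation routine. A small sketch of the two call shapes, assuming callers import the federatedcluster package as the patch does:

package example

import (
	corev1 "k8s.io/api/core/v1"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
)

// usageFromListItems aggregates requests from a PodList's value slice,
// as getClusterPodUsage does after listing pods in a member cluster.
func usageFromListItems(items []corev1.Pod) corev1.ResourceList {
	return federatedcluster.AggregatePodUsage(items, func(p corev1.Pod) *corev1.Pod { return &p })
}

// usageFromLister aggregates requests from a pointer slice,
// as AggregateResources does when computing cluster available capacity.
func usageFromLister(pods []*corev1.Pod) corev1.ResourceList {
	return federatedcluster.AggregatePodUsage(pods, func(p *corev1.Pod) *corev1.Pod { return p })
}
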