Skip to content

Commit

Permalink
Merge pull request #878 from payall4u/feature/pod-initializer
Browse files Browse the repository at this point in the history
Optimize qos initializer.
  • Loading branch information
mfanjie authored Nov 30, 2023
2 parents 98ac477 + 74132af commit 6d7758e
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 43 deletions.
3 changes: 2 additions & 1 deletion pkg/ensurance/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
ecache "github.com/gocrane/crane/pkg/ensurance/cache"
"github.com/gocrane/crane/pkg/ensurance/executor"
"github.com/gocrane/crane/pkg/ensurance/executor/podinfo"
"github.com/gocrane/crane/pkg/ensurance/util"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/utils"
Expand Down Expand Up @@ -490,7 +491,7 @@ func (s *AnomalyAnalyzer) filterPodQOSMatches(pods []*v1.Pod, actionName string)
}
for _, qos := range podQOSList {
for _, pod := range pods {
if !match(pod, qos) {
if !util.MatchPodAndPodQOS(pod, qos) {
klog.V(4).Infof("Pod %s/%s does not match PodQOS %s", pod.Namespace, pod.Name, qos.Name)
continue

Expand Down
2 changes: 1 addition & 1 deletion pkg/ensurance/collector/cadvisor/cadvisor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewCadvisorManager(cgroupDriver string) Manager {
sysfs := csysfs.NewRealSysFs()
maxHousekeepingConfig := cmanager.HouskeepingConfig{Interval: &maxHousekeepingInterval, AllowDynamic: &allowDynamic}

m, err := cmanager.New(memCache, sysfs, maxHousekeepingConfig, includedMetrics, http.DefaultClient, []string{"/" + utils.CgroupKubePods}, nil /* containerEnvMetadataWhiteList */, "" /* perfEventsFile */, time.Duration(0) /*resctrlInterval*/)
m, err := cmanager.New(memCache, sysfs, maxHousekeepingConfig, includedMetrics, http.DefaultClient, []string{"/" + utils.CgroupKubePods}, nil /* containerEnvMetadataWhiteList */, "" /* perfEventsFile */, time.Duration(0) /*resctrlInterval*/)
if err != nil {
klog.Errorf("Failed to create cadvisor manager start: %v", err)
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package analyzer
package util

import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -69,7 +70,41 @@ func labelMatch(labelSelector metav1.LabelSelector, matchLabels map[string]strin
return true
}

func match(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {
func sortQOSSlice(qosSlice []*ensuranceapi.PodQOS) {
sort.Slice(qosSlice, func(i, j int) bool {
if len(qosSlice[i].Spec.LabelSelector.MatchLabels) != len(qosSlice[j].Spec.LabelSelector.MatchLabels) {
return len(qosSlice[i].Spec.LabelSelector.MatchLabels) > len(qosSlice[j].Spec.LabelSelector.MatchLabels)
}

if qosSlice[i].Spec.ScopeSelector == nil && qosSlice[j].Spec.ScopeSelector == nil {
return true
}

if qosSlice[i].Spec.ScopeSelector == nil {
return false
}

if qosSlice[j].Spec.ScopeSelector == nil {
return true
}

return len(qosSlice[i].Spec.ScopeSelector.MatchExpressions) > len(qosSlice[j].Spec.ScopeSelector.MatchExpressions)
})
}

func MatchPodAndPodQOSSlice(pod *v1.Pod, qosSlice []*ensuranceapi.PodQOS) (res *ensuranceapi.PodQOS) {
newSlice := make([]*ensuranceapi.PodQOS, len(qosSlice), len(qosSlice))
copy(newSlice, qosSlice)
sortQOSSlice(newSlice)
for _, qos := range newSlice {
if MatchPodAndPodQOS(pod, qos) {
return qos
}
}
return nil
}

func MatchPodAndPodQOS(pod *v1.Pod, podQOS *ensuranceapi.PodQOS) bool {

if podQOS.Spec.ScopeSelector == nil &&
podQOS.Spec.LabelSelector.MatchLabels == nil &&
Expand Down
5 changes: 3 additions & 2 deletions pkg/providers/grpc/pb/provider.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/providers/grpc/pb/provider_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 65 additions & 12 deletions pkg/webhooks/pod/mutating.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,33 @@ import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

"github.com/gocrane/api/ensurance/v1alpha1"

"github.com/gocrane/crane/pkg/ensurance/config"
"github.com/gocrane/crane/pkg/ensurance/util"
)

var (
SystemNamespaces = map[string]interface{}{"kube-system": nil, "crane-system": nil}
)

type MutatingAdmission struct {
Config *config.QOSConfig
Config *config.QOSConfig
listPodQOS func() ([]*v1alpha1.PodQOS, error)
}

func NewMutatingAdmission(config *config.QOSConfig, listPodQOS func() ([]*v1alpha1.PodQOS, error)) *MutatingAdmission {
return &MutatingAdmission{
Config: config,
listPodQOS: listPodQOS,
}
}

// Default implements webhook.Defaulter so a webhook will be registered for the type
Expand All @@ -28,36 +40,77 @@ func (m *MutatingAdmission) Default(ctx context.Context, obj runtime.Object) err
return fmt.Errorf("expected a Pod but got a %T", obj)
}

klog.Infof("Into Pod injection %s/%s", pod.Namespace, pod.Name)
klog.V(2).Infof("Mutating started for pod %s/%s", pod.Namespace, pod.Name)

if _, exist := SystemNamespaces[pod.Namespace]; exist {
return nil
}

if m.Config == nil || !m.Config.QOSInitializer.Enable {
if !m.available() {
return nil
}

if pod.Labels == nil {
ls, err := metav1.LabelSelectorAsSelector(m.Config.QOSInitializer.Selector)
if err != nil {
return err
}

if !ls.Matches(labels.Set(pod.Labels)) {
klog.V(2).Infof("Injection skipped: webhook is not interested in the pod")
return nil
}

ls, err := metav1.LabelSelectorAsSelector(m.Config.QOSInitializer.Selector)
qosSlice, err := m.listPodQOS()
if err != nil {
return err
return errors.WithMessage(err, "list PodQOS failed")
}

/****************************************************************
* Check whether the pod has a low CPUPriority (CPUPriority > 0)
****************************************************************/
qos := util.MatchPodAndPodQOSSlice(pod, qosSlice)
if qos == nil {
klog.V(2).Infof("Injection skipped: no podqos matched")
return nil
}

if qos.Spec.ResourceQOS.CPUQOS == nil ||
qos.Spec.ResourceQOS.CPUQOS.CPUPriority == nil ||
*qos.Spec.ResourceQOS.CPUQOS.CPUPriority == 0 {
klog.V(2).Infof("Injection skipped: not a low CPUPriority pod, qos %s", qos.Name)
return nil
}

if ls.Matches(labels.Set(pod.Labels)) {
if m.Config.QOSInitializer.InitContainerTemplate != nil {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, *m.Config.QOSInitializer.InitContainerTemplate)
for _, container := range pod.Spec.InitContainers {
if container.Name == m.Config.QOSInitializer.InitContainerTemplate.Name {
klog.V(2).Infof("Injection skipped: pod has initializerContainer already")
return nil
}
}

if m.Config.QOSInitializer.VolumeTemplate != nil {
pod.Spec.Volumes = append(pod.Spec.Volumes, *m.Config.QOSInitializer.VolumeTemplate)
for _, volume := range pod.Spec.Volumes {
if volume.Name == m.Config.QOSInitializer.VolumeTemplate.Name {
klog.V(2).Infof("Injection skipped: pod has initializerVolume already")
return nil
}
}

if m.Config.QOSInitializer.InitContainerTemplate != nil {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, *m.Config.QOSInitializer.InitContainerTemplate)
}

klog.Infof("Injected QOSInitializer for Pod %s/%s", pod.Namespace, pod.Name)
if m.Config.QOSInitializer.VolumeTemplate != nil {
pod.Spec.Volumes = append(pod.Spec.Volumes, *m.Config.QOSInitializer.VolumeTemplate)
}

klog.V(2).Infof("Mutating completed for pod %s/%s", pod.Namespace, pod.Name)

return nil
}

func (m *MutatingAdmission) available() bool {
return m.Config != nil &&
m.Config.QOSInitializer.Enable &&
m.Config.QOSInitializer.InitContainerTemplate != nil &&
m.Config.QOSInitializer.VolumeTemplate != nil
}
105 changes: 94 additions & 11 deletions pkg/webhooks/pod/mutating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"

"github.com/gocrane/api/ensurance/v1alpha1"

"github.com/gocrane/crane/pkg/ensurance/config"
)

Expand All @@ -20,23 +24,102 @@ func TestDefaultingPodQOSInitializer(t *testing.T) {
t.Errorf("unmarshal config failed:%v", err)
}
m := MutatingAdmission{
Config: config,
Config: config,
listPodQOS: MockListPodQOSFunc,
}

type Case struct {
Pod *v1.Pod
Inject bool
}

for _, tc := range []Case{
{Pod: MockPod("offline", "offline", "enable", "app", "nginx"), Inject: true},
{Pod: MockPod("offline-not-interested", "offline", "enable"), Inject: false},
{Pod: MockPod("online", "offline", "disable", "app", "nginx"), Inject: false},
{Pod: MockPod("online-not-interested", "offline", "disable"), Inject: false},
{Pod: MockPod("default"), Inject: false},
} {
assert.NoError(t, m.Default(context.Background(), tc.Pod))
t.Log(tc.Pod.Name)
assert.Equal(t, len(tc.Pod.Spec.InitContainers) == 1, tc.Inject)
assert.Equal(t, len(tc.Pod.Spec.Volumes) == 1, tc.Inject)
}
}

func TestPrecheck(t *testing.T) {
configYaml := "apiVersion: ensurance.crane.io/v1alpha1\nkind: QOSConfig\nqosInitializer:\n enable: true\n selector: \n matchLabels:\n app: nginx\n"

config := &config.QOSConfig{}
err := yaml.Unmarshal([]byte(configYaml), config)
if err != nil {
t.Errorf("unmarshal config failed:%v", err)
}
m := MutatingAdmission{
Config: config,
listPodQOS: MockListPodQOSFunc,
}
assert.False(t, m.available())
}

func MockListPodQOSFunc() ([]*v1alpha1.PodQOS, error) {
return []*v1alpha1.PodQOS{
{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Spec: v1alpha1.PodQOSSpec{
LabelSelector: metav1.LabelSelector{
MatchLabels: map[string]string{"offline": "enable"},
},
ResourceQOS: v1alpha1.ResourceQOS{
CPUQOS: &v1alpha1.CPUQOS{
CPUPriority: pointer.Int32(7),
},
},
},
}, {
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Spec: v1alpha1.PodQOSSpec{
LabelSelector: metav1.LabelSelector{
MatchLabels: map[string]string{"offline": "disable"},
},
ResourceQOS: v1alpha1.ResourceQOS{
CPUQOS: &v1alpha1.CPUQOS{
CPUPriority: pointer.Int32(0),
},
},
},
}, {
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Spec: v1alpha1.PodQOSSpec{
ResourceQOS: v1alpha1.ResourceQOS{
CPUQOS: &v1alpha1.CPUQOS{
CPUPriority: pointer.Int32(7),
},
},
},
},
}, nil
}

func MockPod(name string, labels ...string) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Labels: map[string]string{
"app": "nginx",
"type": "offline",
},
Name: name,
Labels: nil,
},
}
err = m.Default(context.TODO(), pod)
if err != nil {
t.Fatalf("inject pod failed: %v", err)

if len(labels) < 2 {
return pod
}
if len(pod.Spec.InitContainers) == 0 {
t.Fatalf("should inject containers")

labelmap := map[string]string{}
for i := 0; i < len(labels)-1; i += 2 {
labelmap[labels[i]] = labels[i+1]
}
pod.Labels = labelmap
return pod
}
Loading

0 comments on commit 6d7758e

Please sign in to comment.