From f33b0bacf9fe650640e94eefc3ac4c9ac098f87e Mon Sep 17 00:00:00 2001 From: Harish Kuna Date: Wed, 2 Nov 2022 11:20:41 -0700 Subject: [PATCH 1/2] use informers for pod events instead of Listing --- .../common/slos/pod_startup_latency.go | 111 ++++++++++++------ 1 file changed, 74 insertions(+), 37 deletions(-) diff --git a/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go b/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go index 4b9b3a1df9..5319699d63 100644 --- a/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go +++ b/clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go @@ -61,6 +61,7 @@ func createPodStartupLatencyMeasurement() measurement.Measurement { podStartupEntries: measurementutil.NewObjectTransitionTimes(podStartupLatencyMeasurementName), podMetadata: measurementutil.NewPodsMetadata(podStartupLatencyMeasurementName), eventQueue: workqueue.New(), + schedEventQueue: workqueue.New(), } } @@ -70,12 +71,14 @@ type eventData struct { } type podStartupLatencyMeasurement struct { - selector *util.ObjectSelector - isRunning bool - stopCh chan struct{} + selector *util.ObjectSelector + isRunning bool + stopCh chan struct{} + stopSchedCh chan struct{} // This queue can potentially grow indefinitely, beacause we put all changes here. // Usually it's not recommended pattern, but we need it for measuring PodStartupLatency. 
eventQueue *workqueue.Type + schedEventQueue *workqueue.Type podStartupEntries *measurementutil.ObjectTransitionTimes podMetadata *measurementutil.PodsMetadata threshold time.Duration @@ -91,7 +94,7 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me if err != nil { return nil, err } - + schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName) switch action { case "start": if err := p.selector.Parse(config.Params); err != nil { @@ -101,9 +104,8 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me if err != nil { return nil, err } - return nil, p.start(config.ClusterFramework.GetClientSets().GetClient()) + return nil, p.start(config.ClusterFramework.GetClientSets().GetClient(), schedulerName) case "gather": - schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName) if err != nil { return nil, err } @@ -124,7 +126,7 @@ func (p *podStartupLatencyMeasurement) String() string { return podStartupLatencyMeasurementName + ": " + p.selector.String() } -func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error { +func (p *podStartupLatencyMeasurement) start(c clientset.Interface, schedulerName string) error { if p.isRunning { klog.V(2).Infof("%s: pod startup latancy measurement already running", p) return nil @@ -146,6 +148,29 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error { p.addEvent, ) go p.processEvents() + + selector := fields.Set{ + "involvedObject.kind": "Pod", + "source": schedulerName, + }.AsSelector().String() + + p.stopSchedCh = make(chan struct{}) + + e := informer.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = selector + return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + 
options.FieldSelector = selector + return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), options) + }, + }, + p.addSchedEvent, + ) + go p.processSchedEvents() + go e.Run(p.stopSchedCh) return informer.StartAndSync(i, p.stopCh, informerSyncTimeout) } @@ -154,11 +179,36 @@ func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) { p.eventQueue.Add(event) } +func (p *podStartupLatencyMeasurement) addSchedEvent(_, obj interface{}) { + event := &eventData{obj: obj, recvTime: time.Now()} + p.schedEventQueue.Add(event) +} + func (p *podStartupLatencyMeasurement) processEvents() { for p.processNextWorkItem() { } } +func (p *podStartupLatencyMeasurement) processSchedEvents() { + for p.processNextSchedWorkItem() { + } +} + +func (p *podStartupLatencyMeasurement) processNextSchedWorkItem() bool { + item, quit := p.schedEventQueue.Get() + if quit { + return false + } + defer p.schedEventQueue.Done(item) + event, ok := item.(*eventData) + if !ok { + klog.Warningf("Couldn't convert work item to eventData: %v", item) + return true + } + p.processSchedEvent(event) + return true +} + func (p *podStartupLatencyMeasurement) processNextWorkItem() bool { item, quit := p.eventQueue.Get() if quit { @@ -179,7 +229,9 @@ func (p *podStartupLatencyMeasurement) stop() { if p.isRunning { p.isRunning = false close(p.stopCh) + close(p.stopSchedCh) p.eventQueue.ShutDown() + p.schedEventQueue.ShutDown() } } @@ -230,10 +282,6 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier p.stop() - if err := p.gatherScheduleTimes(c, schedulerName); err != nil { - return nil, err - } - checks := []podStartupLatencyCheck{ { namePrefix: "", @@ -270,33 +318,23 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier return summaries, err } -// TODO(#2006): gatherScheduleTimes is currently listing events at the end of the test. 
-// -// Given that events by default have 1h TTL, for measurements across longer periods -// it just returns incomplete results. -// Given that we don't 100% accuracy, we should switch to a mechanism that is similar -// to the one that slo-monitor is using (added in #1477). -func (p *podStartupLatencyMeasurement) gatherScheduleTimes(c clientset.Interface, schedulerName string) error { - selector := fields.Set{ - "involvedObject.kind": "Pod", - "source": schedulerName, - }.AsSelector().String() - options := metav1.ListOptions{FieldSelector: selector} - schedEvents, err := c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options) - if err != nil { - return err +func (p *podStartupLatencyMeasurement) processSchedEvent(event *eventData) { + + obj := event.obj + if obj == nil { + return } - for _, event := range schedEvents.Items { - key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name) - if _, exists := p.podStartupEntries.Get(key, createPhase); exists { - if !event.EventTime.IsZero() { - p.podStartupEntries.Set(key, schedulePhase, event.EventTime.Time) - } else { - p.podStartupEntries.Set(key, schedulePhase, event.FirstTimestamp.Time) - } - } + e, ok := obj.(*corev1.Event) + if !ok { + return + } + key := createMetaNamespaceKey(e.InvolvedObject.Namespace, e.InvolvedObject.Name) + + if !e.EventTime.IsZero() { + p.podStartupEntries.Set(key, schedulePhase, e.EventTime.Time) + } else { + p.podStartupEntries.Set(key, schedulePhase, e.FirstTimestamp.Time) } - return nil } func (p *podStartupLatencyMeasurement) processEvent(event *eventData) { @@ -311,7 +349,6 @@ func (p *podStartupLatencyMeasurement) processEvent(event *eventData) { key := createMetaNamespaceKey(pod.Namespace, pod.Name) p.podMetadata.SetStateless(key, isPodStateless(pod)) - if pod.Status.Phase == corev1.PodRunning { if _, found := p.podStartupEntries.Get(key, createPhase); !found { p.podStartupEntries.Set(key, watchPhase, recvTime) From 
09a0c4124850a7a2b00cf029a6b965f616b11d41 Mon Sep 17 00:00:00 2001 From: Harish Kuna Date: Tue, 14 May 2024 01:39:56 +0000 Subject: [PATCH 2/2] Add direct scheduler throughput test suite --- .../testing/scheduler-throughput/config.yaml | 90 +++++++++++++++++++ .../scheduler-throughput/pod-default.yaml | 10 +++ 2 files changed, 100 insertions(+) create mode 100644 clusterloader2/testing/scheduler-throughput/config.yaml create mode 100644 clusterloader2/testing/scheduler-throughput/pod-default.yaml diff --git a/clusterloader2/testing/scheduler-throughput/config.yaml b/clusterloader2/testing/scheduler-throughput/config.yaml new file mode 100644 index 0000000000..4b718881be --- /dev/null +++ b/clusterloader2/testing/scheduler-throughput/config.yaml @@ -0,0 +1,90 @@ +# ASSUMPTIONS: +# - Underlying cluster should have 100+ nodes. +# See https://github.com/kubernetes/perf-tests/pull/1667#issuecomment-769642266 + +# The minimal number of pods to be used to measure various things like +# pod-startup-latency or scheduler-throughput. The purpose of it is to avoid +# problems in small clusters where we wouldn't have enough samples (pods) to +# measure things accurately. +# TODO( https://github.com/kubernetes/perf-tests/issues/1027): Lower the number of "min-pods" once we fix the scheduler throughput measurement. +{{$MIN_PODS_IN_SMALL_CLUSTERS := 5000}} + +{{$totalSchedulerThroughputPods := DefaultParam .CL2_SCHEDULER_THROUGHPUT_PODS $MIN_PODS_IN_SMALL_CLUSTERS}} +{{$defaultQps := DefaultParam .CL2_DEFAULT_QPS 500}} +{{$defaultBurst := DefaultParam .CL2_DEFAULT_BURST 1000}} +{{$uniformQps := DefaultParam .CL2_UNIFORM_QPS 500}} + +{{$SCHEDULER_THROUGHPUT_THRESHOLD := DefaultParam .CL2_SCHEDULER_THROUGHPUT_THRESHOLD 400}} + +name: direct-scheduler-throughput +namespace: + number: 1 +tuningSets: +# default is a tuningset that is meant to be used when we don't have any specific requirements on pace of operations. 
+- name: default + globalQPSLoad: + qps: {{$defaultQps}} + burst: {{$defaultBurst}} +- name: UniformQPS + qpsLoad: + qps: {{$uniformQps}} +steps: +- name: Creating scheduler throughput measurements + measurements: + - Identifier: DirectSchedulerThroughputPodStartupLatency + Method: PodStartupLatency + Params: + action: start + labelSelector: group = direct-scheduler-throughput + threshold: 5s + - Identifier: DirectSchedulingThroughput + Method: SchedulingThroughput + Params: + action: start + labelSelector: group = direct-scheduler-throughput + measurmentInterval: 1s +- name: create scheduler throughput pods + phases: + - namespaceRange: + min: 1 + max: 1 + replicasPerNamespace: {{$totalSchedulerThroughputPods}} + tuningSet: UniformQPS + objectBundle: + - basename: direct-scheduler-throughput-pod + objectTemplatePath: pod-default.yaml + templateFillMap: + Group: direct-scheduler-throughput +- name: Waiting for scheduler throughput pods to be created + measurements: + - Identifier: WaitForDirectSchedulerThroughputPods + Method: WaitForRunningPods + Params: + action: gather + timeout: 5m + desiredPodCount: {{$totalSchedulerThroughputPods}} + labelSelector: group = direct-scheduler-throughput +- name: Collecting scheduler throughput measurements + measurements: + - Identifier: DirectSchedulerThroughputPodStartupLatency + Method: PodStartupLatency + Params: + action: gather + - Identifier: DirectSchedulingThroughput + Method: SchedulingThroughput + Params: + action: gather + enableViolations: true + threshold: {{$SCHEDULER_THROUGHPUT_THRESHOLD}} +- name: Delete scheduler throughput pods + phases: + - namespaceRange: + min: 1 + max: 1 + replicasPerNamespace: 0 + tuningSet: default + objectBundle: + - basename: direct-scheduler-throughput-pod + objectTemplatePath: pod-default.yaml + templateFillMap: + Group: direct-scheduler-throughput \ No newline at end of file diff --git a/clusterloader2/testing/scheduler-throughput/pod-default.yaml 
b/clusterloader2/testing/scheduler-throughput/pod-default.yaml new file mode 100644 index 0000000000..4885d7ec8d --- /dev/null +++ b/clusterloader2/testing/scheduler-throughput/pod-default.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Pod +metadata: + generateName: pod-churn- + labels: + group: {{.Group}} +spec: + containers: + - image: registry.k8s.io/pause:3.9 + name: pause \ No newline at end of file