use informers for pod events instead of Listing #2178

Closed
wants to merge 2 commits
111 changes: 74 additions & 37 deletions clusterloader2/pkg/measurement/common/slos/pod_startup_latency.go
@@ -61,6 +61,7 @@ func createPodStartupLatencyMeasurement() measurement.Measurement {
podStartupEntries: measurementutil.NewObjectTransitionTimes(podStartupLatencyMeasurementName),
podMetadata: measurementutil.NewPodsMetadata(podStartupLatencyMeasurementName),
eventQueue: workqueue.New(),
schedEventQueue: workqueue.New(),
}
}

@@ -70,12 +71,14 @@ type eventData struct {
}

type podStartupLatencyMeasurement struct {
selector *util.ObjectSelector
isRunning bool
stopCh chan struct{}
selector *util.ObjectSelector
isRunning bool
stopCh chan struct{}
stopSchedCh chan struct{}
// This queue can potentially grow indefinitely, because we put all changes here.
// Usually it's not a recommended pattern, but we need it for measuring PodStartupLatency.
eventQueue *workqueue.Type
schedEventQueue *workqueue.Type
podStartupEntries *measurementutil.ObjectTransitionTimes
podMetadata *measurementutil.PodsMetadata
threshold time.Duration
@@ -91,7 +94,7 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me
if err != nil {
return nil, err
}

schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
Member:
Please copy the error checking here from under the case "gather" as well.

switch action {
case "start":
if err := p.selector.Parse(config.Params); err != nil {
@@ -101,9 +104,8 @@ func (p *podStartupLatencyMeasurement) Execute(config *measurement.Config) ([]me
if err != nil {
return nil, err
}
return nil, p.start(config.ClusterFramework.GetClientSets().GetClient())
return nil, p.start(config.ClusterFramework.GetClientSets().GetClient(), schedulerName)
case "gather":
schedulerName, err := util.GetStringOrDefault(config.Params, "schedulerName", defaultSchedulerName)
if err != nil {
return nil, err
}
@@ -124,7 +126,7 @@ func (p *podStartupLatencyMeasurement) String() string {
return podStartupLatencyMeasurementName + ": " + p.selector.String()
}

func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
func (p *podStartupLatencyMeasurement) start(c clientset.Interface, schedulerName string) error {
if p.isRunning {
klog.V(2).Infof("%s: pod startup latency measurement already running", p)
return nil
@@ -146,6 +148,29 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
p.addEvent,
)
go p.processEvents()

selector := fields.Set{
"involvedObject.kind": "Pod",
"source": schedulerName,
}.AsSelector().String()

p.stopSchedCh = make(chan struct{})

e := informer.NewInformer(
Member:
Could we use Controller, a slightly lower-level primitive than Informer, and pass a large (e.g. 10k) WatchListPageSize in the Config?

The reason I'm asking this is that in large clusters there's a tendency to have O(hundreds of thousands) of events, and listing them using the default page size (500) may result in the informer's initial list getting timed out.

@hakuna-matatah (Contributor, author), Nov 4, 2022:

> listing them using default page size (500) may result in informer's initial list getting timed out.

Oh! How will it time out, IIUC? I have ensured we are not relying on the client-side timeout defined here for this use case; instead I'm calling the Run method here directly. Am I misinterpreting what you are trying to imply here?

Member:
Ah, I see... we deliberately set the timeout in our wrappers around informers to make sure that the initial list (the one responsible for the cache's sync) completes in a reasonable time. In clusters with O(xxx k) events this will take ages (O(a few minutes)) when using the default page size, and because of that may run into "too old resource version" during the initial list. I'd strongly suggest using the larger page size, and hence the Controller primitive for the List+Watch pattern for events, like we do in https://github.com/kubernetes/perf-tests/blob/master/clusterloader2/pkg/measurement/common/loadbalancer_nodesync_latency.go.

CC @mborsz for his thoughts as I'll be OOO for the rest of the week.

&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = selector
return c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = selector
return c.CoreV1().Events(p.selector.Namespace).Watch(context.TODO(), options)
},
},
p.addSchedEvent,
)
go p.processSchedEvents()
go e.Run(p.stopSchedCh)
return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
}
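A minimal sketch of the Controller + WatchListPageSize pattern suggested in the review thread above. This is an illustration under assumptions, not the PR's code: newSchedEventController and handleSchedEvent are hypothetical names, store bookkeeping is omitted, and the ProcessFunc signature matches client-go releases up to roughly v0.26 (newer releases also pass an isInInitialList bool).

```go
// Sketch only: a cache.Controller built from cache.Config so that the initial
// Event list uses large pages instead of the 500-item default.
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newSchedEventController (hypothetical) watches scheduler Events for Pods and
// hands each Event to handleSchedEvent.
func newSchedEventController(c clientset.Interface, namespace, schedulerName string,
	handleSchedEvent func(*corev1.Event)) cache.Controller {

	selector := fields.Set{
		"involvedObject.kind": "Pod",
		"source":              schedulerName,
	}.AsSelector().String()

	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = selector
			return c.CoreV1().Events(namespace).List(context.TODO(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = selector
			return c.CoreV1().Events(namespace).Watch(context.TODO(), options)
		},
	}

	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KnownObjects: store})

	return cache.New(&cache.Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       &corev1.Event{},
		FullResyncPeriod: 0,
		RetryOnError:     false,
		// Large pages so the initial list of O(hundreds of thousands) of
		// events finishes before the cache-sync timeout.
		WatchListPageSize: 10000,
		// ProcessFunc signature for client-go <= v0.26; newer releases also
		// receive an isInInitialList bool. Store bookkeeping is omitted here.
		Process: func(obj interface{}) error {
			for _, d := range obj.(cache.Deltas) {
				if e, ok := d.Object.(*corev1.Event); ok {
					handleSchedEvent(e)
				}
			}
			return nil
		},
	})
}
```

With such a controller, the measurement's addSchedEvent handler could be passed as handleSchedEvent and the controller run with go ctrl.Run(p.stopSchedCh) in place of the informer created above.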

@@ -154,11 +179,36 @@ func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) {
p.eventQueue.Add(event)
}

func (p *podStartupLatencyMeasurement) addSchedEvent(_, obj interface{}) {
event := &eventData{obj: obj, recvTime: time.Now()}
p.schedEventQueue.Add(event)
}

func (p *podStartupLatencyMeasurement) processEvents() {
for p.processNextWorkItem() {
}
}

func (p *podStartupLatencyMeasurement) processSchedEvents() {
for p.processNextSchedWorkItem() {
}
}

func (p *podStartupLatencyMeasurement) processNextSchedWorkItem() bool {
item, quit := p.schedEventQueue.Get()
if quit {
return false
}
defer p.schedEventQueue.Done(item)
event, ok := item.(*eventData)
if !ok {
klog.Warningf("Couldn't convert work item to eventData: %v", item)
return true
}
p.processSchedEvent(event)
return true
}

func (p *podStartupLatencyMeasurement) processNextWorkItem() bool {
item, quit := p.eventQueue.Get()
if quit {
@@ -179,7 +229,9 @@ func (p *podStartupLatencyMeasurement) stop() {
if p.isRunning {
p.isRunning = false
close(p.stopCh)
close(p.stopSchedCh)
p.eventQueue.ShutDown()
p.schedEventQueue.ShutDown()
}
}

@@ -230,10 +282,6 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier

p.stop()

if err := p.gatherScheduleTimes(c, schedulerName); err != nil {
return nil, err
}

checks := []podStartupLatencyCheck{
{
namePrefix: "",
@@ -270,33 +318,23 @@ func (p *podStartupLatencyMeasurement) gather(c clientset.Interface, identifier
return summaries, err
}

// TODO(#2006): gatherScheduleTimes is currently listing events at the end of the test.
//
// Given that events by default have 1h TTL, for measurements across longer periods
// it just returns incomplete results.
// Given that we don't need 100% accuracy, we should switch to a mechanism that is similar
// to the one that slo-monitor is using (added in #1477).
func (p *podStartupLatencyMeasurement) gatherScheduleTimes(c clientset.Interface, schedulerName string) error {
selector := fields.Set{
"involvedObject.kind": "Pod",
"source": schedulerName,
}.AsSelector().String()
options := metav1.ListOptions{FieldSelector: selector}
schedEvents, err := c.CoreV1().Events(p.selector.Namespace).List(context.TODO(), options)
if err != nil {
return err
func (p *podStartupLatencyMeasurement) processSchedEvent(event *eventData) {

obj := event.obj
if obj == nil {
return
}
for _, event := range schedEvents.Items {
key := createMetaNamespaceKey(event.InvolvedObject.Namespace, event.InvolvedObject.Name)
if _, exists := p.podStartupEntries.Get(key, createPhase); exists {
if !event.EventTime.IsZero() {
p.podStartupEntries.Set(key, schedulePhase, event.EventTime.Time)
} else {
p.podStartupEntries.Set(key, schedulePhase, event.FirstTimestamp.Time)
}
}
e, ok := obj.(*corev1.Event)
if !ok {
return
}
key := createMetaNamespaceKey(e.InvolvedObject.Namespace, e.InvolvedObject.Name)

if !e.EventTime.IsZero() {
p.podStartupEntries.Set(key, schedulePhase, e.EventTime.Time)
} else {
p.podStartupEntries.Set(key, schedulePhase, e.FirstTimestamp.Time)
}
return nil
}

func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {
@@ -311,7 +349,6 @@ func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {

key := createMetaNamespaceKey(pod.Namespace, pod.Name)
p.podMetadata.SetStateless(key, isPodStateless(pod))

if pod.Status.Phase == corev1.PodRunning {
if _, found := p.podStartupEntries.Get(key, createPhase); !found {
p.podStartupEntries.Set(key, watchPhase, recvTime)
90 changes: 90 additions & 0 deletions clusterloader2/testing/scheduler-throughput/config.yaml
@@ -0,0 +1,90 @@
# ASSUMPTIONS:
# - Underlying cluster should have 100+ nodes.
# See https://github.com/kubernetes/perf-tests/pull/1667#issuecomment-769642266

# The minimal number of pods to be used to measure various things like
# pod-startup-latency or scheduler-throughput. The purpose of it is to avoid
# problems in small clusters where we wouldn't have enough samples (pods) to
# measure things accurately.
# TODO( https://github.com/kubernetes/perf-tests/issues/1027): Lower the number of "min-pods" once we fix the scheduler throughput measurement.
{{$MIN_PODS_IN_SMALL_CLUSTERS := 5000}}

{{$totalSchedulerThroughputPods := DefaultParam .CL2_SCHEDULER_THROUGHPUT_PODS $MIN_PODS_IN_SMALL_CLUSTERS}}
{{$defaultQps := DefaultParam .CL2_DEFAULT_QPS 500}}
{{$defaultBurst := DefaultParam .CL2_DEFAULT_BURST 1000}}
{{$uniformQps := DefaultParam .CL2_UNIFORM_QPS 500}}

{{$SCHEDULER_THROUGHPUT_THRESHOLD := DefaultParam .CL2_SCHEDULER_THROUGHPUT_THRESHOLD 400}}

name: direct-scheduler-throughput
namespace:
number: 1
tuningSets:
# default is a tuning set meant to be used when we don't have any specific requirements on the pace of operations.
- name: default
globalQPSLoad:
qps: {{$defaultQps}}
burst: {{$defaultBurst}}
- name: UniformQPS
qpsLoad:
qps: {{$uniformQps}}
steps:
- name: Creating scheduler throughput measurements
measurements:
- Identifier: DirectSchedulerThroughputPodStartupLatency
Method: PodStartupLatency
Params:
action: start
labelSelector: group = direct-scheduler-throughput
threshold: 5s
- Identifier: DirectSchedulingThroughput
Method: SchedulingThroughput
Params:
action: start
labelSelector: group = direct-scheduler-throughput
measurmentInterval: 1s
- name: create scheduler throughput pods
phases:
- namespaceRange:
min: 1
max: 1
replicasPerNamespace: {{$totalSchedulerThroughputPods}}
tuningSet: UniformQPS
objectBundle:
- basename: direct-scheduler-throughput-pod
objectTemplatePath: pod-default.yaml
templateFillMap:
Group: direct-scheduler-throughput
- name: Waiting for scheduler throughput pods to be created
measurements:
- Identifier: WaitForDirectSchedulerThroughputPods
Method: WaitForRunningPods
Params:
action: gather
timeout: 5m
desiredPodCount: {{$totalSchedulerThroughputPods}}
labelSelector: group = direct-scheduler-throughput
- name: Collecting scheduler throughput measurements
measurements:
- Identifier: DirectSchedulerThroughputPodStartupLatency
Method: PodStartupLatency
Params:
action: gather
- Identifier: DirectSchedulingThroughput
Method: SchedulingThroughput
Params:
action: gather
enableViolations: true
threshold: {{$SCHEDULER_THROUGHPUT_THRESHOLD}}
- name: Delete scheduler throughput pods
phases:
- namespaceRange:
min: 1
max: 1
replicasPerNamespace: 0
tuningSet: default
objectBundle:
- basename: direct-scheduler-throughput-pod
objectTemplatePath: pod-default.yaml
templateFillMap:
Group: direct-scheduler-throughput
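
If the test ever needs to target a non-default scheduler, the schedulerName parameter that the measurement now reads at start time (see the Execute change above) could be set in the start params. A hedged sketch; "my-scheduler" is a hypothetical scheduler name, and the parameter falls back to the measurement's built-in default when omitted:

```yaml
# Sketch only: starting PodStartupLatency against a hypothetical custom scheduler.
- Identifier: DirectSchedulerThroughputPodStartupLatency
  Method: PodStartupLatency
  Params:
    action: start
    labelSelector: group = direct-scheduler-throughput
    threshold: 5s
    schedulerName: my-scheduler  # assumption: name of the custom scheduler
```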
10 changes: 10 additions & 0 deletions clusterloader2/testing/scheduler-throughput/pod-default.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: Pod
metadata:
generateName: pod-churn-
labels:
group: {{.Group}}
spec:
containers:
- image: registry.k8s.io/pause:3.9
name: pause