Skip to content

Commit

Permalink
DROP: Implement Prometheus metrics for LocalQueue (kubernetes-sigs#3673)
Browse files Browse the repository at this point in the history
* add LocalQueue metrics (no feature gate)

Signed-off-by: Kevin <kpostlet@redhat.com>

* add all clear and report calls

Signed-off-by: Kevin <kpostlet@redhat.com>

* add feature gate

Signed-off-by: Kevin <kpostlet@redhat.com>

* cleanup todos and add more feature gates

Signed-off-by: Kevin <kpostlet@redhat.com>

* use feature gate instead of config

Signed-off-by: Kevin <kpostlet@redhat.com>

* cleanup

Signed-off-by: Kevin <kpostlet@redhat.com>

* add metrics checks to a test

Signed-off-by: Kevin <kpostlet@redhat.com>

* add lq metrics to cq integration test

Signed-off-by: Kevin <kpostlet@redhat.com>

* lint fix

Signed-off-by: Kevin <kpostlet@redhat.com>

* use name instead of local_queue

Signed-off-by: Kevin <kpostlet@redhat.com>

* update status metric description

Signed-off-by: Kevin <kpostlet@redhat.com>

* fix key name

Signed-off-by: Kevin <kpostlet@redhat.com>

* move registerLQ into metrics package

Signed-off-by: Kevin <kpostlet@redhat.com>

---------

Signed-off-by: Kevin <kpostlet@redhat.com>
  • Loading branch information
KPostOffice committed Dec 11, 2024
1 parent db8b9da commit 63fa660
Show file tree
Hide file tree
Showing 15 changed files with 529 additions and 4 deletions.
1 change: 1 addition & 0 deletions config/rhoai/manager_config_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ spec:
args:
- "--config=/controller_manager_config.yaml"
- "--zap-log-level=2"
- "--feature-gates=LocalQueueMetrics=true"
volumeMounts:
- name: manager-config
mountPath: /controller_manager_config.yaml
Expand Down
5 changes: 5 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
if !ok {
return
}
if features.Enabled(features.LocalQueueMetrics) {
for _, q := range c.clusterQueues[cq.Name].localQueues {
metrics.ClearLocalQueueCacheMetrics(metrics.LQRefFromLocalQueueKey(q.key))
}
}
c.deleteClusterQueueFromCohort(cqImpl)
delete(c.clusterQueues, cq.Name)
metrics.ClearCacheMetrics(cq.Name)
Expand Down
15 changes: 15 additions & 0 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ func (c *clusterQueue) reportActiveWorkloads() {
metrics.ReservingActiveWorkloads.WithLabelValues(c.Name).Set(float64(len(c.Workloads)))
}

// reportActiveWorkloads sets the LocalQueue admitted and reserving
// active-workload metrics from the counters stored on q.
//
// The label values come from q.key split on "/": the second segment is passed
// as the first label value and the first segment as the second (presumably the
// name and namespace of a "<namespace>/<name>" key — confirm against queueKey).
// A key that contains no "/" makes the index expression panic.
func (q *queue) reportActiveWorkloads() {
	parts := strings.Split(q.key, "/")
	name, namespace := parts[1], parts[0]

	admitted := float64(q.admittedWorkloads)
	reserving := float64(q.reservingWorkloads)

	metrics.LocalQueueAdmittedActiveWorkloads.WithLabelValues(name, namespace).Set(admitted)
	metrics.LocalQueueReservingActiveWorkloads.WithLabelValues(name, namespace).Set(reserving)
}

// updateWorkloadUsage updates the usage of the ClusterQueue for the workload
// and the number of admitted workloads for local queues.
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
Expand All @@ -445,6 +451,9 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
updateFlavorUsage(frUsage, lq.admittedUsage, m)
lq.admittedWorkloads += int(m)
}
if features.Enabled(features.LocalQueueMetrics) {
lq.reportActiveWorkloads()
}
}
}

Expand Down Expand Up @@ -496,11 +505,17 @@ func (c *clusterQueue) addLocalQueue(q *kueue.LocalQueue) error {
}
}
c.localQueues[qKey] = qImpl
if features.Enabled(features.LocalQueueMetrics) {
qImpl.reportActiveWorkloads()
}
return nil
}

// deleteLocalQueue removes the entry for q from c.localQueues. When the
// LocalQueueMetrics feature gate is enabled, the cache metrics recorded for
// that local queue are cleared before the entry is removed.
func (c *clusterQueue) deleteLocalQueue(q *kueue.LocalQueue) {
	key := queueKey(q)
	if features.Enabled(features.LocalQueueMetrics) {
		ref := metrics.LQRefFromLocalQueueKey(key)
		metrics.ClearLocalQueueCacheMetrics(ref)
	}
	delete(c.localQueues, key)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
qManager,
cc,
WithQueueVisibilityUpdateInterval(queueVisibilityUpdateInterval(cfg)),
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
WithReportResourceMetrics(cfg.Metrics.EnableClusterQueueResources),
WithQueueVisibilityClusterQueuesMaxCount(queueVisibilityClusterQueuesMaxCount(cfg)),
WithFairSharing(fairSharingEnabled),
WithWatchers(rfRec, acRec),
)
Expand Down
52 changes: 51 additions & 1 deletion pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/resource"
)

const (
Expand All @@ -63,7 +66,11 @@ type LocalQueueReconciler struct {
wlUpdateCh chan event.GenericEvent
}

func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler {
func NewLocalQueueReconciler(
client client.Client,
queues *queue.Manager,
cache *cache.Cache,
) *LocalQueueReconciler {
return &LocalQueueReconciler{
log: ctrl.Log.WithName("localqueue-reconciler"),
queues: queues,
Expand Down Expand Up @@ -142,6 +149,10 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
log.Error(err, "Failed to add localQueue to the cache")
}

if features.Enabled(features.LocalQueueMetrics) {
recordLocalQueueUsageMetrics(q)
}

return true
}

Expand All @@ -151,6 +162,11 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
// No need to interact with the queue manager for other objects.
return true
}

if features.Enabled(features.LocalQueueMetrics) {
metrics.ClearLocalQueueResourceMetrics(localQueueReferenceFromLocalQueue(q))
}

r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q))
r.queues.DeleteLocalQueue(q)
r.cache.DeleteLocalQueue(q)
Expand Down Expand Up @@ -191,10 +207,38 @@ func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
}

r.queues.DeleteLocalQueue(oldLq)
if features.Enabled(features.LocalQueueMetrics) {
updateLocalQueueResourceMetrics(newLq)
}

return true
}

// localQueueReferenceFromLocalQueue builds a metrics.LocalQueueReference that
// carries the Name and Namespace of lq.
func localQueueReferenceFromLocalQueue(lq *kueue.LocalQueue) metrics.LocalQueueReference {
	var ref metrics.LocalQueueReference
	ref.Name = lq.Name
	ref.Namespace = lq.Namespace
	return ref
}

// recordLocalQueueUsageMetrics reports, for every flavor/resource pair listed
// in the LocalQueue status, the Total quantity as a float: entries under
// Status.FlavorUsage go to the resource-usage metric and entries under
// Status.FlavorsReservation go to the resource-reservation metric.
func recordLocalQueueUsageMetrics(queue *kueue.LocalQueue) {
	// The reference depends only on the queue's name and namespace, so it is
	// built once and shared by every report call below.
	ref := localQueueReferenceFromLocalQueue(queue)

	for _, usage := range queue.Status.FlavorUsage {
		flavorName := string(usage.Name)
		for _, res := range usage.Resources {
			metrics.ReportLocalQueueResourceUsage(ref, flavorName, string(res.Name), resource.QuantityToFloat(&res.Total))
		}
	}

	for _, reservation := range queue.Status.FlavorsReservation {
		flavorName := string(reservation.Name)
		for _, res := range reservation.Resources {
			metrics.ReportLocalQueueResourceReservations(ref, flavorName, string(res.Name), resource.QuantityToFloat(&res.Total))
		}
	}
}

// updateLocalQueueResourceMetrics clears the resource metrics previously
// recorded for the LocalQueue and then records them again from its current
// status.
func updateLocalQueueResourceMetrics(queue *kueue.LocalQueue) {
	ref := localQueueReferenceFromLocalQueue(queue)
	metrics.ClearLocalQueueResourceMetrics(ref)
	recordLocalQueueUsageMetrics(queue)
}

func (r *LocalQueueReconciler) Generic(e event.GenericEvent) bool {
r.log.V(3).Info("Got Workload event", "workload", klog.KObj(e.Object))
return true
Expand Down Expand Up @@ -337,6 +381,12 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
Message: msg,
ObservedGeneration: queue.Generation,
})
if features.Enabled(features.LocalQueueMetrics) {
metrics.ReportLocalQueueStatus(metrics.LocalQueueReference{
Name: queue.Name,
Namespace: queue.Namespace,
}, conditionStatus)
}
}
if !equality.Semantic.DeepEqual(oldStatus, queue.Status) {
return r.client.Status().Update(ctx, queue)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
Expand Down Expand Up @@ -258,6 +259,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, quotaReservedWaitTime.Seconds())
metrics.AdmittedWorkload(kueue.ClusterQueueReference(cqName), queuedWaitTime)
metrics.AdmissionChecksWaitTime(kueue.ClusterQueueReference(cqName), quotaReservedWaitTime)
if features.Enabled(features.LocalQueueMetrics) {
metrics.LocalQueueAdmittedWorkload(metrics.LQRefFromWorkload(&wl), queuedWaitTime)
metrics.LocalQueueAdmissionChecksWaitTime(metrics.LQRefFromWorkload(&wl), quotaReservedWaitTime)
}
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -384,6 +389,9 @@ func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Contex
cqName := string(lq.Spec.ClusterQueue)
if slices.Contains(r.queues.GetClusterQueueNames(), cqName) {
metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped)
if features.Enabled(features.LocalQueueMetrics) {
metrics.ReportLocalQueueEvictedWorkloads(metrics.LQRefFromWorkload(wl), kueue.WorkloadEvictedByLocalQueueStopped)
}
}
}
return true, client.IgnoreNotFound(err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ const (
// Enable the Flavors status field in the LocalQueue, allowing users to view
// all currently available ResourceFlavors in the LocalQueue.
ExposeFlavorsInLocalQueue featuregate.Feature = "ExposeFlavorsInLocalQueue"

// owner: @kpostoffice
// alpha: v0.10
//
// Enable gathering of LocalQueue metrics.
LocalQueueMetrics featuregate.Feature = "LocalQueueMetrics"
)

func init() {
Expand All @@ -129,6 +135,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MultiKueueBatchJobWithManagedBy: {Default: false, PreRelease: featuregate.Alpha},
MultiplePreemptions: {Default: false, PreRelease: featuregate.Alpha},
ExposeFlavorsInLocalQueue: {Default: true, PreRelease: featuregate.Beta},
LocalQueueMetrics: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
Expand Down
Loading

0 comments on commit 63fa660

Please sign in to comment.