Skip to content

Commit

Permalink
add preference binpack replicas plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
mrlihanbo committed Dec 12, 2023
1 parent e57d64a commit 863b6ef
Show file tree
Hide file tree
Showing 15 changed files with 1,447 additions and 62 deletions.
20 changes: 13 additions & 7 deletions pkg/controllers/automigration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi
// auto-migration controller sets AutoMigrationAnnotation to
// feedback auto-migration information back to the scheduler

var estimatedCapacity map[string]int64
var estimatedCapacity, scheduledReplicas map[string]int64

Check warning on line 291 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L291

Added line #L291 was not covered by tests
var result *worker.Result
needsUpdate := false
if unschedulableThreshold == nil {
Expand All @@ -302,8 +302,8 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi
} else {
// Keep the annotation up-to-date if auto migration is enabled.
keyedLogger.V(3).Info("Auto migration is enabled")
estimatedCapacity, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold)
autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity}
estimatedCapacity, scheduledReplicas, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold)
autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity, ScheduledReplicas: scheduledReplicas}

Check warning on line 306 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L305-L306

Added lines #L305 - L306 were not covered by tests

// Compare with the existing autoMigration annotation
existingAutoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: nil}
Expand Down Expand Up @@ -366,19 +366,22 @@ func (c *Controller) estimateCapacity(
typeConfig *fedcorev1a1.FederatedTypeConfig,
clusterObjs []FederatedObject,
unschedulableThreshold time.Duration,
) (map[string]int64, *worker.Result) {
) (map[string]int64, map[string]int64, *worker.Result) {

Check warning on line 369 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L369

Added line #L369 was not covered by tests
needsBackoff := false
var retryAfter *time.Duration

estimatedCapacity := make(map[string]int64, len(clusterObjs))
scheduledReplicas := make(map[string]int64, len(clusterObjs))

Check warning on line 374 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L374

Added line #L374 was not covered by tests

for _, clusterObj := range clusterObjs {
ctx, logger := logging.InjectLoggerValues(ctx, "cluster", clusterObj.ClusterName, "ftc", typeConfig.Name)

// This is an optimization to skip pod listing when there are no unschedulable pods.
totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(typeConfig, clusterObj.Object)
if err == nil && totalReplicas == readyReplicas {
logger.V(3).Info("No unschedulable pods found, skip estimating capacity")
logger.V(3).Info("No unschedulable pods found, skip estimating capacity",
"readyReplicas", readyReplicas)
scheduledReplicas[clusterObj.ClusterName] = totalReplicas

Check warning on line 384 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L382-L384

Added lines #L382 - L384 were not covered by tests
continue
}

Expand All @@ -398,13 +401,16 @@ func (c *Controller) estimateCapacity(
continue
}

unschedulable, nextCrossIn := countUnschedulablePods(pods, time.Now(), unschedulableThreshold)
scheduled, unschedulable, nextCrossIn := countScheduledAndUnschedulablePods(pods, time.Now(), unschedulableThreshold)

Check warning on line 404 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L404

Added line #L404 was not covered by tests
logger.V(2).Info("Analyzed pods",
"total", len(pods),
"desired", desiredReplicas,
"scheduled", scheduled,

Check warning on line 408 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L408

Added line #L408 was not covered by tests
"unschedulable", unschedulable,
)

scheduledReplicas[clusterObj.ClusterName] = int64(scheduled)

Check warning on line 413 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L412-L413

Added lines #L412 - L413 were not covered by tests
if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) {
retryAfter = nextCrossIn
}
Expand Down Expand Up @@ -441,7 +447,7 @@ func (c *Controller) estimateCapacity(
Backoff: needsBackoff,
}
}
return estimatedCapacity, result
return estimatedCapacity, scheduledReplicas, result

Check warning on line 450 in pkg/controllers/automigration/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/automigration/controller.go#L450

Added line #L450 was not covered by tests
}

func (c *Controller) getTotalAndReadyReplicas(
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/automigration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ import (
// unschedulable for more than unschedulableThreshold,
// and a time.Duration representing the time from now
// when the new unschedulable pod will cross the threshold, if any.
func countUnschedulablePods(
func countScheduledAndUnschedulablePods(
podList []*corev1.Pod,
currentTime time.Time,
unschedulableThreshold time.Duration,
) (unschedulableCount int, nextCrossIn *time.Duration) {
) (scheduledCount, unschedulableCount int, nextCrossIn *time.Duration) {
for _, pod := range podList {
if pod.GetDeletionTimestamp() != nil {
continue
}

scheduledCondition, isUnschedulable := getPodScheduledCondition(pod)
if !isUnschedulable {
if scheduledCondition != nil && scheduledCondition.Status == corev1.ConditionTrue {
scheduledCount++
}
continue
}

Expand All @@ -51,8 +54,7 @@ func countUnschedulablePods(
nextCrossIn = &crossingThresholdIn
}
}

return unschedulableCount, nextCrossIn
return scheduledCount, unschedulableCount, nextCrossIn
}

func getPodScheduledCondition(pod *corev1.Pod) (scheduledCondition *corev1.PodCondition, isUnschedulable bool) {
Expand Down
16 changes: 9 additions & 7 deletions pkg/controllers/automigration/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ func doCheck(
threshold time.Duration,
pods []*corev1.Pod,
expectedUnschedulable int,
expectedScheduled int,
expectedNextCrossIn *time.Duration,
) {
t.Helper()
assert := assert.New(t)

unschedulableCount, nextCrossIn := countUnschedulablePods(pods, now, threshold)
scheduledCount, unschedulableCount, nextCrossIn := countScheduledAndUnschedulablePods(pods, now, threshold)
assert.Equal(scheduledCount, expectedScheduled)
assert.Equal(expectedUnschedulable, unschedulableCount)
assert.Equal(expectedNextCrossIn, nextCrossIn)
}
Expand All @@ -56,26 +58,26 @@ func TestCountUnschedulablePods(t *testing.T) {
okPod,
okPod,
okPod,
}, 0, nil)
}, 0, 3, nil)

doCheck(t, now, time.Minute, []*corev1.Pod{
okPod,
okPod,
unschedulablePod,
}, 1, nil)
}, 1, 2, nil)

doCheck(t, now, time.Minute, []*corev1.Pod{
okPod,
okPod,
crossingIn10s,
}, 0, pointer.Duration(10*time.Second))
}, 0, 2, pointer.Duration(10*time.Second))

doCheck(t, now, time.Minute, []*corev1.Pod{
okPod,
okPod,
unschedulablePod,
crossingIn20s,
}, 1, pointer.Duration(20*time.Second))
}, 1, 2, pointer.Duration(20*time.Second))

doCheck(t, now, time.Minute, []*corev1.Pod{
okPod,
Expand All @@ -84,7 +86,7 @@ func TestCountUnschedulablePods(t *testing.T) {
unschedulablePod,
crossingIn10s,
crossingIn20s,
}, 2, pointer.Duration(10*time.Second))
}, 2, 2, pointer.Duration(10*time.Second))

doCheck(t, now, time.Minute, []*corev1.Pod{
okPod,
Expand All @@ -93,7 +95,7 @@ func TestCountUnschedulablePods(t *testing.T) {
unschedulableTerminatingPod,
crossingIn10s,
crossingIn20s,
}, 1, pointer.Duration(10*time.Second))
}, 1, 2, pointer.Duration(10*time.Second))
}

func newPod(terminating bool, schedulable bool, lastTransitionTimestamp time.Time) *corev1.Pod {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/scheduler/framework/plugins/names/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ const (
MaxCluster = "MaxCluster"
ClusterCapacityWeight = "ClusterCapacityWeight"
ClusterEvicted = "ClusterEvicted"
PreferenceBinPack = "PreferenceBinPack"
)
Loading

0 comments on commit 863b6ef

Please sign in to comment.