Commit
wip
nvthongswansea committed Sep 8, 2024
1 parent cc2ad62 commit 1e4718c
Showing 1 changed file with 155 additions and 85 deletions.
240 changes: 155 additions & 85 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -68,7 +68,9 @@ type actuatorNodeGroupConfigGetter interface {

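// NodeGroupWithNodes bundles a node group with the nodes that belong to it:
// All holds every node of the group, Empty the empty nodes selected for
// scale-down, and Drain the nodes that must be drained before deletion.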
type NodeGroupWithNodes struct {
Group cloudprovider.NodeGroup
Nodes []*apiv1.Node
All []*apiv1.Node
Empty []*apiv1.Node
Drain []*apiv1.Node
}

// NewActuator returns a new instance of Actuator.
@@ -146,6 +148,86 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownRe
return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil
}

// groupNodesByNodeGroup groups the given empty, drain, and all nodes by their node group.
// If sortByNodeName is true, the nodes in each group are sorted by name (case-insensitive).
func (a *Actuator) groupNodesByNodeGroup(empty, drain, all []*apiv1.Node, sortByNodeName bool) (map[string]NodeGroupWithNodes, errors.AutoscalerError) {
grouped := map[string]NodeGroupWithNodes{}
for _, node := range empty {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if _, ok := grouped[nodeGroup.Id()]; !ok {
grouped[nodeGroup.Id()] = NodeGroupWithNodes{
Group: nodeGroup,
All: []*apiv1.Node{},
Empty: []*apiv1.Node{},
Drain: []*apiv1.Node{},
}
}
currentNodeGroupWithNodes := grouped[nodeGroup.Id()]
currentNodeGroupWithNodes.Empty = append(currentNodeGroupWithNodes.Empty, node)
grouped[nodeGroup.Id()] = currentNodeGroupWithNodes
}

for _, node := range drain {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if _, ok := grouped[nodeGroup.Id()]; !ok {
grouped[nodeGroup.Id()] = NodeGroupWithNodes{
Group: nodeGroup,
All: []*apiv1.Node{},
Empty: []*apiv1.Node{},
Drain: []*apiv1.Node{},
}
}
currentNodeGroupWithNodes := grouped[nodeGroup.Id()]
currentNodeGroupWithNodes.Drain = append(currentNodeGroupWithNodes.Drain, node)
grouped[nodeGroup.Id()] = currentNodeGroupWithNodes
}

for _, node := range all {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if _, ok := grouped[nodeGroup.Id()]; !ok {
grouped[nodeGroup.Id()] = NodeGroupWithNodes{
Group: nodeGroup,
All: []*apiv1.Node{},
Empty: []*apiv1.Node{},
Drain: []*apiv1.Node{},
}
}
currentNodeGroupWithNodes := grouped[nodeGroup.Id()]
currentNodeGroupWithNodes.All = append(currentNodeGroupWithNodes.All, node)
grouped[nodeGroup.Id()] = currentNodeGroupWithNodes
}
// If sortByNodeName is true, sort the nodes in each group by name (case-insensitive).
if sortByNodeName {
for _, nodeGroupWithNodes := range grouped {
sort.Slice(nodeGroupWithNodes.Empty, func(i, j int) bool {
iNameLower := strings.ToLower(nodeGroupWithNodes.Empty[i].Name)
jNameLower := strings.ToLower(nodeGroupWithNodes.Empty[j].Name)
return iNameLower < jNameLower
})
sort.Slice(nodeGroupWithNodes.Drain, func(i, j int) bool {
iNameLower := strings.ToLower(nodeGroupWithNodes.Drain[i].Name)
jNameLower := strings.ToLower(nodeGroupWithNodes.Drain[j].Name)
return iNameLower < jNameLower
})
sort.Slice(nodeGroupWithNodes.All, func(i, j int) bool {
iNameLower := strings.ToLower(nodeGroupWithNodes.All[i].Name)
jNameLower := strings.ToLower(nodeGroupWithNodes.All[j].Name)
return iNameLower < jNameLower
})
}
}
return grouped, nil
}
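The three loops above differ only in which slice of NodeGroupWithNodes they extend. As a minimal refactoring sketch (not part of this commit; the helper name and the append callback are hypothetical), the grouping could be collapsed into a single pass:

// groupInto is a hypothetical helper: it resolves the node group for each node
// and lets appendFn decide which slice of the NodeGroupWithNodes to extend.
func (a *Actuator) groupInto(grouped map[string]NodeGroupWithNodes, nodes []*apiv1.Node, appendFn func(*NodeGroupWithNodes, *apiv1.Node)) errors.AutoscalerError {
	for _, node := range nodes {
		nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
		if err != nil {
			return errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
		}
		current, ok := grouped[nodeGroup.Id()]
		if !ok {
			current = NodeGroupWithNodes{Group: nodeGroup}
		}
		appendFn(&current, node)
		grouped[nodeGroup.Id()] = current
	}
	return nil
}

The empty pass would then read a.groupInto(grouped, empty, func(g *NodeGroupWithNodes, n *apiv1.Node) { g.Empty = append(g.Empty, n) }), with analogous calls for drain and all.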

// StartDeletionForGridscaleProvider triggers a new deletion process for gridscale provider.
// *NOTE* gridscale provider does not support deletion of specific nodes. Gridscale provider only supports
// scale up/down by changing the number of nodes in the cluster. For the case of scale down, the last n nodes are
@@ -154,25 +236,14 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownRe
// 2. Replace the to-be-deleted nodes with the last n nodes in the cluster.
// 3. Taint & drain the to-be-deleted nodes.
// 4. Delete the last n nodes in the cluster.

// NOTE: I drain the wrong nodes.
func (a *Actuator) StartDeletionForGridscaleProvider(empty, drain, all []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
a.nodeDeletionScheduler.ResetAndReportMetrics()
deletionStartTime := time.Now()
defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }()
emptyToDelete := []*apiv1.Node{}
drainToDelete := []*apiv1.Node{}
emptyToDeleteNodeGroupViews, drainToDeleteNodeGroupViews := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
for _, bucket := range emptyToDeleteNodeGroupViews {
emptyToDelete = append(emptyToDelete, bucket.Nodes...)
}
for _, bucket := range drainToDeleteNodeGroupViews {
drainToDelete = append(drainToDelete, bucket.Nodes...)
}
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
return status.ScaleDownNoNodeDeleted, nil, nil
}

// Count the number of nodes to be deleted.
nodesToDeleteCount := len(emptyToDelete) + len(drainToDelete)
nodesToDeleteCount := len(empty) + len(drain)

if nodesToDeleteCount >= len(all) {
// If the number of nodes to be deleted is greater than or equal to the number of nodes in the cluster,
@@ -182,88 +253,86 @@ func (a *Actuator) StartDeletionForGridscaleProvider(empty, drain, all []*apiv1.
"cannot delete nodes because the number of nodes to be deleted is greater than or equal to the number of nodes in the cluster. There has to be at least one node left in the cluster.",
)
}
klog.V(4).Info("[**]Original empty nodes to delete: ", len(emptyToDelete))
for _, node := range emptyToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
}
klog.V(4).Info("[**]Original drain nodes to delete: ", len(drainToDelete))
for _, node := range drainToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
}

// copy all the nodes (for safety) to a new slice and sort it
copiedAll := make([]*apiv1.Node, len(all))
copy(copiedAll, all)
sort.Slice(copiedAll, func(i, j int) bool {
return copiedAll[i].Name < copiedAll[j].Name
})
// Replace the to-be-deleted nodes with the last n nodes in the cluster.
var nodesToDelete []*apiv1.Node
if nodesToDeleteCount > 0 {
nodesToDelete = copiedAll[len(copiedAll)-nodesToDeleteCount:]
}
klog.V(4).Info("[**]New empty nodes to delete: ", len(emptyToDelete))
for _, node := range nodesToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
// Group the empty/drain nodes by node group.
nodesToDeleteByNodeGroup, err := a.groupNodesByNodeGroup(empty, drain, all, true)
if err != nil {
return status.ScaleDownError, nil, err
}

// Clean taint from OLD empty to-be-deleted nodes
for _, node := range emptyToDelete {
if _, err := taints.CleanDeletionCandidate(node, a.ctx.ClientSet); err != nil {
klog.Warningf("failed to clean taint DeletionCandidateTaint from node %s: %v", node.Name, err)
var scaledDownNodes []*status.ScaleDownNode
// Scale down nodes for each node group. One node group at a time.
for nodeGroupID, nodeGroupWithNodes := range nodesToDeleteByNodeGroup {
klog.V(4).Infof(" ------ Start scaling down nodes for node group %s", nodeGroupID)
emptyToDelete := []*apiv1.Node{}
drainToDelete := []*apiv1.Node{}
emptyToDeleteNodeGroupViews, drainToDeleteNodeGroupViews := a.budgetProcessor.CropNodes(
a.nodeDeletionTracker,
nodeGroupWithNodes.Empty,
nodeGroupWithNodes.Drain,
)
for _, bucket := range emptyToDeleteNodeGroupViews {
emptyToDelete = append(emptyToDelete, bucket.Nodes...)
}
if _, err := taints.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate); err != nil {
klog.Warningf("failed to clean taint ToBeDeletedTaint from node %s: %v", node.Name, err)
for _, bucket := range drainToDeleteNodeGroupViews {
drainToDelete = append(drainToDelete, bucket.Nodes...)
}
}
// Clean taint from OLD nonempty to-be-deleted nodes
for _, node := range drainToDelete {
if _, err := taints.CleanDeletionCandidate(node, a.ctx.ClientSet); err != nil {
klog.Warningf("failed to clean taint DeletionCandidateTaint from node %s: %v", node.Name, err)
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
return status.ScaleDownNoNodeDeleted, nil, nil
}

klog.V(4).Infof("[**]Original empty nodes in node group %s (count: %d):", nodeGroupID, len(emptyToDelete))
for _, node := range emptyToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
}
if _, err := taints.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate); err != nil {
klog.Warningf("failed to clean taint ToBeDeletedTaint from node %s: %v", node.Name, err)
klog.V(4).Infof("[**]Original drain nodes in node group %s (count: %d):", nodeGroupID, len(drainToDelete))
for _, node := range drainToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
}
}

// do some sanity check
if len(nodesToDelete) <= 0 {
return status.ScaleDownError, nil, errors.NewAutoscalerError(
errors.InternalError,
"cannot delete nodes because there is no node to be deleted.",
)
}
for i, node := range nodesToDelete {
if node == nil {
// copy all the nodes (for safety).
copiedAllByGroup := make([]*apiv1.Node, len(nodeGroupWithNodes.All))
copy(copiedAllByGroup, nodeGroupWithNodes.All)
// Replace the to-be-deleted nodes with the last n nodes in the group.
var nodesToDelete []*apiv1.Node
if nodesToDeleteCount > 0 {
nodesToDelete = copiedAllByGroup[len(copiedAllByGroup)-nodesToDeleteCount:]
}
klog.V(4).Info("[**]New nodes to delete: ", len(nodesToDelete))
for _, node := range nodesToDelete {
klog.V(4).Infof("\t-\t%s\n", node.Name)
}

// Clean taint from OLD to-be-deleted nodes
oldToBeDeletedNodes := append(emptyToDelete, drainToDelete...)
for _, node := range oldToBeDeletedNodes {
if _, err := taints.CleanDeletionCandidate(node, a.ctx.ClientSet); err != nil {
klog.Warningf("failed to clean taint DeletionCandidateTaint from node %s: %v", node.Name, err)
}
if _, err := taints.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate); err != nil {
klog.Warningf("failed to clean taint ToBeDeletedTaint from node %s: %v", node.Name, err)
}
}

// do some sanity check
if len(nodesToDelete) <= 0 {
return status.ScaleDownError, nil, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("cannot delete nodes because the node at index %d of to-be-deleted nodes is nil.", i),
"cannot delete nodes because there is no node to be deleted.",
)
}
}

nodesToDeleteByNodeGroup := make(map[string]NodeGroupWithNodes)
// Get all node groups which contain nodes to be deleted.
for _, node := range nodesToDelete {
nodeGroup, cpErr := a.ctx.CloudProvider.NodeGroupForNode(node)
if cpErr != nil {
return status.ScaleDownError, nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, cpErr)
}
if _, ok := nodesToDeleteByNodeGroup[nodeGroup.Id()]; !ok {
nodesToDeleteByNodeGroup[nodeGroup.Id()] = NodeGroupWithNodes{
Group: nodeGroup,
Nodes: []*apiv1.Node{},
for i, node := range nodesToDelete {
if node == nil {
return status.ScaleDownError, nil, errors.NewAutoscalerError(
errors.InternalError,
fmt.Sprintf("cannot delete nodes because the node at index %d of to-be-deleted nodes is nil.", i),
)
}
}
currentNodeGroupWithNodes := nodesToDeleteByNodeGroup[nodeGroup.Id()]
currentNodeGroupWithNodes.Nodes = append(currentNodeGroupWithNodes.Nodes, node)
nodesToDeleteByNodeGroup[nodeGroup.Id()] = currentNodeGroupWithNodes
}
var scaledDownNodes []*status.ScaleDownNode
for nodeGroupID, nodesToDeleteBucket := range nodesToDeleteByNodeGroup {

nodesToDeleteNodeGroupViews := []*budgets.NodeGroupView{
{
Nodes: nodesToDeleteBucket.Nodes,
Nodes: nodesToDelete,
},
}

@@ -277,13 +346,13 @@ func (a *Actuator) StartDeletionForGridscaleProvider(empty, drain, all []*apiv1.
// Clean taint from NEW to-be-deleted nodes after scale down. We don't care about the error here.
defer func() {
klog.V(4).Infof("Cleaning taint from to-be-deleted nodes for node group %s", nodeGroupID)
for _, node := range nodesToDeleteBucket.Nodes {
for _, node := range nodesToDelete {
taints.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}()
klog.V(4).Infof("Finish tainting to-be-deleted nodes for node group %s", nodeGroupID)

for _, drainNode := range nodesToDeleteBucket.Nodes {
for _, drainNode := range nodesToDelete {
if sdNode, err := a.scaleDownNodeToReport(drainNode, true); err == nil {
klog.V(0).Infof("Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: removing node %s, utilization: %v, pods to reschedule: %s", drainNode.Name, sdNode.UtilInfo, joinPodNames(sdNode.EvictedPods))
@@ -295,15 +364,15 @@ func (a *Actuator) StartDeletionForGridscaleProvider(empty, drain, all []*apiv1.

klog.V(4).Infof("Draining to-be-deleted nodes for node group %s", nodeGroupID)
// Drain to-be-deleted nodes synchronously.
finishFuncList, cpErr := a.drainNodesSyncForGridscaleProvider(nodeGroupID, nodesToDeleteBucket.Nodes)
finishFuncList, cpErr := a.drainNodesSyncForGridscaleProvider(nodeGroupID, nodesToDelete)
if cpErr != nil {
return status.ScaleDownError, nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to drain nodes: %v", cpErr)
}
klog.V(4).Infof("Finish draining to-be-deleted nodes for node group %s", nodeGroupID)

klog.V(4).Infof("Start scaling down nodes for node group %s", nodeGroupID)
// Delete the last n nodes in the cluster.
dErr := nodesToDeleteBucket.Group.DeleteNodes(nodesToDeleteBucket.Nodes)
dErr := nodeGroupWithNodes.Group.DeleteNodes(nodesToDelete)
if dErr != nil {
for _, finishFunc := range finishFuncList {
finishFunc(status.NodeDeleteErrorFailedToDelete, dErr)
Expand All @@ -313,6 +382,7 @@ func (a *Actuator) StartDeletionForGridscaleProvider(empty, drain, all []*apiv1.
for _, finishFunc := range finishFuncList {
finishFunc(status.NodeDeleteOk, nil)
}
klog.V(4).Infof(" ------ Finish scaling down nodes for node group %s", nodeGroupID)
}
klog.V(4).Infof("Finish scaling down nodes")
return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil
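For context, a minimal caller sketch (hypothetical wiring; only the two actuator method signatures are taken from this file) showing how the gridscale-specific path sits next to the generic one:

// startScaleDown is a hypothetical wrapper: gridscale can only drop the last
// n nodes of a group, so its provider-specific path also needs the full node
// list to pick replacements; the generic path only needs empty and drain.
func startScaleDown(a *Actuator, isGridscale bool, empty, drain, all []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) {
	if isGridscale {
		return a.StartDeletionForGridscaleProvider(empty, drain, all)
	}
	return a.StartDeletion(empty, drain)
}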