Skip to content

Commit

Permalink
fix: host installation tasks (#82)
Browse files Browse the repository at this point in the history
Signed-off-by: liangjunhao <3220663807@qq.com>
  • Loading branch information
paterleng authored Jul 26, 2024
1 parent 9026d1b commit 4d0e890
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 52 deletions.
194 changes: 142 additions & 52 deletions internal/task/init_rainbond_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@ import (
"context"
"encoding/json"
"fmt"
"goodrain.com/cloud-adaptor/internal/adaptor/rke"
"k8s.io/apimachinery/pkg/fields"
ktype "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"runtime/debug"
"strings"
"time"

rainbondv1alpha1 "github.com/goodrain/rainbond-operator/api/v1alpha1"
"github.com/nsqio/go-nsq"
"github.com/rancher/rke/k8s"
"github.com/sirupsen/logrus"
apiv1 "goodrain.com/cloud-adaptor/api/cloud-adaptor/v1"
ccv1 "goodrain.com/cloud-adaptor/api/cloud-adaptor/v1"
"goodrain.com/cloud-adaptor/internal/adaptor/factory"
"goodrain.com/cloud-adaptor/internal/types"
"goodrain.com/cloud-adaptor/internal/usecase"
"goodrain.com/cloud-adaptor/pkg/util/constants"
"goodrain.com/cloud-adaptor/pkg/util/versionutil"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -52,87 +57,172 @@ func (c *InitRainbondCluster) rollback(step, message, status string) {
c.result <- apiv1.Message{StepType: step, Message: message, Status: status}
}

func (c *InitRainbondCluster) Rollback(step, message, status string) {
if status == "failure" {
logrus.Errorf("%s failure, Message: %s", step, message)
// CheckKubernetesStatus Check kubernetes status
func (c *InitRainbondCluster) CheckKubernetesStatus(clientset *kubernetes.Clientset) (bool, error) {
nodeList, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, err
}
c.result <- apiv1.Message{StepType: step, Message: message, Status: status}
if len(nodeList.Items) == 0 {
return false, nil
}
return true, err
}

// Run run take time 214.10s
func (c *InitRainbondCluster) Run(ctx context.Context) {
defer c.rollback("Close", "", "")
c.rollback("Init", "", "start")
// create adaptor
adaptor, err := factory.GetCloudFactory().GetRainbondClusterAdaptor(c.config.Provider, c.config.AccessKey, c.config.SecretKey)
if err != nil {
c.rollback("Init", fmt.Sprintf("create cloud adaptor failure %s", err.Error()), "failure")
return
}
func (c *InitRainbondCluster) CheckOperatorStatus(ctx context.Context, clientset *kubernetes.Clientset) error {
//通过一个定时器来控制检测时间
ticker := time.NewTicker(time.Second * 5)
timer := time.NewTimer(time.Minute * 60)
defer ticker.Stop()
defer timer.Stop()
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancel")
case <-ticker.C:
case <-timer.C:
return nil
}

c.rollback("Init", "cloud adaptor create success", "success")
c.rollback("CheckCluster", "", "start")
// get kubernetes cluster info
cluster, err := adaptor.DescribeCluster(c.config.EnterpriseID, c.config.ClusterID)
if err != nil {
cluster, err = adaptor.DescribeCluster(c.config.EnterpriseID, c.config.ClusterID)
roPods, err := clientset.CoreV1().Pods(constants.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: fields.SelectorFromSet(map[string]string{
"release": "rainbond",
}).String(),
})
if err != nil {
c.rollback("CheckCluster", err.Error(), "failure")
return
return fmt.Errorf("get rainbond-operator pod failed:%s", err)
}

if len(roPods.Items) == 0 {
continue
}
if roPods.Items[0].Status.Phase == "Running" {
break
}
}
// check cluster status
if cluster.State != "running" {
c.rollback("CheckCluster", fmt.Sprintf("cluster status is %s,not support init rainbond", cluster.State), "failure")
return
c.rollback("InitRainbondOperator", "", "success")
return nil
}

func (c *InitRainbondCluster) CheckClusterStatus(ctx context.Context) error {
adapter, _ := rke.Create()
kubeConfig, err := adapter.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID)
_, runtimeClient, err := kubeConfig.GetKubeClient()
if err != nil {
logrus.Infof("get kubeclient failure %s", err.Error())
return err
}
// check cluster version
if !versionutil.CheckVersion(cluster.KubernetesVersion) {
c.rollback("CheckCluster", fmt.Sprintf("current cluster version is %s, init rainbond support kubernetes version is 1.19.x-1.29.x", cluster.KubernetesVersion), "failure")
return

var cluster rainbondv1alpha1.RainbondCluster
ticker := time.NewTicker(time.Second * 10)
timer := time.NewTimer(time.Minute * 60)
defer timer.Stop()
defer ticker.Stop()
var initRainbondCluster bool
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancel")
case <-ticker.C:
case <-timer.C:
return fmt.Errorf("check cluster status failure")
}
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second*5)
defer cancel2()
err = runtimeClient.Get(ctx2, ktype.NamespacedName{Name: constants.RainbondCluster, Namespace: constants.Namespace}, &cluster)
if err != nil {
logrus.Errorf("get cluster failure %s", err.Error())
return err
}
//获取到cluster信息后,进行数据校验
for _, condition := range cluster.Status.Conditions {
if condition.Type == rainbondv1alpha1.RainbondClusterConditionTypeStorage {
continue
}

status, msg := c.HandleClusterStatus(cluster, condition.Type)
if strings.Contains(msg.Message, "not ready") {
break
}
if !status && msg.Message != "" {
c.rollback(msg.StepType, msg.Message, "failure")
return fmt.Errorf("get clusterType %s failure%s:", msg.StepType, msg.Message)
}
//更新状态为成功
c.rollback(msg.StepType, "", "success")
logrus.Infof("get clusterType %s success", msg.StepType)
if condition.Type == "ImageRepository" {
initRainbondCluster = true
}
}
if initRainbondCluster {
break
}
}
// check cluster connection status
logrus.Infof("init kubernetes url %s", cluster.MasterURL)
if cluster.MasterURL.APIServerEndpoint == "" {
c.rollback("CheckCluster", "cluster api not open eip,not support init rainbond", "failure")
c.rollback("InitRainbondCluster", "", "success")

return nil
}

func (c *InitRainbondCluster) HandleClusterStatus(cluster rainbondv1alpha1.RainbondCluster, clusterType rainbondv1alpha1.RainbondClusterConditionType) (status bool, msg ccv1.Message) {
//如果成功就更新状态
if idx, condition := cluster.Status.GetCondition((clusterType)); idx != -1 && condition.Status == v1.ConditionTrue {
status = true
msg.StepType = string(condition.Type)
} else if condition.Status == v1.ConditionFalse {
// 拿到这里面的一些报错信息去展示,并且退出本次安装
msg.Status = string(condition.Status)
msg.Message = condition.Message
msg.StepType = string(condition.Type)
status = false
return
}
return
}

// Run run take time 214.10s
func (c *InitRainbondCluster) Run(ctx context.Context) {
defer c.rollback("Close", "", "")
c.rollback("Init", "", "start")
adaptor, err := factory.GetCloudFactory().GetRainbondClusterAdaptor(c.config.Provider, c.config.AccessKey, c.config.SecretKey)
kubeConfig, err := adaptor.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID)
if err != nil {
kubeConfig, err = adaptor.GetKubeConfig(c.config.EnterpriseID, c.config.ClusterID)
if err != nil {
logrus.Errorf("get kubeconfig failure:%s", err.Error())
c.rollback("CheckCluster", fmt.Sprintf("get kube config failure %s", err.Error()), "failure")
return
}
}

// check cluster not init rainbond
coreClient, _, err := kubeConfig.GetKubeClient()
if err != nil {
c.rollback("CheckCluster", fmt.Sprintf("get kube config failure %s", err.Error()), "failure")
return
}

// get cluster node lists
getctx, cancel := context.WithTimeout(ctx, time.Second*10)
nodes, err := coreClient.CoreV1().Nodes().List(getctx, metav1.ListOptions{})
// 检测k8s状态
status, err := c.CheckKubernetesStatus(coreClient)
if !status {
c.rollback("CheckKubernetes", fmt.Sprintf("Kubernetes connection failed %s", err.Error()), "failure")
logrus.Errorf("Kubernetes connection failed")
return
}
c.rollback("CheckKubernetes", c.config.ClusterID, "success")

//安装后检测operator的状态
err = c.CheckOperatorStatus(ctx, coreClient)
if err != nil {
nodes, err = coreClient.CoreV1().Nodes().List(getctx, metav1.ListOptions{})
cancel()
if err != nil {
logrus.Errorf("get kubernetes cluster node failure %s", err.Error())
c.rollback("CheckCluster", "cluster node list can not found, please check cluster public access and account authorization", "failure")
return
}
} else {
cancel()
c.rollback("CheckOperator", fmt.Sprintf("operator check failed %s", err.Error()), "failure")
logrus.Errorf("operator detection failed %s", err.Error())
return
}
if len(nodes.Items) == 0 {
c.rollback("CheckCluster", "node num is 0, can not init rainbond", "failure")
//检测cluster的状态
err = c.CheckClusterStatus(ctx)
if err != nil {
logrus.Errorf("detection failed cluster: %s", err)
return
}
c.rollback("CheckCluster", c.config.ClusterID, "success")

}

// GetRainbondGatewayNodeAndChaosNodes get gateway nodes
Expand Down
7 changes: 7 additions & 0 deletions internal/usecase/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ func (c *ClusterUsecase) CreateTaskEvent(em *v1.EventMessage) (*model.TaskEvent,
}
logrus.Infof("set init task %s status is inited", em.TaskID)
}
if em.Message.StepType == "InitRainbondCluster" && em.Message.Status == "success" {
if err := initRainbondTaskRepo.UpdateStatus(em.EnterpriseID, em.TaskID, "inited"); err != nil && err != gorm.ErrRecordNotFound {
ctx.Rollback()
return nil, err
}
logrus.Infof("set init task %s status is inited", em.TaskID)
}
if em.Message.StepType == "UpdateKubernetes" && em.Message.Status == "success" {
if err := c.UpdateKubernetesTaskRepo.Transaction(ctx).UpdateStatus(em.EnterpriseID, em.TaskID, "complete"); err != nil && err != gorm.ErrRecordNotFound {
ctx.Rollback()
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ const (
CloudUpdate = "cloud-update"
// Namespace is the namespace for rainbond-operator and rainbond components
Namespace = "rbd-system"
// RainbondCluster -
RainbondCluster = "rainbondcluster"
)

0 comments on commit 4d0e890

Please sign in to comment.