Skip to content

Commit

Permalink
feat: change cron workflow container image when use mirrors (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
hysyeah authored Sep 26, 2024
1 parent 4a4883c commit bfcaabf
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 59 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ require (
github.com/gorilla/mux v1.8.0 // indirect
github.com/gosuri/uitable v0.0.4 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-safetemp v1.0.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ github.com/gosuri/uitable v0.0.4 h1:IG2xLKRvErL3uhY6e1BylFzG+aJiwQviDDTfOKeKTpY=
github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16vt0PJo=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down
1 change: 1 addition & 0 deletions pkg/apiserver/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (h *Handler) authenticate(req *restful.Request, resp *restful.Response, cha
"/app-service/v1/recommenddev/",
"/app-service/v1/provider-registry/validate",
"/app-service/v1/pods/kubelet/eviction",
"/app-service/v1/workflow/inject",
}

needAuth := true
Expand Down
4 changes: 4 additions & 0 deletions pkg/apiserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (b *handlerBuilder) Build() (*Handler, error) {
if err != nil {
return nil, err
}
err = wh.CreateOrUpdateCronWorkflowMutatingWebhook()
if err != nil {
return nil, err
}

return &Handler{
kubeHost: b.ksHost,
Expand Down
82 changes: 82 additions & 0 deletions pkg/apiserver/handler_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"bytetrade.io/web3os/app-service/pkg/utils"
"bytetrade.io/web3os/app-service/pkg/webhook"

wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/containerd/containerd/reference/docker"
"github.com/emicklei/go-restful/v3"
"github.com/google/uuid"
admissionv1 "k8s.io/api/admission/v1"
Expand All @@ -25,12 +27,18 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var (
errNilAdmissionRequest = fmt.Errorf("nil admission request")
mirrorsEndpoint []string
)

func init() {
mirrorsEndpoint = utils.GetMirrorsEndpoint()
}

const (
deployment = "Deployment"
statefulSet = "StatefulSet"
Expand Down Expand Up @@ -557,3 +565,77 @@ func (h *Handler) kubeletPodEviction(req *restful.Request, resp *restful.Respons
}
klog.Infof("Done kubeletPodEviction admission request with uuid=%s, namespace=%s", proxyUUID, requestForNamespace)
}

func (h *Handler) cronWorkflowInject(req *restful.Request, resp *restful.Response) {
klog.Infof("Received cron workflow mutating webhook request: Method=%v, URL=%v", req.Request.Method, req.Request.URL)
admissionRequestBody, ok := h.sidecarWebhook.GetAdmissionRequestBody(req, resp)
if !ok {
klog.Errorf("Failed to get admission request body")
return
}
var admissionReq, admissionResp admissionv1.AdmissionReview
proxyUUID := uuid.New()
if _, _, err := webhook.Deserializer.Decode(admissionRequestBody, nil, &admissionReq); err != nil {
klog.Errorf("Failed to decoding admission request body err=%v", err)
admissionResp.Response = h.sidecarWebhook.AdmissionError(err)
} else {
admissionResp.Response = h.cronWorkflowMutate(req.Request.Context(), admissionReq.Request, proxyUUID)
}
admissionResp.TypeMeta = admissionReq.TypeMeta
admissionResp.Kind = admissionReq.Kind

requestForNamespace := "unknown"
if admissionReq.Request != nil {
requestForNamespace = admissionReq.Request.Namespace
}

err := resp.WriteAsJson(&admissionResp)
if err != nil {
klog.Infof("cron workflow: write response failed namespace=%s, err=%v", requestForNamespace, err)
return
}
klog.Infof("Done cron workflow injection admission request with uuid=%s, namespace=%s", proxyUUID, requestForNamespace)
}

func (h *Handler) cronWorkflowMutate(ctx context.Context, req *admissionv1.AdmissionRequest, proxyUUID uuid.UUID) *admissionv1.AdmissionResponse {
if req == nil {
klog.Error("Failed to get admission request err=admission request is nil")
return h.sidecarWebhook.AdmissionError(errNilAdmissionRequest)
}
resp := &admissionv1.AdmissionResponse{
Allowed: true,
UID: req.UID,
}

var wf wfv1alpha1.CronWorkflow
err := json.Unmarshal(req.Object.Raw, &wf)
if err != nil {
klog.Errorf("Failed to unmarshal request object raw with uuid=%s namespace=%s", proxyUUID, req.Namespace)
return resp
}
for i, t := range wf.Spec.WorkflowSpec.Templates {
if t.Container == nil || t.Container.Image == "" {
continue
}
ref, err := docker.ParseDockerRef(t.Container.Image)
if err != nil {
continue
}
newImage := utils.ReplacedImageRef(mirrorsEndpoint, ref.String(), false)
wf.Spec.WorkflowSpec.Templates[i].Container.Image = newImage
}
original := req.Object.Raw
current, err := json.Marshal(wf)
if err != nil {
klog.Errorf("Failed to marshal cron workflow err=%v", err)
return resp
}
admissionResponse := admission.PatchResponseFromRaw(original, current)
patchBytes, err := json.Marshal(admissionResponse.Patches)
if err != nil {
klog.Errorf("Failed to marshal cron workflow patch bytes err=%v", err)
return resp
}
h.sidecarWebhook.PatchAdmissionResponse(resp, patchBytes)
return resp
}
7 changes: 7 additions & 0 deletions pkg/apiserver/webservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ func addServiceToContainer(c *restful.Container, handler *Handler) error {
Returns(http.StatusOK, "pod eviction validated success", nil)).
Consumes(restful.MIME_JSON)

ws.Route(ws.POST("/workflow/inject").
To(handler.cronWorkflowInject).
Doc("mutating webhook for cron workflow").
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
Returns(http.StatusOK, "cron workflow inject success", nil)).
Consumes(restful.MIME_JSON)

ws.Route(ws.POST("/gpulimit/inject").
To(handler.gpuLimitInject).
Doc("add resources limits for deployment/statefulset").
Expand Down
62 changes: 3 additions & 59 deletions pkg/images/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"errors"
"fmt"
"net"
"net/url"
"os"
"strings"
"time"

appv1alpha1 "bytetrade.io/web3os/app-service/api/app.bytetrade.io/v1alpha1"
"bytetrade.io/web3os/app-service/pkg/utils"

"github.com/containerd/containerd"
"github.com/containerd/containerd/cmd/ctr/commands"
Expand All @@ -20,20 +18,18 @@ import (
"github.com/containerd/containerd/platforms"
refdocker "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
srvconfig "github.com/containerd/containerd/services/server/config"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

const defaultRegistry = "https://registry-1.docker.io"
const maxRetries = 6

var sock = "/var/run/containerd/containerd.sock"
var mirrorsEndpoint []string

func init() {
mirrorsEndpoint = getMirrorsEndpoint()
mirrorsEndpoint = utils.GetMirrorsEndpoint()
klog.Infof("mirrorsEndPoint: %v", mirrorsEndpoint)
}

Expand Down Expand Up @@ -77,7 +73,7 @@ func (is *imageService) PullImage(ctx context.Context, ref appv1alpha1.Ref, opts
ref.Name = originNamed.String()
config := newFetchConfig()
// replaced image ref
replacedRef := replacedImageRef(originNamed.String())
replacedRef := utils.ReplacedImageRef(mirrorsEndpoint, originNamed.String(), true)

ongoing := newJobs(replacedRef, originNamed.String())

Expand Down Expand Up @@ -247,58 +243,6 @@ func newFetchConfig() *content.FetchConfig {
return config
}

func getMirrorsEndpoint() (ep []string) {
config := &srvconfig.Config{}
err := srvconfig.LoadConfig("/etc/containerd/config.toml", config)
if err != nil {
klog.Infof("load mirrors endpoint failed err=%v", err)
return
}
plugins := config.Plugins["io.containerd.grpc.v1.cri"]
r := plugins.GetPath([]string{"registry", "mirrors", "docker.io", "endpoint"})
if r == nil {
return
}
for _, e := range r.([]interface{}) {
ep = append(ep, e.(string))
}
return ep
}

func replacedImageRef(oldImageRef string) string {
if len(mirrorsEndpoint) == 0 {
return oldImageRef
}
for _, e := range mirrorsEndpoint {
if e != "" && e != defaultRegistry {
url, err := url.Parse(e)
if err != nil {
continue
}
if url.Scheme == "http" {
continue
}
host := url.Host
if !hasPort(url.Host) {
host = net.JoinHostPort(url.Host, "443")
}
conn, err := net.DialTimeout("tcp", host, 2*time.Second)
if err != nil {
continue
}
if conn != nil {
conn.Close()
}
parts := strings.Split(oldImageRef, "/")
parts[0] = url.Host
return strings.Join(parts, "/")
}
}
return oldImageRef
}

func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }

func fetchCtx(client *containerd.Client, remoteOpts ...containerd.RemoteOpt) (*containerd.RemoteContext, error) {
rCtx := &containerd.RemoteContext{
Resolver: docker.NewResolver(docker.ResolverOptions{}),
Expand Down
63 changes: 63 additions & 0 deletions pkg/utils/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package utils

import (
"context"
"net"
"net/url"
"strings"
"time"

"bytetrade.io/web3os/app-service/pkg/client/clientset"
"bytetrade.io/web3os/app-service/pkg/constants"

srvconfig "github.com/containerd/containerd/services/server/config"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -15,6 +20,7 @@ import (

// CalicoTunnelAddrAnnotation annotation key for calico tunnel address.
const CalicoTunnelAddrAnnotation = "projectcalico.org/IPv4IPIPTunnelAddr"
const DefaultRegistry = "https://registry-1.docker.io"

// GetAllNodesPodCIDRs returns all node pod's cidr.
func GetAllNodesPodCIDRs() (cidrs []string) {
Expand Down Expand Up @@ -106,3 +112,60 @@ func IsNodeReady(node *corev1.Node) bool {
}
return false
}

func GetMirrorsEndpoint() (ep []string) {
config := &srvconfig.Config{}
err := srvconfig.LoadConfig("/etc/containerd/config.toml", config)
if err != nil {
klog.Infof("load mirrors endpoint failed err=%v", err)
return
}
plugins := config.Plugins["io.containerd.grpc.v1.cri"]
r := plugins.GetPath([]string{"registry", "mirrors", "docker.io", "endpoint"})
if r == nil {
return
}
for _, e := range r.([]interface{}) {
ep = append(ep, e.(string))
}
return ep
}

func ReplacedImageRef(mirrorsEndpoint []string, oldImageRef string, checkConnection bool) string {
if len(mirrorsEndpoint) == 0 {
return oldImageRef
}
for _, ep := range mirrorsEndpoint {
if ep != "" && ep != DefaultRegistry {
url, err := url.Parse(ep)
if err != nil {
continue
}
if url.Scheme == "http" {
continue
}
if checkConnection {
host := url.Host
if !hasPort(url.Host) {
host = net.JoinHostPort(url.Host, "443")
}
conn, err := net.DialTimeout("tcp", host, 2*time.Second)
if err != nil {
continue
}
if conn != nil {
conn.Close()
}
}

parts := strings.Split(oldImageRef, "/")
klog.Infof("parts: %s", parts)
parts[0] = url.Host
klog.Infof("parts2: %s", parts)
return strings.Join(parts, "/")
}
}
return oldImageRef
}

func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
Loading

0 comments on commit bfcaabf

Please sign in to comment.