diff --git a/.github/workflows/build-push-huggingface-model-loader.yml b/.github/workflows/build-push-huggingface-model-loader.yml new file mode 100644 index 00000000..2dceb151 --- /dev/null +++ b/.github/workflows/build-push-huggingface-model-loader.yml @@ -0,0 +1,61 @@ +name: Build and Push huggingface-model-loader Docker image +on: + push: + branches: + - main + tags: + - "v*.*.*" + paths-ignore: + - '**/README.md' + pull_request: + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: substratusai/huggingface-model-loader + +jobs: + huggingface-model-loader: + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Log in to the Container registry + if: github.event_name == 'push' + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Login to docker.io + if: github.event_name == 'push' + uses: docker/login-action@v3 + with: + username: ${{ vars.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: | + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + ${{ env.IMAGE_NAME }} + - name: Build and push Docker image + uses: docker/build-push-action@v6 + with: + context: ./components/huggingface-model-loader + platforms: linux/amd64,linux/arm64 + push: ${{ github.event_name == 'push' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push-kubeai.yml similarity index 97% rename from .github/workflows/build-push.yml rename to .github/workflows/build-push-kubeai.yml index 83b0fc4e..788d66a3 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push-kubeai.yml @@ -1,4 +1,4 @@ -name: Build and Push Docker image +name: Build and Push kubeai Docker image on: push: branches: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e04bcf24..fca21dff 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,14 +20,14 @@ jobs: - name: Run integration tests run: make test-integration - e2e: + e2e-general: runs-on: ubuntu-latest # NOTE: Uncomment if we start getting limited on number of concurrent jobs # (due to rapid pushes, etc). #needs: unit-and-integration # No use in running e2e tests if integration tests fail. strategy: matrix: - testcase: ["quickstart", "openai-python-client", "faster-whisper", "autoscaler-restart"] + testcase: ["quickstart", "openai-python-client", "autoscaler-restart", "cache-shared-filesystem"] steps: - name: Checkout code uses: actions/checkout@v2 @@ -48,4 +48,36 @@ jobs: run: kind create cluster - name: Run the e2e testcase - run: make test-e2e-${{ matrix.testcase }} \ No newline at end of file + run: make test-e2e-${{ matrix.testcase }} + + e2e-engines: + runs-on: ubuntu-latest + # NOTE: Uncomment if we start getting limited on number of concurrent jobs + # (due to rapid pushes, etc). 
+ #needs: unit-and-integration # No use in running e2e tests if integration tests fail. + strategy: + matrix: + engine: ["FasterWhisper"] # "VLLM", "Infinity", "OLlama" + # Run each test case with and without caching. + cacheProfile: ["", "e2e-test-kind-pv"] + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Install kind + run: | + curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.24.0/kind-linux-amd64 + chmod +x ./kind + sudo mv ./kind /usr/local/bin/kind + + - name: Install helm + run: | + curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 + chmod 700 get_helm.sh + ./get_helm.sh + + - name: Start kind cluster + run: kind create cluster + + - name: Run the e2e testcase + run: make test-e2e-engine ENGINE=${{ matrix.engine }} CACHE_PROFILE=${{ matrix.cacheProfile }} \ No newline at end of file diff --git a/Makefile b/Makefile index 3ea71350..1f444125 100644 --- a/Makefile +++ b/Makefile @@ -79,17 +79,21 @@ test-integration: fmt vet envtest test-e2e-quickstart: skaffold ./test/e2e/run.sh quickstart -.PHONY: test-e2e-faster-whisper -test-e2e-faster-whisper: skaffold - ./test/e2e/run.sh faster-whisper --profile kubeai-only - .PHONY: test-e2e-openai-python-client test-e2e-openai-python-client: skaffold - ./test/e2e/run.sh openai-python-client --profile kubeai-only + ./test/e2e/run.sh openai-python-client --profile e2e-test-default .PHONY: test-e2e-autoscaler-restart test-e2e-autoscaler-restart: skaffold - ./test/e2e/run.sh autoscaler-restart --profile kubeai-only-rapid-scaling + ./test/e2e/run.sh autoscaler-restart --profile e2e-test-autoscaler-restart + +.PHONY: test-e2e-cache-shared-filesystem +test-e2e-cache-shared-filesystem: skaffold + ./test/e2e/run.sh cache-shared-filesystem --profile e2e-test-default + +.PHONY: test-e2e-engine +test-e2e-engine: skaffold + CACHE_PROFILE=$(CACHE_PROFILE) ./test/e2e/run.sh engine-$(ENGINE) --profile e2e-test-default .PHONY: lint lint: golangci-lint ## Run golangci-lint linter diff --git a/api/v1/constants.go b/api/v1/metadata.go similarity index 79% rename from api/v1/constants.go rename to api/v1/metadata.go index 959e17bc..db98166a 100644 --- a/api/v1/constants.go +++ b/api/v1/metadata.go @@ -14,4 +14,10 @@ const ( // Use in conjunction with --allow-pod-address-override for development purposes. ModelPodIPAnnotation = "model-pod-ip" ModelPodPortAnnotation = "model-pod-port" + + ModelCacheEvictionFinalizer = "kubeai.org/cache-eviction" ) + +func PVCModelAnnotation(modelName string) string { + return "models.kubeai.org/" + modelName +} diff --git a/api/v1/model_types.go b/api/v1/model_types.go index 282ba9f6..fb2d28b6 100644 --- a/api/v1/model_types.go +++ b/api/v1/model_types.go @@ -21,11 +21,16 @@ import ( ) // ModelSpec defines the desired state of Model. +// +kubebuilder:validation:XValidation:rule="!has(self.cacheProfile) || self.url.startsWith(\"hf://\")", message="cacheProfile is only supported with a huggingface url (\"hf://...\") at the moment." +// +kubebuilder:validation:XValidation:rule="!has(self.maxReplicas) || self.minReplicas <= self.maxReplicas", message="minReplicas should be less than or equal to maxReplicas." type ModelSpec struct { // URL of the model to be served. // Currently only the following formats are supported: // For VLLM & FasterWhisper engines: "hf:///" // For OLlama engine: "ollama:// + // +kubebuilder:validation:Required + // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="url is immutable." 
+ // +kubebuilder:validation:XValidation:rule="self.startsWith(\"hf://\") || self.startsWith(\"ollama://\")", message="url must start with \"hf://\" or \"ollama://\" and not be empty." URL string `json:"url"` // Features that the model supports. @@ -34,6 +39,7 @@ type ModelSpec struct { // Engine to be used for the server process. // +kubebuilder:validation:Enum=OLlama;VLLM;FasterWhisper;Infinity + // +kubebuilder:validation:Required Engine string `json:"engine"` // ResourceProfile required to serve the model. @@ -42,6 +48,11 @@ type ModelSpec struct { // Must be a valid ResourceProfile defined in the system config. ResourceProfile string `json:"resourceProfile,omitempty"` + // CacheProfile to be used for caching model artifacts. + // Must be a valid CacheProfile defined in the system config. + // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="cacheProfile is immutable." + CacheProfile string `json:"cacheProfile,omitempty"` + // Image to be used for the server process. // Will be set from ResourceProfile + Engine if not specified. Image string `json:"image,omitempty"` @@ -110,6 +121,7 @@ const ( // ModelStatus defines the observed state of Model. type ModelStatus struct { Replicas ModelStatusReplicas `json:"replicas,omitempty"` + Cache *ModelStatusCache `json:"cache,omitempty"` } type ModelStatusReplicas struct { @@ -117,6 +129,10 @@ type ModelStatusReplicas struct { Ready int32 `json:"ready"` } +type ModelStatusCache struct { + Loaded bool `json:"loaded"` +} + // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas.all diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index ddd8858e..360cc351 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -30,7 +30,7 @@ func (in *Model) DeepCopyInto(out *Model) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Model. @@ -139,6 +139,11 @@ func (in *ModelSpec) DeepCopy() *ModelSpec { func (in *ModelStatus) DeepCopyInto(out *ModelStatus) { *out = *in out.Replicas = in.Replicas + if in.Cache != nil { + in, out := &in.Cache, &out.Cache + *out = new(ModelStatusCache) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelStatus. @@ -151,6 +156,21 @@ func (in *ModelStatus) DeepCopy() *ModelStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ModelStatusCache) DeepCopyInto(out *ModelStatusCache) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ModelStatusCache. +func (in *ModelStatusCache) DeepCopy() *ModelStatusCache { + if in == nil { + return nil + } + out := new(ModelStatusCache) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ModelStatusReplicas) DeepCopyInto(out *ModelStatusReplicas) { *out = *in diff --git a/charts/kubeai/templates/configmap.yaml b/charts/kubeai/templates/configmap.yaml index 85f72b73..084a0421 100644 --- a/charts/kubeai/templates/configmap.yaml +++ b/charts/kubeai/templates/configmap.yaml @@ -10,8 +10,12 @@ data: huggingface: {{ include "kubeai.huggingfaceSecretName" . }} resourceProfiles: {{- .Values.resourceProfiles | toYaml | nindent 6 }} + cacheProfiles: + {{- .Values.cacheProfiles | toYaml | nindent 6 }} modelServers: {{- .Values.modelServers | toYaml | nindent 6 }} + modelLoaders: + {{- .Values.modelLoaders | toYaml | nindent 6 }} modelRollouts: {{- .Values.modelRollouts | toYaml | nindent 6 }} modelServerPods: diff --git a/charts/kubeai/templates/crds/kubeai.org_models.yaml b/charts/kubeai/templates/crds/kubeai.org_models.yaml index 3eb65205..4eda918c 100644 --- a/charts/kubeai/templates/crds/kubeai.org_models.yaml +++ b/charts/kubeai/templates/crds/kubeai.org_models.yaml @@ -49,6 +49,14 @@ spec: AutoscalingDisabled will stop the controller from managing the replicas for the Model. When disabled, metrics will not be collected on server Pods. type: boolean + cacheProfile: + description: |- + CacheProfile to be used for caching model artifacts. + Must be a valid CacheProfile defined in the system config. + type: string + x-kubernetes-validations: + - message: cacheProfile is immutable. + rule: self == oldSelf engine: description: Engine to be used for the server process. enum: @@ -134,6 +142,11 @@ spec: For VLLM & FasterWhisper engines: "hf:///" For OLlama engine: "ollama:// type: string + x-kubernetes-validations: + - message: url is immutable. + rule: self == oldSelf + - message: url must start with "hf://" or "ollama://" and not be empty. + rule: self.startsWith("hf://") || self.startsWith("ollama://") required: - engine - features @@ -141,9 +154,22 @@ spec: - targetRequests - url type: object + x-kubernetes-validations: + - message: cacheProfile is only supported with a huggingface url ("hf://...") + at the moment. + rule: '!has(self.cacheProfile) || self.url.startsWith("hf://")' + - message: minReplicas should be less than or equal to maxReplicas. + rule: '!has(self.maxReplicas) || self.minReplicas <= self.maxReplicas' status: description: ModelStatus defines the observed state of Model. 
properties: + cache: + properties: + loaded: + type: boolean + required: + - loaded + type: object replicas: properties: all: diff --git a/charts/kubeai/templates/role.yaml b/charts/kubeai/templates/role.yaml index 3f1ed18f..51c4f356 100644 --- a/charts/kubeai/templates/role.yaml +++ b/charts/kubeai/templates/role.yaml @@ -12,6 +12,32 @@ rules: verbs: - create - delete + - deletecollection + - get + - list + - patch + - update + - watch +- apiGroups: + - "batch" + resources: + - jobs + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - create + - delete - get - list - patch diff --git a/charts/kubeai/values-gke.yaml b/charts/kubeai/values-gke.yaml index 10d1e379..ba15b8b1 100644 --- a/charts/kubeai/values-gke.yaml +++ b/charts/kubeai/values-gke.yaml @@ -36,3 +36,11 @@ resourceProfiles: nodeSelector: cloud.google.com/gke-tpu-accelerator: tpu-v5-lite-podslice cloud.google.com/gke-tpu-topology: "2x4" + +cacheProfiles: + standard-filestore: + sharedFilesystem: + storageClassName: "standard-rwx" + premium-filestore: + sharedFilesystem: + storageClassName: "premium-rwx" \ No newline at end of file diff --git a/charts/kubeai/values.yaml b/charts/kubeai/values.yaml index 88ab0ca4..d626f83c 100644 --- a/charts/kubeai/values.yaml +++ b/charts/kubeai/values.yaml @@ -32,6 +32,11 @@ modelServers: images: default: "michaelf34/infinity:latest" +modelLoaders: + huggingface: + # TODO: Update image to the one built with GH Actions. + image: "us-central1-docker.pkg.dev/substratus-dev/default/huggingface-model-downloader:v0.0.1" + modelServerPods: # Security Context for the model pods # Needed for OpenShift @@ -100,6 +105,8 @@ resourceProfiles: value: "present" effect: "NoSchedule" +cacheProfiles: {} + modelAutoscaling: # Interval that the autoscaler will scrape model server metrics. # and calculate the desired number of replicas. diff --git a/charts/models/templates/models.yaml b/charts/models/templates/models.yaml index a43614a0..e63e0562 100644 --- a/charts/models/templates/models.yaml +++ b/charts/models/templates/models.yaml @@ -35,5 +35,8 @@ spec: {{- with $model.resourceProfile }} resourceProfile: {{ . }} {{- end}} + {{- with $model.cacheProfile }} + cacheProfile: {{ . 
}} + {{- end}} {{- end}} {{- end}} \ No newline at end of file diff --git a/components/huggingface-model-loader/Dockerfile b/components/huggingface-model-loader/Dockerfile new file mode 100644 index 00000000..1602ce94 --- /dev/null +++ b/components/huggingface-model-loader/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.10-slim + +# Set environment variables +ENV PYTHONUNBUFFERED=1 + +# Set the working directory in the container +WORKDIR /app + +# Install Hugging Face CLI tool and other necessary dependencies +RUN pip install --no-cache-dir huggingface_hub + +RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/* + +COPY download.sh /app/download.sh +RUN chmod +x /app/download.sh + +CMD ["/app/download.sh"] diff --git a/components/huggingface-model-loader/download.sh b/components/huggingface-model-loader/download.sh new file mode 100755 index 00000000..f11f4835 --- /dev/null +++ b/components/huggingface-model-loader/download.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +set -euxo pipefail + +huggingface-cli download --local-dir "$MODEL_DIR" "$MODEL_REPO" +rm -rf "$MODEL_DIR/.cache" diff --git a/docs/concepts/storage-caching.md b/docs/concepts/storage-caching.md index 257a9066..4f2c3d08 100644 --- a/docs/concepts/storage-caching.md +++ b/docs/concepts/storage-caching.md @@ -28,9 +28,10 @@ Building a model into a container image can provide a simple way to take advanta ## B. Model on shared filesystem (read-write-many) -**Status:** [Planned](https://github.com/substratusai/kubeai/blob/main/proposals/model-storage.md). +KubeAI can manage model caches on a shared filesystem (e.g. AWS [EFS](https://aws.amazon.com/efs/), GCP [Filestore](https://cloud.google.com/filestore/docs/overview), NFS). It manages the full lifecycle of a cached model: loading, serving, and cache eviction (on deletion of the Model). -Examples: [AWS EFS](https://aws.amazon.com/efs/) +
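For illustration, a Model that opts into a shared-filesystem cache might look like the sketch below (the model name, URL, and cache profile name are placeholders; `cacheProfile` must reference a profile defined under `cacheProfiles` in the system config):

```yaml
apiVersion: kubeai.org/v1
kind: Model
metadata:
  name: example-model
spec:
  features: ["TextGeneration"]
  url: hf://example-org/example-model   # placeholder Hugging Face repo
  engine: VLLM
  resourceProfile: cpu:1
  cacheProfile: shared-filesystem       # placeholder; must exist in cacheProfiles
  minReplicas: 1
```

When such a Model is created, KubeAI provisions (or reuses) a read-write-many PVC for the cache profile, runs a loader Job to populate it, mounts the populated cache read-only into the model server Pods, and evicts the cached artifacts via a finalizer-driven Job when the Model is deleted.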
+ ## C. Model on read-only-many disk diff --git a/docs/contributing/development-environment.md b/docs/contributing/development-environment.md index cd3a1f81..0eec3c9a 100644 --- a/docs/contributing/development-environment.md +++ b/docs/contributing/development-environment.md @@ -46,6 +46,7 @@ helm upgrade --install kubeai ./charts/kubeai \ # OPTION B # # For quick local interation (run KubeAI outside of cluster) +kubectl create cm kubeai-autoscaler-state -oyaml --dry-run=client | kubectl apply -f - CONFIG_PATH=./hack/dev-config.yaml POD_NAMESPACE=default go run ./cmd/main.go # In another terminal: diff --git a/docs/diagrams/arch.excalidraw.png b/docs/diagrams/arch.excalidraw.png index b7163a4d..d101c4a6 100644 Binary files a/docs/diagrams/arch.excalidraw.png and b/docs/diagrams/arch.excalidraw.png differ diff --git a/docs/diagrams/caching-shared-filesystem.excalidraw.png b/docs/diagrams/caching-shared-filesystem.excalidraw.png new file mode 100644 index 00000000..0dd618f3 Binary files /dev/null and b/docs/diagrams/caching-shared-filesystem.excalidraw.png differ diff --git a/docs/how-to/build-models-into-containers.md b/docs/how-to/build-models-into-containers.md index 38b42767..d7e4aa19 100644 --- a/docs/how-to/build-models-into-containers.md +++ b/docs/how-to/build-models-into-containers.md @@ -14,7 +14,7 @@ Build and push image. Note: building (downloading base image & model) and pushin ```bash git clone https://github.com/substratusai/kubeai -cd ./kubeai/images/ollama-builtin +cd ./kubeai/examples/ollama-builtin docker build --build-arg MODEL_URL=$MODEL_URL -t $IMAGE . docker push $IMAGE diff --git a/docs/how-to/cache-models-with-gcp-filestore.md b/docs/how-to/cache-models-with-gcp-filestore.md new file mode 100644 index 00000000..95b57052 --- /dev/null +++ b/docs/how-to/cache-models-with-gcp-filestore.md @@ -0,0 +1,99 @@ +# Cache models with GCP Filestore + +KubeAI can manage model caches. GCP Filestore is supported as a pluggable backend store. + +
+ + +Follow the [GKE install guide](../installation/gke.md). + +Ensure that the Filestore API is enabled. + +```bash +gcloud services enable file.googleapis.com +``` + +Apply a Model with the cache profile set to `standard-filestore` (defined in the reference [GKE Helm values file](https://github.com/substratusai/kubeai/blob/main/charts/kubeai/values-gke.yaml)). + +
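If you deploy models through the `kubeai/models` chart, this is a values override; a minimal sketch, assuming the chart's `catalog` values key and an illustrative model entry name:

```yaml
# models-values.yaml (illustrative)
catalog:
  llama-3.1-8b-instruct-fp8-l4:
    enabled: true
    cacheProfile: standard-filestore
```

The models chart passes `cacheProfile` straight through to the generated Model resource.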
+TIP: If you want to use `premium-filestore`, you will need to ensure you have quota. +Open the cloud console quotas page: https://console.cloud.google.com/iam-admin/quotas. Make sure your project is selected in the top left. + +Ensure that you have at least 2.5TiB of `PremiumStorageGbPerRegion` quota in the region where your cluster is deployed. + +![Premium Storage Quota Screenshot](../screenshots/gcp-quota-premium-storage-gb-per-region.png) + +
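You can also sanity-check that the Filestore-backed storage classes referenced by these cache profiles exist in your cluster (the class names below follow the GKE Filestore CSI driver defaults assumed in `values-gke.yaml`):

```bash
kubectl get storageclass standard-rwx premium-rwx
```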
+
+ +NOTE: If you already installed the models chart, you will need to edit your values file and run `helm upgrade`. + +```bash +helm install kubeai-models kubeai/models -f - <<EOF +# Your models chart values, with cacheProfile: standard-filestore set on the desired model(s). +EOF +``` + +Example: Out-of-quota error (visible in the PersistentVolumeClaim events): +``` + Warning ProvisioningFailed 11m (x26 over 21m) filestore.csi.storage.gke.io_gke-50826743a27a4d52bf5b-7fac-9607-vm_b4bdb2ec-b58b-4363-adec-15c270a14066 failed to provision volume with StorageClass "premium-rwx": rpc error: code = ResourceExhausted desc = googleapi: Error 429: Quota limit 'PremiumStorageGbPerRegion' has been exceeded. Limit: 0 in region us-central1. +Details: +[ + { + "@type": "type.googleapis.com/google.rpc.QuotaFailure", + "violations": [ + { + "description": "Quota 'PremiumStorageGbPerRegion' exhausted. Limit 0 in region us-central1", + "subject": "project:819220466562" + } + ] + } +] +``` + +Check to see if the PersistentVolume has been fully provisioned. + +```bash +kubectl get pv +# Find name of corresponding pv... +kubectl describe pv <pv-name> +``` + +### Model Loading Job + +Check to see if there is an ongoing model loader Job. + +```bash +kubectl get jobs +``` \ No newline at end of file diff --git a/docs/how-to/configure-autoscaling.md b/docs/how-to/configure-autoscaling.md index f76021f3..f559b52b 100644 --- a/docs/how-to/configure-autoscaling.md +++ b/docs/how-to/configure-autoscaling.md @@ -1,6 +1,6 @@ # Configure autoscaling -This guide with cover how to configure KubeAI [autoscaling](../concepts/autoscaling.md) parameters. +This guide will cover how to configure KubeAI [autoscaling](../concepts/autoscaling.md) parameters. ## System Settings diff --git a/docs/screenshots/gcp-quota-premium-storage-gb-per-region.png b/docs/screenshots/gcp-quota-premium-storage-gb-per-region.png new file mode 100644 index 00000000..1e78ab9e Binary files /dev/null and b/docs/screenshots/gcp-quota-premium-storage-gb-per-region.png differ diff --git a/images/ollama-builtin/Dockerfile b/examples/ollama-builtin/Dockerfile similarity index 100% rename from images/ollama-builtin/Dockerfile rename to examples/ollama-builtin/Dockerfile diff --git a/images/ollama-builtin/download.sh b/examples/ollama-builtin/download.sh similarity index 100% rename from images/ollama-builtin/download.sh rename to examples/ollama-builtin/download.sh diff --git a/examples/storage-classes/gcp-filestore.yaml b/examples/storage-classes/gcp-filestore.yaml new file mode 100644 index 00000000..6c664b89 --- /dev/null +++ b/examples/storage-classes/gcp-filestore.yaml @@ -0,0 +1,10 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: gcp-filestore +provisioner: filestore.csi.storage.gke.io +volumeBindingMode: Immediate +allowVolumeExpansion: true +parameters: + tier: standard + network: default \ No newline at end of file diff --git a/hack/dev-config.yaml b/hack/dev-config.yaml index 6127388d..841f41c6 100644 --- a/hack/dev-config.yaml +++ b/hack/dev-config.yaml @@ -1,14 +1,31 @@ secretNames: huggingface: huggingface + modelServers: - vLLM: + VLLM: images: - default: "vllm/vllm-openai:latest" - cpu: "us-central1-docker.pkg.dev/substratus-dev/default/vllm-cpu:v0.5.4-118-gfc93e561" - ollama: + # The key is the image name (referenced from resourceProfiles) and the value is the image. + # The "default" image should always be specified. + # "default" is used when no imageName is specified or if a specific image is not found.
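      # For example: a resourceProfile that sets imageName: "cpu" resolves to the "cpu"
      # image below, while a profile that omits imageName (or references an unlisted
      # name) falls back to "default".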
+ default: "vllm/vllm-openai:v0.6.2" + cpu: "substratusai/vllm:v0.6.1-cpu" + nvidia-gpu: "vllm/vllm-openai:v0.6.2" + google-tpu: "substratusai/vllm:v0.6.1-tpu" + OLlama: images: default: "ollama/ollama:latest" - cpu: "ollama/ollama:0.3.8" + FasterWhisper: + images: + default: "fedirz/faster-whisper-server:latest-cpu" + nvidia-gpu: "fedirz/faster-whisper-server:latest-cuda" + Infinity: + images: + default: "michaelf34/infinity:latest" + +modelDownloaders: + huggingface: + image: "us-central1-docker.pkg.dev/substratus-dev/default/huggingface-model-downloader:v0.0.1" + modelRollouts: surge: 0 messaging: @@ -19,9 +36,17 @@ messaging: # maxHandlers: 1 resourceProfiles: cpu: + imageName: "cpu" requests: - cpu: 0.5 - memory: 1Gi + # Kind + #cpu: 0.5 + #memory: 1Gi + # GKE + cpu: 3 + memory: 12Gi + limits: + cpu: 3 + memory: 12Gi nvidia-gpu-l4: limits: nvidia.com/gpu: "1" @@ -30,6 +55,17 @@ resourceProfiles: cpu: "6" memory: "24Gi" +cacheProfiles: + fstore: + sharedFilesystem: + #storageClassName: "kubeai-filestore" + persistentVolumeName: "preprov1" + # Dev-only configuration. allowPodAddressOverride: true -fixedSelfMetricAddrs: ["127.0.0.1:"] \ No newline at end of file +fixedSelfMetricAddrs: ["127.0.0.1:"] + +modelAutoscaling: + interval: 10s + timeWindow: 60s + stateConfigMapName: kubeai-autoscaler-state \ No newline at end of file diff --git a/hack/dev-model.yaml b/hack/dev-model.yaml index a31b1335..8a29d593 100644 --- a/hack/dev-model.yaml +++ b/hack/dev-model.yaml @@ -10,11 +10,18 @@ metadata: spec: features: ["TextGeneration"] owner: alibaba - url: "ollama://qwen2:0.5b" - engine: OLlama + #url: "ollama://qwen2:0.5b" + #engine: OLlama + url: hf://facebook/opt-125m + engine: VLLM resourceProfile: cpu:1 + cacheProfile: fstore minReplicas: 1 maxReplicas: 3 + #url: hf://meta-llama/Meta-Llama-3.1-8B-Instruct + #args: + # - --max-model-len=32768 + # - --max-num-batched-token=32768 --- # Service for port-fowarding to the model: # diff --git a/hack/pvs/preprov-filestore.yaml b/hack/pvs/preprov-filestore.yaml new file mode 100644 index 00000000..0b39ecda --- /dev/null +++ b/hack/pvs/preprov-filestore.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: preprov1 +spec: + storageClassName: "" + capacity: + storage: 1Ti + accessModes: + - ReadWriteMany + persistentVolumeReclaimPolicy: Retain + volumeMode: Filesystem + csi: + driver: filestore.csi.storage.gke.io + volumeHandle: "modeInstance/us-central1-f/preprov1/vol1" + volumeAttributes: + # Replace with IP from created Filestore instance: + ip: "10.100.234.50" + volume: vol1 \ No newline at end of file diff --git a/hack/volume-debug-pod.yaml b/hack/volume-debug-pod.yaml new file mode 100644 index 00000000..248024b3 --- /dev/null +++ b/hack/volume-debug-pod.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Pod +metadata: + name: volume-debug-pod +spec: + containers: + - name: main + image: ubuntu + command: ["sleep", "10000"] + volumeMounts: + - name: models + mountPath: /my-mnt + volumes: + - name: models + persistentVolumeClaim: + claimName: shared-model-cache-fstore \ No newline at end of file diff --git a/internal/config/system.go b/internal/config/system.go index f50a4bce..4d1bc51b 100644 --- a/internal/config/system.go +++ b/internal/config/system.go @@ -15,8 +15,12 @@ type System struct { ModelServers ModelServers `json:"modelServers" validate:"required"` + ModelLoaders ModelLoaders `json:"modelLoaders" validate:"required"` + ResourceProfiles map[string]ResourceProfile `json:"resourceProfiles" validate:"required"` + 
CacheProfiles map[string]CacheProfile `json:"cacheProfiles"` + Messaging Messaging `json:"messaging"` // MetricsAddr is the address the metric endpoint binds to. @@ -73,6 +77,10 @@ func (s *System) DefaultAndValidate() error { s.LeaderElection.RetryPeriod.Duration = 2 * time.Second } + if s.CacheProfiles == nil { + s.CacheProfiles = map[string]CacheProfile{} + } + return validator.New(validator.WithRequiredStructEnabled()).Struct(s) } @@ -187,6 +195,18 @@ type ResourceProfile struct { RuntimeClassName *string `json:"runtimeClassName,omitempty"` } +type CacheProfile struct { + SharedFilesystem *CacheSharedFilesystem `json:"sharedFilesystem,omitempty"` +} + +type CacheSharedFilesystem struct { + // StorageClassName is the name of the StorageClass to use for the shared filesystem. + StorageClassName string `json:"storageClassName,omitempty" validate:"required_without=PersistentVolumeName"` + // PersistentVolumeName is the name of the PersistentVolume to use for the shared filesystem. + // This is usually used if you have an existing filesystem that you want to use. + PersistentVolumeName string `json:"persistentVolumeName,omitempty" validate:"required_without=StorageClassName"` +} + type MessageStream struct { RequestsURL string `json:"requestsURL"` ResponsesURL string `json:"responsesURL"` @@ -206,6 +226,15 @@ type ModelServer struct { Images map[string]string `json:"images"` } +type ModelLoaders struct { + Huggingface ModelLoader `json:"huggingface" validate:"required"` +} + +type ModelLoader struct { + // Image is the image to use for the downloader. + Image string `json:"image" validate:"required"` +} + type ModelServerPods struct { // The service account to use for all model pods ModelServiceAccountName string `json:"serviceAccountName,omitempty"` diff --git a/internal/k8sutils/jobs.go b/internal/k8sutils/jobs.go new file mode 100644 index 00000000..f5403686 --- /dev/null +++ b/internal/k8sutils/jobs.go @@ -0,0 +1,15 @@ +package k8sutils + +import ( + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + +func IsJobCompleted(job *batchv1.Job) bool { + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { + return true + } + } + return false +} diff --git a/internal/manager/run.go b/internal/manager/run.go index ba3f12f9..1dacfd00 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -202,8 +202,10 @@ func Run(ctx context.Context, k8sCfg *rest.Config, cfg config.System) error { AllowPodAddressOverride: cfg.AllowPodAddressOverride, HuggingfaceSecretName: cfg.SecretNames.Huggingface, ResourceProfiles: cfg.ResourceProfiles, + CacheProfiles: cfg.CacheProfiles, ModelServers: cfg.ModelServers, ModelServerPods: cfg.ModelServerPods, + ModelLoaders: cfg.ModelLoaders, ModelRollouts: cfg.ModelRollouts, } if err = modelReconciler.SetupWithManager(mgr); err != nil { diff --git a/internal/modelcontroller/cache.go b/internal/modelcontroller/cache.go new file mode 100644 index 00000000..5c450467 --- /dev/null +++ b/internal/modelcontroller/cache.go @@ -0,0 +1,468 @@ +package modelcontroller + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + "github.com/substratusai/kubeai/internal/k8sutils" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + 
"k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +type PVCModelAnnotationValue struct { + UID string `json:"uid"` + Timestamp time.Time `json:"timestamp"` +} + +func (r *ModelReconciler) reconcileCache(ctx context.Context, model *kubeaiv1.Model, cfg ModelConfig) (ctrl.Result, error) { + if model.Status.Cache == nil { + model.Status.Cache = &kubeaiv1.ModelStatusCache{} + } + + modelDeleted := model.DeletionTimestamp != nil + + pvc := &corev1.PersistentVolumeClaim{} + var pvcExists bool + if err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: model.Namespace, + Name: cachePVCName(model, cfg), + }, pvc); err != nil { + if apierrors.IsNotFound(err) { + pvcExists = false + } else { + return ctrl.Result{}, fmt.Errorf("getting cache PVC: %w", err) + } + } else { + pvcExists = true + } + + // Create PVC if not exists. + if !pvcExists { + if !modelDeleted { + pvc = r.cachePVCForModel(model, cfg) + // TODO: Set controller reference on PVC for 1:1 Model to PVC situations + // such as Google Hyperdisk ML. + //if err := controllerutil.SetControllerReference(model, pvc, r.Scheme); err != nil { + // return ctrl.Result{}, fmt.Errorf("setting controller reference on pvc: %w", err) + //} + if err := r.Create(ctx, pvc); err != nil { + return ctrl.Result{}, fmt.Errorf("creating cache PVC: %w", err) + } + } + } + + // Caches that are shared across multiple Models require model-specific cleanup. + if cfg.CacheProfile.SharedFilesystem != nil { + if controllerutil.AddFinalizer(model, kubeaiv1.ModelCacheEvictionFinalizer) { + if err := r.Update(ctx, model); err != nil { + return ctrl.Result{}, fmt.Errorf("adding cache deletion finalizer: %w", err) + } + } + + } + // NOTE: .Spec.CacheProfile and .Spec.URL are immutable, so we don't need to check if they + // have changed in order to evict a stale cache. + + loadJob := &batchv1.Job{} + var jobExists bool + if err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: model.Namespace, + Name: loadCacheJobName(model), + }, loadJob); err != nil { + if apierrors.IsNotFound(err) { + jobExists = false + } else { + return ctrl.Result{}, fmt.Errorf("getting cache job: %w", err) + } + } else { + jobExists = true + } + + pvcModelAnn, err := parsePVCModelAnnotation(pvc, model.Name) + if err != nil { + return ctrl.Result{}, fmt.Errorf("parsing pvc model annotation: %w", err) + } + + // Run Job to populate PVC if not already downloaded. + if pvcModelAnn.UID != string(model.UID) { + // Ensure the download job exists. + if !jobExists { + loadJob = r.loadCacheJobForModel(model, cfg) + if err := ctrl.SetControllerReference(model, loadJob, r.Scheme); err != nil { + return ctrl.Result{}, fmt.Errorf("setting controller reference on job: %w", err) + } + if err := r.Create(ctx, loadJob); err != nil { + return ctrl.Result{}, fmt.Errorf("creating job: %w", err) + } + return ctrl.Result{}, errReturnEarly + } + + if !k8sutils.IsJobCompleted(loadJob) { + return ctrl.Result{}, errReturnEarly + } + if err := r.updatePVCModelAnnotation(ctx, pvc, model.Name, PVCModelAnnotationValue{ + UID: string(model.UID), + Timestamp: time.Now(), + }); err != nil { + return ctrl.Result{}, fmt.Errorf("setting pvc model annotation: %w", err) + } + } + model.Status.Cache.Loaded = pvcModelAnn.UID == string(model.UID) + + if jobExists { + // Cache loading completed, delete Job to avoid accumulating a mess of completed Jobs. 
+ // Use foreground deletion policy to ensure the Pods are deleted as well. + if err := r.Delete(ctx, loadJob, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil { + return ctrl.Result{}, fmt.Errorf("deleting job: %w", err) + } + } + + return ctrl.Result{}, nil +} + +func (r *ModelReconciler) finalizeCache(ctx context.Context, model *kubeaiv1.Model, cfg ModelConfig) error { + pvc := &corev1.PersistentVolumeClaim{} + var pvcExists bool + if err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: model.Namespace, + Name: cachePVCName(model, cfg), + }, pvc); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("getting cache PVC: %w", err) + } + } else { + pvcExists = true + } + + if !pvcExists || pvc.DeletionTimestamp != nil { + // If the PVC is not found or is already being deleted, delete all cache jobs and pods. + // No need trying to update the PVC annotations or perform other cleanup. + if err := r.deleteAllCacheJobsAndPods(ctx, model); err != nil { + return fmt.Errorf("deleting all cache jobs and pods: %w", err) + } + if controllerutil.RemoveFinalizer(model, kubeaiv1.ModelCacheEvictionFinalizer) { + if err := r.Update(ctx, model); err != nil { + return fmt.Errorf("removing cache deletion finalizer: %w", err) + } + } + return nil + } + + if controllerutil.ContainsFinalizer(model, kubeaiv1.ModelCacheEvictionFinalizer) { + evictJob := &batchv1.Job{} + var jobExists bool + if err := r.Client.Get(ctx, types.NamespacedName{ + Namespace: model.Namespace, + Name: evictCacheJobName(model), + }, evictJob); err != nil { + if apierrors.IsNotFound(err) { + jobExists = false + } else { + return fmt.Errorf("getting cache deletion job: %w", err) + } + } else { + jobExists = true + } + + if !jobExists { + job := r.evictCacheJobForModel(model, cfg) + if err := ctrl.SetControllerReference(model, job, r.Scheme); err != nil { + return fmt.Errorf("setting controller reference on cache deletion job: %w", err) + } + if err := r.Create(ctx, job); err != nil { + return fmt.Errorf("creating cache deletion job: %w", err) + } + return errReturnEarly + } else { + // Wait for the Job to complete. + if !k8sutils.IsJobCompleted(evictJob) { + return errReturnEarly + } + + // Delete the Model from the PVC annotation. + if pvc.Annotations != nil { + if _, ok := pvc.Annotations[kubeaiv1.PVCModelAnnotation(model.Name)]; ok { + delete(pvc.Annotations, kubeaiv1.PVCModelAnnotation(model.Name)) + if err := r.Update(ctx, pvc); err != nil { + return fmt.Errorf("updating PVC, removing cache annotation: %w", err) + } + } + } + } + + controllerutil.RemoveFinalizer(model, kubeaiv1.ModelCacheEvictionFinalizer) + if err := r.Update(ctx, model); err != nil { + return fmt.Errorf("removing cache deletion finalizer: %w", err) + } + } + + if err := r.deleteAllCacheJobsAndPods(ctx, model); err != nil { + return fmt.Errorf("deleting all cache jobs and pods: %w", err) + } + + return nil +} + +func (r *ModelReconciler) deleteAllCacheJobsAndPods(ctx context.Context, model *kubeaiv1.Model) error { + jobNames := []string{ + loadCacheJobName(model), + evictCacheJobName(model), + } + + for _, jobName := range jobNames { + if err := r.Delete(ctx, &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: model.Namespace, + Name: jobName, + }, + }); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("deleting job %q: %w", jobName, err) + } + } + + // NOTE: There are different conditions in which Pods might not be deleted by the Job controller + // after a Job is deleted. 
+ if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(model.Namespace), client.MatchingLabels{ + batchv1.JobNameLabel: jobName, + }); err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("deleting pods for job %q: %w", jobName, err) + } + } + } + + return nil +} + +func parsePVCModelAnnotation(pvc *corev1.PersistentVolumeClaim, modelName string) (PVCModelAnnotationValue, error) { + pvcModelStatusJSON := k8sutils.GetAnnotation(pvc, kubeaiv1.PVCModelAnnotation(modelName)) + if pvcModelStatusJSON == "" { + return PVCModelAnnotationValue{}, nil + } + var status PVCModelAnnotationValue + if err := json.Unmarshal([]byte(pvcModelStatusJSON), &status); err != nil { + return PVCModelAnnotationValue{}, fmt.Errorf("unmarshalling pvc model status: %w", err) + } + return status, nil +} + +func (r *ModelReconciler) updatePVCModelAnnotation(ctx context.Context, pvc *corev1.PersistentVolumeClaim, modelName string, status PVCModelAnnotationValue) error { + statusJSON, err := json.Marshal(status) + if err != nil { + return fmt.Errorf("marshalling pvc model status: %w", err) + } + k8sutils.SetAnnotation(pvc, kubeaiv1.PVCModelAnnotation(modelName), string(statusJSON)) + if err := r.Client.Update(ctx, pvc); err != nil { + return fmt.Errorf("updating pvc: %w", err) + } + return nil +} + +func (r *ModelReconciler) cachePVCForModel(m *kubeaiv1.Model, c ModelConfig) *corev1.PersistentVolumeClaim { + pvc := corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: cachePVCName(m, c), + Namespace: m.Namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + } + switch { + case c.CacheProfile.SharedFilesystem != nil: + pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany} + storageClassName := c.CacheProfile.SharedFilesystem.StorageClassName + pvc.Spec.StorageClassName = &storageClassName + pvc.Spec.VolumeName = c.CacheProfile.SharedFilesystem.PersistentVolumeName + pvc.Spec.Resources.Requests = corev1.ResourceList{ + // https://discuss.huggingface.co/t/how-to-get-model-size/11038/7 + corev1.ResourceStorage: resource.MustParse("10Gi"), + } + default: + panic("unsupported cache profile, this point should not be reached") + } + return &pvc +} + +func cachePVCName(m *kubeaiv1.Model, c ModelConfig) string { + switch { + case c.CacheProfile.SharedFilesystem != nil: + // One PVC for all models. + return fmt.Sprintf("shared-model-cache-%s", m.Spec.CacheProfile) + default: + // One PVC per model. 
+ return fmt.Sprintf("model-cache-%s-%s", m.Name, m.UID[0:7]) + } +} + +func (r *ModelReconciler) loadCacheJobForModel(m *kubeaiv1.Model, c ModelConfig) *batchv1.Job { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: loadCacheJobName(m), + Namespace: m.Namespace, + }, + Spec: batchv1.JobSpec{ + TTLSecondsAfterFinished: ptr.To[int32](60), + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "loader", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "model", + MountPath: modelCacheDir(m), + SubPath: strings.TrimPrefix(modelCacheDir(m), "/"), + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "model", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: cachePVCName(m, c), + }, + }, + }, + }, + }, + }, + }, + } + + switch c.Source.typ { + case modelSourceTypeHuggingface: + job.Spec.Template.Spec.Containers[0].Image = r.ModelLoaders.Huggingface.Image + job.Spec.Template.Spec.Containers[0].Env = append(job.Spec.Template.Spec.Containers[0].Env, + corev1.EnvVar{ + Name: "MODEL_DIR", + Value: modelCacheDir(m), + }, + corev1.EnvVar{ + Name: "MODEL_REPO", + Value: c.Source.huggingface.repo, + }, + corev1.EnvVar{ + Name: "HF_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: r.HuggingfaceSecretName, + }, + Key: "token", + Optional: ptr.To(true), + }, + }, + }, + ) + default: + panic("unsupported model source, this point should not be reached") + } + + return job +} + +func (r *ModelReconciler) evictCacheJobForModel(m *kubeaiv1.Model, c ModelConfig) *batchv1.Job { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: evictCacheJobName(m), + Namespace: m.Namespace, + }, + Spec: batchv1.JobSpec{ + TTLSecondsAfterFinished: ptr.To[int32](60), + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "evictor", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "model", + MountPath: "/models", + SubPath: "models", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "model", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: cachePVCName(m, c), + }, + }, + }, + }, + }, + }, + }, + } + + if c.CacheProfile.SharedFilesystem != nil { + switch c.Source.typ { + case modelSourceTypeHuggingface: + job.Spec.Template.Spec.Containers[0].Image = r.ModelLoaders.Huggingface.Image + job.Spec.Template.Spec.Containers[0].Command = []string{"bash", "-c", "rm -rf " + modelCacheDir(m)} + default: + panic("unsupported model source, this point should not be reached") + } + } + + return job +} + +func modelCacheDir(m *kubeaiv1.Model) string { + return fmt.Sprintf("/models/%s-%s", m.Name, m.UID) +} + +func loadCacheJobName(m *kubeaiv1.Model) string { + return fmt.Sprintf("load-cache-%s", m.Name) +} + +func evictCacheJobName(m *kubeaiv1.Model) string { + return fmt.Sprintf("evict-cache-%s", m.Name) +} + +func patchServerCacheVolumes(podSpec *corev1.PodSpec, m *kubeaiv1.Model, c ModelConfig) { + if m.Spec.CacheProfile == "" { + return + } + podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{ + Name: "models", + VolumeSource: 
corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: cachePVCName(m, c), + }, + }, + }) + for i := range podSpec.Containers { + if podSpec.Containers[i].Name == "server" { + podSpec.Containers[i].VolumeMounts = append(podSpec.Containers[i].VolumeMounts, corev1.VolumeMount{ + Name: "models", + MountPath: modelCacheDir(m), + SubPath: strings.TrimPrefix(modelCacheDir(m), "/"), + ReadOnly: true, + }) + } + } +} diff --git a/internal/modelcontroller/engine_fasterwhisper.go b/internal/modelcontroller/engine_fasterwhisper.go new file mode 100644 index 00000000..159ebccb --- /dev/null +++ b/internal/modelcontroller/engine_fasterwhisper.go @@ -0,0 +1,156 @@ +package modelcontroller + +import ( + "sort" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" +) + +func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, c ModelConfig) *corev1.Pod { + lbs := labelsForModel(m) + ann := r.annotationsForModel(m) + if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { + ann[kubeaiv1.ModelPodPortAnnotation] = "8000" + } + + args := []string{} + args = append(args, m.Spec.Args...) + + whisperModel := c.Source.huggingface.repo + if m.Spec.CacheProfile != "" { + whisperModel = modelCacheDir(m) + } + + env := []corev1.EnvVar{ + { + Name: "WHISPER__MODEL", + Value: whisperModel, + }, + { + Name: "ENABLE_UI", + Value: "false", + }, + { + // TODO: Conditionally set this token based on whether + // huggingface is the model source. + Name: "HF_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: r.HuggingfaceSecretName, + }, + Key: "token", + Optional: ptr.To(true), + }, + }, + }, + } + var envKeys []string + for key := range m.Spec.Env { + envKeys = append(envKeys, key) + } + sort.Strings(envKeys) + for _, key := range envKeys { + env = append(env, corev1.EnvVar{ + Name: key, + Value: m.Spec.Env[key], + }) + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: m.Namespace, + Labels: lbs, + Annotations: ann, + }, + Spec: corev1.PodSpec{ + NodeSelector: c.NodeSelector, + Affinity: c.Affinity, + Tolerations: c.Tolerations, + RuntimeClassName: c.RuntimeClassName, + ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, + SecurityContext: r.ModelServerPods.ModelPodSecurityContext, + Containers: []corev1.Container{ + { + Name: serverContainerName, + Image: c.Image, + Args: args, + Env: env, + SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, + Resources: corev1.ResourceRequirements{ + Requests: c.Requests, + Limits: c.Limits, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8000, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + }, + StartupProbe: &corev1.Probe{ + // Give the model 30 minutes to start up. 
+ FailureThreshold: 900, + PeriodSeconds: 2, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 10, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 30, + TimeoutSeconds: 3, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "dshm", + MountPath: "/dev/shm", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "dshm", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + }, + }, + }, + }, + }, + } + + patchServerCacheVolumes(&pod.Spec, m, c) + + return pod +} diff --git a/internal/modelcontroller/engine_infinity.go b/internal/modelcontroller/engine_infinity.go new file mode 100644 index 00000000..a5069a47 --- /dev/null +++ b/internal/modelcontroller/engine_infinity.go @@ -0,0 +1,176 @@ +package modelcontroller + +import ( + "sort" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" +) + +func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, c ModelConfig) *corev1.Pod { + lbs := labelsForModel(m) + ann := r.annotationsForModel(m) + + args := []string{ + "v2", + } + args = append(args, m.Spec.Args...) + + if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { + ann[kubeaiv1.ModelPodPortAnnotation] = "8000" + } + + infinityModelID := c.Source.huggingface.repo + if m.Spec.CacheProfile != "" { + // TODO: Verify loading from dir works. + infinityModelID = modelCacheDir(m) + } + + env := []corev1.EnvVar{ + { + Name: "INFINITY_MODEL_ID", + // TODO: infinity supports multiple models, separate by comma. + Value: infinityModelID, + }, + { + Name: "INFINITY_SERVED_MODEL_NAME", + Value: m.Name, + }, + { + Name: "INFINITY_URL_PREFIX", + Value: "/v1", + }, + { + Name: "INFINITY_ENGINE", + // TODO: switch between optimum backend (cpu), nvidia/amd (torch), inf2 (inferentia) based on what is available. + Value: "torch", + }, + { + Name: "INFINITY_PORT", + Value: "8000", + }, + { + // TODO: Conditionally set this token based on whether + // huggingface is the model source. 
+ Name: "HF_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: r.HuggingfaceSecretName, + }, + Key: "token", + Optional: ptr.To(true), + }, + }, + }, + } + var envKeys []string + for key := range m.Spec.Env { + envKeys = append(envKeys, key) + } + sort.Strings(envKeys) + for _, key := range envKeys { + env = append(env, corev1.EnvVar{ + Name: key, + Value: m.Spec.Env[key], + }) + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: m.Namespace, + Labels: lbs, + Annotations: ann, + }, + Spec: corev1.PodSpec{ + NodeSelector: c.NodeSelector, + Affinity: c.Affinity, + Tolerations: c.Tolerations, + RuntimeClassName: c.RuntimeClassName, + ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, + SecurityContext: r.ModelServerPods.ModelPodSecurityContext, + Containers: []corev1.Container{ + { + Name: serverContainerName, + Image: c.Image, + Args: args, + Env: env, + Resources: corev1.ResourceRequirements{ + Requests: c.Requests, + Limits: c.Limits, + }, + + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8000, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + }, + StartupProbe: &corev1.Probe{ + // TODO: Decrease the default and make it configurable. + // Give the model 20 minutes to start up. + FailureThreshold: 600, + PeriodSeconds: 2, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 10, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 30, + TimeoutSeconds: 3, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "dshm", + MountPath: "/dev/shm", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "dshm", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + // TODO: Set size limit + }, + }, + }, + }, + }, + } + + patchServerCacheVolumes(&pod.Spec, m, c) + + return pod +} diff --git a/internal/modelcontroller/engine_ollama.go b/internal/modelcontroller/engine_ollama.go new file mode 100644 index 00000000..64d07a23 --- /dev/null +++ b/internal/modelcontroller/engine_ollama.go @@ -0,0 +1,175 @@ +package modelcontroller + +import ( + "fmt" + "sort" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, c ModelConfig) *corev1.Pod { + lbs := labelsForModel(m) + ann := r.annotationsForModel(m) + + if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { + // Set port to 8000 (vLLM) if not overwritten. + ann[kubeaiv1.ModelPodPortAnnotation] = "8000" + } + + env := []corev1.EnvVar{ + { + Name: "OLLAMA_HOST", + Value: "0.0.0.0:8000", + }, + { + // Ollama server typically operates in a 1:N server-to-model mode so it + // swaps models in and out of memory. In our case we are deploying 1:1 + // model-to-server-pod so we want to always keep the model in memory. 
+ Name: "OLLAMA_KEEP_ALIVE", + // Ollama treates 0 as "no keep alive" so we need to set a large value. + Value: "999999h", + }, + } + var envKeys []string + for key := range m.Spec.Env { + envKeys = append(envKeys, key) + } + sort.Strings(envKeys) + for _, key := range envKeys { + env = append(env, corev1.EnvVar{ + Name: key, + Value: m.Spec.Env[key], + }) + } + + ollamaModelRef := c.Source.ollama.ref + + featuresMap := map[kubeaiv1.ModelFeature]struct{}{} + for _, f := range m.Spec.Features { + featuresMap[f] = struct{}{} + } + + // Pull model and copy to rename it to Model.metadata.name. + // See Ollama issue for rename/copy workaround: https://github.com/ollama/ollama/issues/5914 + // NOTE: The cp command should just create a pointer to the old model, not copy data + // (see https://github.com/ollama/ollama/issues/5914#issuecomment-2248168474). + // Use `ollama run` to send a single prompt to ollama to load the model into memory + // before the Pod becomes Ready. (by default it will load on the first prompt request). + startupProbeScript := fmt.Sprintf("/bin/ollama pull %s && /bin/ollama cp %s %s", + ollamaModelRef, ollamaModelRef, m.Name) + if _, ok := featuresMap[kubeaiv1.ModelFeatureTextGeneration]; ok { + // NOTE: Embedding text models do not support "ollama run": + // + // ollama run nomic-embed-text hey + // Error: "nomic-embed-text" does not support generate + // + startupProbeScript += fmt.Sprintf(" && /bin/ollama run %s hi", m.Name) + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: m.Namespace, + Labels: lbs, + Annotations: ann, + }, + Spec: corev1.PodSpec{ + NodeSelector: c.NodeSelector, + Affinity: c.Affinity, + Tolerations: c.Tolerations, + RuntimeClassName: c.RuntimeClassName, + ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, + SecurityContext: r.ModelServerPods.ModelPodSecurityContext, + Containers: []corev1.Container{ + { + Name: serverContainerName, + Image: c.Image, + Args: m.Spec.Args, + Env: env, + SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, + Resources: corev1.ResourceRequirements{ + Requests: c.Requests, + Limits: c.Limits, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8000, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + }, + // Use a startup probe to pull the model because ollama server needs + // to be running already (`ollama pull` issues a HTTP request to the server). + // Example log from ollama server when a model is pulled: + // [GIN] 2024/08/20 - 15:12:28 | 200 | 981.561436ms | 127.0.0.1 | POST "/api/pull" + StartupProbe: &corev1.Probe{ + InitialDelaySeconds: 1, + PeriodSeconds: 3, + FailureThreshold: 10, + // Give the model pull 180 minutes to complete. + TimeoutSeconds: 60 * 180, + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{ + "bash", "-c", + startupProbeScript, + }, + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, + // Will be delayed by the startup probe, so no need to delay here. 
+ InitialDelaySeconds: 0, + PeriodSeconds: 10, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromString("http"), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + FailureThreshold: 3, + InitialDelaySeconds: 900, + TimeoutSeconds: 3, + PeriodSeconds: 30, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromString("http"), + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "dshm", + MountPath: "/dev/shm", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "dshm", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + // TODO: Set size limit + }, + }, + }, + }, + }, + } + + patchServerCacheVolumes(&pod.Spec, m, c) + + return pod + +} diff --git a/internal/modelcontroller/engine_vllm.go b/internal/modelcontroller/engine_vllm.go new file mode 100644 index 00000000..ed92266e --- /dev/null +++ b/internal/modelcontroller/engine_vllm.go @@ -0,0 +1,155 @@ +package modelcontroller + +import ( + "sort" + + kubeaiv1 "github.com/substratusai/kubeai/api/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" +) + +func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, c ModelConfig) *corev1.Pod { + lbs := labelsForModel(m) + ann := r.annotationsForModel(m) + if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { + // Set port to 8000 (vLLM) if not overwritten. + ann[kubeaiv1.ModelPodPortAnnotation] = "8000" + } + + vllmModelFlag := c.Source.huggingface.repo + if m.Spec.CacheProfile != "" { + vllmModelFlag = modelCacheDir(m) + } + + args := []string{ + "--model=" + vllmModelFlag, + "--served-model-name=" + m.Name, + } + args = append(args, m.Spec.Args...) + + env := []corev1.EnvVar{ + { + // TODO: Conditionally set this token based on whether + // huggingface is the model source. + Name: "HF_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: r.HuggingfaceSecretName, + }, + Key: "token", + Optional: ptr.To(true), + }, + }, + }, + } + var envKeys []string + for key := range m.Spec.Env { + envKeys = append(envKeys, key) + } + sort.Strings(envKeys) + for _, key := range envKeys { + env = append(env, corev1.EnvVar{ + Name: key, + Value: m.Spec.Env[key], + }) + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: m.Namespace, + Labels: lbs, + Annotations: ann, + }, + Spec: corev1.PodSpec{ + NodeSelector: c.NodeSelector, + Affinity: c.Affinity, + Tolerations: c.Tolerations, + RuntimeClassName: c.RuntimeClassName, + ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, + SecurityContext: r.ModelServerPods.ModelPodSecurityContext, + Containers: []corev1.Container{ + { + Name: serverContainerName, + Image: c.Image, + Command: []string{"python3", "-m", "vllm.entrypoints.openai.api_server"}, + Args: args, + Env: env, + SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, + Resources: corev1.ResourceRequirements{ + Requests: c.Requests, + Limits: c.Limits, + }, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8000, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + }, + StartupProbe: &corev1.Probe{ + // TODO: Decrease the default and make it configurable. + // Give the model 3 hours to start up. 
+ FailureThreshold: 5400, + PeriodSeconds: 2, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 10, + TimeoutSeconds: 2, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + LivenessProbe: &corev1.Probe{ + FailureThreshold: 3, + PeriodSeconds: 30, + TimeoutSeconds: 3, + SuccessThreshold: 1, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromString("http"), + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "dshm", + MountPath: "/dev/shm", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "dshm", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + // TODO: Set size limit + }, + }, + }, + }, + }, + } + + patchServerCacheVolumes(&pod.Spec, m, c) + + return pod +} diff --git a/internal/modelcontroller/model_controller.go b/internal/modelcontroller/model_controller.go index 19b443b6..98c483fe 100644 --- a/internal/modelcontroller/model_controller.go +++ b/internal/modelcontroller/model_controller.go @@ -18,15 +18,17 @@ package modelcontroller import ( "context" + "errors" "fmt" "reflect" - "sort" "strconv" "strings" "time" + batchv1 "k8s.io/api/batch/v1" + + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,12 +37,13 @@ import ( kubeaiv1 "github.com/substratusai/kubeai/api/v1" "github.com/substratusai/kubeai/internal/config" "github.com/substratusai/kubeai/internal/k8sutils" - utils "github.com/substratusai/kubeai/internal/k8sutils" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const modelReconcilerName = "kubeai-model-controller" +const ( + modelReconcilerName = "kubeai-model-controller" + serverContainerName = "server" +) // ModelReconciler reconciles a Model object type ModelReconciler struct { @@ -50,8 +53,10 @@ type ModelReconciler struct { AllowPodAddressOverride bool HuggingfaceSecretName string ResourceProfiles map[string]config.ResourceProfile + CacheProfiles map[string]config.CacheProfile ModelServers config.ModelServers ModelServerPods config.ModelServerPods + ModelLoaders config.ModelLoaders ModelRollouts config.ModelRollouts } @@ -64,7 +69,7 @@ type ModelReconciler struct { //+kubebuilder:rbac:groups="",resources=pods/status,verbs=get;update;patch //+kubebuilder:rbac:groups="",resources=pods/finalizers,verbs=update -func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, resErr error) { log := log.FromContext(ctx) log.Info("Reconciling Model") @@ -75,9 +80,16 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl status0 := model.Status.DeepCopy() - var shouldUpdate bool + defer func() { + if !reflect.DeepEqual(status0, model.Status) && model.DeletionTimestamp == nil { + if err := r.Status().Update(ctx, model); err != nil { + resErr = errors.Join(resErr, err) + } + } + }() + // Apply self labels based on features so that we can easily filter models. 
- shouldUpdate = r.applySelfLabels(model) || shouldUpdate
+ shouldUpdate := r.applySelfLabels(model)
 // Apply replica bounds to handle cases where min/max replicas were updated but a scale event was not triggered.
 if !model.Spec.AutoscalingDisabled {
 shouldUpdate = r.applyAutoscalingReplicaBounds(model) || shouldUpdate
@@ -93,6 +105,42 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
 return ctrl.Result{}, fmt.Errorf("getting model profile: %w", err)
 }
+ if model.DeletionTimestamp != nil {
+ // Get rid of all Pods for the Model.
+ // This should help avoid any issues with cache cleanup.
+ if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(model.Namespace), client.MatchingLabels{
+ kubeaiv1.PodModelLabel: model.Name,
+ }); err != nil {
+ if !apierrors.IsNotFound(err) {
+ return ctrl.Result{}, fmt.Errorf("deleting all pods: %w", err)
+ }
+ }
+ if model.Spec.CacheProfile != "" {
+ if err := r.finalizeCache(ctx, model, modelConfig); err != nil {
+ if errors.Is(err, errReturnEarly) {
+ return ctrl.Result{}, nil
+ } else {
+ return ctrl.Result{}, fmt.Errorf("finalizing cache: %w", err)
+ }
+ }
+ }
+
+ return ctrl.Result{}, nil
+ }
+
+ if model.Spec.CacheProfile != "" {
+ cacheRes, err := r.reconcileCache(ctx, model, modelConfig)
+ if err != nil {
+ if errors.Is(err, errReturnEarly) {
+ return cacheRes, nil
+ }
+ return cacheRes, fmt.Errorf("reconciling cache: %w", err)
+ }
+ if !cacheRes.IsZero() {
+ return cacheRes, nil
+ }
+ }
+
 allPods := &corev1.PodList{}
 if err := r.List(ctx, allPods, client.InNamespace(model.Namespace), client.MatchingLabels{
 kubeaiv1.PodModelLabel: model.Name,
@@ -100,6 +148,16 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
 return ctrl.Result{}, fmt.Errorf("listing all node pools: %w", err)
 }
+ // Summarize all pods.
+ var readyPods int32
+ for _, pod := range allPods.Items {
+ if k8sutils.PodIsReady(&pod) {
+ readyPods++
+ }
+ }
+ model.Status.Replicas.All = int32(len(allPods.Items))
+ model.Status.Replicas.Ready = readyPods
+
 plan := r.calculatePodPlan(allPods, model, modelConfig)
 if plan.containsActions() {
 changed, err := plan.execute(ctx, r.Client, r.Scheme)
@@ -115,23 +173,6 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
 }
 }
- // Summarize all pods.
- var readyPods int32
- for _, pod := range allPods.Items {
- if utils.PodIsReady(&pod) {
- readyPods++
- }
- }
-
- model.Status.Replicas.All = int32(len(allPods.Items))
- model.Status.Replicas.Ready = readyPods
-
- if !reflect.DeepEqual(status0, model.Status) {
- if err := r.Status().Update(ctx, model); err != nil {
- return ctrl.Result{}, fmt.Errorf("updating status: %w", err)
- }
- }
-
 return ctrl.Result{}, nil
 }
@@ -141,611 +182,12 @@ func (r *ModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
 return ctrl.NewControllerManagedBy(mgr).
 For(&kubeaiv1.Model{}).
 Owns(&corev1.Pod{}).
+ Owns(&corev1.PersistentVolumeClaim{}).
+ Owns(&batchv1.Job{}).
Complete(r) } -/* -func (r *ModelReconciler) apply(ctx context.Context, model *kubeaiv1.Model, obj client.Object) error { - if err := ctrlutil.SetControllerReference(model, obj, r.Scheme); err != nil { - return fmt.Errorf("setting controller reference: %w", err) - } - return utils.ServerSideApply(ctx, r.Client, obj, modelReconcilerName) -} -*/ - -func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { - lbs := labelsForModel(m) - ann := r.annotationsForModel(m) - if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { - // Set port to 8000 (vLLM) if not overwritten. - ann[kubeaiv1.ModelPodPortAnnotation] = "8000" - } - - args := []string{ - "--model=" + strings.TrimPrefix(m.Spec.URL, "hf://"), - "--served-model-name=" + m.Name, - } - args = append(args, m.Spec.Args...) - - env := []corev1.EnvVar{ - { - // TODO: Conditionally set this token based on whether - // huggingface is the model source. - Name: "HF_TOKEN", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: r.HuggingfaceSecretName, - }, - Key: "token", - Optional: ptr.To(true), - }, - }, - }, - } - var envKeys []string - for key := range m.Spec.Env { - envKeys = append(envKeys, key) - } - sort.Strings(envKeys) - for _, key := range envKeys { - env = append(env, corev1.EnvVar{ - Name: key, - Value: m.Spec.Env[key], - }) - } - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: m.Namespace, - Labels: lbs, - Annotations: ann, - }, - Spec: corev1.PodSpec{ - NodeSelector: profile.NodeSelector, - Affinity: profile.Affinity, - Tolerations: profile.Tolerations, - RuntimeClassName: profile.RuntimeClassName, - ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, - SecurityContext: r.ModelServerPods.ModelPodSecurityContext, - Containers: []corev1.Container{ - { - Name: "server", - Image: profile.Image, - Command: []string{"python3", "-m", "vllm.entrypoints.openai.api_server"}, - Args: args, - Env: env, - SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, - Resources: corev1.ResourceRequirements{ - Requests: profile.Requests, - Limits: profile.Limits, - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - Name: "http", - }, - }, - StartupProbe: &corev1.Probe{ - // TODO: Decrease the default and make it configurable. - // Give the model 3 hours to start up. 
- FailureThreshold: 5400, - PeriodSeconds: 2, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 10, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - LivenessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 30, - TimeoutSeconds: 3, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "dshm", - MountPath: "/dev/shm", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "dshm", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{ - Medium: corev1.StorageMediumMemory, - // TODO: Set size limit - }, - }, - }, - }, - }, - } - - return pod -} - -func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { - lbs := labelsForModel(m) - ann := r.annotationsForModel(m) - - if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { - // Set port to 8000 (vLLM) if not overwritten. - ann[kubeaiv1.ModelPodPortAnnotation] = "8000" - } - - env := []corev1.EnvVar{ - { - Name: "OLLAMA_HOST", - Value: "0.0.0.0:8000", - }, - { - // Ollama server typically operates in a 1:N server-to-model mode so it - // swaps models in and out of memory. In our case we are deploying 1:1 - // model-to-server-pod so we want to always keep the model in memory. - Name: "OLLAMA_KEEP_ALIVE", - // Ollama treates 0 as "no keep alive" so we need to set a large value. - Value: "999999h", - }, - } - var envKeys []string - for key := range m.Spec.Env { - envKeys = append(envKeys, key) - } - sort.Strings(envKeys) - for _, key := range envKeys { - env = append(env, corev1.EnvVar{ - Name: key, - Value: m.Spec.Env[key], - }) - } - - ollamaModelRef := strings.TrimPrefix(m.Spec.URL, "ollama://") - - featuresMap := map[kubeaiv1.ModelFeature]struct{}{} - for _, f := range m.Spec.Features { - featuresMap[f] = struct{}{} - } - - // Pull model and copy to rename it to Model.metadata.name. - // See Ollama issue for rename/copy workaround: https://github.com/ollama/ollama/issues/5914 - // NOTE: The cp command should just create a pointer to the old model, not copy data - // (see https://github.com/ollama/ollama/issues/5914#issuecomment-2248168474). - // Use `ollama run` to send a single prompt to ollama to load the model into memory - // before the Pod becomes Ready. (by default it will load on the first prompt request). 
- startupProbeScript := fmt.Sprintf("/bin/ollama pull %s && /bin/ollama cp %s %s", - ollamaModelRef, ollamaModelRef, m.Name) - if _, ok := featuresMap[kubeaiv1.ModelFeatureTextGeneration]; ok { - // NOTE: Embedding text models do not support "ollama run": - // - // ollama run nomic-embed-text hey - // Error: "nomic-embed-text" does not support generate - // - startupProbeScript += fmt.Sprintf(" && /bin/ollama run %s hi", m.Name) - } - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: m.Namespace, - Labels: lbs, - Annotations: ann, - }, - Spec: corev1.PodSpec{ - NodeSelector: profile.NodeSelector, - Affinity: profile.Affinity, - Tolerations: profile.Tolerations, - RuntimeClassName: profile.RuntimeClassName, - ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, - SecurityContext: r.ModelServerPods.ModelPodSecurityContext, - Containers: []corev1.Container{ - { - Name: "server", - Image: profile.Image, - Args: m.Spec.Args, - Env: env, - SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, - Resources: corev1.ResourceRequirements{ - Requests: profile.Requests, - Limits: profile.Limits, - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - Name: "http", - }, - }, - // Use a startup probe to pull the model because ollama server needs - // to be running already (`ollama pull` issues a HTTP request to the server). - // Example log from ollama server when a model is pulled: - // [GIN] 2024/08/20 - 15:12:28 | 200 | 981.561436ms | 127.0.0.1 | POST "/api/pull" - StartupProbe: &corev1.Probe{ - InitialDelaySeconds: 1, - PeriodSeconds: 3, - FailureThreshold: 10, - // Give the model pull 180 minutes to complete. - TimeoutSeconds: 60 * 180, - ProbeHandler: corev1.ProbeHandler{ - Exec: &corev1.ExecAction{ - Command: []string{ - "bash", "-c", - startupProbeScript, - }, - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - FailureThreshold: 3, - // Will be delayed by the startup probe, so no need to delay here. - InitialDelaySeconds: 0, - PeriodSeconds: 10, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromString("http"), - }, - }, - }, - LivenessProbe: &corev1.Probe{ - FailureThreshold: 3, - InitialDelaySeconds: 900, - TimeoutSeconds: 3, - PeriodSeconds: 30, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromString("http"), - }, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "dshm", - MountPath: "/dev/shm", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "dshm", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{ - Medium: corev1.StorageMediumMemory, - // TODO: Set size limit - }, - }, - }, - }, - }, - } - - return pod - -} - -func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { - lbs := labelsForModel(m) - ann := r.annotationsForModel(m) - if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { - ann[kubeaiv1.ModelPodPortAnnotation] = "8000" - } - - args := []string{} - args = append(args, m.Spec.Args...) - - env := []corev1.EnvVar{ - { - Name: "WHISPER__MODEL", - Value: strings.TrimPrefix(m.Spec.URL, "hf://"), - }, - { - Name: "ENABLE_UI", - Value: "false", - }, - { - // TODO: Conditionally set this token based on whether - // huggingface is the model source. 
- Name: "HF_TOKEN", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: r.HuggingfaceSecretName, - }, - Key: "token", - Optional: ptr.To(true), - }, - }, - }, - } - var envKeys []string - for key := range m.Spec.Env { - envKeys = append(envKeys, key) - } - sort.Strings(envKeys) - for _, key := range envKeys { - env = append(env, corev1.EnvVar{ - Name: key, - Value: m.Spec.Env[key], - }) - } - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: m.Namespace, - Labels: lbs, - Annotations: ann, - }, - Spec: corev1.PodSpec{ - NodeSelector: profile.NodeSelector, - Affinity: profile.Affinity, - Tolerations: profile.Tolerations, - RuntimeClassName: profile.RuntimeClassName, - ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, - SecurityContext: r.ModelServerPods.ModelPodSecurityContext, - Containers: []corev1.Container{ - { - Name: "server", - Image: profile.Image, - Args: args, - Env: env, - SecurityContext: r.ModelServerPods.ModelContainerSecurityContext, - Resources: corev1.ResourceRequirements{ - Requests: profile.Requests, - Limits: profile.Limits, - }, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - Name: "http", - }, - }, - StartupProbe: &corev1.Probe{ - // Give the model 30 minutes to start up. - FailureThreshold: 900, - PeriodSeconds: 2, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 10, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - LivenessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 30, - TimeoutSeconds: 3, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "dshm", - MountPath: "/dev/shm", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "dshm", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{ - Medium: corev1.StorageMediumMemory, - }, - }, - }, - }, - }, - } - - return pod -} - -func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, profile ModelConfig) *corev1.Pod { - lbs := labelsForModel(m) - ann := r.annotationsForModel(m) - - args := []string{ - "v2", - } - args = append(args, m.Spec.Args...) - - if _, ok := ann[kubeaiv1.ModelPodPortAnnotation]; !ok { - ann[kubeaiv1.ModelPodPortAnnotation] = "8000" - } - - env := []corev1.EnvVar{ - { - Name: "INFINITY_MODEL_ID", - // TODO: infinity supports multiple models, separate by comma. - Value: strings.TrimPrefix(m.Spec.URL, "hf://"), - }, - { - Name: "INFINITY_SERVED_MODEL_NAME", - Value: m.Name, - }, - { - Name: "INFINITY_URL_PREFIX", - Value: "/v1", - }, - { - Name: "INFINITY_ENGINE", - // TODO: switch between optimum backend (cpu), nvidia/amd (torch), inf2 (inferentia) based on what is available. - Value: "torch", - }, - { - Name: "INFINITY_PORT", - Value: "8000", - }, - { - // TODO: Conditionally set this token based on whether - // huggingface is the model source. 
- Name: "HF_TOKEN", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: r.HuggingfaceSecretName, - }, - Key: "token", - Optional: ptr.To(true), - }, - }, - }, - } - var envKeys []string - for key := range m.Spec.Env { - envKeys = append(envKeys, key) - } - sort.Strings(envKeys) - for _, key := range envKeys { - env = append(env, corev1.EnvVar{ - Name: key, - Value: m.Spec.Env[key], - }) - } - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: m.Namespace, - Labels: lbs, - Annotations: ann, - }, - Spec: corev1.PodSpec{ - NodeSelector: profile.NodeSelector, - Affinity: profile.Affinity, - Tolerations: profile.Tolerations, - RuntimeClassName: profile.RuntimeClassName, - ServiceAccountName: r.ModelServerPods.ModelServiceAccountName, - SecurityContext: r.ModelServerPods.ModelPodSecurityContext, - Containers: []corev1.Container{ - { - Name: "server", - Image: profile.Image, - Args: args, - Env: env, - Resources: corev1.ResourceRequirements{ - Requests: profile.Requests, - Limits: profile.Limits, - }, - - Ports: []corev1.ContainerPort{ - { - ContainerPort: 8000, - Protocol: corev1.ProtocolTCP, - Name: "http", - }, - }, - StartupProbe: &corev1.Probe{ - // TODO: Decrease the default and make it configurable. - // Give the model 20 minutes to start up. - FailureThreshold: 600, - PeriodSeconds: 2, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 10, - TimeoutSeconds: 2, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - LivenessProbe: &corev1.Probe{ - FailureThreshold: 3, - PeriodSeconds: 30, - TimeoutSeconds: 3, - SuccessThreshold: 1, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromString("http"), - }, - }, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "dshm", - MountPath: "/dev/shm", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "dshm", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{ - Medium: corev1.StorageMediumMemory, - // TODO: Set size limit - }, - }, - }, - }, - }, - } - - return pod -} +var errReturnEarly = fmt.Errorf("return early") func labelsForModel(m *kubeaiv1.Model) map[string]string { engineLowerCase := strings.ToLower(m.Spec.Engine) @@ -781,13 +223,73 @@ func (r *ModelReconciler) annotationsForModel(m *kubeaiv1.Model) map[string]stri } type ModelConfig struct { + config.CacheProfile config.ResourceProfile - Image string + Image string + Source modelSource +} + +type modelSource struct { + typ modelSourceType + huggingface huggingfaceModelSource + ollama ollamaModelSource +} + +type modelSourceType string + +const ( + modelSourceTypeHuggingface modelSourceType = "huggingface" + modelSourceTypeOLlama modelSourceType = "ollama" +) + +type huggingfaceModelSource struct { + repo string +} +type ollamaModelSource struct { + ref string +} + +func parseModelSource(url string) (modelSource, error) { + const ( + huggingfacePrefix = "hf://" + ollamaPrefix = "ollama://" + ) + switch { + case strings.HasPrefix(url, huggingfacePrefix): + return modelSource{ + typ: modelSourceTypeHuggingface, + huggingface: huggingfaceModelSource{ + repo: strings.TrimPrefix(url, 
huggingfacePrefix), + }, + }, nil + case strings.HasPrefix(url, ollamaPrefix): + return modelSource{ + typ: modelSourceTypeOLlama, + ollama: ollamaModelSource{ + ref: strings.TrimPrefix(url, ollamaPrefix), + }, + }, nil + } + return modelSource{}, fmt.Errorf("unrecognized model source: %q", url) } func (r *ModelReconciler) getModelConfig(model *kubeaiv1.Model) (ModelConfig, error) { var result ModelConfig + src, err := parseModelSource(model.Spec.URL) + if err != nil { + return result, fmt.Errorf("parsing model source: %w", err) + } + result.Source = src + + if model.Spec.CacheProfile != "" { + cacheProfile, ok := r.CacheProfiles[model.Spec.CacheProfile] + if !ok { + return result, fmt.Errorf("cache profile not found: %q", model.Spec.CacheProfile) + } + result.CacheProfile = cacheProfile + } + split := strings.Split(model.Spec.ResourceProfile, ":") if len(split) != 2 { return result, fmt.Errorf("invalid resource profile: %q, should match :, example: nvidia-gpu-l4:2", model.Spec.ResourceProfile) @@ -918,27 +420,3 @@ func (r *ModelReconciler) applySelfLabels(model *kubeaiv1.Model) bool { return changed } - -func resourcesEqual(a, b corev1.ResourceList) bool { - if len(a) != len(b) { - return false - } - for key, quantity := range a { - if q, ok := b[key]; !ok || !q.Equal(quantity) { - return false - } - } - return true -} - -func selectorsEqual(a, b map[string]string) bool { - if len(a) != len(b) { - return false - } - for key, val := range a { - if v, ok := b[key]; !ok || v != val { - return false - } - } - return true -} diff --git a/internal/modelcontroller/model_controller_test.go b/internal/modelcontroller/model_controller_test.go index 6e2f4365..11afd766 100644 --- a/internal/modelcontroller/model_controller_test.go +++ b/internal/modelcontroller/model_controller_test.go @@ -85,6 +85,7 @@ func Test_getModelConfig(t *testing.T) { Spec: v1.ModelSpec{ Engine: v1.VLLMEngine, ResourceProfile: "my-gpu:2", + URL: "hf://some-repo/some-model", }, }, expected: ModelConfig{ diff --git a/skaffold.yaml b/skaffold.yaml index a52e63b7..ee28704e 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -40,13 +40,50 @@ profiles: chartPath: ./charts/kubeai setValueTemplates: openwebui.enabled: false -- name: kubeai-only-rapid-scaling + +- name: kubeai-only-gke + build: + local: + push: true + deploy: + helm: + releases: + - name: kubeai + chartPath: ./charts/kubeai + setValueTemplates: + openwebui.enabled: false + valuesFiles: + - ./charts/kubeai/values-gke.yaml + +- name: e2e-test-default deploy: helm: releases: - name: kubeai chartPath: ./charts/kubeai + valuesFiles: + - ./test/e2e/common-values.yaml + setValueTemplates: + openwebui.enabled: false +- name: e2e-test-autoscaler-restart + deploy: + helm: + releases: + - name: kubeai + chartPath: ./charts/kubeai + valuesFiles: + - ./test/e2e/common-values.yaml setValueTemplates: openwebui.enabled: false modelAutoscaling.interval: 1s - modelAutoscaling.timeWindow: 30s \ No newline at end of file + modelAutoscaling.timeWindow: 30s +- name: e2e-test-engine + deploy: + helm: + releases: + - name: kubeai + chartPath: ./charts/kubeai + valuesFiles: + - ./test/e2e/common-values.yaml + setValueTemplates: + openwebui.enabled: false diff --git a/test/e2e/cache-shared-filesystem/cache-mount-pod.yaml b/test/e2e/cache-shared-filesystem/cache-mount-pod.yaml new file mode 100644 index 00000000..265766ea --- /dev/null +++ b/test/e2e/cache-shared-filesystem/cache-mount-pod.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Pod +metadata: + name: cache-mount-pod +spec: + containers: + 
- name: main + image: ubuntu + command: ["sleep", "10000"] + volumeMounts: + - name: models + mountPath: /test-mount + volumes: + - name: models + persistentVolumeClaim: + claimName: shared-model-cache-e2e-test-kind-pv \ No newline at end of file diff --git a/test/e2e/cache-shared-filesystem/test.sh b/test/e2e/cache-shared-filesystem/test.sh new file mode 100755 index 00000000..f55bd9d8 --- /dev/null +++ b/test/e2e/cache-shared-filesystem/test.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +source $REPO_DIR/test/e2e/common.sh + +models_release="kubeai-models" + + +helm install $models_release $REPO_DIR/charts/models -f - <