Skip to content

Commit

Permalink
Switch eph container to sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
nstogner committed Nov 12, 2024
1 parent cfb07cd commit 33fa72e
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 132 deletions.
8 changes: 0 additions & 8 deletions hack/dev-models/gke-vllm-gpu-adapters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,10 @@ spec:
features: [TextGeneration]
owner: meta-llama
url: hf://TinyLlama/TinyLlama-1.1B-Chat-v0.3
#url: hf://meta-llama/Llama-2-7b
adapters:
- id: test
url: hf://jashing/tinyllama-colorist-lora
#- id: sql
# url: hf://yard1/llama-2-7b-sql-lora-test
engine: VLLM
#args:
# - --max-model-len=16384
# - --max-num-batched-token=16384
# - --gpu-memory-utilization=0.8
# - --cpu-offload-gb=10
resourceProfile: nvidia-gpu-l4:1
minReplicas: 1
---
Expand Down
26 changes: 20 additions & 6 deletions internal/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ func newEndpointGroup() *endpointGroup {
return e
}

type endpoint struct {
inFlight *atomic.Int64
}

type endpointGroup struct {
mtx sync.RWMutex
endpoints map[string]endpoint
Expand All @@ -26,6 +22,20 @@ type endpointGroup struct {
bcast chan struct{} // closed when there's a broadcast
}

func newEndpoint() endpoint {
return endpoint{
inFlight: &atomic.Int64{},
endpointAttrs: endpointAttrs{
adapters: make(map[string]struct{}),
},
}
}

type endpoint struct {
inFlight *atomic.Int64
endpointAttrs
}

// getBestAddr returns the best "IP:Port". It blocks until there are available endpoints
// in the endpoint group. It selects the host with the minimum in-flight requests
// among all the available endpoints.
Expand Down Expand Up @@ -83,11 +93,15 @@ func (g *endpointGroup) lenIPs() int {
return len(g.endpoints)
}

func (g *endpointGroup) setAddrs(ips map[string]struct{}) {
type endpointAttrs struct {
adapters map[string]struct{}
}

func (g *endpointGroup) setAddrs(ips map[string]endpointAttrs) {
g.mtx.Lock()
for ip := range ips {
if _, ok := g.endpoints[ip]; !ok {
g.endpoints[ip] = endpoint{inFlight: &atomic.Int64{}}
g.endpoints[ip] = newEndpoint()
}
}
for ip := range g.endpoints {
Expand Down
2 changes: 1 addition & 1 deletion internal/endpoints/endpoints_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func BenchmarkEndpointGroup(b *testing.B) {
e := newEndpointGroup()
e.setAddrs(map[string]struct{}{"10.0.0.1": {}})
e.setAddrs(map[string]endpointAttrs{"10.0.0.1": {}})
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down
8 changes: 4 additions & 4 deletions internal/endpoints/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ func TestConcurrentAccess(t *testing.T) {
}
for name, spec := range testCases {
randomReadFn := []func(g *endpointGroup){
func(g *endpointGroup) { g.getBestAddr(nil) },
func(g *endpointGroup) { g.getBestAddr(context.Background()) },
func(g *endpointGroup) { g.getAllAddrs() },
func(g *endpointGroup) { g.lenIPs() },
}
t.Run(name, func(t *testing.T) {
// setup endpoint with one service so that requests are not waiting
endpoint := newEndpointGroup()
endpoint.setAddrs(
map[string]struct{}{myModel: {}},
map[string]endpointAttrs{myModel: {}},
)

var startWg, doneWg sync.WaitGroup
Expand All @@ -53,7 +53,7 @@ func TestConcurrentAccess(t *testing.T) {
startTogether(spec.readerCount, func() { randomReadFn[rand.Intn(len(randomReadFn)-1)](endpoint) })
startTogether(spec.writerCount, func() {
endpoint.setAddrs(
map[string]struct{}{rand.String(1): {}},
map[string]endpointAttrs{rand.String(1): {}},
)
})
doneWg.Wait()
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestBlockAndWaitForEndpoints(t *testing.T) {

// when broadcast triggered
endpoint.setAddrs(
map[string]struct{}{rand.String(4): {}},
map[string]endpointAttrs{rand.String(4): {}},
)
// then
doneWg.Wait()
Expand Down
19 changes: 17 additions & 2 deletions internal/endpoints/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strings"
"sync"

kubeaiv1 "github.com/substratusai/kubeai/api/v1"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (r *Resolver) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result
return ctrl.Result{}, fmt.Errorf("listing matching pods: %w", err)
}

addrs := map[string]struct{}{}
addrs := map[string]endpointAttrs{}
for _, pod := range podList.Items {
if _, exclude := r.ExcludePods[pod.Name]; exclude {
continue
Expand Down Expand Up @@ -116,14 +117,28 @@ func (r *Resolver) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result
continue
}

addrs[ip+":"+port] = struct{}{}
addrs[ip+":"+port] = getEndpointAttrs(pod)
}

r.getEndpoints(modelName).setAddrs(addrs)

return ctrl.Result{}, nil
}

func getEndpointAttrs(pod corev1.Pod) endpointAttrs {
attrs := endpointAttrs{
adapters: map[string]struct{}{},
}

for k := range pod.GetLabels() {
if strings.HasPrefix(k, kubeaiv1.PodAdapterLabelPrefix) {
attrs.adapters[strings.TrimPrefix(k, kubeaiv1.PodAdapterLabelPrefix)] = struct{}{}
}
}

return attrs
}

func getPodAnnotation(pod corev1.Pod, key string) string {
if ann := pod.GetAnnotations(); ann != nil {
return ann[key]
Expand Down
8 changes: 4 additions & 4 deletions internal/endpoints/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

func TestAwaitBestHost(t *testing.T) {
const myModel = "myModel"
const myModel = "my-model"

manager := &Resolver{endpoints: make(map[string]*endpointGroup, 1)}
manager.getEndpoints(myModel).
setAddrs(map[string]struct{}{myModel: {}})
setAddrs(map[string]endpointAttrs{myModel: {}})

testCases := map[string]struct {
model string
Expand All @@ -25,8 +25,8 @@ func TestAwaitBestHost(t *testing.T) {
model: myModel,
timeout: time.Millisecond,
},
"unknown service - blocks until timeout": {
model: "unknownService",
"unknown endpoint - blocks until timeout": {
model: "unknown-model",
timeout: time.Millisecond,
expErr: context.DeadlineExceeded,
},
Expand Down
54 changes: 4 additions & 50 deletions internal/k8sutils/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,56 +56,10 @@ func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
fmt.Fprintf(hasher, "%v", dump.ForHash(objectToWrite))
}

// RemoveEphemeralContainer removes the container with the given name from the pod
// and returns true if it did.
func RemoveEphemeralContainer(pod *corev1.Pod, name string) bool {
if i := FindEphemeralContainer(pod, name); i != -1 {
pod.Spec.EphemeralContainers = append(pod.Spec.EphemeralContainers[:i], pod.Spec.EphemeralContainers[i+1:]...)
return true
}
return false
}

// AddEphemeralContainer adds the container to the pod and returns true if it did.
func AddEphemeralContainer(pod *corev1.Pod, container corev1.EphemeralContainer) bool {
if FindEphemeralContainer(pod, container.Name) != -1 {
return false
}
pod.Spec.EphemeralContainers = append(pod.Spec.EphemeralContainers, container)
return true
}

// FindEphemeralContainer returns the index of the container with the given name in the pod.
func FindEphemeralContainer(pod *corev1.Pod, name string) int {
for i, container := range pod.Spec.EphemeralContainers {
if container.Name == name {
return i
}
}
return -1
}

// EphemeralContainerIsRunning returns true if the ephemeral container with the given name is started.
// NOTE: .status.ephemeralContainerStatuses[].ready and .started appear to never be true, so we use
// .state.running.startedAt instead.
func EphemeralContainerIsRunning(pod *corev1.Pod, containerName string) bool {
// Example:
//
// ephemeralContainerStatuses:
// - containerID: containerd://7455a9e04440fa74a2f483495e9343feeb4e762bbc5498dff5b7ccd0d4209ca8
// image: docker.io/library/kubeai-model-loader:latest
// imageID: docker.io/library/import-2024-11-07@sha256:b0708534a7587995bd8949d5e539499d41fc1cdce678a05c407cd618e135c980
// lastState: {}
// name: adapter-loader
// ready: false
// restartCount: 0
// state:
// running:
// startedAt: "2024-11-07T21:47:38Z"
//
for _, containerStatus := range pod.Status.EphemeralContainerStatuses {
if containerStatus.Name == containerName {
return containerStatus.State.Running != nil && !containerStatus.State.Running.StartedAt.IsZero()
func ContainerIsReady(pod *corev1.Pod, containerName string) bool {
for _, status := range pod.Status.ContainerStatuses {
if status.Name == containerName {
return status.Ready
}
}
return false
Expand Down
69 changes: 19 additions & 50 deletions internal/modelcontroller/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
adapterLoaderContainerName = "adapter-loader"
loaderContainerName = "loader"
)

func (r *ModelReconciler) reconcileAdapters(ctx context.Context, pods []*corev1.Pod, adapters []v1.Adapter) error {
Expand Down Expand Up @@ -60,10 +60,7 @@ func (r *ModelReconciler) reconcileAdapters(ctx context.Context, pods []*corev1.
for _, param := range reconcileList {
// TODO: Parallelize
addr := getPodModelServerAddr(param.pod)
if err := r.ensureAdapterLoader(ctx, param.pod /*, len(param.toEnsure) > 0 || len(param.toRemoveIDs) > 0*/); err != nil {
return fmt.Errorf("reconcile adapter loader for pod %q: %w", param.pod.Namespace+"/"+param.pod.Name, err)
}
if !k8sutils.EphemeralContainerIsRunning(param.pod, adapterLoaderContainerName) {
if !k8sutils.ContainerIsReady(param.pod, loaderContainerName) {
return errReturnEarly
}
for _, adapter := range param.toEnsure {
Expand Down Expand Up @@ -127,51 +124,9 @@ func getPodModelServerAddr(pod *corev1.Pod) string {
return fmt.Sprintf("http://%s:%s", ip, port)
}

// ensureAdapterLoader ensures that the adapter loader ephemeral container is present or not.
func (r *ModelReconciler) ensureAdapterLoader(ctx context.Context, pod *corev1.Pod /*, enabled bool*/) error {
// NOTE: Ephemeral containers cannot be removed once added.
/*
if !enabled {
changed := k8sutils.RemoveEphemeralContainer(pod, adapterLoaderContainerName)
if !changed {
return nil
}
if err := r.Client.SubResource("ephemeralcontainers").Update(ctx, pod); err != nil {
return fmt.Errorf("update pod ephemeral containers: %w", err)
}
return nil
}
*/

container := corev1.EphemeralContainer{
EphemeralContainerCommon: corev1.EphemeralContainerCommon{
Name: adapterLoaderContainerName,
Image: r.ModelLoaders.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "infinity"},
VolumeMounts: []corev1.VolumeMount{
{
Name: adaptersVolName,
MountPath: adaptersRootDir,
},
},
},
TargetContainerName: serverContainerName,
}

changed := k8sutils.AddEphemeralContainer(pod, container)
if changed {
if err := r.Client.SubResource("ephemeralcontainers").Update(ctx, pod); err != nil {
return fmt.Errorf("update pod ephemeral containers: %w", err)
}
}

return nil
}

// execAdapterLoad executes the adapter load command in the adapter loader container.
func (r *ModelReconciler) execAdapterLoad(ctx context.Context, pod *corev1.Pod, adapter v1.Adapter) error {
if err := r.execPod(ctx, pod, adapterLoaderContainerName, []string{
if err := r.execPod(ctx, pod, loaderContainerName, []string{
"load", adapter.URL, adapterDir(adapter),
}); err != nil {
return fmt.Errorf("exec adapter load: %w", err)
Expand All @@ -180,7 +135,7 @@ func (r *ModelReconciler) execAdapterLoad(ctx context.Context, pod *corev1.Pod,
}

func (r *ModelReconciler) execAdapterUnload(ctx context.Context, pod *corev1.Pod, adapterID string) error {
if err := r.execPod(ctx, pod, adapterLoaderContainerName, []string{
if err := r.execPod(ctx, pod, loaderContainerName, []string{
"rm", "-rf", adapterDir(v1.Adapter{ID: adapterID}),
}); err != nil {
return fmt.Errorf("exec adapter load: %w", err)
Expand All @@ -197,7 +152,7 @@ func adapterDir(a v1.Adapter) string {
return fmt.Sprintf("%s/%s", adaptersRootDir, a.ID)
}

func patchServerAdapterVolume(podSpec *corev1.PodSpec) {
func patchServerAdapterLoader(podSpec *corev1.PodSpec, image string) {
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
Name: adaptersVolName,
VolumeSource: corev1.VolumeSource{
Expand All @@ -213,6 +168,20 @@ func patchServerAdapterVolume(podSpec *corev1.PodSpec) {
})
}
}

loaderContainer := corev1.Container{
Name: loaderContainerName,
Image: image,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "infinity"},
VolumeMounts: []corev1.VolumeMount{
{
Name: adaptersVolName,
MountPath: adaptersRootDir,
},
},
}
podSpec.Containers = append(podSpec.Containers, loaderContainer)
}

func getLabelledAdapters(pod *corev1.Pod) map[string]struct{} {
Expand Down
1 change: 0 additions & 1 deletion internal/modelcontroller/engine_fasterwhisper.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func (r *ModelReconciler) fasterWhisperPodForModel(m *kubeaiv1.Model, c ModelCon
},
}

patchServerAdapterVolume(&pod.Spec)
patchServerCacheVolumes(&pod.Spec, m, c)

return pod
Expand Down
1 change: 0 additions & 1 deletion internal/modelcontroller/engine_infinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func (r *ModelReconciler) infinityPodForModel(m *kubeaiv1.Model, c ModelConfig)
},
}

patchServerAdapterVolume(&pod.Spec)
patchServerCacheVolumes(&pod.Spec, m, c)

return pod
Expand Down
1 change: 0 additions & 1 deletion internal/modelcontroller/engine_ollama.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ func (r *ModelReconciler) oLlamaPodForModel(m *kubeaiv1.Model, c ModelConfig) *c
},
}

patchServerAdapterVolume(&pod.Spec)
patchServerCacheVolumes(&pod.Spec, m, c)

return pod
Expand Down
2 changes: 1 addition & 1 deletion internal/modelcontroller/engine_vllm.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *ModelReconciler) vLLMPodForModel(m *kubeaiv1.Model, c ModelConfig) *cor
},
}

patchServerAdapterVolume(&pod.Spec)
patchServerAdapterLoader(&pod.Spec, r.ModelLoaders.Image)
patchServerCacheVolumes(&pod.Spec, m, c)

return pod
Expand Down
Binary file modified proposals/diagrams/lora-direct-loading.excalidraw.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified proposals/diagrams/lora.excalidraw.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 33fa72e

Please sign in to comment.