Skip to content

Commit

Permalink
Apply review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Jan 11, 2024
1 parent d57436a commit c5374c4
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 36 deletions.
6 changes: 1 addition & 5 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ func (a *Autoscaler) Start() {
log.Println("Calculating scales for all")

// TODO: Remove hardcoded Service lookup by name "lingo".
otherLingoEndpoints, err := a.Endpoints.GetAllHosts(context.Background(), "lingo", "stats")
if err != nil {
log.Printf("Failed to find endpoints: %v", err)
continue
}
otherLingoEndpoints := a.Endpoints.GetAllHosts("lingo", "stats")

stats, errs := aggregateStats(stats.Stats{
ActiveRequests: a.Queues.TotalCounts(),
Expand Down
25 changes: 2 additions & 23 deletions pkg/endpoints/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,27 +119,6 @@ func (r *Manager) AwaitHostAddress(ctx context.Context, service, portName string
}

// GetAllHosts retrieves the list of all hosts for a given service and port.
// It returns a slice of strings representing the hosts or an error if any context timeout occurred.
func (r *Manager) GetAllHosts(ctx context.Context, service, portName string) ([]string, error) {
return execWithCtxAbort(ctx, func() []string { return r.getEndpoints(service).getAllHosts(portName) })
}

func execWithCtxAbort[T any](ctx context.Context, f func() T) (T, error) {
resultChan := make(chan T, 1)
go func() {
select {
case resultChan <- f():
case <-ctx.Done(): // exit go routine, too
}
close(resultChan)
}()
var result T
select {
case <-ctx.Done():
// keep resultChan open to prevent panic on write
// the channel
return result, ctx.Err()
case result = <-resultChan:
return result, nil
}
func (r *Manager) GetAllHosts(service, portName string) []string {
return r.getEndpoints(service).getAllHosts(portName)
}
8 changes: 4 additions & 4 deletions pkg/endpoints/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestAwaitBestHost(t *testing.T) {
service string
portName string
timeout time.Duration
expErr bool
expErr error
}{
"all good": {
service: myService,
Expand All @@ -37,7 +37,7 @@ func TestAwaitBestHost(t *testing.T) {
service: "unknownService",
portName: myPort,
timeout: time.Millisecond,
expErr: true,
expErr: context.DeadlineExceeded,
},
// not covered: unknown port with multiple ports on entrypoint
}
Expand All @@ -48,8 +48,8 @@ func TestAwaitBestHost(t *testing.T) {
defer cancel()

gotHost, gotErr := manager.AwaitHostAddress(ctx, spec.service, spec.portName)
if spec.expErr {
require.Error(t, gotErr)
if spec.expErr != nil {
require.ErrorIs(t, spec.expErr, gotErr)
return
}
require.NoError(t, gotErr)
Expand Down
21 changes: 17 additions & 4 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package proxy

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -58,10 +60,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("Waiting for IPs", id)
host, err := h.Endpoints.AwaitHostAddress(r.Context(), deploy, "http")
if err != nil {
log.Printf("timeout finding the host address %v", err)
w.WriteHeader(http.StatusRequestTimeout)
w.Write([]byte(fmt.Sprintf("Request timed out for model: %v", modelName)))
return
log.Printf("error while finding the host address %v", err)
switch {
case errors.Is(err, context.Canceled):
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("Request cancelled"))
return
case errors.Is(err, context.DeadlineExceeded):
w.WriteHeader(http.StatusGatewayTimeout)
_, _ = w.Write([]byte(fmt.Sprintf("Request timed out for model: %v", modelName)))
return
default:
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("Internal server error"))
return
}
}
log.Printf("Got host: %v, id: %v\n", host, id)

Expand Down

0 comments on commit c5374c4

Please sign in to comment.