Skip to content

Commit

Permalink
Add queue cancellation on request cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
nstogner committed Nov 11, 2023
1 parent b336066 commit 2d42c28
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 149 deletions.
13 changes: 6 additions & 7 deletions autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math"
"sync"
"time"

"github.com/substratusai/lingo/pkg/queuemanager"
)

func NewAutoscaler() *Autoscaler {
Expand All @@ -19,7 +21,9 @@ type Autoscaler struct {
AverageCount int

Scaler *DeploymentManager
FIFO *FIFOQueueManager
FIFO *queuemanager.FIFOQueueManager

ConcurrencyPerReplica int

movingAvgQueueSizeMtx sync.Mutex
movingAvgQueueSize map[string]*movingAvg
Expand All @@ -32,12 +36,7 @@ func (a *Autoscaler) Start() {
avg := a.getMovingAvgQueueSize(deploymentName)
avg.Next(float64(waitCount))
flt := avg.Calculate()
// TODO fix this to use configurable concurrency setting that's supplied
// by the user.
// Note this uses the default queue size, not the current queue size.
// the current queue size increases and decreases based on replica count
concurrencyPerReplica := a.FIFO.size
normalized := flt / float64(concurrencyPerReplica)
normalized := flt / float64(a.ConcurrencyPerReplica)
ceil := math.Ceil(normalized)
log.Printf("Average for deployment: %s: %v (ceil: %v), current wait count: %v", deploymentName, flt, ceil, waitCount)
a.Scaler.SetDesiredScale(deploymentName, int32(ceil))
Expand Down
65 changes: 0 additions & 65 deletions fifo_queue.go

This file was deleted.

63 changes: 0 additions & 63 deletions fifo_queue_manager.go

This file was deleted.

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module golang-proxy
module github.com/substratusai/lingo

go 1.21.0

Expand Down Expand Up @@ -29,7 +29,7 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down
10 changes: 7 additions & 3 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ import (
"net/http"
"net/http/httputil"
"net/url"

"github.com/google/uuid"
"github.com/substratusai/lingo/pkg/queuemanager"
)

// Handler serves http requests.
// It is also responsible for triggering scale-from-zero.
type Handler struct {
Deployments *DeploymentManager
Endpoints *EndpointsManager
FIFO *FIFOQueueManager
FIFO *queuemanager.FIFOQueueManager
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
id := uuid.New().String()
log.Printf("request: %v", r.URL)

w.Header().Set("X-Proxy", "lingo")
Expand All @@ -45,9 +49,9 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Deployments.AtLeastOne(deploy)

log.Println("Entering queue")
unqueue := h.FIFO.Enqueue(deploy)
complete := h.FIFO.EnqueueAndWait(r.Context(), deploy, id)
log.Println("Admitted into queue")
defer unqueue()
defer complete()

log.Println("Waiting for IPs")
host := h.Endpoints.GetHost(r.Context(), deploy)
Expand Down
8 changes: 5 additions & 3 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,17 @@ func TestIntegration(t *testing.T) {
completeRequests(backendComplete, 1)

// Ensure the deployment scaled scaled past 1.
sendRequests(t, &wg, modelName, 2)
// 1/3 should be admitted
// 2/3 should remain in queue --> replicas should equal 2
sendRequests(t, &wg, modelName, 3)
requireDeploymentReplicas(t, deploy, 2)

// Make sure deployment will not be scaled past default max (3).
sendRequests(t, &wg, modelName, 2)
requireDeploymentReplicas(t, deploy, 3)

// Have the mock backend respond to the remaining 4 requests.
completeRequests(backendComplete, 4)
completeRequests(backendComplete, 5)

// Ensure scale-down.
requireDeploymentReplicas(t, deploy, 0)
Expand All @@ -82,7 +84,7 @@ func requireDeploymentReplicas(t *testing.T, deploy *appsv1.Deployment, n int32)
err := testK8sClient.Get(testCtx, types.NamespacedName{Namespace: deploy.Namespace, Name: deploy.Name}, deploy)
assert.NoError(t, err, "getting the deployment")
assert.NotNil(t, deploy.Spec.Replicas, "scale-up should have occurred")
assert.Equal(t, *deploy.Spec.Replicas, n, "scale-up should have occurred")
assert.Equal(t, n, *deploy.Spec.Replicas, "scale-up should have occurred")
}, 3*time.Second, time.Second/2, "waiting for the deployment to be scaled up")
}

Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/substratusai/lingo/pkg/queuemanager"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -44,15 +45,13 @@ func run() error {
var enableLeaderElection bool
var probeAddr string
var concurrencyPerReplica int
var maxQueueSize int

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8082", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&concurrencyPerReplica, "concurrency", 100, "the number of simultaneous requests that can be processed by each replica")
flag.IntVar(&maxQueueSize, "max-queue-size", 60000, "the maximum size of the queue that holds requests")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -80,13 +79,13 @@ func run() error {
return fmt.Errorf("starting manager: %w", err)
}

fifo := NewFIFOQueueManager(concurrencyPerReplica, maxQueueSize)
fifo := queuemanager.NewFIFOQueueManager(concurrencyPerReplica)

endpoints, err := NewEndpointsManager(mgr)
if err != nil {
return fmt.Errorf("setting up endpoint manager: %w", err)
}
endpoints.EndpointSizeCallback = fifo.UpdateQueueSize
endpoints.EndpointSizeCallback = fifo.UpdateQueueSizeForReplicas

scaler, err := NewDeploymentManager(mgr)
if err != nil {
Expand All @@ -99,6 +98,7 @@ func run() error {
autoscaler.Interval = 3 * time.Second
autoscaler.AverageCount = 10 // 10 * 3 seconds = 30 sec avg
autoscaler.Scaler = scaler
autoscaler.ConcurrencyPerReplica = concurrencyPerReplica
autoscaler.FIFO = fifo
go autoscaler.Start()

Expand Down
7 changes: 5 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/substratusai/lingo/pkg/queuemanager"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -63,11 +64,12 @@ func TestMain(m *testing.M) {
})
requireNoError(err)

fifo := NewFIFOQueueManager(1, 1000)
const concurrencyPerReplica = 1
fifo := queuemanager.NewFIFOQueueManager(concurrencyPerReplica)

endpoints, err := NewEndpointsManager(mgr)
requireNoError(err)
endpoints.EndpointSizeCallback = fifo.UpdateQueueSize
endpoints.EndpointSizeCallback = fifo.UpdateQueueSizeForReplicas

scaler, err := NewDeploymentManager(mgr)
requireNoError(err)
Expand All @@ -79,6 +81,7 @@ func TestMain(m *testing.M) {
autoscaler.AverageCount = 1 // 10 * 3 seconds = 30 sec avg
autoscaler.Scaler = scaler
autoscaler.FIFO = fifo
autoscaler.ConcurrencyPerReplica = concurrencyPerReplica
go autoscaler.Start()

handler := &Handler{
Expand Down
Loading

0 comments on commit 2d42c28

Please sign in to comment.