Skip to content

Commit

Permalink
Demo retry middleware with reverse proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Jan 22, 2024
1 parent a5a39ab commit 723e417
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 16 deletions.
3 changes: 1 addition & 2 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ func run() error {
go autoscaler.Start()

proxy.MustRegister(metricsRegistry)
var proxyHandler http.Handler = proxy.NewHandler(deploymentManager, endpointManager, queueManager)
proxyHandler = proxy.NewRetryMiddleware(maxRetriesOnErr, proxyHandler)
var proxyHandler http.Handler = proxy.NewHandler(deploymentManager, endpointManager, queueManager, maxRetriesOnErr)
proxyServer := &http.Server{Addr: ":8080", Handler: proxyHandler}

statsHandler := &stats.Handler{
Expand Down
19 changes: 13 additions & 6 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ type deploymentSource interface {
// Handler serves http requests for end-clients.
// It is also responsible for triggering scale-from-zero.
type Handler struct {
Deployments deploymentSource
Endpoints *endpoints.Manager
Queues *queue.Manager
Deployments deploymentSource
Endpoints *endpoints.Manager
Queues *queue.Manager
retriesOnErr int
}

func NewHandler(deployments deploymentSource, endpoints *endpoints.Manager, queues *queue.Manager) *Handler {
return &Handler{Deployments: deployments, Endpoints: endpoints, Queues: queues}
func NewHandler(deployments deploymentSource, endpoints *endpoints.Manager, queues *queue.Manager, retriesOnErr int) *Handler {
return &Handler{
Deployments: deployments,
Endpoints: endpoints,
Queues: queues,
retriesOnErr: retriesOnErr,
}
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -120,7 +126,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("Proxying request to host %v: %v\n", host, id)
// TODO: Avoid creating new reverse proxies for each request.
// TODO: Consider implementing a round robin scheme.
newReverseProxy(host).ServeHTTP(w, proxyRequest)
proxy := newReverseProxy(host)
NewRetryMiddleware(h.retriesOnErr, proxy).ServeHTTP(w, proxyRequest)
}

// parseModel parses the model name from the request
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestProxy(t *testing.T) {
em, err := endpoints.NewManager(&fakeManager{}, func(deploymentName string, replicas int) {})
require.NoError(t, err)
em.SetEndpoints("my-deployment", map[string]struct{}{"my-ip": {}}, map[string]int32{"my-port": 8080})
h := NewHandler(deplMgr, em, queue.NewManager(10))
h := NewHandler(deplMgr, em, queue.NewManager(10), 1)

svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
em.SetEndpoints("my-deployment", map[string]struct{}{"my-other-ip": {}}, map[string]int32{"my-other-port": 8080})
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestMetrics(t *testing.T) {

deplMgr, err := deployments.NewManager(&fakeManager{})
require.NoError(t, err)
h := NewHandler(deplMgr, nil, nil)
h := NewHandler(deplMgr, nil, nil, 2)
recorder := httptest.NewRecorder()

// when
Expand Down
8 changes: 2 additions & 6 deletions tests/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,8 @@ func TestMain(m *testing.M) {
autoscaler.Endpoints = endpointManager
go autoscaler.Start()

handler := &proxy.Handler{
Deployments: deploymentManager,
Endpoints: endpointManager,
Queues: queueManager,
}
testServer = httptest.NewServer(proxy.NewRetryMiddleware(3, handler))
handler := proxy.NewHandler(deploymentManager, endpointManager, queueManager, 3)
testServer = httptest.NewServer(handler)
defer testServer.Close()

go func() {
Expand Down

0 comments on commit 723e417

Please sign in to comment.