Skip to content

Commit

Permalink
extproc: adds ability to insert custom router (#104)
Browse files Browse the repository at this point in the history
Signed-off-by: Takeshi Yoneda <t.y.mathetake@gmail.com>
  • Loading branch information
mathetake authored Jan 16, 2025
1 parent 08c31fd commit 3a490bf
Show file tree
Hide file tree
Showing 15 changed files with 351 additions and 120 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ test-cel: envtest apigen
# This requires the extproc binary to be built as well as Envoy binary to be available in the PATH.
.PHONY: test-extproc # This requires the extproc binary to be built.
test-extproc: build.extproc
@$(MAKE) build.extproc_custom_router CMD_PATH_PREFIX=examples
@$(MAKE) build.testupstream CMD_PATH_PREFIX=tests
@echo "Run ExtProc test"
@go test ./tests/extproc/... -tags test_extproc -v -count=1
Expand Down Expand Up @@ -140,6 +141,7 @@ test-e2e: kind
# Example:
# - `make build.controller`: will build the cmd/controller directory.
# - `make build.extproc`: will build the cmd/extproc directory.
# - `make build.extproc_custom_router CMD_PATH_PREFIX=examples`: will build the examples/extproc_custom_router directory.
# - `make build.testupstream CMD_PATH_PREFIX=tests`: will build the tests/testupstream directory.
#
# By default, this will build for the current GOOS and GOARCH.
Expand Down
77 changes: 2 additions & 75 deletions cmd/extproc/main.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,5 @@
package main

import (
"context"
"flag"
"log"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"
import "github.com/envoyproxy/ai-gateway/cmd/extproc/mainlib"

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/envoyproxy/ai-gateway/internal/extproc"
"github.com/envoyproxy/ai-gateway/internal/version"
)

var (
configPath = flag.String("configPath", "", "path to the configuration file. "+
"The file must be in YAML format specified in extprocconfig.Config type. The configuration file is watched for changes.")
// TODO: unix domain socket support.
extProcPort = flag.String("extProcPort", ":1063", "gRPC port for the external processor")
logLevel = flag.String("logLevel", "info", "log level")
)

func main() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))

l.Info("starting external processor", slog.String("version", version.Version))

if *configPath == "" {
log.Fatal("configPath must be provided")
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: unix domain socket support.
lis, err := net.Listen("tcp", *extProcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

server, err := extproc.NewServer[*extproc.Processor](l, extproc.NewProcessor)
if err != nil {
log.Fatalf("failed to create external processor server: %v", err)
}

if err := extproc.StartConfigWatcher(ctx, *configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
}

s := grpc.NewServer()
extprocv3.RegisterExternalProcessorServer(s, server)
grpc_health_v1.RegisterHealthServer(s, server)
go func() {
<-ctx.Done()
s.GracefulStop()
}()
_ = s.Serve(lis)
}
func main() { mainlib.Main() }
80 changes: 80 additions & 0 deletions cmd/extproc/mainlib/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mainlib

import (
"context"
"flag"
"log"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/envoyproxy/ai-gateway/internal/extproc"
"github.com/envoyproxy/ai-gateway/internal/version"
)

var (
configPath = flag.String("configPath", "", "path to the configuration file. "+
"The file must be in YAML format specified in extprocconfig.Config type. The configuration file is watched for changes.")
// TODO: unix domain socket support.
extProcPort = flag.String("extProcPort", ":1063", "gRPC port for the external processor")
logLevel = flag.String("logLevel", "info", "log level")
)

// Main is a main function for the external processor exposed
// for allowing users to build their own external processor.
func Main() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))

l.Info("starting external processor", slog.String("version", version.Version))

if *configPath == "" {
log.Fatal("configPath must be provided")
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: unix domain socket support.
lis, err := net.Listen("tcp", *extProcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

server, err := extproc.NewServer[*extproc.Processor](l, extproc.NewProcessor)
if err != nil {
log.Fatalf("failed to create external processor server: %v", err)
}

if err := extproc.StartConfigWatcher(ctx, *configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
}

s := grpc.NewServer()
extprocv3.RegisterExternalProcessorServer(s, server)
grpc_health_v1.RegisterHealthServer(s, server)
go func() {
<-ctx.Done()
s.GracefulStop()
}()
_ = s.Serve(lis)
}
1 change: 1 addition & 0 deletions examples/extproc_custom_router/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This example shows how to insert a custom router in the custom external process using `filterconfig` package.
41 changes: 41 additions & 0 deletions examples/extproc_custom_router/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"fmt"

"github.com/envoyproxy/ai-gateway/cmd/extproc/mainlib"
"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// newCustomRouter implements [extprocapi.NewCustomRouter].
func newCustomRouter(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router {
// You can poke the current configuration of the routes, and the list of backends
// specified in the AIGatewayRoute.Rules, etc.
return &myCustomRouter{config: config, defaultRouter: defaultRouter}
}

// myCustomRouter implements [extprocapi.Router].
type myCustomRouter struct {
config *filterconfig.Config
defaultRouter extprocapi.Router
}

// Calculate implements [extprocapi.Router.Calculate].
func (m *myCustomRouter) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) {
// Simply logs the headers and delegates the calculation to the default router.
modelName, ok := headers[m.config.ModelNameHeaderKey]
if !ok {
panic("model name not found in the headers")
}
fmt.Printf("model name: %s\n", modelName)
return m.defaultRouter.Calculate(headers)
}

// This demonstrates how to build a custom router for the external processor.
func main() {
// Initializes the custom router.
extprocapi.NewCustomRouter = newCustomRouter
// Executes the main function of the external processor.
mainlib.Main()
}
29 changes: 29 additions & 0 deletions extprocapi/exptorcapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Package extprocapi is for building a custom external process.
package extprocapi

import "github.com/envoyproxy/ai-gateway/filterconfig"

// NewCustomRouter is the function to create a custom router over the default router.
// This is nil by default and can be set by the custom build of external processor.
var NewCustomRouter NewCustomRouterFn

// NewCustomRouterFn is the function signature for [NewCustomRouter].
//
// It accepts the exptproc config passed to the AI Gateway filter and returns a [Router].
// This is called when the new configuration is loaded.
//
// The defaultRouter can be used to delegate the calculation to the default router implementation.
type NewCustomRouterFn func(defaultRouter Router, config *filterconfig.Config) Router

// Router is the interface for the router.
//
// Router must be goroutine-safe as it is shared across multiple requests.
type Router interface {
// Calculate determines the backend to route to based on the request headers.
//
// The request headers include the populated [filterconfig.Config.ModelNameHeaderKey]
// with the parsed model name based on the [filterconfig.Config] given to the NewCustomRouterFn.
//
// Returns the backend.
Calculate(requestHeaders map[string]string) (backend *filterconfig.Backend, err error)
}
3 changes: 2 additions & 1 deletion internal/extproc/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
"github.com/envoyproxy/ai-gateway/internal/extproc/translator"
Expand All @@ -19,7 +20,7 @@ import (
var (
_ ProcessorIface = &mockProcessor{}
_ translator.Translator = &mockTranslator{}
_ router.Router = &mockRouter{}
_ extprocapi.Router = &mockRouter{}
)

func newMockProcessor(_ *processorConfig) *mockProcessor {
Expand Down
3 changes: 2 additions & 1 deletion internal/extproc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/protobuf/types/known/structpb"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/backendauth"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
Expand All @@ -22,7 +23,7 @@ import (
// This will be created by the server and passed to the processor when it detects a new configuration.
type processorConfig struct {
bodyParser router.RequestBodyParser
router router.Router
router extprocapi.Router
ModelNameHeaderKey, selectedBackendHeaderKey string
factories map[filterconfig.VersionedAPISchema]translator.Factory
backendAuthHandlers map[string]backendauth.Handler
Expand Down
23 changes: 11 additions & 12 deletions internal/extproc/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@ import (

"golang.org/x/exp/rand"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// Router is the interface for the router.
type Router interface {
// Calculate determines the backend to route to based on the headers.
// Returns the backend name and the output schema.
Calculate(headers map[string]string) (backend *filterconfig.Backend, err error)
}

// router implements [Router].
// router implements [extprocapi.Router].
type router struct {
rules []filterconfig.RouteRule
rng *rand.Rand
}

// NewRouter creates a new [Router] implementation for the given config.
func NewRouter(config *filterconfig.Config) (Router, error) {
return &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))}, nil
// NewRouter creates a new [extprocapi.Router] implementation for the given config.
func NewRouter(config *filterconfig.Config, newCustomFn extprocapi.NewCustomRouterFn) (extprocapi.Router, error) {
r := &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))}
if newCustomFn != nil {
customRouter := newCustomFn(r, config)
return customRouter, nil
}
return r, nil
}

// Calculate implements [Router.Calculate].
// Calculate implements [extprocapi.Router.Calculate].
func (r *router) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) {
var rule *filterconfig.RouteRule
for i := range r.rules {
Expand Down
29 changes: 27 additions & 2 deletions internal/extproc/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,34 @@ import (

"github.com/stretchr/testify/require"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// dummyCustomRouter implements [extprocapi.Router].
type dummyCustomRouter struct{ called bool }

func (c *dummyCustomRouter) Calculate(map[string]string) (*filterconfig.Backend, error) {
c.called = true
return nil, nil
}

func TestRouter_NewRouter_Custom(t *testing.T) {
r, err := NewRouter(&filterconfig.Config{}, func(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router {
require.NotNil(t, defaultRouter)
_, ok := defaultRouter.(*router)
require.True(t, ok) // Checking if the default router is correctly passed.
return &dummyCustomRouter{}
})
require.NoError(t, err)
_, ok := r.(*dummyCustomRouter)
require.True(t, ok)

_, err = r.Calculate(nil)
require.NoError(t, err)
require.True(t, r.(*dummyCustomRouter).called)
}

func TestRouter_Calculate(t *testing.T) {
outSchema := filterconfig.VersionedAPISchema{Schema: filterconfig.APISchemaOpenAI}
_r, err := NewRouter(&filterconfig.Config{
Expand All @@ -30,7 +55,7 @@ func TestRouter_Calculate(t *testing.T) {
},
},
},
})
}, nil)
require.NoError(t, err)
r, ok := _r.(*router)
require.True(t, ok)
Expand Down Expand Up @@ -62,7 +87,7 @@ func TestRouter_Calculate(t *testing.T) {
}

func TestRouter_selectBackendFromRule(t *testing.T) {
_r, err := NewRouter(&filterconfig.Config{})
_r, err := NewRouter(&filterconfig.Config{}, nil)
require.NoError(t, err)
r, ok := _r.(*router)
require.True(t, ok)
Expand Down
3 changes: 2 additions & 1 deletion internal/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/backendauth"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
Expand All @@ -37,7 +38,7 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error {
if err != nil {
return fmt.Errorf("cannot create request body parser: %w", err)
}
rt, err := router.NewRouter(config)
rt, err := router.NewRouter(config, extprocapi.NewCustomRouter)
if err != nil {
return fmt.Errorf("cannot create router: %w", err)
}
Expand Down
Loading

0 comments on commit 3a490bf

Please sign in to comment.