Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nstogner committed Dec 7, 2024
1 parent 0a6470b commit 4f42cde
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
7 changes: 5 additions & 2 deletions internal/loadbalancer/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
func newEndpointGroup() *group {
g := &group{
endpoints: make(map[string]endpoint),
totalInFlight: &atomic.Int64{},
chwblReplication: 100,
chwblHashes: map[uint64]string{},
chwblSortedHashes: []uint64{},
bcast: make(chan struct{}),
}
return g
}
Expand Down Expand Up @@ -103,8 +105,8 @@ func (g *group) getAllAddrs() []string {
defer g.mtx.RUnlock()

var hosts []string
for ip := range g.endpoints {
hosts = append(hosts, ip)
for _, ep := range g.endpoints {
hosts = append(hosts, ep.address)
}

return hosts
Expand All @@ -115,6 +117,7 @@ func (g *group) reconcileEndpoints(observed map[string]endpoint) {
for name, observedEp := range observed {
if currentEp, ok := g.endpoints[name]; ok {
currentEp.adapters = observedEp.adapters
g.endpoints[name] = currentEp
} else {
g.endpoints[name] = endpoint{
inFlight: &atomic.Int64{},
Expand Down
31 changes: 23 additions & 8 deletions internal/loadbalancer/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,46 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "github.com/substratusai/kubeai/api/v1"

"k8s.io/apimachinery/pkg/util/rand"
)

func TestConcurrentAccess(t *testing.T) {
const myModel = "myModel"
const (
myModel = "myModel"
myAddr = "10.0.0.1:8000"
)

testCases := map[string]struct {
readerCount int
writerCount int
}{
"lot of reader": {readerCount: 1_000, writerCount: 1},
"lot of writer": {readerCount: 1, writerCount: 1_000},
"lot of both": {readerCount: 1_000, writerCount: 1_000},
"one reader_one_writer": {readerCount: 1, writerCount: 1},
"lot of reader": {readerCount: 1_000, writerCount: 1},
"lot of writer": {readerCount: 1, writerCount: 1_000},
"lot of both": {readerCount: 1_000, writerCount: 1_000},
}
for name, spec := range testCases {
randomReadFn := []func(g *group){
func(g *group) { g.getBestAddr(context.Background(), AddressRequest{}, false) },
func(g *group) {
ip, f, err := g.getBestAddr(context.Background(), AddressRequest{
Model: myModel,
LoadBalancing: v1.LoadBalancing{
Strategy: v1.LeastLoadStrategy,
},
}, false)
require.NoError(t, err)
defer f()
assert.Equal(t, myAddr, ip)
},
func(g *group) { g.getAllAddrs() },
}
t.Run(name, func(t *testing.T) {
// setup endpoint with one service so that requests are not waiting
// setup endpoint with one endpoint so that requests are not waiting
group := newEndpointGroup()
group.reconcileEndpoints(
map[string]endpoint{myModel: {}},
map[string]endpoint{myModel: {address: myAddr}},
)

var startWg, doneWg sync.WaitGroup
Expand All @@ -52,7 +67,7 @@ func TestConcurrentAccess(t *testing.T) {
startTogether(spec.readerCount, func() { randomReadFn[rand.Intn(len(randomReadFn)-1)](group) })
startTogether(spec.writerCount, func() {
group.reconcileEndpoints(
map[string]endpoint{rand.String(1): {}},
map[string]endpoint{myModel: {address: myAddr}},
)
})
doneWg.Wait()
Expand Down
6 changes: 4 additions & 2 deletions internal/loadbalancer/load_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ func TestAwaitBestHost(t *testing.T) {
myAddrWithAdapter = "10.0.0.2:8000"
)

manager := &LoadBalancer{endpoints: make(map[string]*group, 1)}

testCases := map[string]struct {
model string
adapter string
Expand Down Expand Up @@ -63,6 +61,10 @@ func TestAwaitBestHost(t *testing.T) {

for name, spec := range testCases {
t.Run(name, func(t *testing.T) {
manager := &LoadBalancer{
endpoints: make(map[string]*group, 1),
}

manager.getEndpoints(myModel).reconcileEndpoints(spec.endpoints)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down
14 changes: 7 additions & 7 deletions internal/modelproxy/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ type proxyRequest struct {
}

func newProxyRequest(r *http.Request) (*proxyRequest, error) {
pr := &proxyRequest{
http: r,
status: http.StatusOK,
}

apiReq, err := apiutils.ParseRequest(r.Body, r.Header)
if err != nil {
return nil, err
return pr, err
}
// The content length might have changed after the body was read and rewritten.
r.ContentLength = apiReq.ContentLength

pr := &proxyRequest{
Request: apiReq,
http: r,
status: http.StatusOK,
}
pr.Request = apiReq

return pr, nil
}
Expand Down

0 comments on commit 4f42cde

Please sign in to comment.