Skip to content

Commit

Permalink
✅ Add rollout restart agent e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <kyukawa315@gmail.com>
  • Loading branch information
vankichi committed Jan 10, 2025
1 parent d1e7f4c commit 4ae3d97
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Makefile.d/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ e2e/index/job/correction:
e2e/readreplica:
$(call run-e2e-crud-test,-run TestE2EReadReplica)

.PHONY: e2e/rollout/restart/agent
## run rollout-restart agent e2e
e2e/rollout/restart/agent:
	$(call run-e2e-crud-test,-run TestE2EAgentRolloutRestart)

.PHONY: e2e/maxdim
## run e2e/maxdim
e2e/maxdim:
Expand Down
11 changes: 11 additions & 0 deletions hoge.txt

Large diffs are not rendered by default.

110 changes: 110 additions & 0 deletions tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -987,3 +988,112 @@ func TestE2EReadReplica(t *testing.T) {
t.Fatalf("an error occurred: %s", err)
}
}

// TestE2EAgentRolloutRestart verifies that search requests keep succeeding
// while the vald-agent StatefulSet is rollout-restarted, and that the
// StatefulSet returns to a fully ready state afterwards.
func TestE2EAgentRolloutRestart(t *testing.T) {
	t.Cleanup(teardown)

	if kubeClient == nil {
		var err error
		kubeClient, err = client.New(kubeConfig)
		if err != nil {
			t.Skipf("TestE2EAgentRolloutRestart needs kubernetes client but failed to create one: %s", err)
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	op, err := operation.New(host, port)
	if err != nil {
		t.Fatalf("an error occurred: %s", err)
	}

	err = op.Insert(t, ctx, operation.Dataset{
		Train: ds.Train[insertFrom : insertFrom+insertNum],
	})
	if err != nil {
		t.Fatalf("an error occurred: %s", err)
	}

	sleep(t, waitAfterInsertDuration)

	// Dispatch a background search loop that keeps querying while the agents
	// restart, to exercise availability during the rollout.
	searchFunc := func() {
		_ = op.Search(t, ctx, operation.Dataset{
			Test:      ds.Test[searchFrom : searchFrom+searchNum],
			Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum],
		})
	}

	var wg sync.WaitGroup
	done := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Check for shutdown before each search so the goroutine stops
			// promptly once done is closed or the test context is canceled.
			select {
			case <-done:
				return
			case <-ctx.Done():
				return
			default:
			}
			searchFunc()
			time.Sleep(time.Second)
		}
	}()

	if err := kubectl.RolloutRestart(ctx, t, "statefulset", "vald-agent"); err != nil {
		t.Fatalf("an error occurred: %s", err)
	}

	// Wait for the StatefulSet to be ready again. This is done synchronously:
	// t.Fatalf must not be called from a spawned goroutine, and there is
	// nothing to overlap the wait with anyway.
	t.Log("waiting for agent pods ready...")
	ok, err := kubeClient.WaitForStatefulSetReady(ctx, namespace, "vald-agent", 10*time.Minute)
	if err != nil {
		t.Fatalf("an error occurred: %s", err)
	}
	if !ok {
		t.Fatal("statefulset vald-agent did not become ready")
	}

	// NOTE: cnt must not be dereferenced on the error path — it may be nil.
	if _, err := op.IndexInfo(t, ctx); err != nil {
		t.Fatalf("an error occurred: %s", err)
	}

	// err = op.Exists(t, ctx, "0")
	// if err != nil {
	// 	t.Fatalf("an error occurred: %s", err)
	// }
	//
	// err = op.GetObject(t, ctx, operation.Dataset{
	// 	Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum],
	// })
	// if err != nil {
	// 	t.Fatalf("an error occurred: %s", err)
	// }
	//
	// err = op.Remove(t, ctx, operation.Dataset{
	// 	Train: ds.Train[removeFrom : removeFrom+removeNum],
	// })
	// if err != nil {
	// 	t.Fatalf("an error occurred: %s", err)
	// }
	//
	// // Remove all vector data after the current - 1 hour.
	// err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano())
	// if err != nil {
	// 	t.Fatalf("an error occurred: %s", err)
	// }

	// Keep the background search load running briefly after the rollout
	// completes, then stop the goroutine and wait for it to exit.
	time.Sleep(5 * time.Second)
	close(done)
	wg.Wait()
}
30 changes: 30 additions & 0 deletions tests/e2e/kubernetes/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type Client interface {
name, namespace string,
cronJob *v1.CronJob,
) error
WaitForStatefulSetReady(
ctx context.Context,
namespace, name string,
timeout time.Duration,
) (ok bool, err error)
}

type client struct {
Expand Down Expand Up @@ -201,3 +206,28 @@ func (cli *client) CreateJobFromCronJob(
_, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
return err
}

// WaitForStatefulSetReady polls the named StatefulSet once per second until
// its rolling update has completed and every desired replica is ready, or
// until the timeout elapses. It returns (true, nil) on success, (false, err)
// on an API error, and (false, ctx.Err()) on timeout or cancellation.
func (cli *client) WaitForStatefulSetReady(
	ctx context.Context, namespace, name string, timeout time.Duration,
) (ok bool, err error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		ss, err := cli.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		// Desired replica count comes from the spec; Status.Replicas reflects
		// the current (possibly pre-restart) state and must not be used as
		// the target, otherwise a check issued right after a rollout restart
		// can pass spuriously against the old status.
		desired := ss.Status.Replicas
		if ss.Spec.Replicas != nil {
			desired = *ss.Spec.Replicas
		}
		// Require that the controller has observed the latest generation and
		// that the rolling update has finished (current revision caught up to
		// the update revision) before trusting the ready/updated counters.
		if ss.Status.ObservedGeneration >= ss.Generation &&
			ss.Status.CurrentRevision == ss.Status.UpdateRevision &&
			ss.Status.UpdatedReplicas == desired &&
			ss.Status.ReadyReplicas == desired {
			return true, nil
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-tick.C:
		}
	}
}
10 changes: 10 additions & 0 deletions tests/e2e/kubernetes/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ func RolloutResource(ctx context.Context, t *testing.T, resource string) error {
return runCmd(t, cmd)
}

// RolloutRestart restarts the given resource via
// `kubectl rollout restart <resource> <name>` (e.g. "statefulset" "vald-agent").
// It returns the error from running the command, if any.
func RolloutRestart(ctx context.Context, t *testing.T, resource string, name string) error {
	t.Helper()

	// Run the command exactly once: an exec.Cmd cannot be started twice, so
	// a second runCmd on the same Cmd would always fail with
	// "exec: already started".
	cmd := exec.CommandContext(ctx, "kubectl", "rollout", "restart", resource, name)
	return runCmd(t, cmd)
}

// WaitResources waits for multiple resources to be ready.
func WaitResources(
ctx context.Context, t *testing.T, resource, labelSelector, condition, timeout string,
Expand Down

0 comments on commit 4ae3d97

Please sign in to comment.