diff --git a/daemon/daemon.go b/daemon/daemon.go index 5f9b7cc0..0d21c579 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -190,7 +190,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r if pod.PodENI { resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{}) } else { - req := &eni.LocalIPRequest{} + req := eni.NewLocalIPRequest() if pod.ERdma { req.LocalIPType = eni.LocalIPTypeERDMA } @@ -208,7 +208,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r if pod.PodENI || n.ipamType == types.IPAMTypeCRD { resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{}) } else { - req := &eni.LocalIPRequest{} + req := eni.NewLocalIPRequest() if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 { old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0] diff --git a/pkg/eni/local.go b/pkg/eni/local.go index 04f3b492..a861be36 100644 --- a/pkg/eni/local.go +++ b/pkg/eni/local.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/go-logr/logr" + "github.com/samber/lo" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/cache" @@ -51,6 +53,14 @@ var rateLimit = rate.Every(1 * time.Minute / 10) var _ ResourceRequest = &LocalIPRequest{} +func NewLocalIPRequest() *LocalIPRequest { + ctx, cancel := context.WithCancel(context.Background()) + return &LocalIPRequest{ + workerCtx: ctx, + cancel: cancel, + } +} + type LocalIPRequest struct { NetworkInterfaceID string LocalIPType string @@ -58,6 +68,9 @@ type LocalIPRequest struct { IPv6 netip.Addr NoCache bool // do not use cached ip + + workerCtx context.Context + cancel context.CancelFunc } func (l *LocalIPRequest) ResourceType() ResourceType { @@ -122,7 +135,9 @@ type Local struct { batchSize int cap int - allocatingV4, allocatingV6 int + allocatingV4, allocatingV6 AllocatingRequests + // danging, used for release + dangingV4, dangingV6 AllocatingRequests eni *daemon.ENI ipAllocInhibitExpireAt time.Time @@ -375,30 +390,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR return nil, nil } - lo, ok := request.(*LocalIPRequest) + localIPRequest, ok := request.(*LocalIPRequest) if !ok { return nil, []Trace{{Condition: ResourceTypeMismatch}} } - if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID { + if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID { return nil, []Trace{{Condition: NetworkInterfaceMismatch}} } - log := logf.FromContext(ctx) - log.Info(fmt.Sprintf("local request %v", lo)) + log := logr.FromContextOrDiscard(ctx) + log.Info(fmt.Sprintf("local request %v", localIPRequest)) expectV4 := 0 expectV6 := 0 if l.enableIPv4 { - if lo.NoCache { - if len(l.ipv4)+l.allocatingV4 >= l.cap { + if localIPRequest.NoCache { + if len(l.ipv4)+l.allocatingV4.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } expectV4 = 1 } else { ipv4 := l.ipv4.PeekAvailable(cni.PodID) - if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap { + if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv4 == nil { expectV4 = 1 @@ -407,14 +422,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR } if l.enableIPv6 { - if lo.NoCache { - if len(l.ipv6)+l.allocatingV6 >= l.cap { + if localIPRequest.NoCache { + if len(l.ipv6)+l.allocatingV6.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } expectV6 = 1 } else { ipv6 := l.ipv6.PeekAvailable(cni.PodID) - if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap { + if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv6 == nil { expectV6 = 1 @@ -427,21 +442,18 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}} } - l.allocatingV4 += expectV4 - l.allocatingV6 += expectV6 + for i := 0; i < expectV4; i++ { + l.allocatingV4 = append(l.allocatingV4, localIPRequest) + } + for i := 0; i < expectV6; i++ { + l.allocatingV6 = append(l.allocatingV6, localIPRequest) + } l.cond.Broadcast() respCh := make(chan *AllocResp) - go l.allocWorker(ctx, cni, lo, respCh, func() { - // current roll back ip at same time - l.allocatingV4 -= expectV4 - l.allocatingV4 = max(l.allocatingV4, 0) - l.allocatingV6 -= expectV6 - l.allocatingV6 = max(l.allocatingV6, 0) - log.Info("rollback ipv4", "ipv4", expectV4) - }) + go l.allocWorker(ctx, cni, localIPRequest, respCh) return respCh, nil } @@ -461,7 +473,7 @@ func (l *Local) Release(ctx context.Context, cni *daemon.CNI, request NetworkRes return false, nil } - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) if res.IP.IPv4.IsValid() { l.ipv4.Release(cni.PodID, res.IP.IPv4) @@ -509,13 +521,23 @@ func (l *Local) Priority() int { } // allocWorker started with each Allocate call -func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) { +func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) { done := make(chan struct{}) defer close(done) l.cond.L.Lock() defer l.cond.L.Unlock() + defer func() { + if request == nil { + return + } + request.cancel() + + l.switchIPv4(request) + l.switchIPv6(request) + }() + go func() { select { case <-ctx.Done(): @@ -526,13 +548,11 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local } }() - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) for { select { case <-ctx.Done(): // parent cancel the context, so close the ch - onErrLocked() - close(respCh) return default: @@ -592,7 +612,7 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local func (l *Local) factoryAllocWorker(ctx context.Context) { l.cond.L.Lock() - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) for { if log.V(4).Enabled() { log.V(4).Info("call allocWorker") @@ -605,7 +625,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { default: } - if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 { + if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 { l.cond.Wait() continue } @@ -629,8 +649,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { if l.eni == nil { // create eni - v4Count := min(l.batchSize, max(l.allocatingV4, 1)) - v6Count := min(l.batchSize, l.allocatingV6) + v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1)) + v6Count := min(l.batchSize, l.allocatingV6.Len()) l.status = statusCreating l.cond.L.Unlock() @@ -669,11 +689,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { l.eni = eni - l.allocatingV4 -= v4Count - l.allocatingV6 -= v6Count - - l.allocatingV4 = max(l.allocatingV4, 0) - l.allocatingV6 = max(l.allocatingV6, 0) + l.popNIPv4Jobs(v4Count) + l.popNIPv6Jobs(v6Count) primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String()) if err == nil { @@ -693,8 +710,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { l.status = statusInUse } else { eniID := l.eni.ID - v4Count := min(l.batchSize, l.allocatingV4) - v6Count := min(l.batchSize, l.allocatingV6) + v4Count := min(l.batchSize, l.allocatingV4.Len()) + v6Count := min(l.batchSize, l.allocatingV6.Len()) if v4Count > 0 { l.cond.L.Unlock() @@ -718,8 +735,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { continue } - l.allocatingV4 -= len(ipv4Set) - l.allocatingV4 = max(l.allocatingV4, 0) + l.popNIPv4Jobs(len(ipv4Set)) l.ipv4.PutValid(ipv4Set...) @@ -751,8 +767,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { continue } - l.allocatingV6 -= len(ipv6Set) - l.allocatingV6 = max(l.allocatingV6, 0) + l.popNIPv6Jobs(len(ipv6Set)) l.ipv6.PutValid(ipv6Set...) @@ -841,7 +856,7 @@ func (l *Local) Dispose(n int) int { func (l *Local) factoryDisposeWorker(ctx context.Context) { l.cond.L.Lock() - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) for { select { case <-ctx.Done(): @@ -1062,3 +1077,68 @@ func parseResourceID(id string) (string, string, error) { } return parts[0], parts[1], nil } + +func (l *Local) switchIPv4(req *LocalIPRequest) { + found := false + l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool { + if item != req { + // true to keep + return true + } + found = true + return false + }) + if !found { + return + } + + if l.dangingV4.Len() == 0 { + // this may not happen + // call the Len() to make sure canceled job will be removed + return + } + l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0]) + l.dangingV4 = l.dangingV4[1:] +} + +func (l *Local) switchIPv6(req *LocalIPRequest) { + found := false + l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool { + if item != req { + // true to keep + return true + } + found = true + return false + }) + if !found { + return + } + + if l.dangingV6.Len() == 0 { + // this may not happen + return + } + l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0]) + l.dangingV6 = l.dangingV6[1:] +} + +func (l *Local) popNIPv4Jobs(count int) { + firstPart, secondPart := Split(l.allocatingV4, count) + l.dangingV4 = append(l.dangingV4, firstPart...) + l.allocatingV4 = secondPart +} + +func (l *Local) popNIPv6Jobs(count int) { + firstPart, secondPart := Split(l.allocatingV6, count) + l.dangingV6 = append(l.dangingV6, firstPart...) + l.allocatingV6 = secondPart +} + +func Split[T any](arr []T, index int) ([]T, []T) { + if index < 0 || index > len(arr) { + return arr, nil + } + + return arr[:index], arr[index:] +} diff --git a/pkg/eni/local_test.go b/pkg/eni/local_test.go index 6426ddf4..800de640 100644 --- a/pkg/eni/local_test.go +++ b/pkg/eni/local_test.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/AliyunContainerService/terway/pkg/factory" + factorymocks "github.com/AliyunContainerService/terway/pkg/factory/mocks" "github.com/AliyunContainerService/terway/types" "github.com/AliyunContainerService/terway/types/daemon" ) @@ -124,7 +125,7 @@ func TestLocal_AllocWorker_EnableIPv4(t *testing.T) { cni := &daemon.CNI{PodID: "pod-1"} respCh := make(chan *AllocResp) - go local.allocWorker(context.Background(), cni, nil, respCh, func() {}) + go local.allocWorker(context.Background(), cni, nil, respCh) go func() { local.cond.L.Lock() @@ -149,7 +150,7 @@ func TestLocal_AllocWorker_EnableIPv6(t *testing.T) { cni := &daemon.CNI{PodID: "pod-1"} respCh := make(chan *AllocResp) - go local.allocWorker(context.Background(), cni, nil, respCh, func() {}) + go local.allocWorker(context.Background(), cni, nil, respCh) go func() { local.cond.L.Lock() @@ -175,7 +176,7 @@ func TestLocal_AllocWorker_ParentCancelContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) respCh := make(chan *AllocResp) - go local.allocWorker(ctx, cni, nil, respCh, func() {}) + go local.allocWorker(ctx, cni, nil, respCh) cancel() @@ -214,7 +215,8 @@ func TestLocal_DisposeWholeENI(t *testing.T) { func TestLocal_Allocate_NoCache(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "") - request := &LocalIPRequest{NoCache: true} + request := NewLocalIPRequest() + request.NoCache = true cni := &daemon.CNI{PodID: "pod-1"} local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) @@ -225,6 +227,22 @@ func TestLocal_Allocate_NoCache(t *testing.T) { assert.Equal(t, 1, len(resp)) } +func TestLocal_Allocate_NoCache_AllocSuccess(t *testing.T) { + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{ + MaxIPPerENI: 10, EnableIPv4: true, EnableIPv6: true}, "") + + request := NewLocalIPRequest() + request.NoCache = true + cni := &daemon.CNI{PodID: "pod-1"} + + local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) + local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.2"), false)) + + ch, resp := local.Allocate(context.Background(), cni, request) + assert.NotNil(t, ch) + assert.Equal(t, 0, len(resp)) +} + func TestLocal_DisposeWholeERDMA(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{}, "erdma") local.status = statusInUse @@ -239,7 +257,8 @@ func TestLocal_DisposeWholeERDMA(t *testing.T) { func TestLocal_Allocate_ERDMA(t *testing.T) { localErdma := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "erdma") - request := &LocalIPRequest{NoCache: true} + request := NewLocalIPRequest() + request.NoCache = true cni := &daemon.CNI{PodID: "pod-1"} localErdma.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) @@ -250,7 +269,9 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { assert.Equal(t, 1, len(resp)) assert.Equal(t, ResourceTypeMismatch, resp[0].Condition) - request = &LocalIPRequest{NoCache: true, LocalIPType: LocalIPTypeERDMA} + request = NewLocalIPRequest() + request.NoCache = true + request.LocalIPType = LocalIPTypeERDMA _, resp = localErdma.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) @@ -260,13 +281,16 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.2"), false)) - request = &LocalIPRequest{NoCache: true} + request = NewLocalIPRequest() + request.NoCache = true _, resp = local.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) assert.NotEqual(t, ResourceTypeMismatch, resp[0].Condition) - request = &LocalIPRequest{NoCache: true, LocalIPType: LocalIPTypeERDMA} + request = NewLocalIPRequest() + request.NoCache = true + request.LocalIPType = LocalIPTypeERDMA _, resp = local.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) @@ -276,7 +300,7 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { func TestLocal_Allocate_Inhibit(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "") - request := &LocalIPRequest{} + request := NewLocalIPRequest() cni := &daemon.CNI{PodID: "pod-1"} local.ipAllocInhibitExpireAt = time.Now().Add(time.Minute) @@ -343,3 +367,201 @@ func Test_orphanIP(t *testing.T) { v, _ = invalidIPCache.Get(netip.MustParseAddr("127.0.0.2")) assert.Equal(t, 2, v) } + +func Test_switchIPv4(t *testing.T) { + l := &Local{} + + req := NewLocalIPRequest() + l.allocatingV4 = append(l.allocatingV4, + req, + NewLocalIPRequest(), + ) + + l.dangingV4 = append(l.dangingV4, NewLocalIPRequest()) + + l.switchIPv4(req) + assert.Equal(t, 2, l.allocatingV4.Len()) + assert.Equal(t, 0, l.dangingV4.Len()) +} + +func Test_switchIPv6(t *testing.T) { + l := &Local{} + + req := NewLocalIPRequest() + l.allocatingV6 = append(l.allocatingV6, + req, + NewLocalIPRequest(), + ) + + l.switchIPv6(req) + assert.Equal(t, 1, l.allocatingV6.Len()) + assert.Equal(t, 0, l.dangingV6.Len()) +} + +func TestPopNIPv4JobsMovesCorrectNumberOfJobs(t *testing.T) { + l := &Local{ + allocatingV4: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV4: AllocatingRequests{}, + } + + l.popNIPv4Jobs(2) + + assert.Len(t, l.allocatingV4, 1) + assert.Len(t, l.dangingV4, 2) +} + +func TestPopNIPv6JobsMovesCorrectNumberOfJobs(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(2) + + assert.Len(t, l.allocatingV6, 1) + assert.Len(t, l.dangingV6, 2) +} + +func TestPopNIPv6JobsMovesAllJobsWhenCountExceeds(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(5) + + assert.Len(t, l.allocatingV6, 0) + assert.Len(t, l.dangingV6, 2) +} + +func TestPopNIPv6JobsMovesNoJobsWhenCountIsZero(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(0) + + assert.Len(t, l.allocatingV6, 2) + assert.Len(t, l.dangingV6, 0) +} + +func TestPriorityReturnsNegativeWhenStatusIsDeleting(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusDeleting, + } + + prio := l.Priority() + + assert.Equal(t, -100, prio) +} + +func TestPriorityReturnsZeroWhenStatusIsInit(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusInit, + } + + prio := l.Priority() + + assert.Equal(t, 0, prio) +} + +func TestPriorityReturnsTenWhenStatusIsCreating(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusCreating, + } + + prio := l.Priority() + + assert.Equal(t, 10, prio) +} + +func TestPriorityReturnsFiftyPlusIPv4CountWhenStatusIsInUseAndIPv4Enabled(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusInUse, + enableIPv4: true, + ipv4: Set{netip.MustParseAddr("192.0.2.1"): &IP{}}, + } + + prio := l.Priority() + + assert.Equal(t, 51, prio) +} + +func TestPriorityReturnsFiftyPlusIPv6CountWhenStatusIsInUseAndIPv6Enabled(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusInUse, + enableIPv6: true, + ipv6: Set{netip.MustParseAddr("fd00:46dd:e::1"): &IP{}}, + } + + prio := l.Priority() + + assert.Equal(t, 51, prio) +} + +func TestPriorityReturnsFiftyPlusIPv4AndIPv6CountWhenStatusIsInUseAndBothEnabled(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusInUse, + enableIPv4: true, + enableIPv6: true, + ipv4: Set{netip.MustParseAddr("192.0.2.1"): &IP{}}, + ipv6: Set{netip.MustParseAddr("fd00:46dd:e::1"): &IP{}}, + } + + prio := l.Priority() + + assert.Equal(t, 52, prio) +} + +func TestAllocFromFactory(t *testing.T) { + // 1. test factory worker finish req1, and alloc worker consumed req2 + + f := factorymocks.NewFactory(t) + // even we have two jobs ,we only get one ip + f.On("AssignNIPv4", "eni-1", 2, "").Return([]netip.Addr{netip.MustParseAddr("192.0.2.1")}, nil) + f.On("AssignNIPv6", "eni-1", 2, "").Return([]netip.Addr{netip.MustParseAddr("fd00::1")}, nil) + f.On("AssignNIPv4", "eni-1", 1, "").Return(nil, nil) + f.On("AssignNIPv6", "eni-1", 1, "").Return(nil, nil) + + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{ + EnableIPv4: true, + EnableIPv6: true, + BatchSize: 10, + }, "") + local.status = statusInUse + + req1 := NewLocalIPRequest() + req2 := NewLocalIPRequest() + + local.allocatingV4 = append(local.allocatingV4, req1, req2) + local.allocatingV6 = append(local.allocatingV6, req1, req2) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // expect req1 is moved to danging + go local.factoryAllocWorker(ctx) + + local.cond.Broadcast() + req2Ch := make(chan *AllocResp) + go local.allocWorker(ctx, &daemon.CNI{}, req2, req2Ch) + + <-req2Ch + // worker may not exist + time.Sleep(time.Second) + + assert.Equal(t, 1, len(local.allocatingV4)) + assert.Equal(t, 0, len(local.dangingV4)) + assert.Equal(t, 1, len(local.allocatingV6)) + assert.Equal(t, 0, len(local.dangingV6)) + + // check the job is switched + assert.Equal(t, req1, local.allocatingV4[0]) + assert.Equal(t, req1, local.allocatingV6[0]) +} diff --git a/pkg/eni/manager.go b/pkg/eni/manager.go index c1ace866..71e14642 100644 --- a/pkg/eni/manager.go +++ b/pkg/eni/manager.go @@ -332,9 +332,12 @@ func (m *Manager) syncPool(ctx context.Context) { go func() { defer wg.Done() + req := NewLocalIPRequest() + req.NoCache = true + _, err := m.Allocate(ctx, &daemon.CNI{}, &AllocRequest{ ResourceRequests: []ResourceRequest{ - &LocalIPRequest{NoCache: true}, + req, }, }) if err != nil { diff --git a/pkg/eni/manager_test.go b/pkg/eni/manager_test.go index 22d532a7..b3cded83 100644 --- a/pkg/eni/manager_test.go +++ b/pkg/eni/manager_test.go @@ -86,8 +86,9 @@ func TestManagerAllocateReturnsResourcesWhenSuccessful(t *testing.T) { mockNI := &success{} manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI}, types.EniSelectionPolicyMostIPs, &FakeK8s{}) + request := NewLocalIPRequest() resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{ - ResourceRequests: []ResourceRequest{&LocalIPRequest{}}, + ResourceRequests: []ResourceRequest{request}, }) assert.Nil(t, err) @@ -109,8 +110,9 @@ func TestManagerAllocateSelectionPolicy(t *testing.T) { { manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI, mockNI2}, types.EniSelectionPolicyMostIPs, &FakeK8s{}) + request := NewLocalIPRequest() resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{ - ResourceRequests: []ResourceRequest{&LocalIPRequest{}}, + ResourceRequests: []ResourceRequest{request}, }) assert.Nil(t, err) @@ -121,8 +123,9 @@ func TestManagerAllocateSelectionPolicy(t *testing.T) { { manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI, mockNI2}, types.EniSelectionPolicyLeastIPs, &FakeK8s{}) + request := NewLocalIPRequest() resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{ - ResourceRequests: []ResourceRequest{&LocalIPRequest{}}, + ResourceRequests: []ResourceRequest{request}, }) assert.Nil(t, err) @@ -134,8 +137,9 @@ func TestManagerAllocateSelectionPolicy(t *testing.T) { func TestManagerAllocateReturnsErrorWhenNoBackendCanHandleAllocation(t *testing.T) { manager := NewManager(0, 0, 0, 0, []NetworkInterface{}, types.EniSelectionPolicyMostIPs, &FakeK8s{}) + request := NewLocalIPRequest() _, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{ - ResourceRequests: []ResourceRequest{&LocalIPRequest{}}, + ResourceRequests: []ResourceRequest{request}, }) assert.NotNil(t, err) @@ -147,8 +151,10 @@ func TestManagerAllocateWithTimeoutWhenAllocationFails(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + + request := NewLocalIPRequest() _, err := manager.Allocate(ctx, &daemon.CNI{}, &AllocRequest{ - ResourceRequests: []ResourceRequest{&LocalIPRequest{}}, + ResourceRequests: []ResourceRequest{request}, }) assert.NotNil(t, err) } diff --git a/pkg/eni/types.go b/pkg/eni/types.go index c9142c63..cc7bda9b 100644 --- a/pkg/eni/types.go +++ b/pkg/eni/types.go @@ -4,6 +4,8 @@ import ( "net/netip" "time" + "github.com/samber/lo" + "github.com/AliyunContainerService/terway/rpc" "github.com/AliyunContainerService/terway/types/daemon" ) @@ -257,3 +259,21 @@ const ( NetworkInterfaceMismatch InsufficientVSwitchIP ) + +type AllocatingRequests []*LocalIPRequest + +// Len return the valid slice size +func (a *AllocatingRequests) Len() int { + // true to keep + filtered := lo.Filter(*a, func(item *LocalIPRequest, index int) bool { + select { + case <-item.workerCtx.Done(): + return false + default: + return true + } + }) + + *a = filtered + return len(*a) +}