diff --git a/main.go b/main.go
index 917a540..9e4c433 100644
--- a/main.go
+++ b/main.go
@@ -52,10 +52,10 @@ var (
 	nodePercentageGaugeVec             *prometheus.GaugeVec
 	containerPercentageLimitsVec       *prometheus.GaugeVec
 	containerPercentageVolumeLimitsVec *prometheus.GaugeVec
-	nodeSlice                          []string
 	maxNodeConcurrency                 int
 	podResourceLookup                  map[string]pod
 	podResourceLookupMutex             sync.RWMutex
+	nodeSet                            mapset.Set[string]
 )
 
 func getEnv(key, fallback string) string {
@@ -279,6 +279,46 @@ func evictPodFromMetrics(p v1.Pod) {
 	}
 }
 
+// NodeWatch keeps nodeSet in sync with cluster Nodes through an informer and
+// evicts the metrics of deleted nodes. It blocks and never returns.
+func NodeWatch() {
+	nodeWaitGroup.Wait()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Duration(sampleInterval)*time.Second)
+	nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()
+
+	eventHandler := cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			if n, ok := obj.(*v1.Node); ok {
+				nodeSet.Add(n.Name)
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// A missed delete arrives as a tombstone wrapping the last known object.
+			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+				obj = tombstone.Obj
+			}
+			if n, ok := obj.(*v1.Node); ok {
+				nodeSet.Remove(n.Name)
+				evictNode(n.Name)
+			}
+		},
+	}
+
+	// Register the handlers; the log event is only emitted once Msg is called.
+	if _, err := nodeInformer.AddEventHandler(eventHandler); err != nil {
+		log.Error().Err(err).Msg("Could not register Node event handler")
+		os.Exit(1)
+	}
+
+	// Start the informer, then block: stopCh is closed only by the deferred close.
+	go sharedInformerFactory.Start(stopCh)
+	<-stopCh
+	log.Error().Msg("Watcher NodeWatch stopped.")
+	os.Exit(1)
+}
+
 func evictNode(node string) {
 
 	nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node})
@@ -291,7 +331,7 @@ func evictNode(node string) {
 }
 
 func getNodes() {
-	nodeSet := mapset.NewSet[string]()
+	nodeSet = mapset.NewSet[string]()
 	nodeWaitGroup.Add(1)
 	if deployType != "Deployment" {
 		nodeSet.Add(getEnv("CURRENT_NODE_NAME", ""))
@@ -304,20 +344,8 @@ func getNodes() {
 	for _, node := range startNodes.Items {
 		nodeSet.Add(node.Name)
 	}
-	nodeSlice = 
nodeSet.ToSlice() nodeWaitGroup.Done() - // Poll for new nodes - // TODO: make this more event driven instead of polling - for { - nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) - for _, node := range nodes.Items { - nodeSet.Add(node.Name) - } - nodeSlice = nodeSet.ToSlice() - time.Sleep(1 * time.Minute) - } - } func queryNode(node string) ([]byte, error) { @@ -429,7 +457,7 @@ func setMetrics(node string) { content, err := queryNode(node) if err != nil { - evictNode(node) + log.Warn().Msgf("Could not query node %s for ephemeral storage", node) return } @@ -585,12 +613,11 @@ func getMetrics() { defer p.Release() for { + nodeSlice := nodeSet.ToSlice() for _, node := range nodeSlice { _ = p.Invoke(node) } - - time.Sleep(time.Duration(sampleInterval) * time.Second) } } @@ -657,6 +684,7 @@ func main() { go podWatch() } go getNodes() + go NodeWatch() go getMetrics() if deployType != "Deployment" && deployType != "DaemonSet" { log.Error().Msg(fmt.Sprintf("deployType must be 'Deployment' or 'DaemonSet', got %s", deployType)) diff --git a/tests/e2e/deployment_test.go b/tests/e2e/deployment_test.go index 3e31801..5106eda 100644 --- a/tests/e2e/deployment_test.go +++ b/tests/e2e/deployment_test.go @@ -218,7 +218,7 @@ func getContainerVolumeLimitPercentage(podName string) float64 { return currentPodSize } -func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) { +func WatchEphemeralSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) { // Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange. 
var currentPodSize float64 var targetSizeChange float64 @@ -300,7 +300,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() { }) ginkgo.Context("Observe change in ephemeral_storage_pod_usage metric\n", func() { ginkgo.Specify("\nWatch Pod grow to 100000 Bytes", func() { - WatchEphemeralPodSize("grow-test", 100000, time.Second*90, getPodUsageSize) + WatchEphemeralSize("grow-test", 100000, time.Second*180, getPodUsageSize) }) ginkgo.Specify("\nWatch Pod shrink to 100000 Bytes", func() { // Shrinking of ephemeral_storage reflects slower from Node API up to 5 minutes. @@ -314,24 +314,24 @@ var _ = ginkgo.Describe("Test Metrics\n", func() { } time.Sleep(time.Second * 5) } - WatchEphemeralPodSize("shrink-test", 100000, time.Second*180, getPodUsageSize) + WatchEphemeralSize("shrink-test", 100000, time.Second*180, getPodUsageSize) }) }) ginkgo.Context("Observe change in ephemeral_storage_container_limit_percentage metric\n", func() { ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() { - WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerLimitPercentage) + WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerLimitPercentage) }) ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() { - WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage) + WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage) }) }) ginkgo.Context("Observe change in ephemeral_storage_container_volume_limit_percentage metric\n", func() { ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() { - WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerVolumeLimitPercentage) + WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage) }) ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() { - WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage) + WatchEphemeralSize("shrink-test", 0.2, time.Second*180, 
getContainerVolumeLimitPercentage) }) }) ginkgo.Context("\nMake sure percentage is not over 100", func() {