From 89b349af11fc71de5577c7a9102c87787dc6515f Mon Sep 17 00:00:00 2001
From: John Mcgrath
Date: Sat, 30 Mar 2024 21:49:59 -0500
Subject: [PATCH] Node watch

---
 main.go                      | 60 +++++++++++++++++++++++++++---------
 tests/e2e/deployment_test.go | 22 +++++++------
 2 files changed, 57 insertions(+), 25 deletions(-)

diff --git a/main.go b/main.go
index 917a540..37b64b3 100644
--- a/main.go
+++ b/main.go
@@ -52,10 +52,10 @@ var (
 	nodePercentageGaugeVec             *prometheus.GaugeVec
 	containerPercentageLimitsVec       *prometheus.GaugeVec
 	containerPercentageVolumeLimitsVec *prometheus.GaugeVec
-	nodeSlice                          []string
 	maxNodeConcurrency                 int
 	podResourceLookup                  map[string]pod
 	podResourceLookupMutex             sync.RWMutex
+	nodeSet                            mapset.Set[string]
 )
 
 func getEnv(key, fallback string) string {
@@ -279,6 +279,46 @@ func evictPodFromMetrics(p v1.Pod) {
 	}
 }
 
+func NodeWatch() {
+	nodeWaitGroup.Wait()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Duration(sampleInterval)*time.Second)
+	podInformer := sharedInformerFactory.Core().V1().Nodes().Informer()
+
+	// Define event handlers for Node events
+	eventHandler := cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			p := obj.(*v1.Node)
+			nodeSet.Add(p.Name)
+		},
+		DeleteFunc: func(obj interface{}) {
+			p := obj.(*v1.Node)
+			nodeSet.Remove(p.Name)
+			evictNode(p.Name)
+		},
+	}
+
+	// Register the event handlers with the informer
+	_, err := podInformer.AddEventHandler(eventHandler)
+	if err != nil {
+		log.Err(err)
+		os.Exit(1)
+	}
+
+	// Start the informer to begin watching for Node events
+	go sharedInformerFactory.Start(stopCh)
+
+	for {
+		time.Sleep(time.Duration(sampleInterval) * time.Second)
+		select {
+		case <-stopCh:
+			log.Error().Msg("Watcher NodeWatch stopped.")
+			os.Exit(1)
+		}
+	}
+}
+
 func evictNode(node string) {
 	nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node})
 
@@ -291,7 +331,7 @@ func evictNode(node string) {
 }
 
 func getNodes() {
-	nodeSet := mapset.NewSet[string]()
+	nodeSet = mapset.NewSet[string]()
 	nodeWaitGroup.Add(1)
 	if deployType != "Deployment" {
 		nodeSet.Add(getEnv("CURRENT_NODE_NAME", ""))
@@ -304,20 +344,8 @@ func getNodes() {
 	for _, node := range startNodes.Items {
 		nodeSet.Add(node.Name)
 	}
-	nodeSlice = nodeSet.ToSlice()
 	nodeWaitGroup.Done()
 
-	// Poll for new nodes
-	// TODO: make this more event driven instead of polling
-	for {
-		nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
-		for _, node := range nodes.Items {
-			nodeSet.Add(node.Name)
-		}
-		nodeSlice = nodeSet.ToSlice()
-		time.Sleep(1 * time.Minute)
-	}
-
 }
 
 func queryNode(node string) ([]byte, error) {
@@ -429,7 +457,7 @@ func setMetrics(node string) {
 
 	content, err := queryNode(node)
 	if err != nil {
-		evictNode(node)
+		log.Warn().Msgf("Could not query node %s for ephemeral storage", node)
 		return
 	}
@@ -585,6 +613,7 @@ func getMetrics() {
 	defer p.Release()
 
 	for {
+		nodeSlice := nodeSet.ToSlice()
 		for _, node := range nodeSlice {
 			_ = p.Invoke(node)
 
@@ -657,6 +686,7 @@ func main() {
 		go podWatch()
 	}
 	go getNodes()
+	go NodeWatch()
 	go getMetrics()
 	if deployType != "Deployment" && deployType != "DaemonSet" {
 		log.Error().Msg(fmt.Sprintf("deployType must be 'Deployment' or 'DaemonSet', got %s", deployType))
diff --git a/tests/e2e/deployment_test.go b/tests/e2e/deployment_test.go
index 3e31801..20d9e82 100644
--- a/tests/e2e/deployment_test.go
+++ b/tests/e2e/deployment_test.go
@@ -169,19 +169,20 @@ func WatchNodePercentage() {
 }
 
 func WatchPollingRate(pollRateUpper float64, pollingRateLower float64, timeout time.Duration) {
+	var currentPollRate float64
 	status := 0
 	startTime := time.Now()
 	re := regexp.MustCompile(`ephemeral_storage_adjusted_polling_rate\{node_name="minikube"}\s+(.+)`)
 	for {
 		elapsed := time.Since(startTime)
 		if elapsed >= timeout {
-			ginkgo.GinkgoWriter.Printf("Watch for rate polling timed out")
+			ginkgo.GinkgoWriter.Printf("\nFailed: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)
 			break
 		}
 		output := requestPrometheusString()
 		match := re.FindAllStringSubmatch(output, -1)
-		floatValue, _ := strconv.ParseFloat(match[0][1], 64)
-		if pollRateUpper >= floatValue && pollingRateLower <= floatValue {
+		currentPollRate, _ = strconv.ParseFloat(match[0][1], 64)
+		if pollRateUpper >= currentPollRate && pollingRateLower <= currentPollRate {
 			status = 1
 			break
 		}
@@ -189,6 +190,7 @@ func WatchPollingRate(pollRateUpper float64, pollingRateLower float64, timeout t
 	}
 
 	gomega.Expect(status).Should(gomega.Equal(1))
+	ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)
 }
 
@@ -218,7 +220,7 @@ func getContainerVolumeLimitPercentage(podName string) float64 {
 	return currentPodSize
 }
 
-func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
+func WatchEphemeralSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
 	// Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange.
 	var currentPodSize float64
 	var targetSizeChange float64
@@ -300,7 +302,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
 	})
 	ginkgo.Context("Observe change in ephemeral_storage_pod_usage metric\n", func() {
 		ginkgo.Specify("\nWatch Pod grow to 100000 Bytes", func() {
-			WatchEphemeralPodSize("grow-test", 100000, time.Second*90, getPodUsageSize)
+			WatchEphemeralSize("grow-test", 100000, time.Second*180, getPodUsageSize)
 		})
 		ginkgo.Specify("\nWatch Pod shrink to 100000 Bytes", func() {
 			// Shrinking of ephemeral_storage reflects slower from Node API up to 5 minutes.
@@ -314,24 +316,24 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
 				}
 				time.Sleep(time.Second * 5)
 			}
-			WatchEphemeralPodSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
+			WatchEphemeralSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
 		})
 	})
 	ginkgo.Context("Observe change in ephemeral_storage_container_limit_percentage metric\n", func() {
 		ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
-			WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerLimitPercentage)
+			WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerLimitPercentage)
 		})
 		ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
-			WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
+			WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
 		})
 	})
 	ginkgo.Context("Observe change in ephemeral_storage_container_volume_limit_percentage metric\n", func() {
 		ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
-			WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerVolumeLimitPercentage)
+			WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
 		})
 		ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
-			WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
+			WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
 		})
 	})
 	ginkgo.Context("\nMake sure percentage is not over 100", func() {