Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Node polling to Node watch #72

Merged
merged 1 commit into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ var (
nodePercentageGaugeVec *prometheus.GaugeVec
containerPercentageLimitsVec *prometheus.GaugeVec
containerPercentageVolumeLimitsVec *prometheus.GaugeVec
nodeSlice []string
maxNodeConcurrency int
podResourceLookup map[string]pod
podResourceLookupMutex sync.RWMutex
nodeSet mapset.Set[string]
)

func getEnv(key, fallback string) string {
Expand Down Expand Up @@ -279,6 +279,46 @@ func evictPodFromMetrics(p v1.Pod) {
}
}

// NodeWatch keeps nodeSet in sync with cluster membership via a shared
// informer on Nodes: a node's name is added on creation and removed (with
// its metrics evicted) on deletion. It blocks for the lifetime of the
// process; if the watcher ever stops, the process exits non-zero.
func NodeWatch() {
	// Wait for getNodes to finish seeding nodeSet before wiring up events.
	nodeWaitGroup.Wait()
	stopCh := make(chan struct{})
	defer close(stopCh)
	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Duration(sampleInterval)*time.Second)
	nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()

	// Event handlers that mirror Node add/delete events into nodeSet.
	eventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node := obj.(*v1.Node)
			nodeSet.Add(node.Name)
		},
		DeleteFunc: func(obj interface{}) {
			node, ok := obj.(*v1.Node)
			if !ok {
				// Deletions may arrive as DeletedFinalStateUnknown
				// tombstones when the watch missed the delete event;
				// unwrap instead of panicking on the type assertion.
				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					return
				}
				node, ok = tombstone.Obj.(*v1.Node)
				if !ok {
					return
				}
			}
			nodeSet.Remove(node.Name)
			// Drop any metrics still labeled with the departed node.
			evictNode(node.Name)
		},
	}

	// Register the event handlers with the informer.
	if _, err := nodeInformer.AddEventHandler(eventHandler); err != nil {
		log.Err(err)
		os.Exit(1)
	}

	// Start is non-blocking: it launches the informer goroutines itself,
	// so no extra `go` wrapper is needed.
	sharedInformerFactory.Start(stopCh)

	// Block until stopCh closes (which only happens via the deferred close
	// on return), keeping the informer alive for the process lifetime.
	<-stopCh
	log.Error().Msg("Watcher NodeWatch stopped.")
	os.Exit(1)
}

func evictNode(node string) {

nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node})
Expand All @@ -291,7 +331,7 @@ func evictNode(node string) {
}

func getNodes() {
nodeSet := mapset.NewSet[string]()
nodeSet = mapset.NewSet[string]()
nodeWaitGroup.Add(1)
if deployType != "Deployment" {
nodeSet.Add(getEnv("CURRENT_NODE_NAME", ""))
Expand All @@ -304,20 +344,8 @@ func getNodes() {
for _, node := range startNodes.Items {
nodeSet.Add(node.Name)
}
nodeSlice = nodeSet.ToSlice()
nodeWaitGroup.Done()

// Poll for new nodes
// TODO: make this more event driven instead of polling
for {
nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
for _, node := range nodes.Items {
nodeSet.Add(node.Name)
}
nodeSlice = nodeSet.ToSlice()
time.Sleep(1 * time.Minute)
}

}

func queryNode(node string) ([]byte, error) {
Expand Down Expand Up @@ -429,7 +457,7 @@ func setMetrics(node string) {

content, err := queryNode(node)
if err != nil {
evictNode(node)
log.Warn().Msgf("Could not query node %s for ephemeral storage", node)
return
}

Expand Down Expand Up @@ -585,6 +613,7 @@ func getMetrics() {
defer p.Release()

for {
nodeSlice := nodeSet.ToSlice()

for _, node := range nodeSlice {
_ = p.Invoke(node)
Expand Down Expand Up @@ -657,6 +686,7 @@ func main() {
go podWatch()
}
go getNodes()
go NodeWatch()
go getMetrics()
if deployType != "Deployment" && deployType != "DaemonSet" {
log.Error().Msg(fmt.Sprintf("deployType must be 'Deployment' or 'DaemonSet', got %s", deployType))
Expand Down
26 changes: 14 additions & 12 deletions tests/e2e/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,26 +169,28 @@ func WatchNodePercentage() {

}
func WatchPollingRate(pollRateUpper float64, pollingRateLower float64, timeout time.Duration) {
var currentPollRate float64
status := 0
startTime := time.Now()
re := regexp.MustCompile(`ephemeral_storage_adjusted_polling_rate\{node_name="minikube"}\s+(.+)`)
for {
elapsed := time.Since(startTime)
if elapsed >= timeout {
ginkgo.GinkgoWriter.Printf("Watch for rate polling timed out")
ginkgo.GinkgoWriter.Printf("\nFailed: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)
break
}
output := requestPrometheusString()
match := re.FindAllStringSubmatch(output, -1)
floatValue, _ := strconv.ParseFloat(match[0][1], 64)
if pollRateUpper >= floatValue && pollingRateLower <= floatValue {
currentPollRate, _ = strconv.ParseFloat(match[0][1], 64)
if pollRateUpper >= currentPollRate && pollingRateLower <= currentPollRate {
status = 1
break
}
time.Sleep(5 * time.Second)
}

gomega.Expect(status).Should(gomega.Equal(1))
ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)

}

Expand Down Expand Up @@ -218,7 +220,7 @@ func getContainerVolumeLimitPercentage(podName string) float64 {
return currentPodSize
}

func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
func WatchEphemeralSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
// Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange.
var currentPodSize float64
var targetSizeChange float64
Expand Down Expand Up @@ -248,11 +250,11 @@ func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout ti
}

if status == 1 {
ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f", podName, targetSizeChange, currentPodSize)
ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f\n", podName, targetSizeChange, currentPodSize)
break
}

ginkgo.GinkgoWriter.Printf("\nPending: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f", podName, targetSizeChange, currentPodSize)
ginkgo.GinkgoWriter.Printf("\nPending: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f\n", podName, targetSizeChange, currentPodSize)
time.Sleep(time.Second * 5)

}
Expand Down Expand Up @@ -300,7 +302,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
})
ginkgo.Context("Observe change in ephemeral_storage_pod_usage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 100000 Bytes", func() {
WatchEphemeralPodSize("grow-test", 100000, time.Second*90, getPodUsageSize)
WatchEphemeralSize("grow-test", 100000, time.Second*180, getPodUsageSize)
})
ginkgo.Specify("\nWatch Pod shrink to 100000 Bytes", func() {
// Shrinking of ephemeral_storage reflects slower from Node API up to 5 minutes.
Expand All @@ -314,24 +316,24 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
}
time.Sleep(time.Second * 5)
}
WatchEphemeralPodSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
WatchEphemeralSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
})
})
ginkgo.Context("Observe change in ephemeral_storage_container_limit_percentage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerLimitPercentage)
WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerLimitPercentage)
})
ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
})

})
ginkgo.Context("Observe change in ephemeral_storage_container_volume_limit_percentage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerVolumeLimitPercentage)
WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
})
ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
})
})
ginkgo.Context("\nMake sure percentage is not over 100", func() {
Expand Down
Loading