Node watch
jmcgrath207 committed Mar 31, 2024
1 parent e3baa82 commit 89b349a
Showing 2 changed files with 57 additions and 25 deletions.
main.go: 60 changes (45 additions & 15 deletions)
@@ -52,10 +52,10 @@ var (
     nodePercentageGaugeVec             *prometheus.GaugeVec
     containerPercentageLimitsVec       *prometheus.GaugeVec
     containerPercentageVolumeLimitsVec *prometheus.GaugeVec
-    nodeSlice                          []string
     maxNodeConcurrency                 int
     podResourceLookup                  map[string]pod
     podResourceLookupMutex             sync.RWMutex
+    nodeSet                            mapset.Set[string]
 )

 func getEnv(key, fallback string) string {
@@ -279,6 +279,46 @@ func evictPodFromMetrics(p v1.Pod) {
     }
 }

+func NodeWatch() {
+    // Wait until getNodes has initialized nodeSet.
+    nodeWaitGroup.Wait()
+    stopCh := make(chan struct{})
+    defer close(stopCh)
+    sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Duration(sampleInterval)*time.Second)
+    nodeInformer := sharedInformerFactory.Core().V1().Nodes().Informer()
+
+    // Define event handlers for Node events
+    eventHandler := cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            node := obj.(*v1.Node)
+            nodeSet.Add(node.Name)
+        },
+        DeleteFunc: func(obj interface{}) {
+            node := obj.(*v1.Node)
+            nodeSet.Remove(node.Name)
+            evictNode(node.Name)
+        },
+    }
+
+    // Register the event handlers with the informer
+    _, err := nodeInformer.AddEventHandler(eventHandler)
+    if err != nil {
+        log.Err(err).Msg("Failed to register node event handler")
+        os.Exit(1)
+    }
+
+    // Start the informer to begin watching for Node events.
+    // Start is non-blocking, so no extra goroutine is needed.
+    sharedInformerFactory.Start(stopCh)
+
+    // Block until the watch is torn down; reaching this point is fatal.
+    <-stopCh
+    log.Error().Msg("Watcher NodeWatch stopped.")
+    os.Exit(1)
+}
+
 func evictNode(node string) {

     nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node})
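
[Illustration, not part of this commit] NodeWatch above follows the standard client-go shared-informer pattern. A minimal, self-contained sketch of that pattern, runnable without a cluster by using the fake clientset; the node name and timings are made up:

    package main

    import (
        "context"
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func main() {
        // Fake clientset pre-loaded with one Node, so no cluster is needed.
        clientset := fake.NewSimpleClientset(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}})

        factory := informers.NewSharedInformerFactory(clientset, 0)
        nodeInformer := factory.Core().V1().Nodes().Informer()
        _, _ = nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*v1.Node).Name) },
            DeleteFunc: func(obj interface{}) { fmt.Println("delete:", obj.(*v1.Node).Name) },
        })

        stopCh := make(chan struct{})
        defer close(stopCh)
        factory.Start(stopCh)                                  // Start is non-blocking
        cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced) // wait for the initial List

        // Deleting the node through the same clientset fires DeleteFunc.
        _ = clientset.CoreV1().Nodes().Delete(context.TODO(), "node-a", metav1.DeleteOptions{})
        time.Sleep(100 * time.Millisecond) // give the handlers time to run
    }
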
@@ -291,7 +331,7 @@ func evictNode(node string) {
 }

 func getNodes() {
-    nodeSet := mapset.NewSet[string]()
+    nodeSet = mapset.NewSet[string]()
     nodeWaitGroup.Add(1)
     if deployType != "Deployment" {
         nodeSet.Add(getEnv("CURRENT_NODE_NAME", ""))
@@ -304,20 +344,8 @@
     for _, node := range startNodes.Items {
         nodeSet.Add(node.Name)
     }
-    nodeSlice = nodeSet.ToSlice()
     nodeWaitGroup.Done()

-    // Poll for new nodes
-    // TODO: make this more event driven instead of polling
-    for {
-        nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
-        for _, node := range nodes.Items {
-            nodeSet.Add(node.Name)
-        }
-        nodeSlice = nodeSet.ToSlice()
-        time.Sleep(1 * time.Minute)
-    }
-
 }

 func queryNode(node string) ([]byte, error) {
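
[Illustration, not part of this commit] Dropping the polling loop and the shared nodeSlice works because mapset.NewSet (github.com/deckarep/golang-set/v2) returns a thread-safe set: the informer handlers can Add/Remove concurrently while the metrics loop takes a snapshot with ToSlice. A tiny sketch of that usage; names and timings are made up:

    package main

    import (
        "fmt"
        "time"

        mapset "github.com/deckarep/golang-set/v2"
    )

    func main() {
        nodes := mapset.NewSet[string]()

        // Writer goroutine: stands in for the informer's Add/Delete handlers.
        go func() {
            nodes.Add("node-a")
            nodes.Add("node-b")
            time.Sleep(30 * time.Millisecond)
            nodes.Remove("node-a")
        }()

        // Reader: snapshots the set on each pass, as getMetrics now does.
        for i := 0; i < 3; i++ {
            fmt.Println(nodes.ToSlice())
            time.Sleep(20 * time.Millisecond)
        }
    }
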
@@ -429,7 +457,7 @@ func setMetrics(node string) {

     content, err := queryNode(node)
     if err != nil {
-        evictNode(node)
+        log.Warn().Msgf("Could not query node %s for ephemeral storage", node)
         return
     }

@@ -585,6 +613,7 @@ func getMetrics() {
     defer p.Release()

     for {
+        nodeSlice := nodeSet.ToSlice()

         for _, node := range nodeSlice {
             _ = p.Invoke(node)
@@ -657,6 +686,7 @@ func main() {
         go podWatch()
     }
     go getNodes()
+    go NodeWatch()
     go getMetrics()
     if deployType != "Deployment" && deployType != "DaemonSet" {
         log.Error().Msg(fmt.Sprintf("deployType must be 'Deployment' or 'DaemonSet', got %s", deployType))
tests/e2e/deployment_test.go: 22 changes (12 additions & 10 deletions)
@@ -169,26 +169,28 @@ func WatchNodePercentage() {

 }
 func WatchPollingRate(pollRateUpper float64, pollingRateLower float64, timeout time.Duration) {
+    var currentPollRate float64
     status := 0
     startTime := time.Now()
     re := regexp.MustCompile(`ephemeral_storage_adjusted_polling_rate\{node_name="minikube"}\s+(.+)`)
     for {
         elapsed := time.Since(startTime)
         if elapsed >= timeout {
             ginkgo.GinkgoWriter.Printf("Watch for rate polling timed out")
+            ginkgo.GinkgoWriter.Printf("\nFailed: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)
             break
         }
         output := requestPrometheusString()
         match := re.FindAllStringSubmatch(output, -1)
-        floatValue, _ := strconv.ParseFloat(match[0][1], 64)
-        if pollRateUpper >= floatValue && pollingRateLower <= floatValue {
+        currentPollRate, _ = strconv.ParseFloat(match[0][1], 64)
+        if pollRateUpper >= currentPollRate && pollingRateLower <= currentPollRate {
             status = 1
             break
         }
         time.Sleep(5 * time.Second)
     }

     gomega.Expect(status).Should(gomega.Equal(1))
+    ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tephemeral_storage_adjusted_polling_rate: %f\n", currentPollRate)

 }
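
[Illustration, not part of this commit] The regex above matches one line of Prometheus text exposition and captures the sample value. A self-contained check of that parsing against a made-up scrape line; the value 16000 is invented:

    package main

    import (
        "fmt"
        "regexp"
        "strconv"
    )

    func main() {
        // One line of Prometheus text-format output; the value is made up.
        output := `ephemeral_storage_adjusted_polling_rate{node_name="minikube"} 16000`
        re := regexp.MustCompile(`ephemeral_storage_adjusted_polling_rate\{node_name="minikube"}\s+(.+)`)
        match := re.FindAllStringSubmatch(output, -1)
        rate, _ := strconv.ParseFloat(match[0][1], 64)
        fmt.Println(rate) // 16000
    }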

@@ -218,7 +220,7 @@ func getContainerVolumeLimitPercentage(podName string) float64 {
     return currentPodSize
 }

-func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
+func WatchEphemeralSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
     // Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange.
     var currentPodSize float64
     var targetSizeChange float64
@@ -300,7 +302,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
     })
     ginkgo.Context("Observe change in ephemeral_storage_pod_usage metric\n", func() {
         ginkgo.Specify("\nWatch Pod grow to 100000 Bytes", func() {
-            WatchEphemeralPodSize("grow-test", 100000, time.Second*90, getPodUsageSize)
+            WatchEphemeralSize("grow-test", 100000, time.Second*180, getPodUsageSize)
         })
         ginkgo.Specify("\nWatch Pod shrink to 100000 Bytes", func() {
             // Shrinking of ephemeral_storage reflects slower from Node API up to 5 minutes.
@@ -314,24 +316,24 @@
             }
             time.Sleep(time.Second * 5)
         }
-        WatchEphemeralPodSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
+        WatchEphemeralSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
         })
     })
     ginkgo.Context("Observe change in ephemeral_storage_container_limit_percentage metric\n", func() {
         ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
-            WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerLimitPercentage)
+            WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerLimitPercentage)
         })
         ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
-            WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
+            WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
         })

     })
     ginkgo.Context("Observe change in ephemeral_storage_container_volume_limit_percentage metric\n", func() {
         ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
-            WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerVolumeLimitPercentage)
+            WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
        })
         ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
-            WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
+            WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
         })
     })
     ginkgo.Context("\nMake sure percentage is not over 100", func() {
