Skip to content

Commit

Permalink
Node watch
Browse files Browse the repository at this point in the history
  • Loading branch information
jmcgrath207 committed Mar 31, 2024
1 parent e3baa82 commit 8e16b38
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
60 changes: 45 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ var (
nodePercentageGaugeVec *prometheus.GaugeVec
containerPercentageLimitsVec *prometheus.GaugeVec
containerPercentageVolumeLimitsVec *prometheus.GaugeVec
nodeSlice []string
maxNodeConcurrency int
podResourceLookup map[string]pod
podResourceLookupMutex sync.RWMutex
nodeSet mapset.Set[string]
)

func getEnv(key, fallback string) string {
Expand Down Expand Up @@ -279,6 +279,46 @@ func evictPodFromMetrics(p v1.Pod) {
}
}

func NodeWatch() {
nodeWaitGroup.Wait()
stopCh := make(chan struct{})
defer close(stopCh)
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, time.Duration(sampleInterval)*time.Second)
podInformer := sharedInformerFactory.Core().V1().Nodes().Informer()

// Define event handlers for Pod events
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p := obj.(*v1.Node)
nodeSet.Add(p.Name)
},
DeleteFunc: func(obj interface{}) {
p := obj.(*v1.Node)
nodeSet.Remove(p.Name)
evictNode(p.Name)
},
}

// Register the event handlers with the informer
_, err := podInformer.AddEventHandler(eventHandler)
if err != nil {
log.Err(err)
os.Exit(1)
}

// Start the informer to begin watching for Node events
go sharedInformerFactory.Start(stopCh)

for {
time.Sleep(time.Duration(sampleInterval) * time.Second)
select {
case <-stopCh:
log.Error().Msg("Watcher podWatch stopped.")
os.Exit(1)
}
}
}

func evictNode(node string) {

nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node})
Expand All @@ -291,7 +331,7 @@ func evictNode(node string) {
}

func getNodes() {
nodeSet := mapset.NewSet[string]()
nodeSet = mapset.NewSet[string]()
nodeWaitGroup.Add(1)
if deployType != "Deployment" {
nodeSet.Add(getEnv("CURRENT_NODE_NAME", ""))
Expand All @@ -304,20 +344,8 @@ func getNodes() {
for _, node := range startNodes.Items {
nodeSet.Add(node.Name)
}
nodeSlice = nodeSet.ToSlice()
nodeWaitGroup.Done()

// Poll for new nodes
// TODO: make this more event driven instead of polling
for {
nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
for _, node := range nodes.Items {
nodeSet.Add(node.Name)
}
nodeSlice = nodeSet.ToSlice()
time.Sleep(1 * time.Minute)
}

}

func queryNode(node string) ([]byte, error) {
Expand Down Expand Up @@ -429,7 +457,7 @@ func setMetrics(node string) {

content, err := queryNode(node)
if err != nil {
evictNode(node)
log.Warn().Msgf("Could not query node %s for ephemeral storage", node)
return
}

Expand Down Expand Up @@ -585,6 +613,7 @@ func getMetrics() {
defer p.Release()

for {
nodeSlice := nodeSet.ToSlice()

for _, node := range nodeSlice {
_ = p.Invoke(node)
Expand Down Expand Up @@ -657,6 +686,7 @@ func main() {
go podWatch()
}
go getNodes()
go NodeWatch()
go getMetrics()
if deployType != "Deployment" && deployType != "DaemonSet" {
log.Error().Msg(fmt.Sprintf("deployType must be 'Deployment' or 'DaemonSet', got %s", deployType))
Expand Down
14 changes: 7 additions & 7 deletions tests/e2e/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func getContainerVolumeLimitPercentage(podName string) float64 {
return currentPodSize
}

func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
func WatchEphemeralSize(podName string, desiredSizeChange float64, timeout time.Duration, getPodSize getPodSize) {
// Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange.
var currentPodSize float64
var targetSizeChange float64
Expand Down Expand Up @@ -300,7 +300,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
})
ginkgo.Context("Observe change in ephemeral_storage_pod_usage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 100000 Bytes", func() {
WatchEphemeralPodSize("grow-test", 100000, time.Second*90, getPodUsageSize)
WatchEphemeralSize("grow-test", 100000, time.Second*180, getPodUsageSize)
})
ginkgo.Specify("\nWatch Pod shrink to 100000 Bytes", func() {
// Shrinking of ephemeral_storage reflects slower from Node API up to 5 minutes.
Expand All @@ -314,24 +314,24 @@ var _ = ginkgo.Describe("Test Metrics\n", func() {
}
time.Sleep(time.Second * 5)
}
WatchEphemeralPodSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
WatchEphemeralSize("shrink-test", 100000, time.Second*180, getPodUsageSize)
})
})
ginkgo.Context("Observe change in ephemeral_storage_container_limit_percentage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerLimitPercentage)
WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerLimitPercentage)
})
ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerLimitPercentage)
})

})
ginkgo.Context("Observe change in ephemeral_storage_container_volume_limit_percentage metric\n", func() {
ginkgo.Specify("\nWatch Pod grow to 0.2 percent", func() {
WatchEphemeralPodSize("grow-test", 0.2, time.Second*90, getContainerVolumeLimitPercentage)
WatchEphemeralSize("grow-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
})
ginkgo.Specify("\nWatch Pod shrink to 0.2 percent", func() {
WatchEphemeralPodSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
WatchEphemeralSize("shrink-test", 0.2, time.Second*180, getContainerVolumeLimitPercentage)
})
})
ginkgo.Context("\nMake sure percentage is not over 100", func() {
Expand Down

0 comments on commit 8e16b38

Please sign in to comment.