From 3ee652dd8e400f9dff250c5aa9cae9dcf36f10c4 Mon Sep 17 00:00:00 2001 From: John Mcgrath Date: Sun, 24 Mar 2024 22:51:45 -0500 Subject: [PATCH] bug: fix node eviction --- .github/workflows/test.yaml | 2 +- main.go | 38 ++++++------ scripts/create-minikube.sh | 4 +- scripts/deploy.sh | 72 ++++++++++++++-------- scripts/helpers.sh | 13 +++- tests/e2e/deployment_test.go | 115 ++++++++++++++++++++++++----------- 6 files changed, 161 insertions(+), 83 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5913b6a..80726a4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -5,7 +5,7 @@ on: branches: - 'master' pull_request: - types: [opened, reopened] + types: [opened, reopened,ready_for_review,synchronize] jobs: diff --git a/main.go b/main.go index 068e834..6eda5f9 100644 --- a/main.go +++ b/main.go @@ -42,7 +42,7 @@ var ( ephemeralStorageNodePercentage bool ephemeralStorageContainerLimitsPercentage bool ephemeralStorageContainerVolumeLimitsPercentage bool - adjustedTimeGaugeVec *prometheus.GaugeVec + adjustedPollingRateGaugeVec *prometheus.GaugeVec deployType string nodeWaitGroup sync.WaitGroup podDataWaitGroup sync.WaitGroup @@ -267,6 +267,7 @@ func podWatch() { ticker := time.NewTicker(time.Duration(sampleInterval) * time.Second) defer ticker.Stop() + // TODO: make this more event driven instead of polling for { select { case <-ticker.C: @@ -288,8 +289,18 @@ func evictPodFromMetrics(p v1.Pod) { } } +func evictNode(node string) { + + nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node}) + nodeCapacityGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node}) + nodePercentageGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node}) + if adjustedPollingRate { + adjustedPollingRateGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": node}) + } + log.Info().Msgf("Node %s does not exist. Removed from monitoring", node) +} + func getNodes() { - oldNodeSet := mapset.NewSet[string]() nodeSet := mapset.NewSet[string]() nodeWaitGroup.Add(1) if deployType != "Deployment" { @@ -306,24 +317,13 @@ func getNodes() { nodeSlice = nodeSet.ToSlice() nodeWaitGroup.Done() - // Poll for new nodes and remove dead ones + // Poll for new nodes + // TODO: make this more event driven instead of polling for { - oldNodeSet = nodeSet.Clone() - nodeSet.Clear() nodes, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) for _, node := range nodes.Items { nodeSet.Add(node.Name) } - deadNodesSet := nodeSet.Difference(oldNodeSet) - - // Evict Metrics where the node doesn't exist anymore. - for _, deadNode := range deadNodesSet.ToSlice() { - nodeAvailableGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": deadNode}) - nodeCapacityGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": deadNode}) - nodePercentageGaugeVec.DeletePartialMatch(prometheus.Labels{"node_name": deadNode}) - log.Info().Msgf("Node %s does not exist. Removing from monitoring", deadNode) - } - nodeSlice = nodeSet.ToSlice() time.Sleep(1 * time.Minute) } @@ -439,7 +439,7 @@ func setMetrics(node string) { content, err := queryNode(node) if err != nil { - log.Warn().Msg(fmt.Sprintf("Could not query node: %s. Skipping..", node)) + evictNode(node) return } @@ -467,7 +467,7 @@ func setMetrics(node string) { log.Error().Msgf("Node %s: Polling Rate could not keep up. 
Adjust your Interval to a higher number than %d seconds", nodeName, sampleInterval)
 	}
 	if adjustedPollingRate {
-		adjustedTimeGaugeVec.With(prometheus.Labels{"node_name": nodeName}).Set(float64(adjustTime))
+		adjustedPollingRateGaugeVec.With(prometheus.Labels{"node_name": nodeName}).Set(float64(adjustTime))
 	}

 }

@@ -567,7 +567,7 @@ func createMetrics() {
 	prometheus.MustRegister(nodePercentageGaugeVec)

 	if adjustedPollingRate {
-		adjustedTimeGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		adjustedPollingRateGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name: "ephemeral_storage_adjusted_polling_rate",
 			Help: "AdjustTime polling rate time after a Node API queries in Milliseconds",
 		},
@@ -576,7 +576,7 @@ func createMetrics() {
 			"node_name",
 		})

-		prometheus.MustRegister(adjustedTimeGaugeVec)
+		prometheus.MustRegister(adjustedPollingRateGaugeVec)
 	}
 }
diff --git a/scripts/create-minikube.sh b/scripts/create-minikube.sh
index 3b211aa..fbfaf41 100755
--- a/scripts/create-minikube.sh
+++ b/scripts/create-minikube.sh
@@ -5,7 +5,9 @@ c=$(docker ps -q) && [[ $c ]] && docker kill $c
 docker network prune -f
 minikube start \
   --kubernetes-version="${K8S_VERSION}" \
-  --insecure-registry "10.0.0.0/24"
+  --insecure-registry "10.0.0.0/24" \
+  --cpus=2 \
+  --memory=3900MB
 minikube addons enable registry

 # Add Service Monitor CRD
diff --git a/scripts/deploy.sh b/scripts/deploy.sh
index 91be204..617d631 100755
--- a/scripts/deploy.sh
+++ b/scripts/deploy.sh
@@ -13,19 +13,42 @@ source helpers.sh
 function main() {
   local image_tag
   local dockerfile
-  local registry
-  local image
   local common_set_values
   local common_set_values_arr
   local grow_repo_image
   local shrink_repo_image
   local e2e_values_arr
+  local external_registry
+  local internal_registry
+  local status_code

   trap 'trap_func' EXIT ERR

-  while [ "$(kubectl get pods -n kube-system -l kubernetes.io/minikube-addons=registry -o=jsonpath='{.items[*].status.phase}')" != "Running Running" ]; do
-    echo "waiting for minikube registry and proxy pod to start. Sleep 10" && sleep 10
+  # Wait until registry pod comes up
+  while [ "$(kubectl get pods -n kube-system -l actual-registry=true -o=jsonpath='{.items[*].status.phase}')" != "Running" ]; do
+    echo "Waiting for registry pod to start. Sleep 10" && sleep 10
   done
+
+  # Wait until registry-proxy pod comes up
+  while [ "$(kubectl get pods -n kube-system -l registry-proxy=true -o=jsonpath='{.items[*].status.phase}')" != "Running" ]; do
+    echo "Waiting for registry proxy pod to start. Sleep 10" && sleep 10
+  done
+
+  # Use a while loop to repeatedly check the registry endpoint until healthy
+  while true; do
+    # Send a GET request to the endpoint and capture the HTTP status code
+    status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://$(minikube ip):5000/v2/_catalog")
+
+    # Check if the status code is 200
+    if [ "$status_code" -eq 200 ]; then
+      echo "Registry endpoint is healthy. Status code: $status_code"
+      break
+    else
+      echo "Registry endpoint is not healthy. Status code: $status_code. Retrying..."
+      sleep 5 # Wait for 5 seconds before retrying
+    fi
+  done
+
+  # Need both.
External to push and internal for pods to pull from registry in cluster external_registry="$(minikube ip):5000" internal_registry="$(kubectl get service -n kube-system registry --template='{{.spec.clusterIP}}')" @@ -34,20 +57,19 @@ function main() { grow_repo_image="k8s-ephemeral-storage-grow-test:latest" - docker build --build-arg TARGETOS=linux --build-arg TARGETARCH=amd64 -f ../DockerfileTestGrow \ - -t "${external_registry}/${grow_repo_image}" -t "${internal_registry}/${grow_repo_image}" ../. + docker build --build-arg TARGETOS=linux --build-arg TARGETARCH=amd64 -f ../DockerfileTestGrow \ + -t "${external_registry}/${grow_repo_image}" -t "${internal_registry}/${grow_repo_image}" ../. - docker save "${external_registry}/${grow_repo_image}" > /tmp/image.tar - ${LOCALBIN}/crane push --insecure /tmp/image.tar "${external_registry}/${grow_repo_image}" + docker save "${external_registry}/${grow_repo_image}" >/tmp/image.tar + "${LOCALBIN}/crane" push --insecure /tmp/image.tar "${external_registry}/${grow_repo_image}" rm /tmp/image.tar - shrink_repo_image="k8s-ephemeral-storage-shrink-test:latest" docker build --build-arg TARGETOS=linux --build-arg TARGETARCH=amd64 -f ../DockerfileTestShrink \ - -t "${external_registry}/${shrink_repo_image}" -t "${internal_registry}/${shrink_repo_image}" ../. + -t "${external_registry}/${shrink_repo_image}" -t "${internal_registry}/${shrink_repo_image}" ../. - docker save "${external_registry}/${shrink_repo_image}" > /tmp/image.tar + docker save "${external_registry}/${shrink_repo_image}" >/tmp/image.tar ${LOCALBIN}/crane push --insecure /tmp/image.tar "${external_registry}/${shrink_repo_image}" rm /tmp/image.tar @@ -62,13 +84,12 @@ function main() { # Main image main_repo_image="${DEPLOYMENT_NAME}:${image_tag}" docker build --build-arg TARGETOS=linux --build-arg TARGETARCH=amd64 -f ../${dockerfile} \ - -t "${external_registry}/${main_repo_image}" -t "${internal_registry}/${main_repo_image}" ../. + -t "${external_registry}/${main_repo_image}" -t "${internal_registry}/${main_repo_image}" ../. - docker save "${external_registry}/${main_repo_image}" > /tmp/image.tar + docker save "${external_registry}/${main_repo_image}" >/tmp/image.tar ${LOCALBIN}/crane push --insecure /tmp/image.tar "${external_registry}/${main_repo_image}" rm /tmp/image.tar - ### Install Chart ### common_set_values_arr=( @@ -96,7 +117,6 @@ function main() { --create-namespace \ --namespace "${DEPLOYMENT_NAME}" - # Patch deploy so minikube image upload works. if [[ $ENV == "debug" ]]; then # Disable for Debugging of Delve. @@ -104,23 +124,25 @@ function main() { '{ "spec": {"template": { "spec":{"securityContext": null, "containers":[{"name":"metrics", "livenessProbe": null, "readinessProbe": null, "securityContext": null, "command": null, "args": null }]}}}}' fi - # Kill dangling port forwards if found. 
- # Main Exporter Port - sudo ss -aK '( dport = :9100 or sport = :9100 )' | true - # Prometheus Port - sudo ss -aK '( dport = :9090 or sport = :9090 )' | true - # Pprof Port - sudo ss -aK '( dport = :6060 or sport = :6060 )' | true - # Start Exporter Port Forward ( sleep 10 - printf "\n\n" && while :; do kubectl port-forward -n $DEPLOYMENT_NAME service/k8s-ephemeral-storage-metrics 9100:9100 || sleep 5; done + printf "\n\n" && while :; do kubectl port-forward -n $DEPLOYMENT_NAME service/k8s-ephemeral-storage-metrics 9100:9100 || kill_main_exporter_port && sleep 5; done ) & # Wait until main pod comes up while [ "$(kubectl get pods -n $DEPLOYMENT_NAME -l app.kubernetes.io/name=k8s-ephemeral-storage-metrics -o=jsonpath='{.items[*].status.phase}')" != "Running" ]; do - echo "waiting for k8s-ephemeral-storage-metrics pod to start. Sleep 10" && sleep 10 + echo "Waiting for k8s-ephemeral-storage-metrics pod to start. Sleep 10" && sleep 10 + done + + # Wait until grow-test comes up + while [ "$(kubectl get pods -n $DEPLOYMENT_NAME -l name=grow-test -o=jsonpath='{.items[*].status.phase}')" != "Running" ]; do + echo "Waiting for grow-test pod to start. Sleep 10" && sleep 10 + done + + # Wait until shrink-test comes up + while [ "$(kubectl get pods -n $DEPLOYMENT_NAME -l name=shrink-test -o=jsonpath='{.items[*].status.phase}')" != "Running" ]; do + echo "Waiting for shrink-test pod to start. Sleep 10" && sleep 10 done if [[ $ENV == "debug" ]]; then diff --git a/scripts/helpers.sh b/scripts/helpers.sh index 19286ca..7a713b4 100755 --- a/scripts/helpers.sh +++ b/scripts/helpers.sh @@ -1,3 +1,9 @@ +#!/bin/bash + +function kill_main_exporter_port { + # Main Exporter Port + sudo ss -aK '( dport = :9100 or sport = :9100 )' || true +} function trap_func() { set +e @@ -5,7 +11,12 @@ function trap_func() { helm delete $DEPLOYMENT_NAME -n $DEPLOYMENT_NAME jobs -p | xargs kill -SIGSTOP jobs -p | xargs kill -9 - sudo ss -aK '( dport = :9100 or sport = :9100 )' + # Kill dangling port forwards if found. 
+ kill_main_exporter_port + # Prometheus Port + sudo ss -aK '( dport = :9090 or sport = :9090 )' || true + # Pprof Port + sudo ss -aK '( dport = :6060 or sport = :6060 )' || true } &> /dev/null } diff --git a/tests/e2e/deployment_test.go b/tests/e2e/deployment_test.go index 2cf9fc7..f341470 100644 --- a/tests/e2e/deployment_test.go +++ b/tests/e2e/deployment_test.go @@ -6,6 +6,7 @@ import ( "github.com/onsi/gomega" "io" "net/http" + "os/exec" "regexp" "strconv" "strings" @@ -58,7 +59,7 @@ func CheckValues(ifFound map[string]bool) int { return status } -func checkPrometheus(checkSlice []string) { +func checkPrometheus(checkSlice []string, inverse bool) { var status int timeout := time.Second * 180 startTime := time.Now() @@ -84,10 +85,14 @@ func checkPrometheus(checkSlice []string) { if ifFound[a] { continue } - if strings.Contains(output, a) { + if inverse && !strings.Contains(output, a) { + ifFound[a] = true + + } else if !inverse && strings.Contains(output, a) { ifFound[a] = true - status = CheckValues(ifFound) } + + status = CheckValues(ifFound) } if status == 1 { @@ -97,7 +102,11 @@ func checkPrometheus(checkSlice []string) { } for key, value := range ifFound { if value { - ginkgo.GinkgoWriter.Printf("\nFound value: [ %v ] in prometheus exporter\n", key) + if inverse { + ginkgo.GinkgoWriter.Printf("\nDid not find value: [ %v ] in prometheus exporter\n", key) + } else { + ginkgo.GinkgoWriter.Printf("\nFound value: [ %v ] in prometheus exporter\n", key) + } continue } ginkgo.GinkgoWriter.Printf("\nDid not find value: [ %v ] in prometheus exporter\n", key) @@ -120,16 +129,12 @@ func WatchContainerPercentage() { } +// TODO: need to add func WatchContainerVolumePercentage() { - // TODO: add test for this once evictions for dead pods is handled. //ephemeral_storage_container_volume_limit_percentage } -func WatchDeadPod() { - // TODO: Deploy a pod and then remove it. Make sure it's completely evicted from prom metrics. -} - func WatchNodePercentage() { status := 0 re := regexp.MustCompile(`ephemeral_storage_node_percentage\{node_name="minikube"}\s+(.+)`) @@ -166,48 +171,72 @@ func WatchPollingRate(pollRateUpper float64, pollingRateLower float64, timeout t } -func WatchEphemeralPodSize(podname string, sizeChange float64, timeout time.Duration) { - // Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain sizeChange. +func getPodSize(podName string) float64 { + output := requestPrometheusString() + re := regexp.MustCompile(fmt.Sprintf(`ephemeral_storage_pod_usage.+pod_name="%s.+\}\s(.+)`, podName)) + match := re.FindAllStringSubmatch(output, 2) + currentPodSize, _ := strconv.ParseFloat(match[0][1], 64) + return currentPodSize +} + +func WatchEphemeralPodSize(podName string, desiredSizeChange float64, timeout time.Duration) { + // Watch Prometheus Metrics until the ephemeral storage shrinks or grows to a certain desiredSizeChange. 
+ var currentPodSize float64 startTime := time.Now() status := 0 - var initSize float64 + initSize := getPodSize(podName) + if podName == "grow-test" { + desiredSizeChange = initSize + desiredSizeChange + } else if podName == "shrink-test" { + desiredSizeChange = initSize - desiredSizeChange + } + for { elapsed := time.Since(startTime) if elapsed >= timeout { - ginkgo.GinkgoWriter.Printf("Watch for metrics has timed out for pod %s", podname) + ginkgo.GinkgoWriter.Printf("Watch for metrics has timed out for pod %s", podName) break } - output := requestPrometheusString() - re := regexp.MustCompile(fmt.Sprintf(`ephemeral_storage_pod_usage.+pod_name="%s.+\}\s(.+)`, podname)) - match := re.FindAllStringSubmatch(output, 2) - floatValue, _ := strconv.ParseFloat(match[0][1], 64) - if initSize == 0.0 { - initSize = floatValue + currentPodSize = getPodSize(podName) + if podName == "grow-test" && currentPodSize >= desiredSizeChange { + status = 1 + + } else if podName == "shrink-test" && currentPodSize <= desiredSizeChange { + status = 1 } - if strings.Contains(podname, "grow") { - if floatValue-initSize >= sizeChange { - ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tPod Name: %s \n\tTargetSize: %f \n\tPodSize: %f", podname, initSize+sizeChange, floatValue) - status = 1 - break - } - ginkgo.GinkgoWriter.Printf("\nPending: \n\tPod Name: %s \n\tTargetSize: %f \n\tPodSize: %f", podname, initSize+sizeChange, floatValue) - time.Sleep(time.Second * 5) - } else if strings.Contains(podname, "shrink") { - if initSize-floatValue >= sizeChange { - ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tPod Name: %s \n\tTargetSize: %f \n\tPodSize: %f", podname, initSize-sizeChange, floatValue) - status = 1 - break - } - ginkgo.GinkgoWriter.Printf("\nPending: \n\tPod Name: %s \n\tTargetSize: %f \n\tPodSize: %f", podname, initSize-sizeChange, floatValue) - time.Sleep(time.Second * 5) + + if status == 1 { + ginkgo.GinkgoWriter.Printf("\nSuccess: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f", podName, desiredSizeChange, currentPodSize) + break } + ginkgo.GinkgoWriter.Printf("\nPending: \n\tPod name: %s \n\tTarget size: %f \n\tCurrent size: %f", podName, desiredSizeChange, currentPodSize) + time.Sleep(time.Second * 5) + } gomega.Expect(status).Should(gomega.Equal(1)) } +func scaleUp() { + cmd := exec.Command("make", "minikube_scale_up") + cmd.Dir = "../.." + + _, err := cmd.Output() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + +} + +func scaleDown() { + cmd := exec.Command("make", "minikube_scale_down") + cmd.Dir = "../.." 
+ + _, err := cmd.Output() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + +} + var _ = ginkgo.Describe("Test Metrics\n", func() { ginkgo.Context("Observe labels\n", func() { @@ -220,7 +249,7 @@ var _ = ginkgo.Describe("Test Metrics\n", func() { "pod_name=\"k8s-ephemeral-storage", "ephemeral_storage_adjusted_polling_rate", "node_name=\"minikube", "ephemeral_storage_container_limit_percentage") - checkPrometheus(checkSlice) + checkPrometheus(checkSlice, false) }) }) ginkgo.Context("Observe change in storage metrics\n", func() { @@ -246,6 +275,20 @@ var _ = ginkgo.Describe("Test Metrics\n", func() { WatchContainerPercentage() }) }) + ginkgo.Context("Test Scaling\n", func() { + checkSlice := []string{ + "node_name=\"minikube-m02", + "ephemeral_storage_container_limit_percentage{container=\"kube-proxy\",node_name=\"minikube-m02\"", + } + ginkgo.Specify("\nScale up test to make sure pods and nodes are found", func() { + scaleUp() + checkPrometheus(checkSlice, false) + }) + ginkgo.Specify("\nScale Down test to make sure pods and nodes are evicted", func() { + scaleDown() + checkPrometheus(checkSlice, true) + }) + }) }) func TestDeployments(t *testing.T) {