Skip to content

Commit

Permalink
Handling Added events and adding timers and logs in signalfx and scri…
Browse files Browse the repository at this point in the history
…bereader
  • Loading branch information
EmanElsaban committed Jan 19, 2024
1 parent 0f0cbe6 commit 7905c90
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name

logger = logging.getLogger(__name__)
from kubernetes.client import ApiClient
api = ApiClient()

POD_WATCH_THREAD_JOIN_TIMEOUT_S = 1.0
POD_EVENT_THREAD_JOIN_TIMEOUT_S = 1.0
Expand All @@ -52,7 +54,8 @@
"Running",
"Succeeded",
"Unknown",
}
}



class KubernetesPodExecutor(TaskExecutor):
Expand Down Expand Up @@ -135,7 +138,7 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
),
),
)

def _pod_event_watch_loop(self) -> None:
logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
# TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
Expand All @@ -151,9 +154,6 @@ def _pod_event_watch_loop(self) -> None:
for pod_event in self.watch.stream(
self.kube_client.core.list_namespaced_pod, self.namespace
):
# it's possible that we've received an event after we've already set the stop
# flag since Watch streams block forever, so re-check if we've stopped before
# queueing any pending events
if not self.stopping:
logger.debug("Adding Pod event to pending event queue.")
self.pending_events.put(pod_event)
Expand Down Expand Up @@ -209,8 +209,8 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
raw_event = event["raw_object"] if event else None

if pod.status.phase not in SUPPORTED_POD_MODIFIED_EVENT_PHASES:
logger.debug(
f"Got a MODIFIED event for {pod_name} for unhandled phase: "
logger.info(
f"Got a {event['type']} event for {pod_name} for unhandled phase: "
f"{pod.status.phase} - ignoring."
)
return
Expand Down Expand Up @@ -319,7 +319,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
and task_metadata.task_state is not KubernetesTaskState.TASK_LOST
):
logger.info(
f"Got a MODIFIED event for {pod_name} with unknown phase, host likely "
f"Got a {event['type']} event for {pod_name} with unknown phase, host likely "
"unexpectedly died"
)
self.task_metadata = self.task_metadata.set(
Expand Down Expand Up @@ -357,7 +357,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
)
else:
logger.info(
f"Ignoring MODIFIED event for {pod_name} as it did not result "
f"Ignoring {event['type']} event for {pod_name} as it did not result "
"in a state transition",
)

Expand Down Expand Up @@ -388,7 +388,7 @@ def _process_pod_event(self, event: PodEvent) -> None:
elif event["type"] == "DELETED":
self.__handle_deleted_pod_event(event)

elif event["type"] == "MODIFIED":
elif event["type"] in {"MODIFIED", "ADDED"}:
self.__handle_modified_pod_event(event)

else:
Expand All @@ -404,6 +404,10 @@ def _pending_event_processing_loop(self) -> None:
event = None
while not self.stopping or not self.pending_events.empty():
try:
# we might see that their are gaps 0.5s because thats how long it will take it to see if there are stuff in the queue
# will give you whats in the queue or wait 0.5 sec to receive events, if no events are received then it will throw the empty exception
# and start again
# I think below might be taking some time to get from a queue an event, should time these two separately
event = self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S)
self._process_pod_event(event)
except queue.Empty:
Expand Down

0 comments on commit 7905c90

Please sign in to comment.