Skip to content

Commit

Permalink
Handle deleting deleted existent pods + send synthetic event on kill()
Browse files Browse the repository at this point in the history
There's two potential issues with our termination code:
1. we try to delete a Pod that no longer exists
2. we successfully delete a Pod, but k8s coaleses the event and
   task_processing/our caller doesn't see the termination event

Solutions:
1. return success on a 404 response to a delete - regardless of what
   happened, we're in the desired state :p
2. Send a synthetic event to our caller in kill() + fixup our internal
   state
  • Loading branch information
nemacysts committed Apr 30, 2024
1 parent 1aa03d2 commit 07e1c63
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
5 changes: 5 additions & 0 deletions task_processing/plugins/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ def terminate_pod(
# the termination request.
return True
except ApiException as e:
if e.status == 404:
logger.info(
f"Found no pods matching {pod_name} - returning success since we're in the desired state."
)
return True
if not self.maybe_reload_on_exception(exception=e) and attempts:
logger.exception(
f"Failed to request termination for {pod_name} due to unhandled API "
Expand Down
28 changes: 27 additions & 1 deletion task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,36 @@ def kill(self, task_id: str) -> bool:
# NOTE: we're purposely not removing this task from `task_metadata` as we want
# to handle that with the Watch that we'll set to monitor each Pod for events.
# TODO(TASKPROC-242): actually handle termination events
return self.kube_client.terminate_pod(
terminated = self.kube_client.terminate_pod(
namespace=self.namespace,
pod_name=task_id,
)
if terminated:
logger.info(
f"Successfully requested termination for {task_id}. "
"Emitting synthetic 'killed' event in case of Kubernetes event coalescion."
)
# we need to lock here since there will be other threads updating this metadata in response
# to k8s events
with self.task_metadata_lock:
# NOTE: it's possible that there'll also be a real DELETED event for this Pod
# but it should be safe to do this as _process_pod_event() will just ignore
# pods not in self.task_metadata
self.task_metadata = self.task_metadata.discard(task_id)
self.event_queue.put(
task_event(
task_id=task_id,
terminal=True,
success=False,
timestamp=time.time(),
raw=None,
task_config=None,
platform_type="killed",
)
)
else:
logger.error(f"Failed to request termination for {task_id}.")
return terminated

def stop(self) -> None:
logger.debug("Preparing to stop all KubernetesPodExecutor threads.")
Expand Down

0 comments on commit 07e1c63

Please sign in to comment.