Skip to content

Commit

Permalink
UPT: add schedulername
Browse files Browse the repository at this point in the history
  • Loading branch information
DesmondH0 committed Jun 17, 2020
1 parent 38653bf commit a16b48c
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,8 @@ def _apply_env_from(pod, req):
}
}
)

@staticmethod
def extract_schedulername(pod, req):
if pod.schedulername:
req['spec']['schedulerName'] = pod.schedulername
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def create(self, pod):
self.extract_tolerations(pod, req)
self.extract_security_context(pod, req)
self.extract_dnspolicy(pod, req)
self.extract_schedulername(pod, req)
return req


Expand Down Expand Up @@ -132,4 +133,5 @@ def create(self, pod):
self.extract_tolerations(pod, req)
self.extract_security_context(pod, req)
self.extract_dnspolicy(pod, req)
self.extract_schedulername(pod, req)
return req
6 changes: 5 additions & 1 deletion airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class Pod:
:type pod_runtime_info_envs: list[PodRuntimeEnv]
:param dnspolicy: Specify a dnspolicy for the pod
:type dnspolicy: str
:param schedulername: Specify a schedulername for the pod
:type schedulername: str
"""
def __init__(
self,
Expand Down Expand Up @@ -120,7 +122,8 @@ def __init__(
security_context=None,
configmaps=None,
pod_runtime_info_envs=None,
dnspolicy=None
dnspolicy=None,
schedulername=None
):
self.image = image
self.envs = envs or {}
Expand Down Expand Up @@ -148,3 +151,4 @@ def __init__(
self.configmaps = configmaps or []
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.schedulername = schedulername
4 changes: 3 additions & 1 deletion airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da

affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
schedulername = kube_executor_config.schedulername or self.kube_config.schedulername

return Pod(
namespace=namespace,
Expand Down Expand Up @@ -428,5 +429,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
affinity=affinity,
tolerations=tolerations,
security_context=self._get_security_context(),
configmaps=self._get_configmaps()
configmaps=self._get_configmaps(),
schedulername=schedulername
)
5 changes: 5 additions & 0 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type security_context: dict
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param schedulername: schedulername for the pod.
:type schedulername: str
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

Expand Down Expand Up @@ -147,6 +149,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
pod_runtime_info_envs=None,
dnspolicy=None,
do_xcom_push=False,
schedulername=None,
*args,
**kwargs):
# https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
Expand Down Expand Up @@ -188,6 +191,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.security_context = security_context or {}
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.schedulername = schedulername

def execute(self, context):
try:
Expand Down Expand Up @@ -241,6 +245,7 @@ def execute(self, context):
pod.security_context = self.security_context
pod.pod_runtime_info_envs = self.pod_runtime_info_envs
pod.dnspolicy = self.dnspolicy
pod.schedulername = self.schedulername

launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
Expand Down

0 comments on commit a16b48c

Please sign in to comment.