diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d0cf813b..84e6d69d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,6 @@ repos: rev: '1.0.1' # Specify a specific version if desired hooks: - id: flynt - exclude: ^pandaharvester/harvestercloud/pilots_starter\.py$ - repo: https://github.com/psf/black rev: 24.2.0 diff --git a/pandaharvester/harvestercloud/pilots_starter.py b/pandaharvester/harvestercloud/pilots_starter.py index fd3a995f..846ba012 100644 --- a/pandaharvester/harvestercloud/pilots_starter.py +++ b/pandaharvester/harvestercloud/pilots_starter.py @@ -10,28 +10,16 @@ post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py """ -try: - import subprocess32 as subprocess -except Exception: - import subprocess - -try: - import http.client as httplib # for python 3 -except Exception: - import httplib # for python 2 - -try: - import urllib.parse as urlparse # for python 3 -except ImportError: - import urlparse # for python 2 - +import http.client as httplib import logging import mimetypes import os import shutil import ssl +import subprocess import sys import traceback +import urllib.parse as urlparse WORK_DIR = "/scratch" CONFIG_DIR = "/scratch/jobconfig" @@ -74,14 +62,14 @@ def encode_multipart_formdata(files): L = [] for key, filename, value in files: L.append("--" + BOUNDARY) - L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)) - L.append("Content-Type: %s" % get_content_type(filename)) + L.append(f'Content-Disposition: form-data; name="{key}"; filename="{filename}"') + L.append(f"Content-Type: {get_content_type(filename)}") L.append("") L.append(value) L.append("--" + BOUNDARY + "--") L.append("") body = CRLF.join(L) - content_type = "multipart/form-data; boundary=%s" % BOUNDARY + content_type = f"multipart/form-data; boundary={BOUNDARY}" return content_type, body @@ -92,25 +80,24 @@ def get_content_type(filename): def upload_logs(url, log_file_name, destination_name, proxy_cert): try: full_url = url + "/putFile" - urlparts = urlparse.urlsplit(full_url) + url_parts = urlparse.urlsplit(full_url) logging.debug("[upload_logs] start") files = [("file", destination_name, open(log_file_name).read())] - status, reason = post_multipart(urlparts.hostname, urlparts.port, urlparts.path, files, proxy_cert) - logging.debug("[upload_logs] finished with code={0} msg={1}".format(status, reason)) + status, reason = post_multipart(url_parts.hostname, url_parts.port, url_parts.path, files, proxy_cert) + logging.debug(f"[upload_logs] finished with code={status} msg={reason}") if status == 200: return True except Exception: err_type, err_value = sys.exc_info()[:2] - err_messsage = "failed to put with {0}:{1} ".format(err_type, err_value) - err_messsage += traceback.format_exc() - logging.debug("[upload_logs] excepted with:\n {0}".format(err_messsage)) + err_message = f"failed to put with {err_type}:{err_value} " + err_message += traceback.format_exc() + logging.debug(f"[upload_logs] excepted with:\n {err_message}") return False def copy_files_in_dir(src_dir, dst_dir): - # src_files = os.listdir(src_dir) for file_name in CONFIG_FILES: full_file_name = os.path.join(src_dir, file_name) shutil.copy(full_file_name, dst_dir) @@ -168,43 +155,58 @@ def get_configuration(): os.environ["X509_USER_PROXY"] = proxy_path logging.debug("[main] initialized proxy") + # copy the pilot-panda token and token key to the work directory + token_path = os.environ.get("PANDA_AUTH_DIR") + token_filename = os.environ.get("PANDA_AUTH_TOKEN") + token_key_filename = os.environ.get("PANDA_AUTH_TOKEN_KEY") + logging.debug(f"[main] token info {token_path} {token_filename} {token_key_filename}") + if token_path and token_filename and token_key_filename: + full_token_path = os.path.join(token_path, token_filename) + copy_proxy(full_token_path, WORK_DIR) + + full_token_key_path = os.path.join(token_path, token_key_filename) + copy_proxy(full_token_key_path, WORK_DIR) + logging.debug("[main] initialized pilot-panda tokens") + else: + os.unsetenv("PANDA_AUTH_TOKEN") + # get the panda site name panda_site = os.environ.get("computingSite") - logging.debug("[main] got panda site: {0}".format(panda_site)) + logging.debug(f"[main] got panda site: {panda_site}") # get the panda queue name panda_queue = os.environ.get("pandaQueueName") - logging.debug("[main] got panda queue: {0}".format(panda_queue)) + logging.debug(f"[main] got panda queue: {panda_queue}") # get the resource type of the worker resource_type = os.environ.get("resourceType") - logging.debug("[main] got resource type: {0}".format(resource_type)) + logging.debug(f"[main] got resource type: {resource_type}") prodSourceLabel = os.environ.get("prodSourceLabel") - logging.debug("[main] got prodSourceLabel: {0}".format(prodSourceLabel)) + logging.debug(f"[main] got prodSourceLabel: {prodSourceLabel}") job_type = os.environ.get("jobType") - logging.debug("[main] got job type: {0}".format(job_type)) + logging.debug(f"[main] got job type: {job_type}") pilot_type = os.environ.get("pilotType", "") - logging.debug("[main] got pilotType: {0}".format(pilot_type)) + logging.debug(f"[main] got pilotType: {pilot_type}") pilot_url_option = os.environ.get("pilotUrlOpt", "") - logging.debug("[main] got pilotUrlOpt: {0}".format(pilot_url_option)) + logging.debug(f"[main] got pilotUrlOpt: {pilot_url_option}") python_option = os.environ.get("pythonOption", "") - logging.debug("[main] got pythonOption: {0}".format(python_option)) + logging.debug(f"[main] got pythonOption: {python_option}") pilot_version = os.environ.get("pilotVersion", "") - logging.debug("[main] got pilotVersion: {0}".format(pilot_version)) + logging.debug(f"[main] got pilotVersion: {pilot_version}") # get the Harvester ID harvester_id = os.environ.get("HARVESTER_ID") - logging.debug("[main] got Harvester ID: {0}".format(harvester_id)) + logging.debug(f"[main] got Harvester ID: {harvester_id}") # get the worker id worker_id = os.environ.get("workerID") - logging.debug("[main] got worker ID: {0}".format(worker_id)) + logging.debug(f"[main] got worker ID: {worker_id}") # get the URL (e.g. panda cache) to upload logs logs_frontend_w = os.environ.get("logs_frontend_w") @@ -217,7 +219,7 @@ def get_configuration(): # get the filename to use for the stdout log stdout_name = os.environ.get("stdout_name") if not stdout_name: - stdout_name = "{0}_{1}.out".format(harvester_id, worker_id) + stdout_name = f"{harvester_id}_{worker_id}.out" logging.debug("[main] got filename for the stdout log") @@ -268,31 +270,31 @@ def get_configuration(): ) = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table - log_download_url = "{0}/{1}".format(logs_frontend_r, destination_name) + log_download_url = f"{logs_frontend_r}/{destination_name}" os.environ["GTAG"] = log_download_url # GTAG env variable is read by pilot # execute the pilot wrapper logging.debug("[main] starting pilot wrapper...") resource_type_option = "" if resource_type: - resource_type_option = "--resource-type {0}".format(resource_type) + resource_type_option = f"--resource-type {resource_type}" if prodSourceLabel: - psl_option = "-j {0}".format(prodSourceLabel) + psl_option = f"-j {prodSourceLabel}" else: psl_option = "-j managed" job_type_option = "" if job_type: - job_type_option = "--job-type {0}".format(job_type) + job_type_option = f"--job-type {job_type}" pilot_type_option = "-i PR" if pilot_type: - pilot_type_option = "-i {0}".format(pilot_type) + pilot_type_option = f"-i {pilot_type}" pilot_version_option = "--pilotversion 2" if pilot_version: - pilot_version_option = "--pilotversion {0}".format(pilot_version) + pilot_version_option = f"--pilotversion {pilot_version}" wrapper_params = "-q {0} -r {1} -s {2} -a {3} {4} {5} {6} {7} {8} {9} {10}".format( panda_queue, @@ -327,7 +329,7 @@ def get_configuration(): logging.error(traceback.format_exc()) return_code = 1 - logging.debug("[main] pilot wrapper done with return code {0} ...".format(return_code)) + logging.debug(f"[main] pilot wrapper done with return code {return_code} ...") # upload logs to e.g. panda cache or similar upload_logs(logs_frontend_w, WORK_DIR + "/wrapper-wid.log", destination_name, proxy_path) diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index fee8ad59..3c5e83f1 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -50,7 +50,20 @@ def read_yaml_file(self, yaml_file): return yaml_content def create_job_from_yaml( - self, yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, pilot_python_option, pilot_version, host_image, cert, max_time=None + self, + yaml_content, + work_spec, + prod_source_label, + pilot_type, + pilot_url_str, + pilot_python_option, + pilot_version, + host_image, + cert, + panda_token_path, + panda_token_filename, + panda_token_key_filename, + max_time=None, ): tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_job_from_yaml") @@ -193,6 +206,10 @@ def create_job_from_yaml( {"name": "pilotVersion", "value": pilot_version}, {"name": "jobType", "value": work_spec.jobType}, {"name": "proxySecretPath", "value": cert}, + {"name": "PANDA_AUTH_ORIGIN", "value": "atlas.pilot"}, + {"name": "PANDA_AUTH_DIR", "value": panda_token_path}, + {"name": "PANDA_AUTH_TOKEN", "value": panda_token_filename}, + {"name": "PANDA_AUTH_TOKEN_KEY", "value": panda_token_key_filename}, {"name": "workerID", "value": str(work_spec.workerID)}, {"name": "logs_frontend_w", "value": harvester_config.pandacon.pandaCacheURL_W}, {"name": "logs_frontend_r", "value": harvester_config.pandacon.pandaCacheURL_R}, diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 20bacb1a..041ea852 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -1,4 +1,3 @@ -import datetime import errno import json import os diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index 928d2333..ee512d21 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -1,7 +1,6 @@ import os import traceback from concurrent.futures import ThreadPoolExecutor -from urllib.parse import unquote # Python 3+ from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestercore import core_utils @@ -11,14 +10,11 @@ from pandaharvester.harvestermisc.k8s_utils import k8s_Client from pandaharvester.harvestersubmitter import submitter_common -# logger base_logger = core_utils.setup_logger("k8s_submitter") -# submitter for K8S - +# submitter for K8S class K8sSubmitter(PluginBase): - # constructor def __init__(self, **kwarg): self.logBaseURL = None PluginBase.__init__(self, **kwarg) @@ -34,7 +30,7 @@ def __init__(self, **kwarg): self.k8s_client.create_or_patch_configmap_starter() # allowed associated parameters from CRIC - self._allowed_agis_attrs = ("pilot_url",) + self.allowed_cric_attrs = ("pilot_url",) # number of processes try: @@ -59,6 +55,11 @@ def __init__(self, **kwarg): if os.getenv("PROXY_SECRET_PATH_ANAL"): self.proxySecretPath = os.getenv("PROXY_SECRET_PATH_ANAL") + # token for pilot-pandaserver communications + self.pandaTokenPath = getattr(self, "tokenDir", None) + self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None) + self.pandaTokenKeyFilename = getattr(self, "pandaTokenKeyFilename", None) + def _choose_proxy(self, workspec, is_grandly_unified_queue): """ Choose the proxy based on the job type @@ -108,7 +109,7 @@ def submit_k8s_worker(self, work_spec): associated_params_dict = {} for key, val in self.panda_queues_dict.get_harvester_params(self.queueName).items(): - if key in self._allowed_agis_attrs: + if key in self.allowed_cric_attrs: associated_params_dict[key] = val pilot_url = associated_params_dict.get("pilot_url") @@ -131,8 +132,21 @@ def submit_k8s_worker(self, work_spec): # submit the worker rsp, yaml_content_final = self.k8s_client.create_job_from_yaml( - yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, pilot_python_option, pilot_version, host_image, cert, max_time=max_time + yaml_content, + work_spec, + prod_source_label, + pilot_type, + pilot_url_str, + pilot_python_option, + pilot_version, + host_image, + cert, + self.pandaTokenPath, + self.pandaTokenFilename, + self.pandaTokenKeyFilename, + max_time=max_time, ) + except Exception as _e: tmp_log.error(traceback.format_exc()) err_str = f"Failed to create a JOB; {_e}"