Skip to content

Commit

Permalink
Merge pull request #243 from HSF/k8s_tokens
Browse files Browse the repository at this point in the history
K8s: support for pilot-panda server tokens
  • Loading branch information
fbarreir authored Oct 1, 2024
2 parents 24e0a6a + 1964d84 commit 9f42bf0
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 55 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ repos:
rev: '1.0.1' # Specify a specific version if desired
hooks:
- id: flynt
exclude: ^pandaharvester/harvestercloud/pilots_starter\.py$

- repo: https://github.com/psf/black
rev: 24.2.0
Expand Down
90 changes: 46 additions & 44 deletions pandaharvester/harvestercloud/pilots_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,16 @@
post-multipart code was taken from: https://github.com/haiwen/webapi-examples/blob/master/python/upload-file.py
"""

try:
import subprocess32 as subprocess
except Exception:
import subprocess

try:
import http.client as httplib # for python 3
except Exception:
import httplib # for python 2

try:
import urllib.parse as urlparse # for python 3
except ImportError:
import urlparse # for python 2

import http.client as httplib
import logging
import mimetypes
import os
import shutil
import ssl
import subprocess
import sys
import traceback
import urllib.parse as urlparse

WORK_DIR = "/scratch"
CONFIG_DIR = "/scratch/jobconfig"
Expand Down Expand Up @@ -74,14 +62,14 @@ def encode_multipart_formdata(files):
L = []
for key, filename, value in files:
L.append("--" + BOUNDARY)
L.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
L.append("Content-Type: %s" % get_content_type(filename))
L.append(f'Content-Disposition: form-data; name="{key}"; filename="{filename}"')
L.append(f"Content-Type: {get_content_type(filename)}")
L.append("")
L.append(value)
L.append("--" + BOUNDARY + "--")
L.append("")
body = CRLF.join(L)
content_type = "multipart/form-data; boundary=%s" % BOUNDARY
content_type = f"multipart/form-data; boundary={BOUNDARY}"
return content_type, body


Expand All @@ -92,25 +80,24 @@ def get_content_type(filename):
def upload_logs(url, log_file_name, destination_name, proxy_cert):
try:
full_url = url + "/putFile"
urlparts = urlparse.urlsplit(full_url)
url_parts = urlparse.urlsplit(full_url)

logging.debug("[upload_logs] start")
files = [("file", destination_name, open(log_file_name).read())]
status, reason = post_multipart(urlparts.hostname, urlparts.port, urlparts.path, files, proxy_cert)
logging.debug("[upload_logs] finished with code={0} msg={1}".format(status, reason))
status, reason = post_multipart(url_parts.hostname, url_parts.port, url_parts.path, files, proxy_cert)
logging.debug(f"[upload_logs] finished with code={status} msg={reason}")
if status == 200:
return True
except Exception:
err_type, err_value = sys.exc_info()[:2]
err_messsage = "failed to put with {0}:{1} ".format(err_type, err_value)
err_messsage += traceback.format_exc()
logging.debug("[upload_logs] excepted with:\n {0}".format(err_messsage))
err_message = f"failed to put with {err_type}:{err_value} "
err_message += traceback.format_exc()
logging.debug(f"[upload_logs] excepted with:\n {err_message}")

return False


def copy_files_in_dir(src_dir, dst_dir):
# src_files = os.listdir(src_dir)
for file_name in CONFIG_FILES:
full_file_name = os.path.join(src_dir, file_name)
shutil.copy(full_file_name, dst_dir)
Expand Down Expand Up @@ -168,43 +155,58 @@ def get_configuration():
os.environ["X509_USER_PROXY"] = proxy_path
logging.debug("[main] initialized proxy")

# copy the pilot-panda token and token key to the work directory
token_path = os.environ.get("PANDA_AUTH_DIR")
token_filename = os.environ.get("PANDA_AUTH_TOKEN")
token_key_filename = os.environ.get("PANDA_AUTH_TOKEN_KEY")
logging.debug(f"[main] token info {token_path} {token_filename} {token_key_filename}")
if token_path and token_filename and token_key_filename:
full_token_path = os.path.join(token_path, token_filename)
copy_proxy(full_token_path, WORK_DIR)

full_token_key_path = os.path.join(token_path, token_key_filename)
copy_proxy(full_token_key_path, WORK_DIR)
logging.debug("[main] initialized pilot-panda tokens")
else:
os.unsetenv("PANDA_AUTH_TOKEN")

# get the panda site name
panda_site = os.environ.get("computingSite")
logging.debug("[main] got panda site: {0}".format(panda_site))
logging.debug(f"[main] got panda site: {panda_site}")

# get the panda queue name
panda_queue = os.environ.get("pandaQueueName")
logging.debug("[main] got panda queue: {0}".format(panda_queue))
logging.debug(f"[main] got panda queue: {panda_queue}")

# get the resource type of the worker
resource_type = os.environ.get("resourceType")
logging.debug("[main] got resource type: {0}".format(resource_type))
logging.debug(f"[main] got resource type: {resource_type}")

prodSourceLabel = os.environ.get("prodSourceLabel")
logging.debug("[main] got prodSourceLabel: {0}".format(prodSourceLabel))
logging.debug(f"[main] got prodSourceLabel: {prodSourceLabel}")

job_type = os.environ.get("jobType")
logging.debug("[main] got job type: {0}".format(job_type))
logging.debug(f"[main] got job type: {job_type}")

pilot_type = os.environ.get("pilotType", "")
logging.debug("[main] got pilotType: {0}".format(pilot_type))
logging.debug(f"[main] got pilotType: {pilot_type}")

pilot_url_option = os.environ.get("pilotUrlOpt", "")
logging.debug("[main] got pilotUrlOpt: {0}".format(pilot_url_option))
logging.debug(f"[main] got pilotUrlOpt: {pilot_url_option}")

python_option = os.environ.get("pythonOption", "")
logging.debug("[main] got pythonOption: {0}".format(python_option))
logging.debug(f"[main] got pythonOption: {python_option}")

pilot_version = os.environ.get("pilotVersion", "")
logging.debug("[main] got pilotVersion: {0}".format(pilot_version))
logging.debug(f"[main] got pilotVersion: {pilot_version}")

# get the Harvester ID
harvester_id = os.environ.get("HARVESTER_ID")
logging.debug("[main] got Harvester ID: {0}".format(harvester_id))
logging.debug(f"[main] got Harvester ID: {harvester_id}")

# get the worker id
worker_id = os.environ.get("workerID")
logging.debug("[main] got worker ID: {0}".format(worker_id))
logging.debug(f"[main] got worker ID: {worker_id}")

# get the URL (e.g. panda cache) to upload logs
logs_frontend_w = os.environ.get("logs_frontend_w")
Expand All @@ -217,7 +219,7 @@ def get_configuration():
# get the filename to use for the stdout log
stdout_name = os.environ.get("stdout_name")
if not stdout_name:
stdout_name = "{0}_{1}.out".format(harvester_id, worker_id)
stdout_name = f"{harvester_id}_{worker_id}.out"

logging.debug("[main] got filename for the stdout log")

Expand Down Expand Up @@ -268,31 +270,31 @@ def get_configuration():
) = get_configuration()

# the pilot should propagate the download link via the pilotId field in the job table
log_download_url = "{0}/{1}".format(logs_frontend_r, destination_name)
log_download_url = f"{logs_frontend_r}/{destination_name}"
os.environ["GTAG"] = log_download_url # GTAG env variable is read by pilot

# execute the pilot wrapper
logging.debug("[main] starting pilot wrapper...")
resource_type_option = ""
if resource_type:
resource_type_option = "--resource-type {0}".format(resource_type)
resource_type_option = f"--resource-type {resource_type}"

if prodSourceLabel:
psl_option = "-j {0}".format(prodSourceLabel)
psl_option = f"-j {prodSourceLabel}"
else:
psl_option = "-j managed"

job_type_option = ""
if job_type:
job_type_option = "--job-type {0}".format(job_type)
job_type_option = f"--job-type {job_type}"

pilot_type_option = "-i PR"
if pilot_type:
pilot_type_option = "-i {0}".format(pilot_type)
pilot_type_option = f"-i {pilot_type}"

pilot_version_option = "--pilotversion 2"
if pilot_version:
pilot_version_option = "--pilotversion {0}".format(pilot_version)
pilot_version_option = f"--pilotversion {pilot_version}"

wrapper_params = "-q {0} -r {1} -s {2} -a {3} {4} {5} {6} {7} {8} {9} {10}".format(
panda_queue,
Expand Down Expand Up @@ -327,7 +329,7 @@ def get_configuration():
logging.error(traceback.format_exc())
return_code = 1

logging.debug("[main] pilot wrapper done with return code {0} ...".format(return_code))
logging.debug(f"[main] pilot wrapper done with return code {return_code} ...")

# upload logs to e.g. panda cache or similar
upload_logs(logs_frontend_w, WORK_DIR + "/wrapper-wid.log", destination_name, proxy_path)
Expand Down
19 changes: 18 additions & 1 deletion pandaharvester/harvestermisc/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ def read_yaml_file(self, yaml_file):
return yaml_content

def create_job_from_yaml(
self, yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, pilot_python_option, pilot_version, host_image, cert, max_time=None
self,
yaml_content,
work_spec,
prod_source_label,
pilot_type,
pilot_url_str,
pilot_python_option,
pilot_version,
host_image,
cert,
panda_token_path,
panda_token_filename,
panda_token_key_filename,
max_time=None,
):
tmp_log = core_utils.make_logger(base_logger, f"queue_name={self.queue_name}", method_name="create_job_from_yaml")

Expand Down Expand Up @@ -193,6 +206,10 @@ def create_job_from_yaml(
{"name": "pilotVersion", "value": pilot_version},
{"name": "jobType", "value": work_spec.jobType},
{"name": "proxySecretPath", "value": cert},
{"name": "PANDA_AUTH_ORIGIN", "value": "atlas.pilot"},
{"name": "PANDA_AUTH_DIR", "value": panda_token_path},
{"name": "PANDA_AUTH_TOKEN", "value": panda_token_filename},
{"name": "PANDA_AUTH_TOKEN_KEY", "value": panda_token_key_filename},
{"name": "workerID", "value": str(work_spec.workerID)},
{"name": "logs_frontend_w", "value": harvester_config.pandacon.pandaCacheURL_W},
{"name": "logs_frontend_r", "value": harvester_config.pandacon.pandaCacheURL_R},
Expand Down
1 change: 0 additions & 1 deletion pandaharvester/harvestersubmitter/htcondor_submitter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import errno
import json
import os
Expand Down
30 changes: 22 additions & 8 deletions pandaharvester/harvestersubmitter/k8s_submitter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import traceback
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import unquote # Python 3+

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
Expand All @@ -11,14 +10,11 @@
from pandaharvester.harvestermisc.k8s_utils import k8s_Client
from pandaharvester.harvestersubmitter import submitter_common

# logger
base_logger = core_utils.setup_logger("k8s_submitter")

# submitter for K8S


# submitter for K8S
class K8sSubmitter(PluginBase):
# constructor
def __init__(self, **kwarg):
self.logBaseURL = None
PluginBase.__init__(self, **kwarg)
Expand All @@ -34,7 +30,7 @@ def __init__(self, **kwarg):
self.k8s_client.create_or_patch_configmap_starter()

# allowed associated parameters from CRIC
self._allowed_agis_attrs = ("pilot_url",)
self.allowed_cric_attrs = ("pilot_url",)

# number of processes
try:
Expand All @@ -59,6 +55,11 @@ def __init__(self, **kwarg):
if os.getenv("PROXY_SECRET_PATH_ANAL"):
self.proxySecretPath = os.getenv("PROXY_SECRET_PATH_ANAL")

# token for pilot-pandaserver communications
self.pandaTokenPath = getattr(self, "tokenDir", None)
self.pandaTokenFilename = getattr(self, "pandaTokenFilename", None)
self.pandaTokenKeyFilename = getattr(self, "pandaTokenKeyFilename", None)

def _choose_proxy(self, workspec, is_grandly_unified_queue):
"""
Choose the proxy based on the job type
Expand Down Expand Up @@ -108,7 +109,7 @@ def submit_k8s_worker(self, work_spec):

associated_params_dict = {}
for key, val in self.panda_queues_dict.get_harvester_params(self.queueName).items():
if key in self._allowed_agis_attrs:
if key in self.allowed_cric_attrs:
associated_params_dict[key] = val

pilot_url = associated_params_dict.get("pilot_url")
Expand All @@ -131,8 +132,21 @@ def submit_k8s_worker(self, work_spec):

# submit the worker
rsp, yaml_content_final = self.k8s_client.create_job_from_yaml(
yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, pilot_python_option, pilot_version, host_image, cert, max_time=max_time
yaml_content,
work_spec,
prod_source_label,
pilot_type,
pilot_url_str,
pilot_python_option,
pilot_version,
host_image,
cert,
self.pandaTokenPath,
self.pandaTokenFilename,
self.pandaTokenKeyFilename,
max_time=max_time,
)

except Exception as _e:
tmp_log.error(traceback.format_exc())
err_str = f"Failed to create a JOB; {_e}"
Expand Down

0 comments on commit 9f42bf0

Please sign in to comment.