diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 56c5c11f..3a7c3181 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "25-04-2024 07:49:43 on flin (by mightqxc)" +timestamp = "07-05-2024 09:25:45 on flin (by mightqxc)" diff --git a/pandaharvester/harvestermonitor/htcondor_monitor.py b/pandaharvester/harvestermonitor/htcondor_monitor.py index 2b1693c5..41a34baf 100644 --- a/pandaharvester/harvestermonitor/htcondor_monitor.py +++ b/pandaharvester/harvestermonitor/htcondor_monitor.py @@ -203,6 +203,13 @@ class HTCondorMonitor(PluginBase): # constructor def __init__(self, **kwarg): PluginBase.__init__(self, **kwarg) + extra_plugin_configs = {} + try: + extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorMonitor"] + except AttributeError: + pass + except KeyError: + pass try: self.nProcesses except AttributeError: @@ -228,7 +235,10 @@ def __init__(self, **kwarg): try: self.useCondorHistory except AttributeError: - self.useCondorHistory = True + if extra_plugin_configs.get("use_condor_history") is False: + self.useCondorHistory = False + else: + self.useCondorHistory = True try: self.submissionHost_list except AttributeError: diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index f5c54bb0..826b0193 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -232,6 +232,7 @@ def make_a_jdl( pilot_type_opt = pilot_opt_dict["pilot_type_opt"] pilot_url_str = pilot_opt_dict["pilot_url_str"] pilot_debug_str = pilot_opt_dict["pilot_debug_str"] + tmpLog.debug(f"pilot options: {pilot_opt_dict}") # get token filename according to CE token_filename = None if token_dir is not None and ce_info_dict.get("ce_endpoint"): @@ -343,13 +344,19 @@ class HTCondorSubmitter(PluginBase): def __init__(self, **kwarg): 
tmpLog = core_utils.make_logger(baseLogger, method_name="__init__") self.logBaseURL = None - self.templateFile = None if hasattr(self, "useFQDN") and self.useFQDN: self.hostname = socket.getfqdn() else: self.hostname = socket.gethostname().split(".")[0] PluginBase.__init__(self, **kwarg) - + # extra plugin configs + extra_plugin_configs = {} + try: + extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"] + except AttributeError: + pass + except KeyError: + pass # number of processes try: self.nProcesses @@ -437,6 +444,11 @@ def __init__(self, **kwarg): self.useCRICGridCE = False finally: self.useCRIC = self.useCRIC or self.useCRICGridCE + # sdf template + try: + self.templateFile + except AttributeError: + self.templateFile = None # sdf template directories of CEs; ignored if templateFile is set try: self.CEtemplateDir @@ -503,24 +515,20 @@ def __init__(self, **kwarg): self.rcPilotRandomWeightPermille = 0 # submission to ARC CE's with nordugrid (gridftp) or arc (REST) grid type self.submit_arc_grid_type = "arc" - try: - extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"] - except AttributeError: - pass - except KeyError: - pass - else: - if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid": - self.submit_arc_grid_type = "nordugrid" + if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid": + self.submit_arc_grid_type = "nordugrid" # record of information of CE statistics self.ceStatsLock = threading.Lock() self.ceStats = dict() - # allowed associated parameters from CRIC - self._allowed_cric_attrs = ( + # allowed associated parameters and parameter prefixes from CRIC + self._allowed_cric_attrs = [ "pilot_url", "pilot_args", "unified_dispatch", - ) + ] + self._allowed_cric_attr_prefixes = [ + "jdl.plusattr.", + ] # get CE statistics of a site def get_ce_statistics(self, site_name, n_new_workers, time_window=21600): @@ -577,7 +585,9 @@ def submit_workers(self, workspec_list): 
# tmpLog.debug('panda_queues_name and queue_info: {0}, {1}'.format(self.queueName, panda_queues_dict[self.queueName])) # associated params on CRIC for key, val in panda_queues_dict.get_harvester_params(self.queueName).items(): - if key in self._allowed_cric_attrs: + if not isinstance(key, str): + continue + if key in self._allowed_cric_attrs or any([key.startswith(the_prefix) for the_prefix in self._allowed_cric_attr_prefixes]): if isinstance(val, str): # sanitized list the value val = re.sub(r"[;$~`]*", "", val) @@ -594,8 +604,26 @@ def submit_workers(self, workspec_list): pilot_url = associated_params_dict.get("pilot_url") pilot_args = associated_params_dict.get("pilot_args", "") pilot_version = str(this_panda_queue_dict.get("pilot_version", "current")) - python_version = str(this_panda_queue_dict.get("python_version", "2")) + python_version = str(this_panda_queue_dict.get("python_version", "3")) is_gpu_resource = this_panda_queue_dict.get("resource_type", "") == "gpu" + custom_submit_attr_dict = {} + for k, v in associated_params_dict.items(): + # fill custom submit attributes for adding to JDL + try: + the_prefix = "jdl.plusattr." 
+ if k.startswith(the_prefix): + attr_key = k[len(the_prefix) :] + attr_value = str(v) + if not re.fullmatch(r"[a-zA-Z_0-9][a-zA-Z_0-9.\-]*", attr_key): + # skip invalid key + continue + if not re.fullmatch(r"[a-zA-Z_0-9.\-,]+", attr_value): + # skip invalid value + continue + custom_submit_attr_dict[attr_key] = attr_value + except Exception as e: + tmpLog.warning(f'Got {e} with custom submit attributes "{k}: {v}"; skipped') + continue # get override requirements from queue configured try: @@ -763,6 +791,7 @@ def _choose_credential(workspec): ce_info_dict["ce_endpoint"] = self.ceEndpoint except AttributeError: pass + tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict.get('ce_endpoint')}\"") try: # Manually define ceQueueName if self.ceQueueName: @@ -872,6 +901,7 @@ def _choose_credential(workspec): "is_unified_dispatch": is_unified_dispatch, "prod_rc_permille": self.rcPilotRandomWeightPermille, "is_gpu_resource": is_gpu_resource, + "custom_submit_attr_dict": custom_submit_attr_dict, } ) return data diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index 48f566d4..ef2eb181 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.5.2" +release_version = "0.5.3"