diff --git a/PandaPkgInfo.py b/PandaPkgInfo.py
index d2249bc8..2ca644c4 100644
--- a/PandaPkgInfo.py
+++ b/PandaPkgInfo.py
@@ -1 +1 @@
-release_version = "0.4.4"
+release_version = "0.4.5"
diff --git a/pandajedi/jedicore/JediDBProxy.py b/pandajedi/jedicore/JediDBProxy.py
index cfb5e7a0..6ff6b8ed 100644
--- a/pandajedi/jedicore/JediDBProxy.py
+++ b/pandajedi/jedicore/JediDBProxy.py
@@ -14,6 +14,7 @@
 import numpy
 
 from pandacommon.pandalogger.PandaLogger import PandaLogger
+from pandacommon.pandautils.PandaUtils import batched
 from pandaserver.taskbuffer import EventServiceUtils, JobUtils, OraDBProxy
 
 from pandajedi.jediconfig import jedi_config
@@ -13539,7 +13540,7 @@ def getAvgDiskIO_JEDI(self):
         return {}
 
     # update input files stage-in done according to message from idds
-    def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict):
+    def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500):
         comment = " /* JediDBProxy.updateInputFilesStagedAboutIdds_JEDI */"
         methodName = self.getMethodName(comment)
         methodName += f" < jediTaskID={jeditaskid} >"
@@ -13558,22 +13559,24 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
         # sql to update file status
         if scope != "pseudo_dataset":
             sqlUF = (
-                "UPDATE {0}.JEDI_Dataset_Contents "
-                "SET status=:new_status "
-                "WHERE jediTaskID=:jediTaskID "
-                "AND status=:old_status "
-                "AND scope=:scope "
-                "AND lfn=:lfn "
-            ).format(jedi_config.db.schemaJEDI)
+                f"UPDATE {jedi_config.db.schemaJEDI}.JEDI_Dataset_Contents "
+                f"SET status=:new_status "
+                f"WHERE jediTaskID=:jediTaskID "
+                f"AND status=:old_status "
+            )
+            sqlUF_with_lfn = sqlUF + "AND scope=:scope AND lfn=:lfn "
+            sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
         else:
             sqlUF = (
-                "UPDATE {0}.JEDI_Dataset_Contents "
-                "SET status=:new_status "
-                "WHERE jediTaskID=:jediTaskID "
-                "AND status=:old_status "
-                "AND scope IS NULL "
-                "AND lfn like :lfn "
-            ).format(jedi_config.db.schemaJEDI)
+                f"UPDATE {jedi_config.db.schemaJEDI}.JEDI_Dataset_Contents "
+                f"SET status=:new_status "
+                f"WHERE jediTaskID=:jediTaskID "
+                f"AND status=:old_status "
+                f"AND scope IS NULL "
+            )
+            sqlUF_with_lfn = sqlUF + "AND lfn like :lfn "
+            sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
+        sqlUF_with_datasetID = sqlUF_with_lfn + "AND datasetID=:datasetID "
         # begin transaction
         self.conn.begin()
         # get datasetIDs from DB if no fileID nor datasetID provided by the message
@@ -13601,9 +13604,7 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
             # set sqls to update file status
             params_key_str = ",".join(params_key_list)
             datesetid_list_str = f"AND datasetID IN ({params_key_str}) "
-            sqlUF_without_ID = sqlUF + datesetid_list_str
-            sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
-            sqlUF_with_datasetID = sqlUF + "AND datasetID=:datasetID "
+            sqlUF_without_ID = sqlUF_with_lfn + datesetid_list_str
         # update files
         if to_update_files:
             # split into groups according to whether with ids
@@ -13622,49 +13623,53 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
                     filenames_dict_without_ID[filename] = (datasetid, fileid)
             # loop over files with fileID
             if filenames_dict_with_fileID:
-                varMaps = []
-                for filename, (datasetid, fileid) in filenames_dict_with_fileID.items():
-                    tmp_varMap = varMap.copy()
-                    if scope != "pseudo_dataset":
-                        tmp_varMap[":lfn"] = filename
-                    else:
-                        tmp_varMap[":lfn"] = "%" + filename
-                    tmp_varMap[":fileID"] = fileid
-                    varMaps.append(tmp_varMap)
-                    tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-                tmpLog.debug(f"running sql executemany: {sqlUF_with_fileID}")
-                self.cur.executemany(sqlUF_with_fileID + comment, varMaps)
-                retVal += self.cur.rowcount
+                for one_batch in batched(filenames_dict_with_fileID.items(), chunk_size):
+                    # loop batches of executemany
+                    varMaps = []
+                    for filename, (datasetid, fileid) in one_batch:
+                        tmp_varMap = varMap.copy()
+                        tmp_varMap[":fileID"] = fileid
+                        if ":scope" in tmp_varMap:
+                            del tmp_varMap[":scope"]
+                        varMaps.append(tmp_varMap)
+                        tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+                    tmpLog.debug(f"running sql executemany: {sqlUF_with_fileID} for {len(varMaps)} items")
+                    self.cur.executemany(sqlUF_with_fileID + comment, varMaps)
+                    retVal += self.cur.rowcount
             # loop over files with datasetID
             if filenames_dict_with_datasetID:
-                varMaps = []
-                for filename, (datasetid, fileid) in filenames_dict_with_datasetID.items():
-                    tmp_varMap = varMap.copy()
-                    if scope != "pseudo_dataset":
-                        tmp_varMap[":lfn"] = filename
-                    else:
-                        tmp_varMap[":lfn"] = "%" + filename
-                    tmp_varMap[":datasetID"] = datasetid
-                    varMaps.append(tmp_varMap)
-                    tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-                tmpLog.debug(f"running sql executemany: {sqlUF_with_datasetID}")
-                self.cur.executemany(sqlUF_with_datasetID + comment, varMaps)
-                retVal += self.cur.rowcount
+                for one_batch in batched(filenames_dict_with_datasetID.items(), chunk_size):
+                    # loop batches of executemany
+                    varMaps = []
+                    for filename, (datasetid, fileid) in one_batch:
+                        tmp_varMap = varMap.copy()
+                        if scope != "pseudo_dataset":
+                            tmp_varMap[":lfn"] = filename
+                        else:
+                            tmp_varMap[":lfn"] = "%" + filename
+                        tmp_varMap[":datasetID"] = datasetid
+                        varMaps.append(tmp_varMap)
+                        tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+                    tmpLog.debug(f"running sql executemany: {sqlUF_with_datasetID} for {len(varMaps)} items")
+                    self.cur.executemany(sqlUF_with_datasetID + comment, varMaps)
+                    retVal += self.cur.rowcount
             # loop over files without ID
             if filenames_dict_without_ID:
-                varMaps = []
-                for filename, (datasetid, fileid) in filenames_dict_without_ID.items():
-                    tmp_varMap = varMap.copy()
-                    if scope != "pseudo_dataset":
-                        tmp_varMap[":lfn"] = filename
-                    else:
-                        tmp_varMap[":lfn"] = "%" + filename
-                    tmp_varMap.update(var_map_datasetids)
-                    varMaps.append(tmp_varMap)
-                    tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-                tmpLog.debug(f"running sql executemany: {sqlUF_without_ID}")
-                self.cur.executemany(sqlUF_without_ID + comment, varMaps)
-                retVal += self.cur.rowcount
+                for one_batch in batched(filenames_dict_without_ID.items(), chunk_size):
+                    # loop batches of executemany
+                    varMaps = []
+                    for filename, (datasetid, fileid) in one_batch:
+                        tmp_varMap = varMap.copy()
+                        if scope != "pseudo_dataset":
+                            tmp_varMap[":lfn"] = filename
+                        else:
+                            tmp_varMap[":lfn"] = "%" + filename
+                        tmp_varMap.update(var_map_datasetids)
+                        varMaps.append(tmp_varMap)
+                        tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+                    tmpLog.debug(f"running sql executemany: {sqlUF_without_ID} for {len(varMaps)} items")
+                    self.cur.executemany(sqlUF_without_ID + comment, varMaps)
+                    retVal += self.cur.rowcount
         # update associated files
         if primaryID is not None:
             self.fix_associated_files_in_staging(jeditaskid, primary_id=primaryID)
diff --git a/pandajedi/jedimsgprocessor/status_report_msg_processor.py b/pandajedi/jedimsgprocessor/status_report_msg_processor.py
index 62f1699e..3abe7982 100644
--- a/pandajedi/jedimsgprocessor/status_report_msg_processor.py
+++ b/pandajedi/jedimsgprocessor/status_report_msg_processor.py
@@ -52,9 +52,9 @@ class StatusReportMsgProcPlugin(BaseMsgProcPlugin):
 
     def initialize(self):
         BaseMsgProcPlugin.initialize(self)
-        # forwarding plugins: incoming message will be fowarded to process method of these plugins
+        # forwarding plugins: incoming message will be forwarded to process method of these plugins
         self.forwarding_plugins = []
-        forwarding_plugin_names = self.params.get("forwading_plugins", [])
+        forwarding_plugin_names = self.params.get("forwarding_plugins", [])
         if "kafka" in forwarding_plugin_names:
             # Kafka
             from pandajedi.jedimsgprocessor.kafka_msg_processor import (
diff --git a/pyproject.toml b/pyproject.toml
index 34e98478..0412f6f9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -12,8 +12,8 @@ authors = [
     { name = "PanDA Team", email = "panda-support@cern.ch" },
 ]
 dependencies = [
-    'panda-common>=0.0.38',
-    'panda-server>=0.3.23',
+    'panda-common>=0.1.3',
+    'panda-server>=0.4.2',
     'python-daemon',
     'numpy',
     'pyyaml',
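
Note (illustration, not part of the patch): the JediDBProxy change above replaces one large executemany call per file group with fixed-size batches controlled by the new chunk_size parameter. The sketch below shows the same pattern in isolation; the local batched helper is a hypothetical stand-in for pandacommon.pandautils.PandaUtils.batched (assumed to yield successive chunks of at most n items, like itertools.batched), and the cursor, SQL, and bind-variable names are illustrative only.

from itertools import islice


def batched(iterable, n):
    # Hypothetical stand-in for pandacommon.pandautils.PandaUtils.batched:
    # yield successive tuples of at most n items from the iterable.
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


def update_in_chunks(cursor, sql, var_maps, chunk_size=500):
    # Run executemany on fixed-size slices of the bind-variable list and
    # accumulate the affected row count, mirroring the batching in the patch.
    n_updated = 0
    for one_batch in batched(var_maps, chunk_size):
        cursor.executemany(sql, list(one_batch))
        n_updated += cursor.rowcount
    return n_updated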