Merge pull request #283 from PanDAWMS/flin
fix typo; batch executemany in updateInputFilesStagedAboutIdds
mightqxc authored Nov 19, 2024
2 parents 736c398 + f34b245 commit e5f0498
Showing 4 changed files with 67 additions and 62 deletions.
2 changes: 1 addition & 1 deletion PandaPkgInfo.py
@@ -1 +1 @@
release_version = "0.4.4"
release_version = "0.4.5"
119 changes: 62 additions & 57 deletions pandajedi/jedicore/JediDBProxy.py
@@ -14,6 +14,7 @@

import numpy
from pandacommon.pandalogger.PandaLogger import PandaLogger
+from pandacommon.pandautils.PandaUtils import batched
from pandaserver.taskbuffer import EventServiceUtils, JobUtils, OraDBProxy

from pandajedi.jediconfig import jedi_config
@@ -13539,7 +13540,7 @@ def getAvgDiskIO_JEDI(self):
return {}

# update input files stage-in done according to message from idds
-def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict):
+def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict, chunk_size=500):
comment = " /* JediDBProxy.updateInputFilesStagedAboutIdds_JEDI */"
methodName = self.getMethodName(comment)
methodName += f" < jediTaskID={jeditaskid} >"
@@ -13558,22 +13559,24 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
# sql to update file status
if scope != "pseudo_dataset":
sqlUF = (
"UPDATE {0}.JEDI_Dataset_Contents "
"SET status=:new_status "
"WHERE jediTaskID=:jediTaskID "
"AND status=:old_status "
"AND scope=:scope "
"AND lfn=:lfn "
).format(jedi_config.db.schemaJEDI)
f"UPDATE {jedi_config.db.schemaJEDI}.JEDI_Dataset_Contents "
f"SET status=:new_status "
f"WHERE jediTaskID=:jediTaskID "
f"AND status=:old_status "
)
sqlUF_with_lfn = sqlUF + "AND scope=:scope AND lfn=:lfn "
sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
else:
sqlUF = (
"UPDATE {0}.JEDI_Dataset_Contents "
"SET status=:new_status "
"WHERE jediTaskID=:jediTaskID "
"AND status=:old_status "
"AND scope IS NULL "
"AND lfn like :lfn "
).format(jedi_config.db.schemaJEDI)
f"UPDATE {jedi_config.db.schemaJEDI}.JEDI_Dataset_Contents "
f"SET status=:new_status "
f"WHERE jediTaskID=:jediTaskID "
f"AND status=:old_status "
f"AND scope IS NULL "
)
sqlUF_with_lfn = sqlUF + "AND lfn like :lfn "
sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
sqlUF_with_datasetID = sqlUF_with_lfn + "AND datasetID=:datasetID "
# begin transaction
self.conn.begin()
# get datasetIDs from DB if no fileID nor datasetID provided by the message
@@ -13601,9 +13604,7 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
# set sqls to update file status
params_key_str = ",".join(params_key_list)
datesetid_list_str = f"AND datasetID IN ({params_key_str}) "
-sqlUF_without_ID = sqlUF + datesetid_list_str
-sqlUF_with_fileID = sqlUF + "AND fileID=:fileID "
-sqlUF_with_datasetID = sqlUF + "AND datasetID=:datasetID "
+sqlUF_without_ID = sqlUF_with_lfn + datesetid_list_str
# update files
if to_update_files:
# split into groups according to whether with ids
@@ -13622,49 +13623,53 @@ def updateInputFilesStagedAboutIdds_JEDI(self, jeditaskid, scope, filenames_dict
filenames_dict_without_ID[filename] = (datasetid, fileid)
# loop over files with fileID
if filenames_dict_with_fileID:
-varMaps = []
-for filename, (datasetid, fileid) in filenames_dict_with_fileID.items():
-tmp_varMap = varMap.copy()
-if scope != "pseudo_dataset":
-tmp_varMap[":lfn"] = filename
-else:
-tmp_varMap[":lfn"] = "%" + filename
-tmp_varMap[":fileID"] = fileid
-varMaps.append(tmp_varMap)
-tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-tmpLog.debug(f"running sql executemany: {sqlUF_with_fileID}")
-self.cur.executemany(sqlUF_with_fileID + comment, varMaps)
-retVal += self.cur.rowcount
+for one_batch in batched(filenames_dict_with_fileID.items(), chunk_size):
+# loop batches of executemany
+varMaps = []
+for filename, (datasetid, fileid) in one_batch:
+tmp_varMap = varMap.copy()
+tmp_varMap[":fileID"] = fileid
+if ":scope" in tmp_varMap:
+del tmp_varMap[":scope"]
+varMaps.append(tmp_varMap)
+tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+tmpLog.debug(f"running sql executemany: {sqlUF_with_fileID} for {len(varMaps)} items")
+self.cur.executemany(sqlUF_with_fileID + comment, varMaps)
+retVal += self.cur.rowcount
# loop over files with datasetID
if filenames_dict_with_datasetID:
-varMaps = []
-for filename, (datasetid, fileid) in filenames_dict_with_datasetID.items():
-tmp_varMap = varMap.copy()
-if scope != "pseudo_dataset":
-tmp_varMap[":lfn"] = filename
-else:
-tmp_varMap[":lfn"] = "%" + filename
-tmp_varMap[":datasetID"] = datasetid
-varMaps.append(tmp_varMap)
-tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-tmpLog.debug(f"running sql executemany: {sqlUF_with_datasetID}")
-self.cur.executemany(sqlUF_with_datasetID + comment, varMaps)
-retVal += self.cur.rowcount
+for one_batch in batched(filenames_dict_with_datasetID.items(), chunk_size):
+# loop batches of executemany
+varMaps = []
+for filename, (datasetid, fileid) in one_batch:
+tmp_varMap = varMap.copy()
+if scope != "pseudo_dataset":
+tmp_varMap[":lfn"] = filename
+else:
+tmp_varMap[":lfn"] = "%" + filename
+tmp_varMap[":datasetID"] = datasetid
+varMaps.append(tmp_varMap)
+tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+tmpLog.debug(f"running sql executemany: {sqlUF_with_datasetID} for {len(varMaps)} items")
+self.cur.executemany(sqlUF_with_datasetID + comment, varMaps)
+retVal += self.cur.rowcount
# loop over files without ID
if filenames_dict_without_ID:
-varMaps = []
-for filename, (datasetid, fileid) in filenames_dict_without_ID.items():
-tmp_varMap = varMap.copy()
-if scope != "pseudo_dataset":
-tmp_varMap[":lfn"] = filename
-else:
-tmp_varMap[":lfn"] = "%" + filename
-tmp_varMap.update(var_map_datasetids)
-varMaps.append(tmp_varMap)
-tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
-tmpLog.debug(f"running sql executemany: {sqlUF_without_ID}")
-self.cur.executemany(sqlUF_without_ID + comment, varMaps)
-retVal += self.cur.rowcount
+for one_batch in batched(filenames_dict_without_ID.items(), chunk_size):
+# loop batches of executemany
+varMaps = []
+for filename, (datasetid, fileid) in one_batch:
+tmp_varMap = varMap.copy()
+if scope != "pseudo_dataset":
+tmp_varMap[":lfn"] = filename
+else:
+tmp_varMap[":lfn"] = "%" + filename
+tmp_varMap.update(var_map_datasetids)
+varMaps.append(tmp_varMap)
+tmpLog.debug(f"tmp_varMap: {tmp_varMap}")
+tmpLog.debug(f"running sql executemany: {sqlUF_without_ID} for {len(varMaps)} items")
+self.cur.executemany(sqlUF_without_ID + comment, varMaps)
+retVal += self.cur.rowcount
# update associated files
if primaryID is not None:
self.fix_associated_files_in_staging(jeditaskid, primary_id=primaryID)
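The core change above replaces one large executemany call per category of files with one call per chunk of at most chunk_size bind-variable maps, iterating with the batched helper imported from pandacommon.pandautils.PandaUtils. Below is a minimal, self-contained sketch of the same pattern, assuming batched behaves like itertools.batched from Python 3.12; the sqlite table, column names, and data are illustrative only and are not taken from JEDI.

```python
# Sketch of the batched-executemany pattern introduced in this commit.
# Assumptions: `batched` behaves like itertools.batched (Python 3.12+);
# the sqlite3 table and data are illustrative, not part of PanDA/JEDI.
import sqlite3
from itertools import batched

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE contents (fileID INTEGER PRIMARY KEY, status TEXT)")
cur.executemany("INSERT INTO contents VALUES (?, ?)",
                [(i, "staging") for i in range(1, 2001)])

# Files reported as staged, keyed by name -> (datasetID, fileID)
filenames_dict = {f"file{i}": (100, i) for i in range(1, 2001)}

chunk_size = 500
n_updated = 0
sql = "UPDATE contents SET status=:new_status WHERE fileID=:fileID AND status=:old_status"
for one_batch in batched(filenames_dict.items(), chunk_size):
    # one executemany per chunk keeps each bind-variable list bounded
    var_maps = [
        {"new_status": "staged", "old_status": "staging", "fileID": fileid}
        for _filename, (_datasetid, fileid) in one_batch
    ]
    cur.executemany(sql, var_maps)
    n_updated += cur.rowcount

conn.commit()
print(n_updated)  # 2000
```

Bounding each executemany to chunk_size rows keeps the per-call payload small even when a task reports thousands of staged files in a single idds message.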
4 changes: 2 additions & 2 deletions pandajedi/jedimsgprocessor/status_report_msg_processor.py
@@ -52,9 +52,9 @@ class StatusReportMsgProcPlugin(BaseMsgProcPlugin):

def initialize(self):
BaseMsgProcPlugin.initialize(self)
-# forwarding plugins: incoming message will be fowarded to process method of these plugins
+# forwarding plugins: incoming message will be forwarded to process method of these plugins
self.forwarding_plugins = []
forwarding_plugin_names = self.params.get("forwading_plugins", [])
forwarding_plugin_names = self.params.get("forwarding_plugins", [])
if "kafka" in forwarding_plugin_names:
# Kafka
from pandajedi.jedimsgprocessor.kafka_msg_processor import (
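The other typo fix matters for configuration: the old code looked up the misspelled key "forwading_plugins", so a correctly spelled forwarding_plugins entry in the plugin params was silently ignored and no forwarding plugin was loaded. A tiny illustration of the effect (the params content here is made up):

```python
# Illustration of the fixed lookup key; the params dict below is made up.
params = {"forwarding_plugins": ["kafka"]}
print(params.get("forwading_plugins", []))   # []        -> old, misspelled lookup: forwarding silently disabled
print(params.get("forwarding_plugins", []))  # ['kafka'] -> fixed lookup picks up the configured plugins
```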
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -12,8 +12,8 @@ authors = [
{ name = "PanDA Team", email = "panda-support@cern.ch" },
]
dependencies = [
-'panda-common>=0.0.38',
-'panda-server>=0.3.23',
+'panda-common>=0.1.3',
+'panda-server>=0.4.2',
'python-daemon',
'numpy',
'pyyaml',
