diff --git a/pandajedi/jediorder/PostProcessor.py b/pandajedi/jediorder/PostProcessor.py index 7699e286f..c257cf7b4 100644 --- a/pandajedi/jediorder/PostProcessor.py +++ b/pandajedi/jediorder/PostProcessor.py @@ -1,4 +1,12 @@ -import datetime +""" +Post-processing worker for JEDI tasks. + +PostProcessor is a long-running daemon that periodically picks up tasks ready +to be finished and dispatches them to a pool of PostProcessorThread workers. +Each worker calls the VO/label-specific post-processor implementation +(doPostProcess) and, if successful, the final-procedure hook (doFinalProcedure). +""" + import os import socket import sys @@ -18,9 +26,16 @@ logger = PandaLogger().getLogger(__name__.split(".")[-1]) -# worker class to do post-processing class PostProcessor(JediKnight, FactoryBase): - # constructor + """ + Daemon that drives post-processing for finished JEDI tasks. + + Inherits scheduling and communication from JediKnight and VO/label-specific + implementation instantiation from FactoryBase. On each loop iteration it + calls prepareTasksToBeFinished_JEDI to lock eligible tasks, then hands them + off to a pool of PostProcessorThread workers. 
+ """ + def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels): self.vos = self.parseInit(vos) self.prodSourceLabels = self.parseInit(prodSourceLabels) @@ -28,139 +43,140 @@ def __init__(self, commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels): JediKnight.__init__(self, commuChannel, taskBufferIF, ddmIF, logger) FactoryBase.__init__(self, self.vos, self.prodSourceLabels, logger, jedi_config.postprocessor.modConfig) - # main def start(self): - # start base classes + """Run the main post-processing loop, cycling every 60 seconds.""" JediKnight.start(self) FactoryBase.initializeMods(self, self.taskBufferIF, self.ddmIF) - # go into main loop + while True: - startTime = naive_utcnow() + start_time = naive_utcnow() try: - # get logger - tmpLog = MsgWrapper(logger) - tmpLog.info("start") - # loop over all vos + tmp_log = MsgWrapper(logger) + tmp_log.info("start") + for vo in self.vos: - # loop over all sourceLabels - for prodSourceLabel in self.prodSourceLabels: - # prepare tasks to be finished - tmpLog.info(f"preparing tasks to be finished for vo={vo} label={prodSourceLabel}") - tmp_ret_list = self.taskBufferIF.prepareTasksToBeFinished_JEDI(vo, prodSourceLabel, jedi_config.postprocessor.nTasks, pid=self.pid) - if tmp_ret_list is None: - # failed - tmpLog.error("failed to prepare tasks") - # get tasks to be finished - tmpLog.info("getting tasks to be finished") - tmpList = self.taskBufferIF.getTasksToBeFinished_JEDI( - vo, prodSourceLabel, self.pid, jedi_config.postprocessor.nTasks, target_tasks=tmp_ret_list + for prod_source_label in self.prodSourceLabels: + tmp_log.info(f"preparing tasks to be finished for vo={vo} label={prod_source_label}") + target_tasks = self.taskBufferIF.prepareTasksToBeFinished_JEDI(vo, prod_source_label, jedi_config.postprocessor.nTasks, pid=self.pid) + if target_tasks is None: + tmp_log.error("failed to prepare tasks") + + tmp_log.info("getting tasks to be finished") + task_list = 
self.taskBufferIF.getTasksToBeFinished_JEDI( + vo, prod_source_label, self.pid, jedi_config.postprocessor.nTasks, target_tasks=target_tasks ) - if tmpList is None: - # failed - tmpLog.error("failed to get tasks to be finished") - else: - tmpLog.info(f"got {len(tmpList)} tasks") - # put to a locked list - taskList = ListWithLock(tmpList) - # make thread pool - threadPool = ThreadPool() - # make workers - nWorker = jedi_config.postprocessor.nWorkers - for iWorker in range(nWorker): - thr = PostProcessorThread(taskList, threadPool, self.taskBufferIF, self.ddmIF, self) - thr.start() - # join - threadPool.join() - tmpLog.info("done") + if task_list is None: + tmp_log.error("failed to get tasks to be finished") + continue + + tmp_log.info(f"got {len(task_list)} tasks") + locked_list = ListWithLock(task_list) + thread_pool = ThreadPool() + for _ in range(jedi_config.postprocessor.nWorkers): + thr = PostProcessorThread(locked_list, thread_pool, self.taskBufferIF, self.ddmIF, self) + thr.start() + thread_pool.join() + + tmp_log.info("done") except Exception: - errtype, errvalue = sys.exc_info()[:2] - tmpLog.error(f"failed in {self.__class__.__name__}.start() with {errtype.__name__} {errvalue}") - # sleep if needed - loopCycle = 60 - timeDelta = naive_utcnow() - startTime - sleepPeriod = loopCycle - timeDelta.seconds - if sleepPeriod > 0: - time.sleep(sleepPeriod) + err_type, err_value = sys.exc_info()[:2] + tmp_log.error(f"failed in {self.__class__.__name__}.start() with {err_type.__name__} {err_value}") + + # sleep for the remainder of the 60-second cycle + loop_cycle = 60 + elapsed = naive_utcnow() - start_time + sleep_period = loop_cycle - elapsed.seconds + if sleep_period > 0: + time.sleep(sleep_period) -# thread for real worker class PostProcessorThread(WorkerThread): - # constructor + """ + Worker thread that post-processes a batch of JEDI tasks. + + Instantiated by PostProcessor.start() for each worker slot. 
Pulls tasks + from a shared locked list and calls post_process_tasks() until the list is + exhausted. + """ + def __init__(self, taskList, threadPool, taskbufferIF, ddmIF, implFactory): - # initialize worker with no semaphore WorkerThread.__init__(self, None, threadPool, logger) - # attributes self.taskList = taskList self.taskBufferIF = taskbufferIF self.ddmIF = ddmIF self.implFactory = implFactory - # post process tasks def post_process_tasks(self, task_list): - for taskSpec in task_list: - # make logger - tmpLog = MsgWrapper(self.logger, f"") - tmpLog.info("start") - tmpStat = Interaction.SC_SUCCEEDED - # get impl - impl = self.implFactory.instantiateImpl(taskSpec.vo, taskSpec.prodSourceLabel, None, self.taskBufferIF, self.ddmIF) + """ + Run post-processing and final-procedure for each task in task_list. + + Outcome per task: + - SC_FATAL, or SC_FAILED while the task status is toabort/tobroken → mark as broken. + - SC_FAILED on any other status → record transient error and skip + the final procedure. + - SC_SUCCEEDED → call doFinalProcedure.
+ """ + for task_spec in task_list: + tmp_log = MsgWrapper(self.logger, f"") + tmp_log.info("start") + tmp_stat = Interaction.SC_SUCCEEDED + + # instantiate the VO/label-specific post-processor + impl = self.implFactory.instantiateImpl(task_spec.vo, task_spec.prodSourceLabel, None, self.taskBufferIF, self.ddmIF) if impl is None: - # post processor is undefined - tmpLog.error(f"post-processor is undefined for vo={taskSpec.vo} sourceLabel={taskSpec.prodSourceLabel}") - tmpStat = Interaction.SC_FATAL - # execute - if tmpStat == Interaction.SC_SUCCEEDED: - tmpLog.info(f"post-process with {impl.__class__.__name__}") + tmp_log.error(f"post-processor is undefined for vo={task_spec.vo} sourceLabel={task_spec.prodSourceLabel}") + tmp_stat = Interaction.SC_FATAL + + # run post-processing + if tmp_stat == Interaction.SC_SUCCEEDED: + tmp_log.info(f"post-process with {impl.__class__.__name__}") try: - tmpStat = impl.doPostProcess(taskSpec, tmpLog) + tmp_stat = impl.doPostProcess(task_spec, tmp_log) except Exception as e: - tmpLog.error(f"post-process failed with {str(e)}") - tmpStat = Interaction.SC_FATAL - # done - if tmpStat == Interaction.SC_FATAL or (tmpStat == Interaction.SC_FAILED and taskSpec.status in ["toabort", "tobroken"]): - # task is broken - tmpErrStr = "post-process permanently failed" - tmpLog.error(tmpErrStr) - taskSpec.status = "broken" - taskSpec.setErrDiag(tmpErrStr) - taskSpec.lockedBy = None - self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}) - elif tmpStat == Interaction.SC_FAILED: - tmpErrStr = "post-processing temporarily failed" - taskSpec.setErrDiag(tmpErrStr, True) - self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}) - tmpLog.info(f"set task_status={taskSpec.status} since {taskSpec.errorDialog}") - tmpLog.info("done") + tmp_log.error(f"post-process failed with {str(e)}") + tmp_stat = Interaction.SC_FATAL + + # handle permanent failure + if tmp_stat == Interaction.SC_FATAL or (tmp_stat == 
Interaction.SC_FAILED and task_spec.status in ("toabort", "tobroken")): + err_str = "post-process permanently failed" + tmp_log.error(err_str) + task_spec.status = "broken" + task_spec.setErrDiag(err_str) + task_spec.lockedBy = None + self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID}) + + # handle transient failure — skip final procedure + elif tmp_stat == Interaction.SC_FAILED: + err_str = "post-processing temporarily failed" + task_spec.setErrDiag(err_str, True) + self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID}) + tmp_log.info(f"set task_status={task_spec.status} since {task_spec.errorDialog}") + tmp_log.info("done") continue - # final procedure + + # run final procedure depending on prodsourcelabel (e.g. email notifications, manage output datasets, etc.) try: - impl.doFinalProcedure(taskSpec, tmpLog) + impl.doFinalProcedure(task_spec, tmp_log) except Exception as e: - tmpLog.error(f"final procedure failed with {str(e)}") - # done - tmpLog.info("done") + tmp_log.error(f"final procedure failed with {str(e)}") + + tmp_log.info("done") - # main def runImpl(self): + """Pull batches of tasks from the shared list and post-process them.""" while True: try: - # get a part of list - nTasks = 10 - taskList = self.taskList.get(nTasks) - # no more datasets - if len(taskList) == 0: + task_list = self.taskList.get(10) + if not task_list: self.logger.debug(f"{self.__class__.__name__} terminating since no more items") return - # post process tasks - self.post_process_tasks(taskList) + self.post_process_tasks(task_list) except Exception: - errtype, errvalue = sys.exc_info()[:2] - logger.error(f"{self.__class__.__name__} failed in runImpl() with {errtype.__name__}:{errvalue}") - - -# launch + err_type, err_value = sys.exc_info()[:2] + logger.error(f"{self.__class__.__name__} failed in runImpl() with {err_type.__name__}:{err_value}") def launcher(commuChannel, taskBufferIF, ddmIF, vos=None, prodSourceLabels=None): + 
"""Entry point used by the JEDI daemon infrastructure to start the PostProcessor.""" p = PostProcessor(commuChannel, taskBufferIF, ddmIF, vos, prodSourceLabels) p.start() diff --git a/pandajedi/jedipprocess/AtlasAnalPostProcessor.py b/pandajedi/jedipprocess/AtlasAnalPostProcessor.py index 72a5c96d2..935725e5b 100644 --- a/pandajedi/jedipprocess/AtlasAnalPostProcessor.py +++ b/pandajedi/jedipprocess/AtlasAnalPostProcessor.py @@ -1,3 +1,10 @@ +""" +Post-processor implementation for ATLAS analysis tasks. + +Handles dataset freezing, email notifications, and carbon-footprint reporting +for user analysis tasks. +""" + import datetime import random import re @@ -17,20 +24,20 @@ def format_weight(weight): + """Convert a CO2 weight in grams to a human-readable string with the appropriate unit.""" power = 1000 n = 0 power_labels = {0: "gCO2", 1: "kgCO2", 2: "tCO2", 3: "MtCO2", 4: "GtCO2"} while weight > power: weight /= power n += 1 - weight_str = f"{weight:.2f} {power_labels[n]}" return weight_str -# post processor for ATLAS production class AtlasAnalPostProcessor(PostProcessorBase): - # constructor + """Post-processor for ATLAS analysis tasks.""" + def __init__(self, taskBufferIF, ddmIF): PostProcessorBase.__init__(self, taskBufferIF, ddmIF) self.taskParamMap = None @@ -39,33 +46,45 @@ def __init__(self, taskBufferIF, ddmIF): self.user_container_lifetime = 14 self.user_container_lifetime *= 24 * 60 * 60 - # main def doPostProcess(self, taskSpec, tmp_logger): + """ + Run post-processing steps for a finished ATLAS analysis task. + + Steps performed in order: + 1. For each output/log/lib dataset: remove wrong files, freeze, + delete transient/empty datasets, and extend replication-rule lifetimes. + 2. Set an error dialog if the build step produced no successful jobs. + 3. Call doBasicPostProcess for common bookkeeping. + + Returns SC_SUCCEEDED, SC_FAILED, or SC_FATAL. 
+ """ # freeze datasets try: - # get DDM I/F ddmIF = self.ddmIF.getInterface(taskSpec.vo) - # shuffle datasets + + # shuffle to avoid always processing in the same order under partial failures random.shuffle(taskSpec.datasetSpecList) - # loop over all datasets + use_lib = False n_ok_lib = 0 lock_update_time = naive_utcnow() done_containers = set() + for datasetSpec in taskSpec.datasetSpecList: - # ignore template + # ignore template datasets if datasetSpec.type.startswith("tmpl_"): continue - # only output, log or lib datasets + # only process output, log, or lib datasets if not datasetSpec.type.endswith("log") and not datasetSpec.type.endswith("output") and not datasetSpec.type == "lib": continue - # only user group, or panda dataset + # only user, group, or panda datasets if ( not datasetSpec.datasetName.startswith("user") and not datasetSpec.datasetName.startswith("panda") and not datasetSpec.datasetName.startswith("group") ): continue + # check if already closed dataset_attrs = self.taskBufferIF.getDatasetAttributes_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID, ["state"]) if "state" in dataset_attrs and dataset_attrs["state"] == "closed": @@ -73,45 +92,44 @@ def doPostProcess(self, taskSpec, tmp_logger): closed_flag = True else: closed_flag = False - # remove wrong files + + # remove files from DDM that are not in the DB success list if not closed_flag and datasetSpec.type in ["output"]: - # get successful files ok_files = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID) if ok_files is None: tmp_logger.warning(f"failed to get successful files for {datasetSpec.datasetName}") return self.SC_FAILED - # get files in dataset ddm_files = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False) tmp_logger.debug( f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(ok_files)} files in DB, {len(ddm_files)} files in DDM" ) - # check all files to_delete = [] for tmpGUID, attMap in 
ddm_files.items(): if attMap["lfn"] not in ok_files: did = {"scope": attMap["scope"], "name": attMap["lfn"]} to_delete.append(did) tmp_logger.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}") - # delete if to_delete: ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, to_delete) - # freeze datasets + # freeze datasets, skipping transient ones (trn_*), except trn_log if not closed_flag and not (datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]): tmp_logger.debug(f"freeze datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}") ddmIF.freezeDataset(datasetSpec.datasetName, ignoreUnknown=True) else: if datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]: tmp_logger.debug(f"skip freezing transient datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}") - # update dataset + + # update dataset state datasetSpec.state = "closed" datasetSpec.stateCheckTime = naive_utcnow() - # check if build step was succeeded + # check if build step succeeded if datasetSpec.type == "lib": use_lib = True else: n_ok_lib += 1 + # delete transient or empty datasets if not closed_flag: empty_only = True @@ -119,7 +137,8 @@ def doPostProcess(self, taskSpec, tmp_logger): empty_only = False retStr = ddmIF.deleteDataset(datasetSpec.datasetName, empty_only, ignoreUnknown=True) tmp_logger.debug(retStr) - # extend lifetime + + # extend replication-rule lifetime for user output datasets if datasetSpec.type in ["output"] and datasetSpec.datasetName.startswith("user"): tmp_logger.debug(f"extend lifetime datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}") ddmIF.updateReplicationRules( @@ -133,21 +152,25 @@ def doPostProcess(self, taskSpec, tmp_logger): {"type=.+": {"lifetime": self.user_container_lifetime}, "(SCRATCH|USER)DISK": {"lifetime": self.user_container_lifetime}}, ) done_containers.add(datasetSpec.containerName) - # update dataset in DB + + # persist dataset state to DB 
self.taskBufferIF.updateDatasetAttributes_JEDI( datasetSpec.jediTaskID, datasetSpec.datasetID, {"state": datasetSpec.state, "stateCheckTime": datasetSpec.stateCheckTime} ) - # update task lock + + # refresh task lock every 5 minutes to avoid expiry during long loops if naive_utcnow() - lock_update_time > datetime.timedelta(minutes=5): lock_update_time = naive_utcnow() - # update lock self.taskBufferIF.updateTaskLock_JEDI(taskSpec.jediTaskID) - # dialog + + # set error dialog if the build job produced no output if use_lib and n_ok_lib == 0: taskSpec.setErrDiag("No build jobs succeeded", True) + except Exception: err_type, err_value = sys.exc_info()[:2] tmp_logger.warning(f"failed to freeze datasets with {err_type.__name__}:{err_value}") + ret_val = self.SC_SUCCEEDED try: self.doBasicPostProcess(taskSpec, tmp_logger) @@ -155,19 +178,28 @@ def doPostProcess(self, taskSpec, tmp_logger): err_type, err_value = sys.exc_info()[:2] tmp_logger.error(f"doBasicPostProcess failed with {err_type.__name__}:{err_value}") ret_val = self.SC_FATAL + return ret_val - # final procedure def doFinalProcedure(self, taskSpec, tmp_logger): - # check email address + """ + Send an email notification to the task owner with a task summary. + + Resolves the user's email address (with a 1-hour cache), calculates the + task carbon footprint, and sends a multipart HTML/plain-text message. + Email is suppressed if the user has opted out or the taskParams contain + ``noEmail: true``. + + Returns SC_SUCCEEDED. 
+ """ + # resolve the recipient email address to_add = self.getEmail(taskSpec.userName, taskSpec.vo, tmp_logger) - # calculate carbon footprint for the task + # calculate carbon footprint try: carbon_footprint = self.taskBufferIF.get_task_carbon_footprint(taskSpec.jediTaskID, level="global") carbon_footprint_redacted = {} zero = "0 gCO2" - for job_status in ["finished", "failed", "cancelled", "total"]: if carbon_footprint and job_status in carbon_footprint: carbon_footprint_redacted[job_status] = format_weight(carbon_footprint[job_status]) @@ -185,36 +217,38 @@ def doFinalProcedure(self, taskSpec, tmp_logger): except Exception: err_type, err_value = sys.exc_info()[:2] tmp_logger.error(f"task param conversion from json failed with {err_type.__name__}:{err_value}") + + # send email notification unless suppressed if to_add is None or (self.taskParamMap is not None and "noEmail" in self.taskParamMap and self.taskParamMap["noEmail"] is True): tmp_logger.debug("email notification is suppressed") else: try: - # send email notification from_add = self.senderAddress() html_text, plain_text, subject = self.compose_message(taskSpec, carbon_footprint_redacted) - msg = MIMEMultipart("alternative") + msg = MIMEMultipart("alternative") msg["Subject"] = subject msg["From"] = from_add msg["To"] = to_add - # Record the MIME types of both parts - text/plain and text/html. + # attach plain text first, HTML last (RFC 2046: last part is preferred) part1 = MIMEText(plain_text, "plain") part2 = MIMEText(html_text, "html") - - # Attach parts into message container. - # According to RFC 2046, the last part of a multipart message, in this case - # the HTML message, is best and preferred. 
msg.attach(part1) msg.attach(part2) + self.sendMail(taskSpec.jediTaskID, from_add, to_add, msg.as_string(), 3, False, tmp_logger) except Exception: tmp_logger.error(traceback.format_exc()) + return self.SC_SUCCEEDED - # compose mail message def compose_message(self, taskSpec, carbon_footprint): - # summary + """ + Build HTML and plain-text email bodies summarising the task outcome. + + Returns a tuple of (html_message, plain_message, subject). + """ input_datasets = [] output_datasets = [] log_datasets = [] @@ -228,7 +262,7 @@ def compose_message(self, taskSpec, carbon_footprint): cancelled_str = "Cancelled " for datasetSpec in taskSpec.datasetSpecList: - # dataset summary + # collect dataset names by type if datasetSpec.type == "log": if datasetSpec.containerName not in log_datasets: log_datasets.append(datasetSpec.containerName) @@ -238,7 +272,8 @@ def compose_message(self, taskSpec, carbon_footprint): elif datasetSpec.type == "output": if datasetSpec.containerName not in output_datasets: output_datasets.append(datasetSpec.containerName) - # process summary + + # accumulate job counts from the master input dataset if datasetSpec.isMasterInput(): if datasetSpec.status == "removed": continue @@ -256,14 +291,14 @@ def compose_message(self, taskSpec, carbon_footprint): if event_stat is not None: n_succeeded_jobs = event_stat.get(EventServiceUtils.ST_finished, 0) n_failed_jobs = event_stat.get(EventServiceUtils.ST_failed, 0) + try: n_cancelled_jobs = n_total_jobs - n_succeeded_jobs - n_failed_jobs except Exception: pass - if n_succeeded_jobs == n_total_jobs: - msg_succeeded = "All Succeeded" - else: - msg_succeeded = "Succeeded" + + msg_succeeded = "All Succeeded" if n_succeeded_jobs == n_total_jobs else "Succeeded" + input_datasets.sort() output_datasets.sort() log_datasets.sort() @@ -276,13 +311,10 @@ def compose_message(self, taskSpec, carbon_footprint): dataset_summary += f"Log : {tmpDS}\n" dataset_summary = dataset_summary[:-1] - # CLI param - if "cliParams" in 
self.taskParamMap: - cli_parameters = self.taskParamMap["cliParams"] - else: - cli_parameters = None + # CLI parameters + cli_parameters = self.taskParamMap.get("cliParams") if self.taskParamMap else None - # make message + # build HTML message head = html_head.format(title="Task summary notification") body = jedi_task_html_body.format( jedi_task_id=taskSpec.jediTaskID, @@ -308,6 +340,7 @@ def compose_message(self, taskSpec, carbon_footprint): ) message_html = head + body + # build plain-text message message_plain = jedi_task_plain.format( jedi_task_id=taskSpec.jediTaskID, creation_time=taskSpec.creationDate, @@ -331,52 +364,52 @@ def compose_message(self, taskSpec, carbon_footprint): subject = f"JEDI notification for TaskID:{taskSpec.jediTaskID} ({n_succeeded_jobs}/{n_total_jobs} {msg_succeeded})" - # return return message_html, message_plain, subject - # get email def getEmail(self, user_name, vo, tmp_logger): - # return to suppress mail + """ + Resolve the email address for user_name, using a 1-hour DB cache. + + Returns the email address string, or None if the user has opted out + or no address could be found. 
+ """ ret_suppressed = None - # get DN tmp_logger.debug(f"getting email for {user_name}") - # get email from PANDAMETA DB + # check the metadata DB cache first mail_address_db, dn, db_uptime = self.taskBufferIF.getEmailAddr(user_name, withDN=True) tmp_logger.debug(f"email from MetaDB : {mail_address_db}") - # email notification is suppressed - not_send_mail = False - if mail_address_db is not None and mail_address_db.startswith("notsend"): - not_send_mail = True - # DN is unavailable - if dn in ["", None]: - # there will be no email + + not_send_mail = mail_address_db is not None and mail_address_db.startswith("notsend") + + if dn in ("", None): tmp_logger.debug("DN is empty") else: - # avoid too frequent lookup + # use the cached value if it is less than 1 hour old if db_uptime is not None and naive_utcnow() - db_uptime < datetime.timedelta(hours=1): tmp_logger.debug("no lookup") - if not_send_mail or mail_address_db in [None, ""]: + if not_send_mail or mail_address_db in (None, ""): return ret_suppressed else: return mail_address_db.split(":")[-1] else: - # get email from DQ2 - tmp_logger.debug(f"getting email using dq2Info.finger({dn})") + # look up the address via DDM finger + tmp_logger.debug(f"getting email using rucio.finger({dn})") n_tries = 3 for iDDMTry in range(n_tries): try: user_info = self.ddmIF.getInterface(vo).finger(dn) mail_address = user_info["email"] - tmp_logger.debug(f"email from DQ2 : {mail_address}") + tmp_logger.debug(f"email from Rucio : {mail_address}") if mail_address is None: mail_address = "" - # make email field to update DB + + # build the DB entry (prefix with "notsend:" if opted out) mail_addr_to_db = "" if not_send_mail: mail_addr_to_db += "notsend:" mail_addr_to_db += mail_address - # update database + tmp_logger.debug(f"update email to {mail_addr_to_db}") self.taskBufferIF.setEmailAddr(user_name, mail_addr_to_db) @@ -390,11 +423,11 @@ def getEmail(self, user_name, vo, tmp_logger): else: err_type, err_value = sys.exc_info()[:2] 
tmp_logger.error(f"{err_type}:{err_value}") - # not send email + return ret_suppressed - # remove tags def removeTags(self, tmp_str): + """Strip HTML tags from tmp_str, returning the cleaned string.""" try: if tmp_str is not None: tmp_str = re.sub(">[^<]+<", "><", tmp_str) diff --git a/pandajedi/jedipprocess/AtlasProdPostProcessor.py b/pandajedi/jedipprocess/AtlasProdPostProcessor.py index 7d4ead90c..f02822ed3 100644 --- a/pandajedi/jedipprocess/AtlasProdPostProcessor.py +++ b/pandajedi/jedipprocess/AtlasProdPostProcessor.py @@ -1,3 +1,7 @@ +""" +Post-processor implementation for ATLAS production tasks. +""" + import sys from pandaserver.dataservice import DataServiceUtils @@ -7,14 +11,27 @@ from .PostProcessorBase import PostProcessorBase -# post processor for ATLAS production class AtlasProdPostProcessor(PostProcessorBase): - # constructor + """Post-processor for ATLAS production tasks.""" + def __init__(self, taskBufferIF, ddmIF): PostProcessorBase.__init__(self, taskBufferIF, ddmIF) - # main def doPostProcess(self, taskSpec, tmpLog): + """ + Run post-processing steps for a finished ATLAS production task. + + Steps performed in order: + 1. Pre-check (doPreCheck) — early exit if already handled. + 2. For each output dataset: remove files absent from the DB success list. + 3. Freeze output, log, and trn_log datasets in DDM. + 4. Delete transient output (trn_output) datasets. + 5. Check for duplicate tasks and pause if found. + 6. Delete Event Service datasets if applicable. + 7. Send external notifications (send_notification). + 8. Run common bookkeeping (doBasicPostProcess). + Returns SC_SUCCEEDED, SC_FAILED, or SC_FATAL. 
+ """ # pre-check try: tmpStat = self.doPreCheck(taskSpec, tmpLog) @@ -24,40 +41,40 @@ def doPostProcess(self, taskSpec, tmpLog): errtype, errvalue = sys.exc_info()[:2] tmpLog.error(f"doPreCheck failed with {errtype.__name__}:{errvalue}") return self.SC_FATAL + # get DDM I/F ddmIF = self.ddmIF.getInterface(taskSpec.vo) + # loop over all datasets for datasetSpec in taskSpec.datasetSpecList: # skip pseudo output datasets if datasetSpec.type in ["output"] and datasetSpec.isPseudo(): continue + try: - # remove wrong files + # remove files from DDM that are not in the DB success list if datasetSpec.type in ["output"]: - # get successful files okFiles = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID) if okFiles is None: tmpLog.warning(f"failed to get successful files for {datasetSpec.datasetName}") return self.SC_FAILED - # get files in dataset ddmFiles = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False, ignoreUnknown=True) tmpLog.debug( f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(okFiles)} files in DB, {len(ddmFiles)} files in DDM" ) - # check all files toDelete = [] for tmpGUID, attMap in ddmFiles.items(): if attMap["lfn"] not in okFiles: did = {"scope": attMap["scope"], "name": attMap["lfn"]} toDelete.append(did) tmpLog.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}") - # delete if toDelete != []: ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, toDelete) except Exception: errtype, errvalue = sys.exc_info()[:2] tmpLog.warning(f"failed to remove wrong files with {errtype.__name__}:{errvalue}") return self.SC_FAILED + try: # freeze output and log datasets if datasetSpec.type in ["output", "log", "trn_log"]: @@ -67,6 +84,7 @@ def doPostProcess(self, taskSpec, tmpLog): errtype, errvalue = sys.exc_info()[:2] tmpLog.warning(f"failed to freeze datasets with {errtype.__name__}:{errvalue}") return self.SC_FAILED + try: # delete transient datasets if datasetSpec.type 
in ["trn_output"]: @@ -76,7 +94,8 @@ def doPostProcess(self, taskSpec, tmpLog): except Exception: errtype, errvalue = sys.exc_info()[:2] tmpLog.warning(f"failed to delete datasets with {errtype.__name__}:{errvalue}") - # check duplication + + # check for duplicate tasks and pause if found if self.getFinalTaskStatus(taskSpec) in ["finished", "done"] and taskSpec.gshare != "Test": nDup = self.taskBufferIF.checkDuplication_JEDI(taskSpec.jediTaskID) tmpLog.debug(f"checked duplication with {nDup}") @@ -86,7 +105,8 @@ def doPostProcess(self, taskSpec, tmpLog): taskSpec.status = "paused" taskSpec.setErrDiag(errStr) tmpLog.debug(errStr) - # delete ES datasets + + # delete Event Service datasets if taskSpec.registerEsFiles(): try: targetName = EventServiceUtils.getEsDatasetName(taskSpec.jediTaskID) @@ -96,27 +116,44 @@ def doPostProcess(self, taskSpec, tmpLog): except Exception: errtype, errvalue = sys.exc_info()[:2] tmpLog.warning(f"failed to delete ES dataset with {errtype.__name__}:{errvalue}") + try: AtlasPostProcessorUtils.send_notification(self.taskBufferIF, ddmIF, taskSpec, tmpLog) except Exception as e: tmpLog.error(f"failed to talk to external system with {str(e)}") return self.SC_FAILED + try: self.doBasicPostProcess(taskSpec, tmpLog) except Exception: errtype, errvalue = sys.exc_info()[:2] tmpLog.error(f"doBasicPostProcess failed with {errtype.__name__}:{errvalue}") return self.SC_FATAL + return self.SC_SUCCEEDED - # final procedure def doFinalProcedure(self, taskSpec, tmpLog): + """ + Apply final DDM metadata updates after post-processing completes. + + For done/finished tasks: + - Sets a 14-day lifetime on transient output/log datasets. + - For merge tasks that are themselves done (parent status not considered): extends + parent transient output dataset lifetimes to 40 days. + + For failed/broken/aborted tasks: + - Sets a 30-day lifetime on log datasets. + + Returns SC_SUCCEEDED.
+ """ tmpLog.info(f"final procedure for status={taskSpec.status} processingType={taskSpec.processingType}") + + # set lifetime on transient datasets for done/finished tasks if taskSpec.status in ["done", "finished"] or (taskSpec.status == "paused" and taskSpec.oldStatus in ["done", "finished"]): trnLifeTime = 14 * 24 * 60 * 60 trnLifeTimeMerge = 40 * 24 * 60 * 60 ddmIF = self.ddmIF.getInterface(taskSpec.vo) - # set lifetime to transient datasets + metaData = {"lifetime": trnLifeTime} datasetTypeListI = set() datasetTypeListO = set() @@ -133,7 +170,8 @@ def doFinalProcedure(self, taskSpec, tmpLog): datasetTypeListI.add(datasetType) elif datasetSpec.type == "output": datasetTypeListO.add(datasetType) - # set lifetime to parent transient datasets + + # set extended lifetime on parent transient datasets for completed merge tasks if taskSpec.processingType in ["merge"] and ( taskSpec.status == "done" or (taskSpec.status == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done") @@ -142,15 +180,11 @@ def doFinalProcedure(self, taskSpec, tmpLog): and (taskSpec.oldStatus == "done" or (taskSpec.oldStatus == "finished" and self.getFinalTaskStatus(taskSpec, checkParent=False) == "done")) ) ): - # get parent task if taskSpec.parent_tid not in [None, taskSpec.jediTaskID]: - # get parent tmpStat, parentTaskSpec = self.taskBufferIF.getTaskDatasetsWithID_JEDI(taskSpec.parent_tid, None, False) if tmpStat and parentTaskSpec is not None: - # set lifetime to parent datasets if they are transient for datasetSpec in parentTaskSpec.datasetSpecList: if datasetSpec.type in ["output"]: - # check dataset type datasetType = DataServiceUtils.getDatasetType(datasetSpec.datasetName) if datasetType not in datasetTypeListI or datasetType not in datasetTypeListO: continue @@ -164,15 +198,16 @@ def doFinalProcedure(self, taskSpec, tmpLog): ) for metadataName, metadaValue in metaData.items(): ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadaValue) - # set 
lifetime to failed datasets + + # set lifetime on log datasets for failed/broken/aborted tasks if taskSpec.status in ["failed", "broken", "aborted"]: trnLifeTime = 30 * 24 * 60 * 60 ddmIF = self.ddmIF.getInterface(taskSpec.vo) - # only log datasets metaData = {"lifetime": trnLifeTime} for datasetSpec in taskSpec.datasetSpecList: if datasetSpec.type in ["log"]: tmpLog.debug(f"set metadata={str(metaData)} to failed datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}") for metadataName, metadaValue in metaData.items(): ddmIF.setDatasetMetadata(datasetSpec.datasetName, metadataName, metadaValue) + return self.SC_SUCCEEDED