Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5075547
feat: UploadLogFile command implementation
AcquaDiGiorgio Feb 2, 2026
7a03ef2
chore: improve UploadLogFile tests
AcquaDiGiorgio Feb 4, 2026
fd12496
feat: Change UploadLogFile DataManager Mocks to real DIRAC Classes
AcquaDiGiorgio Feb 11, 2026
8317c9f
chore: Update project name at imports
AcquaDiGiorgio Feb 12, 2026
91cef73
chore: setup lhcbdirac dependency to fork
AcquaDiGiorgio Apr 27, 2026
98ccc37
feat: Migrate BookkeepingReport command to cwl-dirac
AcquaDiGiorgio Apr 27, 2026
1da58a2
chore: set lhcbdirac dependency to https instead of ssh
AcquaDiGiorgio Apr 27, 2026
4586f84
chore: remove all DIRAC import mypy type checking
AcquaDiGiorgio Apr 28, 2026
0ad8e0e
feat: Migrate FailoverRequest command to cwl-dirac
AcquaDiGiorgio May 4, 2026
bd285c3
chore(tests): improve command fixtures
AcquaDiGiorgio May 4, 2026
26de911
feat: Migrate UploadOutputData command to cwl-dirac
AcquaDiGiorgio May 5, 2026
b87c180
feat: Migrate AnalyseXmlSummary command to cwl-dirac
AcquaDiGiorgio May 6, 2026
32e54b4
feat: Migrate WorkflowAccounting command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f02159a
feat: Migrate UploadLogFile command to cwl-dirac
AcquaDiGiorgio May 6, 2026
f4d2821
chore: update pixi.lock
AcquaDiGiorgio May 7, 2026
028ca4f
chore: fix BookkeepingReport typo
AcquaDiGiorgio May 11, 2026
d9e24e9
chore: fix possible None values while saving workflow_commons
AcquaDiGiorgio May 11, 2026
6907021
chore: set proper commands exception catching
AcquaDiGiorgio May 11, 2026
1987844
chore: fix job path not being taken into account
AcquaDiGiorgio May 11, 2026
947bc8b
chore: change workflow commons from dict to a pydantic model
AcquaDiGiorgio May 15, 2026
396645e
chore: fix typos
AcquaDiGiorgio May 15, 2026
a81648b
chore: add logging to commands
AcquaDiGiorgio May 18, 2026
dcb1693
chore: wrap command execute function
AcquaDiGiorgio May 19, 2026
2d16150
chore: use DataStoreClient private registersList attribute
AcquaDiGiorgio May 19, 2026
ed1c6f0
feat: Add step information for executions of multiple steps at a time
AcquaDiGiorgio Jun 4, 2026
3a8908d
chore: fix snake-case convention mismatch
AcquaDiGiorgio Jun 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10,687 changes: 5,782 additions & 4,905 deletions pixi.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"diracx-client>=0.0.8",
"diracx-cli>=0.0.8",
"lbprodrun",
"LHCbDIRAC @ git+https://git@gitlab.cern.ch/jlisalab/LHCbDIRAC.git@modules-to-cwl-migration", # Temporary fork dependency
"pydantic",
"pyyaml",
"typer",
Expand Down Expand Up @@ -78,7 +79,7 @@ allow_redefinition = true
enable_error_code = ["import", "attr-defined"]

[[tool.mypy.overrides]]
module = ["requests", "yaml"]
module = ["requests", "yaml", "DIRAC.*", "LHCbDIRAC.*", "DIRACCommon.*"]
ignore_missing_imports = true

[tool.pytest.ini_options]
Expand Down
17 changes: 16 additions & 1 deletion src/dirac_cwl/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
"""Command classes for workflow pre/post-processing operations."""

from .analyze_xml_summary import AnalyseXmlSummary
from .bookkeeping_report import BookkeepingReport
from .core import PostProcessCommand, PreProcessCommand
from .failover_request import FailoverRequest
from .upload_log_file import UploadLogFile
from .upload_output_data import UploadOutputData
from .workflow_accounting import WorkflowAccounting

__all__ = ["PreProcessCommand", "PostProcessCommand"]
__all__ = [
"AnalyseXmlSummary",
"PreProcessCommand",
"PostProcessCommand",
"UploadLogFile",
"BookkeepingReport",
"FailoverRequest",
"UploadOutputData",
"WorkflowAccounting",
]
49 changes: 49 additions & 0 deletions src/dirac_cwl/commands/analyze_xml_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""LHCb command for checking the XMLSummary output to ensure that the execution was done correctly."""

import logging
import os

from LHCbDIRAC.Workflow.Modules.AnalyseXMLSummary import _areInputsOK, _isXMLSummaryOK

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .workflow_commons import Step, StepStatus, WorkflowCommons

logger = logging.getLogger(__name__)


class AnalyseXmlSummary(PostProcessCommand):
"""Performs a series of checks on the XMLSummary output to make sure the execution was done correctly."""

def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the command.

:param job_path: Path to the job working directory.
:param workflow_commons: WorkflowCommons object
:param kwargs: Additional keyword arguments.
"""
for step in workflow_commons.steps:
self._execute_for_step(job_path, workflow_commons, step, **kwargs)

def _execute_for_step(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, step_commons: Step, **kwargs):
"""Execute the command for a specific step."""
job_ok = _isXMLSummaryOK(step_commons.xf_o)

if job_ok:
job_ok = _areInputsOK(
step_commons.xf_o,
step_commons.inputs,
step_commons.number_of_events,
workflow_commons.production_id,
workflow_commons.file_report,
)
if not job_ok:
workflow_commons.job_report.setApplicationStatus("XMLSummary reports error")
raise WorkflowProcessingException("XMLSummary reports error")

if workflow_commons.step_status == StepStatus.Failed:
logger.info("Workflow already failed")
return

workflow_commons.job_report.setApplicationStatus(f"{step_commons.application_name} Step OK")
162 changes: 162 additions & 0 deletions src/dirac_cwl/commands/bookkeeping_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""LHCb command for bookkeeping report file generation based on the XMLSummary and the XML catalog."""

import copy
import logging
import os
from typing import Any, Dict

from DIRAC.Core.Utilities.ReturnValues import SErrorException, returnValueOrRaise
from DIRAC.Workflow.Utilities.Utils import getStepCPUTimes
from LHCbDIRAC.Core.Utilities.ProductionData import constructProductionLFNs
from LHCbDIRAC.Workflow.Modules.BookkeepingReport import (
_generate_xml_object,
Comment thread
aldbr marked this conversation as resolved.
_generateInputFiles,
_generateOutputFiles,
_prepare_job_info,
_process_time,
)
from LHCbDIRAC.Workflow.Modules.ModulesUtilities import getNumberOfProcessorsToUse

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .core import PostProcessCommand
from .workflow_commons import Step, StepStatus, WorkflowCommons

logger = logging.getLogger(__name__)


class BookkeepingReport(PostProcessCommand):
"""Generates a bookkeeping report file based on the XMLSummary and the pool XML catalog."""

def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the command.

:param job_path: Path to the job working directory.
:param workflow_commons: WorkflowCommons object
:param kwargs: Additional keyword arguments.
"""
for step in workflow_commons.steps:
if workflow_commons.step_status == StepStatus.Failed:
return

self._execute_for_step(job_path, workflow_commons, step, **kwargs)

def _execute_for_step(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, step_commons: Step, **kwargs):
# Setup variables
cpu_times: Dict[str, Any] = {}
if step_commons.start_time:
cpu_times["StartTime"] = step_commons.start_time
if step_commons.start_stats:
cpu_times["StartStats"] = step_commons.start_stats

exectime, cputime = getStepCPUTimes(cpu_times)

number_of_processors = workflow_commons.number_of_processors

if (step_commons.multicore and workflow_commons.multicore) or (
workflow_commons.job_type.lower() == "user" and workflow_commons.max_number_of_processors
):
number_of_processors = getNumberOfProcessorsToUse(
workflow_commons.job_id,
workflow_commons.max_number_of_processors,
)

all_outputs = copy.deepcopy(step_commons.outputs)
all_outputs.extend(step_commons.outputs)

parameters = {
"PRODUCTION_ID": workflow_commons.production_id,
"JOB_ID": workflow_commons.prod_job_id,
"configVersion": workflow_commons.config_version,
"outputList": all_outputs,
"configName": workflow_commons.config_name,
"outputDataFileMask": workflow_commons.output_data_file_mask,
}

if workflow_commons.bookkeeping_lfns and workflow_commons.production_output_data:
bk_lfns = workflow_commons.bookkeeping_lfns

if not isinstance(bk_lfns, list):
bk_lfns = [i.strip() for i in bk_lfns.split(";")]

else:
logger.info("BookkeepingLFNs parameters not found, creating on the fly")
try:
production_lfns_dict = returnValueOrRaise(
constructProductionLFNs(parameters, workflow_commons.bk_client)
)
except SErrorException as e:
logger.error("Could not create production LFNs", exc_info=e)
raise WorkflowProcessingException(f"Could not create production LFNs: {e}") from e

bk_lfns = production_lfns_dict["BookkeepingLFNs"]

ldate, ltime, ldatestart, ltimestart = _process_time(step_commons.start_time)

# Obtain XMLSummary
if not step_commons.xf_o:
step_commons.xf_o = _generate_xml_object(
step_commons.cleaned_application_name,
workflow_commons.production_id,
workflow_commons.prod_job_id,
step_commons.number,
step_commons.id,
)

info_dict = {
"exectime": exectime,
"cputime": cputime,
"numberOfProcessors": number_of_processors,
"production_id": workflow_commons.production_id,
"jobID": workflow_commons.job_id,
"siteName": workflow_commons.site_name,
"jobType": workflow_commons.job_type,
"applicationName": step_commons.application_name,
"applicationVersion": step_commons.application_version,
"numberOfEvents": step_commons.number_of_events,
}

# Generate job_info object
job_info = _prepare_job_info(
info_dict,
ldatestart,
ltimestart,
ldate,
ltime,
step_commons.xf_o,
step_commons.inputs,
step_commons.id,
step_commons.bk_id,
workflow_commons.bk_client,
workflow_commons.config_name,
workflow_commons.config_version,
)

# Add input files to job_info
_generateInputFiles(job_info, bk_lfns, step_commons.inputs)

# Add output files to job_info
_generateOutputFiles(
job_info,
bk_lfns,
step_commons.event_type,
step_commons.application_name,
step_commons.xf_o,
step_commons.outputs,
step_commons.inputs,
step_commons.size,
step_commons.md5,
step_commons.guid,
)

# Generate SimulationConditions
if step_commons.application_name == "Gauss":
job_info.simulation_condition = workflow_commons.sim_description

# Convert job_info object to XML
doc = job_info.to_xml()

# Write to file
bfilename = f"bookkeeping_{step_commons.id}.xml"
with open(bfilename, "wb") as bfile:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, here you also want this to be part of job_path, let's write it explicitly may be.

Suggested change
with open(bfilename, "wb") as bfile:
with open(job_path / bfilename, "wb") as bfile:

bfile.write(doc)
41 changes: 39 additions & 2 deletions src/dirac_cwl/commands/core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
"""Core base classes for workflow processing commands."""

import logging
import os
from abc import ABC, abstractmethod
from pathlib import Path

from dirac_cwl.core.exceptions import WorkflowProcessingException

from .workflow_commons import WorkflowCommons

logger = logging.getLogger(__name__)


class CommandBase(ABC):
Expand All @@ -12,8 +19,38 @@ class CommandBase(ABC):
:class:`dirac_cwl.commands.base.PostProcessCommand`
"""

def execute(self, job_path: os.PathLike, **kwargs) -> None:
"""Execute the command in the given job path.

:param job_path: Path to the job working directory.
:param kwargs: Additional keyword arguments.
"""
failed = False
workflow_commons = None
try:
workflow_commons = WorkflowCommons.load(job_path)
logger.info("WorkflowCommons:\n%s", workflow_commons)
self._execute(job_path, workflow_commons, **kwargs)

except WorkflowProcessingException:
failed = True
raise

except Exception as e:
logger.exception("Exception in %s", self.__class__.__name__, exc_info=e)

failed = True
if workflow_commons:
workflow_commons.job_report.setApplicationStatus(repr(e))

raise WorkflowProcessingException(e) from e

finally:
if workflow_commons:
workflow_commons.save(job_path, failed=failed)

@abstractmethod
def execute(self, job_path: Path, **kwargs) -> None:
def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs) -> None:
"""Execute the command in the given job path.

:param job_path: Path to the job working directory.
Expand Down
4 changes: 3 additions & 1 deletion src/dirac_cwl/commands/download_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

from dirac_cwl.commands import PreProcessCommand

from .workflow_commons import WorkflowCommons


class DownloadConfig(PreProcessCommand):
"""Example command that creates a file with named 'content.cfg'."""

def execute(self, job_path, **kwargs):
def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the configuration download.

:param job_path: Path to the job working directory.
Expand Down
71 changes: 71 additions & 0 deletions src/dirac_cwl/commands/failover_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""LHCb command for committing the status of the files in the file report.

The status will be "Processed" if everything ended properly or "Unused" if it did not.
"""

import logging
import os

from DIRAC.Core.Utilities.ReturnValues import SErrorException, returnValueOrRaise
from LHCbDIRAC.Workflow.Modules.FailoverRequest import _prepareRequest

from .core import PostProcessCommand
from .workflow_commons import StepStatus, WorkflowCommons

logger = logging.getLogger(__name__)


class FailoverRequest(PostProcessCommand):
Comment thread
AcquaDiGiorgio marked this conversation as resolved.
"""Commits the status of the files in the file report.

The status will be "Processed" if everything ended properly or "Unused" if it did not.
"""

def _execute(self, job_path: os.PathLike, workflow_commons: WorkflowCommons, **kwargs):
"""Execute the command.

:param job_path: Path to the job working directory.
:param workflow_commons: WorkflowCommons object.
:param kwargs: Additional keyword arguments.
"""
_prepareRequest(workflow_commons.request, workflow_commons.job_id)

files_in_file_report = workflow_commons.file_report.getFiles()

for lfn in workflow_commons.inputs:
if lfn not in files_in_file_report:
status = "Processed" if workflow_commons.step_status == StepStatus.Done else "Unused"
if status == "Unused":
logger.info("Set status of %s to 'Unused' due to workflow failure", lfn)
else:
logger.debug("No status populated for %s, setting to 'Processed'", lfn)

workflow_commons.file_report.setFileStatus(int(workflow_commons.production_id), lfn, status)

try:
value = returnValueOrRaise(workflow_commons.file_report.commit())
if value:
logger.info("Status of files have been properly updated in the TransformationDB")
else:
logger.warning("No file status update reported. There are no input files?")
except SErrorException as e:
logger.error("Something went wrong trying fileReport.commit() %s", e)

if workflow_commons.file_report.getFiles():
logger.error("On first attempt, failed to report file status to TransformationDB")
try:
value = returnValueOrRaise(workflow_commons.file_report.generateForwardDISET())
if not value:
logger.info("On second attempt, files correctly reported to TransformationDB")
elif workflow_commons.step_status == StepStatus.Done:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also based on my previous comment in the WorkflowAccounting: it would be interesting to see whether there is a way to get the status of the cwl execution vs a step execution.

Because if this is possible, then may be there is a way to reproduce exactly what we have in the workflow modules with the conditions like if workflowStatus and jobStatus...

logger.info("Adding a SetFileStatus operation to the request")
workflow_commons.request.addOperation(value)
else:
logger.info("The job should fail: do not set requests, as the DRA will take care")
except SErrorException as e:
logger.warning("Could not generate Operation for file report: %s", e)

if workflow_commons.step_status == StepStatus.Done:
workflow_commons.job_report.setApplicationStatus("Job Finished Successfully", True)

workflow_commons.generate_failover_file()
Loading
Loading