diff --git a/Snakefile b/Snakefile index 3cd39ff8..7ebc84ca 100644 --- a/Snakefile +++ b/Snakefile @@ -1,12 +1,16 @@ import os from spras import runner import shutil +import json import yaml +from pathlib import Path +from spras.containers import TimeoutError from spras.dataset import Dataset from spras.evaluation import Evaluation from spras.analysis import ml, summary, cytoscape from spras.config.revision import detach_spras_revision import spras.config.config as _config +from spras.errors import mark_error, mark_success, is_error, TimeoutArtifactError # Snakemake updated the behavior in the 6.5.0 release https://github.com/snakemake/snakemake/pull/1037 # and using the wrong separator prevents Snakemake from matching filenames to the rules that can produce them @@ -44,11 +48,14 @@ def algo_has_mult_param_combos(algo): algorithms_mult_param_combos = [algo for algo in algorithms if algo_has_mult_param_combos(algo)] +# Gets the associated parameter hash out of a params wildcard +def params_index(params_hash): + return params_hash.replace('params-', '') + # Get the parameter dictionary for the specified # algorithm and parameter combination hash def reconstruction_params(algorithm, params_hash): - index = params_hash.replace('params-', '') - return algorithm_params[algorithm][index] + return algorithm_params[algorithm][params_index(params_hash)] # Log the parameter dictionary for this parameter configuration in a yaml file def write_parameter_log(algorithm, param_label, logfile): @@ -262,6 +269,10 @@ def collect_prepared_input(wildcards): return prepared_inputs +def filter_successful(files): + """Convenient function for filtering iterators by whether or not their items are error files.""" + return [file for file in files if not is_error(file)] + # Run the pathway reconstruction algorithm rule reconstruct: input: collect_prepared_input @@ -270,25 +281,48 @@ rule reconstruct: # Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the # same name regardless of the inputs or parameters, and these aren't renamed until after the container command # terminates - output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']) + output: + pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + # Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors. + artifact_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'artifact-log.json']) + params: + # Get the timeout from the config and use it as an input. + # TODO: This has unexpected behavior when this rule succeeds but the timeout extends, + # making this rule run again. + timeout = lambda wildcards: _config.config.algorithm_param_run_settings[params_index(wildcards.params)].timeout run: # Create a copy so that the updates are not written to the parameters logfile - params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() + algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() # Declare the input files as a dictionary. inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input}, strict=True)) # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml - if '_spras_run_name' in params: - params.pop('_spras_run_name') - runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, params, container_settings) + if '_spras_run_name' in algorithm_params: + algorithm_params.pop('_spras_run_name') + try: + runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, algorithm_params, container_settings, params.timeout) + mark_success(output.artifact_info) + except TimeoutError as err: + # We don't raise the error here (analogous to `--keep-going`, except we avoid unnecessarily re-running this rule.) + mark_error(output.artifact_info, TimeoutArtifactError(duration=params.timeout)) + # and we touch pathway_file still: Snakemake doesn't have optional files, so we output a 'artifact info' file, + # which contains the status (success/failure) of specific Snakemake jobs. + # We filter for the successful files (such as ones that didn't time out) with the `filter_successful` function. + Path(output.pathway_file).touch() # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output rule parse_output: input: - raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + # We propagate up the artifact_info error if it exists. + artifact_info = rules.reconstruct.output.artifact_info, + raw_file = rules.reconstruct.output.pathway_file, dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle']) output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt']) run: + if is_error(input.artifact_info): + mark_error(output.standardized_file, artifact_info_from_file(input.artifact_info).error) + return + params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() params['dataset'] = input.dataset_file runner.parse_output(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), input.raw_file, output.standardized_file, params) @@ -310,7 +344,7 @@ rule viz_cytoscape: output: session = SEP.join([out_dir, '{dataset}-cytoscape.cys']) run: - cytoscape.run_cytoscape(input.pathways, output.session, container_settings) + cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings) # Write a single summary table for all pathways for each dataset @@ -323,7 +357,7 @@ rule summary_table: run: # Load the node table from the pickled dataset file node_table = Dataset.from_file(input.dataset_file).node_table - summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params) + summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params) summary_df.to_csv(output.summary_table, sep='\t', index=False) # Cluster the output pathways for each dataset @@ -339,7 +373,7 @@ rule ml_analysis: hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -353,7 +387,7 @@ rule jaccard_similarity: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) @@ -364,7 +398,7 @@ rule ensemble: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Returns all pathways for a specific algorithm @@ -386,7 +420,7 @@ rule ml_analysis_aggregate_algo: hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -398,7 +432,7 @@ rule ensemble_per_algo: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Calculated Jaccard similarity between output pathways for each dataset per algorithm @@ -409,7 +443,7 @@ rule jaccard_similarity_per_algo: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) # Return the gold standard pickle file for a specific gold standard @@ -440,7 +474,7 @@ rule evaluation_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png) # Returns all pathways for a specific algorithm and dataset @@ -459,7 +493,7 @@ rule evaluation_per_algo_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval) # Return pathway summary file per dataset diff --git a/config/config.yaml b/config/config.yaml index fc718e3c..b0e1df94 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -73,65 +73,75 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "omicsintegrator1" include: true runs: run1: - b: [5, 6] - w: np.linspace(0,5,2) - d: 10 - dummy_mode: "file" # Or "terminals", "all", "others" + params: + b: [5, 6] + w: np.linspace(0,5,2) + d: 10 + dummy_mode: "file" # Or "terminals", "all", "others" - name: "omicsintegrator2" include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: "meo" include: true runs: run1: - max_path_length: 3 - local_search: true - rand_restarts: 10 + params: + max_path_length: 3 + local_search: true + rand_restarts: 10 - name: "mincostflow" include: true runs: run1: - flow: 1 - capacity: 1 + params: + flow: 1 + capacity: 1 - name: "allpairs" include: true + timeout: 1d - name: "domino" include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 - name: "strwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "rwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "bowtiebuilder" include: true @@ -140,12 +150,14 @@ algorithms: include: true runs: run1: - gamma: [10] + params: + gamma: [10] - name: "diamond" include: true runs: run1: - n: 1 + params: + n: 1 # Here we specify which pathways to run and other file location information. # DataLoader.py can currently only load a single dataset diff --git a/config/egfr.yaml b/config/egfr.yaml index b93c593c..623b0121 100644 --- a/config/egfr.yaml +++ b/config/egfr.yaml @@ -31,77 +31,89 @@ algorithms: include: true runs: run1: - k: - - 10 - - 20 - - 70 + params: + k: + - 10 + - 20 + - 70 - name: omicsintegrator1 include: true runs: run1: - b: - - 0.55 - - 2 - - 10 - d: 10 - g: 1e-3 - r: 0.01 - w: 0.1 - mu: 0.008 - dummy_mode: ["file"] + params: + b: + - 0.55 + - 2 + - 10 + d: 10 + g: 1e-3 + r: 0.01 + w: 0.1 + mu: 0.008 + dummy_mode: ["file"] - name: omicsintegrator2 include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: meo include: true runs: run1: - local_search: true - max_path_length: 3 - rand_restarts: 10 + params: + local_search: true + max_path_length: 3 + rand_restarts: 10 run2: - local_search: false - max_path_length: 2 - rand_restarts: 10 + params: + local_search: false + max_path_length: 2 + rand_restarts: 10 - name: allpairs include: true - name: domino include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 - name: mincostflow include: true runs: run1: - capacity: 15 - flow: 80 + params: + capacity: 15 + flow: 80 run2: - capacity: 1 - flow: 6 + params: + capacity: 1 + flow: 6 run3: - capacity: 5 - flow: 60 + params: + capacity: 5 + flow: 60 - name: "strwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "rwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "bowtiebuilder" include: false diff --git a/docs/design/errors.rst b/docs/design/errors.rst new file mode 100644 index 00000000..4d436601 --- /dev/null +++ b/docs/design/errors.rst @@ -0,0 +1,43 @@ +######## + Errors +######## + +By default, whenever SPRAS runs into a container error (i.e. an internal +algorithm error), the full workflow will fail. However, there are +certain designated errors where we don't want this to be the case. + +Due to the following design constraints: + +- Snakemake does not have support for such errors (the closest being + ``--keep-going``, which unnecessarily runs failed runs) +- SPRAS occasionally outputs empty files as genuine output +- We need to log errors that happen for user knowledge + +we opt to use an ``artifact-info.json`` file, which keeps track of the +success/failure status at certain failable parts of the workflow. This +file contains whether or not this part of the workflow succeeded, and if +it failed, how it failed. + +The ``artifact-info.json`` stores a JSON object, containing: + +- The key ``status``, which is either the value ``success`` or + ``error``, depending on what happened in the workflow. + +- If ``status`` is ``error``, we store associated error details in the + ``details`` key, which contains an object: + + - The ``details`` object varies per error in the form of a tagged + union: they are differentiated by the ``type`` key. We describe + each error down below. + +************* + Error Types +************* + +With the above schema, we detail the ``details`` key below. + +Timeout +======= + +Timeout has ``type: "timeout"``, with a single key ``duration``, which +describes the ``timeout`` value associated to that run. diff --git a/docs/fordevs/spras.config.rst b/docs/fordevs/spras.config.rst index f7161447..a0f6f2e9 100644 --- a/docs/fordevs/spras.config.rst +++ b/docs/fordevs/spras.config.rst @@ -6,6 +6,15 @@ Submodules ************ +******************************** + spras.config.algorithms module +******************************** + +.. automodule:: spras.config.algorithms + :members: + :undoc-members: + :show-inheritance: + **************************** spras.config.config module **************************** @@ -15,6 +24,33 @@ :undoc-members: :show-inheritance: +************************************** + spras.config.container_schema module +************************************** + +.. automodule:: spras.config.container_schema + :members: + :undoc-members: + :show-inheritance: + +***************************** + spras.config.dataset module +***************************** + +.. automodule:: spras.config.dataset + :members: + :undoc-members: + :show-inheritance: + +****************************** + spras.config.revision module +****************************** + +.. automodule:: spras.config.revision + :members: + :undoc-members: + :show-inheritance: + **************************** spras.config.schema module **************************** diff --git a/docs/fordevs/spras.rst b/docs/fordevs/spras.rst index 8bd5060f..d63f52e9 100644 --- a/docs/fordevs/spras.rst +++ b/docs/fordevs/spras.rst @@ -52,6 +52,15 @@ :undoc-members: :show-inheritance: +********************** + spras.diamond module +********************** + +.. automodule:: spras.diamond + :members: + :undoc-members: + :show-inheritance: + ********************* spras.domino module ********************* @@ -61,6 +70,15 @@ :undoc-members: :show-inheritance: +********************* + spras.errors module +********************* + +.. automodule:: spras.errors + :members: + :undoc-members: + :show-inheritance: + ************************* spras.evaluation module ************************* @@ -142,6 +160,15 @@ :undoc-members: :show-inheritance: +************************ + spras.profiling module +************************ + +.. automodule:: spras.profiling + :members: + :undoc-members: + :show-inheritance: + ************************** spras.responsenet module ************************** diff --git a/docs/index.rst b/docs/index.rst index c8aae5e6..42fc92f8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,6 +40,7 @@ reconstruction methods (PRMs) to omics data. output htcondor + timeout .. toctree:: :maxdepth: 1 @@ -62,6 +63,12 @@ reconstruction methods (PRMs) to omics data. contributing/patching contributing/design +.. toctree:: + :maxdepth: 1 + :caption: Internal Designs + + design/errors + .. toctree:: :maxdepth: 1 :caption: Tutorials diff --git a/docs/timeout.rst b/docs/timeout.rst new file mode 100644 index 00000000..e58b2e50 --- /dev/null +++ b/docs/timeout.rst @@ -0,0 +1,60 @@ +########## + Timeouts +########## + +The SPRAS global configuration can take optional per-algorithm timeouts. +For example, to give a specific run of the PathLinker algorithm a 1 day +timeout: + +.. code:: yaml + + - name: "pathlinker" + include: true + runs: + run1: + timeout: 1d + params: + k: 200 + +The timeout string parsing is delegated to `pytimeparse +`__ (examples linked here). This +allows for more complicated timeout strings, such as ``3d2h32m``. + +If ``timeout`` is not specified, the algorithm will never be interrupted +due to running too long. + +**NOTE**: This feature only works with docker and apptainer/singularity +at the time of writing, not dsub. + +********************* + Configuration notes +********************* + +Since ``timeout`` is a run parameter, it can also be moved to the top +level of an algorithm configuration, where that value will become the +default unless otherwise specified in a specific run. + +.. code:: yaml + + - name: "pathlinker" + include: true + timeout: 1d + runs: + run1: + # this uses timeout: 2d + timeout: 2d + params: + k: 200 + run2: + # this uses timeout: 1d + params: + k: 100 + +This is also useful for algorithms which take in no parameters, such as +``allpairs``: + +.. code:: yaml + + - name: "allpairs" + include: true + timeout: 1d diff --git a/docs/tutorial/beginner.rst b/docs/tutorial/beginner.rst index a0846666..f4de401d 100644 --- a/docs/tutorial/beginner.rst +++ b/docs/tutorial/beginner.rst @@ -161,13 +161,15 @@ Algorithms params: include: true run1: - b: 0.1 - d: 10 - g: 1e-3 + params: + b: 0.1 + d: 10 + g: 1e-3 run2: - b: [0.55, 2, 10] - d: [10, 20] - g: 1e-3 + params: + b: [0.55, 2, 10] + d: [10, 20] + g: 1e-3 When defining an algorithm in the configuration file, its name must match one of the supported SPRAS algorithms. Each algorithm includes an diff --git a/environment.yml b/environment.yml index 4361834a..7fe14408 100644 --- a/environment.yml +++ b/environment.yml @@ -15,6 +15,7 @@ dependencies: - scikit-learn=1.7.0 - seaborn=0.13.2 - spython=0.3.14 + - pytimeparse=1.1.8 # conda-specific for dsub - python-dateutil=2.9.0 diff --git a/pyproject.toml b/pyproject.toml index bfc602c6..38074f77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "scikit-learn==1.7.0", "seaborn==0.13.2", "spython==0.3.14", + "pytimeparse==1.1.8", # toolchain deps "pip==25.3", diff --git a/spras/allpairs.py b/spras/allpairs.py index 51ba5432..b4e542ad 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -2,6 +2,7 @@ from pathlib import Path from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import Empty from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset @@ -72,8 +73,9 @@ def generate_inputs(data: Dataset, filename_map): header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() AllPairs.validate_required_run_args(inputs) work_dir = '/apsp' @@ -109,7 +111,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) @staticmethod def parse_output(raw_pathway_file, standardized_pathway_file, params): diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py index e8489950..6eadfadd 100644 --- a/spras/analysis/cytoscape.py +++ b/spras/analysis/cytoscape.py @@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai # (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875) None, container_settings, + None, env) rmtree(cytoscape_output_dir) diff --git a/spras/btb.py b/spras/btb.py index 63b6c7ed..6ccfd3d5 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -1,6 +1,7 @@ from pathlib import Path from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import Empty from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( @@ -61,8 +62,9 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() BowTieBuilder.validate_required_run_args(inputs) # Tests for pytest (docker container also runs this) @@ -119,7 +121,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Output is already written to raw-pathway.txt file diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py index 552fbc4e..06bdc1cc 100644 --- a/spras/config/algorithms.py +++ b/spras/config/algorithms.py @@ -17,6 +17,7 @@ create_model, ) +from spras.config.runs import RunSettings from spras.runner import algorithms # This contains the dynamically generated algorithm schema for use in `schema.py` @@ -100,12 +101,6 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod - Ranges and other convenient calls are expanded (see `python_evalish_coerce`) """ - # Get the default model instance by trying to serialize the empty dictionary - try: - model_default = model.model_validate({}) - except ValidationError: - model_default = None - # First, we need to take our 'model' and coerce it to permit parameter combinations. # This assumes that all of the keys are flattened, so we only get a structure like so: # class AlgorithmParams(BaseModel): @@ -151,12 +146,30 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod # Pass this as kwargs to create_model, which usually takes in parameters field_name=type. # We do need to cast create_model, since otherwise the type-checker complains that we may # have had a key that starts with __ in mapped_list_fields. The above assertion prevents this. - run_model = (cast(Any, create_model))( - f'{name}RunModel', + params_model = (cast(Any, create_model))( + f'{name}ParamModel', __config__=ConfigDict(extra='forbid'), **mapped_list_field ) + # Get the default model instance by trying to serialize the empty dictionary + try: + params_model_default = params_model.model_validate({}) + except ValidationError: + params_model_default = None + + # Then, we create a wrapping `run_model` which contains our params_model, + # as well as any associated options with an individual run. + run_model = create_model( + f'{name}RunModel', + params=(params_model, params_model_default), + __base__=RunSettings, + __config__=ConfigDict(extra='forbid') + ) + + # We use `model_validate` instead of the `run_model` constructor since `run_model` is based off of `RunSettings` + run_model_default = None if params_model_default is None else run_model.model_validate({"params": params_model_default}) + # Here is an example of how this would look like inside config.yaml # name: pathlinker # include: true @@ -174,8 +187,11 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod # include: true # will run, despite there being no entries in `runs`. # (create_model entries take in either a type or (type, default)). - runs=dict[str, run_model] if model_default is None else (dict[str, run_model], {"default": model_default}), - __config__=ConfigDict(extra='forbid') + runs=dict[str, run_model] if run_model_default is None else (dict[str, run_model], {"default": run_model_default}), + __config__=ConfigDict(extra='forbid'), + # Note that both entire algorithms and their runs inherit from `RunSettings`, to allow default runs such as + # `allpairs` to have run-specific settings (e.g. allpairs with timeout.) + __base__=RunSettings ) algorithm_models: list[type[BaseModel]] = [construct_algorithm_model(name, model.get_params_generic()) for name, model in algorithms.items()] diff --git a/spras/config/config.py b/spras/config/config.py index 0a4670a4..dbc68b5b 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -24,6 +24,7 @@ from spras.config.container_schema import ProcessedContainerSettings from spras.config.revision import attach_spras_revision, spras_revision +from spras.config.runs import RunSettings from spras.config.schema import DatasetSchema, RawConfig from spras.util import LoosePathLike, NpHashEncoder, hash_params_sha1_base32 @@ -61,6 +62,8 @@ def __init__(self, raw_config: dict[str, Any]): self.hash_length = parsed_raw_config.hash_length # Container settings used by PRMs. self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length) + # Dictionary of parameter hashes to their respective run settings + self.algorithm_param_run_settings: dict[str, RunSettings] = dict() # A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations. # Only includes algorithms that are set to be run with 'include: true'. self.algorithm_params: dict[str, dict[str, Any]] = dict() @@ -182,7 +185,7 @@ def process_algorithms(self, raw_config: RawConfig): # We create the product of all param combinations for each run param_name_list = [] # We convert our run parameters to a dictionary, allowing us to iterate over it - run_subscriptable = vars(runs[run_name]) + run_subscriptable = vars(runs[run_name].params) for param in run_subscriptable: param_name_list.append(param) # this is guaranteed to be list[Any] by algorithms.py @@ -218,6 +221,11 @@ def process_algorithms(self, raw_config: RawConfig): self.algorithm_params[alg.name][params_hash] = run_dict + # We finalize by handling any associated information to each parameter hash. + self.algorithm_param_run_settings[params_hash] = RunSettings( + timeout=runs[run_name].timeout or alg.timeout + ) + def process_analysis(self, raw_config: RawConfig): if not raw_config.analysis: return diff --git a/spras/config/runs.py b/spras/config/runs.py new file mode 100644 index 00000000..5d33568c --- /dev/null +++ b/spras/config/runs.py @@ -0,0 +1,24 @@ +from typing import Annotated, Optional + +from pydantic import BaseModel, BeforeValidator, ConfigDict +from pytimeparse import parse + + +def validate_duration(value): + if isinstance(value, int): return value + parsed_duration = parse(value, granularity='seconds') + if not parsed_duration: raise RuntimeError(f"Encountered unparsable duration string '{value}'.") + return parsed_duration + +PyDateTimeDuration = Annotated[ + int, + BeforeValidator(validate_duration) +] + +class RunSettings(BaseModel): + """All of the non-parameter settings associated with a run.""" + + timeout: Optional[PyDateTimeDuration] = None + """The associated timeout with a run, parsed with `pytimeparse`.""" + + model_config = ConfigDict(extra='forbid', use_attribute_docstrings=True) diff --git a/spras/containers.py b/spras/containers.py index 124b9741..19b7ee6e 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -1,3 +1,4 @@ +import datetime import os import platform import re @@ -8,6 +9,7 @@ import docker import docker.errors +import requests from spras.config.container_schema import ProcessedContainerSettings from spras.logging import indent @@ -166,6 +168,20 @@ def streams_contain(self, needle: str): def __str__(self): return self.message +class TimeoutError(RuntimeError): + """Raises when a function times out.""" + timeout: int + message: str + + def __init__(self, timeout: int, *args): + self.timeout = timeout + self.message = f"Timed out after {datetime.timedelta(seconds=timeout)}." + + super(TimeoutError, self).__init__(timeout, *args) + + def __str__(self): + return self.message + def env_to_items(environment: dict[str, str]) -> Iterator[str]: """ Turns an environment variable dictionary to KEY=VALUE pairs. @@ -176,7 +192,17 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]: # TODO consider a better default environment variable # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) # Technically the argument is an image, not a container, but we use container here. -def run_container(container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled = False): +def run_container( + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled = False +): """ Runs a command in the container using Singularity or Docker @param container_suffix: name of the DockerHub container without the 'docker://' prefix @@ -185,6 +211,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple @param working_dir: the working directory in the container @param container_settings: the settings to use to run the container @param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling. + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. @return: output from Singularity execute or Docker run @@ -193,15 +220,25 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple container = container_settings.prefix + "/" + container_suffix if normalized_framework == 'docker': - return run_container_docker(container, command, volumes, working_dir, environment, network_disabled) + return run_container_docker(container, command, volumes, working_dir, environment, timeout, network_disabled) elif normalized_framework == 'singularity' or normalized_framework == "apptainer": - return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment) + return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment, timeout) elif normalized_framework == 'dsub': return run_container_dsub(container, command, volumes, working_dir, environment) else: raise ValueError(f'{container_settings.framework} is not a recognized container framework. Choose "docker", "dsub", "apptainer", or "singularity".') -def run_container_and_log(name: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_and_log( + name: str, + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled=False +): """ Runs a command in the container using Singularity or Docker with associated pretty printed messages. @param name: the display name of the running container for logging purposes @@ -210,6 +247,7 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param container_settings: the container settings to use + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. @return: output from Singularity execute or Docker run @@ -219,7 +257,17 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], print('Running {} on container framework "{}" on env {} with command: {}'.format(name, container_settings.framework, list(env_to_items(environment)), ' '.join(command)), flush=True) try: - out = run_container(container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, container_settings=container_settings, environment=environment, network_disabled=network_disabled) + out = run_container( + container_suffix=container_suffix, + command=command, + volumes=volumes, + working_dir=working_dir, + out_dir=out_dir, + container_settings=container_settings, + timeout=timeout, + environment=environment, + network_disabled=network_disabled + ) if out is not None: if isinstance(out, list): out = ''.join(out) @@ -250,7 +298,15 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], raise ContainerError(message, err.exit_status, stdout, stderr) from None # TODO any issue with creating a new client each time inside this function? -def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_docker( + container: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + environment: Optional[dict[str, str]] = None, + timeout: Optional[int] = None, + network_disabled=False +): """ Runs a command in the container using Docker. Attempts to automatically correct file owner and group for new files created by the container, setting them to the @@ -261,6 +317,8 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param environment: environment variables to set in the container + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. + @param network_disabled: if enabled, disables the underlying network: useful when containers don't fetch any online resources. @return: output from Docker run, or will error if the container errored. """ @@ -290,13 +348,28 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - out = client.containers.run(container, - command, - stderr=True, - volumes=bind_paths, - working_dir=working_dir, - network_disabled=network_disabled, - environment=environment).decode('utf-8') + # We detach the container, allowing dockerpy to return a + # `Container` object for our further use. This is currently only + # to set docker-based container timeouts. + container_obj = client.containers.run( + container, + command, + volumes=bind_paths, + working_dir=working_dir, + network_disabled=network_disabled, + environment=environment, + detach=True + ) + + try: + container_obj.wait(timeout=timeout) + except requests.exceptions.ReadTimeout as err: + container_obj.stop() + client.close() + if timeout: raise TimeoutError(timeout) from err + else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") from None + + out = container_obj.attach(stderr=True).decode('utf-8') # TODO does this cleanup need to still run even if there was an error in the above run command? # On Unix, files written by the above Docker run command will be owned by root and cannot be modified @@ -345,7 +418,16 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple return out -def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None): +def run_container_singularity( + container: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + out_dir: str | os.PathLike, + config: ProcessedContainerSettings, + environment: Optional[dict[str, str]] = None, + timeout: Optional[int] = None, +): """ Runs a command in the container using Singularity. Only available on Linux. @@ -355,6 +437,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ @param working_dir: the working directory in the container @param out_dir: output directory for the rule's artifacts -- used here to store profiling data @param environment: environment variable to set in the container + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @return: output from Singularity execute """ @@ -417,37 +500,44 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ # If not using the expanded sandbox image, we still need to prepend the docker:// prefix # so apptainer knows to pull and convert the image format from docker to apptainer. image_to_run = expanded_image if expanded_image else "docker://" + container - if config.enable_profiling: - # We won't end up using the spython client if profiling is enabled because - # we need to run everything manually to set up the cgroup - # Build the apptainer run command, which gets passed to the cgroup wrapper script - singularity_cmd = [ - "apptainer", "exec" - ] - for bind in bind_paths: - singularity_cmd.extend(["--bind", bind]) - singularity_cmd.extend(singularity_options) - singularity_cmd.append(image_to_run) - singularity_cmd.extend(command) + # We won't end up using the spython client if profiling or timeout is enabled because + # we need to run everything manually to set up the cgroup and add the timeout command as a prefix. + # Build the apptainer run command, which gets passed to the cgroup wrapper script + cmd = [ + "apptainer", "exec" + ] + for bind in bind_paths: + cmd.extend(["--bind", bind]) + cmd.extend(singularity_options) + cmd.append(str(image_to_run)) + cmd.extend(command) + + my_cgroup: Optional[str] = None + if config.enable_profiling: my_cgroup = create_peer_cgroup() # The wrapper script is packaged with spras, and should be located in the same directory # as `containers.py`. wrapper = os.path.join(os.path.dirname(__file__), "cgroup_wrapper.sh") - cmd = [wrapper, my_cgroup] + singularity_cmd - proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT) + cmd = [wrapper, my_cgroup] + cmd + if timeout is not None: + cmd = ["timeout", f"{timeout}s"] + cmd + proc = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + # As per unix `timeout`, this is the status if the command times out and --preserve-status is not initially specified + # (where the latter above holds). + if proc.returncode == 124: + if timeout is not None: + raise TimeoutError(timeout) + else: + raise RuntimeError("Timeout return code occurred, yet `timeout` wasn't specified. " + \ + "Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") + if my_cgroup is not None: print("Reading memory and CPU stats from cgroup") - create_apptainer_container_stats(my_cgroup, out_dir) + create_apptainer_container_stats(my_cgroup, str(out_dir)) - result = proc.stdout - else: - result = Client.execute( - image=image_to_run, - command=command, - options=singularity_options, - bind=bind_paths - ) + result = proc.stdout return result diff --git a/spras/diamond.py b/spras/diamond.py index e8d5ed7d..65f1ac9b 100644 --- a/spras/diamond.py +++ b/spras/diamond.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import ContainerError, prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -63,8 +64,9 @@ def generate_inputs(data, filename_map): edges_df.to_csv(filename_map["network"], columns=["Interactor1", "Interactor2"], index=False, header=None, sep=',') @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() DIAMOnD.validate_required_run_args(inputs) work_dir = '/diamond' @@ -100,7 +102,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: if err.streams_contain("KeyError: 'nix'"): raise RuntimeError(f"{err.stderr}\n" + \ diff --git a/spras/domino.py b/spras/domino.py index 2e96a298..1e419ecf 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import BaseModel from spras.containers import ContainerError, prepare_volume, run_container_and_log from spras.interactome import ( @@ -79,9 +80,10 @@ def generate_inputs(data, filename_map): header=['ID_interactor_A', 'ppi', 'ID_interactor_B']) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = DominoParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() DOMINO.validate_required_run_args(inputs) work_dir = '/spras' @@ -117,7 +119,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. @@ -151,7 +154,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. diff --git a/spras/errors.py b/spras/errors.py new file mode 100644 index 00000000..49d4b74f --- /dev/null +++ b/spras/errors.py @@ -0,0 +1,83 @@ +""" +These are errors for the SPRAS workflow: we describe these as 'artifact' information, +as Snakemake produces artifacts, and the error/success status of these artifacts is associated with +a separate file, named `artifact-info.json`. Note that an `artifact-info.json` file is attached to a +part of the workflow run, which may produce multiple artifacts, and not a single artifact. + +This file makes some heavy use of pydantic discriminated unions and type adapters, +both of which happen to be described in the unions page: +https://pydantic.dev/docs/validation/latest/concepts/unions/#discriminated-unions +""" + +import json +from pathlib import Path +from typing import Annotated, Literal, Union + +from pydantic import BaseModel, Field, TypeAdapter + +from spras.util import LoosePathLike + + +class TimeoutArtifactError(BaseModel): + # We can't use the key `type` without some extra pydantic aliasing. + error_type: Literal['timeout'] = 'timeout' + duration: int + +# NOTE: when we have several errors, replace this with Union[TimeoutError, , ...] +"""All possible distinguished errors.""" +ArtifactErrorOptions = TimeoutArtifactError + +class ArtifactError(BaseModel): + """ + One of the two variants of artifact information describing errors. See `ArtifactSuccess` for the other option. + + This variant is returned when we have a failing point in the workflow, + with `details` delegating to `ArtifactErrorOptions` for more information about the error. + """ + details: ArtifactErrorOptions = Field(discriminator="error_type") + status: Literal['error'] = 'error' + +class ArtifactSuccess(BaseModel): + """ + One of the two variants of artifact information describing successes. See `ArtifactError` for the other option. + + This variant only says that this part of the workflow succeeded. + """ + status: Literal['success'] = 'success' + +"""Describes what happened to a [potentially collection of] artifacts at a point in the workflow.""" +ArtifactInfo = Annotated[Union[ArtifactError, ArtifactSuccess], Field(discriminator="status")] +ArtifactInfoAdapter = TypeAdapter[ArtifactInfo](ArtifactInfo) + +# Collection of Snakemake utilities. + +def artifact_info_to_str(artifact_info: ArtifactInfo) -> str: + """Converts some `ArtifactInfo` into a string.""" + return json.dumps(ArtifactInfoAdapter.validate_python(artifact_info).model_dump(mode='json')) + +def artifact_info_from_file(file: LoosePathLike) -> ArtifactInfo: + """Converts a file into ArtifactInfo.""" + with open(file, 'r') as f: + return ArtifactInfoAdapter.validate_python(json.load(f)) + +def mark_error(file: LoosePathLike, artifact_error: ArtifactErrorOptions): + """Marks an artifact information file as an error with associated details.""" + Path(file).write_text(artifact_info_to_str(ArtifactError(details=artifact_error))) + +def mark_success(file: LoosePathLike): + """Marks an artifact information file as successful""" + Path(file).write_text(artifact_info_to_str(ArtifactSuccess())) + +def is_error(file: LoosePathLike): + """Checks if a file was produced by `mark_error`. This is close to, but not the negation of, `is_success`. """ + try: + return artifact_info_from_file(file).status == "error" + except ValueError: + return False + +def is_success(file: LoosePathLike): + """Checks if a file was produced by mark_success. This is close to, but not the negation of, `is_error`. """ + try: + return artifact_info_from_file(file).status == "success" + except ValueError: + return False diff --git a/spras/meo.py b/spras/meo.py index dcb7985e..003d6a62 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( add_directionality_constant, @@ -136,10 +137,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['edges'], sep='\t', index=False, columns=['Interactor1', 'EdgeType', 'Interactor2', 'Weight'], header=False) - # TODO add parameter validation # TODO document required arguments @staticmethod - def run(inputs, output_file=None, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): """ Run Maximum Edge Orientation in the Docker image with the provided parameters. The properties file is generated from the provided arguments. @@ -148,8 +148,9 @@ def run(inputs, output_file=None, args=None, container_settings=None): Only the edge output file is retained. All other output files are deleted. """ - if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = MEOParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() MEO.validate_required_run_args(inputs) work_dir = '/spras' @@ -196,7 +197,8 @@ def run(inputs, output_file=None, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) properties_file_local.unlink(missing_ok=True) diff --git a/spras/mincostflow.py b/spras/mincostflow.py index 96fb0e10..60f57900 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( convert_undirected_to_directed, @@ -71,9 +72,10 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = MinCostFlowParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() MinCostFlow.validate_required_run_args(inputs) # the data files will be mapped within this directory within the container @@ -122,7 +124,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Check the output of the container out_dir_content = sorted(out_dir.glob('*.sif')) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 916b7c45..85f444b8 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import CaseInsensitiveEnum from spras.containers import prepare_volume, run_container_and_log from spras.dataset import MissingDataError @@ -154,8 +155,9 @@ def generate_inputs(data, filename_map): # TODO add support for knockout argument # TODO add reasonable default values @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"]) work_dir = '/spras' @@ -227,6 +229,7 @@ def run(inputs, output_file, args, container_settings=None): work_dir, out_dir, container_settings, + run_settings.timeout, {'TMPDIR': mapped_out_dir}) conf_file_local.unlink(missing_ok=True) diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index b6c18efd..7d11d267 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import CaseInsensitiveEnum from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset, MissingDataError @@ -105,9 +106,10 @@ def generate_inputs(data: Dataset, filename_map): header=['protein1', 'protein2', 'cost']) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = OmicsIntegrator2Params() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() OmicsIntegrator2.validate_required_run_args(inputs) work_dir = '/spras' @@ -157,6 +159,7 @@ def run(inputs, output_file, args=None, container_settings=None): work_dir, out_dir, container_settings, + run_settings.timeout, network_disabled=True) # TODO do we want to retain other output files? diff --git a/spras/pathlinker.py b/spras/pathlinker.py index 7f070f55..6a05509f 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -73,9 +74,10 @@ def generate_inputs(data, filename_map): header=["#Interactor1","Interactor2","Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = PathLinkerParams() + if not run_settings: run_settings = RunSettings() + if not container_settings: container_settings = ProcessedContainerSettings() PathLinker.validate_required_run_args(inputs) work_dir = '/spras' @@ -113,7 +115,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename # Currently PathLinker only writes one output file so we do not need to delete others diff --git a/spras/prm.py b/spras/prm.py index 18f3c8a9..d7e207a0 100644 --- a/spras/prm.py +++ b/spras/prm.py @@ -6,6 +6,7 @@ from pydantic import BaseModel from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.dataset import Dataset from spras.util import LoosePathLike @@ -60,9 +61,10 @@ def get_params_generic(cls) -> type[T]: # This is used in `runner.py` to avoid a dependency diamond when trying # to import the actual algorithm schema. @classmethod - def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings): + def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings, run_settings: RunSettings): """ - This is similar to PRA.run, but it does pydantic logic internally to re-validate argument parameters. + This is similar to PRA.run, but `args` is a dictionary and not a pydantic structure. + However, this method still re-validates `args` against the associated pydantic PRM argument model. """ T_class = cls.get_params_generic() @@ -76,17 +78,23 @@ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | o # (Pydantic already provides nice error messages, so we don't need to worry about catching this.) T_parsed = T_class.model_validate(args) - return cls.run(inputs, output_file, T_parsed, container_settings) + return cls.run(inputs, output_file, T_parsed, container_settings, run_settings) @staticmethod @abstractmethod - def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings): + def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings, run_settings: RunSettings): """ - Runs an algorithm with the specified inputs, algorithm params (T), - the designated output_file, and the desired container_settings. - - See the algorithm-specific `generate_inputs` and `parse_output` + Runs an algorithm. + @param inputs: specified inputs + @param output_file: designated reconstructed pathway output + @param args: (T) typed algorithm params + @param container_settings: what settings should be associated with the individual container. + @param run_settings: The particular run settings to use. See `RunSettings` for more info. + + See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output` for information about the input and output format. + + Also see `PRM.run_typeless` for the non-pydantic version of this method (where `args` is a dict). """ raise NotImplementedError diff --git a/spras/responsenet.py b/spras/responsenet.py index f53b5984..c3c599cd 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( convert_undirected_to_directed, @@ -63,10 +64,11 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): + if not args: args = ResponseNetParams() + if not run_settings: run_settings = RunSettings() if not container_settings: container_settings = ProcessedContainerSettings() ResponseNet.validate_required_run_args(inputs) - if not args: args = ResponseNetParams() # the data files will be mapped within this directory within the container work_dir = '/ResponseNet' @@ -112,7 +114,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename out_file_suffixed.rename(output_file) diff --git a/spras/runner.py b/spras/runner.py index fb9391f9..994cf5c7 100644 --- a/spras/runner.py +++ b/spras/runner.py @@ -1,8 +1,11 @@ +from os import PathLike from typing import Any, Mapping # supported algorithm imports from spras.allpairs import AllPairs from spras.btb import BowTieBuilder +from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.dataset import Dataset, DatasetSchema from spras.diamond import DIAMOnD from spras.domino import DOMINO @@ -38,14 +41,21 @@ def get_algorithm(algorithm: str) -> type[PRM]: except KeyError as exc: raise NotImplementedError(f'{algorithm} is not currently supported.') from exc -def run(algorithm: str, inputs, output_file, args, container_settings): +def run( + algorithm: str, + inputs: dict[str, str | PathLike], + output_file: str | PathLike, + args: dict[str, Any], + container_settings: ProcessedContainerSettings, + run_settings: RunSettings +): """ A generic interface to the algorithm-specific run functions """ algorithm_runner = get_algorithm(algorithm) # We can't use config.config here else we would get a cyclic dependency. # Since args is a dict here, we use the 'run_typeless' utility PRM function. - algorithm_runner.run_typeless(inputs, output_file, args, container_settings) + algorithm_runner.run_typeless(inputs, output_file, args, container_settings, run_settings) def get_required_inputs(algorithm: str): diff --git a/spras/rwr.py b/spras/rwr.py index e85eef8a..38bc4534 100644 --- a/spras/rwr.py +++ b/spras/rwr.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -52,8 +53,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() RWR.validate_required_run_args(inputs) with Path(inputs["network"]).open() as network_f: @@ -99,7 +101,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') diff --git a/spras/strwr.py b/spras/strwr.py index ea5bc274..b3fd86c3 100644 --- a/spras/strwr.py +++ b/spras/strwr.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -52,8 +53,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() ST_RWR.validate_required_run_args(inputs) with Path(inputs["network"]).open() as network_f: @@ -104,7 +106,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') diff --git a/test/analysis/input/egfr.yaml b/test/analysis/input/egfr.yaml index c9ed5f73..46bbfe00 100644 --- a/test/analysis/input/egfr.yaml +++ b/test/analysis/input/egfr.yaml @@ -12,14 +12,16 @@ algorithms: include: true runs: run1: - k: [10, 20] + params: + k: [10, 20] - name: meo include: true runs: run1: - local_search: true - max_path_length: 3 - rand_restarts: 10 + params: + local_search: true + max_path_length: 3 + rand_restarts: 10 datasets: - data_dir: "input" edge_files: diff --git a/test/analysis/input/example.yaml b/test/analysis/input/example.yaml index 1a4514c0..e80097cf 100644 --- a/test/analysis/input/example.yaml +++ b/test/analysis/input/example.yaml @@ -12,22 +12,25 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "meo" include: true runs: run1: - max_path_length: [3] - local_search: ["Yes"] - rand_restarts: [10] + params: + max_path_length: [3] + local_search: ["Yes"] + rand_restarts: [10] - name: "mincostflow" include: true runs: run1: - flow: [1] # The flow must be an int - capacity: [1] + params: + flow: [1] # The flow must be an int + capacity: [1] - name: "allpairs" include: true diff --git a/test/errors/expected/error.json b/test/errors/expected/error.json new file mode 100644 index 00000000..cf0bb2c4 --- /dev/null +++ b/test/errors/expected/error.json @@ -0,0 +1 @@ +{"details": {"error_type": "timeout", "duration": 1}, "status": "error"} \ No newline at end of file diff --git a/test/errors/expected/success.json b/test/errors/expected/success.json new file mode 100644 index 00000000..0083a06a --- /dev/null +++ b/test/errors/expected/success.json @@ -0,0 +1 @@ +{"status": "success"} \ No newline at end of file diff --git a/test/errors/test_errors.py b/test/errors/test_errors.py new file mode 100644 index 00000000..56295be3 --- /dev/null +++ b/test/errors/test_errors.py @@ -0,0 +1,26 @@ +import filecmp +from pathlib import Path +from spras.errors import ArtifactError, ArtifactSuccess, TimeoutArtifactError, mark_error, is_error, is_success, artifact_info_from_file, mark_success + +OUTPUT_DIR = Path('test', 'errors', 'output') +EXPECTED_DIR = Path('test', 'errors', 'expected') + +class TestErrors: + def test_error(self): + OUTPUT_DIR.mkdir(exist_ok=True) + + artifact_error = TimeoutArtifactError(duration=1) + mark_error(OUTPUT_DIR / 'error.json', artifact_error) + assert filecmp.cmp(EXPECTED_DIR / 'error.json', OUTPUT_DIR / 'error.json', shallow=True) + assert is_error(OUTPUT_DIR / 'error.json') + assert not is_success(OUTPUT_DIR / 'error.json') + assert artifact_info_from_file(OUTPUT_DIR / 'error.json') == ArtifactError(details=artifact_error) + + def test_success(self): + OUTPUT_DIR.mkdir(exist_ok=True) + + mark_success(OUTPUT_DIR / 'success.json') + assert filecmp.cmp(EXPECTED_DIR / 'success.json', OUTPUT_DIR / 'success.json', shallow=True) + assert is_success(OUTPUT_DIR / 'success.json') + assert not is_error(OUTPUT_DIR / 'success.json') + assert artifact_info_from_file(OUTPUT_DIR / 'success.json') == ArtifactSuccess() diff --git a/test/generate-inputs/inputs/test_config.yaml b/test/generate-inputs/inputs/test_config.yaml index 33900bdb..a4aa1e81 100644 --- a/test/generate-inputs/inputs/test_config.yaml +++ b/test/generate-inputs/inputs/test_config.yaml @@ -11,40 +11,46 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "omicsintegrator1" include: true runs: run1: - b: [5, 6] - w: np.linspace(0,5,2) - d: 10 + params: + b: [5, 6] + w: np.linspace(0,5,2) + d: 10 - name: "omicsintegrator2" include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: "meo" include: true runs: run1: - max_path_length: 3 - local_search: true - rand_restarts: 10 + params: + max_path_length: 3 + local_search: true + rand_restarts: 10 - name: "mincostflow" include: true runs: run1: - flow: 1 # The flow must be an int - capacity: 1 + params: + flow: 1 # The flow must be an int + capacity: 1 - name: "allpairs" include: true @@ -53,8 +59,9 @@ algorithms: include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 datasets: - # Labels can only contain letters, numbers, or underscores diff --git a/test/test_config.py b/test/test_config.py index 41551c38..9e7ff40c 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -66,27 +66,27 @@ def get_test_config(): "name": "omicsintegrator2", "include": True, "runs": { - "strings": {"dummy_mode": ["terminals", "others"], "b": 3}, + "strings": {"params": {"dummy_mode": ["terminals", "others"], "b": 3}}, # spacing in np.linspace is on purpose - "singleton_string_np_linspace": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"}, - "str_array_np_logspace": {"dummy_mode": ["others", "all"], "g": "np.logspace(1,1)"} + "singleton_string_np_linspace": {"params": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"}}, + "str_array_np_logspace": {"params": {"dummy_mode": ["others", "all"], "g": "np.logspace(1,1)"}} } }, { "name": "meo", "include": True, "runs": { - "numbersAndBoolsDuplicate": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}, - "numbersAndBool": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]}, - "numbersAndBools": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}, - "boolArrTest": {"local_search": [True, False], "max_path_length": "range(1, 3)"} + "numbersAndBoolsDuplicate": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}}, + "numbersAndBool": {"params": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]}}, + "numbersAndBools": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}}, + "boolArrTest": {"params": {"local_search": [True, False], "max_path_length": "range(1, 3)"}} } }, { "name": "mincostflow", "include": True, "runs": { - "int64artifact": {"flow": "np.arange(5, 7)", "capacity": [2, 3]} + "int64artifact": {"params": {"flow": "np.arange(5, 7)", "capacity": [2, 3]}} } }, ],