diff --git a/Snakefile b/Snakefile
index 3cd39ff8..7ebc84ca 100644
--- a/Snakefile
+++ b/Snakefile
@@ -1,12 +1,16 @@
import os
from spras import runner
import shutil
+import json
import yaml
+from pathlib import Path
+from spras.containers import TimeoutError
from spras.dataset import Dataset
from spras.evaluation import Evaluation
from spras.analysis import ml, summary, cytoscape
from spras.config.revision import detach_spras_revision
import spras.config.config as _config
+from spras.errors import mark_error, mark_success, is_error, TimeoutArtifactError
# Snakemake updated the behavior in the 6.5.0 release https://github.com/snakemake/snakemake/pull/1037
# and using the wrong separator prevents Snakemake from matching filenames to the rules that can produce them
@@ -44,11 +48,14 @@ def algo_has_mult_param_combos(algo):
algorithms_mult_param_combos = [algo for algo in algorithms if algo_has_mult_param_combos(algo)]
+# Gets the associated parameter hash out of a params wildcard
+def params_index(params_hash):
+ return params_hash.replace('params-', '')
+
# Get the parameter dictionary for the specified
# algorithm and parameter combination hash
def reconstruction_params(algorithm, params_hash):
- index = params_hash.replace('params-', '')
- return algorithm_params[algorithm][index]
+ return algorithm_params[algorithm][params_index(params_hash)]
# Log the parameter dictionary for this parameter configuration in a yaml file
def write_parameter_log(algorithm, param_label, logfile):
@@ -262,6 +269,10 @@ def collect_prepared_input(wildcards):
return prepared_inputs
+def filter_successful(files):
+ """Convenient function for filtering iterators by whether or not their items are error files."""
+ return [file for file in files if not is_error(file)]
+
# Run the pathway reconstruction algorithm
rule reconstruct:
input: collect_prepared_input
@@ -270,25 +281,48 @@ rule reconstruct:
# Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the
# same name regardless of the inputs or parameters, and these aren't renamed until after the container command
# terminates
- output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt'])
+ output:
+ pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
+ # Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors.
+ artifact_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'artifact-log.json'])
+ params:
+ # Get the timeout from the config and use it as an input.
+ # TODO: This has unexpected behavior when this rule succeeds but the timeout extends,
+ # making this rule run again.
+ timeout = lambda wildcards: _config.config.algorithm_param_run_settings[params_index(wildcards.params)].timeout
run:
# Create a copy so that the updates are not written to the parameters logfile
- params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
+ algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
# Declare the input files as a dictionary.
inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input}, strict=True))
# Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml
- if '_spras_run_name' in params:
- params.pop('_spras_run_name')
- runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, params, container_settings)
+ if '_spras_run_name' in algorithm_params:
+ algorithm_params.pop('_spras_run_name')
+ try:
+ runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, algorithm_params, container_settings, params.timeout)
+ mark_success(output.artifact_info)
+ except TimeoutError as err:
+ # We don't raise the error here (analogous to `--keep-going`, except we avoid unnecessarily re-running this rule.)
+ mark_error(output.artifact_info, TimeoutArtifactError(duration=params.timeout))
+ # and we touch pathway_file still: Snakemake doesn't have optional files, so we output a 'artifact info' file,
+ # which contains the status (success/failure) of specific Snakemake jobs.
+ # We filter for the successful files (such as ones that didn't time out) with the `filter_successful` function.
+ Path(output.pathway_file).touch()
# Original pathway reconstruction output to universal output
# Use PRRunner as a wrapper to call the algorithm-specific parse_output
rule parse_output:
input:
- raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
+ # We propagate up the artifact_info error if it exists.
+ artifact_info = rules.reconstruct.output.artifact_info,
+ raw_file = rules.reconstruct.output.pathway_file,
dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle'])
output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt'])
run:
+ if is_error(input.artifact_info):
+ mark_error(output.standardized_file, artifact_info_from_file(input.artifact_info).error)
+ return
+
params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
params['dataset'] = input.dataset_file
runner.parse_output(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), input.raw_file, output.standardized_file, params)
@@ -310,7 +344,7 @@ rule viz_cytoscape:
output:
session = SEP.join([out_dir, '{dataset}-cytoscape.cys'])
run:
- cytoscape.run_cytoscape(input.pathways, output.session, container_settings)
+ cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings)
# Write a single summary table for all pathways for each dataset
@@ -323,7 +357,7 @@ rule summary_table:
run:
# Load the node table from the pickled dataset file
node_table = Dataset.from_file(input.dataset_file).node_table
- summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params)
+ summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params)
summary_df.to_csv(output.summary_table, sep='\t', index=False)
# Cluster the output pathways for each dataset
@@ -339,7 +373,7 @@ rule ml_analysis:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']),
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -353,7 +387,7 @@ rule jaccard_similarity:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png'])
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)
@@ -364,7 +398,7 @@ rule ensemble:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt'])
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)
# Returns all pathways for a specific algorithm
@@ -386,7 +420,7 @@ rule ml_analysis_aggregate_algo:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']),
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
@@ -398,7 +432,7 @@ rule ensemble_per_algo:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt'])
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)
# Calculated Jaccard similarity between output pathways for each dataset per algorithm
@@ -409,7 +443,7 @@ rule jaccard_similarity_per_algo:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png'])
run:
- summary_df = ml.summarize_networks(input.pathways)
+ summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)
# Return the gold standard pickle file for a specific gold standard
@@ -440,7 +474,7 @@ rule evaluation_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
- pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
+ pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png)
# Returns all pathways for a specific algorithm and dataset
@@ -459,7 +493,7 @@ rule evaluation_per_algo_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
- pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
+ pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval)
# Return pathway summary file per dataset
diff --git a/config/config.yaml b/config/config.yaml
index fc718e3c..b0e1df94 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -73,65 +73,75 @@ algorithms:
include: true
runs:
run1:
- k: range(100,201,100)
+ params:
+ k: range(100,201,100)
- name: "omicsintegrator1"
include: true
runs:
run1:
- b: [5, 6]
- w: np.linspace(0,5,2)
- d: 10
- dummy_mode: "file" # Or "terminals", "all", "others"
+ params:
+ b: [5, 6]
+ w: np.linspace(0,5,2)
+ d: 10
+ dummy_mode: "file" # Or "terminals", "all", "others"
- name: "omicsintegrator2"
include: true
runs:
run1:
- b: 4
- g: 0
+ params:
+ b: 4
+ g: 0
run2:
- b: 2
- g: 3
+ params:
+ b: 2
+ g: 3
- name: "meo"
include: true
runs:
run1:
- max_path_length: 3
- local_search: true
- rand_restarts: 10
+ params:
+ max_path_length: 3
+ local_search: true
+ rand_restarts: 10
- name: "mincostflow"
include: true
runs:
run1:
- flow: 1
- capacity: 1
+ params:
+ flow: 1
+ capacity: 1
- name: "allpairs"
include: true
+ timeout: 1d
- name: "domino"
include: true
runs:
run1:
- slice_threshold: 0.3
- module_threshold: 0.05
+ params:
+ slice_threshold: 0.3
+ module_threshold: 0.05
- name: "strwr"
include: true
runs:
run1:
- alpha: [0.85]
- threshold: [100, 200]
+ params:
+ alpha: [0.85]
+ threshold: [100, 200]
- name: "rwr"
include: true
runs:
run1:
- alpha: [0.85]
- threshold: [100, 200]
+ params:
+ alpha: [0.85]
+ threshold: [100, 200]
- name: "bowtiebuilder"
include: true
@@ -140,12 +150,14 @@ algorithms:
include: true
runs:
run1:
- gamma: [10]
+ params:
+ gamma: [10]
- name: "diamond"
include: true
runs:
run1:
- n: 1
+ params:
+ n: 1
# Here we specify which pathways to run and other file location information.
# DataLoader.py can currently only load a single dataset
diff --git a/config/egfr.yaml b/config/egfr.yaml
index b93c593c..623b0121 100644
--- a/config/egfr.yaml
+++ b/config/egfr.yaml
@@ -31,77 +31,89 @@ algorithms:
include: true
runs:
run1:
- k:
- - 10
- - 20
- - 70
+ params:
+ k:
+ - 10
+ - 20
+ - 70
- name: omicsintegrator1
include: true
runs:
run1:
- b:
- - 0.55
- - 2
- - 10
- d: 10
- g: 1e-3
- r: 0.01
- w: 0.1
- mu: 0.008
- dummy_mode: ["file"]
+ params:
+ b:
+ - 0.55
+ - 2
+ - 10
+ d: 10
+ g: 1e-3
+ r: 0.01
+ w: 0.1
+ mu: 0.008
+ dummy_mode: ["file"]
- name: omicsintegrator2
include: true
runs:
run1:
- b: 4
- g: 0
+ params:
+ b: 4
+ g: 0
run2:
- b: 2
- g: 3
+ params:
+ b: 2
+ g: 3
- name: meo
include: true
runs:
run1:
- local_search: true
- max_path_length: 3
- rand_restarts: 10
+ params:
+ local_search: true
+ max_path_length: 3
+ rand_restarts: 10
run2:
- local_search: false
- max_path_length: 2
- rand_restarts: 10
+ params:
+ local_search: false
+ max_path_length: 2
+ rand_restarts: 10
- name: allpairs
include: true
- name: domino
include: true
runs:
run1:
- slice_threshold: 0.3
- module_threshold: 0.05
+ params:
+ slice_threshold: 0.3
+ module_threshold: 0.05
- name: mincostflow
include: true
runs:
run1:
- capacity: 15
- flow: 80
+ params:
+ capacity: 15
+ flow: 80
run2:
- capacity: 1
- flow: 6
+ params:
+ capacity: 1
+ flow: 6
run3:
- capacity: 5
- flow: 60
+ params:
+ capacity: 5
+ flow: 60
- name: "strwr"
include: true
runs:
run1:
- alpha: [0.85]
- threshold: [100, 200]
+ params:
+ alpha: [0.85]
+ threshold: [100, 200]
- name: "rwr"
include: true
runs:
run1:
- alpha: [0.85]
- threshold: [100, 200]
+ params:
+ alpha: [0.85]
+ threshold: [100, 200]
- name: "bowtiebuilder"
include: false
diff --git a/docs/design/errors.rst b/docs/design/errors.rst
new file mode 100644
index 00000000..4d436601
--- /dev/null
+++ b/docs/design/errors.rst
@@ -0,0 +1,43 @@
+########
+ Errors
+########
+
+By default, whenever SPRAS runs into a container error (i.e. an internal
+algorithm error), the full workflow will fail. However, there are
+certain designated errors where we don't want this to be the case.
+
+Due to the following design constraints:
+
+- Snakemake does not have support for such errors (the closest being
+ ``--keep-going``, which unnecessarily runs failed runs)
+- SPRAS occasionally outputs empty files as genuine output
+- We need to log errors that happen for user knowledge
+
+we opt to use an ``artifact-info.json`` file, which keeps track of the
+success/failure status at certain failable parts of the workflow. This
+file contains whether or not this part of the workflow succeeded, and if
+it failed, how it failed.
+
+The ``artifact-info.json`` stores a JSON object, containing:
+
+- The key ``status``, which is either the value ``success`` or
+ ``error``, depending on what happened in the workflow.
+
+- If ``status`` is ``error``, we store associated error details in the
+ ``details`` key, which contains an object:
+
+ - The ``details`` object varies per error in the form of a tagged
+ union: they are differentiated by the ``type`` key. We describe
+ each error down below.
+
+*************
+ Error Types
+*************
+
+With the above schema, we detail the ``details`` key below.
+
+Timeout
+=======
+
+Timeout has ``type: "timeout"``, with a single key ``duration``, which
+describes the ``timeout`` value associated to that run.
diff --git a/docs/fordevs/spras.config.rst b/docs/fordevs/spras.config.rst
index f7161447..a0f6f2e9 100644
--- a/docs/fordevs/spras.config.rst
+++ b/docs/fordevs/spras.config.rst
@@ -6,6 +6,15 @@
Submodules
************
+********************************
+ spras.config.algorithms module
+********************************
+
+.. automodule:: spras.config.algorithms
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
****************************
spras.config.config module
****************************
@@ -15,6 +24,33 @@
:undoc-members:
:show-inheritance:
+**************************************
+ spras.config.container_schema module
+**************************************
+
+.. automodule:: spras.config.container_schema
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+*****************************
+ spras.config.dataset module
+*****************************
+
+.. automodule:: spras.config.dataset
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
+******************************
+ spras.config.revision module
+******************************
+
+.. automodule:: spras.config.revision
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
****************************
spras.config.schema module
****************************
diff --git a/docs/fordevs/spras.rst b/docs/fordevs/spras.rst
index 8bd5060f..d63f52e9 100644
--- a/docs/fordevs/spras.rst
+++ b/docs/fordevs/spras.rst
@@ -52,6 +52,15 @@
:undoc-members:
:show-inheritance:
+**********************
+ spras.diamond module
+**********************
+
+.. automodule:: spras.diamond
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
*********************
spras.domino module
*********************
@@ -61,6 +70,15 @@
:undoc-members:
:show-inheritance:
+*********************
+ spras.errors module
+*********************
+
+.. automodule:: spras.errors
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
*************************
spras.evaluation module
*************************
@@ -142,6 +160,15 @@
:undoc-members:
:show-inheritance:
+************************
+ spras.profiling module
+************************
+
+.. automodule:: spras.profiling
+ :members:
+ :undoc-members:
+ :show-inheritance:
+
**************************
spras.responsenet module
**************************
diff --git a/docs/index.rst b/docs/index.rst
index c8aae5e6..42fc92f8 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -40,6 +40,7 @@ reconstruction methods (PRMs) to omics data.
output
htcondor
+ timeout
.. toctree::
:maxdepth: 1
@@ -62,6 +63,12 @@ reconstruction methods (PRMs) to omics data.
contributing/patching
contributing/design
+.. toctree::
+ :maxdepth: 1
+ :caption: Internal Designs
+
+ design/errors
+
.. toctree::
:maxdepth: 1
:caption: Tutorials
diff --git a/docs/timeout.rst b/docs/timeout.rst
new file mode 100644
index 00000000..e58b2e50
--- /dev/null
+++ b/docs/timeout.rst
@@ -0,0 +1,60 @@
+##########
+ Timeouts
+##########
+
+The SPRAS global configuration can take optional per-algorithm timeouts.
+For example, to give a specific run of the PathLinker algorithm a 1 day
+timeout:
+
+.. code:: yaml
+
+ - name: "pathlinker"
+ include: true
+ runs:
+ run1:
+ timeout: 1d
+ params:
+ k: 200
+
+The timeout string parsing is delegated to `pytimeparse
+`__ (examples linked here). This
+allows for more complicated timeout strings, such as ``3d2h32m``.
+
+If ``timeout`` is not specified, the algorithm will never be interrupted
+due to running too long.
+
+**NOTE**: This feature only works with docker and apptainer/singularity
+at the time of writing, not dsub.
+
+*********************
+ Configuration notes
+*********************
+
+Since ``timeout`` is a run parameter, it can also be moved to the top
+level of an algorithm configuration, where that value will become the
+default unless otherwise specified in a specific run.
+
+.. code:: yaml
+
+ - name: "pathlinker"
+ include: true
+ timeout: 1d
+ runs:
+ run1:
+ # this uses timeout: 2d
+ timeout: 2d
+ params:
+ k: 200
+ run2:
+ # this uses timeout: 1d
+ params:
+ k: 100
+
+This is also useful for algorithms which take in no parameters, such as
+``allpairs``:
+
+.. code:: yaml
+
+ - name: "allpairs"
+ include: true
+ timeout: 1d
diff --git a/docs/tutorial/beginner.rst b/docs/tutorial/beginner.rst
index a0846666..f4de401d 100644
--- a/docs/tutorial/beginner.rst
+++ b/docs/tutorial/beginner.rst
@@ -161,13 +161,15 @@ Algorithms
params:
include: true
run1:
- b: 0.1
- d: 10
- g: 1e-3
+ params:
+ b: 0.1
+ d: 10
+ g: 1e-3
run2:
- b: [0.55, 2, 10]
- d: [10, 20]
- g: 1e-3
+ params:
+ b: [0.55, 2, 10]
+ d: [10, 20]
+ g: 1e-3
When defining an algorithm in the configuration file, its name must
match one of the supported SPRAS algorithms. Each algorithm includes an
diff --git a/environment.yml b/environment.yml
index 4361834a..7fe14408 100644
--- a/environment.yml
+++ b/environment.yml
@@ -15,6 +15,7 @@ dependencies:
- scikit-learn=1.7.0
- seaborn=0.13.2
- spython=0.3.14
+ - pytimeparse=1.1.8
# conda-specific for dsub
- python-dateutil=2.9.0
diff --git a/pyproject.toml b/pyproject.toml
index bfc602c6..38074f77 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,6 +30,7 @@ dependencies = [
"scikit-learn==1.7.0",
"seaborn==0.13.2",
"spython==0.3.14",
+ "pytimeparse==1.1.8",
# toolchain deps
"pip==25.3",
diff --git a/spras/allpairs.py b/spras/allpairs.py
index 51ba5432..b4e542ad 100644
--- a/spras/allpairs.py
+++ b/spras/allpairs.py
@@ -2,6 +2,7 @@
from pathlib import Path
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.config.util import Empty
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import Dataset
@@ -72,8 +73,9 @@ def generate_inputs(data: Dataset, filename_map):
header=["#Interactor1", "Interactor2", "Weight"])
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
AllPairs.validate_required_run_args(inputs)
work_dir = '/apsp'
@@ -109,7 +111,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
@staticmethod
def parse_output(raw_pathway_file, standardized_pathway_file, params):
diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py
index e8489950..6eadfadd 100644
--- a/spras/analysis/cytoscape.py
+++ b/spras/analysis/cytoscape.py
@@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai
# (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875)
None,
container_settings,
+ None,
env)
rmtree(cytoscape_output_dir)
diff --git a/spras/btb.py b/spras/btb.py
index 63b6c7ed..6ccfd3d5 100644
--- a/spras/btb.py
+++ b/spras/btb.py
@@ -1,6 +1,7 @@
from pathlib import Path
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.config.util import Empty
from spras.containers import prepare_volume, run_container_and_log
from spras.interactome import (
@@ -61,8 +62,9 @@ def generate_inputs(data, filename_map):
# Skips parameter validation step
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
BowTieBuilder.validate_required_run_args(inputs)
# Tests for pytest (docker container also runs this)
@@ -119,7 +121,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Output is already written to raw-pathway.txt file
diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py
index 552fbc4e..06bdc1cc 100644
--- a/spras/config/algorithms.py
+++ b/spras/config/algorithms.py
@@ -17,6 +17,7 @@
create_model,
)
+from spras.config.runs import RunSettings
from spras.runner import algorithms
# This contains the dynamically generated algorithm schema for use in `schema.py`
@@ -100,12 +101,6 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod
- Ranges and other convenient calls are expanded (see `python_evalish_coerce`)
"""
- # Get the default model instance by trying to serialize the empty dictionary
- try:
- model_default = model.model_validate({})
- except ValidationError:
- model_default = None
-
# First, we need to take our 'model' and coerce it to permit parameter combinations.
# This assumes that all of the keys are flattened, so we only get a structure like so:
# class AlgorithmParams(BaseModel):
@@ -151,12 +146,30 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod
# Pass this as kwargs to create_model, which usually takes in parameters field_name=type.
# We do need to cast create_model, since otherwise the type-checker complains that we may
# have had a key that starts with __ in mapped_list_fields. The above assertion prevents this.
- run_model = (cast(Any, create_model))(
- f'{name}RunModel',
+ params_model = (cast(Any, create_model))(
+ f'{name}ParamModel',
__config__=ConfigDict(extra='forbid'),
**mapped_list_field
)
+ # Get the default model instance by trying to serialize the empty dictionary
+ try:
+ params_model_default = params_model.model_validate({})
+ except ValidationError:
+ params_model_default = None
+
+ # Then, we create a wrapping `run_model` which contains our params_model,
+ # as well as any associated options with an individual run.
+ run_model = create_model(
+ f'{name}RunModel',
+ params=(params_model, params_model_default),
+ __base__=RunSettings,
+ __config__=ConfigDict(extra='forbid')
+ )
+
+ # We use `model_validate` instead of the `run_model` constructor since `run_model` is based off of `RunSettings`
+ run_model_default = None if params_model_default is None else run_model.model_validate({"params": params_model_default})
+
# Here is an example of how this would look like inside config.yaml
# name: pathlinker
# include: true
@@ -174,8 +187,11 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod
# include: true
# will run, despite there being no entries in `runs`.
# (create_model entries take in either a type or (type, default)).
- runs=dict[str, run_model] if model_default is None else (dict[str, run_model], {"default": model_default}),
- __config__=ConfigDict(extra='forbid')
+ runs=dict[str, run_model] if run_model_default is None else (dict[str, run_model], {"default": run_model_default}),
+ __config__=ConfigDict(extra='forbid'),
+ # Note that both entire algorithms and their runs inherit from `RunSettings`, to allow default runs such as
+ # `allpairs` to have run-specific settings (e.g. allpairs with timeout.)
+ __base__=RunSettings
)
algorithm_models: list[type[BaseModel]] = [construct_algorithm_model(name, model.get_params_generic()) for name, model in algorithms.items()]
diff --git a/spras/config/config.py b/spras/config/config.py
index 0a4670a4..dbc68b5b 100644
--- a/spras/config/config.py
+++ b/spras/config/config.py
@@ -24,6 +24,7 @@
from spras.config.container_schema import ProcessedContainerSettings
from spras.config.revision import attach_spras_revision, spras_revision
+from spras.config.runs import RunSettings
from spras.config.schema import DatasetSchema, RawConfig
from spras.util import LoosePathLike, NpHashEncoder, hash_params_sha1_base32
@@ -61,6 +62,8 @@ def __init__(self, raw_config: dict[str, Any]):
self.hash_length = parsed_raw_config.hash_length
# Container settings used by PRMs.
self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length)
+ # Dictionary of parameter hashes to their respective run settings
+ self.algorithm_param_run_settings: dict[str, RunSettings] = dict()
# A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations.
# Only includes algorithms that are set to be run with 'include: true'.
self.algorithm_params: dict[str, dict[str, Any]] = dict()
@@ -182,7 +185,7 @@ def process_algorithms(self, raw_config: RawConfig):
# We create the product of all param combinations for each run
param_name_list = []
# We convert our run parameters to a dictionary, allowing us to iterate over it
- run_subscriptable = vars(runs[run_name])
+ run_subscriptable = vars(runs[run_name].params)
for param in run_subscriptable:
param_name_list.append(param)
# this is guaranteed to be list[Any] by algorithms.py
@@ -218,6 +221,11 @@ def process_algorithms(self, raw_config: RawConfig):
self.algorithm_params[alg.name][params_hash] = run_dict
+ # We finalize by handling any associated information to each parameter hash.
+ self.algorithm_param_run_settings[params_hash] = RunSettings(
+ timeout=runs[run_name].timeout or alg.timeout
+ )
+
def process_analysis(self, raw_config: RawConfig):
if not raw_config.analysis:
return
diff --git a/spras/config/runs.py b/spras/config/runs.py
new file mode 100644
index 00000000..5d33568c
--- /dev/null
+++ b/spras/config/runs.py
@@ -0,0 +1,24 @@
+from typing import Annotated, Optional
+
+from pydantic import BaseModel, BeforeValidator, ConfigDict
+from pytimeparse import parse
+
+
+def validate_duration(value):
+ if isinstance(value, int): return value
+ parsed_duration = parse(value, granularity='seconds')
+ if not parsed_duration: raise RuntimeError(f"Encountered unparsable duration string '{value}'.")
+ return parsed_duration
+
+PyDateTimeDuration = Annotated[
+ int,
+ BeforeValidator(validate_duration)
+]
+
+class RunSettings(BaseModel):
+ """All of the non-parameter settings associated with a run."""
+
+ timeout: Optional[PyDateTimeDuration] = None
+ """The associated timeout with a run, parsed with `pytimeparse`."""
+
+ model_config = ConfigDict(extra='forbid', use_attribute_docstrings=True)
diff --git a/spras/containers.py b/spras/containers.py
index 124b9741..19b7ee6e 100644
--- a/spras/containers.py
+++ b/spras/containers.py
@@ -1,3 +1,4 @@
+import datetime
import os
import platform
import re
@@ -8,6 +9,7 @@
import docker
import docker.errors
+import requests
from spras.config.container_schema import ProcessedContainerSettings
from spras.logging import indent
@@ -166,6 +168,20 @@ def streams_contain(self, needle: str):
def __str__(self):
return self.message
+class TimeoutError(RuntimeError):
+ """Raises when a function times out."""
+ timeout: int
+ message: str
+
+ def __init__(self, timeout: int, *args):
+ self.timeout = timeout
+ self.message = f"Timed out after {datetime.timedelta(seconds=timeout)}."
+
+ super(TimeoutError, self).__init__(timeout, *args)
+
+ def __str__(self):
+ return self.message
+
def env_to_items(environment: dict[str, str]) -> Iterator[str]:
"""
Turns an environment variable dictionary to KEY=VALUE pairs.
@@ -176,7 +192,17 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]:
# TODO consider a better default environment variable
# Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html)
# Technically the argument is an image, not a container, but we use container here.
-def run_container(container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled = False):
+def run_container(
+ container_suffix: str,
+ command: List[str],
+ volumes: List[Tuple[PurePath, PurePath]],
+ working_dir: str,
+ out_dir: str | os.PathLike,
+ container_settings: ProcessedContainerSettings,
+ timeout: Optional[int],
+ environment: Optional[dict[str, str]] = None,
+ network_disabled = False
+):
"""
Runs a command in the container using Singularity or Docker
@param container_suffix: name of the DockerHub container without the 'docker://' prefix
@@ -185,6 +211,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple
@param working_dir: the working directory in the container
@param container_settings: the settings to use to run the container
@param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling.
+ @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached.
@param environment: environment variables to set in the container
@param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking.
@return: output from Singularity execute or Docker run
@@ -193,15 +220,25 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple
container = container_settings.prefix + "/" + container_suffix
if normalized_framework == 'docker':
- return run_container_docker(container, command, volumes, working_dir, environment, network_disabled)
+ return run_container_docker(container, command, volumes, working_dir, environment, timeout, network_disabled)
elif normalized_framework == 'singularity' or normalized_framework == "apptainer":
- return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment)
+ return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment, timeout)
elif normalized_framework == 'dsub':
return run_container_dsub(container, command, volumes, working_dir, environment)
else:
raise ValueError(f'{container_settings.framework} is not a recognized container framework. Choose "docker", "dsub", "apptainer", or "singularity".')
-def run_container_and_log(name: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled=False):
+def run_container_and_log(
+ name: str,
+ container_suffix: str,
+ command: List[str],
+ volumes: List[Tuple[PurePath, PurePath]],
+ working_dir: str, out_dir: str | os.PathLike,
+ container_settings: ProcessedContainerSettings,
+ timeout: Optional[int],
+ environment: Optional[dict[str, str]] = None,
+ network_disabled=False
+):
"""
Runs a command in the container using Singularity or Docker with associated pretty printed messages.
@param name: the display name of the running container for logging purposes
@@ -210,6 +247,7 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple
@param working_dir: the working directory in the container
@param container_settings: the container settings to use
+ @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached.
@param environment: environment variables to set in the container
@param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking.
@return: output from Singularity execute or Docker run
@@ -219,7 +257,17 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
print('Running {} on container framework "{}" on env {} with command: {}'.format(name, container_settings.framework, list(env_to_items(environment)), ' '.join(command)), flush=True)
try:
- out = run_container(container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, container_settings=container_settings, environment=environment, network_disabled=network_disabled)
+ out = run_container(
+ container_suffix=container_suffix,
+ command=command,
+ volumes=volumes,
+ working_dir=working_dir,
+ out_dir=out_dir,
+ container_settings=container_settings,
+ timeout=timeout,
+ environment=environment,
+ network_disabled=network_disabled
+ )
if out is not None:
if isinstance(out, list):
out = ''.join(out)
@@ -250,7 +298,15 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str],
raise ContainerError(message, err.exit_status, stdout, stderr) from None
# TODO any issue with creating a new client each time inside this function?
-def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None, network_disabled=False):
+def run_container_docker(
+ container: str,
+ command: List[str],
+ volumes: List[Tuple[PurePath, PurePath]],
+ working_dir: str,
+ environment: Optional[dict[str, str]] = None,
+ timeout: Optional[int] = None,
+ network_disabled=False
+):
"""
Runs a command in the container using Docker.
Attempts to automatically correct file owner and group for new files created by the container, setting them to the
@@ -261,6 +317,8 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple
@param working_dir: the working directory in the container
@param environment: environment variables to set in the container
+ @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached.
+ @param network_disabled: if enabled, disables the underlying network: useful when containers don't fetch any online resources.
@return: output from Docker run, or will error if the container errored.
"""
@@ -290,13 +348,28 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes]
- out = client.containers.run(container,
- command,
- stderr=True,
- volumes=bind_paths,
- working_dir=working_dir,
- network_disabled=network_disabled,
- environment=environment).decode('utf-8')
+ # We detach the container, allowing dockerpy to return a
+ # `Container` object for our further use. This is currently only
+ # to set docker-based container timeouts.
+ container_obj = client.containers.run(
+ container,
+ command,
+ volumes=bind_paths,
+ working_dir=working_dir,
+ network_disabled=network_disabled,
+ environment=environment,
+ detach=True
+ )
+
+ try:
+ container_obj.wait(timeout=timeout)
+ except requests.exceptions.ReadTimeout as err:
+ container_obj.stop()
+ client.close()
+ if timeout: raise TimeoutError(timeout) from err
+ else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") from None
+
+ out = container_obj.attach(stderr=True).decode('utf-8')
# TODO does this cleanup need to still run even if there was an error in the above run command?
# On Unix, files written by the above Docker run command will be owned by root and cannot be modified
@@ -345,7 +418,16 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
return out
-def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None):
+def run_container_singularity(
+ container: str,
+ command: List[str],
+ volumes: List[Tuple[PurePath, PurePath]],
+ working_dir: str,
+ out_dir: str | os.PathLike,
+ config: ProcessedContainerSettings,
+ environment: Optional[dict[str, str]] = None,
+ timeout: Optional[int] = None,
+):
"""
Runs a command in the container using Singularity.
Only available on Linux.
@@ -355,6 +437,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
@param working_dir: the working directory in the container
@param out_dir: output directory for the rule's artifacts -- used here to store profiling data
@param environment: environment variable to set in the container
+ @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached.
@return: output from Singularity execute
"""
@@ -417,37 +500,44 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
# If not using the expanded sandbox image, we still need to prepend the docker:// prefix
# so apptainer knows to pull and convert the image format from docker to apptainer.
image_to_run = expanded_image if expanded_image else "docker://" + container
- if config.enable_profiling:
- # We won't end up using the spython client if profiling is enabled because
- # we need to run everything manually to set up the cgroup
- # Build the apptainer run command, which gets passed to the cgroup wrapper script
- singularity_cmd = [
- "apptainer", "exec"
- ]
- for bind in bind_paths:
- singularity_cmd.extend(["--bind", bind])
- singularity_cmd.extend(singularity_options)
- singularity_cmd.append(image_to_run)
- singularity_cmd.extend(command)
+ # We won't end up using the spython client if profiling or timeout is enabled because
+ # we need to run everything manually to set up the cgroup and add the timeout command as a prefix.
+ # Build the apptainer run command, which gets passed to the cgroup wrapper script
+ cmd = [
+ "apptainer", "exec"
+ ]
+ for bind in bind_paths:
+ cmd.extend(["--bind", bind])
+ cmd.extend(singularity_options)
+ cmd.append(str(image_to_run))
+ cmd.extend(command)
+
+ my_cgroup: Optional[str] = None
+ if config.enable_profiling:
my_cgroup = create_peer_cgroup()
# The wrapper script is packaged with spras, and should be located in the same directory
# as `containers.py`.
wrapper = os.path.join(os.path.dirname(__file__), "cgroup_wrapper.sh")
- cmd = [wrapper, my_cgroup] + singularity_cmd
- proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT)
+ cmd = [wrapper, my_cgroup] + cmd
+ if timeout is not None:
+ cmd = ["timeout", f"{timeout}s"] + cmd
+ proc = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ # As per unix `timeout`, this is the status if the command times out and --preserve-status is not initially specified
+ # (where the latter above holds).
+ if proc.returncode == 124:
+ if timeout is not None:
+ raise TimeoutError(timeout)
+ else:
+ raise RuntimeError("Timeout return code occurred, yet `timeout` wasn't specified. " + \
+ "Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.")
+ if my_cgroup is not None:
print("Reading memory and CPU stats from cgroup")
- create_apptainer_container_stats(my_cgroup, out_dir)
+ create_apptainer_container_stats(my_cgroup, str(out_dir))
- result = proc.stdout
- else:
- result = Client.execute(
- image=image_to_run,
- command=command,
- options=singularity_options,
- bind=bind_paths
- )
+ result = proc.stdout
return result
diff --git a/spras/diamond.py b/spras/diamond.py
index e8d5ed7d..65f1ac9b 100644
--- a/spras/diamond.py
+++ b/spras/diamond.py
@@ -3,6 +3,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import ContainerError, prepare_volume, run_container_and_log
from spras.dataset import Dataset
from spras.interactome import (
@@ -63,8 +64,9 @@ def generate_inputs(data, filename_map):
edges_df.to_csv(filename_map["network"], columns=["Interactor1", "Interactor2"], index=False, header=None, sep=',')
@staticmethod
- def run(inputs, output_file, args, container_settings=None):
+ def run(inputs, output_file, args, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
DIAMOnD.validate_required_run_args(inputs)
work_dir = '/diamond'
@@ -100,7 +102,8 @@ def run(inputs, output_file, args, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
except ContainerError as err:
if err.streams_contain("KeyError: 'nix'"):
raise RuntimeError(f"{err.stderr}\n" + \
diff --git a/spras/domino.py b/spras/domino.py
index 2e96a298..1e419ecf 100644
--- a/spras/domino.py
+++ b/spras/domino.py
@@ -6,6 +6,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.config.util import BaseModel
from spras.containers import ContainerError, prepare_volume, run_container_and_log
from spras.interactome import (
@@ -79,9 +80,10 @@ def generate_inputs(data, filename_map):
header=['ID_interactor_A', 'ppi', 'ID_interactor_B'])
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
- if not container_settings: container_settings = ProcessedContainerSettings()
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not args: args = DominoParams()
+ if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
DOMINO.validate_required_run_args(inputs)
work_dir = '/spras'
@@ -117,7 +119,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
except ContainerError as err:
# Occurs when DOMINO gets passed some empty dataframe from network_file.
# This counts as an empty input, so we return an empty output.
@@ -151,7 +154,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
except ContainerError as err:
# Occurs when DOMINO gets passed some empty dataframe from network_file.
# This counts as an empty input, so we return an empty output.
diff --git a/spras/errors.py b/spras/errors.py
new file mode 100644
index 00000000..49d4b74f
--- /dev/null
+++ b/spras/errors.py
@@ -0,0 +1,83 @@
+"""
+These are errors for the SPRAS workflow: we describe these as 'artifact' information,
+as Snakemake produces artifacts, and the error/success status of these artifacts is associated with
+a separate file, named `artifact-info.json`. Note that an `artifact-info.json` file is attached to a
+part of the workflow run, which may produce multiple artifacts, and not a single artifact.
+
+This file makes some heavy use of pydantic discriminated unions and type adapters,
+both of which happen to be described in the unions page:
+https://pydantic.dev/docs/validation/latest/concepts/unions/#discriminated-unions
+"""
+
+import json
+from pathlib import Path
+from typing import Annotated, Literal, Union
+
+from pydantic import BaseModel, Field, TypeAdapter
+
+from spras.util import LoosePathLike
+
+
+class TimeoutArtifactError(BaseModel):
+ # We can't use the key `type` without some extra pydantic aliasing.
+ error_type: Literal['timeout'] = 'timeout'
+ duration: int
+
+# NOTE: when we have several errors, replace this with Union[TimeoutError, , ...]
+"""All possible distinguished errors."""
+ArtifactErrorOptions = TimeoutArtifactError
+
+class ArtifactError(BaseModel):
+ """
+ One of the two variants of artifact information describing errors. See `ArtifactSuccess` for the other option.
+
+ This variant is returned when we have a failing point in the workflow,
+ with `details` delegating to `ArtifactErrorOptions` for more information about the error.
+ """
+ details: ArtifactErrorOptions = Field(discriminator="error_type")
+ status: Literal['error'] = 'error'
+
+class ArtifactSuccess(BaseModel):
+ """
+ One of the two variants of artifact information describing successes. See `ArtifactError` for the other option.
+
+ This variant only says that this part of the workflow succeeded.
+ """
+ status: Literal['success'] = 'success'
+
+"""Describes what happened to a [potentially collection of] artifacts at a point in the workflow."""
+ArtifactInfo = Annotated[Union[ArtifactError, ArtifactSuccess], Field(discriminator="status")]
+ArtifactInfoAdapter = TypeAdapter[ArtifactInfo](ArtifactInfo)
+
+# Collection of Snakemake utilities.
+
+def artifact_info_to_str(artifact_info: ArtifactInfo) -> str:
+ """Converts some `ArtifactInfo` into a string."""
+ return json.dumps(ArtifactInfoAdapter.validate_python(artifact_info).model_dump(mode='json'))
+
+def artifact_info_from_file(file: LoosePathLike) -> ArtifactInfo:
+ """Converts a file into ArtifactInfo."""
+ with open(file, 'r') as f:
+ return ArtifactInfoAdapter.validate_python(json.load(f))
+
+def mark_error(file: LoosePathLike, artifact_error: ArtifactErrorOptions):
+ """Marks an artifact information file as an error with associated details."""
+ Path(file).write_text(artifact_info_to_str(ArtifactError(details=artifact_error)))
+
+def mark_success(file: LoosePathLike):
+ """Marks an artifact information file as successful"""
+ Path(file).write_text(artifact_info_to_str(ArtifactSuccess()))
+
+def is_error(file: LoosePathLike):
+ """Checks if a file was produced by `mark_error`. This is close to, but not the negation of, `is_success`. """
+ try:
+ return artifact_info_from_file(file).status == "error"
+ except ValueError:
+ return False
+
+def is_success(file: LoosePathLike):
+ """Checks if a file was produced by mark_success. This is close to, but not the negation of, `is_error`. """
+ try:
+ return artifact_info_from_file(file).status == "success"
+ except ValueError:
+ return False
diff --git a/spras/meo.py b/spras/meo.py
index dcb7985e..003d6a62 100644
--- a/spras/meo.py
+++ b/spras/meo.py
@@ -5,6 +5,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.interactome import (
add_directionality_constant,
@@ -136,10 +137,9 @@ def generate_inputs(data, filename_map):
edges.to_csv(filename_map['edges'], sep='\t', index=False,
columns=['Interactor1', 'EdgeType', 'Interactor2', 'Weight'], header=False)
- # TODO add parameter validation
# TODO document required arguments
@staticmethod
- def run(inputs, output_file=None, args=None, container_settings=None):
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
"""
Run Maximum Edge Orientation in the Docker image with the provided parameters.
The properties file is generated from the provided arguments.
@@ -148,8 +148,9 @@ def run(inputs, output_file=None, args=None, container_settings=None):
Only the edge output file is retained.
All other output files are deleted.
"""
- if not container_settings: container_settings = ProcessedContainerSettings()
if not args: args = MEOParams()
+ if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
MEO.validate_required_run_args(inputs)
work_dir = '/spras'
@@ -196,7 +197,8 @@ def run(inputs, output_file=None, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
properties_file_local.unlink(missing_ok=True)
diff --git a/spras/mincostflow.py b/spras/mincostflow.py
index 96fb0e10..60f57900 100644
--- a/spras/mincostflow.py
+++ b/spras/mincostflow.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.interactome import (
convert_undirected_to_directed,
@@ -71,9 +72,10 @@ def generate_inputs(data, filename_map):
header=False)
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
- if not container_settings: container_settings = ProcessedContainerSettings()
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not args: args = MinCostFlowParams()
+ if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
MinCostFlow.validate_required_run_args(inputs)
# the data files will be mapped within this directory within the container
@@ -122,7 +124,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Check the output of the container
out_dir_content = sorted(out_dir.glob('*.sif'))
diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py
index 916b7c45..85f444b8 100644
--- a/spras/omicsintegrator1.py
+++ b/spras/omicsintegrator1.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.config.util import CaseInsensitiveEnum
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import MissingDataError
@@ -154,8 +155,9 @@ def generate_inputs(data, filename_map):
# TODO add support for knockout argument
# TODO add reasonable default values
@staticmethod
- def run(inputs, output_file, args, container_settings=None):
+ def run(inputs, output_file, args, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"])
work_dir = '/spras'
@@ -227,6 +229,7 @@ def run(inputs, output_file, args, container_settings=None):
work_dir,
out_dir,
container_settings,
+ run_settings.timeout,
{'TMPDIR': mapped_out_dir})
conf_file_local.unlink(missing_ok=True)
diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py
index b6c18efd..7d11d267 100644
--- a/spras/omicsintegrator2.py
+++ b/spras/omicsintegrator2.py
@@ -5,6 +5,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.config.util import CaseInsensitiveEnum
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import Dataset, MissingDataError
@@ -105,9 +106,10 @@ def generate_inputs(data: Dataset, filename_map):
header=['protein1', 'protein2', 'cost'])
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
- if not container_settings: container_settings = ProcessedContainerSettings()
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not args: args = OmicsIntegrator2Params()
+ if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
OmicsIntegrator2.validate_required_run_args(inputs)
work_dir = '/spras'
@@ -157,6 +159,7 @@ def run(inputs, output_file, args=None, container_settings=None):
work_dir,
out_dir,
container_settings,
+ run_settings.timeout,
network_disabled=True)
# TODO do we want to retain other output files?
diff --git a/spras/pathlinker.py b/spras/pathlinker.py
index 7f070f55..6a05509f 100644
--- a/spras/pathlinker.py
+++ b/spras/pathlinker.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import Dataset
from spras.interactome import (
@@ -73,9 +74,10 @@ def generate_inputs(data, filename_map):
header=["#Interactor1","Interactor2","Weight"])
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
- if not container_settings: container_settings = ProcessedContainerSettings()
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
if not args: args = PathLinkerParams()
+ if not run_settings: run_settings = RunSettings()
+ if not container_settings: container_settings = ProcessedContainerSettings()
PathLinker.validate_required_run_args(inputs)
work_dir = '/spras'
@@ -113,7 +115,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Rename the primary output file to match the desired output filename
# Currently PathLinker only writes one output file so we do not need to delete others
diff --git a/spras/prm.py b/spras/prm.py
index 18f3c8a9..d7e207a0 100644
--- a/spras/prm.py
+++ b/spras/prm.py
@@ -6,6 +6,7 @@
from pydantic import BaseModel
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.dataset import Dataset
from spras.util import LoosePathLike
@@ -60,9 +61,10 @@ def get_params_generic(cls) -> type[T]:
# This is used in `runner.py` to avoid a dependency diamond when trying
# to import the actual algorithm schema.
@classmethod
- def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings):
+ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings, run_settings: RunSettings):
"""
- This is similar to PRA.run, but it does pydantic logic internally to re-validate argument parameters.
+ This is similar to PRA.run, but `args` is a dictionary and not a pydantic structure.
+ However, this method still re-validates `args` against the associated pydantic PRM argument model.
"""
T_class = cls.get_params_generic()
@@ -76,17 +78,23 @@ def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | o
# (Pydantic already provides nice error messages, so we don't need to worry about catching this.)
T_parsed = T_class.model_validate(args)
- return cls.run(inputs, output_file, T_parsed, container_settings)
+ return cls.run(inputs, output_file, T_parsed, container_settings, run_settings)
@staticmethod
@abstractmethod
- def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings):
+ def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings, run_settings: RunSettings):
"""
- Runs an algorithm with the specified inputs, algorithm params (T),
- the designated output_file, and the desired container_settings.
-
- See the algorithm-specific `generate_inputs` and `parse_output`
+ Runs an algorithm.
+ @param inputs: specified inputs
+ @param output_file: designated reconstructed pathway output
+ @param args: (T) typed algorithm params
+ @param container_settings: what settings should be associated with the individual container.
+ @param run_settings: The particular run settings to use. See `RunSettings` for more info.
+
+ See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output`
for information about the input and output format.
+
+ Also see `PRM.run_typeless` for the non-pydantic version of this method (where `args` is a dict).
"""
raise NotImplementedError
diff --git a/spras/responsenet.py b/spras/responsenet.py
index f53b5984..c3c599cd 100644
--- a/spras/responsenet.py
+++ b/spras/responsenet.py
@@ -3,6 +3,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.interactome import (
convert_undirected_to_directed,
@@ -63,10 +64,11 @@ def generate_inputs(data, filename_map):
header=False)
@staticmethod
- def run(inputs, output_file, args=None, container_settings=None):
+ def run(inputs, output_file, args=None, container_settings=None, run_settings=None):
+ if not args: args = ResponseNetParams()
+ if not run_settings: run_settings = RunSettings()
if not container_settings: container_settings = ProcessedContainerSettings()
ResponseNet.validate_required_run_args(inputs)
- if not args: args = ResponseNetParams()
# the data files will be mapped within this directory within the container
work_dir = '/ResponseNet'
@@ -112,7 +114,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Rename the primary output file to match the desired output filename
out_file_suffixed.rename(output_file)
diff --git a/spras/runner.py b/spras/runner.py
index fb9391f9..994cf5c7 100644
--- a/spras/runner.py
+++ b/spras/runner.py
@@ -1,8 +1,11 @@
+from os import PathLike
from typing import Any, Mapping
# supported algorithm imports
from spras.allpairs import AllPairs
from spras.btb import BowTieBuilder
+from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.dataset import Dataset, DatasetSchema
from spras.diamond import DIAMOnD
from spras.domino import DOMINO
@@ -38,14 +41,21 @@ def get_algorithm(algorithm: str) -> type[PRM]:
except KeyError as exc:
raise NotImplementedError(f'{algorithm} is not currently supported.') from exc
-def run(algorithm: str, inputs, output_file, args, container_settings):
+def run(
+ algorithm: str,
+ inputs: dict[str, str | PathLike],
+ output_file: str | PathLike,
+ args: dict[str, Any],
+ container_settings: ProcessedContainerSettings,
+ run_settings: RunSettings
+):
"""
A generic interface to the algorithm-specific run functions
"""
algorithm_runner = get_algorithm(algorithm)
# We can't use config.config here else we would get a cyclic dependency.
# Since args is a dict here, we use the 'run_typeless' utility PRM function.
- algorithm_runner.run_typeless(inputs, output_file, args, container_settings)
+ algorithm_runner.run_typeless(inputs, output_file, args, container_settings, run_settings)
def get_required_inputs(algorithm: str):
diff --git a/spras/rwr.py b/spras/rwr.py
index e85eef8a..38bc4534 100644
--- a/spras/rwr.py
+++ b/spras/rwr.py
@@ -5,6 +5,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import Dataset
from spras.interactome import (
@@ -52,8 +53,9 @@ def generate_inputs(data, filename_map):
edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
@staticmethod
- def run(inputs, output_file, args, container_settings=None):
+ def run(inputs, output_file, args, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
RWR.validate_required_run_args(inputs)
with Path(inputs["network"]).open() as network_f:
@@ -99,7 +101,8 @@ def run(inputs, output_file, args, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Rename the primary output file to match the desired output filename
output_edges = Path(out_dir, 'output.txt')
diff --git a/spras/strwr.py b/spras/strwr.py
index ea5bc274..b3fd86c3 100644
--- a/spras/strwr.py
+++ b/spras/strwr.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, ConfigDict
from spras.config.container_schema import ProcessedContainerSettings
+from spras.config.runs import RunSettings
from spras.containers import prepare_volume, run_container_and_log
from spras.dataset import Dataset
from spras.interactome import (
@@ -52,8 +53,9 @@ def generate_inputs(data, filename_map):
edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False)
@staticmethod
- def run(inputs, output_file, args, container_settings=None):
+ def run(inputs, output_file, args, container_settings=None, run_settings=None):
if not container_settings: container_settings = ProcessedContainerSettings()
+ if not run_settings: run_settings = RunSettings()
ST_RWR.validate_required_run_args(inputs)
with Path(inputs["network"]).open() as network_f:
@@ -104,7 +106,8 @@ def run(inputs, output_file, args, container_settings=None):
volumes,
work_dir,
out_dir,
- container_settings)
+ container_settings,
+ run_settings.timeout)
# Rename the primary output file to match the desired output filename
output_edges = Path(out_dir, 'output.txt')
diff --git a/test/analysis/input/egfr.yaml b/test/analysis/input/egfr.yaml
index c9ed5f73..46bbfe00 100644
--- a/test/analysis/input/egfr.yaml
+++ b/test/analysis/input/egfr.yaml
@@ -12,14 +12,16 @@ algorithms:
include: true
runs:
run1:
- k: [10, 20]
+ params:
+ k: [10, 20]
- name: meo
include: true
runs:
run1:
- local_search: true
- max_path_length: 3
- rand_restarts: 10
+ params:
+ local_search: true
+ max_path_length: 3
+ rand_restarts: 10
datasets:
- data_dir: "input"
edge_files:
diff --git a/test/analysis/input/example.yaml b/test/analysis/input/example.yaml
index 1a4514c0..e80097cf 100644
--- a/test/analysis/input/example.yaml
+++ b/test/analysis/input/example.yaml
@@ -12,22 +12,25 @@ algorithms:
include: true
runs:
run1:
- k: range(100,201,100)
+ params:
+ k: range(100,201,100)
- name: "meo"
include: true
runs:
run1:
- max_path_length: [3]
- local_search: ["Yes"]
- rand_restarts: [10]
+ params:
+ max_path_length: [3]
+ local_search: ["Yes"]
+ rand_restarts: [10]
- name: "mincostflow"
include: true
runs:
run1:
- flow: [1] # The flow must be an int
- capacity: [1]
+ params:
+ flow: [1] # The flow must be an int
+ capacity: [1]
- name: "allpairs"
include: true
diff --git a/test/errors/expected/error.json b/test/errors/expected/error.json
new file mode 100644
index 00000000..cf0bb2c4
--- /dev/null
+++ b/test/errors/expected/error.json
@@ -0,0 +1 @@
+{"details": {"error_type": "timeout", "duration": 1}, "status": "error"}
\ No newline at end of file
diff --git a/test/errors/expected/success.json b/test/errors/expected/success.json
new file mode 100644
index 00000000..0083a06a
--- /dev/null
+++ b/test/errors/expected/success.json
@@ -0,0 +1 @@
+{"status": "success"}
\ No newline at end of file
diff --git a/test/errors/test_errors.py b/test/errors/test_errors.py
new file mode 100644
index 00000000..56295be3
--- /dev/null
+++ b/test/errors/test_errors.py
@@ -0,0 +1,26 @@
+import filecmp
+from pathlib import Path
+from spras.errors import ArtifactError, ArtifactSuccess, TimeoutArtifactError, mark_error, is_error, is_success, artifact_info_from_file, mark_success
+
+OUTPUT_DIR = Path('test', 'errors', 'output')
+EXPECTED_DIR = Path('test', 'errors', 'expected')
+
+class TestErrors:
+ def test_error(self):
+ OUTPUT_DIR.mkdir(exist_ok=True)
+
+ artifact_error = TimeoutArtifactError(duration=1)
+ mark_error(OUTPUT_DIR / 'error.json', artifact_error)
+ assert filecmp.cmp(EXPECTED_DIR / 'error.json', OUTPUT_DIR / 'error.json', shallow=True)
+ assert is_error(OUTPUT_DIR / 'error.json')
+ assert not is_success(OUTPUT_DIR / 'error.json')
+ assert artifact_info_from_file(OUTPUT_DIR / 'error.json') == ArtifactError(details=artifact_error)
+
+ def test_success(self):
+ OUTPUT_DIR.mkdir(exist_ok=True)
+
+ mark_success(OUTPUT_DIR / 'success.json')
+ assert filecmp.cmp(EXPECTED_DIR / 'success.json', OUTPUT_DIR / 'success.json', shallow=True)
+ assert is_success(OUTPUT_DIR / 'success.json')
+ assert not is_error(OUTPUT_DIR / 'success.json')
+ assert artifact_info_from_file(OUTPUT_DIR / 'success.json') == ArtifactSuccess()
diff --git a/test/generate-inputs/inputs/test_config.yaml b/test/generate-inputs/inputs/test_config.yaml
index 33900bdb..a4aa1e81 100644
--- a/test/generate-inputs/inputs/test_config.yaml
+++ b/test/generate-inputs/inputs/test_config.yaml
@@ -11,40 +11,46 @@ algorithms:
include: true
runs:
run1:
- k: range(100,201,100)
+ params:
+ k: range(100,201,100)
- name: "omicsintegrator1"
include: true
runs:
run1:
- b: [5, 6]
- w: np.linspace(0,5,2)
- d: 10
+ params:
+ b: [5, 6]
+ w: np.linspace(0,5,2)
+ d: 10
- name: "omicsintegrator2"
include: true
runs:
run1:
- b: 4
- g: 0
+ params:
+ b: 4
+ g: 0
run2:
- b: 2
- g: 3
+ params:
+ b: 2
+ g: 3
- name: "meo"
include: true
runs:
run1:
- max_path_length: 3
- local_search: true
- rand_restarts: 10
+ params:
+ max_path_length: 3
+ local_search: true
+ rand_restarts: 10
- name: "mincostflow"
include: true
runs:
run1:
- flow: 1 # The flow must be an int
- capacity: 1
+ params:
+ flow: 1 # The flow must be an int
+ capacity: 1
- name: "allpairs"
include: true
@@ -53,8 +59,9 @@ algorithms:
include: true
runs:
run1:
- slice_threshold: 0.3
- module_threshold: 0.05
+ params:
+ slice_threshold: 0.3
+ module_threshold: 0.05
datasets:
- # Labels can only contain letters, numbers, or underscores
diff --git a/test/test_config.py b/test/test_config.py
index 41551c38..9e7ff40c 100644
--- a/test/test_config.py
+++ b/test/test_config.py
@@ -66,27 +66,27 @@ def get_test_config():
"name": "omicsintegrator2",
"include": True,
"runs": {
- "strings": {"dummy_mode": ["terminals", "others"], "b": 3},
+ "strings": {"params": {"dummy_mode": ["terminals", "others"], "b": 3}},
# spacing in np.linspace is on purpose
- "singleton_string_np_linspace": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"},
- "str_array_np_logspace": {"dummy_mode": ["others", "all"], "g": "np.logspace(1,1)"}
+ "singleton_string_np_linspace": {"params": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"}},
+ "str_array_np_logspace": {"params": {"dummy_mode": ["others", "all"], "g": "np.logspace(1,1)"}}
}
},
{
"name": "meo",
"include": True,
"runs": {
- "numbersAndBoolsDuplicate": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]},
- "numbersAndBool": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]},
- "numbersAndBools": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]},
- "boolArrTest": {"local_search": [True, False], "max_path_length": "range(1, 3)"}
+ "numbersAndBoolsDuplicate": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}},
+ "numbersAndBool": {"params": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]}},
+ "numbersAndBools": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}},
+ "boolArrTest": {"params": {"local_search": [True, False], "max_path_length": "range(1, 3)"}}
}
},
{
"name": "mincostflow",
"include": True,
"runs": {
- "int64artifact": {"flow": "np.arange(5, 7)", "capacity": [2, 3]}
+ "int64artifact": {"params": {"flow": "np.arange(5, 7)", "capacity": [2, 3]}}
}
},
],