Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f81a33e
feat: timeout
tristan-f-r Jan 12, 2026
0342b5c
feat: snakemake err checkpoint
tristan-f-r Jan 12, 2026
841d242
fix: use timeout correctly
tristan-f-r Jan 12, 2026
75fd7f1
fix: filter files w/ errors
tristan-f-r Jan 13, 2026
7abd709
fix: correct timeout order
tristan-f-r Jan 13, 2026
e07c961
fix(cytoscape): specify optional timeout
tristan-f-r Jan 13, 2026
d5b7e18
chore(Snakefile): decheckpointify reconstruct
tristan-f-r Jan 13, 2026
111e53f
perf(Snakefile): make is_error check not consume the entire file
tristan-f-r Jan 13, 2026
c2febff
Merge branch 'main' into timeout-arg
tristan-f-r Jan 31, 2026
cc46eed
docs: timeout
tristan-f-r Apr 23, 2026
83c5ed0
docs: clarification on container_obj
tristan-f-r Apr 23, 2026
71c1976
docs: remove the strange comment
tristan-f-r Apr 23, 2026
6e60afe
refactor: use mark_error and is_error more often
tristan-f-r Apr 23, 2026
7ce0580
Merge branch 'umain' into timeout-arg
tristan-f-r Apr 23, 2026
699ddca
style: fmt
tristan-f-r Apr 23, 2026
4e3c28f
docs: on errors
tristan-f-r Apr 25, 2026
a322f4d
fix: tests and such
tristan-f-r Apr 25, 2026
641608f
feat: singularity timeouts
tristan-f-r Apr 25, 2026
c5ff6ad
docs: mention works with apptainer
tristan-f-r Apr 25, 2026
208eb4a
fix: don't use capture_output and stderr in the same command
tristan-f-r Apr 25, 2026
7a0c4f3
fix: use correct variable reference for reconstruct
tristan-f-r Apr 25, 2026
fe68153
refactor: move errors to be pydantic, add duration cmt
tristan-f-r May 4, 2026
071d4bd
style: fmt
tristan-f-r May 4, 2026
e42c3f0
fix: regenerate fordevs
tristan-f-r May 4, 2026
28cfec0
feat: `RunSettings`
tristan-f-r May 4, 2026
5291335
test(test_config): use correct config
tristan-f-r May 4, 2026
efa45fc
test(generate_inputs): fix config
tristan-f-r May 4, 2026
cae2edd
docs(algorithms): fix doc ordering
tristan-f-r May 4, 2026
0475d45
refactor: move params index grabbing into func
tristan-f-r May 4, 2026
114b608
feat: conditional runs
tristan-f-r May 7, 2026
91fe5eb
fix: actually pass in run settings
tristan-f-r May 7, 2026
2eb0d57
fix: no cyclic imports
tristan-f-r May 7, 2026
15d00a6
docs(timeout): use nested params
tristan-f-r May 7, 2026
73f7b8d
fix: properly define validate_duration
tristan-f-r May 7, 2026
4a95d79
fix(Snakefile): properly fetch run settings
tristan-f-r May 7, 2026
f02c9b4
Merge branch 'timeout-arg' into conditional-runs
tristan-f-r May 8, 2026
35387b4
fix: add conditionals to runs
tristan-f-r May 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions Snakefile
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import os
from spras import runner
import shutil
import json
import yaml
from pathlib import Path
from spras.containers import TimeoutError
from spras.dataset import Dataset
from spras.evaluation import Evaluation
from spras.analysis import ml, summary, cytoscape
from spras.config.revision import detach_spras_revision
import spras.config.config as _config
from spras.errors import mark_error, mark_success, is_error, TimeoutArtifactError, FailedDependencyError

# Snakemake updated the behavior in the 6.5.0 release https://github.com/snakemake/snakemake/pull/1037
# and using the wrong separator prevents Snakemake from matching filenames to the rules that can produce them
Expand Down Expand Up @@ -44,11 +48,14 @@ def algo_has_mult_param_combos(algo):

algorithms_mult_param_combos = [algo for algo in algorithms if algo_has_mult_param_combos(algo)]

# Gets the associated parameter hash out of a params wildcard
def params_index(params_hash):
    """Return the parameter hash embedded in a `params` wildcard value.

    The wildcard value has the form ``params-<hash>`` (the run directories are
    named ``{dataset}-{algorithm}-params-<hash>``). Only the leading
    ``params-`` marker is stripped, so a hash that happens to contain the
    text ``params-`` is left intact. A value without the marker is returned
    unchanged.
    """
    return params_hash.removeprefix('params-')

# Get the parameter dictionary for the specified
# algorithm and parameter combination hash
def reconstruction_params(algorithm, params_hash):
    """Look up the parameter dictionary for one algorithm/parameter-hash pair.

    `params_hash` is the value of the `params` wildcard; `params_index`
    converts it into the key used by `algorithm_params`.

    The stored object itself is returned, not a copy. Callers that modify
    the result should copy it first.
    """
    return algorithm_params[algorithm][params_index(params_hash)]

# Log the parameter dictionary for this parameter configuration in a yaml file
def write_parameter_log(algorithm, param_label, logfile):
Expand Down Expand Up @@ -262,33 +269,82 @@ def collect_prepared_input(wildcards):

return prepared_inputs

def collect_dependent_artifact_info(wildcards):
    """Input function listing the artifact logs of the runs this run depends on.

    The dependencies are read from `_config.config.conditional_run_dependencies`,
    keyed by this run's parameter hash. For each dependency, the path of its
    `artifact-log.json` (same dataset and algorithm as the requesting run) is
    returned, in the order the dependencies are stored.
    """
    run_key = params_index(wildcards.params)
    artifact_logs = []
    for params in _config.config.conditional_run_dependencies[run_key]:
        run_dir = f'{wildcards.dataset}-{wildcards.algorithm}-params-{params}'
        artifact_logs.append(SEP.join([out_dir, run_dir, 'artifact-log.json']))
    return artifact_logs

def filter_successful(files):
    """Return, in order, the items of `files` that `is_error` does not flag as error files."""
    kept = []
    for candidate in files:
        if is_error(candidate):
            continue
        kept.append(candidate)
    return kept

def filter_error(files):
    """Return, in order, the items of `files` that `is_error` flags as error files.

    This is the complement of `filter_successful(files)` within `files`.
    """
    return list(filter(is_error, files))

# Run the pathway reconstruction algorithm
rule reconstruct:
    input:
        prepared_files=collect_prepared_input,
        required_artifact_info=collect_dependent_artifact_info
    # Each reconstruct call should be in a separate output subdirectory that is unique for the parameter combination so
    # that multiple instances of the container can run simultaneously without overwriting the output files
    # Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the
    # same name regardless of the inputs or parameters, and these aren't renamed until after the container command
    # terminates
    output:
        pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
        # Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors.
        artifact_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'artifact-log.json'])
    params:
        # Get the timeout from the config and use it as an input.
        # TODO: This has unexpected behavior when this rule succeeds but the timeout extends,
        # making this rule run again.
        timeout = lambda wildcards: _config.config.algorithm_param_run_settings[params_index(wildcards.params)].timeout
    run:
        successful_runs = filter_successful(input.required_artifact_info)
        errorful_runs = filter_error(input.required_artifact_info)
        # NOTE: conditionals happen as a big OR statement, so we are looking for at least one success.
        # A run with no dependencies at all (both lists empty) is not blocked.
        if not successful_runs and errorful_runs:
            # We don't raise the error here (analogous to `--keep-going`, except we avoid unnecessarily re-running this rule.)
            mark_error(output.artifact_info, FailedDependencyError(failing_dependencies=errorful_runs))
            # and we touch pathway_file still: Snakemake doesn't have optional files, so we output an 'artifact info' file,
            # which contains the status (success/failure) of specific Snakemake jobs.
            # We filter for the successful files (such as ones that didn't time out) with the `filter_successful` function.
            Path(output.pathway_file).touch()
            # Stop here: without this the algorithm would still be run and a successful run
            # would replace the failed-dependency marker written above.
            return

        # Create a copy so that the updates are not written to the parameters logfile.
        # The local name deliberately differs from both the module-level `algorithm_params`
        # and Snakemake's `params` object (which carries `timeout`).
        run_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
        # Declare the input files as a dictionary.
        inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input.prepared_files}, strict=True))
        # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml
        if '_spras_run_name' in run_params:
            run_params.pop('_spras_run_name')
        try:
            runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, run_params, container_settings, params.timeout)
            mark_success(output.artifact_info)
        except TimeoutError:
            # A timeout is recorded in the artifact info instead of failing the job, and the
            # pathway file is touched so that both declared outputs exist.
            mark_error(output.artifact_info, TimeoutArtifactError(duration=params.timeout))
            Path(output.pathway_file).touch()

# Original pathway reconstruction output to universal output
# Use PRRunner as a wrapper to call the algorithm-specific parse_output
rule parse_output:
    input:
        # We propagate up the artifact_info error if it exists.
        artifact_info = rules.reconstruct.output.artifact_info,
        raw_file = rules.reconstruct.output.pathway_file,
        dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle'])
    output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt'])
    run:
        if is_error(input.artifact_info):
            # The reconstruction did not produce a usable pathway, so mark the standardized
            # file as an error instead of parsing the placeholder raw file.
            # NOTE(review): `mark_error` is called with an explicit error object in the
            # reconstruct rule; confirm the error argument is optional.
            mark_error(output.standardized_file)
            return

        # Copy before adding the dataset entry so the shared parameter dictionary is not modified.
        # Named `parse_params` so that Snakemake's own `params` object is not shadowed.
        parse_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
        parse_params['dataset'] = input.dataset_file
        runner.parse_output(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), input.raw_file, output.standardized_file, parse_params)
Expand All @@ -310,7 +366,7 @@ rule viz_cytoscape:
output:
session = SEP.join([out_dir, '{dataset}-cytoscape.cys'])
run:
cytoscape.run_cytoscape(input.pathways, output.session, container_settings)
cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings)


# Write a single summary table for all pathways for each dataset
Expand All @@ -323,7 +379,7 @@ rule summary_table:
run:
# Load the node table from the pickled dataset file
node_table = Dataset.from_file(input.dataset_file).node_table
summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params)
summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params)
summary_df.to_csv(output.summary_table, sep='\t', index=False)

# Cluster the output pathways for each dataset
Expand All @@ -339,7 +395,7 @@ rule ml_analysis:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
Expand All @@ -353,7 +409,7 @@ rule jaccard_similarity:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)


Expand All @@ -364,7 +420,7 @@ rule ensemble:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Returns all pathways for a specific algorithm
Expand All @@ -386,7 +442,7 @@ rule ml_analysis_aggregate_algo:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
Expand All @@ -398,7 +454,7 @@ rule ensemble_per_algo:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Calculate Jaccard similarity between output pathways for each dataset per algorithm
Expand All @@ -409,7 +465,7 @@ rule jaccard_similarity_per_algo:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)

# Return the gold standard pickle file for a specific gold standard
Expand Down Expand Up @@ -440,7 +496,7 @@ rule evaluation_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png)

# Returns all pathways for a specific algorithm and dataset
Expand All @@ -459,7 +515,7 @@ rule evaluation_per_algo_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval)

# Return pathway summary file per dataset
Expand Down
56 changes: 34 additions & 22 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,65 +73,75 @@ algorithms:
include: true
runs:
run1:
k: range(100,201,100)
params:
k: range(100,201,100)

- name: "omicsintegrator1"
include: true
runs:
run1:
b: [5, 6]
w: np.linspace(0,5,2)
d: 10
dummy_mode: "file" # Or "terminals", "all", "others"
params:
b: [5, 6]
w: np.linspace(0,5,2)
d: 10
dummy_mode: "file" # Or "terminals", "all", "others"

- name: "omicsintegrator2"
include: true
runs:
run1:
b: 4
g: 0
params:
b: 4
g: 0
run2:
b: 2
g: 3
params:
b: 2
g: 3

- name: "meo"
include: true
runs:
run1:
max_path_length: 3
local_search: true
rand_restarts: 10
params:
max_path_length: 3
local_search: true
rand_restarts: 10

- name: "mincostflow"
include: true
runs:
run1:
flow: 1
capacity: 1
params:
flow: 1
capacity: 1

- name: "allpairs"
include: true
timeout: 1d

- name: "domino"
include: true
runs:
run1:
slice_threshold: 0.3
module_threshold: 0.05
params:
slice_threshold: 0.3
module_threshold: 0.05

- name: "strwr"
include: true
runs:
run1:
alpha: [0.85]
threshold: [100, 200]
params:
alpha: [0.85]
threshold: [100, 200]

- name: "rwr"
include: true
runs:
run1:
alpha: [0.85]
threshold: [100, 200]
params:
alpha: [0.85]
threshold: [100, 200]

- name: "bowtiebuilder"
include: true
Expand All @@ -140,12 +150,14 @@ algorithms:
include: true
runs:
run1:
gamma: [10]
params:
gamma: [10]
- name: "diamond"
include: true
runs:
run1:
n: 1
params:
n: 1

# Here we specify which pathways to run and other file location information.
# DataLoader.py can currently only load a single dataset
Expand Down
Loading
Loading