From 36fc1e9ff2fdae49d8976579ebc03a6b2279b7b2 Mon Sep 17 00:00:00 2001 From: teslajoy Date: Tue, 19 May 2026 06:00:32 -0700 Subject: [PATCH 1/6] adds niche-level aucell enrichment, generalize package, repro fixes --- .gitignore | 5 +- README.md | 36 ++++- gpath2vec/aucell.py | 231 ++++++++++++++++++++++++++++++++ gpath2vec/cli.py | 213 ++++++++++++++++++++++++++--- gpath2vec/compare.py | 13 +- gpath2vec/ea.py | 85 ++++++++---- gpath2vec/embedder.py | 9 +- gpath2vec/net.py | 22 ++- pull_reactome_cache.sh | 47 +++++++ setup.py | 13 +- tests/test_ea_aggregation.py | 72 ++++++++++ tests/test_metapath_topology.py | 111 +++++++++++++++ 12 files changed, 800 insertions(+), 57 deletions(-) create mode 100644 gpath2vec/aucell.py create mode 100755 pull_reactome_cache.sh create mode 100644 tests/test_ea_aggregation.py create mode 100644 tests/test_metapath_topology.py diff --git a/.gitignore b/.gitignore index 0f93878..4aa7e81 100644 --- a/.gitignore +++ b/.gitignore @@ -174,4 +174,7 @@ cython_debug/ .pypirc .idea -scripts/ \ No newline at end of file +scripts/ +paper_draft.* +TODO.md +arm_topology_A_B_C.png diff --git a/README.md b/README.md index 42c2a92..0d416be 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ a python package for converting gene sets to biological pathway embeddings with gene sets (from clusters, niches, studies) are tested against reactome pathways via fisher's exact test, then embedded into a shared vector space using metapath2vec over the pathway hierarchy graph. +enrichment is either fisher's exact (a binary top-N gene set per cluster/niche) or **niche-level AUCell** (the full per-niche expression ranking, no gene-set selection step). AUCell here scores each niche's **aggregated pseudobulk** profile, one score per (niche, pathway); it is **not single-cell AUCell** (the package never sees individual cells, niche construction is upstream). both enrichment sources feed the same reactome hierarchy graph and metapath2vec embedding. + ![gpath2vec.png](./img/gpath2vec.png) ## pipeline @@ -89,7 +91,8 @@ the result is a shared embedding space where: ## install ```bash -pip install -e . +pip install -e . # fisher enrichment path +pip install -e '.[aucell]' # adds the niche-level AUCell path (decoupler, anndata) ``` ## usage @@ -166,6 +169,17 @@ gpath2vec embeddings --network-path net.pkl --method vae --ea-matrix-path ea_mat # full pipeline with method choice gpath2vec end2end --genes "EGFR,EGF" --level low --method vae --output-dir output/ + +# niche pipeline: enrichment -> graph -> embeddings in one command. +# --enrichment fisher : binary top-N gene set per niche, fisher's exact + fdr. +# --enrichment aucell : niche-level AUCell on each niche's aggregated pseudobulk +# (one score per niche, NOT single-cell), per-niche top-k as edges. +# inputs: --niche-matrix (niches x genes .npz/.npy), --genes (.npy gene order), +# --niche-meta (parquet with a niche_id column). +gpath2vec niche-pipeline \ + --niche-matrix niches.npz --genes genes.npy --niche-meta niche_meta.parquet \ + --enrichment aucell --reactome-level low --topk 50 \ + --reactome-dir /path/to/reactome/cache --out-dir output/ ``` ## embedding methods @@ -219,3 +233,23 @@ reactome data is downloaded once and cached to `~/.gpath2vec/cache/`. set `GPATH ```bash export GPATH2VEC_REACTOME_DIR=/path/to/reactome/files ``` + +### offline / hpc (air-gapped compute nodes) + +clusters like ARC have no internet on compute nodes, so the cache must be staged from a node that does have internet (ex. a login node): + +```bash +./pull_reactome_cache.sh /shared/path/reactome_cache # run where there IS internet +``` + +this drives the package's real fetchers, so the cache matches exactly what gpath2vec expects. on the compute node, point at it without re-downloading: + +```bash +gpath2vec niche-pipeline ... --reactome-dir /shared/path/reactome_cache +# or: export GPATH2VEC_REACTOME_DIR=/shared/path/reactome_cache +``` + +## todo + +- **edge2vec**: edge-type transition-matrix biased walks as an embedding method. +- **lorentz (hyperbolic) pipeline** diff --git a/gpath2vec/aucell.py b/gpath2vec/aucell.py new file mode 100644 index 0000000..873e78d --- /dev/null +++ b/gpath2vec/aucell.py @@ -0,0 +1,231 @@ +"""AUCell per-niche pathway scoring, as an alternative enrichment source. + +WHAT THIS COMPUTES (read this before using it) +---------------------------------------------- +The unit of analysis is the *niche*: an upstream-defined spatial group of cells +(e.g. a center spot plus its spatial neighbors), summarized as one aggregated +"pseudobulk" expression vector per niche. The input here is therefore an +(n_niches x n_genes) matrix where each row is that niche's aggregated counts. + +For each niche, AUCell: + 1. ranks all genes by the niche's aggregated expression (descending), + 2. for each Reactome pathway gene set, walks down that ranking and integrates + the recovery curve (how early the pathway's genes appear) over the top + `n_up` features. With decoupler's default (n_up=None) the recovery window + is the top 5% of ranked genes by magnitude. + 3. returns one continuous score in [0, ~1] per (niche, pathway): high = the + pathway's genes are concentrated among the niche's most-expressed genes. + +There is no per-niche gene-set selection step (no top-100, no MAD/mean): AUCell +uses the *full* gene ranking, so the "which genes count per niche" choice that +Fisher's exact requires does not exist here. The only parameters are the +gene-set size band and the AUC recovery window (n_up); both are logged. + +This is a faithful generalization of the validated niche_aucell.py pipeline: +same normalization (normalize_total to 1e4 then log1p), same decoupler call +(`dc.mt.aucell`, tmin = min gene-set size), same gene-set-size band. It is +generalized by (a) taking all paths/params as arguments, (b) drawing the +pathway universe from `ea.filter_pathways` so the AUCell and Fisher enrichment +paths use the IDENTICAL level-filtered Reactome universe (comparable by +construction), and (c) writing a provenance JSON for the methods section. + +decoupler + anndata are imported lazily; install via the `aucell` extra. +""" + +from __future__ import annotations + +import hashlib +import json +from pathlib import Path + +import numpy as np +import pandas as pd +import scipy.sparse as sp + +from . import ea + + +def _sha256_of_array(X) -> str: + """stable content hash of the niche x gene matrix, for provenance.""" + h = hashlib.sha256() + if sp.issparse(X): + Xc = X.tocsr() + for a in (Xc.data, Xc.indices, Xc.indptr, + np.asarray(Xc.shape, dtype=np.int64)): + h.update(np.ascontiguousarray(a).tobytes()) + else: + h.update(np.ascontiguousarray(np.asarray(X)).tobytes()) + return h.hexdigest() + + +def _build_net(level, gene_filter, min_genes, max_genes): + """long-form pathway->gene table for decoupler, drawn from the SAME + `ea.filter_pathways` universe the Fisher path uses. `source` is the bare + Reactome stId so the resulting score columns are drop-in compatible with + `Net` (which keys pathway nodes by stId).""" + gm = ea.filter_pathways(level=level, gene_filter=gene_filter, + min_genes=min_genes) + if gm.empty: + raise ValueError(f"no pathways for level={level!r} " + f"(gene_filter set: {gene_filter is not None})") + rows = [] + kept = 0 + for r in gm.itertuples(index=False): + g = list(r.genes) + # gene-set-size band: min_genes (also passed to decoupler as tmin) + # and an upper cap to drop very large, non-specific sets. + if min_genes <= len(g) <= max_genes: + kept += 1 + for sym in g: + rows.append((r.stId, sym)) + if not rows: + raise ValueError( + f"no pathways within size band [{min_genes}, {max_genes}] " + f"at level={level!r}") + net = pd.DataFrame(rows, columns=["source", "target"]) + return net, kept + + +def compute_aucell(X, genes, niche_ids, *, level="all", gene_filter=None, + min_genes=3, max_genes=500, normalize=True, + provenance_path=None, verbose=True): + """per-niche AUCell scores against the level-filtered Reactome universe. + + Parameters + ---------- + X : (n_niches, n_genes) array or scipy.sparse + per-niche aggregated (pseudobulk) expression. Raw counts by default; + pass `normalize=False` if already library-normalized + log1p'd. + genes : sequence of str, length n_genes + gene symbols aligned to the columns of X. + niche_ids : sequence, length n_niches + identifier per row of X (used as the output index). + level : {"low","mid","high","all"} + Reactome pathway level (same semantics as ea.filter_pathways). + gene_filter : optional set/list of gene symbols + restrict the pathway universe (e.g. TF genes). None = no restriction. + min_genes, max_genes : int + pathway gene-set size band. min_genes is also passed to decoupler as + `tmin`. Defaults [3, 500] match the validated pipeline. + normalize : bool + if True (default) apply normalize_total(target_sum=1e4) then log1p + in-place, exactly as the validated niche_aucell pipeline. The score + ranking is invariant to a per-niche scalar, but this is kept for + parity with the reference and is logged. + provenance_path : optional path + if given, write a JSON of every parameter (decoupler version, n_up + policy, tmin/tmax, normalization, input sha256, shapes) for the + methods section / reproducibility. + + Returns + ------- + aucell_df : DataFrame (n_niches x n_pathways), index = niche_ids, + columns = Reactome stIds, values = continuous AUCell scores. + """ + try: + import anndata as ad + import decoupler as dc + except ImportError as e: # pragma: no cover - environment-dependent + raise ImportError( + "AUCell requires 'decoupler' and 'anndata'. Install the extra: " + "pip install 'gpath2vec[aucell]'") from e + + genes = list(map(str, genes)) + niche_ids = list(niche_ids) + if sp.issparse(X): + X = X.tocsr() + if X.shape != (len(niche_ids), len(genes)): + raise ValueError( + f"X shape {X.shape} does not match " + f"(n_niches={len(niche_ids)}, n_genes={len(genes)})") + + input_sha = _sha256_of_array(X) + + adata = ad.AnnData( + X=X.tocsr() if sp.issparse(X) else np.asarray(X), + obs=pd.DataFrame({"niche_id": niche_ids}), + var=pd.DataFrame(index=genes), + ) + adata.obs_names = [str(n) for n in niche_ids] + adata.obs_names_make_unique() + + if normalize: + # normalize_total(target_sum=1e4) then log1p, done explicitly so the + # exact transform is auditable and matches the reference pipeline. + if verbose: + print(" normalizing: target_sum=1e4 -> log1p") + Xc = adata.X.tocsr() if sp.issparse(adata.X) else sp.csr_matrix(adata.X) + lib = np.asarray(Xc.sum(axis=1)).ravel() + scaling = 1e4 / np.maximum(lib, 1.0) + Xc = Xc.multiply(scaling[:, None]).tocsr() + Xc.data = np.log1p(Xc.data) + adata.X = Xc + + net, n_path = _build_net(level, gene_filter, min_genes, max_genes) + if verbose: + print(f" pathway net: {net['source'].nunique()} pathways, " + f"{len(net)} pathway-gene edges (level={level}, " + f"size band [{min_genes}, {max_genes}])") + print(" running AUCell (decoupler)...") + + # tmin = minimum gene-set size; n_up left at decoupler default (None -> + # top 5% of ranked features). Both recorded in provenance below. + dc.mt.aucell(adata, net=net, tmin=min_genes, verbose=verbose) + scores = adata.obsm["score_aucell"].copy() + scores.index = niche_ids[: len(scores)] if len(scores) == len(niche_ids) \ + else scores.index + + if provenance_path is not None: + prov = { + "method": "AUCell", + "decoupler_version": dc.__version__, + "anndata_version": ad.__version__, + "n_up": None, + "n_up_policy": ("decoupler default: top 5% of ranked features " + "by magnitude"), + "tmin": int(min_genes), + "tmax_genesize_band": int(max_genes), + "reactome_level": level, + "gene_filter_applied": gene_filter is not None, + "n_gene_filter": (len(set(gene_filter)) + if gene_filter is not None else 0), + "normalize_total_target_sum": 1e4 if normalize else None, + "log1p": bool(normalize), + "pre_normalized_input": (not normalize), + "n_niches": int(len(niche_ids)), + "n_genes": int(len(genes)), + "n_pathways_scored": int(scores.shape[1]), + "input_matrix_sha256": input_sha, + } + Path(provenance_path).write_text(json.dumps(prov, indent=2)) + if verbose: + print(f" wrote AUCell provenance -> {provenance_path}") + + return scores + + +def topk_per_niche(aucell_df, k): + """top-k AUCell pathways per niche -> clusters dict for `Net`. + + aucell_df : DataFrame (n_niches x n_pathways), continuous AUCell scores. + k : pathways to keep per niche (the primary sparsity parameter; ablate + over e.g. {20, 50, 100}). Low k = sparse, sharp per-niche identity but + risk of under-connected niches; high k = denser, smoother, re-approaches + the everything-connects-to-everything dilution. Always ablate + log k. + + Mechanics: each niche row is the aggregated pseudobulk of its constituent + cells; its AUCell vector is that niche's pathway-activity profile. Keeping + the k highest-scoring pathways (dropping zeros) makes the niche->pathway + graph edges, with the RAW AUCell score as edge weight (non-negative, in + [0, ~1], directly usable by the weighted random walker). + + returns: {niche_id: {pathway_stId: aucell_score, ...}} -- identical shape + to the Fisher path's clusters dict, drop-in for the Net builder. + """ + clusters = {} + for niche_id in aucell_df.index: + scores = aucell_df.loc[niche_id] + top = scores.nlargest(k) + top = top[top > 0] # drop zero-AUCell pathways even if within top-k + clusters[str(niche_id)] = top.to_dict() + return clusters \ No newline at end of file diff --git a/gpath2vec/cli.py b/gpath2vec/cli.py index 52f99f7..c608106 100644 --- a/gpath2vec/cli.py +++ b/gpath2vec/cli.py @@ -7,13 +7,17 @@ from datetime import datetime import click +import numpy as np +import pandas as pd +import scipy.sparse as sp -from gpath2vec.ea import enrich, ea_matrix +from gpath2vec.ea import enrich, ea_matrix, aggregate_min_fdr from gpath2vec.net import Net from gpath2vec.embedder import ( PathwayMetapath2vec, SVDEmbedder, SpectralGraphEmbedder, LINEEmbedder, VAEEmbedder ) +from gpath2vec.aucell import compute_aucell, topk_per_niche METHODS = ["metapath2vec", "svd", "spectral", "line", "vae"] @@ -145,15 +149,8 @@ def create_network(enrichment_path, study_id, level, gene_filter, weight, digrap with open(enrichment_path) as f: ea_records = json.load(f) - # build enrichment list for Net - enrichment = [] - seen = set() - for r in ea_records: - stid = r["stId"] - if stid not in seen: - fdr = r.get("fdr_bh", r.get("entities", {}).get("fdr", 1.0)) - enrichment.append({"stId": stid, "entities": {"fdr": fdr}}) - seen.add(stid) + # min-fdr per pathway across all niches + enrichment = aggregate_min_fdr(ea_records) # build cluster dict from ea records clusters = {} @@ -274,13 +271,7 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, # network click.echo("step 2: network") - enrichment = [] - seen = set() - for r in records: - stid = r["stId"] - if stid not in seen: - enrichment.append({"stId": stid, "entities": {"fdr": r["fdr_bh"]}}) - seen.add(stid) + enrichment = aggregate_min_fdr(records) clusters = {} for r in records: @@ -315,5 +306,193 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, +def _load_gene_filter(path): + """JSON list of gene symbols, or a dict with one list value.""" + obj = json.loads(Path(path).read_text()) + if isinstance(obj, list): + return set(map(str, obj)) + if isinstance(obj, dict): + for k in ("genes", "tf_genes"): + if isinstance(obj.get(k), list): + return set(map(str, obj[k])) + lists = [v for v in obj.values() if isinstance(v, list)] + if len(lists) == 1: + return set(map(str, lists[0])) + raise click.ClickException( + f"--gene-filter {path}: expected a JSON list of gene symbols " + f"or a dict with one list value") + + +def _niche_row(X, i): + """dense 1-D expression vector for niche row i (sparse or dense X).""" + r = X[i] + if sp.issparse(r): + return np.asarray(r.todense()).ravel() + return np.asarray(r).ravel() + + +@cli.command("niche-pipeline") +@click.option("--niche-matrix", required=True, + type=click.Path(exists=True, dir_okay=False), + help="niche x gene matrix: .npz (scipy sparse) or .npy (dense)") +@click.option("--genes", "genes_path", required=True, + type=click.Path(exists=True, dir_okay=False), + help=".npy of gene symbols aligned to matrix columns") +@click.option("--niche-meta", required=True, + type=click.Path(exists=True, dir_okay=False), + help="parquet with column 'niche_id' (+ optional subarray, patient)") +@click.option("--out-dir", required=True, type=click.Path(file_okay=False)) +@click.option("--reactome-dir", default=None, + type=click.Path(exists=True, file_okay=False), + help="reactome cache dir. REQUIRED on offline/HPC nodes; else " + "GPATH2VEC_REACTOME_DIR env or ~/.gpath2vec/cache") +@click.option("--enrichment", "enrichment_method", default="fisher", + show_default=True, type=click.Choice(["fisher", "aucell"])) +@click.option("--reactome-level", default="low", show_default=True, + type=click.Choice(["low", "mid", "high", "all"])) +@click.option("--gene-filter", "gene_filter_file", default=None, + type=click.Path(exists=True, dir_okay=False), + help="optional JSON gene-symbol list restricting the pathway " + "universe (off = full universe)") +@click.option("--min-genes", default=3, show_default=True, type=int) +@click.option("--max-genes", default=500, show_default=True, type=int, + help="aucell: pathway gene-set size upper band") +@click.option("--top-genes", default=100, show_default=True, type=int, + help="fisher: per-niche top-N expressed marker genes") +@click.option("--n-jobs", default=-1, show_default=True, type=int, + help="fisher: parallel enrichment workers (-1 = all cores)") +@click.option("--topk", default=50, show_default=True, type=int, + help="aucell: per-niche pathways kept (ablate 20/50/100)") +@click.option("--pre-normalized/--normalize", default=False, + show_default=True, + help="aucell: --normalize (default) applies " + "normalize_total(1e4)+log1p; --pre-normalized skips it") +@click.option("--dimensions", default=512, show_default=True, type=int) +@click.option("--epochs", default=5, show_default=True, type=int) +@click.option("--lr", default=0.005, show_default=True, type=float) +@click.option("--study-id", default=None) +def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, + reactome_dir, enrichment_method, reactome_level, + gene_filter_file, min_genes, max_genes, top_genes, + n_jobs, topk, pre_normalized, dimensions, epochs, lr, + study_id): + """niche expression -> enrichment (fisher|aucell) -> graph -> embeddings. + + fisher: per-niche top-N expressed genes -> Fisher's exact vs Reactome, + sig/notsig metapaths. aucell: full-ranking AUCell per niche -> per-niche + top-k pathways as connectivity-typed edges, simplified metapaths. all + paths are arguments; a provenance JSON is written for reproducibility. + """ + if reactome_dir: + os.environ["GPATH2VEC_REACTOME_DIR"] = os.path.abspath(reactome_dir) + study_id = _make_id(study_id) + os.makedirs(out_dir, exist_ok=True) + + mp = str(niche_matrix) + if mp.endswith(".npz"): + X = sp.load_npz(mp) + elif mp.endswith(".npy"): + X = np.load(mp, allow_pickle=False) + else: + raise click.ClickException( + "--niche-matrix must be .npz (scipy sparse) or .npy (dense)") + genes = np.load(genes_path, allow_pickle=True) + meta = pd.read_parquet(niche_meta) + if "niche_id" not in meta.columns: + raise click.ClickException("--niche-meta must have a 'niche_id' column") + niche_ids = meta["niche_id"].astype(str).tolist() + if X.shape != (len(niche_ids), len(genes)): + raise click.ClickException( + f"shape mismatch: matrix {X.shape} vs " + f"(n_niches={len(niche_ids)}, n_genes={len(genes)})") + gene_filter = _load_gene_filter(gene_filter_file) if gene_filter_file else None + click.echo(f"niche-pipeline: {len(niche_ids)} niches, {len(genes)} genes, " + f"enrichment={enrichment_method}, level={reactome_level}") + + if enrichment_method == "fisher": + gene_sets = {} + for i, nid in enumerate(niche_ids): + row = _niche_row(X, i) + if (row > 0).sum() == 0: + continue + top = np.argsort(row)[-top_genes:] + top = top[row[top] > 0] + gene_sets[str(nid)] = list(map(str, genes[top])) + click.echo(f" {len(gene_sets)} niches with genes -> enrichment") + ea_df = enrich(gene_sets, level=reactome_level, + gene_filter=gene_filter, min_genes=min_genes, + n_jobs=n_jobs) + ea_df.to_parquet(os.path.join(out_dir, "enrichment.parquet")) + clusters = {} + for _, r in ea_df[ea_df.sig_pathway].iterrows(): + clusters.setdefault(str(r["cluster"]), {})[r["stId"]] = \ + 1 - r["fdr_bh"] + enrichment = aggregate_min_fdr(ea_df.to_dict("records")) + net = Net(enrichment=enrichment, id=study_id, digraph=True, + level=reactome_level, gene_filter=gene_filter, + clusters=clusters if clusters else None) + embedder = PathwayMetapath2vec(graph=net.graph, name=study_id, + walks_per_node=10, walk_length=100) + else: # aucell + scores = compute_aucell( + X, genes, niche_ids, level=reactome_level, + gene_filter=gene_filter, min_genes=min_genes, + max_genes=max_genes, normalize=not pre_normalized, + provenance_path=os.path.join(out_dir, "aucell_params.json")) + scores.to_parquet(os.path.join(out_dir, "aucell_scores.parquet")) + clusters = topk_per_niche(scores, topk) + net = Net(enrichment=[], id=study_id, digraph=True, + level=reactome_level, gene_filter=gene_filter, + clusters=clusters if clusters else None, + node_typing="uniform") + embedder = PathwayMetapath2vec( + graph=net.graph, name=study_id, walks_per_node=10, + walk_length=100, + metapaths=[["cluster", "pathway", "pathway"], + ["pathway", "pathway", "pathway"]]) + + g = net.graph + n_clust = sum(1 for _, a in g.nodes(data=True) + if a.get("node_type") == "cluster") + click.echo(f" network: {g.number_of_nodes()} nodes, " + f"{g.number_of_edges()} edges, {n_clust} niches") + net.save(os.path.join(out_dir, "network.pkl")) + + embedder.train_embeddings(walks=embedder.model, dimensions=dimensions, + window_size=5, epochs=epochs, lr=lr) + embeddings = embedder.get_embeddings() + with open(os.path.join(out_dir, "embeddings.pkl"), "wb") as f: + pickle.dump(embeddings, f) + embedder.save_model(os.path.join(out_dir, "model.pt")) + + cluster_emb = {k: v for k, v in embeddings.items() + if isinstance(k, str) and k.startswith("cluster_")} + if cluster_emb: + emb_df = pd.DataFrame(cluster_emb).T + emb_df.index = [i.replace("cluster_", "", 1) for i in emb_df.index] + m = meta.set_index(meta["niche_id"].astype(str)) + for col in ("subarray", "patient"): + if col in meta.columns: + emb_df[col] = emb_df.index.map(m[col].to_dict()) + emb_df.to_parquet( + os.path.join(out_dir, "cluster_embeddings.parquet")) + + prov = { + "study_id": study_id, + "timestamp": datetime.now().isoformat(timespec="seconds"), + "enrichment_method": enrichment_method, + "reactome_level": reactome_level, + "gene_filter_applied": gene_filter is not None, + "min_genes": min_genes, "max_genes": max_genes, + "top_genes_fisher": top_genes, "topk_aucell": topk, + "pre_normalized": pre_normalized, + "dimensions": dimensions, "epochs": epochs, "lr": lr, + "n_niches": len(niche_ids), "n_genes": len(genes), + } + Path(os.path.join(out_dir, "run_provenance.json")).write_text( + json.dumps(prov, indent=2)) + click.echo(f"done. output in {out_dir}") + + def main(): return cli() diff --git a/gpath2vec/compare.py b/gpath2vec/compare.py index 50f076f..9c9f3a5 100644 --- a/gpath2vec/compare.py +++ b/gpath2vec/compare.py @@ -5,20 +5,22 @@ from sklearn.cross_decomposition import CCA -def cca_compare(emb_a, emb_b, n_components=10): +def cca_compare(emb_a, emb_b, n_components=10, seed=42): """ canonical correlation analysis between two embedding matrices. emb_a, emb_b: numpy arrays (n_samples_a x dim), (n_samples_b x dim) n_components: number of canonical components + seed: rng seed for the subsample; local rng, does not touch global state returns: dict with correlations, transformed components, and summary """ n = min(len(emb_a), len(emb_b)) n_components = min(n_components, n, emb_a.shape[1]) - idx_a = np.random.choice(len(emb_a), n, replace=False) - idx_b = np.random.choice(len(emb_b), n, replace=False) + rng = np.random.default_rng(seed) + idx_a = rng.choice(len(emb_a), n, replace=False) + idx_b = rng.choice(len(emb_b), n, replace=False) Xa = emb_a[idx_a] Xb = emb_b[idx_b] @@ -40,11 +42,12 @@ def cca_compare(emb_a, emb_b, n_components=10): } -def pairwise_cca(embeddings_dict, n_components=10): +def pairwise_cca(embeddings_dict, n_components=10, seed=42): """ run cca between all pairs of groups. embeddings_dict: {group_name: numpy array (n x dim)} + seed: rng seed threaded into each cca_compare call for reproducibility returns: dataframe with group_a, group_b, mean_corr, per-component correlations """ names = sorted(embeddings_dict.keys()) @@ -54,7 +57,7 @@ def pairwise_cca(embeddings_dict, n_components=10): for j in range(i + 1, len(names)): a, b = names[i], names[j] res = cca_compare(embeddings_dict[a], embeddings_dict[b], - n_components=n_components) + n_components=n_components, seed=seed) row = { "group_a": a, "group_b": b, diff --git a/gpath2vec/ea.py b/gpath2vec/ea.py index 8fdfb9c..aa16fba 100644 --- a/gpath2vec/ea.py +++ b/gpath2vec/ea.py @@ -89,28 +89,11 @@ def filter_pathways(level="all", gene_filter=None, min_genes=3): return gm -def enrich(gene_sets, level="low", gene_filter=None, min_genes=3): - """ - fisher's exact test per gene-set/pathway pair with fdr correction. - - gene_sets: {name: [genes]} - level: high/mid/low/all - gene_filter: optional set of genes to restrict pathway universe - min_genes: minimum pathway size to include - returns: dataframe with cluster, stId, name, oddsratio, pvalue, fdr_bh - """ - gm = filter_pathways(level=level, gene_filter=gene_filter, min_genes=min_genes) - if gm.empty: - return pd.DataFrame() - - universe = sorted(set(g for genes in gm.genes for g in genes)) - u_set = set(universe) - pw_genes = {row.stId: set(row.genes) & u_set for _, row in gm.iterrows()} - n_pw = len(pw_genes) - n_universe = len(universe) - - results = [] - for cname, cgenes in gene_sets.items(): +def _enrich_chunk(items, u_set, pw_genes, n_universe, n_pw): + """fisher + per-cluster BH-FDR for a chunk of (cname, cgenes). pure; + output order == input order, so parallel == serial bit-identically.""" + out = [] + for cname, cgenes in items: cset = set(cgenes) & u_set c_in = len(cset) c_out = n_universe - c_in @@ -132,13 +115,69 @@ def enrich(gene_sets, level="low", gene_filter=None, min_genes=3): edf["sig_pathway"] = corrected[0] edf["pathway_adjpvalue"] = -np.log10(edf.fdr_bh) / n_pw edf.fillna(value={"pathway_adjpvalue": 1}, inplace=True) - results.append(edf) + out.append(edf) + return out + + +def enrich(gene_sets, level="low", gene_filter=None, min_genes=3, n_jobs=1): + """ + fisher's exact test per gene-set/pathway pair with fdr correction. + + gene_sets: {name: [genes]} + level: high/mid/low/all + gene_filter: optional set of genes to restrict pathway universe + min_genes: minimum pathway size to include + n_jobs: 1 = serial (default, unchanged behavior); >1 or -1 parallelizes + the per-cluster loop. order-preserving, so the result is + bit-identical to the serial path. + returns: dataframe with cluster, stId, name, oddsratio, pvalue, fdr_bh + """ + gm = filter_pathways(level=level, gene_filter=gene_filter, min_genes=min_genes) + if gm.empty: + return pd.DataFrame() + + universe = sorted(set(g for genes in gm.genes for g in genes)) + u_set = set(universe) + pw_genes = {row.stId: set(row.genes) & u_set for _, row in gm.iterrows()} + n_pw = len(pw_genes) + n_universe = len(universe) + + items = list(gene_sets.items()) + if n_jobs == 1 or len(items) <= 1: + results = _enrich_chunk(items, u_set, pw_genes, n_universe, n_pw) + else: + from joblib import Parallel, delayed, cpu_count + nj = cpu_count() if n_jobs == -1 else n_jobs + k = max(1, -(-len(items) // (nj * 4))) # ~4 chunks/worker + chunks = [items[i:i + k] for i in range(0, len(items), k)] + nested = Parallel(n_jobs=n_jobs)( + delayed(_enrich_chunk)(ch, u_set, pw_genes, n_universe, n_pw) + for ch in chunks) + results = [edf for sub in nested for edf in sub] ea_df = pd.concat(results, ignore_index=True) ea_df["name"] = ea_df.stId.map(gm.set_index("stId")["name"]) return ea_df +def aggregate_min_fdr(ea_records): + """min-FDR per pathway across all niches; the correct sig/notsig basis. + + accepts records in either shape: + - {"stId": ..., "fdr_bh": ...} (long-form ea_df rows) + - {"stId": ..., "entities": {"fdr": ...}} (Net.enrichment format) + pandas Series rows (from ea_df.iterrows()) also work via .get(). + returns records in Net.enrichment format. + """ + best = {} + for r in ea_records: + stid = r["stId"] + fdr = r.get("fdr_bh", r.get("entities", {}).get("fdr", 1.0)) + if stid not in best or fdr < best[stid]: + best[stid] = fdr + return [{"stId": s, "entities": {"fdr": f}} for s, f in best.items()] + + def ea_matrix(ea_df, weight="fdr", sig_only=True): """ pivot enrichment results into cluster x pathway matrix. diff --git a/gpath2vec/embedder.py b/gpath2vec/embedder.py index 52ac64c..8186c85 100644 --- a/gpath2vec/embedder.py +++ b/gpath2vec/embedder.py @@ -342,11 +342,16 @@ def _train_order(self, edges, n, dim, neg_dist, order): class PathwayMetapath2vec(Embedder): - def __init__(self, graph, name, walks_per_node=10, walk_length=100): + def __init__(self, graph, name, walks_per_node=10, walk_length=100, + metapaths=None): self.graph = graph self.name = name self.walks_per_node = walks_per_node self.walk_length = walk_length + # None -> the default 6 sig/notsig schemas (unchanged behavior). + # pass an explicit list for connectivity-typed graphs, ex. + # [["cluster","pathway","pathway"], ["pathway","pathway","pathway"]]. + self.metapaths = metapaths self.vocab = {} super().__init__() @@ -369,7 +374,7 @@ def metapath2vec(self): print(f"graph: {self.graph.number_of_nodes()} nodes, {self.graph.number_of_edges()} edges") node_types = nx.get_node_attributes(self.graph, "node_type") - metapaths = [ + metapaths = self.metapaths if self.metapaths else [ ["sig", "sig"], ["sig", "notsig", "sig"], ["notsig", "sig", "notsig"], diff --git a/gpath2vec/net.py b/gpath2vec/net.py index c269cda..309adf6 100644 --- a/gpath2vec/net.py +++ b/gpath2vec/net.py @@ -32,13 +32,17 @@ class Net: def __init__(self, enrichment=None, id="study", digraph=False, induce=False, level="all", gene_filter=None, - clusters=None): + clusters=None, node_typing="sig"): self.enrichment = enrichment or [] self.digraph = digraph self.induce = induce self.id = id self.level = level self.gene_filter = gene_filter + # "sig" (default): pathway nodes typed sig/notsig from enrichment FDR + # (unchanged behavior). "uniform": every pathway node typed "pathway", + # no FDR dependence, for connectivity-typed (ex. AUCell) graphs. + self.node_typing = node_typing self.pathway_relations = fetch_human_hierarchy() self.pathway_stids = set(chain(*self.pathway_relations)) self.graph = self._build_graph() @@ -70,6 +74,22 @@ def _build_graph(self): return G def _set_node_attr(self): + if self.node_typing == "uniform": + # connectivity-typed graph: every node present at this point is a + # pathway node (clusters are added afterwards). no sig/notsig, no + # FDR. simplified metapaths walk on this single "pathway" type. + nx.set_node_attributes(self.graph, + utils.pathway_parent_mappings(), + "parent_pathway") + nx.set_node_attributes(self.graph, + utils.pathway_name_mappings(), + "pathway_name") + nx.set_node_attributes(self.graph, { + n: {"node_type": "pathway", "stId": n} + for n in self.graph.nodes() + }) + return + fdr_vals = self.fdr_values id_keys = self.ea_stids diff --git a/pull_reactome_cache.sh b/pull_reactome_cache.sh new file mode 100755 index 0000000..fa66df9 --- /dev/null +++ b/pull_reactome_cache.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# pre-stage the Reactome cache gpath2vec needs. + +set -euo pipefail + +CACHE_DIR="${1:-$HOME/.gpath2vec/cache}" +PYTHON="${PYTHON:-python}" + +mkdir -p "$CACHE_DIR" +# resolve to an absolute path so the env var is unambiguous on other nodes +CACHE_DIR="$(cd "$CACHE_DIR" && pwd)" +export GPATH2VEC_REACTOME_DIR="$CACHE_DIR" + +echo "interpreter : $("$PYTHON" -c 'import sys; print(sys.executable)')" +echo "cache dir : $CACHE_DIR" +echo "warming Reactome cache (requires internet)..." + +"$PYTHON" - <<'PY' +# call the package's real fetchers; each populates one cached file. +from gpath2vec import ea, net, utils + +steps = [ + ("ReactomePathways.gmt.zip", ea.gene_mappings), + ("ehld/svgsummary.txt", ea.ehld_stids), + ("homo_sapiens.sbgn.tar.gz", ea.sbgn_stids), + ("ReactomePathwaysRelation.txt", net.fetch_human_hierarchy), + ("ReactomePathways.txt", utils.pathway_name_mappings), + ("events_hierarchy_9606.json", lambda: utils.get_event_hierarchy("9606")), + ("pathway parent mappings", utils.pathway_parent_mappings), +] +for label, fn in steps: + print(f" fetching: {label} ...", flush=True) + fn() +print("all package fetchers completed.") +PY + +echo +echo "cache contents:" +ls -la "$CACHE_DIR" +if [ -z "$(ls -A "$CACHE_DIR" 2>/dev/null)" ]; then + echo "ERROR: cache dir is empty after warm-up" >&2 + exit 1 +fi +echo +echo "done. on the compute node either:" +echo " export GPATH2VEC_REACTOME_DIR='$CACHE_DIR'" +echo " # or pass --reactome-dir '$CACHE_DIR' to gpath2vec niche-pipeline" \ No newline at end of file diff --git a/setup.py b/setup.py index 5eeb83b..94a0983 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -__version__ = "2.0.0" +__version__ = "3.0.0" setup( name="gpath2vec", @@ -13,7 +13,7 @@ author="Nasim Sanati", author_email="nasim@plenary.org", license="MIT", - python_requires=">=3.9", + python_requires=">=3.14", install_requires=[ "torch", "networkx", @@ -25,6 +25,10 @@ "statsmodels", "scikit-learn", ], + extras_require={ + "test": ["pytest"], + "aucell": ["decoupler>=2.1", "anndata"], + }, entry_points={ "console_scripts": [ "gpath2vec=gpath2vec.cli:main", @@ -36,11 +40,6 @@ "Intended Audience :: Science/Research", "Topic :: Scientific/Engineering :: Bio-Informatics", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", diff --git a/tests/test_ea_aggregation.py b/tests/test_ea_aggregation.py new file mode 100644 index 0000000..3ec6ae1 --- /dev/null +++ b/tests/test_ea_aggregation.py @@ -0,0 +1,72 @@ +"""unit tests for the min-fdr aggregator (F1 dedup fix).""" + +import pytest + +from gpath2vec.ea import aggregate_min_fdr + + +def test_min_fdr_picked_long_form(): + # niche A has fdr=0.5 for pathway X, niche B has fdr=0.001 for X + records = [ + {"stId": "R-HSA-X", "fdr_bh": 0.5, "cluster": "A"}, + {"stId": "R-HSA-X", "fdr_bh": 0.001, "cluster": "B"}, + ] + out = aggregate_min_fdr(records) + assert len(out) == 1 + assert out[0]["stId"] == "R-HSA-X" + assert out[0]["entities"]["fdr"] == pytest.approx(0.001) + + +def test_min_fdr_picked_long_form_reverse_order(): + # iteration order should not matter + records = [ + {"stId": "R-HSA-X", "fdr_bh": 0.001, "cluster": "B"}, + {"stId": "R-HSA-X", "fdr_bh": 0.5, "cluster": "A"}, + ] + out = aggregate_min_fdr(records) + assert out[0]["entities"]["fdr"] == pytest.approx(0.001) + + +def test_min_fdr_nested_form(): + # Net.enrichment shape with nested entities.fdr + records = [ + {"stId": "R-HSA-Y", "entities": {"fdr": 0.3}}, + {"stId": "R-HSA-Y", "entities": {"fdr": 0.01}}, + ] + out = aggregate_min_fdr(records) + assert len(out) == 1 + assert out[0]["entities"]["fdr"] == pytest.approx(0.01) + + +def test_multiple_pathways_independent(): + records = [ + {"stId": "R-HSA-X", "fdr_bh": 0.5}, + {"stId": "R-HSA-Y", "fdr_bh": 0.001}, + {"stId": "R-HSA-X", "fdr_bh": 0.05}, + {"stId": "R-HSA-Y", "fdr_bh": 0.2}, + ] + out = {r["stId"]: r["entities"]["fdr"] for r in aggregate_min_fdr(records)} + assert out["R-HSA-X"] == pytest.approx(0.05) + assert out["R-HSA-Y"] == pytest.approx(0.001) + + +def test_empty_input(): + assert aggregate_min_fdr([]) == [] + + +def test_missing_fdr_defaults_to_one(): + # a record with neither fdr_bh nor entities.fdr should default to 1.0 (notsig) + records = [{"stId": "R-HSA-Z"}] + out = aggregate_min_fdr(records) + assert out[0]["entities"]["fdr"] == 1.0 + + +def test_pandas_series_compatible(): + # ea_df.iterrows() yields (index, Series); Series.get(key, default) works. + import pandas as pd + df = pd.DataFrame([ + {"stId": "R-HSA-X", "fdr_bh": 0.5}, + {"stId": "R-HSA-X", "fdr_bh": 0.01}, + ]) + out = aggregate_min_fdr([row for _, row in df.iterrows()]) + assert out[0]["entities"]["fdr"] == pytest.approx(0.01) \ No newline at end of file diff --git a/tests/test_metapath_topology.py b/tests/test_metapath_topology.py new file mode 100644 index 0000000..c47d4f0 --- /dev/null +++ b/tests/test_metapath_topology.py @@ -0,0 +1,111 @@ +"""F2 unit tests: cluster-edge directionality in the pathway graph. + +verifies the topology assumption that embedder.metapath2vec relies on at +embedder.py:391, where `self.graph.neighbors(current)` is filtered by +node_type at line 396. for a DiGraph this is successors only, so a +`cluster -> pathway` edge is invisible from the pathway side. + +these tests document the F2 bug and the three candidate fixes so any +silent change to net.add_clusters edge semantics or Net.digraph default +trips a regression here. + +run: + pytest tests/test_metapath_topology.py -v +""" + +import networkx as nx + + +def _build_pathway_hierarchy(graph_cls): + """tiny stand-in for the Reactome hierarchy: P1 -> P2 -> P3.""" + G = graph_cls() + G.add_node("P1", node_type="sig") + G.add_node("P2", node_type="sig") + G.add_node("P3", node_type="notsig") + G.add_edge("P1", "P2") + G.add_edge("P2", "P3") + return G + + +def test_digraph_cluster_unreachable_from_pathway(): + """current behavior: in DiGraph mode, the cluster anchors a pathway via + `cluster -> pathway`, so P.neighbors() (which returns successors only) + never yields the cluster. this is the F2 bug paper ยง4.5 inadvertently + claims is fixed. + """ + G = _build_pathway_hierarchy(nx.DiGraph) + G.add_node("cluster_A", node_type="cluster") + G.add_edge("cluster_A", "P1", weight=0.9) + assert "cluster_A" not in set(G.neighbors("P1")) + assert "cluster_A" not in set(G.neighbors("P2")) + + +def test_undirected_graph_cluster_reachable(): + """option A: undirected graph. cluster is reachable from P1, but P1's + hierarchical parents are now also neighbors (loss of direction). + """ + G = _build_pathway_hierarchy(nx.Graph) + G.add_node("cluster_A", node_type="cluster") + G.add_edge("cluster_A", "P1", weight=0.9) + p1_neighbors = set(G.neighbors("P1")) + assert "cluster_A" in p1_neighbors + + +def test_bidirectional_cluster_edges_reachable(): + """option B: keep hierarchy directional, but add the reverse + `pathway -> cluster` edge. P1.neighbors() yields the cluster plus + P1's children. hierarchical parents stay invisible from the child. + """ + G = _build_pathway_hierarchy(nx.DiGraph) + G.add_node("cluster_A", node_type="cluster") + G.add_edge("cluster_A", "P1", weight=0.9) + G.add_edge("P1", "cluster_A", weight=0.9) # F2(b) patch + p1_neighbors = set(G.neighbors("P1")) + assert "cluster_A" in p1_neighbors + assert "P2" in p1_neighbors + # cluster only reachable from the pathways it anchors + assert "cluster_A" not in set(G.neighbors("P3")) + + +def test_add_clusters_only_writes_one_direction(): + """mirrors net.Net.add_clusters (net.py:99-111) on a hand-built graph + to confirm we're testing the same edge shape add_clusters produces. + this is the integration-side regression check: if add_clusters ever + changes to bidirectional, this test breaks and forces a re-read of F2. + """ + G = nx.DiGraph() + G.add_node("P1") + G.add_node("P2") + clusters = {"A": {"P1": 0.9, "P2": 0.5}} + for cname, pathway_weights in clusters.items(): + node_id = f"cluster_{cname}" + G.add_node(node_id, node_type="cluster", cluster=cname) + for stid, weight in pathway_weights.items(): + if stid in G: + G.add_edge(node_id, stid, weight=weight) + assert G.has_edge("cluster_A", "P1") + assert G.has_edge("cluster_A", "P2") + assert not G.has_edge("P1", "cluster_A") + assert not G.has_edge("P2", "cluster_A") + + +def test_metapath_filter_silently_falls_back_when_cluster_unreachable(): + """end-to-end mini-regression on the embedder filter logic at + embedder.py:396-401. when walker is at P1 with target_type='cluster', + `typed = [n for n in neighbors if node_types[n] == 'cluster']` is + empty in digraph mode; the walker silently falls back to the full + neighbor list. this test pins that fallback behavior so the F2 + decision can change it deliberately. + """ + G = _build_pathway_hierarchy(nx.DiGraph) + G.add_node("cluster_A", node_type="cluster") + G.add_edge("cluster_A", "P1", weight=0.9) + node_types = nx.get_node_attributes(G, "node_type") + + current = "P1" + target_type = "cluster" + neighbors = list(G.neighbors(current)) + typed = [n for n in neighbors if node_types.get(n, "unknown") == target_type] + + assert typed == [] # cluster filter yields nothing in digraph mode + assert neighbors == ["P2"] # silent fallback would step toward P2, not cluster_A \ No newline at end of file From ffb58b75cd53cf2e609346d5e1749073096c69dd Mon Sep 17 00:00:00 2001 From: teslajoy Date: Tue, 19 May 2026 06:59:28 -0700 Subject: [PATCH 2/6] seed --- README.md | 4 ++++ gpath2vec/cli.py | 39 ++++++++++++++++++++++++++------------- gpath2vec/embedder.py | 28 +++++++++++++++++++++++----- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 0d416be..43c57a1 100644 --- a/README.md +++ b/README.md @@ -249,6 +249,10 @@ gpath2vec niche-pipeline ... --reactome-dir /shared/path/reactome_cache # or: export GPATH2VEC_REACTOME_DIR=/shared/path/reactome_cache ``` +## reproducibility + +all stochastic embedders (metapath2vec, line, vae) take a `seed` (default 1234) that pins the python, numpy and torch rngs, so embeddings are bit-reproducible run to run. the seed is re-applied before training (independent of walk-generation rng) and recorded in `run_provenance.json`. svd and spectral are deterministic by construction. cli: `--seed`. + ## todo - **edge2vec**: edge-type transition-matrix biased walks as an embedding method. diff --git a/gpath2vec/cli.py b/gpath2vec/cli.py index c608106..5c406f6 100644 --- a/gpath2vec/cli.py +++ b/gpath2vec/cli.py @@ -22,14 +22,16 @@ METHODS = ["metapath2vec", "svd", "spectral", "line", "vae"] -def _run_embedder(method, graph, ea_mat, study_id, dimensions, epochs, lr, window): +def _run_embedder(method, graph, ea_mat, study_id, dimensions, epochs, lr, + window, seed=1234): """run the selected embedding method, return embeddings dict.""" if method == "metapath2vec": - embedder = PathwayMetapath2vec(graph=graph, name=study_id) + embedder = PathwayMetapath2vec(graph=graph, name=study_id, seed=seed) walks = embedder.model click.echo(f"{len(walks)} random walks") embedder.train_embeddings(walks=walks, dimensions=dimensions, - window_size=window, epochs=epochs, lr=lr) + window_size=window, epochs=epochs, lr=lr, + seed=seed) elif method == "svd": if ea_mat is None: raise click.UsageError("svd requires an ea matrix (run enrichment first)") @@ -37,11 +39,13 @@ def _run_embedder(method, graph, ea_mat, study_id, dimensions, epochs, lr, windo elif method == "spectral": embedder = SpectralGraphEmbedder(graph, dimensions=dimensions) elif method == "line": - embedder = LINEEmbedder(graph, dimensions=dimensions, epochs=epochs, lr=lr) + embedder = LINEEmbedder(graph, dimensions=dimensions, epochs=epochs, + lr=lr, seed=seed) elif method == "vae": if ea_mat is None: raise click.UsageError("vae requires an ea matrix (run enrichment first)") - embedder = VAEEmbedder(ea_mat, dimensions=dimensions, epochs=epochs, lr=lr) + embedder = VAEEmbedder(ea_mat, dimensions=dimensions, epochs=epochs, + lr=lr, seed=seed) else: raise click.UsageError(f"unknown method: {method}") @@ -188,10 +192,13 @@ def create_network(enrichment_path, study_id, level, gene_filter, weight, digrap @click.option("--window", default=5, show_default=True) @click.option("--epochs", default=10, show_default=True) @click.option("--lr", default=0.005, show_default=True) +@click.option("--seed", default=1234, show_default=True, type=int, + help="rng seed for reproducible embeddings") @click.option("--out-path", required=True, help="output path (pickle)") @click.option("--save-model", required=False, help="optional model save path") def generate_embeddings(network_path, ea_matrix_path, method, study_id, - dimensions, window, epochs, lr, out_path, save_model): + dimensions, window, epochs, lr, seed, out_path, + save_model): """generate embeddings from network (metapath2vec, svd, spectral, line)""" import pandas as pd @@ -206,7 +213,7 @@ def generate_embeddings(network_path, ea_matrix_path, method, study_id, ea_mat = pd.read_csv(ea_matrix_path, index_col=0) embedder = _run_embedder(method, net.graph, ea_mat, study_id, - dimensions, epochs, lr, window) + dimensions, epochs, lr, window, seed) embeddings = embedder.get_embeddings() click.echo(f"embeddings for {len(embeddings)} nodes") @@ -239,8 +246,10 @@ def generate_embeddings(network_path, ea_matrix_path, method, study_id, @click.option("--window", default=5, show_default=True) @click.option("--epochs", default=10, show_default=True) @click.option("--lr", default=0.005, show_default=True) +@click.option("--seed", default=1234, show_default=True, type=int, + help="rng seed for reproducible embeddings") def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, - weight, method, dimensions, window, epochs, lr): + weight, method, dimensions, window, epochs, lr, seed): """run the full pipeline: enrichment -> network -> embeddings""" study_id = _make_id(study_id) os.makedirs(output_dir, exist_ok=True) @@ -291,7 +300,7 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, # embeddings click.echo(f"step 3: embeddings ({method})") embedder = _run_embedder(method, net.graph, matrix, study_id, - dimensions, epochs, lr, window) + dimensions, epochs, lr, window, seed) embeddings = embedder.get_embeddings() with open(embeddings_path, "wb") as f: pickle.dump(embeddings, f) @@ -370,12 +379,14 @@ def _niche_row(X, i): @click.option("--dimensions", default=512, show_default=True, type=int) @click.option("--epochs", default=5, show_default=True, type=int) @click.option("--lr", default=0.005, show_default=True, type=float) +@click.option("--seed", default=1234, show_default=True, type=int, + help="rng seed for reproducible walks + embeddings") @click.option("--study-id", default=None) def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, reactome_dir, enrichment_method, reactome_level, gene_filter_file, min_genes, max_genes, top_genes, n_jobs, topk, pre_normalized, dimensions, epochs, lr, - study_id): + seed, study_id): """niche expression -> enrichment (fisher|aucell) -> graph -> embeddings. fisher: per-niche top-N expressed genes -> Fisher's exact vs Reactome, @@ -432,7 +443,8 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, level=reactome_level, gene_filter=gene_filter, clusters=clusters if clusters else None) embedder = PathwayMetapath2vec(graph=net.graph, name=study_id, - walks_per_node=10, walk_length=100) + walks_per_node=10, walk_length=100, + seed=seed) else: # aucell scores = compute_aucell( X, genes, niche_ids, level=reactome_level, @@ -447,7 +459,7 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, node_typing="uniform") embedder = PathwayMetapath2vec( graph=net.graph, name=study_id, walks_per_node=10, - walk_length=100, + walk_length=100, seed=seed, metapaths=[["cluster", "pathway", "pathway"], ["pathway", "pathway", "pathway"]]) @@ -459,7 +471,7 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, net.save(os.path.join(out_dir, "network.pkl")) embedder.train_embeddings(walks=embedder.model, dimensions=dimensions, - window_size=5, epochs=epochs, lr=lr) + window_size=5, epochs=epochs, lr=lr, seed=seed) embeddings = embedder.get_embeddings() with open(os.path.join(out_dir, "embeddings.pkl"), "wb") as f: pickle.dump(embeddings, f) @@ -487,6 +499,7 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, "top_genes_fisher": top_genes, "topk_aucell": topk, "pre_normalized": pre_normalized, "dimensions": dimensions, "epochs": epochs, "lr": lr, + "seed": seed, "n_niches": len(niche_ids), "n_genes": len(genes), } Path(os.path.join(out_dir, "run_provenance.json")).write_text( diff --git a/gpath2vec/embedder.py b/gpath2vec/embedder.py index 8186c85..cf2a303 100644 --- a/gpath2vec/embedder.py +++ b/gpath2vec/embedder.py @@ -9,6 +9,13 @@ from sklearn.manifold import SpectralEmbedding as SklearnSpectral +def _set_seed(seed): + # pin python, numpy and torch global rngs so embeddings are bit-reproducible. + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + + class Embedder: """base class for all embedding methods.""" @@ -45,18 +52,20 @@ class VAEEmbedder(Embedder): """ def __init__(self, ea_matrix, dimensions=512, hidden_dim=256, - epochs=50, lr=0.001, beta=1.0): + epochs=50, lr=0.001, beta=1.0, seed=1234): self.ea_matrix = ea_matrix self.dimensions = dimensions self.hidden_dim = hidden_dim self.epochs = epochs self.lr = lr self.beta = beta + self.seed = seed self.latent_means = {} self.latent_vars = {} super().__init__() def method(self): + _set_seed(self.seed) input_dim = self.ea_matrix.shape[1] X = torch.FloatTensor(self.ea_matrix.values) names = list(self.ea_matrix.index) @@ -229,16 +238,19 @@ class LINEEmbedder(Embedder): neg_samples: negative samples per positive """ - def __init__(self, graph, dimensions=512, epochs=15, lr=0.005, neg_samples=5): + def __init__(self, graph, dimensions=512, epochs=15, lr=0.005, + neg_samples=5, seed=1234): self.graph = graph self.dimensions = dimensions self.epochs = epochs self.lr = lr self.neg_samples = neg_samples + self.seed = seed self.vocab = {} super().__init__() def method(self): + _set_seed(self.seed) nodes = list(self.graph.nodes()) node_to_ix = {n: i for i, n in enumerate(nodes)} self.vocab = node_to_ix @@ -343,11 +355,12 @@ def _train_order(self, edges, n, dim, neg_dist, order): class PathwayMetapath2vec(Embedder): def __init__(self, graph, name, walks_per_node=10, walk_length=100, - metapaths=None): + metapaths=None, seed=1234): self.graph = graph self.name = name self.walks_per_node = walks_per_node self.walk_length = walk_length + self.seed = seed # None -> the default 6 sig/notsig schemas (unchanged behavior). # pass an explicit list for connectivity-typed graphs, ex. # [["cluster","pathway","pathway"], ["pathway","pathway","pathway"]]. @@ -384,7 +397,7 @@ def metapath2vec(self): ] walks = [] - random.seed(1234) + _set_seed(self.seed) for _ in range(self.walks_per_node): for start_node in self.graph.nodes(): @@ -414,12 +427,17 @@ def metapath2vec(self): return walks def train_embeddings(self, walks, dimensions=512, window_size=5, epochs=10, - lr=0.025, batch_size=1024, n_neg=5, walks_per_chunk=5000): + lr=0.025, batch_size=1024, n_neg=5, walks_per_chunk=5000, + seed=None): """ skip-gram with negative sampling. streams (target, context) pairs per chunk of walks instead of materializing every pair upfront, so memory stays O(chunk) rather than O(all pairs). + + seed defaults to the value passed at construction; re-seeding here makes + training independent of how much rng was consumed during walk generation. """ + _set_seed(self.seed if seed is None else seed) word_to_ix = {} for walk in walks: for word in walk: From ea633176763509b2cd4511bc0949e1cda9f614e5 Mon Sep 17 00:00:00 2001 From: teslajoy Date: Wed, 20 May 2026 09:43:27 -0700 Subject: [PATCH 3/6] generic --meta join; upgrade end2end with parallel + reactome-dir + provenance - new _join_meta helper joins any non-key meta columns into cluster_embeddings; no hardcoded column names. used by both niche-pipeline and end2end. - niche-pipeline: --niche-meta-key (default niche_id) replaces the hardcoded subarray/patient lookup; all non-key meta columns are now joined. - end2end: --n-jobs (parallel enrich), --reactome-dir (offline/hpc), --meta + --meta-key (generic schema-free join), run_provenance.json, cluster_embeddings.parquet output, enrichment json -> parquet. - verified end2end --gene-sets with arbitrary meta column names round-trips values into cluster_embeddings.parquet (no schema assumptions remain). --- gpath2vec/cli.py | 91 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 14 deletions(-) diff --git a/gpath2vec/cli.py b/gpath2vec/cli.py index 5c406f6..006bd26 100644 --- a/gpath2vec/cli.py +++ b/gpath2vec/cli.py @@ -248,17 +248,33 @@ def generate_embeddings(network_path, ea_matrix_path, method, study_id, @click.option("--lr", default=0.005, show_default=True) @click.option("--seed", default=1234, show_default=True, type=int, help="rng seed for reproducible embeddings") +@click.option("--n-jobs", default=1, show_default=True, type=int, + help="parallel enrichment workers (order-preserving)") +@click.option("--reactome-dir", default=None, + help="reactome cache dir (sets GPATH2VEC_REACTOME_DIR)") +@click.option("--meta", "meta_path", default=None, + help="optional parquet to join into cluster embeddings; " + "all non-key columns are joined") +@click.option("--meta-key", default="cluster", show_default=True, + help="key column in --meta matching cluster names") def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, - weight, method, dimensions, window, epochs, lr, seed): + weight, method, dimensions, window, epochs, lr, seed, + n_jobs, reactome_dir, meta_path, meta_key): """run the full pipeline: enrichment -> network -> embeddings""" + if reactome_dir: + os.environ["GPATH2VEC_REACTOME_DIR"] = os.path.abspath(reactome_dir) study_id = _make_id(study_id) os.makedirs(output_dir, exist_ok=True) - enrichment_path = os.path.join(output_dir, f"{study_id}_enrichment.json") + enrichment_path = os.path.join(output_dir, f"{study_id}_enrichment.parquet") matrix_path = os.path.join(output_dir, f"{study_id}_ea_matrix.csv") network_path = os.path.join(output_dir, f"{study_id}_network.pkl") embeddings_path = os.path.join(output_dir, f"{study_id}_embeddings.pkl") model_path = os.path.join(output_dir, f"{study_id}_model.pt") + cluster_emb_path = os.path.join( + output_dir, f"{study_id}_cluster_embeddings.parquet") + provenance_path = os.path.join( + output_dir, f"{study_id}_run_provenance.json") if gene_sets: gs = _parse_gene_sets(gene_sets) @@ -271,10 +287,9 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, # enrichment click.echo("step 1: enrichment") - ea_df = enrich(gs, level=level, gene_filter=gf) + ea_df = enrich(gs, level=level, gene_filter=gf, n_jobs=n_jobs) + ea_df.to_parquet(enrichment_path) records = ea_df.to_dict(orient="records") - with open(enrichment_path, "w") as f: - json.dump(records, f, indent=2) matrix = ea_matrix(ea_df, weight=weight) matrix.to_csv(matrix_path) @@ -306,12 +321,38 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, pickle.dump(embeddings, f) embedder.save_model(model_path) + cluster_emb = {k: v for k, v in embeddings.items() + if isinstance(k, str) and k.startswith("cluster_")} + if cluster_emb: + emb_df = pd.DataFrame(cluster_emb).T + emb_df.index = [i.replace("cluster_", "", 1) for i in emb_df.index] + if meta_path: + emb_df = _join_meta(emb_df, pd.read_parquet(meta_path), meta_key) + emb_df.to_parquet(cluster_emb_path) + + prov = { + "study_id": study_id, + "timestamp": datetime.now().isoformat(timespec="seconds"), + "command": "end2end", + "level": level, "weight": weight, "method": method, + "dimensions": dimensions, "window": window, "epochs": epochs, + "lr": lr, "seed": seed, "n_jobs": n_jobs, + "gene_filter_applied": gf is not None, + "meta_applied": meta_path is not None, + "meta_key": meta_key if meta_path else None, + "n_gene_sets": len(gs), + } + Path(provenance_path).write_text(json.dumps(prov, indent=2)) + click.echo(f"done. output in {output_dir}") click.echo(f" enrichment: {enrichment_path}") click.echo(f" ea matrix: {matrix_path}") click.echo(f" network: {network_path}") click.echo(f" embeddings: {embeddings_path}") + if cluster_emb: + click.echo(f" cluster embeddings: {cluster_emb_path}") click.echo(f" model: {model_path}") + click.echo(f" provenance: {provenance_path}") @@ -340,6 +381,24 @@ def _niche_row(X, i): return np.asarray(r).ravel() +def _join_meta(emb_df, meta, key): + """join all non-key columns of `meta` into `emb_df` on its string index. + no column names are hardcoded; whatever columns the user supplies in + `meta` (besides `key`) end up as columns of `emb_df`. silent no-op if + `meta` is None.""" + if meta is None: + return emb_df + if key not in meta.columns: + raise click.ClickException( + f"meta key {key!r} not in meta columns: {list(meta.columns)}") + m = meta.copy() + m[key] = m[key].astype(str) + m = m.drop_duplicates(subset=[key]).set_index(key) + for col in m.columns: + emb_df[col] = emb_df.index.map(m[col].to_dict()) + return emb_df + + @cli.command("niche-pipeline") @click.option("--niche-matrix", required=True, type=click.Path(exists=True, dir_okay=False), @@ -349,7 +408,9 @@ def _niche_row(X, i): help=".npy of gene symbols aligned to matrix columns") @click.option("--niche-meta", required=True, type=click.Path(exists=True, dir_okay=False), - help="parquet with column 'niche_id' (+ optional subarray, patient)") + help="parquet with a key column (default 'niche_id', set with " + "--niche-meta-key); any other columns are joined into the " + "cluster embeddings parquet") @click.option("--out-dir", required=True, type=click.Path(file_okay=False)) @click.option("--reactome-dir", default=None, type=click.Path(exists=True, file_okay=False), @@ -381,12 +442,14 @@ def _niche_row(X, i): @click.option("--lr", default=0.005, show_default=True, type=float) @click.option("--seed", default=1234, show_default=True, type=int, help="rng seed for reproducible walks + embeddings") +@click.option("--niche-meta-key", default="niche_id", show_default=True, + help="key column in --niche-meta matching cluster names") @click.option("--study-id", default=None) def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, reactome_dir, enrichment_method, reactome_level, gene_filter_file, min_genes, max_genes, top_genes, n_jobs, topk, pre_normalized, dimensions, epochs, lr, - seed, study_id): + seed, niche_meta_key, study_id): """niche expression -> enrichment (fisher|aucell) -> graph -> embeddings. fisher: per-niche top-N expressed genes -> Fisher's exact vs Reactome, @@ -409,9 +472,11 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, "--niche-matrix must be .npz (scipy sparse) or .npy (dense)") genes = np.load(genes_path, allow_pickle=True) meta = pd.read_parquet(niche_meta) - if "niche_id" not in meta.columns: - raise click.ClickException("--niche-meta must have a 'niche_id' column") - niche_ids = meta["niche_id"].astype(str).tolist() + if niche_meta_key not in meta.columns: + raise click.ClickException( + f"--niche-meta must have a {niche_meta_key!r} column " + f"(set with --niche-meta-key)") + niche_ids = meta[niche_meta_key].astype(str).tolist() if X.shape != (len(niche_ids), len(genes)): raise click.ClickException( f"shape mismatch: matrix {X.shape} vs " @@ -482,10 +547,7 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, if cluster_emb: emb_df = pd.DataFrame(cluster_emb).T emb_df.index = [i.replace("cluster_", "", 1) for i in emb_df.index] - m = meta.set_index(meta["niche_id"].astype(str)) - for col in ("subarray", "patient"): - if col in meta.columns: - emb_df[col] = emb_df.index.map(m[col].to_dict()) + emb_df = _join_meta(emb_df, meta, niche_meta_key) emb_df.to_parquet( os.path.join(out_dir, "cluster_embeddings.parquet")) @@ -500,6 +562,7 @@ def niche_pipeline(niche_matrix, genes_path, niche_meta, out_dir, "pre_normalized": pre_normalized, "dimensions": dimensions, "epochs": epochs, "lr": lr, "seed": seed, + "niche_meta_key": niche_meta_key, "n_niches": len(niche_ids), "n_genes": len(genes), } Path(os.path.join(out_dir, "run_provenance.json")).write_text( From 39b3fcb88f6fab4ea6b90b6ed503f1badbf9f025 Mon Sep 17 00:00:00 2001 From: teslajoy Date: Wed, 20 May 2026 10:05:11 -0700 Subject: [PATCH 4/6] update setup --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 94a0983..4b3de53 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ author="Nasim Sanati", author_email="nasim@plenary.org", license="MIT", - python_requires=">=3.14", + python_requires=">=3.9", install_requires=[ "torch", "networkx", From 74e39f1e9518aca9e53d89ae4c03fe90c8248e2b Mon Sep 17 00:00:00 2001 From: teslajoy Date: Thu, 21 May 2026 07:24:19 -0700 Subject: [PATCH 5/6] bug fix --- gpath2vec/cli.py | 75 +++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/gpath2vec/cli.py b/gpath2vec/cli.py index 006bd26..12ac408 100644 --- a/gpath2vec/cli.py +++ b/gpath2vec/cli.py @@ -105,7 +105,7 @@ def perform_enrichment(genes, gene_sets, level, gene_filter, weight, min_genes, gene_list = _parse_genes(genes) gs = {"study": gene_list} - gf = _parse_genes(gene_filter) if gene_filter else None + gf = _load_gene_filter(gene_filter) if gene_filter else None click.echo(f"enrichment: {len(gs)} gene sets, level={level}") ea_df = enrich(gs, level=level, gene_filter=gf, min_genes=min_genes) @@ -148,7 +148,7 @@ def create_network(enrichment_path, study_id, level, gene_filter, weight, digrap """create a pathway network from enrichment results""" assert Path(enrichment_path).is_file(), f"{enrichment_path} not found" study_id = _make_id(study_id) - gf = _parse_genes(gene_filter) if gene_filter else None + gf = _load_gene_filter(gene_filter) if gene_filter else None with open(enrichment_path) as f: ea_records = json.load(f) @@ -283,29 +283,31 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, else: raise click.UsageError("provide --genes or --gene-sets") - gf = _parse_genes(gene_filter) if gene_filter else None + gf = _load_gene_filter(gene_filter) if gene_filter else None # enrichment click.echo("step 1: enrichment") ea_df = enrich(gs, level=level, gene_filter=gf, n_jobs=n_jobs) ea_df.to_parquet(enrichment_path) - records = ea_df.to_dict(orient="records") matrix = ea_matrix(ea_df, weight=weight) matrix.to_csv(matrix_path) # network click.echo("step 2: network") - enrichment = aggregate_min_fdr(records) - + # vectorized; avoids materializing ea_df as a python list of dicts + sig_df = ea_df[ea_df["sig_pathway"].astype(bool)] + if weight == "fdr": + sig_df = sig_df.assign(_w=1 - sig_df["fdr_bh"]) + else: + sig_df = sig_df.assign(_w=sig_df["oddsratio"]) clusters = {} - for r in records: - cname = r.get("cluster", "study") - if not r.get("sig_pathway", False): - continue - if cname not in clusters: - clusters[cname] = {} - val = (1 - r["fdr_bh"]) if weight == "fdr" else r.get("oddsratio", 1.0) - clusters[cname][r["stId"]] = val + for r in sig_df[["cluster", "stId", "_w"]].itertuples(index=False): + clusters.setdefault(str(r.cluster), {})[str(r.stId)] = float(r._w) + + # same contract as aggregate_min_fdr: one entry per pathway with min FDR + pw_min = ea_df.groupby("stId", as_index=False)["fdr_bh"].min() + enrichment = [{"stId": str(r.stId), "entities": {"fdr": float(r.fdr_bh)}} + for r in pw_min.itertuples(index=False)] net = Net(enrichment=enrichment, id=study_id, digraph=True, induce=False, level=level, gene_filter=gf, @@ -357,20 +359,39 @@ def run_pipeline(genes, gene_sets, output_dir, study_id, level, gene_filter, def _load_gene_filter(path): - """JSON list of gene symbols, or a dict with one list value.""" - obj = json.loads(Path(path).read_text()) - if isinstance(obj, list): - return set(map(str, obj)) - if isinstance(obj, dict): - for k in ("genes", "tf_genes"): - if isinstance(obj.get(k), list): - return set(map(str, obj[k])) - lists = [v for v in obj.values() if isinstance(v, list)] - if len(lists) == 1: - return set(map(str, lists[0])) + """robust gene-filter loader. accepts: + - JSON list of gene symbols + - JSON dict with a 'genes'/'tf_genes' key, or one list-valued key + - plain text, one gene symbol per line (comments after '#' ignored) + - single comma-separated string in a file + returns set[str]. + """ + text = Path(path).read_text() + try: + obj = json.loads(text) + if isinstance(obj, list): + return set(map(str, obj)) + if isinstance(obj, dict): + for k in ("genes", "tf_genes"): + if isinstance(obj.get(k), list): + return set(map(str, obj[k])) + lists = [v for v in obj.values() if isinstance(v, list)] + if len(lists) == 1: + return set(map(str, lists[0])) + except json.JSONDecodeError: + pass + # fall back: plain text (line per gene or comma-separated) + lines = [ln.split("#", 1)[0].strip() for ln in text.splitlines()] + genes = [] + for ln in lines: + if not ln: + continue + genes.extend(g.strip() for g in ln.split(",") if g.strip()) + if genes: + return set(genes) raise click.ClickException( - f"--gene-filter {path}: expected a JSON list of gene symbols " - f"or a dict with one list value") + f"--gene-filter {path}: could not parse as JSON list/dict or as " + f"plain-text gene symbols") def _niche_row(X, i): From 22653f8e71f6d9fa4fddc4d63bcfd1b6bd18e337 Mon Sep 17 00:00:00 2001 From: teslajoy Date: Wed, 27 May 2026 06:49:10 -0700 Subject: [PATCH 6/6] ignore figs --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index 4aa7e81..58e67da 100644 --- a/.gitignore +++ b/.gitignore @@ -178,3 +178,9 @@ scripts/ paper_draft.* TODO.md arm_topology_A_B_C.png +img/aucell_walks_figure.png +img/aucell_walks_figure.svg +img/fisher_walks_figure.png +img/fisher_walks_figure.svg +img/v1_v2_v3_comparison.png +img/v1_v2_v3_comparison.svg