diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 58a3c8f..813812e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,7 +16,7 @@ jobs: fail-fast: false matrix: java_version: ['17'] - nextflow_version: ['25.10'] + nextflow_version: ['26.04'] steps: - name: Environment diff --git a/README.md b/README.md index f08a790..54ff908 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,13 @@ A proof-of-concept pipeline for performing hyperparameter optimization of machin ## Requirements * Unix-like operating system (Linux, macOS, etc) -* Java >=11 +* Java >=17 * [Conda](https://docs.conda.io/en/latest/) or [Docker](https://docs.docker.com/) ## Quickstart -1. Install Nextflow (version 23.10 or higher): +1. Install Nextflow (version 26.04 or higher): ```bash curl -s https://get.nextflow.io | bash diff --git a/main.nf b/main.nf index 0daef39..ed3fcaf 100644 --- a/main.nf +++ b/main.nf @@ -1,6 +1,7 @@ #!/usr/bin/env nextflow nextflow.enable.moduleBinaries = true +nextflow.enable.types = true include { FETCH_DATASET } from './modules/fetch_dataset' include { SPLIT_TRAIN_TEST } from './modules/split_train_test' @@ -14,16 +15,18 @@ include { EVALUATE } from './modules/evaluate' * Pipeline parameters. They can be overriden on the command line, * e.g. `--fetch_datasets some_value`. */ -params.fetch_datasets = null -params.train_test_splits = null -params.train_models = null -params.pretrained_models = null -params.outdir = 'results' +params { + fetch_datasets: String? + train_test_splits: Path? + train_models: String? + pretrained_models: Path? +} /* * entry workflow */ workflow { + main: log.info """\ H Y P E R O P T P I P E L I N E ================================= @@ -31,7 +34,6 @@ workflow { train_test_splits : ${params.train_test_splits} train_models : ${params.train_models} pretrained_models : ${params.pretrained_models} - outdir : ${params.outdir} """.stripIndent() // fetch and split datasets if specified @@ -44,10 +46,15 @@ workflow { // otherwise load custom train/test splits else if( params.train_test_splits != null ) { ch_datasets = channel.empty() - ch_train_test_splits = channel.of(file(params.train_test_splits)) - .flatMap { json -> json.splitJson() } + ch_train_test_splits = channel.of(params.train_test_splits) + .flatMap { json -> json.splitJson() as List } .map { r -> - tuple(r.dataset_name, file(r.meta), file(r.data_train), file(r.data_test)) + record( + dataset_name: r.dataset_name as String, + meta: file(r.meta, checkIfExists: true), + data_train: file(r.data_train, checkIfExists: true), + data_test: file(r.data_test, checkIfExists: true) + ) } } @@ -56,16 +63,25 @@ workflow { } // separate training and test data - ch_train_datasets = ch_train_test_splits.map { dataset_name, meta, data_train, data_test -> - tuple(dataset_name, meta, data_train) + ch_train_datasets = ch_train_test_splits.map { r -> + record(dataset_name: r.dataset_name, meta: r.meta, data: r.data_train) } - ch_test_datasets = ch_train_test_splits.map { dataset_name, meta, data_train, data_test -> - tuple(dataset_name, meta, data_test) + ch_test_datasets = ch_train_test_splits.map { r -> + record(dataset_name: r.dataset_name, meta: r.meta, data: r.data_test) } // visualize train/test datasets - VISUALIZE_TRAIN(ch_train_datasets) - VISUALIZE_TEST(ch_test_datasets) + ch_train_plots = VISUALIZE_TRAIN(ch_train_datasets).map { r -> + record(dataset_name: r.dataset_name, plot_train: r.plot) + } + ch_test_plots = VISUALIZE_TEST(ch_test_datasets).map { r -> + record(dataset_name: r.dataset_name, plot_test: r.plot) + } + + // combine train/test data with train/test plots + ch_splits = ch_train_test_splits + .join(ch_train_plots, by: 'dataset_name') + .join(ch_test_plots, by: 'dataset_name') // print warning if both training and pre-trained model are enabled if( params.train_models != null && params.pretrained_models != null ) { @@ -75,15 +91,24 @@ workflow { // train new models if specified if( params.train_models != null ) { model_types = params.train_models.tokenize(',') - (ch_models, ch_train_logs) = TRAIN(ch_train_datasets, model_types) + ch_train_inputs = ch_train_datasets.flatMap { r -> + model_types.collect { model_type -> + r + record(model_type: model_type) + } + } + ch_models = TRAIN(ch_train_inputs) } // otherwise load pretrained models if specified else if( params.pretrained_models != null ) { - ch_models = channel.of(file(params.pretrained_models)) - .flatMap { json -> json.splitJson() } + ch_models = channel.of(params.pretrained_models) + .flatMap { json -> json.splitJson() as List } .map { r -> - tuple(r.dataset_name, r.model_type, file(r.model)) + record( + dataset_name: r.dataset_name as String, + model_type: r.model_type as String, + model: file(r.model, checkIfExists: true) + ) } } @@ -92,19 +117,93 @@ workflow { } // evaluate each model against test dataset - ch_evaluate_inputs = ch_models.combine(ch_test_datasets, by: 0) - (ch_scores, ch_test_logs) = EVALUATE(ch_evaluate_inputs) + ch_evaluate_inputs = ch_models.join(ch_test_datasets, by: 'dataset_name') + ch_evals = EVALUATE(ch_evaluate_inputs).map { r -> + record( + model_type: r.model_type, + dataset_name: r.dataset_name, + score_name: r.score.name, + score: r.score.value, + logs: r.logs, + ) + } // report the best model for each dataset based on evaluation score - ch_scores - .max { it -> fromJson(it[2]).value } - .subscribe { dataset_name, model_type, score_file -> - def score = fromJson(score_file) - printf "The best model for dataset '${dataset_name}' was '${model_type}' (${score.name} = %.3f)\n", score.value + ch_evals + .map { r -> tuple(r.dataset_name, r) } + .groupBy() + .subscribe { dataset_name, evals -> + def best = evals.max { r -> r.score } + printf "The best model for dataset '${dataset_name}' was '${best.model_type}' (${best.score_name} = %.3f)\n", best.score + } + + publish: + datasets = ch_datasets + splits = ch_splits + trained_models = params.train_models != null ? ch_models : channel.empty() + evals = ch_evals +} + +/* + * Workflow outputs + */ +output { + datasets: Channel { + path { r -> "datasets/${r.dataset_name}/" } + index { + path 'datasets/index.json' + } + } + + splits: Channel { + path { r -> "splits/${r.dataset_name}/" } + index { + path 'splits/index.json' + } + } + + trained_models: Channel { + path { r -> "trained_models/${r.dataset_name}-${r.model_type}/" } + index { + path 'trained_models/index.json' } + } + + evals: Channel { + path { r -> "evals/${r.dataset_name}-${r.model_type}/" } + index { + path 'evals/index.json' + } + } } +/* + * Types + */ +record Dataset { + dataset_name: String + meta: Path + data: Path +} + +record TrainTestSplit { + dataset_name: String + meta: Path + data_train: Path + plot_train: Path + data_test: Path + plot_test: Path +} + +record Model { + model_type: String + model: Path + dataset_name: String +} -def fromJson(file) { - return new groovy.json.JsonSlurper().parse(file) +record Eval { + model_type: String + dataset_name: String + score_name: String + score: Float } diff --git a/modules/evaluate/main.nf b/modules/evaluate/main.nf index c1261c2..7ba28b9 100644 --- a/modules/evaluate/main.nf +++ b/modules/evaluate/main.nf @@ -1,20 +1,39 @@ +nextflow.enable.types = true process EVALUATE { - publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${model_type}.${file}" } tag "${dataset_name}/${model_type}" input: - tuple val(dataset_name), val(model_type), path(model_file), path(meta_file), path(data_file) + record( + model_type: String, + model: Path, + dataset_name: String, + data: Path, + meta: Path + ) output: - tuple val(dataset_name), val(model_type), path('score.json'), emit: scores - tuple val(dataset_name), val(model_type), stdout, emit: logs + record( + model_type: model_type, + dataset_name: dataset_name, + score: new groovy.json.JsonSlurper().parse(file('score.json')) as Map, + // score: fromJson(file('score.json')) as Map, + logs: file('evaluate.log'), + ) script: """ evaluate.py \ - --model ${model_file} \ - --data ${data_file} \ - --meta ${meta_file} + --model ${model} \ + --data ${data} \ + --meta ${meta} \ + > evaluate.log """ } + +/* + * Load data from a JSON file + */ +def fromJson(file: Path) { + return new groovy.json.JsonSlurper().parse(file) +} diff --git a/modules/fetch_dataset/main.nf b/modules/fetch_dataset/main.nf index c544856..4920f00 100644 --- a/modules/fetch_dataset/main.nf +++ b/modules/fetch_dataset/main.nf @@ -1,13 +1,17 @@ +nextflow.enable.types = true process FETCH_DATASET { - publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${file}" } - tag "${dataset_name}" + tag dataset_name input: - val(dataset_name) + dataset_name: String output: - tuple val(dataset_name), path('meta.json'), path('data.txt') + record( + dataset_name: dataset_name, + meta: file('meta.json'), + data: file('data.txt'), + ) script: """ diff --git a/modules/split_train_test/main.nf b/modules/split_train_test/main.nf index b0465f9..2ae1004 100644 --- a/modules/split_train_test/main.nf +++ b/modules/split_train_test/main.nf @@ -1,16 +1,25 @@ +nextflow.enable.types = true process SPLIT_TRAIN_TEST { - publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${file}" } - tag "${dataset_name}" + tag dataset_name input: - tuple val(dataset_name), path(meta_file), path(data_file) + record( + dataset_name: String, + meta: Path, + data: Path + ) output: - tuple val(dataset_name), path(meta_file), path('train.txt'), path('test.txt') + record( + dataset_name: dataset_name, + meta: meta, + data_train: file('train.txt'), + data_test: file('test.txt'), + ) script: """ - split-train-test.py --data ${data_file} + split-train-test.py --data ${data} """ } diff --git a/modules/train/main.nf b/modules/train/main.nf index 3902761..ddf3cab 100644 --- a/modules/train/main.nf +++ b/modules/train/main.nf @@ -1,22 +1,31 @@ +nextflow.enable.types = true process TRAIN { - publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${model_type}.${file}" } tag "${dataset_name}/${model_type}" input: - tuple val(dataset_name), path(meta_file), path(data_file) - each model_type + record( + dataset_name: String, + meta: Path, + data: Path, + model_type: String + ) output: - tuple val(dataset_name), val(model_type), path('model.pkl'), emit: models - tuple val(dataset_name), val(model_type), stdout, emit: logs + record( + dataset_name: dataset_name, + model_type: model_type, + model: file('model.pkl'), + logs: file('train.log'), + ) script: """ train.py \ - --data ${data_file} \ - --meta ${meta_file} \ + --data ${data} \ + --meta ${meta} \ --scaler standard \ - --model-type ${model_type} + --model-type ${model_type} \ + > train.log """ } diff --git a/modules/visualize/main.nf b/modules/visualize/main.nf index 28d506a..ae41a14 100644 --- a/modules/visualize/main.nf +++ b/modules/visualize/main.nf @@ -1,18 +1,28 @@ +nextflow.enable.types = true process VISUALIZE { - publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${file}" } + tag data.name input: - tuple val(dataset_name), path(meta_file), path(data_file) + record( + dataset_name: String, + meta: Path, + data: Path + ) output: - tuple val(dataset_name), path('*.png') + record( + dataset_name: dataset_name, + meta: meta, + data: data, + plot: file('*.png'), + ) script: """ visualize.py \ - --data ${data_file} \ - --meta ${meta_file} \ - --outfile `basename ${data_file} .txt`.png + --data ${data} \ + --meta ${meta} \ + --outfile `basename ${data} .txt`.png """ } diff --git a/nextflow.config b/nextflow.config index b4272c4..524096c 100644 --- a/nextflow.config +++ b/nextflow.config @@ -2,10 +2,13 @@ manifest { description = 'Proof-of-concept pipeline for performing hyperparameter optimization of machine learning models with Nextflow' author = 'Ben Sherman' - nextflowVersion = '>=23.10.0' + nextflowVersion = '>=26.04.0' } -params.outdir = 'results' +/* + * Publish settings + */ +workflow.output.mode = 'copy' /* * Execution profiles for different environments. diff --git a/nextflow_schema.json b/nextflow_schema.json index d4dce81..a374a0a 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -35,12 +35,6 @@ "format": "file-path", "description": "Index file of pre-trained models to evaluate", "fa_icon": "fas fa-dumbbell" - }, - "outdir": { - "type": "string", - "description": "Directory to publish output data", - "default": "results", - "fa_icon": "fas fa-folder-open" } } }