feat: add ImageClassificationExecutor for vision fine-tuning#62
Conversation
Add a fifth training executor alongside SFT/LoRA/DPO/PPO that fine-tunes image classification models via AutoModelForImageClassification and the Transformers Trainer. Register the image_classification taskType, add the ImageClassificationSpec, wire it into the envelope discriminator and the worker executor registry, and ship a ViT-on-CIFAR-10 demo template. Single-GPU only for now; multi-GPU via torchrun/DeepSpeed is a follow-up mirroring the SFT distributed entrypoint. Signed-off-by: Junyi Shen <jyshen44@gmail.com>
The SDK TaskType enum must stay in lockstep with the server enum (enforced by tests/sdk/test_schema_compat.py); add the new member. Signed-off-by: Junyi Shen <jyshen44@gmail.com>
…classification Wire two previously-ignored training knobs into ImageClassificationExecutor and expose the per-step training loss curve: - training.optimizer maps onto TrainingArguments.optim (adam->adamw_torch, sgd->sgd, else passthrough) - training.augmentation applies horizontal flip + 4px-pad random crop to the train split only - ImageClassificationResult.train_losses carries the Trainer loss history Document the new knobs in the ViT demo template. Signed-off-by: Junyi Shen <jyshen44@gmail.com>
timzsu
left a comment
There was a problem hiding this comment.
FlowMesh is intended to be a "service fabric for composable LLM workflows". Could you include some justification on whether you introduce the ViT training path instead of Omni LLM training (e.g. FlowGRPO training for Qwen-Image, etc.)?
In addition, the PR description looks like LLM generated and do not follow the template. Please fix :)
| | `omni_text2{audio,image,speech,general}` | `Omni*Executor` | Multimodal generation | | ||
| | `training` | `SFTExecutor` / `LoRASFTExecutor` / `DPOExecutor` / `PPOExecutor` | Fine-tuning | | ||
| | `training` | `SFTExecutor` / `LoRASFTExecutor` / `DPOExecutor` / `PPOExecutor` | LLM fine-tuning | | ||
| | `image_classification` | `ImageClassificationExecutor` | Vision classification fine-tuning (`AutoModelForImageClassification` + HF `Trainer`) | |
There was a problem hiding this comment.
image_classification is still a training task, which looks confusing. I suggest merging it to the training type or renaming training to llm_training, and naming this new type as image_classification_training.
| train_dataset: Dataset | None = None | ||
| eval_dataset: Dataset | None = None | ||
| try: | ||
| model_name = spec.model_name or "google/vit-base-patch16-224" |
There was a problem hiding this comment.
I suggest requiring the model name.
…e model id Address review feedback on the vision-training executor: - Rename the taskType / enum / spec classes from image_classification to image_classification_training so it reads unambiguously as a training task alongside sft/lora_sft/dpo/ppo (server + SDK enums kept in sync). - Require model.source.identifier: ImageClassificationTrainingSpecStrict rejects a missing model at validation, and the executor no longer falls back to a hard-coded ViT default. - Add spec tests covering envelope resolution and the required-model rule. Signed-off-by: Junyi Shen <jyshen44@gmail.com>
Refresh the supported-workloads summary to include image_classification_training and add a task-types table covering training/inference plus the misc data and utility tasks (data_profiling, data_retrieval, embedding, api, echo). Signed-off-by: Junyi Shen <jyshen44@gmail.com>
…te from LLM tasks Signed-off-by: Junyi Shen <jyshen44@gmail.com>
timzsu
left a comment
There was a problem hiding this comment.
It seems that there are a few places not following the code style guidelines. Could you take a look at the doc? I have left comments in places that I spotted.
By the way, in ./src/server/task/models.py, please extend TRAINING_TASK_TYPES to include the new executor class.
| ## Task types | ||
|
|
||
| Set `spec.taskType` to pick an executor. See [`docs/EXECUTORS.md`](docs/EXECUTORS.md) | ||
| for the full registry and per-executor fields. | ||
|
|
||
| | `taskType` | What it does | | ||
| |---|---| | ||
| | **LLM — inference & agents** | | | ||
| | `inference` | LLM inference (vLLM / HF transformers) | | ||
| | `omni_text2{audio,image,speech,general}` | Multimodal generation | | ||
| | `rag` | Retrieval-augmented generation | | ||
| | `agent` | Tool-using LLM agent | | ||
| | **LLM — training** | | | ||
| | `sft` / `lora_sft` / `dpo` / `ppo` | LLM fine-tuning (TRL) | | ||
| | **Deep learning (non-LLM)** | | | ||
| | `image_classification_training` | Vision classifier fine-tuning (HF `AutoModelForImageClassification`) | | ||
| | `diffusion` | Image / video diffusion models | | ||
| | **Misc / data & utility** | | | ||
| | `data_profiling` | DataFrame profiling | | ||
| | `data_retrieval` | DataFrame loading from sources | | ||
| | `embedding` | Text embeddings | | ||
| | `api` | Outbound HTTP API call step | | ||
| | `ssh` | Interactive SSH session or non-interactive container job | | ||
| | `echo` | Echo input back (smoke tests) | | ||
|
|
There was a problem hiding this comment.
Do we need to add this piece of text to the README?
| ) | ||
|
|
||
| trainer = None | ||
| model = None # type: ignore[assignment] |
There was a problem hiding this comment.
Is this type: ignore avoidable?
| {idx: name for idx, name in enumerate(explicit_names)}, | ||
| ) | ||
|
|
||
| features = dataset.features.get(label_field) |
There was a problem hiding this comment.
Please check the type here. Seems that the label is expected to be int, yet there is no check in this executor.
| predictions, labels = eval_pred | ||
| preds = np.argmax(predictions, axis=-1) | ||
| correct = int((preds == labels).sum()) | ||
| total = int(labels.shape[0]) if hasattr(labels, "shape") else len(labels) |
There was a problem hiding this comment.
Do we need the hasattr here? I think len(labels) works regardless.
| ) | ||
|
|
||
| features = dataset.features.get(label_field) | ||
| feature_names: Any = getattr(features, "names", None) |
There was a problem hiding this comment.
Can we make this a direct member access? I think we are using a fixed dataset and we should try to be not defensive.
| max_eval = data_cfg.get("max_eval_samples", max_samples) | ||
| if max_eval is not None: | ||
| max_eval = int(max_eval) |
There was a problem hiding this comment.
| max_eval = data_cfg.get("max_eval_samples", max_samples) | |
| if max_eval is not None: | |
| max_eval = int(max_eval) | |
| max_eval = int(data_cfg.get("max_eval_samples") or max_samples) |
I am a bit confused about the logic here. Does this change work?
| max_samples = data_cfg.get("max_samples") | ||
| if max_samples is not None: |
There was a problem hiding this comment.
| max_samples = data_cfg.get("max_samples") | |
| if max_samples is not None: | |
| if max_samples := data_cfg.get("max_samples"): |
Purpose
FlowMesh's training executors (SFT/LoRA/DPO/PPO) are all LLM fine-tuning. This PR adds a first-class image-classification training task type so vision models can be fine-tuned through the same workflow/submit path, instead of users wrapping their own trainer in an
sshcontainer job.Changes
src/shared/tasks/task_type.py— addIMAGE_CLASSIFICATION_TRAINING = "image_classification_training".src/shared/tasks/specs/training.py— addImageClassificationTrainingSpec{Strict,Template}; the strict spec requiresmodel.source.identifier(validated, no silent default).src/shared/tasks/specs/__init__.py,src/shared/tasks/envelope.py— export the spec and wire it into thetaskTypediscriminator union (strict + template).sdk/src/flowmesh/models/common.py— mirror the new enum member (kept in lockstep with the server enum, enforced bytests/sdk/test_schema_compat.py).src/worker/executors/image_classification_executor.py— newImageClassificationExecutor:AutoImageProcessor+AutoModelForImageClassification+ HFTrainer. Loads the dataset viadatasets, infersnum_labels/id2label, trains, evaluates, savesfinal_model/. Honorstraining.optimizer(→TrainingArguments.optim) andtraining.augmentation(h-flip + 4px-pad random crop on the train split), and returns the per-steptrain_losses. Reuses the existing checkpoint/artifact helpers (determine_resume_path,archive_model_dir,maybe_upload_artifacts,TrainingMixin._cleanup_local_artifacts).src/worker/executors/__init__.py— registerimage_classification_traininginEXECUTOR_REGISTRY/EXECUTOR_CLASS_NAMES/__all__.docs/EXECUTORS.md— add the taskType row; relabel the existing training row as LLM fine-tuning.examples/templates/image_classification_vit.yaml— runnable ViT-on-CIFAR-10 demo.tests/worker/test_executor_registry.py,tests/shared/test_image_classification_spec.py— registry key + envelope-resolution and required-model coverage.Design
The executor mirrors
SFTExecutor's structure (spec → dataset prep → Trainer → save → result) so it slots into the existing training conventions, and deliberately adds no new dependencies — it reuses theruntime-inferencegroup (transformers/torch/datasets/Pillow). It is single-GPU for now:allow_multi_gpuis downgraded with a warning (same asLoRASFTExecutor); multi-GPU via atorchrun/DeepSpeed dist-entry mirroringsft_dist_entry.pyis a follow-up. Themodel_arch-free, HF-identifier-driven design means anyAutoModelForImageClassificationcheckpoint (ViT/ResNet/ConvNeXt/…) works without code changes.Test Plan
uv run pre-commit run --all-filesuv run pytest tests/ --ignore=tests/worker/test_mp_executor_cleanup_gpu.pyuvx bandit==1.9.4 -c pyproject.toml -r src/ImageClassificationExecutor.run()in-process on a single H100 — ViT-tiny on CIFAR-10 and Fashion-MNIST, plus an AdamW-vs-SGD contrast to confirm the optimizer knob takes effect.Test Result
ok=True,final_model/written (config.json+model.safetensors+preprocessor_config.json),eval_accuracy/train_lossespopulated. AdamW converged (loss 3.16→1.16) while SGD at the same budget stayed flat (~3.0–3.4), confirmingtraining.optimizeris honored.Pre-submission Checklist
pre-commit run --all-filesand fixed any issues.uv run pytest tests/passes locally.uv sync --all-packages --group ci --frozen).[BREAKING]and described migration steps above.