
feat: add ImageClassificationExecutor for vision fine-tuning #62

Open
J1shen wants to merge 6 commits into main from feat/image-classification-executor


@J1shen J1shen commented May 31, 2026

Purpose

FlowMesh's training executors (SFT/LoRA/DPO/PPO) all target LLM fine-tuning. This PR adds a first-class image-classification training task type so that vision models can be fine-tuned through the same workflow/submit path, rather than requiring users to wrap their own trainer in an ssh container job.

Changes

  • src/shared/tasks/task_type.py — add IMAGE_CLASSIFICATION_TRAINING = "image_classification_training".
  • src/shared/tasks/specs/training.py — add ImageClassificationTrainingSpec{Strict,Template}; the strict spec requires model.source.identifier (validated, no silent default).
  • src/shared/tasks/specs/__init__.py, src/shared/tasks/envelope.py — export the spec and wire it into the taskType discriminator union (strict + template).
  • sdk/src/flowmesh/models/common.py — mirror the new enum member (kept in lockstep with the server enum, enforced by tests/sdk/test_schema_compat.py).
  • src/worker/executors/image_classification_executor.py — new ImageClassificationExecutor: AutoImageProcessor + AutoModelForImageClassification + HF Trainer. Loads the dataset via datasets, infers num_labels/id2label, trains, evaluates, saves final_model/. Honors training.optimizer (→ TrainingArguments.optim) and training.augmentation (h-flip + 4px-pad random crop on the train split), and returns the per-step train_losses. Reuses the existing checkpoint/artifact helpers (determine_resume_path, archive_model_dir, maybe_upload_artifacts, TrainingMixin._cleanup_local_artifacts).
  • src/worker/executors/__init__.py — register image_classification_training in EXECUTOR_REGISTRY / EXECUTOR_CLASS_NAMES / __all__.
  • docs/EXECUTORS.md — add the taskType row; relabel the existing training row as LLM fine-tuning.
  • examples/templates/image_classification_vit.yaml — runnable ViT-on-CIFAR-10 demo.
  • tests/worker/test_executor_registry.py, tests/shared/test_image_classification_spec.py — registry key + envelope-resolution and required-model coverage.
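
The label-inference step mentioned in the executor bullet (deriving num_labels and id2label) might look roughly like the sketch below. This is an illustration only, not the executor's code: `build_label_maps` is a hypothetical helper, and it assumes the class names are already available as an ordered list of strings.

```python
from __future__ import annotations


def build_label_maps(names: list[str]) -> tuple[int, dict[int, str], dict[str, int]]:
    """Return (num_labels, id2label, label2id) for an ordered list of class names.

    Hypothetical helper for illustration; not taken from the PR.
    """
    if not names:
        raise ValueError("at least one class name is required")
    if len(set(names)) != len(names):
        raise ValueError("class names must be unique")
    # The position in the list becomes the integer class id.
    id2label = dict(enumerate(names))
    label2id = {name: idx for idx, name in id2label.items()}
    return len(names), id2label, label2id


num_labels, id2label, label2id = build_label_maps(["airplane", "automobile", "bird"])
```

Both mappings are usually passed to the model config together so that saved checkpoints report readable class names at inference time.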

Design

The executor mirrors SFTExecutor's structure (spec → dataset prep → Trainer → save → result) so it slots into the existing training conventions, and deliberately adds no new dependencies — it reuses the runtime-inference group (transformers/torch/datasets/Pillow). It is single-GPU for now: allow_multi_gpu is downgraded with a warning (same as LoRASFTExecutor); multi-GPU via a torchrun/DeepSpeed dist-entry mirroring sft_dist_entry.py is a follow-up. The model_arch-free, HF-identifier-driven design means any AutoModelForImageClassification checkpoint (ViT/ResNet/ConvNeXt/…) works without code changes.

Test Plan

  • uv run pre-commit run --all-files
  • uv run pytest tests/ --ignore=tests/worker/test_mp_executor_cleanup_gpu.py
  • uvx bandit==1.9.4 -c pyproject.toml -r src/
  • End-to-end (no Docker image): ran ImageClassificationExecutor.run() in-process on a single H100 — ViT-tiny on CIFAR-10 and Fashion-MNIST, plus an AdamW-vs-SGD contrast to confirm the optimizer knob takes effect.

Test Result

  • pre-commit: all hooks pass (gitleaks/isort/black/ruff/codespell/mypy).
  • pytest: 990 passed.
  • bandit: no issues.
  • In-process run: ok=True, final_model/ written (config.json + model.safetensors + preprocessor_config.json), eval_accuracy/train_losses populated. AdamW converged (loss 3.16→1.16) while SGD at the same budget stayed flat (~3.0–3.4), confirming training.optimizer is honored.

Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker.
  • If I changed the SDK or CLI, I have verified the affected packages work (uv sync --all-packages --group ci --frozen).
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

Add a fifth training executor alongside SFT/LoRA/DPO/PPO that fine-tunes
image classification models via AutoModelForImageClassification and the
Transformers Trainer. Register the image_classification taskType, add the
ImageClassificationSpec, wire it into the envelope discriminator and the
worker executor registry, and ship a ViT-on-CIFAR-10 demo template.

Single-GPU only for now; multi-GPU via torchrun/DeepSpeed is a follow-up
mirroring the SFT distributed entrypoint.

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
@J1shen J1shen requested review from kaiitunnz and timzsu as code owners May 31, 2026 17:17
The SDK TaskType enum must stay in lockstep with the server enum
(enforced by tests/sdk/test_schema_compat.py); add the new member.

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
@J1shen J1shen changed the title from "feat(worker): add ImageClassificationExecutor for vision fine-tuning" to "feat: add ImageClassificationExecutor for vision fine-tuning" May 31, 2026
…classification

Wire two previously-ignored training knobs into ImageClassificationExecutor
and expose the per-step training loss curve:

- training.optimizer maps onto TrainingArguments.optim (adam->adamw_torch,
  sgd->sgd, else passthrough)
- training.augmentation applies horizontal flip + 4px-pad random crop to the
  train split only
- ImageClassificationResult.train_losses carries the Trainer loss history

Document the new knobs in the ViT demo template.

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
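
The two knobs described in this commit message can be sketched in plain Python as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `resolve_optim` and `augment` are hypothetical names, the optimizer lookup is an exact (case-sensitive) match, the flip probability is assumed to be 0.5, padding is assumed to be zeros, and the image is a nested list of pixel rows rather than a PIL image or tensor.

```python
from __future__ import annotations

import random


def resolve_optim(name: str) -> str:
    """Map a training.optimizer value to a TrainingArguments.optim string."""
    aliases = {"adam": "adamw_torch", "sgd": "sgd"}
    # Anything not listed is passed through unchanged.
    return aliases.get(name, name)


def augment(image: list[list[int]], pad: int = 4,
            rng: random.Random | None = None) -> list[list[int]]:
    """Random horizontal flip, then zero-pad by `pad` px and crop back to the input size."""
    rng = rng or random.Random()
    height, width = len(image), len(image[0])
    if rng.random() < 0.5:  # assumed flip probability
        image = [row[::-1] for row in image]
    padded_width = width + 2 * pad
    blank_rows = [[0] * padded_width for _ in range(pad)]
    padded = (
        blank_rows
        + [[0] * pad + list(row) + [0] * pad for row in image]
        + [[0] * padded_width for _ in range(pad)]
    )
    # Choose the top-left corner of a crop with the original height and width.
    top = rng.randint(0, 2 * pad)
    left = rng.randint(0, 2 * pad)
    return [row[left:left + width] for row in padded[top:top + height]]
```

Applying `augment` to the train split only, as the commit message states, keeps evaluation deterministic.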
Collaborator

@timzsu timzsu left a comment

FlowMesh is intended to be a "service fabric for composable LLM workflows". Could you include some justification for why you introduce the ViT training path instead of Omni LLM training (e.g. FlowGRPO training for Qwen-Image, etc.)?

In addition, the PR description looks LLM-generated and does not follow the template. Please fix :)

Comment thread docs/EXECUTORS.md Outdated
  | `omni_text2{audio,image,speech,general}` | `Omni*Executor` | Multimodal generation |
- | `training` | `SFTExecutor` / `LoRASFTExecutor` / `DPOExecutor` / `PPOExecutor` | Fine-tuning |
+ | `training` | `SFTExecutor` / `LoRASFTExecutor` / `DPOExecutor` / `PPOExecutor` | LLM fine-tuning |
+ | `image_classification` | `ImageClassificationExecutor` | Vision classification fine-tuning (`AutoModelForImageClassification` + HF `Trainer`) |
Collaborator

image_classification is still a training task, which looks confusing. I suggest merging it into the training type, or renaming training to llm_training and naming this new type image_classification_training.

Author

noted

train_dataset: Dataset | None = None
eval_dataset: Dataset | None = None
try:
    model_name = spec.model_name or "google/vit-base-patch16-224"
Collaborator

I suggest requiring the model name.
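
One way to act on this suggestion is to fail fast when no model is given instead of falling back to a default. The sketch below is hypothetical: it uses a plain dict in place of the project's spec classes, and `require_model_identifier` is an illustrative name. The field path model.source.identifier is the one named in the PR description.

```python
def require_model_identifier(spec: dict) -> str:
    """Return spec['model']['source']['identifier'], or raise if it is missing or blank."""
    model = spec.get("model") or {}
    source = model.get("source") or {}
    identifier = source.get("identifier")
    if not isinstance(identifier, str) or not identifier.strip():
        # No silent default: the caller must name the checkpoint explicitly.
        raise ValueError("model.source.identifier is required")
    return identifier.strip()
```

Raising at validation time surfaces a misconfigured task before any dataset download or GPU allocation happens.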

J1shen added 3 commits June 1, 2026 12:27
…e model id

Address review feedback on the vision-training executor:

- Rename the taskType / enum / spec classes from image_classification to
  image_classification_training so it reads unambiguously as a training task
  alongside sft/lora_sft/dpo/ppo (server + SDK enums kept in sync).
- Require model.source.identifier: ImageClassificationTrainingSpecStrict
  rejects a missing model at validation, and the executor no longer falls
  back to a hard-coded ViT default.
- Add spec tests covering envelope resolution and the required-model rule.

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
Refresh the supported-workloads summary to include image_classification_training
and add a task-types table covering training/inference plus the misc data and
utility tasks (data_profiling, data_retrieval, embedding, api, echo).

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
…te from LLM tasks

Signed-off-by: Junyi Shen <jyshen44@gmail.com>
@J1shen J1shen requested a review from timzsu June 3, 2026 03:48
Collaborator

@timzsu timzsu left a comment

It seems that a few places do not follow the code style guidelines. Could you take a look at the doc? I have left comments on the places I spotted.

By the way, in ./src/server/task/models.py, please extend TRAINING_TASK_TYPES to include the new executor class.

Comment thread README.md
Comment on lines +133 to +157
## Task types

Set `spec.taskType` to pick an executor. See [`docs/EXECUTORS.md`](docs/EXECUTORS.md)
for the full registry and per-executor fields.

| `taskType` | What it does |
|---|---|
| **LLM — inference & agents** | |
| `inference` | LLM inference (vLLM / HF transformers) |
| `omni_text2{audio,image,speech,general}` | Multimodal generation |
| `rag` | Retrieval-augmented generation |
| `agent` | Tool-using LLM agent |
| **LLM — training** | |
| `sft` / `lora_sft` / `dpo` / `ppo` | LLM fine-tuning (TRL) |
| **Deep learning (non-LLM)** | |
| `image_classification_training` | Vision classifier fine-tuning (HF `AutoModelForImageClassification`) |
| `diffusion` | Image / video diffusion models |
| **Misc / data & utility** | |
| `data_profiling` | DataFrame profiling |
| `data_retrieval` | DataFrame loading from sources |
| `embedding` | Text embeddings |
| `api` | Outbound HTTP API call step |
| `ssh` | Interactive SSH session or non-interactive container job |
| `echo` | Echo input back (smoke tests) |

Collaborator

Do we need to add this piece of text to the README?

)

trainer = None
model = None # type: ignore[assignment]
Collaborator

Is this type: ignore avoidable?

{idx: name for idx, name in enumerate(explicit_names)},
)

features = dataset.features.get(label_field)
Collaborator

Please check the type here. Seems that the label is expected to be int, yet there is no check in this executor.
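
A check of the kind requested here could be sketched as follows. It is a hypothetical helper, not code from the PR, and it assumes the labels can be iterated as plain Python values and that valid class ids lie in the range 0 to num_labels - 1.

```python
from typing import Iterable


def check_integer_labels(labels: Iterable[object], num_labels: int) -> None:
    """Raise if any label is not an int in [0, num_labels)."""
    for index, label in enumerate(labels):
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(label, bool) or not isinstance(label, int):
            raise TypeError(
                f"label at row {index} has type {type(label).__name__}, expected int"
            )
        if not 0 <= label < num_labels:
            raise ValueError(
                f"label {label} at row {index} is outside [0, {num_labels})"
            )
```

Scanning a small sample of rows rather than the whole dataset may be enough to catch string-labelled datasets early.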

predictions, labels = eval_pred
preds = np.argmax(predictions, axis=-1)
correct = int((preds == labels).sum())
total = int(labels.shape[0]) if hasattr(labels, "shape") else len(labels)
Collaborator

Do we need the hasattr here? I think len(labels) works regardless.

)

features = dataset.features.get(label_field)
feature_names: Any = getattr(features, "names", None)
Collaborator

Can we make this a direct member access? I think we are using a fixed dataset, and we should try not to be defensive.

Comment on lines +394 to +396
max_eval = data_cfg.get("max_eval_samples", max_samples)
if max_eval is not None:
    max_eval = int(max_eval)
Collaborator

Suggested change
- max_eval = data_cfg.get("max_eval_samples", max_samples)
- if max_eval is not None:
-     max_eval = int(max_eval)
+ max_eval = int(data_cfg.get("max_eval_samples") or max_samples)

I am a bit confused about the logic here. Does this change work?
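
To help answer the question, the two variants can be compared side by side. The function names below are illustrative wrappers around the quoted lines, and `data_cfg` is assumed to be a plain dict.

```python
def max_eval_original(data_cfg: dict, max_samples):
    """The three lines currently in the PR."""
    max_eval = data_cfg.get("max_eval_samples", max_samples)
    if max_eval is not None:
        max_eval = int(max_eval)
    return max_eval


def max_eval_suggested(data_cfg: dict, max_samples):
    """The one-line suggested change."""
    return int(data_cfg.get("max_eval_samples") or max_samples)


# Case 1: neither value is set. The original keeps None ("no limit"),
# while the suggestion calls int(None) and raises TypeError.
# Case 2: max_eval_samples is 0. The original keeps 0,
# while the suggestion treats 0 as falsy and falls back to max_samples.
# Case 3: max_eval_samples is a positive value. Both variants agree.
```

So the one-liner is equivalent only when at least one of the two values is set and a limit of 0 is not meaningful.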

Comment on lines +382 to +383
max_samples = data_cfg.get("max_samples")
if max_samples is not None:
Collaborator

Suggested change
- max_samples = data_cfg.get("max_samples")
- if max_samples is not None:
+ if max_samples := data_cfg.get("max_samples"):
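
The walrus form is shorter but tests truthiness rather than `is not None`, so the two differ when the value is 0. The sketch below illustrates this. The body of the `if` is not shown in the review, so the `int(...)` conversion and both function names are assumptions made for the example.

```python
def limit_original(data_cfg: dict):
    """Explicit None check, as in the current code."""
    max_samples = data_cfg.get("max_samples")
    if max_samples is not None:
        return int(max_samples)  # assumed body
    return None


def limit_walrus(data_cfg: dict):
    """Suggested assignment-expression form."""
    if max_samples := data_cfg.get("max_samples"):
        return int(max_samples)  # assumed body
    return None


# A value of 0 passes the `is not None` check but is falsy for the walrus form.
zero_original = limit_original({"max_samples": 0})
zero_walrus = limit_walrus({"max_samples": 0})
```

If a limit of 0 is never meaningful for this executor, the two forms behave the same in practice.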
