diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e551ec2e..68364c31 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -25,7 +25,7 @@ repos:
- id: python-no-log-warn
- id: text-unicode-replacement-char
- repo: https://github.com/astral-sh/ruff-pre-commit
- rev: v0.15.11
+ rev: v0.15.12
hooks:
- id: ruff-format
- id: ruff-check
@@ -33,6 +33,13 @@ repos:
rev: 0.11.7
hooks:
- id: uv-lock
+- repo: local
+ hooks:
+ - id: check-termynal-line-lengths
+ name: Check Termynal line lengths
+ entry: python scripts/check_termynal_line_lengths.py
+ language: python
+ files: ^docs/source/_static/md/.*\.md$
- repo: https://github.com/executablebooks/mdformat
rev: 1.0.0
hooks:
@@ -60,7 +67,7 @@ repos:
- id: nbstripout
exclude: (docs)
- repo: https://github.com/crate-ci/typos
- rev: v1
+ rev: v1.45.1
hooks:
- id: typos
exclude: (\.ipynb)
diff --git a/.readthedocs.yaml b/.readthedocs.yaml
index 519e8b04..2aa8017f 100644
--- a/.readthedocs.yaml
+++ b/.readthedocs.yaml
@@ -19,7 +19,7 @@ build:
- UV_PROJECT_ENVIRONMENT="${READTHEDOCS_VIRTUALENV_PATH}" uv sync --frozen --group docs
build:
html:
- - UV_PROJECT_ENVIRONMENT="${READTHEDOCS_VIRTUALENV_PATH}" uv run --group docs zensical build
+ - UV_PROJECT_ENVIRONMENT="${READTHEDOCS_VIRTUALENV_PATH}" uvx --from rust-just just docs
post_build:
- mkdir -p "${READTHEDOCS_OUTPUT}/html"
- cp -a docs/build/. "${READTHEDOCS_OUTPUT}/html/"
diff --git a/docs/source/_static/md/commands/build-options.md b/docs/source/_static/md/commands/build-options.md
index 317f7f2a..3b8fe0e7 100644
--- a/docs/source/_static/md/commands/build-options.md
+++ b/docs/source/_static/md/commands/build-options.md
@@ -2,7 +2,6 @@
| ---------------------------------------------------------- | ------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------- |
| -c, --config FILE | - | Path to configuration file. |
| --capture [fd\|no\|sys\|tee-sys] | fd | Per task capturing method. |
-| --clean-lockfile | false | Rewrite the lockfile with only currently collected tasks. |
| --database-url TEXT | - | Url to the database. |
| --debug-pytask | false | Trace all function calls in the plugin framework. |
| --disable-warnings | false | Disables the summary for warnings. |
diff --git a/docs/source/_static/md/commands/command-list.md b/docs/source/_static/md/commands/command-list.md
index 093f3ad2..af659a97 100644
--- a/docs/source/_static/md/commands/command-list.md
+++ b/docs/source/_static/md/commands/command-list.md
@@ -4,5 +4,6 @@
| [`clean`](clean.md) | Clean the provided paths by removing files unknown to pytask. |
| [`collect`](collect.md) | Collect tasks and report information about them. |
| [`dag`](dag.md) | Create a visualization of the directed acyclic graph. |
+| [`lock`](lock.md) | Inspect and update recorded task state in the lockfile. |
| [`markers`](markers.md) | Show all registered markers. |
| [`profile`](profile.md) | Show information about resource consumption. |
diff --git a/docs/source/_static/md/lock-accept-dry-run.md b/docs/source/_static/md/lock-accept-dry-run.md
new file mode 100644
index 00000000..acf5ee72
--- /dev/null
+++ b/docs/source/_static/md/lock-accept-dry-run.md
@@ -0,0 +1,16 @@
+
+
+```console
+$ pytask lock accept -k train --dry-run
+
+────────────────────────── Start pytask session ──────────────────────────
+Platform: win32 -- Python 3.12.0, pytask 0.5.3
+Root: C:\Users\pytask-dev\git\my_project
+Collected 2 tasks.
+Would accept recorded state for task_train.py::task_train.
+Would accept recorded state for task_evaluate.py::task_evaluate.
+
+─────────────────────────────────────────────────────────────────────────
+```
+
+
diff --git a/docs/source/_static/md/lock-accept-interactive.md b/docs/source/_static/md/lock-accept-interactive.md
new file mode 100644
index 00000000..67615034
--- /dev/null
+++ b/docs/source/_static/md/lock-accept-interactive.md
@@ -0,0 +1,16 @@
+
+
+```console
+$ pytask lock accept -k train
+────────────────────────── Start pytask session ──────────────────────────
+Platform: win32 -- Python 3.12.0, pytask 0.5.3
+Root: C:\Users\pytask-dev\git\my_project
+Collected 2 tasks.
+# Accept recorded state for task_train.py::task_train? [y/N]: $ y
+# Accept recorded state for task_evaluate.py::task_evaluate? [y/N]: $ n
+Accept recorded state for task_train.py::task_train.
+
+─────────────────────────────────────────────────────────────────────────
+```
+
+
diff --git a/docs/source/_static/md/lock-clean.md b/docs/source/_static/md/lock-clean.md
new file mode 100644
index 00000000..b60b30ed
--- /dev/null
+++ b/docs/source/_static/md/lock-clean.md
@@ -0,0 +1,17 @@
+
+
+```console
+$ pytask lock clean
+────────────────────────── Start pytask session ──────────────────────────
+Platform: win32 -- Python 3.12.0, pytask 0.5.3
+Root: C:\Users\pytask-dev\git\my_project
+Collected 2 tasks.
+# Remove recorded state for task_old.py::task_train? [y/N]: $ y
+# Remove recorded state for task_old.py::task_evaluate? [y/N]: $ y
+Remove recorded state for task_old.py::task_train.
+Remove recorded state for task_old.py::task_evaluate.
+
+─────────────────────────────────────────────────────────────────────────
+```
+
+
diff --git a/docs/source/how_to_guides/index.md b/docs/source/how_to_guides/index.md
index ce4084f1..c18c0c4d 100644
--- a/docs/source/how_to_guides/index.md
+++ b/docs/source/how_to_guides/index.md
@@ -10,6 +10,7 @@ specific tasks with pytask.
- [Migrating From Scripts To Pytask](migrating_from_scripts_to_pytask.md)
- [Interfaces For Dependencies Products](interfaces_for_dependencies_products.md)
- [Portability](portability.md)
+- [Update the Lockfile to Match Project State](reconciling_lockfile_state.md)
- [Remote Files](remote_files.md)
- [Functional Interface](functional_interface.md)
- [Capture Warnings](capture_warnings.md)
diff --git a/docs/source/how_to_guides/portability.md b/docs/source/how_to_guides/portability.md
index 6ffb23dd..ac38cf92 100644
--- a/docs/source/how_to_guides/portability.md
+++ b/docs/source/how_to_guides/portability.md
@@ -17,13 +17,11 @@ Use this checklist when you move a project to another machine or environment.
Run a normal build with [`pytask build`](../reference_guides/commands.md#pytask-build)
so `pytask.lock` is up to date:
-````
```console
$ pytask build
```
If you already have a recent lockfile and up-to-date outputs, you can skip this step.
-````
1. **Ship the right files.**
@@ -85,11 +83,10 @@ tasks run. If tasks are removed or renamed, their old entries remain as stale da
are ignored.
To clean up stale entries without deleting the file, run
-[`pytask build --clean-lockfile`](../reference_guides/commands.md#pytask-build--clean-lockfile):
+[`pytask lock clean`](../reference_guides/commands.md#pytask-lock-clean):
```console
-$ pytask build --clean-lockfile
+$ pytask lock clean
```
-This rewrites the lockfile after a successful build with only the currently collected
-tasks and their current state values.
+This removes lockfile entries for tasks which are no longer collected.
diff --git a/docs/source/how_to_guides/reconciling_lockfile_state.md b/docs/source/how_to_guides/reconciling_lockfile_state.md
new file mode 100644
index 00000000..135c2496
--- /dev/null
+++ b/docs/source/how_to_guides/reconciling_lockfile_state.md
@@ -0,0 +1,123 @@
+# Update the Lockfile to Match Project State
+
+Use [`pytask lock`](../reference_guides/commands.md#pytask-lock) when the current files
+and outputs in the project are already correct, but the recorded state in `pytask.lock`
+needs to catch up. This can happen after refactoring task files, moving or renaming
+tasks, producing outputs outside of pytask, or deleting tasks.
+
+## Accept current files and outputs
+
+Use [`pytask lock accept`](../reference_guides/commands.md#pytask-lock-accept) when the
+current dependencies, products, and task definition are already correct and should
+become the new recorded state.
+
+Preview the changes without writing them with `--dry-run`:
+
+--8<-- "docs/source/_static/md/lock-accept-dry-run.md"
+
+Then accept the planned changes interactively:
+
+--8<-- "docs/source/_static/md/lock-accept-interactive.md"
+
+Add `--yes` to apply all planned changes without prompting:
+
+```console
+$ pytask lock accept -k train --yes
+```
+
+If no selectors are provided, `pytask lock accept` applies to all collected tasks in the
+provided paths.
+
+If selectors are provided with `-k` or `-m`, `accept` automatically includes the
+ancestors of the selected tasks. This is useful when you target a downstream task and
+want the accepted state to stay consistent with its upstream dependencies.
+
+```console
+$ pytask lock accept -k evaluate
+```
+
+In this example, `pytask` accepts `evaluate` and its ancestors. It does not
+automatically include descendants. If you want to accept a wider part of the DAG, widen
+the task selection yourself.
+
+```console
+$ pytask lock accept -k "train or evaluate"
+```
+
+If a selected task is missing a required dependency or product, the command fails
+instead of accepting incomplete state.
+
+Run a build afterwards to check that unchanged tasks are skipped according to the
+updated lockfile.
+
+```console
+$ pytask build
+```
+
+## Reset state for selected tasks
+
+Use [`pytask lock reset`](../reference_guides/commands.md#pytask-lock-reset) to remove
+recorded state for selected tasks when state was accepted too broadly or when specific
+tasks should be reconsidered from scratch.
+
+```console
+$ pytask lock reset -k train
+```
+
+Unlike `accept`, `reset` with a selector works on the exact selected tasks. It does not
+automatically include ancestors.
+
+Preview the reset with `--dry-run` if you want to check the affected tasks first:
+
+```console
+$ pytask lock reset -k train --dry-run
+```
+
+Add `--yes` to remove all planned entries without prompting:
+
+```console
+$ pytask lock reset -k train --yes
+```
+
+If no selectors are provided, `pytask lock reset` removes the recorded state for all
+collected tasks in the provided paths.
+
+Run a build afterwards so `pytask` determines again whether the selected tasks require
+execution.
+
+```console
+$ pytask build
+```
+
+## Remove stale entries for deleted or moved tasks
+
+Use [`pytask lock clean`](../reference_guides/commands.md#pytask-lock-clean) to remove
+entries from the lockfile which no longer correspond to collected tasks in the current
+project. This is useful after deleting, renaming, or moving tasks when old entries
+should no longer remain in the lockfile.
+
+Preview stale entries without writing them with `--dry-run`:
+
+```console
+$ pytask lock clean --dry-run
+```
+
+Then remove stale entries interactively:
+
+--8<-- "docs/source/_static/md/lock-clean.md"
+
+Add `--yes` to remove all stale entries without prompting:
+
+```console
+$ pytask lock clean --yes
+```
+
+`clean` only removes entries for tasks which are no longer collected. It does not accept
+or update the current state of collected tasks.
+
+## Related
+
+- [`pytask lock`](../reference_guides/commands.md#pytask-lock)
+- [`pytask build`](../reference_guides/commands.md#pytask-build)
+- [Portability](portability.md)
+- [The lockfile](../reference_guides/lockfile.md)
diff --git a/docs/source/reference_guides/lockfile.md b/docs/source/reference_guides/lockfile.md
index ea4ac1bc..dfb3c423 100644
--- a/docs/source/reference_guides/lockfile.md
+++ b/docs/source/reference_guides/lockfile.md
@@ -51,9 +51,9 @@ There are two portability concerns:
## Maintenance
-Use [`pytask build --clean-lockfile`](commands.md#pytask-build--clean-lockfile) to
-rewrite `pytask.lock` with only currently collected tasks. The rewrite happens after a
-successful build and recomputes current state values without executing tasks again.
+Use [`pytask lock clean`](commands.md#pytask-lock-clean) to rewrite `pytask.lock` with
+only currently collected tasks. The command removes stale task entries without executing
+tasks again.
## File Format Reference
diff --git a/mkdocs.yml b/mkdocs.yml
index 4711b71a..9b95ffe9 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -38,6 +38,7 @@ nav:
- Migrating From Scripts To pytask: how_to_guides/migrating_from_scripts_to_pytask.md
- Interfaces For Dependencies And Products: how_to_guides/interfaces_for_dependencies_products.md
- Portability: how_to_guides/portability.md
+ - Reconciling Lockfile State: how_to_guides/reconciling_lockfile_state.md
- Remote Files: how_to_guides/remote_files.md
- Functional Interface: how_to_guides/functional_interface.md
- Capture Warnings: how_to_guides/capture_warnings.md
diff --git a/scripts/check_termynal_line_lengths.py b/scripts/check_termynal_line_lengths.py
new file mode 100644
index 00000000..df42782a
--- /dev/null
+++ b/scripts/check_termynal_line_lengths.py
@@ -0,0 +1,74 @@
+"""Check visible line lengths in Termynal documentation snippets."""
+
+from __future__ import annotations
+
+import argparse
+import re
+import sys
+from html import unescape
+from pathlib import Path
+
+# Existing Termynal snippets top out at 78 visible characters.
+MAX_TERMYNAL_LINE_LENGTH = 78
+
+_CONSOLE_BLOCK_PATTERN = re.compile(r"```console\n(.*?)\n```", re.DOTALL)
+_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
+
+
+def _visible_text(line: str) -> str:
+ return unescape(_HTML_TAG_PATTERN.sub("", line))
+
+
+def _iter_violations(path: Path, max_length: int) -> list[str]:
+ text = path.read_text(encoding="utf-8")
+ if 'class="termy"' not in text:
+ return []
+
+ violations = []
+ for match in _CONSOLE_BLOCK_PATTERN.finditer(text):
+ start_line = text.count("\n", 0, match.start(1)) + 1
+ for offset, raw_line in enumerate(match.group(1).splitlines()):
+ rendered_line = _visible_text(raw_line)
+ line_length = len(rendered_line)
+ if line_length > max_length:
+ violations.append(
+ f"{path}:{start_line + offset}: rendered line has "
+ f"{line_length} characters, maximum is {max_length}:\n"
+ f" {rendered_line}"
+ )
+
+ return violations
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "paths",
+ nargs="*",
+ type=Path,
+ default=sorted(Path("docs/source/_static/md").rglob("*.md")),
+ help="Markdown files with Termynal snippets.",
+ )
+ parser.add_argument(
+ "--max-length",
+ type=int,
+ default=MAX_TERMYNAL_LINE_LENGTH,
+ help="Maximum visible line length for rendered terminal lines.",
+ )
+ args = parser.parse_args()
+
+ violations = [
+ violation
+ for path in args.paths
+ if path.is_file()
+ for violation in _iter_violations(path, args.max_length)
+ ]
+ if violations:
+ sys.stderr.write("\n\n".join(violations))
+ sys.stderr.write("\n")
+ return 1
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/src/_pytask/build.py b/src/_pytask/build.py
index 52ce5e17..4195ee32 100644
--- a/src/_pytask/build.py
+++ b/src/_pytask/build.py
@@ -72,7 +72,6 @@ def build( # noqa: PLR0913
debug_pytask: bool = False,
disable_warnings: bool = False,
dry_run: bool = False,
- clean_lockfile: bool = False,
editor_url_scheme: Literal["no_link", "file", "vscode", "pycharm"] # noqa: PYI051
| str = "file",
explain: bool = False,
@@ -132,8 +131,6 @@ def build( # noqa: PLR0913
Whether warnings should be disabled and not displayed.
dry_run : bool, default=False
Whether a dry-run should be performed that shows which tasks need to be rerun.
- clean_lockfile : bool, default=False
- Whether the lockfile should be rewritten to only include collected tasks.
editor_url_scheme : Literal["no_link", "file", "vscode", "pycharm"] | str
A URL scheme that allows task, node, and file names to become clickable links.
explain : bool, default=False
@@ -228,7 +225,6 @@ def build( # noqa: PLR0913
"debug_pytask": debug_pytask,
"disable_warnings": disable_warnings,
"dry_run": dry_run,
- "clean_lockfile": clean_lockfile,
"editor_url_scheme": editor_url_scheme,
"explain": explain,
"expression": expression,
@@ -357,12 +353,6 @@ def build( # noqa: PLR0913
default=False,
help="Execute a task even if it succeeded successfully before.",
)
-@click.option(
- "--clean-lockfile",
- is_flag=True,
- default=False,
- help="Rewrite the lockfile with only currently collected tasks.",
-)
@click.option(
"--explain",
is_flag=True,
diff --git a/src/_pytask/click.py b/src/_pytask/click.py
index ba7abbf8..1ab18fca 100644
--- a/src/_pytask/click.py
+++ b/src/_pytask/click.py
@@ -33,7 +33,7 @@
from collections.abc import Sequence
-__all__ = ["ColoredCommand", "ColoredGroup", "EnumChoice"]
+__all__ = ["ColoredCommand", "ColoredGroup", "EnumChoice", "get_command"]
if importlib.metadata.version("click") < "8.2":
@@ -47,6 +47,14 @@ def split_opt(option: str) -> tuple[str, str]:
return cast("Callable[[str], tuple[str, str]]", _split_opt)(option)
+def get_command(cli: click.Group, name: str) -> click.Command:
+ """Get a nested command by name."""
+ command: click.Command = cli
+ for part in name.split():
+ command = cast("click.Group", command).commands[part]
+ return command
+
+
class EnumChoice(Choice):
"""An enum-based choice type.
diff --git a/src/_pytask/lock.py b/src/_pytask/lock.py
new file mode 100644
index 00000000..e3dbf33d
--- /dev/null
+++ b/src/_pytask/lock.py
@@ -0,0 +1,373 @@
+"""Implement commands to inspect and update the lockfile."""
+
+from __future__ import annotations
+
+import sys
+from dataclasses import dataclass
+from itertools import chain
+from typing import TYPE_CHECKING
+from typing import Any
+from typing import cast
+
+import click
+
+from _pytask.click import ColoredCommand
+from _pytask.click import ColoredGroup
+from _pytask.console import console
+from _pytask.dag import create_dag
+from _pytask.dag_utils import task_and_preceding_tasks
+from _pytask.exceptions import CollectionError
+from _pytask.exceptions import ConfigurationError
+from _pytask.exceptions import ExecutionError
+from _pytask.exceptions import NodeNotFoundError
+from _pytask.exceptions import ResolvingDependenciesError
+from _pytask.lockfile import _build_task_entry
+from _pytask.lockfile import _TaskEntry
+from _pytask.lockfile import build_portable_task_id
+from _pytask.mark import Expression
+from _pytask.mark import KeywordMatcher
+from _pytask.mark import MarkMatcher
+from _pytask.mark import ParseError
+from _pytask.node_protocols import PNode
+from _pytask.node_protocols import PProvisionalNode
+from _pytask.node_protocols import PTask
+from _pytask.outcomes import ExitCode
+from _pytask.pluginmanager import hookimpl
+from _pytask.pluginmanager import storage
+from _pytask.session import Session
+from _pytask.traceback import Traceback
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+ from _pytask.lockfile import LockfileState
+
+
+@dataclass(slots=True)
+class _PlannedChange:
+ task_id: str
+ entry: _TaskEntry | None = None
+
+ @property
+ def is_accept(self) -> bool:
+ return self.entry is not None
+
+ def describe(self) -> str:
+ if self.is_accept:
+ return f"Accept recorded state for {self.task_id}"
+ return f"Remove recorded state for {self.task_id}"
+
+
+# Task selection.
+
+
+def _validate_confirmation_options(raw_config: dict[str, Any]) -> None:
+ if raw_config["dry_run"] and raw_config["yes"]:
+ msg = "The options '--dry-run' and '--yes' are mutually exclusive."
+ raise click.UsageError(msg)
+
+
+def _expression_filter(
+ tasks: list[PTask],
+ expression: str,
+ option: str,
+ matcher_from_task: Callable[[PTask], Any],
+) -> set[str]:
+ try:
+ compiled = Expression.compile_(expression)
+ except ParseError as e:
+ msg = f"Wrong expression passed to {option!r}: {expression}: {e}"
+ raise ValueError(msg) from None
+
+ return {
+ task.signature for task in tasks if compiled.evaluate(matcher_from_task(task))
+ }
+
+
+def _select_tasks_exact(session: Session) -> list[PTask]:
+ selected = {task.signature for task in session.tasks}
+
+ expression = session.config.get("expression")
+ if expression:
+ selected &= _expression_filter(
+ session.tasks, expression, "-k", KeywordMatcher.from_task
+ )
+
+ marker_expression = session.config.get("marker_expression")
+ if marker_expression:
+ selected &= _expression_filter(
+ session.tasks, marker_expression, "-m", MarkMatcher.from_task
+ )
+
+ return [task for task in session.tasks if task.signature in selected]
+
+
+def _select_tasks_with_ancestors(session: Session) -> list[PTask]:
+ selected = {task.signature for task in _select_tasks_exact(session)}
+ selected |= set(
+ chain.from_iterable(
+ task_and_preceding_tasks(signature, session.dag) for signature in selected
+ )
+ )
+ return [task for task in session.tasks if task.signature in selected]
+
+
+# Change planning.
+
+
+def _validate_task_for_accept(session: Session, task: PTask) -> None:
+ predecessors = set(session.dag.predecessors(task.signature))
+
+ for node_signature in chain(
+ predecessors, [task.signature], session.dag.successors(task.signature)
+ ):
+ node = session.dag.nodes[node_signature]
+
+ if node_signature not in predecessors and isinstance(node, PProvisionalNode):
+ continue
+
+ if isinstance(node, PProvisionalNode):
+ msg = (
+ f"Task {task.name!r} still references provisional node "
+ f"{node.name!r} while accepting lockfile state."
+ )
+ raise ExecutionError(msg)
+
+ if not isinstance(node, (PTask, PNode)):
+ continue
+
+ state = node.state()
+ if state is not None:
+ continue
+
+ if node_signature in predecessors:
+ msg = f"{task.name!r} requires missing node {node.name!r}."
+ raise NodeNotFoundError(msg)
+
+ if node_signature == task.signature:
+ msg = f"{task.name!r} has no state and cannot be accepted."
+ raise ExecutionError(msg)
+
+ msg = f"{task.name!r} is missing product {node.name!r}."
+ raise NodeNotFoundError(msg)
+
+
+def _plan_accept_changes(session: Session) -> list[_PlannedChange]:
+ root = session.config["root"]
+ planned_changes = []
+
+ for task in _select_tasks_with_ancestors(session):
+ _validate_task_for_accept(session, task)
+ entry = _build_task_entry(session, task, root)
+ if entry is None:
+ task_id = build_portable_task_id(task, root)
+ msg = f"{task_id!r} has no state and cannot be accepted."
+ raise ExecutionError(msg)
+
+ existing = session.config["lockfile_state"].get_task_entry(entry.id)
+ if existing != entry:
+ planned_changes.append(_PlannedChange(task_id=entry.id, entry=entry))
+
+ return planned_changes
+
+
+def _plan_reset_changes(session: Session) -> list[_PlannedChange]:
+ root = session.config["root"]
+ planned_changes = []
+
+ for task in _select_tasks_exact(session):
+ task_id = build_portable_task_id(task, root)
+ if session.config["lockfile_state"].get_task_entry(task_id) is not None:
+ planned_changes.append(_PlannedChange(task_id=task_id))
+
+ return planned_changes
+
+
+def _plan_clean_changes(session: Session) -> list[_PlannedChange]:
+ state: LockfileState = session.config["lockfile_state"]
+ current_task_ids = {
+ build_portable_task_id(task, session.config["root"]) for task in session.tasks
+ }
+ stale_ids = state.task_ids() - current_task_ids
+ return [_PlannedChange(task_id=task_id) for task_id in sorted(stale_ids)]
+
+
+# Change application.
+
+
+def _apply_changes(
+ session: Session, planned_changes: list[_PlannedChange]
+) -> list[_PlannedChange]:
+ if session.config["dry_run"]:
+ for change in planned_changes:
+ console.print(f"Would {change.describe().lower()}.")
+ return planned_changes
+
+ accepted = planned_changes
+ if not session.config["yes"]:
+ accepted = []
+ for change in planned_changes:
+ prompt = f"{change.describe()}?"
+ if click.confirm(prompt, default=False):
+ accepted.append(change)
+
+ if not accepted:
+ return []
+
+ state: LockfileState = session.config["lockfile_state"]
+ entries = [change.entry for change in accepted if change.entry is not None]
+ if entries:
+ state.set_task_entries(entries)
+
+ removed_ids = {change.task_id for change in accepted if change.entry is None}
+ if removed_ids:
+ state.remove_task_entries(removed_ids)
+
+ state.flush()
+
+ for change in accepted:
+ console.print(f"{change.describe()}.")
+
+ return accepted
+
+
+# Command execution.
+
+
+def _run_lock_command(
+ raw_config: dict[str, Any],
+ *,
+ planner: Callable[[Session], list[_PlannedChange]],
+ empty_message: str,
+) -> int:
+ _validate_confirmation_options(raw_config)
+ pm = storage.get()
+ from _pytask.cli import DEFAULTS_FROM_CLI # noqa: PLC0415
+
+ raw_config = cast("dict[str, Any]", DEFAULTS_FROM_CLI) | raw_config
+ raw_config["command"] = "lock"
+
+ try:
+ config = pm.hook.pytask_configure(pm=pm, raw_config=raw_config)
+ session = Session.from_config(config)
+ except (ConfigurationError, Exception): # noqa: BLE001
+ console.print(Traceback(sys.exc_info()))
+ session = Session(exit_code=ExitCode.CONFIGURATION_FAILED)
+ else:
+ try:
+ session.hook.pytask_log_session_header(session=session)
+ session.hook.pytask_collect(session=session)
+ session.dag = create_dag(session=session)
+
+ planned_changes = planner(session)
+
+ if planned_changes:
+ _apply_changes(session, planned_changes)
+ else:
+ console.print()
+ console.print(empty_message)
+
+ # Journal replay can make the lockfile state dirty even if this lock command
+ # has no net changes. Flush it so replayed entries are persisted and the
+ # journal is removed.
+ if not session.config["dry_run"]:
+ session.config["lockfile_state"].flush()
+
+ console.print()
+ console.rule(style="default")
+ except CollectionError:
+ session.exit_code = ExitCode.COLLECTION_FAILED
+ console.rule(style="failed")
+ except ResolvingDependenciesError:
+ session.exit_code = ExitCode.DAG_FAILED
+ console.rule(style="failed")
+ except Exception: # noqa: BLE001
+ console.print(Traceback(sys.exc_info()))
+ console.rule(style="failed")
+ session.exit_code = ExitCode.FAILED
+
+ # Configuration can fail before the session receives the plugin manager's hook
+ # relay. A fallback session only has a bare HookRelay without this hook.
+ if hasattr(session.hook, "pytask_unconfigure"):
+ session.hook.pytask_unconfigure(session=session)
+ return session.exit_code
+
+
+# Command line interface.
+
+
+def _add_lock_command_options(
+ *, dry_run_help: str
+) -> Callable[[Callable[..., None]], Callable[..., None]]:
+ def decorator(func: Callable[..., None]) -> Callable[..., None]:
+ func = click.option(
+ "--dry-run",
+ is_flag=True,
+ default=False,
+ help=dry_run_help,
+ )(func)
+ return click.option(
+ "-y",
+ "--yes",
+ is_flag=True,
+ default=False,
+ help="Apply the changes without prompting for confirmation.",
+ )(func)
+
+ return decorator
+
+
+@hookimpl(tryfirst=True)
+def pytask_extend_command_line_interface(cli: click.Group) -> None:
+ """Extend the command line interface."""
+ cli.add_command(lock)
+
+
+@click.group(cls=ColoredGroup)
+def lock() -> None:
+ """Inspect and update recorded task state in the lockfile."""
+
+
+@lock.command(cls=ColoredCommand)
+@_add_lock_command_options(
+ dry_run_help="Show which recorded states would be updated without writing changes."
+)
+def accept(**raw_config: Any) -> None:
+ """Accept the current state for selected tasks and their ancestors."""
+ sys.exit(
+ _run_lock_command(
+ raw_config,
+ planner=_plan_accept_changes,
+ empty_message="No lockfile entries need updating.",
+ )
+ )
+
+
+@lock.command(cls=ColoredCommand)
+@_add_lock_command_options(
+ dry_run_help="Show which recorded states would be removed without writing changes."
+)
+def reset(**raw_config: Any) -> None:
+ """Remove recorded state for selected tasks."""
+ sys.exit(
+ _run_lock_command(
+ raw_config,
+ planner=_plan_reset_changes,
+ empty_message="No lockfile entries need removing.",
+ )
+ )
+
+
+@lock.command(cls=ColoredCommand)
+@_add_lock_command_options(
+ dry_run_help="Show which stale entries would be removed without writing changes."
+)
+def clean(**raw_config: Any) -> None:
+ """Remove stale lockfile entries which no longer correspond to collected tasks."""
+ sys.exit(
+ _run_lock_command(
+ raw_config,
+ planner=_plan_clean_changes,
+ empty_message="There are no stale lockfile entries.",
+ )
+ )
diff --git a/src/_pytask/lockfile.py b/src/_pytask/lockfile.py
index ea8ba3d6..080fdaa2 100644
--- a/src/_pytask/lockfile.py
+++ b/src/_pytask/lockfile.py
@@ -61,11 +61,11 @@ class _JournalEntry(msgspec.Struct):
def _should_initialize_lockfile_state(command: str | None) -> bool:
- return command in (None, "build")
+ return command in (None, "build", "lock")
def _should_validate_lockfile_ids(command: str | None) -> bool:
- return command in (None, "build", "collect")
+ return command in (None, "build", "collect", "lock")
def _encode_node_path(path: tuple[str | int, ...]) -> str:
@@ -365,6 +365,17 @@ def get_task_entry(self, task_id: str) -> _TaskEntry | None:
def get_node_state(self, task_id: str, node_id: str) -> str | None:
return self._node_index.get(task_id, {}).get(node_id)
+ def task_ids(self) -> set[str]:
+ return set(self._task_index)
+
+ def _update_from_task_index(self) -> None:
+ self.lockfile = _Lockfile(
+ lock_version=CURRENT_LOCKFILE_VERSION,
+ task=list(self._task_index.values()),
+ )
+ self._rebuild_indexes()
+ self._dirty = True
+
def update_task(self, session: Session, task: PTask) -> None:
entry = _build_task_entry(session, task, self.root)
if entry is None:
@@ -390,6 +401,28 @@ def update_task(self, session: Session, task: PTask) -> None:
)
self._dirty = True
+ def set_task_entries(self, entries: list[_TaskEntry]) -> list[str]:
+ changed = []
+ for entry in entries:
+ existing = self._task_index.get(entry.id)
+ if existing == entry:
+ continue
+ self._task_index[entry.id] = entry
+ changed.append(entry.id)
+ if changed:
+ self._update_from_task_index()
+ return changed
+
+ def remove_task_entries(self, task_ids: set[str]) -> list[str]:
+ removed = []
+ for task_id in task_ids:
+ if task_id in self._task_index:
+ del self._task_index[task_id]
+ removed.append(task_id)
+ if removed:
+ self._update_from_task_index()
+ return removed
+
def rebuild_from_session(self, session: Session) -> None:
if session.dag is None:
return
@@ -451,7 +484,4 @@ def pytask_unconfigure(session: Session) -> None:
lockfile_state = session.config.get("lockfile_state")
if lockfile_state is None:
return
- if session.config.get("clean_lockfile"):
- lockfile_state.rebuild_from_session(session)
- else:
- lockfile_state.flush()
+ lockfile_state.flush()
diff --git a/src/_pytask/mark/__init__.py b/src/_pytask/mark/__init__.py
index 8864d0f1..54219c90 100644
--- a/src/_pytask/mark/__init__.py
+++ b/src/_pytask/mark/__init__.py
@@ -11,6 +11,7 @@
from rich.table import Table
from _pytask.click import ColoredCommand
+from _pytask.click import get_command
from _pytask.console import console
from _pytask.dag_utils import task_and_preceding_tasks
from _pytask.exceptions import ConfigurationError
@@ -101,8 +102,9 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
default=None,
),
]
- for command in ("build", "clean", "collect"):
- cli.commands[command].params.extend(additional_build_parameters)
+ for command in ("build", "clean", "collect", "lock accept", "lock reset"):
+ target = get_command(cli, command)
+ target.params.extend(additional_build_parameters)
@hookimpl
diff --git a/src/_pytask/parameters.py b/src/_pytask/parameters.py
index ae38827e..3baee8ef 100644
--- a/src/_pytask/parameters.py
+++ b/src/_pytask/parameters.py
@@ -12,6 +12,7 @@
from sqlalchemy.engine import make_url
from sqlalchemy.exc import ArgumentError
+from _pytask.click import get_command
from _pytask.config_utils import set_defaults_from_config
from _pytask.path import import_path
from _pytask.pluginmanager import hookimpl
@@ -184,11 +185,29 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
"""Register general markers."""
for command in ("build", "clean", "collect", "dag", "profile"):
cli.commands[command].params.extend((_DATABASE_URL_OPTION,))
- for command in ("build", "clean", "collect", "dag", "markers", "profile"):
- cli.commands[command].params.extend(
- (_CONFIG_OPTION, _HOOK_MODULE_OPTION, _PATH_ARGUMENT)
- )
- for command in ("build", "clean", "collect", "profile"):
- cli.commands[command].params.extend([_IGNORE_OPTION, _EDITOR_URL_SCHEME_OPTION])
+ for command in (
+ "build",
+ "clean",
+ "collect",
+ "dag",
+ "lock accept",
+ "lock clean",
+ "lock reset",
+ "markers",
+ "profile",
+ ):
+ target = get_command(cli, command)
+ target.params.extend((_CONFIG_OPTION, _HOOK_MODULE_OPTION, _PATH_ARGUMENT))
+ for command in (
+ "build",
+ "clean",
+ "collect",
+ "lock accept",
+ "lock clean",
+ "lock reset",
+ "profile",
+ ):
+ target = get_command(cli, command)
+ target.params.extend([_IGNORE_OPTION, _EDITOR_URL_SCHEME_OPTION])
for command in ("build",):
cli.commands[command].params.append(_VERBOSE_OPTION)
diff --git a/src/_pytask/pluginmanager.py b/src/_pytask/pluginmanager.py
index 774de974..025f8698 100644
--- a/src/_pytask/pluginmanager.py
+++ b/src/_pytask/pluginmanager.py
@@ -53,6 +53,7 @@ def pytask_add_hooks(pm: PluginManager) -> None:
"_pytask.provisional",
"_pytask.execute",
"_pytask.live",
+ "_pytask.lock",
"_pytask.lockfile",
"_pytask.logging",
"_pytask.mark",
diff --git a/tests/test_lock_command.py b/tests/test_lock_command.py
new file mode 100644
index 00000000..022e9700
--- /dev/null
+++ b/tests/test_lock_command.py
@@ -0,0 +1,1141 @@
+from __future__ import annotations
+
+import textwrap
+
+import pytest
+
+from _pytask.lockfile import read_lockfile
+from pytask import ExitCode
+from pytask import build
+from pytask import cli
+
+
+def _write_chain_project(tmp_path):
+ tmp_path.joinpath("task_upstream.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_upstream(produces=Path("up.txt")):
+ produces.write_text("up")
+ """
+ )
+ )
+ tmp_path.joinpath("task_downstream.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_downstream(depends_on=Path("up.txt"), produces=Path("down.txt")):
+ produces.write_text(depends_on.read_text() + "down")
+ """
+ )
+ )
+
+
+def _write_marked_chain_project(tmp_path):
+ tmp_path.joinpath("task_upstream.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_upstream(produces=Path("up.txt")):
+ produces.write_text("up")
+ """
+ )
+ )
+ tmp_path.joinpath("task_downstream.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+ import pytask
+
+
+ @pytask.mark.try_first
+ def task_downstream(depends_on=Path("up.txt"), produces=Path("down.txt")):
+ produces.write_text(depends_on.read_text() + "down")
+ """
+ )
+ )
+
+
+def _write_single_task_project(tmp_path):
+ tmp_path.joinpath("task_example.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_example(produces=Path("out.txt")):
+ produces.write_text("data")
+ """
+ )
+ )
+
+
+def _task_ids(tmp_path):
+ lockfile = read_lockfile(tmp_path / "pytask.lock")
+ assert lockfile is not None
+ return {entry.id for entry in lockfile.task}
+
+
+def _task_state_by_suffix(tmp_path, suffix):
+ lockfile = read_lockfile(tmp_path / "pytask.lock")
+ assert lockfile is not None
+ for entry in lockfile.task:
+ if entry.id.endswith(suffix):
+ return entry.state
+ msg = f"Could not find lockfile entry ending with {suffix!r}."
+ raise AssertionError(msg)
+
+
+def _lockfile_text(tmp_path):
+ return (tmp_path / "pytask.lock").read_text()
+
+
+def _task_by_suffix(session, suffix):
+ for task in session.tasks:
+ if task.name.endswith(suffix):
+ return task
+ msg = f"Could not find collected task ending with {suffix!r}."
+ raise AssertionError(msg)
+
+
+def test_lock_help_lists_subcommands(runner):
+ result = runner.invoke(cli, ["lock", "--help"])
+
+ assert result.exit_code == ExitCode.OK
+ assert "accept" in result.output
+ assert "reset" in result.output
+ assert "clean" in result.output
+
+
+def test_build_help_no_longer_lists_clean_lockfile(runner):
+ result = runner.invoke(cli, ["build", "--help"])
+
+ assert result.exit_code == ExitCode.OK
+ assert "--clean-lockfile" not in result.output
+
+
+def test_lock_accept_creates_lockfile_without_executing_tasks(runner, tmp_path):
+ source = """
+ from pathlib import Path
+
+
+ def task_example(depends_on=Path("in.txt"), produces=Path("out.txt")):
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("in.txt").write_text("data")
+ tmp_path.joinpath("out.txt").write_text("data")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert (tmp_path / "pytask.lock").exists()
+ assert "should not execute" not in result.output
+
+
+def test_lock_accept_includes_ancestors_of_selected_tasks(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream_before = _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ downstream_before = _task_state_by_suffix(
+ tmp_path, "task_downstream.py::task_downstream"
+ )
+
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "accept",
+ "-k",
+ "downstream",
+ "--yes",
+ tmp_path.as_posix(),
+ ],
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert (
+ _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ != upstream_before
+ )
+ assert (
+ _task_state_by_suffix(tmp_path, "task_downstream.py::task_downstream")
+ != downstream_before
+ )
+
+
+def test_lock_accept_does_not_include_descendants_of_selected_tasks(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream_before = _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ downstream_before = _task_state_by_suffix(
+ tmp_path, "task_downstream.py::task_downstream"
+ )
+
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "accept",
+ "-k",
+ "upstream",
+ "--yes",
+ tmp_path.as_posix(),
+ ],
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert (
+ _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ != upstream_before
+ )
+ assert (
+ _task_state_by_suffix(tmp_path, "task_downstream.py::task_downstream")
+ == downstream_before
+ )
+
+
+def test_lock_accept_updates_selected_task(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ before_upstream = _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ before_downstream = _task_state_by_suffix(
+ tmp_path, "task_downstream.py::task_downstream"
+ )
+
+ downstream = tmp_path / "task_downstream.py"
+ downstream.write_text(downstream.read_text() + "\n# changed without rerunning\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert (
+ _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ == before_upstream
+ )
+ assert (
+ _task_state_by_suffix(tmp_path, "task_downstream.py::task_downstream")
+ != before_downstream
+ )
+
+
+def test_lock_accept_uses_intersection_of_keyword_and_marker_selection(
+ runner, tmp_path
+):
+ _write_marked_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ before_upstream = _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ before_downstream = _task_state_by_suffix(
+ tmp_path, "task_downstream.py::task_downstream"
+ )
+
+ downstream = tmp_path / "task_downstream.py"
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "accept",
+ "-k",
+ "downstream",
+ "-m",
+ "try_first",
+ "--yes",
+ tmp_path.as_posix(),
+ ],
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert (
+ _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ == before_upstream
+ )
+ assert (
+ _task_state_by_suffix(tmp_path, "task_downstream.py::task_downstream")
+ != before_downstream
+ )
+
+
+def test_lock_accept_no_matching_selection_is_a_no_op(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ before = _lockfile_text(tmp_path)
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "missing", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert "No lockfile entries need updating." in result.output
+ assert _lockfile_text(tmp_path) == before
+
+
+def test_lock_accept_interactive_only_applies_confirmed_changes(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ before_upstream = _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ before_downstream = _task_state_by_suffix(
+ tmp_path, "task_downstream.py::task_downstream"
+ )
+
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "accept",
+ "-k",
+ "downstream",
+ tmp_path.as_posix(),
+ ],
+ input="y\nn\n",
+ )
+
+ assert result.exit_code == ExitCode.OK
+ upstream_changed = (
+ _task_state_by_suffix(tmp_path, "task_upstream.py::task_upstream")
+ != before_upstream
+ )
+ downstream_changed = (
+ _task_state_by_suffix(tmp_path, "task_downstream.py::task_downstream")
+ != before_downstream
+ )
+ assert upstream_changed ^ downstream_changed
+
+
+def test_lock_accept_current_task_is_a_no_op_without_rewrite(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ lockfile = tmp_path / "pytask.lock"
+ before_text = lockfile.read_text()
+ before_mtime = lockfile.stat().st_mtime_ns
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert "No lockfile entries need updating." in result.output
+ assert lockfile.read_text() == before_text
+ assert lockfile.stat().st_mtime_ns == before_mtime
+
+
+def test_lock_accept_fails_for_missing_product(runner, tmp_path):
+ source = """
+ from pathlib import Path
+
+
+ def task_example(depends_on=Path("in.txt"), produces=Path("out.txt")):
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("in.txt").write_text("data")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.FAILED
+ assert "missing product" in result.output
+
+
+def test_lock_accept_fails_for_missing_dependency(runner, tmp_path):
+ source = """
+ from pathlib import Path
+
+
+ def task_example(depends_on=Path("in.txt"), produces=Path("out.txt")):
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("out.txt").write_text("data")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.FAILED
+ assert "requires missing node" in result.output
+
+
+def test_lock_accept_fails_when_task_state_is_missing(runner, tmp_path):
+ source = """
+ from dataclasses import dataclass, field
+ from pathlib import Path
+ from typing import Any
+
+ from pytask import PathNode
+
+ @dataclass(kw_only=True)
+ class CustomTask:
+ name: str
+ function: Any
+ depends_on: dict[str, Any] = field(default_factory=dict)
+ produces: dict[str, Any] = field(default_factory=dict)
+ markers: list[Any] = field(default_factory=list)
+ report_sections: list[tuple[str, str, str]] = field(default_factory=list)
+ attributes: dict[Any, Any] = field(default_factory=dict)
+
+ @property
+ def signature(self):
+ return "custom-signature"
+
+ def state(self):
+ return None
+
+ def execute(self, **kwargs):
+ return self.function(**kwargs)
+
+ def func(path): raise RuntimeError("should not execute")
+
+ task_create_file = CustomTask(
+ name="task_custom",
+ function=func,
+ produces={"path": PathNode(path=Path(__file__).parent / "out.txt")},
+ )
+ """
+ tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("out.txt").write_text("done")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.FAILED
+ assert "has no state and cannot be accepted" in result.output
+
+
+def test_lock_accept_works_for_task_without_path_via_cli(runner, tmp_path):
+ source = """
+ from pathlib import Path
+
+ from pytask import PathNode, TaskWithoutPath
+
+ def func(path): raise RuntimeError("should not execute")
+
+ task_create_file = TaskWithoutPath(
+ name="task_without_path",
+ function=func,
+ produces={"path": PathNode(path=Path(__file__).parent / "out.txt")},
+ )
+ """
+ tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("out.txt").write_text("done")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == {"task_without_path"}
+
+
+def test_lock_accept_records_custom_node_state(runner, tmp_path):
+ source = """
+ from dataclasses import dataclass, field
+ from pathlib import Path
+ from typing import Any, Annotated
+
+ from pytask import Product
+
+ @dataclass
+ class CustomNode:
+ name: str
+ filepath: Path
+ signature: str
+ attributes: dict[Any, Any] = field(default_factory=dict)
+
+ def state(self):
+ if not self.filepath.exists():
+ return None
+ return self.filepath.read_text()
+
+ def load(self, is_product=False):
+ return self if is_product else self.filepath.read_text()
+
+ def save(self, value):
+ self.filepath.write_text(value)
+
+ def task_example(
+ dependency=CustomNode(
+ name="custom_dependency",
+ filepath=Path(__file__).parent / "in.txt",
+ signature="signature-a",
+ ),
+ product: Annotated[CustomNode, Product] = CustomNode(
+ name="custom_product",
+ filepath=Path(__file__).parent / "out.txt",
+ signature="signature-b",
+ ),
+ ):
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("in.txt").write_text("hello")
+ tmp_path.joinpath("out.txt").write_text("HELLO")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ lockfile = read_lockfile(tmp_path / "pytask.lock")
+ assert lockfile is not None
+ entry = lockfile.task[0]
+ assert "custom_dependency" in entry.depends_on
+ assert "custom_product" in entry.produces
+
+
+def test_lock_accept_fails_with_provisional_dependencies(runner, tmp_path):
+ source = """
+ from typing import Annotated
+ from pathlib import Path
+
+ from pytask import DirectoryNode
+
+ def task_example(
+ paths=DirectoryNode(pattern="*.txt")
+ ) -> Annotated[str, Path("out.txt")]:
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+ tmp_path.joinpath("a.txt").write_text("a")
+
+ result = runner.invoke(cli, ["lock", "accept", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.FAILED
+ assert "accepting lockfile state" in result.output
+
+
+def test_lock_reset_only_affects_exact_selection(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == {"task_upstream.py::task_upstream"}
+
+
+def test_lock_reset_uses_intersection_of_keyword_and_marker_selection(runner, tmp_path):
+ _write_marked_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "reset",
+ "-k",
+ "downstream",
+ "-m",
+ "try_first",
+ "--yes",
+ tmp_path.as_posix(),
+ ],
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == {"task_upstream.py::task_upstream"}
+
+
+def test_lock_reset_no_matching_selection_is_a_no_op(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ before = _lockfile_text(tmp_path)
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "missing", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert "No lockfile entries need removing." in result.output
+ assert _lockfile_text(tmp_path) == before
+
+
+def test_lock_reset_when_selected_task_is_absent_is_a_no_op(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ before = _lockfile_text(tmp_path)
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert "No lockfile entries need removing." in result.output
+ assert _lockfile_text(tmp_path) == before
+
+
+def test_lock_reset_interactive_only_applies_confirmed_changes(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(
+ cli,
+ [
+ "lock",
+ "reset",
+ tmp_path.as_posix(),
+ ],
+ input="y\nn\n",
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert len(_task_ids(tmp_path)) == 1
+
+
+def test_lock_reset_dry_run_does_not_modify_lockfile(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ before = _lockfile_text(tmp_path)
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--dry-run", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert _lockfile_text(tmp_path) == before
+
+
+def test_lock_reset_followed_by_build_reconsiders_task(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+ assert "1 Skipped because unchanged" in result.output
+
+
+def test_lock_clean_removes_stale_entries(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ tmp_path.joinpath("task_downstream.py").unlink()
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == {"task_upstream.py::task_upstream"}
+
+
+def test_lock_clean_dry_run_is_read_only(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ tmp_path.joinpath("task_downstream.py").unlink()
+ before = _lockfile_text(tmp_path)
+
+ result = runner.invoke(cli, ["lock", "clean", "--dry-run", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert (
+ "Would remove recorded state for task_downstream.py::task_downstream."
+ in result.output
+ )
+ assert _lockfile_text(tmp_path) == before
+
+
+def test_lock_clean_interactive_only_applies_confirmed_changes(runner, tmp_path):
+ tmp_path.joinpath("task_alpha.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_alpha(produces=Path("alpha.txt")):
+ produces.write_text("alpha")
+ """
+ )
+ )
+ tmp_path.joinpath("task_beta.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_beta(produces=Path("beta.txt")):
+ produces.write_text("beta")
+ """
+ )
+ )
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ tmp_path.joinpath("task_alpha.py").unlink()
+ tmp_path.joinpath("task_beta.py").unlink()
+
+ result = runner.invoke(
+ cli,
+ ["lock", "clean", tmp_path.as_posix()],
+ input="y\nn\n",
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert len(_task_ids(tmp_path)) == 1
+
+
+def test_lock_clean_reports_when_no_stale_entries_exist(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert "There are no stale lockfile entries." in result.output
+
+
+def test_lock_clean_on_fresh_project_without_lockfile_is_harmless(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert "There are no stale lockfile entries." in result.output
+ assert not (tmp_path / "pytask.lock").exists()
+
+
+def test_lock_clean_removes_multiple_stale_entries_without_adding_new_ones(
+ runner, tmp_path
+):
+ tmp_path.joinpath("task_alpha.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_alpha(produces=Path("alpha.txt")):
+ produces.write_text("alpha")
+ """
+ )
+ )
+ tmp_path.joinpath("task_beta.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_beta(produces=Path("beta.txt")):
+ produces.write_text("beta")
+ """
+ )
+ )
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ tmp_path.joinpath("task_alpha.py").unlink()
+ tmp_path.joinpath("task_beta.py").unlink()
+ tmp_path.joinpath("task_gamma.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_gamma(produces=Path("gamma.txt")):
+ produces.write_text("gamma")
+ """
+ )
+ )
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == set()
+
+
+def test_lock_accept_followed_by_build_skips_changed_task(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ downstream = tmp_path / "task_downstream.py"
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.OK
+ assert "2 Skipped because unchanged" in result.output
+
+
+class TestScenarios:
+ def test_lock_accept_and_reset_end_to_end_workflow_for_single_task(
+ self, runner, tmp_path
+ ):
+ _write_single_task_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ task = tmp_path / "task_example.py"
+ task.write_text(task.read_text() + "\n# changed without rerunning\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "example", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Skipped because unchanged" in result.output
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "example", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+
+ def test_lock_accept_downstream_target_then_build_skips_target_and_ancestors(
+ self, runner, tmp_path
+ ):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "2 Skipped because unchanged" in result.output
+
+ def test_lock_accept_upstream_target_then_build_only_runs_unaccepted_descendant(
+ self, runner, tmp_path
+ ):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "upstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+ assert "1 Skipped because unchanged" in result.output
+
+ def test_lock_accept_with_ancestors_then_exact_reset_reexecutes_only_target(
+ self, runner, tmp_path
+ ):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "2 Skipped because unchanged" in result.output
+
+ result = runner.invoke(
+ cli, ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+ assert "1 Skipped because unchanged" in result.output
+
+ def test_lock_accept_interactive_partial_workflow_only_reexecutes_unaccepted_tasks(
+ self, runner, tmp_path
+ ):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ upstream = tmp_path / "task_upstream.py"
+ downstream = tmp_path / "task_downstream.py"
+ upstream.write_text(upstream.read_text() + "\n# changed upstream\n")
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli,
+ ["lock", "accept", "-k", "downstream", tmp_path.as_posix()],
+ input="y\nn\n",
+ )
+ assert result.exit_code == ExitCode.OK
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+ assert "1 Skipped because unchanged" in result.output
+
+ def test_lock_clean_removes_stale_entries_without_accepting_new_tasks_workflow(
+ self, runner, tmp_path
+ ):
+ tmp_path.joinpath("task_alpha.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_alpha(produces=Path("alpha.txt")):
+ produces.write_text("alpha")
+ """
+ )
+ )
+ tmp_path.joinpath("task_beta.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_beta(produces=Path("beta.txt")):
+ produces.write_text("beta")
+ """
+ )
+ )
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+
+ tmp_path.joinpath("task_alpha.py").unlink()
+ tmp_path.joinpath("task_gamma.py").write_text(
+ textwrap.dedent(
+ """
+ from pathlib import Path
+
+
+ def task_gamma(produces=Path("gamma.txt")):
+ produces.write_text("gamma")
+ """
+ )
+ )
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert _task_ids(tmp_path) == {"task_beta.py::task_beta"}
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert "1 Succeeded" in result.output
+ assert "1 Skipped because unchanged" in result.output
+
+
+@pytest.mark.parametrize(
+ ("content", "message"),
+ [
+ ("{not toml", "Lockfile has invalid format"),
+ ('lock-version = "0.9"\ntask = []\n', "Unsupported lock-version"),
+ ('lock-version = "9.0"\ntask = []\n', "Unsupported lock-version"),
+ ],
+)
+def test_lock_commands_fail_for_invalid_lockfiles(runner, tmp_path, content, message):
+ _write_chain_project(tmp_path)
+ tmp_path.joinpath("pytask.lock").write_text(content)
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.CONFIGURATION_FAILED
+ assert message in result.output
+
+
+def test_lock_accept_on_database_only_project_creates_lockfile(runner, tmp_path):
+ _write_chain_project(tmp_path)
+
+ result = runner.invoke(cli, [tmp_path.as_posix()])
+ assert result.exit_code == ExitCode.OK
+ assert (tmp_path / ".pytask" / "pytask.sqlite3").exists()
+
+ (tmp_path / "pytask.lock").unlink()
+
+ downstream = tmp_path / "task_downstream.py"
+ downstream.write_text(downstream.read_text() + "\n# changed downstream\n")
+
+ result = runner.invoke(
+ cli, ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ )
+
+ assert result.exit_code == ExitCode.OK
+ assert (tmp_path / "pytask.lock").exists()
+
+
+@pytest.mark.parametrize("subcommand", ["accept", "reset", "clean"])
+def test_lock_rejects_dry_run_and_yes_together(runner, tmp_path, subcommand):
+ _write_chain_project(tmp_path)
+
+ args = ["lock", subcommand, "--dry-run", "--yes", tmp_path.as_posix()]
+ result = runner.invoke(cli, args)
+
+ assert result.exit_code == 2
+ assert "mutually exclusive" in result.output
+
+
+@pytest.mark.parametrize("subcommand", ["accept", "reset", "clean"])
+def test_lock_commands_replay_journal_before_applying_changes(
+ runner, tmp_path, subcommand
+):
+ _write_chain_project(tmp_path)
+
+ session = build(paths=tmp_path)
+ assert session.exit_code == ExitCode.OK
+
+ lockfile_state = session.config["lockfile_state"]
+ assert lockfile_state is not None
+
+ downstream = tmp_path / "task_downstream.py"
+ downstream.write_text(downstream.read_text() + "\n# journal change\n")
+ downstream_task = _task_by_suffix(session, "task_downstream.py::task_downstream")
+ lockfile_state.update_task(session, downstream_task)
+
+ journal_path = (tmp_path / "pytask.lock").with_suffix(".lock.journal")
+ assert journal_path.exists()
+
+ if subcommand == "accept":
+ downstream.write_text(downstream.read_text() + "\n# current change\n")
+ args = ["lock", "accept", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ elif subcommand == "reset":
+ args = ["lock", "reset", "-k", "downstream", "--yes", tmp_path.as_posix()]
+ else:
+ tmp_path.joinpath("task_downstream.py").unlink()
+ args = ["lock", "clean", "--yes", tmp_path.as_posix()]
+
+ result = runner.invoke(cli, args)
+
+ assert result.exit_code == ExitCode.OK
+ assert not journal_path.exists()
+
+
+def test_lock_command_fails_for_ambiguous_lockfile_ids(runner, tmp_path):
+ source = """
+ from dataclasses import dataclass, field
+ from pathlib import Path
+ from typing import Any
+
+ @dataclass
+ class CustomNode:
+ name: str
+ value: str
+ signature: str
+ attributes: dict[Any, Any] = field(default_factory=dict)
+
+ def state(self):
+ return self.value
+
+ def load(self, is_product=False):
+ return self.value
+
+ def save(self, value):
+ self.value = value
+
+ def task_example(
+ first=CustomNode(name="dup", value="1", signature="signature-a"),
+ second=CustomNode(name="dup", value="2", signature="signature-b"),
+ produces=Path("out.txt"),
+ ):
+ raise RuntimeError("should not execute")
+ """
+ tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source))
+
+ result = runner.invoke(cli, ["lock", "clean", "--yes", tmp_path.as_posix()])
+
+ assert result.exit_code == ExitCode.COLLECTION_FAILED
+ assert "Ambiguous lockfile ids detected" in result.output
diff --git a/tests/test_lockfile.py b/tests/test_lockfile.py
index 2aebe95b..7dcdbf19 100644
--- a/tests/test_lockfile.py
+++ b/tests/test_lockfile.py
@@ -226,37 +226,6 @@ def func(path):
assert state.hash_ == session.tasks[0].state()
-def test_clean_lockfile_removes_stale_entries(tmp_path):
- def func_first(path):
- path.touch()
-
- def func_second(path):
- path.touch()
-
- task_first = TaskWithoutPath(
- name="task_first",
- function=func_first,
- produces={"path": PathNode(path=tmp_path / "first.txt")},
- )
- task_second = TaskWithoutPath(
- name="task_second",
- function=func_second,
- produces={"path": PathNode(path=tmp_path / "second.txt")},
- )
-
- session = build(tasks=[task_first, task_second], paths=tmp_path)
- assert session.exit_code == ExitCode.OK
- lockfile = read_lockfile(tmp_path / "pytask.lock")
- assert lockfile is not None
- assert {entry.id for entry in lockfile.task} == {"task_first", "task_second"}
-
- session = build(tasks=[task_first], paths=tmp_path, clean_lockfile=True)
- assert session.exit_code == ExitCode.OK
- lockfile = read_lockfile(tmp_path / "pytask.lock")
- assert lockfile is not None
- assert {entry.id for entry in lockfile.task} == {"task_first"}
-
-
def test_update_task_skips_write_when_unchanged(tmp_path, monkeypatch):
def func(path):
path.write_text("data")