From a22d219badf9af47e5bd71e0f6aa6d1423dc65b6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 1 May 2026 14:17:32 +0200 Subject: [PATCH 1/5] feat: add compatibility with pytask v0.6.0 --- docs/source/changes.md | 4 ++++ pyproject.toml | 4 +++- src/pytask_parallel/backends.py | 6 +++--- src/pytask_parallel/execute.py | 35 +++++++++++++++++++++++---------- src/pytask_parallel/nodes.py | 2 +- src/pytask_parallel/typing.py | 4 ++-- src/pytask_parallel/utils.py | 2 +- src/pytask_parallel/wrappers.py | 10 +++++----- 8 files changed, 44 insertions(+), 23 deletions(-) diff --git a/docs/source/changes.md b/docs/source/changes.md index 6f30351..719a469 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -5,6 +5,10 @@ chronological order. Releases follow [semantic versioning](https://semver.org/) releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and [Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel). +## 0.5.3 - 2026-05-01 + +- {pull}`153` adds compatibility with the new pytask 0.6.0 release. + ## 0.5.2 - 2026-02-06 - {pull}`129` drops support for Python 3.8 and 3.9 and adds support for Python 3.14. diff --git a/pyproject.toml b/pyproject.toml index d010638..09289d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,7 +99,6 @@ unsafe-fixes = true [tool.ruff.lint] extend-ignore = [ - "ANN401", # flake8-annotate typing.Any "COM812", # Comply with ruff-format. "ISC001", # Comply with ruff-format. ] @@ -147,3 +146,6 @@ exclude_also = [ "\\.\\.\\.", "def __repr__", ] + +[tool.uv.sources] +pytask = { git = "https://github.com/pytask-dev/pytask" } diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index 2021782..de38586 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -63,7 +63,7 @@ def _configure_worker(root: str | None) -> None: sys.path.insert(0, root) -def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: +def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: # noqa: ANN401 """Deserialize and execute a function and keyword arguments.""" deserialized_fn = cloudpickle.loads(fn) deserialized_kwargs = cloudpickle.loads(kwargs) @@ -77,8 +77,8 @@ def submit( self, fn: Callable[..., Any], /, - *args: Any, # noqa: ARG002 - **kwargs: Any, + *args: Any, # noqa: ANN401, ARG002 + **kwargs: Any, # noqa: ANN401 ) -> Future[Any]: """Submit a new task.""" return super().submit( diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 1b31a24..3001d69 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -41,6 +41,18 @@ from pytask_parallel.wrappers import WrapperResult +def _get_task_from_dag(session: Session, task_name: str) -> PTask: + """Get a task from the pre- and post-pytask 0.6 DAG representations for compat.""" + node = session.dag.nodes[task_name] + task = node["task"] if isinstance(node, dict) else node + + if not isinstance(task, PTask): + msg = f"Expected {task_name!r} to resolve to a task." + raise TypeError(msg) + + return task + + @hookimpl def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0912, PLR0915 """Execute tasks with a parallel backend. @@ -68,8 +80,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 with session.config["_parallel_executor"]: sleeper = _Sleeper() + scheduler = session.scheduler + if scheduler is None: + msg = "Expected the scheduler to be initialized before executing tasks." + raise RuntimeError(msg) + i = 0 - while session.scheduler.is_active(): + while scheduler.is_active(): try: newly_collected_reports = [] @@ -88,13 +105,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 n_new_tasks = session.config["n_workers"] - len(running_tasks) ready_tasks = ( - list(session.scheduler.get_ready(n_new_tasks)) - if n_new_tasks >= 1 - else [] + list(scheduler.get_ready(n_new_tasks)) if n_new_tasks >= 1 else [] ) for task_name in ready_tasks: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) session.hook.pytask_execute_task_log_start( session=session, task=task ) @@ -111,7 +126,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task, sys.exc_info() ) newly_collected_reports.append(report) - session.scheduler.done(task_name) + scheduler.done(task_name) if not ready_tasks: sleeper.increment() @@ -133,7 +148,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) if wrapper_result.exc_info is not None: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, @@ -141,9 +156,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) ) running_tasks.pop(task_name) - session.scheduler.done(task_name) + scheduler.done(task_name) else: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) _update_carry_over_products( task, wrapper_result.carry_over_products ) @@ -161,7 +176,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks.pop(task_name) newly_collected_reports.append(report) - session.scheduler.done(task_name) + scheduler.done(task_name) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( diff --git a/src/pytask_parallel/nodes.py b/src/pytask_parallel/nodes.py index e5fb0f6..5094299 100644 --- a/src/pytask_parallel/nodes.py +++ b/src/pytask_parallel/nodes.py @@ -71,6 +71,6 @@ def load(self, is_product: bool = False) -> Path: # noqa: ARG002, FBT001, FBT00 self.node.path = path return self.node.load(is_product=self.is_product) - def save(self, value: Any) -> None: + def save(self, value: Any) -> None: # noqa: ANN401 """Save strings or bytes to file.""" self.value = value diff --git a/src/pytask_parallel/typing.py b/src/pytask_parallel/typing.py index 5d0ac4b..b8a1db8 100644 --- a/src/pytask_parallel/typing.py +++ b/src/pytask_parallel/typing.py @@ -1,8 +1,8 @@ """Contains functions related to typing.""" -from pathlib import Path from pathlib import PosixPath from pathlib import WindowsPath +from typing import Any from typing import NamedTuple from pytask import PTask @@ -16,7 +16,7 @@ def is_coiled_function(task: PTask) -> bool: return "coiled_kwargs" in task.attributes -def is_local_path(path: Path) -> bool: +def is_local_path(path: Any) -> bool: # noqa: ANN401 """Check if a path is local.""" return isinstance(path, (FilePath, PosixPath, WindowsPath)) diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py index c76dce9..1553ea0 100644 --- a/src/pytask_parallel/utils.py +++ b/src/pytask_parallel/utils.py @@ -73,7 +73,7 @@ def _safe_load( *, is_product: bool, remote: bool, -) -> Any: +) -> Any: # noqa: ANN401 """Load a node and catch exceptions.""" _rich_traceback_guard = True # Get the argument name like "path" or "return" for function returns. diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 63cdf61..1c30944 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -57,7 +57,7 @@ class WrapperResult: stderr: str -def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: +def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: # noqa: ANN401 """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -191,7 +191,7 @@ def rewrap_task_with_coiled_function(task: PTask) -> CoiledFunction: return cast("CoiledFunction", decorated) -def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ARG001 +def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ANN401, ARG001 msg = ( "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the " "execution of tasks with pytask-parallel. Please, remove the breakpoint or run " @@ -223,11 +223,11 @@ def _render_traceback_to_string( traceback = Traceback(exc_info, show_locals=show_locals) segments = console.render(cast("Any", traceback), options=console_options) text = "".join(segment.text for segment in segments) - return (*exc_info[:2], text) + return exc_info[0], exc_info[1], text def _handle_function_products( - task: PTask, out: Any, *, remote: bool = False + task: PTask, out: Any, *, remote: bool = False # noqa: ANN401 ) -> PyTree[CarryOverPath | PythonNode | None]: """Handle the products of the task. @@ -310,7 +310,7 @@ def _delete_local_files_on_remote(kwargs: dict[str, PyTree[Any]]) -> None: """ - def _delete(potential_node: Any) -> None: + def _delete(potential_node: Any) -> None: # noqa: ANN401 if isinstance(potential_node, RemotePathNode): with suppress(OSError): os.close(potential_node.fd) From ba1ee3287828016c3da75f50ed52d1a173b4a4dd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 1 May 2026 12:17:51 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/pytask_parallel/wrappers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 1c30944..82b340c 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -227,7 +227,10 @@ def _render_traceback_to_string( def _handle_function_products( - task: PTask, out: Any, *, remote: bool = False # noqa: ANN401 + task: PTask, + out: Any, + *, + remote: bool = False, # noqa: ANN401 ) -> PyTree[CarryOverPath | PythonNode | None]: """Handle the products of the task. From f0c498970ef472895535f7f9992990c091870b0a Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 1 May 2026 14:19:34 +0200 Subject: [PATCH 3/5] fix --- pyproject.toml | 3 --- src/pytask_parallel/wrappers.py | 5 ++++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 09289d5..d2351a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -146,6 +146,3 @@ exclude_also = [ "\\.\\.\\.", "def __repr__", ] - -[tool.uv.sources] -pytask = { git = "https://github.com/pytask-dev/pytask" } diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 1c30944..82b340c 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -227,7 +227,10 @@ def _render_traceback_to_string( def _handle_function_products( - task: PTask, out: Any, *, remote: bool = False # noqa: ANN401 + task: PTask, + out: Any, + *, + remote: bool = False, # noqa: ANN401 ) -> PyTree[CarryOverPath | PythonNode | None]: """Handle the products of the task. From 4d5cabe1038e01c572f28f39f3e394af017b43b2 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 1 May 2026 14:20:57 +0200 Subject: [PATCH 4/5] fix --- .pre-commit-config.yaml | 4 ++-- src/pytask_parallel/wrappers.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 344c431..ad8c19e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,7 +26,7 @@ repos: - id: python-use-type-annotations - id: text-unicode-replacement-char - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.15.10 + rev: v0.15.12 hooks: - id: ruff-check - id: ruff-format @@ -56,7 +56,7 @@ repos: args: [--wrap, "88"] files: (README\.md) - repo: https://github.com/crate-ci/typos - rev: v1 + rev: v1.46.0 hooks: - id: typos - repo: meta diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 82b340c..d20e75d 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -228,9 +228,9 @@ def _render_traceback_to_string( def _handle_function_products( task: PTask, - out: Any, + out: Any, # noqa: ANN401 *, - remote: bool = False, # noqa: ANN401 + remote: bool = False, ) -> PyTree[CarryOverPath | PythonNode | None]: """Handle the products of the task. From d42a659e29ca2c4250144081b348a701cf4a72db Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 1 May 2026 16:07:23 +0200 Subject: [PATCH 5/5] Mark trace test xfail on Python 3.14 in GitHub Actions --- tests/test_execute.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/test_execute.py b/tests/test_execute.py index 6df9ef5..0756273 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -1,5 +1,7 @@ from __future__ import annotations +import os +import sys import textwrap from time import time @@ -262,7 +264,25 @@ def test_task_without_path_that_return(runner, tmp_path, parallel_backend): ) -@pytest.mark.parametrize("flag", ["--pdb", "--trace", "--dry-run"]) +@pytest.mark.parametrize( + "flag", + [ + "--pdb", + pytest.param( + "--trace", + marks=pytest.mark.xfail( + os.environ.get("GITHUB_ACTIONS") == "true" + and sys.version_info[:2] == (3, 14), + reason=( + "Pdb does not consume CliRunner input on Python 3.14 in " + "GitHub Actions." + ), + strict=True, + ), + ), + "--dry-run", + ], +) @pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS) def test_parallel_execution_is_deactivated(runner, tmp_path, flag, parallel_backend): tmp_path.joinpath("task_example.py").write_text("def task_example(): pass")