diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78bad1a..ad8c19e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -56,7 +56,7 @@ repos: args: [--wrap, "88"] files: (README\.md) - repo: https://github.com/crate-ci/typos - rev: v1 + rev: v1.46.0 hooks: - id: typos - repo: meta diff --git a/docs/source/changes.md b/docs/source/changes.md index 6f30351..719a469 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -5,6 +5,10 @@ chronological order. Releases follow [semantic versioning](https://semver.org/) releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and [Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel). +## 0.5.3 - 2026-05-01 + +- {pull}`153` adds compatibility with the new pytask 0.6.0 release. + ## 0.5.2 - 2026-02-06 - {pull}`129` drops support for Python 3.8 and 3.9 and adds support for Python 3.14. diff --git a/pyproject.toml b/pyproject.toml index d010638..d2351a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,7 +99,6 @@ unsafe-fixes = true [tool.ruff.lint] extend-ignore = [ - "ANN401", # flake8-annotate typing.Any "COM812", # Comply with ruff-format. "ISC001", # Comply with ruff-format. ] diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index 2021782..de38586 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -63,7 +63,7 @@ def _configure_worker(root: str | None) -> None: sys.path.insert(0, root) -def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: +def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: # noqa: ANN401 """Deserialize and execute a function and keyword arguments.""" deserialized_fn = cloudpickle.loads(fn) deserialized_kwargs = cloudpickle.loads(kwargs) @@ -77,8 +77,8 @@ def submit( self, fn: Callable[..., Any], /, - *args: Any, # noqa: ARG002 - **kwargs: Any, + *args: Any, # noqa: ANN401, ARG002 + **kwargs: Any, # noqa: ANN401 ) -> Future[Any]: """Submit a new task.""" return super().submit( diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 1b31a24..3001d69 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -41,6 +41,18 @@ from pytask_parallel.wrappers import WrapperResult +def _get_task_from_dag(session: Session, task_name: str) -> PTask: + """Get a task from the pre- and post-pytask 0.6 DAG representations for compat.""" + node = session.dag.nodes[task_name] + task = node["task"] if isinstance(node, dict) else node + + if not isinstance(task, PTask): + msg = f"Expected {task_name!r} to resolve to a task." + raise TypeError(msg) + + return task + + @hookimpl def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0912, PLR0915 """Execute tasks with a parallel backend. @@ -68,8 +80,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 with session.config["_parallel_executor"]: sleeper = _Sleeper() + scheduler = session.scheduler + if scheduler is None: + msg = "Expected the scheduler to be initialized before executing tasks." + raise RuntimeError(msg) + i = 0 - while session.scheduler.is_active(): + while scheduler.is_active(): try: newly_collected_reports = [] @@ -88,13 +105,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 n_new_tasks = session.config["n_workers"] - len(running_tasks) ready_tasks = ( - list(session.scheduler.get_ready(n_new_tasks)) - if n_new_tasks >= 1 - else [] + list(scheduler.get_ready(n_new_tasks)) if n_new_tasks >= 1 else [] ) for task_name in ready_tasks: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) session.hook.pytask_execute_task_log_start( session=session, task=task ) @@ -111,7 +126,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 task, sys.exc_info() ) newly_collected_reports.append(report) - session.scheduler.done(task_name) + scheduler.done(task_name) if not ready_tasks: sleeper.increment() @@ -133,7 +148,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) if wrapper_result.exc_info is not None: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, @@ -141,9 +156,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 ) ) running_tasks.pop(task_name) - session.scheduler.done(task_name) + scheduler.done(task_name) else: - task = session.dag.nodes[task_name]["task"] + task = _get_task_from_dag(session, task_name) _update_carry_over_products( task, wrapper_result.carry_over_products ) @@ -161,7 +176,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 running_tasks.pop(task_name) newly_collected_reports.append(report) - session.scheduler.done(task_name) + scheduler.done(task_name) for report in newly_collected_reports: session.hook.pytask_execute_task_process_report( diff --git a/src/pytask_parallel/nodes.py b/src/pytask_parallel/nodes.py index e5fb0f6..5094299 100644 --- a/src/pytask_parallel/nodes.py +++ b/src/pytask_parallel/nodes.py @@ -71,6 +71,6 @@ def load(self, is_product: bool = False) -> Path: # noqa: ARG002, FBT001, FBT00 self.node.path = path return self.node.load(is_product=self.is_product) - def save(self, value: Any) -> None: + def save(self, value: Any) -> None: # noqa: ANN401 """Save strings or bytes to file.""" self.value = value diff --git a/src/pytask_parallel/typing.py b/src/pytask_parallel/typing.py index 5d0ac4b..b8a1db8 100644 --- a/src/pytask_parallel/typing.py +++ b/src/pytask_parallel/typing.py @@ -1,8 +1,8 @@ """Contains functions related to typing.""" -from pathlib import Path from pathlib import PosixPath from pathlib import WindowsPath +from typing import Any from typing import NamedTuple from pytask import PTask @@ -16,7 +16,7 @@ def is_coiled_function(task: PTask) -> bool: return "coiled_kwargs" in task.attributes -def is_local_path(path: Path) -> bool: +def is_local_path(path: Any) -> bool: # noqa: ANN401 """Check if a path is local.""" return isinstance(path, (FilePath, PosixPath, WindowsPath)) diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py index c76dce9..1553ea0 100644 --- a/src/pytask_parallel/utils.py +++ b/src/pytask_parallel/utils.py @@ -73,7 +73,7 @@ def _safe_load( *, is_product: bool, remote: bool, -) -> Any: +) -> Any: # noqa: ANN401 """Load a node and catch exceptions.""" _rich_traceback_guard = True # Get the argument name like "path" or "return" for function returns. diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 63cdf61..d20e75d 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -57,7 +57,7 @@ class WrapperResult: stderr: str -def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: +def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: # noqa: ANN401 """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -191,7 +191,7 @@ def rewrap_task_with_coiled_function(task: PTask) -> CoiledFunction: return cast("CoiledFunction", decorated) -def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ARG001 +def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ANN401, ARG001 msg = ( "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the " "execution of tasks with pytask-parallel. Please, remove the breakpoint or run " @@ -223,11 +223,14 @@ def _render_traceback_to_string( traceback = Traceback(exc_info, show_locals=show_locals) segments = console.render(cast("Any", traceback), options=console_options) text = "".join(segment.text for segment in segments) - return (*exc_info[:2], text) + return exc_info[0], exc_info[1], text def _handle_function_products( - task: PTask, out: Any, *, remote: bool = False + task: PTask, + out: Any, # noqa: ANN401 + *, + remote: bool = False, ) -> PyTree[CarryOverPath | PythonNode | None]: """Handle the products of the task. @@ -310,7 +313,7 @@ def _delete_local_files_on_remote(kwargs: dict[str, PyTree[Any]]) -> None: """ - def _delete(potential_node: Any) -> None: + def _delete(potential_node: Any) -> None: # noqa: ANN401 if isinstance(potential_node, RemotePathNode): with suppress(OSError): os.close(potential_node.fd)