Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 255 additions & 107 deletions agr/commands/remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

from agr.commands import CommandResult
Expand All @@ -10,6 +11,7 @@
DEPENDENCY_TYPE_PACKAGE,
DEPENDENCY_TYPE_RALPH,
DEPENDENCY_TYPE_SKILL,
AgrConfig,
Dependency,
)
from agr.commands.migrations import run_tool_migrations
Expand Down Expand Up @@ -206,6 +208,247 @@ def _find_dep_by_candidates(
return None


@dataclass
class _RefRemoval:
"""Outcome of processing a single ref in ``run_remove``.

``removed_candidates`` / ``removed_kinds`` are parallel lists describing
every lockfile entry that should be dropped as a result of this ref
(the dep itself plus any transitive/nested children), in the same shape
``_update_lockfile_after_remove`` consumes.
"""

result: CommandResult
removed_candidates: list[list[str]] = field(default_factory=list)
removed_kinds: list[str] = field(default_factory=list)


def _nested_packages_to_remove(
dep: Dependency,
existing_lockfile: Lockfile | None,
) -> list[tuple[str, LockedEntry]]:
"""Nested package entries whose whole parent chain lives inside ``dep``."""
candidate_package_ids = (
existing_lockfile.package_closure({dep.identifier})
if existing_lockfile
else {dep.identifier}
)
return [
(kind, entry)
for kind, entry in _nested_package_entries_for_packages(
existing_lockfile, {dep.identifier}
)
if not (entry.parent_ids - candidate_package_ids)
]


def _transitive_leaves_to_remove(
dep: Dependency,
config: AgrConfig,
existing_lockfile: Lockfile | None,
nested_package_entries: list[tuple[str, LockedEntry]],
) -> list[tuple[str, LockedEntry]]:
"""Transitive leaf entries to remove, excluding ones still directly required.

A leaf is removed only when it is not a direct dependency and its entire
parent chain lives inside the package being removed (plus the nested
packages also being removed).
"""
direct_leaf_ids = {
(d.type, d.identifier)
for d in config.dependencies
if not d.is_package and d.identifier != dep.identifier
}
removed_package_ids = {dep.identifier} | {
entry.identifier for _kind, entry in nested_package_entries
}
return [
(kind, entry)
for kind, entry in _transitive_leaf_entries_for_packages(
existing_lockfile, {dep.identifier}
)
if (kind, entry.identifier) not in direct_leaf_ids
and not (entry.parent_ids - removed_package_ids)
]


def _resolve_package_cleanup(
dep: Dependency,
config: AgrConfig,
existing_lockfile: Lockfile | None,
) -> tuple[list[tuple[str, LockedEntry]], list[tuple[str, LockedEntry]]]:
"""Resolve the nested-package and transitive-leaf entries to remove for a package.

Returns ``(nested_package_entries, transitive_entries)``. A child is
only scheduled for removal when its entire parent chain lives inside the
package being removed (so children still required by another package are
retained).
"""
nested_package_entries = _nested_packages_to_remove(dep, existing_lockfile)
transitive_entries = _transitive_leaves_to_remove(
dep, config, existing_lockfile, nested_package_entries
)
return nested_package_entries, transitive_entries


def _uninstall_transitive_entries(
transitive_entries: list[tuple[str, LockedEntry]],
config: AgrConfig,
tools: list[ToolConfig],
repo_root: Path | None,
skills_dirs: dict[str, Path] | None,
) -> bool:
"""Filesystem-remove each transitive child entry. Returns True if any removed."""
removed = False
for kind, entry in transitive_entries:
child_handle = _entry_to_handle(entry, kind, config.default_owner)
if _uninstall_from_filesystem(
child_handle,
kind == DEPENDENCY_TYPE_RALPH,
tools,
repo_root,
entry.source,
skills_dirs,
default_repo=config.default_repo,
):
removed = True
return removed


def _resolve_dep(
ref: str,
config: AgrConfig,
global_install: bool,
) -> tuple[ParsedHandle, list[str], Dependency | None]:
"""Resolve a ref to its handle, identifier candidates, and matching dependency."""
handle = parse_handle(ref, default_owner=config.default_owner)

# Compute the resolved absolute path once for local global installs.
abs_path_str: str | None = None
if global_install and handle.is_local and handle.local_path is not None:
abs_path_str = str(handle.resolve_local_path())

candidates = _identifier_candidates(ref, handle, abs_path_str)
dep = _find_dep_by_candidates(candidates, ref, config.dependencies)

# When the dep was found by installed_name fallback (not by any candidate
# identifier), its actual identifier must be added so that config and
# lockfile removal can match it.
if dep is not None and dep.identifier not in candidates:
candidates.append(dep.identifier)

return handle, candidates, dep


def _remove_leaf_from_filesystem(
dep: Dependency | None,
handle: ParsedHandle,
config: AgrConfig,
tools: list[ToolConfig],
repo_root: Path | None,
skills_dirs: dict[str, Path] | None,
) -> bool:
"""Filesystem-remove a non-package dependency (skill or ralph)."""
source_name = None
if dep and dep.is_remote:
source_name = dep.source or config.default_source
is_ralph = dep is not None and dep.is_ralph
fs_handle = dep.to_parsed_handle(config.default_owner) if dep else handle
return _uninstall_from_filesystem(
fs_handle,
is_ralph,
tools,
repo_root,
source_name,
skills_dirs,
default_repo=config.default_repo,
)


def _remove_from_filesystem(
dep: Dependency | None,
handle: ParsedHandle,
config: AgrConfig,
tools: list[ToolConfig],
repo_root: Path | None,
skills_dirs: dict[str, Path] | None,
existing_lockfile: Lockfile | None,
) -> tuple[bool, list[tuple[str, LockedEntry]], list[tuple[str, LockedEntry]]]:
"""Filesystem-remove a dependency, fanning out to package children when needed.

Returns ``(removed_anything, nested_package_entries, transitive_entries)``.
Packages have no filesystem footprint of their own; only their transitive
children are removed.
"""
if dep is not None and dep.is_package:
nested, transitive = _resolve_package_cleanup(dep, config, existing_lockfile)
removed = _uninstall_transitive_entries(
transitive, config, tools, repo_root, skills_dirs
)
return removed, nested, transitive
removed = _remove_leaf_from_filesystem(
dep, handle, config, tools, repo_root, skills_dirs
)
return removed, [], []


def _remove_from_config(config: AgrConfig, candidates: list[str]) -> bool:
"""Remove the first matching candidate identifier from the config."""
for identifier in candidates:
if config.remove_dependency(identifier):
return True
return False


def _build_removal(
ref: str,
dep: Dependency | None,
candidates: list[str],
nested_package_entries: list[tuple[str, LockedEntry]],
transitive_entries: list[tuple[str, LockedEntry]],
) -> _RefRemoval:
"""Assemble a successful ``_RefRemoval`` with the lockfile entries to drop."""
dep_kind = dep.type if dep is not None else DEPENDENCY_TYPE_SKILL
removal = _RefRemoval(
CommandResult(ref, True, "Removed"),
removed_candidates=[candidates],
removed_kinds=[dep_kind],
)
for kind, entry in (*nested_package_entries, *transitive_entries):
removal.removed_candidates.append([entry.identifier])
removal.removed_kinds.append(kind)
return removal


def _process_ref(
ref: str,
config: AgrConfig,
tools: list[ToolConfig],
repo_root: Path | None,
skills_dirs: dict[str, Path] | None,
existing_lockfile: Lockfile | None,
global_install: bool,
) -> _RefRemoval:
"""Process a single remove ref: resolve, uninstall, and report the outcome.

Performs filesystem removal and config removal for one ref (plus any
transitive/nested package children), returning a ``_RefRemoval`` whose
parallel candidate/kind lists feed ``_update_lockfile_after_remove``.
Install errors are caught and surfaced as a failed ``CommandResult``.
"""
try:
handle, candidates, dep = _resolve_dep(ref, config, global_install)
removed_fs, nested, transitive = _remove_from_filesystem(
dep, handle, config, tools, repo_root, skills_dirs, existing_lockfile
)
removed_config = _remove_from_config(config, candidates)
if not (removed_fs or removed_config):
return _RefRemoval(CommandResult(ref, False, "Not found"))
return _build_removal(ref, dep, candidates, nested, transitive)
except INSTALL_ERROR_TYPES as e:
return _RefRemoval(CommandResult(ref, False, format_install_error(e)))


def run_remove(refs: list[str], global_install: bool = False) -> None:
"""Run the remove command.

Expand All @@ -225,113 +468,18 @@ def run_remove(refs: list[str], global_install: bool = False) -> None:
removed_kinds: list[str] = []

for ref in refs:
try:
# Parse handle
handle = parse_handle(ref, default_owner=config.default_owner)

# Compute the resolved absolute path once for local global installs
abs_path_str: str | None = None
if global_install and handle.is_local and handle.local_path is not None:
abs_path_str = str(handle.resolve_local_path())

candidates = _identifier_candidates(ref, handle, abs_path_str)

dep = _find_dep_by_candidates(candidates, ref, config.dependencies)

# When the dep was found by installed_name fallback (not by
# any candidate identifier), its actual identifier must be
# added to candidates so that config and lockfile removal
# can match it.
if dep is not None and dep.identifier not in candidates:
candidates.append(dep.identifier)

source_name = None
if dep and dep.is_remote:
source_name = dep.source or config.default_source

is_ralph = dep is not None and dep.is_ralph
dep_kind = dep.type if dep is not None else DEPENDENCY_TYPE_SKILL
fs_handle = dep.to_parsed_handle(config.default_owner) if dep else handle

# Remove from filesystem
removed_fs = False
if dep is None or not dep.is_package:
removed_fs = _uninstall_from_filesystem(
fs_handle,
is_ralph,
tools,
repo_root,
source_name,
skills_dirs,
default_repo=config.default_repo,
)

transitive_entries: list[tuple[str, LockedEntry]] = []
nested_package_entries: list[tuple[str, LockedEntry]] = []
if dep is not None and dep.is_package:
direct_leaf_ids = {
(d.type, d.identifier)
for d in config.dependencies
if not d.is_package and d.identifier != dep.identifier
}
candidate_package_ids = (
existing_lockfile.package_closure({dep.identifier})
if existing_lockfile
else {dep.identifier}
)
nested_package_entries = [
(kind, entry)
for kind, entry in _nested_package_entries_for_packages(
existing_lockfile, {dep.identifier}
)
if not (entry.parent_ids - candidate_package_ids)
]
removed_package_ids = {dep.identifier} | {
entry.identifier for _kind, entry in nested_package_entries
}
transitive_entries = [
(kind, entry)
for kind, entry in _transitive_leaf_entries_for_packages(
existing_lockfile, {dep.identifier}
)
if (kind, entry.identifier) not in direct_leaf_ids
and not (entry.parent_ids - removed_package_ids)
]
for kind, entry in transitive_entries:
child_handle = _entry_to_handle(entry, kind, config.default_owner)
if _uninstall_from_filesystem(
child_handle,
kind == DEPENDENCY_TYPE_RALPH,
tools,
repo_root,
entry.source,
skills_dirs,
default_repo=config.default_repo,
):
removed_fs = True

# Remove from config (try same candidate identifiers)
removed_config = False
for identifier in candidates:
if config.remove_dependency(identifier):
removed_config = True
break

if removed_fs or removed_config:
results.append(CommandResult(ref, True, "Removed"))
removed_candidates.append(candidates)
removed_kinds.append(dep_kind)
for kind, entry in nested_package_entries:
removed_candidates.append([entry.identifier])
removed_kinds.append(kind)
for kind, entry in transitive_entries:
removed_candidates.append([entry.identifier])
removed_kinds.append(kind)
else:
results.append(CommandResult(ref, False, "Not found"))

except INSTALL_ERROR_TYPES as e:
results.append(CommandResult(ref, False, format_install_error(e)))
removal = _process_ref(
ref,
config,
tools,
repo_root,
skills_dirs,
existing_lockfile,
global_install,
)
results.append(removal.result)
removed_candidates.extend(removal.removed_candidates)
removed_kinds.extend(removal.removed_kinds)

save_and_summarize_results(
results,
Expand Down
Loading
Loading