Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,39 @@ Or use the helper script:
./scripts/run-mini-forensics-agent --help
```

Experimental Agent Skills scaffold:

```bash
uv run mini-forensics-agent \
--model 'LocoOperator-4B-mlx-4Bit' \
--workspace /path/to/workspace \
--task 'Inspect this repo and use any matching skill if needed.' \
--enable-skills \
--stream
```

Skill discovery roots, listed from lowest to highest precedence:
- `~/.agents/skills`
- `~/.mini-forensics-agent/skills`
- `<workspace>/.agents/skills`
- `<workspace>/.mini-forensics-agent/skills`
- any extra roots passed with `--skill-dir /path/to/skills`

Each skill lives in its own directory and must include a `SKILL.md` with frontmatter:

```md
---
name: demo-skill
description: One-line summary of when the skill should be used
---

# Instructions
```

Useful skill commands:
- `uv run mini-forensics-agent --workspace /path/to/workspace --list-skills`
- `uv run mini-forensics-agent --workspace /path/to/workspace --enable-skills --skill-dir /extra/skills ...`

## TUI

```bash
Expand Down
29 changes: 27 additions & 2 deletions src/miniforensicsagent/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import json
import sys
import time
from pathlib import Path

Expand All @@ -14,6 +15,7 @@
patch_mlx_lm_prompt_cache_with_turboquant,
resolve_model,
)
from .skills import discover_skills


def main() -> int:
Expand All @@ -31,26 +33,43 @@ def main() -> int:
parser.add_argument("--turboquant", action="store_true")
parser.add_argument("--tq-r-bits", type=int, default=4)
parser.add_argument("--tq-theta-bits", type=int, default=4)
parser.add_argument("--task", required=True)
parser.add_argument("--task", default="")
parser.add_argument("--json-out", default="")
parser.add_argument("--stream", action="store_true")
parser.add_argument("--list-models", action="store_true")
# Experimental features
parser.add_argument("--compress-observations", action="store_true", help="[exp] Compress old observation payloads to reduce prompt size (O(n²) → O(n) tokens).")
parser.add_argument("--transcript-window", type=int, default=None, metavar="K", help="[exp] Only include the last K turns in each prompt (sliding window).")
parser.add_argument("--multi-tool", action="store_true", help="[exp] Allow multiple independent tool calls per turn.")
parser.add_argument("--enable-skills", action="store_true", help="[exp] Discover Agent Skills and expose activation tools to the model.")
parser.add_argument("--skill-dir", action="append", default=[], help="[exp] Additional Agent Skills root to scan. May be passed multiple times.")
parser.add_argument("--list-skills", action="store_true", help="List discovered skills and exit.")
args = parser.parse_args()

root = Path(args.model_root).expanduser().resolve()
models = discover_models(root)
workspace = Path(args.workspace).expanduser().resolve()
skill_catalog = discover_skills(workspace, extra_dirs=args.skill_dir) if (args.enable_skills or args.list_skills) else None
if args.list_models:
for model in models:
print(f"{model.name}\t{model.path}")
return 0
if args.list_skills:
if skill_catalog is None:
return 0
for diagnostic in skill_catalog.diagnostics:
print(f"[skills] {diagnostic}", file=sys.stderr)
for skill in skill_catalog.skills:
print(f"{skill.name}\t{skill.skill_file}\t{skill.description}")
return 0
if not args.task.strip():
parser.error("--task is required unless --list-models or --list-skills is used.")
selected = resolve_model(args.model, models, root)
workspace = Path(args.workspace).expanduser().resolve()
started = time.perf_counter()
model, generation_config = load_local_model(selected.path)
if args.enable_skills and skill_catalog is not None:
for diagnostic in skill_catalog.diagnostics:
print(f"[skills] {diagnostic}", file=sys.stderr)
if args.turboquant:
patch_mlx_lm_prompt_cache_with_turboquant(r_bits=args.tq_r_bits, theta_bits=args.tq_theta_bits)
result = run_loop(
Expand All @@ -69,6 +88,7 @@ def main() -> int:
compress_observations=args.compress_observations,
transcript_window=args.transcript_window,
multi_tool=args.multi_tool,
skill_catalog=skill_catalog if args.enable_skills else None,
)
payload = {
"model": selected.name,
Expand All @@ -79,6 +99,11 @@ def main() -> int:
"tool_calls": result.tool_calls,
"elapsed_seconds": round(time.perf_counter() - started, 3),
"workspace": str(workspace),
"skills": {
"enabled": bool(args.enable_skills),
"roots": [str(path) for path in (skill_catalog.roots if skill_catalog is not None else ())],
"discovered": [skill.name for skill in (skill_catalog.skills if skill_catalog is not None else ())],
},
"kv_cache_quantization": {
"kv_bits": args.kv_bits,
"kv_group_size": args.kv_group_size,
Expand Down
35 changes: 31 additions & 4 deletions src/miniforensicsagent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
)
from .prompting import build_prompt
from .render import HAS_RICH, Live, RICH_STDERR, build_status_renderable, count_tokens, emit_observation_rendered, start_prefill_indicator
from .tools import DEFAULT_READ_LIMIT, run_tool
from .skills import SkillCatalog, render_active_skill_context, render_skill_catalog
from .tools import run_tool


TOOL_NAMES = {"Read", "Glob", "Grep", "Bash", "Write", "Edit"}
TOOL_NAMES = {"Read", "Glob", "Grep", "Bash", "Write", "Edit", "ActivateSkill", "ReadSkillResource"}


@dataclass
Expand Down Expand Up @@ -249,6 +250,15 @@ def compress_turn(turn: dict[str, Any]) -> None:
obs["output"] = f"[compressed: Bash output {len(output)} chars, seen at iter {iteration}]"


def update_active_skills(active_skills: dict[str, dict[str, Any]], decision: dict[str, Any], observation: dict[str, Any]) -> None:
    """Record a successfully activated skill in ``active_skills``.

    The mapping is mutated in place. An entry is added (or overwritten) only
    when all of the following hold:

    * ``decision`` is a tool call (``decision["type"] == "tool"``),
    * the tool is ``ActivateSkill``,
    * ``observation["ok"]`` is truthy, and
    * ``observation["name"]`` yields a non-empty name after stripping
      surrounding whitespace.

    Args:
        active_skills: Mapping of skill name to the observation that
            activated it; updated in place.
        decision: The tool-call decision that was executed.
        observation: The result produced for ``decision``.
    """
    if decision.get("type") != "tool" or not observation.get("ok"):
        return
    if decision.get("name") != "ActivateSkill":
        return

    raw_name = observation.get("name")
    # An explicit ``None`` must count as "no name"; passing it through str()
    # would otherwise register a bogus skill called "None".
    if raw_name is None:
        return

    skill_name = str(raw_name).strip()
    if skill_name:
        # Store a shallow copy so later changes to the observation's
        # top-level keys do not alter the recorded activation.
        active_skills[skill_name] = dict(observation)


def run_loop(
model: Any,
generation_config: Any,
Expand All @@ -269,6 +279,7 @@ def run_loop(
compress_observations: bool = False,
transcript_window: int | None = None,
multi_tool: bool = False,
skill_catalog: SkillCatalog | None = None,
) -> LoopResult:
transcript: list[dict[str, Any]] = []
tool_calls = 0
Expand All @@ -277,6 +288,8 @@ def run_loop(
narrow_empty_search_streak = 0
plan_state: dict[str, Any] | None = None
running_tool_stats: dict[str, Any] = {"counts": {}, "failures": 0}
active_skills: dict[str, dict[str, Any]] = {}
available_skills_block = render_skill_catalog(skill_catalog) if skill_catalog is not None else ""

for iteration in range(1, max_iterations + 1):
if should_stop is not None and should_stop():
Expand Down Expand Up @@ -315,6 +328,8 @@ def run_loop(
reflection_hint=reflection_hint,
window=transcript_window,
multi_tool=multi_tool,
available_skills=available_skills_block,
active_skill_context=render_active_skill_context(active_skills),
)
tokenizer = getattr(model, "tokenizer", None)
prompt_tokens = count_tokens(tokenizer, prompt) if tokenizer is not None else None
Expand Down Expand Up @@ -471,9 +486,15 @@ def run_loop(
for call in normalized_calls:
if should_stop is not None and should_stop():
return LoopResult(False, "cancelled", iteration, tool_calls, transcript)
obs = run_tool(call, workspace)
obs = run_tool(
call,
workspace,
skill_catalog=skill_catalog,
active_skill_names=set(active_skills),
)
tool_calls += 1
observations.append(obs)
update_active_skills(active_skills, call, obs)
added_evidence_total += update_evidence_cache(evidence_cache, call, obs)
turn["decisions"] = normalized_calls
turn["decision"] = normalized_calls[0] # keep compat for reflection hints
Expand Down Expand Up @@ -591,8 +612,14 @@ def run_loop(

if should_stop is not None and should_stop():
return LoopResult(False, "cancelled", iteration, tool_calls, transcript)
observation = run_tool(response, workspace)
observation = run_tool(
response,
workspace,
skill_catalog=skill_catalog,
active_skill_names=set(active_skills),
)
tool_calls += 1
update_active_skills(active_skills, response, observation)
turn["observations"] = [observation]
added_evidence = update_evidence_cache(evidence_cache, response, observation)
turn["evidence_cache_size"] = len(evidence_cache)
Expand Down
27 changes: 26 additions & 1 deletion src/miniforensicsagent/prompting.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def build_prompt(
reflection_hint: str = "",
window: int | None = None,
multi_tool: bool = False,
available_skills: str = "",
active_skill_context: str = "",
) -> str:
visible = transcript[-window:] if window is not None and window > 0 else transcript
history = json.dumps(visible, ensure_ascii=False, indent=2) if visible else "[]"
Expand Down Expand Up @@ -86,6 +88,27 @@ def build_prompt(
"- or {\"type\":\"plan_update\",\"completed_steps\":[\"step\"],\"current_step\":\"step\"}\n"
"- or <final>{\"answer\":\"done\"}</final>"
)
skill_block = ""
skill_tools = ""
skill_rules = ""
if available_skills:
skill_block = (
"Available skills:\n"
f"{available_skills}\n"
)
skill_tools = (
"- ActivateSkill(skill_name)\n"
"- ReadSkillResource(skill_name, file_path, offset=1, limit=120)\n"
)
skill_rules = (
"Skills:\n"
"- If one of the listed skills clearly matches the task, activate it before following its detailed instructions.\n"
"- Once a skill is activated, treat its instructions as active guidance for the rest of the run.\n"
"- Only read skill resources after activating that skill, and prefer the specific files the skill references.\n"
)
active_skill_block = ""
if active_skill_context:
active_skill_block = f"{active_skill_context}\n\n"
return f"""You are a local codebase explorer.
Use Claude Code style tool calls.
{turn_format}
Expand All @@ -101,6 +124,7 @@ def build_prompt(
- Bash(command) [allowed: pwd, ls, find, cat]
- Write(file_path, content)
- Edit(file_path, old_string, new_string)
{skill_tools}

Forensics mode:
- Treat the workspace as an evidence snapshot, not a live system.
Expand All @@ -118,13 +142,14 @@ def build_prompt(
- Do not keep increasing Read from offset=1 unless no line-targeted option exists.
- If the last observation failed, fix it instead of finishing.
- Goal is artifact discovery, not long explanation.
{skill_rules}

{plan_block}
{convergence_block}
{final_only_prefix}
{plan_instruction}
{few_shot}
{reflection_hint}
{skill_block}{active_skill_block}{reflection_hint}

Task:
{task}
Expand Down
Loading
Loading