Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Added

- **`context-analyzer-tool top-tools`** (#7) — ranked table of token usage by tool type (calls, total, average, share %). Backed by `/api/tool-summary`.

## 0.3.1 (2026-04-08)

### Bug Fixes
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ context-analyzer-tool serve Start the collector server
context-analyzer-tool dashboard Launch the live TUI dashboard
context-analyzer-tool status View active sessions and recent tasks
context-analyzer-tool anomalies List recent anomalies with root causes
context-analyzer-tool top-tools Rank tool types by total token usage
context-analyzer-tool context-cost Show context cost breakdown
context-analyzer-tool health Collector health check
context-analyzer-tool rtk-status Show RTK integration status and savings
Expand Down
65 changes: 65 additions & 0 deletions src/context_analyzer_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,71 @@ def anomalies(
)


@app.command()
def top_tools(
port: int | None = typer.Option(None, help="Collector port (default: from config)"),
limit: int = typer.Option(10, help="Number of tool types to show"),
session_id: str | None = typer.Option(None, "--session", help="Filter by session"),
) -> None:
"""Show which tool types consume the most tokens overall."""
import httpx

url = _collector_base_url(port)
params: dict[str, str | int] = {"limit": limit}
if session_id is not None:
params["session_id"] = session_id

try:
with httpx.Client(timeout=5.0) as client:
resp = client.get(f"{url}/api/tool-summary", params=params)
resp.raise_for_status()
data = resp.json()
except httpx.ConnectError:
console.print(
"[red]Cannot connect to collector.[/red] "
"Is it running? Start with: [bold]context-analyzer-tool serve[/bold]"
)
raise typer.Exit(1) from None
except httpx.HTTPError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1) from None

tools: list[dict[str, Any]] = data.get("tools", [])
total_tokens: int = int(data.get("total_tokens", 0))

if not tools:
console.print(
Panel("[dim]No tool usage with token estimates yet[/dim]", title="Top Tools")
)
return

table = Table(show_header=True, header_style="bold cyan", expand=True)
table.add_column("Rank", justify="right")
table.add_column("Tool")
table.add_column("Calls", justify="right")
table.add_column("Total Tokens", justify="right")
table.add_column("Avg / Call", justify="right")
table.add_column("Share", justify="right")

for rank, tool in enumerate(tools, start=1):
tool_total = int(tool.get("total_tokens", 0))
share = (tool_total / total_tokens * 100) if total_tokens else 0.0
table.add_row(
str(rank),
str(tool.get("task_type", "")),
f"{int(tool.get('call_count', 0)):,}",
f"{tool_total:,}",
f"{float(tool.get('avg_tokens', 0)):,.0f}",
f"{share:.1f}%",
)

title = "Top Tools by Token Usage"
if session_id:
title += f" (session {session_id[:8]}...)"
subtitle = f"{total_tokens:,} tokens across {len(tools)} tool type(s)"
console.print(Panel(table, title=f"{title}\n[dim]{subtitle}[/dim]", border_style="blue"))


@app.command()
def dashboard(
port: int | None = typer.Option(None, help="Collector port (default: from config)"),
Expand Down
12 changes: 12 additions & 0 deletions src/context_analyzer_tool/collector/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,15 @@ class AnomalyResponse(BaseModel):
class AnomaliesListResponse(BaseModel):
anomalies: list[AnomalyResponse]
total_count: int


class ToolSummaryRow(BaseModel):
task_type: str
call_count: int
total_tokens: int
avg_tokens: float


class ToolSummaryResponse(BaseModel):
tools: list[ToolSummaryRow]
total_tokens: int
25 changes: 25 additions & 0 deletions src/context_analyzer_tool/collector/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
StatuslineSnapshotRequest,
StatusResponse,
TaskResponse,
ToolSummaryResponse,
ToolSummaryRow,
)
from context_analyzer_tool.db import anomalies as db_anomalies
from context_analyzer_tool.db import baselines as db_baselines
Expand Down Expand Up @@ -635,6 +637,29 @@ async def get_anomalies(
return AnomaliesListResponse(anomalies=anomalies, total_count=total)


@api_router.get("/tool-summary", response_model=ToolSummaryResponse)
async def get_tool_summary(
limit: int = 20,
session_id: str | None = None,
db: aiosqlite.Connection = Depends(get_db),
) -> ToolSummaryResponse:
"""Return aggregated token usage ranked by tool type."""
rows = await db_tasks.get_tool_type_summary(
db, session_id=session_id, limit=limit if limit > 0 else None
)
tools = [
ToolSummaryRow(
task_type=r["task_type"],
call_count=int(r["call_count"]),
total_tokens=int(r["total_tokens"]),
avg_tokens=float(r["avg_tokens"]),
)
for r in rows
]
total_tokens = sum(tool.total_tokens for tool in tools)
return ToolSummaryResponse(tools=tools, total_tokens=total_tokens)


@api_router.get("/rtk-status")
async def get_rtk_status() -> dict[str, Any]:
"""Return RTK integration status and savings."""
Expand Down
40 changes: 40 additions & 0 deletions src/context_analyzer_tool/db/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,43 @@ async def get_null_delta_tasks(
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]


async def get_tool_type_summary(
db: aiosqlite.Connection,
session_id: str | None = None,
limit: int | None = None,
) -> list[dict[str, Any]]:
"""Aggregate token usage by task_type (tool name).

Only rows with a non-null ``estimated_tokens`` value are included,
matching the metric used by the task cost timeline.
"""
clauses = ["estimated_tokens IS NOT NULL"]
params: list[str | int] = []

if session_id is not None:
clauses.append("session_id = ?")
params.append(session_id)

where = "WHERE " + " AND ".join(clauses)
limit_clause = ""
if limit is not None:
limit_clause = " LIMIT ?"
params.append(limit)

query = f"""
SELECT
task_type,
COUNT(*) AS call_count,
SUM(estimated_tokens) AS total_tokens,
AVG(estimated_tokens) AS avg_tokens
FROM tasks
{where}
GROUP BY task_type
ORDER BY total_tokens DESC
{limit_clause}
"""
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
110 changes: 110 additions & 0 deletions tests/test_tool_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Tests for per-tool-type token aggregation (#7)."""

from __future__ import annotations

import time

import aiosqlite
import pytest
from httpx import AsyncClient

from context_analyzer_tool.db import events as db_events
from context_analyzer_tool.db import tasks as db_tasks


async def _insert_task_with_tokens(
db: aiosqlite.Connection,
*,
session_id: str,
task_type: str,
estimated_tokens: int | None,
) -> None:
ts = int(time.time() * 1000)
event_id = await db_events.insert_event(
db,
session_id=session_id,
event_type="PostToolUse",
timestamp_ms=ts,
payload_json="{}",
tool_name=task_type,
)
await db_tasks.insert_task(
db,
session_id=session_id,
event_id=event_id,
task_type=task_type,
timestamp_ms=ts,
estimated_tokens=estimated_tokens,
)


@pytest.mark.asyncio
async def test_get_tool_type_summary_aggregates_and_ranks(
db_connection: aiosqlite.Connection,
) -> None:
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Bash", estimated_tokens=4500
)
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Bash", estimated_tokens=1500
)
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Read", estimated_tokens=3000
)
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Edit", estimated_tokens=500
)
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Edit", estimated_tokens=None
)

rows = await db_tasks.get_tool_type_summary(db_connection)

assert len(rows) == 3
assert rows[0]["task_type"] == "Bash"
assert rows[0]["call_count"] == 2
assert rows[0]["total_tokens"] == 6000
assert rows[1]["task_type"] == "Read"
assert rows[2]["task_type"] == "Edit"
assert rows[2]["call_count"] == 1


@pytest.mark.asyncio
async def test_get_tool_type_summary_session_filter(
db_connection: aiosqlite.Connection,
) -> None:
await _insert_task_with_tokens(
db_connection, session_id="s1", task_type="Bash", estimated_tokens=1000
)
await _insert_task_with_tokens(
db_connection, session_id="s2", task_type="Read", estimated_tokens=9000
)

rows = await db_tasks.get_tool_type_summary(db_connection, session_id="s2")

assert len(rows) == 1
assert rows[0]["task_type"] == "Read"
assert rows[0]["total_tokens"] == 9000


@pytest.mark.asyncio
async def test_api_tool_summary(
app_client: AsyncClient,
db_connection: aiosqlite.Connection,
) -> None:
"""GET /api/tool-summary returns ranked tool aggregates."""
await _insert_task_with_tokens(
db_connection, session_id="s-api", task_type="Bash", estimated_tokens=4500
)
await _insert_task_with_tokens(
db_connection, session_id="s-api", task_type="Read", estimated_tokens=3000
)

resp = await app_client.get("/api/tool-summary")

assert resp.status_code == 200
body = resp.json()
assert body["total_tokens"] == 7500
assert len(body["tools"]) == 2
assert body["tools"][0]["task_type"] == "Bash"
assert body["tools"][0]["total_tokens"] == 4500