diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0391521..81a64f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Changelog
 
+## Unreleased
+
+### Added
+
+- **`context-analyzer-tool export`** (#13) — export `events`, `tasks`, `snapshots`, or `anomalies` as CSV or JSON, with optional `--days` filter and `--output` file path.
+
 ## 0.3.1 (2026-04-08)
 
 ### Bug Fixes
diff --git a/README.md b/README.md
index 207f669..4a2bd17 100644
--- a/README.md
+++ b/README.md
@@ -78,6 +78,7 @@ context-analyzer-tool context-cost  Show context cost breakdown
 context-analyzer-tool health        Collector health check
 context-analyzer-tool rtk-status    Show RTK integration status and savings
 context-analyzer-tool prune         Clean up old data
+context-analyzer-tool export        Export data as CSV or JSON
 context-analyzer-tool clear         Clear all stored data and start fresh
 ```
 
diff --git a/src/context_analyzer_tool/cli.py b/src/context_analyzer_tool/cli.py
index 2aa6d7b..a42b27d 100644
--- a/src/context_analyzer_tool/cli.py
+++ b/src/context_analyzer_tool/cli.py
@@ -6,9 +6,10 @@
 import logging
 import os
 import shutil
+import sys
 from datetime import UTC, datetime
 from pathlib import Path
-from typing import Any, cast
+from typing import Any, Literal, cast
 
 import typer
 from rich.console import Console
@@ -1103,3 +1104,88 @@ async def _run() -> None:
             await db.close()
 
     asyncio.run(_run())
+
+
+# Choices accepted by the ``export`` command. ``ExportTable`` mirrors the keys
+# of ``EXPORT_TABLES`` in ``context_analyzer_tool.db.export``; keep them in sync.
+ExportFormat = Literal["csv", "json"]
+ExportTable = Literal["events", "tasks", "snapshots", "anomalies"]
+
+
+@app.command()
+def export(
+    table: ExportTable = typer.Option(..., "--table", help="Table to export"),
+    fmt: ExportFormat = typer.Option(
+        "csv",
+        "--format",
+        "-f",
+        help="Output format",
+    ),
+    output: Path | None = typer.Option(
+        None,
+        "--output",
+        "-o",
+        help="Output file path (default: stdout)",
+    ),
+    days: int | None = typer.Option(
+        None,
+        "--days",
+        help="Limit export to rows from the last N days",
+    ),
+) -> None:
+    """Export collected data as CSV or JSON for backup or external analysis."""
+    import asyncio
+
+    from context_analyzer_tool.config import get_db_path
+    from context_analyzer_tool.db.export import (
+        EXPORT_TABLES,
+        fetch_export_rows,
+        format_rows_as_csv,
+        format_rows_as_json,
+        get_table_column_names,
+    )
+    from context_analyzer_tool.db.schema import open_db
+
+    cfg = load_config()
+    db_path = get_db_path(cfg)
+    if not Path(db_path).exists():
+        console.print("[dim]No database found. Nothing to export.[/dim]")
+        raise typer.Exit(1)
+
+    async def _run() -> tuple[list[dict[str, Any]], list[str]]:
+        db = await open_db(db_path)
+        try:
+            rows = await fetch_export_rows(db, table, days=days)
+            # Read column names from the schema, not from the rows, so a CSV
+            # export of an empty table still gets a header line.
+            db_table, _ = EXPORT_TABLES[table]
+            columns = await get_table_column_names(db, db_table)
+            return rows, columns
+        finally:
+            await db.close()
+
+    try:
+        rows, columns = asyncio.run(_run())
+    except ValueError as exc:
+        # fetch_export_rows raises ValueError for a non-positive --days value.
+        console.print(f"[red]Error:[/red] {exc}")
+        raise typer.Exit(1) from None
+
+    if fmt == "csv":
+        content = format_rows_as_csv(rows, fieldnames=columns)
+    else:
+        # JSON is indented when written to a file and compact on stdout.
+        content = format_rows_as_json(rows, pretty=output is not None)
+
+    if output is not None:
+        output.parent.mkdir(parents=True, exist_ok=True)
+        # newline="" turns off newline translation: the CSV text already ends
+        # its lines with "\r\n", which would otherwise become "\r\r\n" on Windows.
+        output.write_text(content, encoding="utf-8", newline="")
+        console.print(
+            f"[green]Exported {len(rows)} row(s)[/green] from "
+            f"[bold]{table}[/bold] to {output}"
+        )
+        return
+
+    sys.stdout.write(content)
diff --git a/src/context_analyzer_tool/db/export.py b/src/context_analyzer_tool/db/export.py
new file mode 100644
index 0000000..0f6bf40
--- /dev/null
+++ b/src/context_analyzer_tool/db/export.py
@@ -0,0 +1,109 @@
+"""Export collected data as CSV or JSON."""
+
+from __future__ import annotations
+
+import csv
+import io
+import json
+import time
+from typing import Any, Literal
+
+import aiosqlite
+
+ExportTable = Literal["events", "tasks", "snapshots", "anomalies"]
+
+# CLI table name -> (SQLite table, timestamp column for --days filter)
+EXPORT_TABLES: dict[ExportTable, tuple[str, str]] = {
+    "events": ("events", "timestamp_ms"),
+    "tasks": ("tasks", "timestamp_ms"),
+    "snapshots": ("token_snapshots", "timestamp_ms"),
+    "anomalies": ("anomalies", "timestamp_ms"),
+}
+
+
+async def get_table_column_names(
+    db: aiosqlite.Connection,
+    table: str,
+) -> list[str]:
+    """Return column names for *table* in schema order."""
+    # The table-valued form of ``PRAGMA table_info`` takes the table name as a
+    # bound parameter, so it is never interpolated into the SQL text.
+    query = "SELECT name FROM pragma_table_info(?) ORDER BY cid"
+    async with db.execute(query, (table,)) as cursor:
+        rows = await cursor.fetchall()
+    return [str(row[0]) for row in rows]
+
+
+async def fetch_export_rows(
+    db: aiosqlite.Connection,
+    table: ExportTable,
+    *,
+    days: int | None = None,
+) -> list[dict[str, Any]]:
+    """Return all rows for an exportable table, optionally limited to recent days.
+
+    Rows are sorted by the table's timestamp column, oldest first, and each
+    one is returned as a dict keyed by column name.
+
+    Raises:
+        KeyError: If *table* is not a key of ``EXPORT_TABLES``.
+        ValueError: If *days* is given but is not positive.
+    """
+    # Both identifiers come from the EXPORT_TABLES constant, never from the
+    # caller, which is what keeps interpolating them into the query safe.
+    db_table, ts_col = EXPORT_TABLES[table]
+    where = ""
+    params: list[int] = []
+
+    if days is not None:
+        if days <= 0:
+            msg = "--days must be a positive integer"
+            raise ValueError(msg)
+        cutoff_ms = int((time.time() - days * 86400) * 1000)
+        where = f" WHERE {ts_col} >= ?"
+        params.append(cutoff_ms)
+
+    query = (
+        f"SELECT * FROM {db_table}{where} ORDER BY {ts_col} ASC"  # noqa: S608
+    )
+    async with db.execute(query, params) as cursor:
+        # Take column names from the cursor metadata rather than calling
+        # dict(row), so the result does not depend on the row_factory in use.
+        names = [column[0] for column in cursor.description]
+        rows = await cursor.fetchall()
+    return [dict(zip(names, row, strict=True)) for row in rows]
+
+
+def format_rows_as_csv(
+    rows: list[dict[str, Any]],
+    *,
+    fieldnames: list[str] | None = None,
+) -> str:
+    """Serialize *rows* as CSV text.
+
+    The header is *fieldnames* when given and non-empty, otherwise the keys of
+    the first row. Keys missing from a row become empty cells; keys that are
+    not in the header are dropped. Lines use the csv module's default CRLF
+    terminator, so write the text with newline translation disabled.
+    """
+    columns = fieldnames or (list(rows[0]) if rows else [])
+    buffer = io.StringIO()
+    writer = csv.DictWriter(
+        buffer,
+        fieldnames=columns,
+        restval="",
+        extrasaction="ignore",
+    )
+    writer.writeheader()
+    writer.writerows(rows)
+    return buffer.getvalue()
+
+
+def format_rows_as_json(rows: list[dict[str, Any]], *, pretty: bool = False) -> str:
+    """Serialize *rows* as a JSON array followed by a newline.
+
+    The array is indented by two spaces when *pretty* is true and compact
+    otherwise. Values the json module cannot encode are passed through str().
+    """
+    indent = 2 if pretty else None
+    return json.dumps(rows, indent=indent, default=str) + "\n"
diff --git a/tests/test_export.py b/tests/test_export.py
new file mode 100644
index 0000000..6b4c71b
--- /dev/null
+++ b/tests/test_export.py
@@ -0,0 +1,255 @@
+"""Tests for data export (#13)."""
+
+from __future__ import annotations
+
+import asyncio
+import csv
+import io
+import json
+import time
+from pathlib import Path
+
+import aiosqlite
+import pytest
+from typer.testing import CliRunner
+
+from context_analyzer_tool.cli import app
+from context_analyzer_tool.config import CATConfig
+from context_analyzer_tool.db import anomalies as db_anomalies
+from context_analyzer_tool.db import events as db_events
+from context_analyzer_tool.db import tasks as db_tasks
+from context_analyzer_tool.db.export import (
+    fetch_export_rows,
+    format_rows_as_csv,
+    format_rows_as_json,
+    get_table_column_names,
+)
+from context_analyzer_tool.db.schema import open_db, run_migrations
+
+
+@pytest.mark.asyncio
+async def test_fetch_export_rows_respects_days_filter(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    now_ms = int(time.time() * 1000)
+    old_ms = now_ms - (10 * 86400 * 1000)
+
+    await db_events.insert_event(
+        db_connection,
+        session_id="s1",
+        event_type="PostToolUse",
+        timestamp_ms=old_ms,
+        payload_json="{}",
+    )
+    await db_events.insert_event(
+        db_connection,
+        session_id="s1",
+        event_type="PostToolUse",
+        timestamp_ms=now_ms,
+        payload_json="{}",
+    )
+
+    rows = await fetch_export_rows(db_connection, "events", days=7)
+
+    assert len(rows) == 1
+    assert rows[0]["timestamp_ms"] == now_ms
+
+
+@pytest.mark.asyncio
+async def test_fetch_export_rows_rejects_non_positive_days(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    with pytest.raises(ValueError, match="positive integer"):
+        await fetch_export_rows(db_connection, "events", days=0)
+
+
+@pytest.mark.asyncio
+async def test_fetch_export_rows_snapshots_table_alias(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    ts = int(time.time() * 1000)
+    await db_events.insert_snapshot(
+        db_connection,
+        session_id="s1",
+        timestamp_ms=ts,
+        total_input_tokens=100,
+        total_output_tokens=20,
+        cache_creation_input_tokens=0,
+        cache_read_input_tokens=0,
+        context_window_size=200_000,
+        used_percentage=1.0,
+        total_cost_usd=0.01,
+        model_id="claude-test",
+    )
+
+    rows = await fetch_export_rows(db_connection, "snapshots")
+
+    assert len(rows) == 1
+    assert rows[0]["session_id"] == "s1"
+    assert rows[0]["total_input_tokens"] == 100
+
+
+@pytest.mark.asyncio
+async def test_get_table_column_names_lists_schema_columns(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    columns = await get_table_column_names(db_connection, "token_snapshots")
+
+    assert "session_id" in columns
+    assert "total_input_tokens" in columns
+
+
+def test_format_rows_as_csv_and_json() -> None:
+    rows = [{"task_type": "Bash", "call_count": 2, "total_tokens": 6000}]
+
+    csv_text = format_rows_as_csv(rows)
+    parsed = list(csv.DictReader(io.StringIO(csv_text)))
+    assert parsed[0]["task_type"] == "Bash"
+    assert parsed[0]["total_tokens"] == "6000"
+
+    json_text = format_rows_as_json(rows)
+    assert json.loads(json_text) == rows
+
+
+def test_format_rows_as_csv_writes_header_for_empty_rows() -> None:
+    csv_text = format_rows_as_csv([], fieldnames=["id", "session_id"])
+
+    assert csv_text == "id,session_id\r\n"
+
+
+@pytest.mark.asyncio
+async def test_export_tasks_includes_estimated_tokens(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    ts = int(time.time() * 1000)
+    event_id = await db_events.insert_event(
+        db_connection,
+        session_id="s1",
+        event_type="PostToolUse",
+        timestamp_ms=ts,
+        payload_json="{}",
+        tool_name="Read",
+    )
+    await db_tasks.insert_task(
+        db_connection,
+        session_id="s1",
+        event_id=event_id,
+        task_type="Read",
+        timestamp_ms=ts,
+        estimated_tokens=1234,
+    )
+
+    rows = await fetch_export_rows(db_connection, "tasks")
+
+    assert len(rows) == 1
+    assert rows[0]["estimated_tokens"] == 1234
+
+
+@pytest.mark.asyncio
+async def test_export_anomalies_round_trip(
+    db_connection: aiosqlite.Connection,
+) -> None:
+    ts = int(time.time() * 1000)
+    await db_anomalies.insert_anomaly(
+        db_connection,
+        session_id="s1",
+        task_type="Bash",
+        token_cost=9000,
+        z_score=3.5,
+        cause="Large output",
+        severity="high",
+        suggestion="Use ripgrep",
+        timestamp_ms=ts,
+    )
+
+    rows = await fetch_export_rows(db_connection, "anomalies")
+
+    assert len(rows) == 1
+    assert rows[0]["task_type"] == "Bash"
+    assert rows[0]["cause"] == "Large output"
+
+
+def test_cli_export_writes_csv_file(
+    tmp_path: Path,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    async def _seed() -> None:
+        db = await open_db(str(tmp_path / "test.db"))
+        try:
+            await run_migrations(db)
+            ts = int(time.time() * 1000)
+            await db_events.insert_event(
+                db,
+                session_id="s-cli",
+                event_type="PostToolUse",
+                timestamp_ms=ts,
+                payload_json='{"tool":"Read"}',
+                tool_name="Read",
+            )
+        finally:
+            await db.close()
+
+    asyncio.run(_seed())
+
+    cfg = CATConfig()
+    cfg.collector.db_path = str(tmp_path / "test.db")
+    monkeypatch.setattr("context_analyzer_tool.cli.load_config", lambda: cfg)
+
+    out_file = tmp_path / "events.csv"
+    runner = CliRunner()
+    result = runner.invoke(
+        app,
+        ["export", "--table", "events", "--format", "csv", "--output", str(out_file)],
+    )
+
+    assert result.exit_code == 0, result.output
+    assert out_file.exists()
+    # The csv module's CRLF terminators must reach the file untranslated.
+    assert b"\r\r\n" not in out_file.read_bytes()
+    with out_file.open(encoding="utf-8", newline="") as handle:
+        rows = list(csv.DictReader(handle))
+    assert len(rows) == 1
+    assert rows[0]["session_id"] == "s-cli"
+    assert rows[0]["tool_name"] == "Read"
+
+
+def test_cli_export_json_to_stdout(
+    tmp_path: Path,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    async def _seed() -> None:
+        db = await open_db(str(tmp_path / "test.db"))
+        try:
+            await run_migrations(db)
+            ts = int(time.time() * 1000)
+            await db_anomalies.insert_anomaly(
+                db,
+                session_id="s-json",
+                task_type="Edit",
+                token_cost=500,
+                z_score=2.1,
+                cause=None,
+                severity="medium",
+                suggestion=None,
+                timestamp_ms=ts,
+            )
+        finally:
+            await db.close()
+
+    asyncio.run(_seed())
+
+    cfg = CATConfig()
+    cfg.collector.db_path = str(tmp_path / "test.db")
+    monkeypatch.setattr("context_analyzer_tool.cli.load_config", lambda: cfg)
+
+    runner = CliRunner()
+    result = runner.invoke(
+        app,
+        ["export", "--table", "anomalies", "--format", "json"],
+    )
+
+    assert result.exit_code == 0, result.output
+    payload = json.loads(result.stdout)
+    assert len(payload) == 1
+    assert payload[0]["session_id"] == "s-json"
+    assert payload[0]["task_type"] == "Edit"