Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Added

- **`context-analyzer-tool export`** (#13) — export `events`, `tasks`, `snapshots`, or `anomalies` as CSV or JSON, with optional `--days` filter and `--output` file path.

## 0.3.1 (2026-04-08)

### Bug Fixes
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ context-analyzer-tool context-cost Show context cost breakdown
context-analyzer-tool health Collector health check
context-analyzer-tool rtk-status Show RTK integration status and savings
context-analyzer-tool prune Clean up old data
context-analyzer-tool export Export data as CSV or JSON
context-analyzer-tool clear Clear all stored data and start fresh
```

Expand Down
80 changes: 79 additions & 1 deletion src/context_analyzer_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import logging
import os
import shutil
import sys
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, cast
from typing import Any, Literal, cast

import typer
from rich.console import Console
Expand Down Expand Up @@ -1103,3 +1104,80 @@ async def _run() -> None:
await db.close()

asyncio.run(_run())


ExportFormat = Literal["csv", "json"]
ExportTable = Literal["events", "tasks", "snapshots", "anomalies"]


@app.command()
def export(
table: ExportTable = typer.Option(..., "--table", help="Table to export"),
fmt: ExportFormat = typer.Option(
"csv",
"--format",
"-f",
help="Output format",
),
output: Path | None = typer.Option(
None,
"--output",
"-o",
help="Output file path (default: stdout)",
),
days: int | None = typer.Option(
None,
"--days",
help="Limit export to rows from the last N days",
),
) -> None:
"""Export collected data as CSV or JSON for backup or external analysis."""
import asyncio

from context_analyzer_tool.config import get_db_path
from context_analyzer_tool.db.export import (
EXPORT_TABLES,
fetch_export_rows,
format_rows_as_csv,
format_rows_as_json,
get_table_column_names,
)
from context_analyzer_tool.db.schema import open_db

cfg = load_config()
db_path = get_db_path(cfg)
if not Path(db_path).exists():
console.print("[dim]No database found. Nothing to export.[/dim]")
raise typer.Exit(1)

async def _run() -> tuple[list[dict[str, Any]], list[str]]:
db = await open_db(db_path)
try:
rows = await fetch_export_rows(db, table, days=days)
db_table, _ = EXPORT_TABLES[table]
columns = await get_table_column_names(db, db_table)
return rows, columns
finally:
await db.close()

try:
rows, columns = asyncio.run(_run())
except ValueError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1) from None

if fmt == "csv":
content = format_rows_as_csv(rows, fieldnames=columns)
else:
content = format_rows_as_json(rows, pretty=output is not None)

if output is not None:
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(content, encoding="utf-8")
console.print(
f"[green]Exported {len(rows)} row(s)[/green] from "
f"[bold]{table}[/bold] to {output}"
)
return

sys.stdout.write(content)
81 changes: 81 additions & 0 deletions src/context_analyzer_tool/db/export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Export collected data as CSV or JSON."""

from __future__ import annotations

import csv
import io
import json
import time
from typing import Any, Literal

import aiosqlite

ExportTable = Literal["events", "tasks", "snapshots", "anomalies"]

# CLI table name -> (SQLite table, timestamp column for --days filter)
EXPORT_TABLES: dict[ExportTable, tuple[str, str]] = {
"events": ("events", "timestamp_ms"),
"tasks": ("tasks", "timestamp_ms"),
"snapshots": ("token_snapshots", "timestamp_ms"),
"anomalies": ("anomalies", "timestamp_ms"),
}


async def get_table_column_names(
db: aiosqlite.Connection,
table: str,
) -> list[str]:
"""Return column names for *table* in schema order."""
cursor = await db.execute(f"PRAGMA table_info({table})") # noqa: S608
rows = await cursor.fetchall()
return [str(row[1]) for row in rows]


async def fetch_export_rows(
db: aiosqlite.Connection,
table: ExportTable,
*,
days: int | None = None,
) -> list[dict[str, Any]]:
"""Return all rows for an exportable table, optionally limited to recent days."""
db_table, ts_col = EXPORT_TABLES[table]
clauses: list[str] = []
params: list[int] = []

if days is not None:
if days <= 0:
msg = "--days must be a positive integer"
raise ValueError(msg)
cutoff_ms = int((time.time() - days * 86400) * 1000)
clauses.append(f"{ts_col} >= ?")
params.append(cutoff_ms)

where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
query = (
f"SELECT * FROM {db_table} {where} ORDER BY {ts_col} ASC" # noqa: S608
)
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
return [dict(row) for row in rows]


def format_rows_as_csv(
rows: list[dict[str, Any]],
*,
fieldnames: list[str] | None = None,
) -> str:
"""Serialize *rows* as CSV text."""
columns = fieldnames or (list(rows[0].keys()) if rows else [])
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=columns, extrasaction="ignore")
writer.writeheader()
for row in rows:
writer.writerow({key: row.get(key, "") for key in columns})
return buffer.getvalue()


def format_rows_as_json(rows: list[dict[str, Any]], *, pretty: bool = False) -> str:
"""Serialize *rows* as a JSON array."""
if pretty:
return json.dumps(rows, indent=2, default=str) + "\n"
return json.dumps(rows, default=str) + "\n"
Loading