diff --git a/codec_email_triage.py b/codec_email_triage.py new file mode 100644 index 0000000..bebca8b --- /dev/null +++ b/codec_email_triage.py @@ -0,0 +1,196 @@ +"""CODEC Email Triage — read-only inbox classification + ranked digest. + +v1 is deliberately READ-ONLY: it reads a recent inbox window via the user's +existing Gmail OAuth (the same `codec_google_auth.build_service` path +google_gmail uses), classifies each message, and returns a ranked digest. +It applies NO labels, creates NO drafts, and sends NOTHING — those are later, +consent-gated phases (outbound is bridge-only per the CODEC operating +principles). + +Local-first: classification runs on the LOCAL Qwen (config llm_base_url) by +default, so email content never leaves the machine. The digest that comes back +is metadata only (sender / subject / category / priority / one-line reason). + +This is the engine (Gmail API + LLM live here) so skills/email_triage.py stays +thin and passes the SkillRegistry AST gate. +""" +from __future__ import annotations + +import json +import logging +import os +import re +from typing import Optional + +log = logging.getLogger("codec_email_triage") + +_CONFIG_PATH = os.path.expanduser("~/.codec/config.json") +CATEGORIES = ("lead", "support", "personal", "transactional", "noise") +PRIORITIES = ("high", "medium", "low") +_PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2} +_CATEGORY_RANK = {c: i for i, c in enumerate(CATEGORIES)} +_MAX_MESSAGES = 25 +_SNIPPET_CHARS = 200 + +_CLASSIFY_SYSTEM = ( + "You are an email triage classifier. For EACH email you are given, decide:\n" + " category: one of lead, support, personal, transactional, noise\n" + " (lead=sales/business opportunity/new inquiry; support=help/issue/question " + "from an existing contact; personal=from a person you know, non-business; " + "transactional=receipts/calendar/automated account notices; noise=newsletters/" + "marketing/spam)\n" + " priority: one of high, medium, low (how much it needs a human reply soon)\n" + " reason: <= 12 words, why.\n" + "Return ONLY a JSON array, one object per email, each " + '{"idx": , "category": "...", "priority": "...", "reason": "..."}. ' + "No prose, no code fences." +) + + +def _load_cfg() -> dict: + try: + with open(_CONFIG_PATH, encoding="utf-8") as f: + return json.load(f) + except (OSError, json.JSONDecodeError): + return {} + + +def _gmail_service(): + """Reuse the same authed Gmail service google_gmail uses. Raises if the + user hasn't connected Google (caller turns that into a friendly message).""" + import sys + repo = os.path.dirname(os.path.abspath(__file__)) + if repo not in sys.path: + sys.path.insert(0, repo) + from codec_google_auth import build_service + return build_service("gmail", "v1") + + +def fetch_recent(max_messages: int = _MAX_MESSAGES, query: str = "is:inbox", + service=None) -> list[dict]: + """Recent inbox messages as {id, sender, subject, snippet, unread, date}. + Read-only — list + get(metadata/snippet) only.""" + svc = service or _gmail_service() + max_messages = max(1, min(max_messages, 50)) + listing = svc.users().messages().list( + userId="me", q=query, maxResults=max_messages).execute() + out = [] + for ref in listing.get("messages", []): + try: + msg = svc.users().messages().get( + userId="me", id=ref["id"], + format="metadata", + metadataHeaders=["From", "Subject", "Date"]).execute() + except Exception as e: + log.debug("message fetch failed (%s): %s", ref.get("id"), e) + continue + headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])} + sender = headers.get("From", "Unknown") + if "<" in sender: + sender = sender.split("<")[0].strip().strip('"') or sender + out.append({ + "id": ref["id"], + "sender": sender, + "subject": headers.get("Subject", "(no subject)"), + "snippet": (msg.get("snippet", "") or "")[:_SNIPPET_CHARS], + "unread": "UNREAD" in msg.get("labelIds", []), + "date": headers.get("Date", ""), + }) + return out + + +def _parse_classification(raw: str, n: int) -> dict: + """Parse the LLM's JSON array into {idx: {category, priority, reason}}. + Tolerant: strips code fences; coerces unknown enums; returns {} on failure + so the caller falls back to 'unclassified'.""" + if not raw: + return {} + s = raw.strip() + if s.startswith("```"): + s = re.sub(r"^```(?:json)?\s*", "", s) + s = re.sub(r"\s*```\s*$", "", s) + # grab the first JSON array if there's surrounding prose + m = re.search(r"\[.*\]", s, re.DOTALL) + if m: + s = m.group(0) + try: + arr = json.loads(s) + except (json.JSONDecodeError, ValueError): + return {} + by_idx = {} + for obj in arr if isinstance(arr, list) else []: + try: + idx = int(obj.get("idx")) + except (TypeError, ValueError): + continue + if not (0 <= idx < n): + continue + cat = str(obj.get("category", "")).lower().strip() + pri = str(obj.get("priority", "")).lower().strip() + by_idx[idx] = { + "category": cat if cat in CATEGORIES else "noise", + "priority": pri if pri in PRIORITIES else "medium", + "reason": str(obj.get("reason", ""))[:120], + } + return by_idx + + +def classify(messages: list[dict], *, base_url: Optional[str] = None, + model: Optional[str] = None, timeout: int = 60) -> list[dict]: + """Classify messages with the LOCAL LLM in ONE call. Returns each message + enriched with category/priority/reason. On LLM failure, every message is + 'unclassified'/'medium' (read-only digest still works).""" + if not messages: + return [] + cfg = _load_cfg() + base_url = base_url or cfg.get("llm_base_url", "http://localhost:8083/v1") + model = model or cfg.get("llm_model", "local-qwen") + listing = "\n".join( + f'{i}) From: {m["sender"]} | Subject: {m["subject"]} | {m["snippet"]}' + for i, m in enumerate(messages) + ) + raw = "" + try: + import codec_llm + raw = codec_llm.call( + [{"role": "system", "content": _CLASSIFY_SYSTEM}, + {"role": "user", "content": listing}], + base_url=base_url, model=model, max_tokens=2000, + temperature=0.1, timeout=timeout) + except Exception as e: + log.warning("triage classify LLM call failed: %s", e) + by_idx = _parse_classification(raw, len(messages)) + enriched = [] + for i, m in enumerate(messages): + c = by_idx.get(i) + enriched.append({ + **m, + "category": c["category"] if c else "unclassified", + "priority": c["priority"] if c else "medium", + "reason": c["reason"] if c else "", + }) + return enriched + + +def _rank_key(item: dict) -> tuple: + return (_PRIORITY_RANK.get(item.get("priority"), 1), + _CATEGORY_RANK.get(item.get("category"), len(CATEGORIES)), + 0 if item.get("unread") else 1) + + +def triage(max_messages: int = _MAX_MESSAGES, query: str = "is:inbox", + service=None, **classify_kwargs) -> dict: + """Read-only triage: fetch → classify (local) → rank. Returns + {count, query, items:[...ranked], by_priority, by_category}.""" + messages = fetch_recent(max_messages, query, service=service) + if not messages: + return {"count": 0, "query": query, "items": [], + "by_priority": {}, "by_category": {}} + items = classify(messages, **classify_kwargs) + items.sort(key=_rank_key) + by_pri, by_cat = {}, {} + for it in items: + by_pri[it["priority"]] = by_pri.get(it["priority"], 0) + 1 + by_cat[it["category"]] = by_cat.get(it["category"], 0) + 1 + return {"count": len(items), "query": query, "items": items, + "by_priority": by_pri, "by_category": by_cat} diff --git a/skills/.manifest.json b/skills/.manifest.json index e65f07e..ad3fb51 100644 --- a/skills/.manifest.json +++ b/skills/.manifest.json @@ -35,6 +35,7 @@ "cookbook_stop.py": "8b51e04ef08e08899df24d032af7a609bef7bf02ad72be5badf95eb0a2b1ce64", "create_skill.py": "28070280abfe2179d5c9b33eee74b4db5a5dd085f76a09f02d516302ba330036", "delegate.py": "7c595d5605cd9913a8afa331ae0013861d6996f378f8a59aca0249f1b2f3a474", + "email_triage.py": "d7b9032b81179f58c5aff660c34f9b8edf212a8ce7651d307306e4757a8daaf2", "fact_extract.py": "a43ed03b8c51704415f4135de46a44e097f757305af3921dd389b8dfd0540906", "file_ops.py": "890f581c891a2c89dbd51177cf7b8152dba224a590a7f526737e9003f9046dce", "file_search.py": "422274fb2386a1e3606a9f5539b3be6a381ebb4c0d968957b14150299a25eb34", diff --git a/skills/email_triage.py b/skills/email_triage.py new file mode 100644 index 0000000..a511e95 --- /dev/null +++ b/skills/email_triage.py @@ -0,0 +1,65 @@ +"""CODEC Skill: Email triage — read-only ranked inbox digest (local-classified).""" +import re + +from codec_email_triage import triage + +SKILL_NAME = "email_triage" +SKILL_DESCRIPTION = ( + "Triage your Gmail inbox: read a recent window, classify each message " + "(lead/support/personal/transactional/noise + priority) on the LOCAL model, " + "and return a ranked digest. Read-only — applies no labels, sends nothing." +) +SKILL_TAGS = ["email", "triage", "gmail", "inbox", "productivity"] +SKILL_TRIGGERS = [ + "triage my email", "triage inbox", "triage my inbox", "email triage", + "prioritize my email", "what's important in my inbox", "sort my inbox", +] +SKILL_MCP_EXPOSE = True # read-only digest (metadata only); mirrors google_gmail's exposure + +_PRI_EMOJI = {"high": "🔴", "medium": "🟡", "low": "⚪"} +_PRI_ORDER = ("high", "medium", "low") + + +def _parse_count(task: str, default: int = 25) -> int: + m = re.search(r"\b(\d{1,2})\b", task or "") + if m: + try: + return max(1, min(int(m.group(1)), 50)) + except ValueError: + pass + return default + + +def run(task, app="", ctx=""): + low = (task or "").lower() + query = "is:unread is:inbox" if "unread" in low else "is:inbox" + count = _parse_count(task) + try: + result = triage(max_messages=count, query=query) + except Exception as e: + msg = str(e).lower() + if "credential" in msg or "token" in msg or "auth" in msg: + return "I can't reach your Gmail — connect Google first (the google auth flow), then try again." + return f"Email triage failed: {e}" + + items = result.get("items", []) + if not items: + scope = "unread " if "unread" in query else "" + return f"No {scope}inbox messages to triage." + + bp = result.get("by_priority", {}) + head = (f"📥 Inbox triage — {result['count']} message" + f"{'s' if result['count'] != 1 else ''} " + f"({bp.get('high', 0)} high, {bp.get('medium', 0)} medium, {bp.get('low', 0)} low)") + lines = [head] + for pri in _PRI_ORDER: + group = [it for it in items if it.get("priority") == pri] + if not group: + continue + lines.append(f"\n{_PRI_EMOJI[pri]} {pri.upper()}") + for it in group: + star = "* " if it.get("unread") else " " + reason = f" — {it['reason']}" if it.get("reason") else "" + lines.append(f" {star}[{it.get('category')}] {it.get('sender')} — " + f"{it.get('subject')}{reason}") + return "\n".join(lines) diff --git a/tests/test_email_triage.py b/tests/test_email_triage.py new file mode 100644 index 0000000..58135d3 --- /dev/null +++ b/tests/test_email_triage.py @@ -0,0 +1,201 @@ +"""CODEC Email Triage tests — read-only inbox digest. + +The Gmail service and the local LLM are mocked, so the suite runs offline and +touches no real inbox. Triage is read-only by design — these tests also assert +it never calls a mutating Gmail method. +""" +from __future__ import annotations + +import json +import sys +from pathlib import Path + +_REPO = Path(__file__).resolve().parents[1] +if str(_REPO) not in sys.path: + sys.path.insert(0, str(_REPO)) + +import codec_email_triage as et # noqa: E402 + + +# ── fake Gmail service ─────────────────────────────────────────────────────── +class _Exec: + def __init__(self, v): + self._v = v + + def execute(self): + return self._v + + +class _Msgs: + def __init__(self, ids, recorder=None): + self._ids = ids + self._rec = recorder + + def list(self, **k): + if self._rec is not None: + self._rec.append(("list", k)) + return _Exec({"messages": [{"id": i} for i in self._ids]}) + + def get(self, **k): + if self._rec is not None: + self._rec.append(("get", k)) + i = k["id"] + return _Exec({ + "payload": {"headers": [ + {"name": "From", "value": f'Person {i} '}, + {"name": "Subject", "value": f"Subject {i}"}, + {"name": "Date", "value": "Mon, 1 Jun 2026 09:00:00 +0000"}, + ]}, + "snippet": "x" * 500, + "labelIds": (["UNREAD"] if i == "1" else []), + }) + + +class FakeSvc: + def __init__(self, ids=("1", "2", "3"), recorder=None): + self._ids = list(ids) + self._rec = recorder + + def users(self): + outer = self + + class _U: + def messages(self): + return _Msgs(outer._ids, outer._rec) + return _U() + + +# ── fetch_recent ───────────────────────────────────────────────────────────── +class TestFetch: + def test_parses_and_cleans(self): + msgs = et.fetch_recent(service=FakeSvc(("1", "2"))) + assert len(msgs) == 2 + assert msgs[0]["sender"] == "Person 1" # stripped + assert msgs[0]["subject"] == "Subject 1" + assert msgs[0]["unread"] is True and msgs[1]["unread"] is False + assert len(msgs[0]["snippet"]) == 200 # truncated + + def test_uses_only_readonly_calls(self): + rec = [] + et.fetch_recent(service=FakeSvc(("1",), recorder=rec)) + methods = {m for m, _ in rec} + assert methods <= {"list", "get"}, f"triage must be read-only, used: {methods}" + + +# ── classification parsing ─────────────────────────────────────────────────── +class TestParse: + def test_plain_json(self): + raw = json.dumps([{"idx": 0, "category": "lead", "priority": "high", "reason": "r"}]) + out = et._parse_classification(raw, 1) + assert out[0]["category"] == "lead" and out[0]["priority"] == "high" + + def test_code_fence_stripped(self): + raw = "```json\n[{\"idx\":0,\"category\":\"support\",\"priority\":\"low\",\"reason\":\"r\"}]\n```" + assert et._parse_classification(raw, 1)[0]["category"] == "support" + + def test_prose_wrapped_array(self): + raw = 'Here you go:\n[{"idx":0,"category":"noise","priority":"low","reason":"ad"}]\nThanks!' + assert et._parse_classification(raw, 1)[0]["category"] == "noise" + + def test_unknown_enums_coerced(self): + raw = json.dumps([{"idx": 0, "category": "BOGUS", "priority": "urgent", "reason": "r"}]) + out = et._parse_classification(raw, 1) + assert out[0]["category"] == "noise" and out[0]["priority"] == "medium" + + def test_out_of_range_idx_skipped(self): + raw = json.dumps([{"idx": 9, "category": "lead", "priority": "high"}]) + assert et._parse_classification(raw, 1) == {} + + def test_garbage_returns_empty(self): + assert et._parse_classification("not json at all", 3) == {} + assert et._parse_classification("", 3) == {} + + +# ── classify ───────────────────────────────────────────────────────────────── +class TestClassify: + def test_single_llm_call_enriches_all(self, monkeypatch): + calls = {"n": 0} + + def fake_call(msgs, **k): + calls["n"] += 1 + return json.dumps([ + {"idx": 0, "category": "lead", "priority": "high", "reason": "a"}, + {"idx": 1, "category": "noise", "priority": "low", "reason": "b"}, + ]) + import codec_llm + monkeypatch.setattr(codec_llm, "call", fake_call) + msgs = [{"sender": "A", "subject": "s1", "snippet": "x"}, + {"sender": "B", "subject": "s2", "snippet": "y"}] + out = et.classify(msgs) + assert calls["n"] == 1, "must classify the whole batch in ONE LLM call" + assert out[0]["category"] == "lead" and out[1]["category"] == "noise" + + def test_llm_failure_falls_back_unclassified(self, monkeypatch): + import codec_llm + monkeypatch.setattr(codec_llm, "call", + lambda m, **k: (_ for _ in ()).throw(RuntimeError("down"))) + out = et.classify([{"sender": "A", "subject": "s", "snippet": "x"}]) + assert out[0]["category"] == "unclassified" and out[0]["priority"] == "medium" + + +# ── triage end-to-end (ranking) ────────────────────────────────────────────── +class TestTriage: + def test_ranks_priority_then_category(self, monkeypatch): + import codec_llm + monkeypatch.setattr(codec_llm, "call", lambda m, **k: json.dumps([ + {"idx": 0, "category": "noise", "priority": "low", "reason": "n"}, + {"idx": 1, "category": "lead", "priority": "high", "reason": "l"}, + {"idx": 2, "category": "support", "priority": "medium", "reason": "s"}, + ])) + r = et.triage(service=FakeSvc(("1", "2", "3"))) + order = [(it["priority"], it["category"]) for it in r["items"]] + assert order == [("high", "lead"), ("medium", "support"), ("low", "noise")] + assert r["by_priority"] == {"high": 1, "medium": 1, "low": 1} + + def test_empty_inbox(self): + r = et.triage(service=FakeSvc(())) + assert r["count"] == 0 and r["items"] == [] + + +# ── skill ──────────────────────────────────────────────────────────────────── +class TestSkill: + def test_discovered_and_exposed(self): + import codec_dispatch + codec_dispatch.load_skills() + reg = codec_dispatch.registry + assert "email_triage" in reg.names() + assert reg.get_mcp_expose("email_triage") is True + + def test_formats_digest(self, monkeypatch): + import skills.email_triage as st + monkeypatch.setattr(st, "triage", lambda max_messages=25, query="is:inbox": { + "count": 2, "query": query, "by_priority": {"high": 1, "low": 1}, + "by_category": {}, "items": [ + {"sender": "Acme", "subject": "Quote?", "category": "lead", + "priority": "high", "reason": "new deal", "unread": True}, + {"sender": "News", "subject": "Weekly", "category": "noise", + "priority": "low", "reason": "newsletter", "unread": False}, + ]}) + out = st.run("triage my inbox") + assert "Acme" in out and "lead" in out and "HIGH" in out and "new deal" in out + + def test_unread_query(self, monkeypatch): + import skills.email_triage as st + seen = {} + monkeypatch.setattr(st, "triage", + lambda max_messages=25, query="is:inbox": + seen.update(query=query) or {"count": 0, "items": []}) + st.run("triage my unread email") + assert seen["query"] == "is:unread is:inbox" + + def test_auth_error_friendly(self, monkeypatch): + import skills.email_triage as st + monkeypatch.setattr(st, "triage", + lambda **k: (_ for _ in ()).throw(RuntimeError("invalid credentials"))) + out = st.run("triage my inbox") + assert "connect google" in out.lower() + + def test_no_messages(self, monkeypatch): + import skills.email_triage as st + monkeypatch.setattr(st, "triage", lambda **k: {"count": 0, "items": []}) + assert "No" in st.run("triage inbox")