Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions codec_email_triage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""CODEC Email Triage — read-only inbox classification + ranked digest.

v1 is deliberately READ-ONLY: it reads a recent inbox window via the user's
existing Gmail OAuth (the same `codec_google_auth.build_service` path
google_gmail uses), classifies each message, and returns a ranked digest.
It applies NO labels, creates NO drafts, and sends NOTHING — those are later,
consent-gated phases (outbound is bridge-only per the CODEC operating
principles).

Local-first: classification runs on the LOCAL Qwen (config llm_base_url) by
default, so email content never leaves the machine. The digest that comes back
is metadata only (sender / subject / category / priority / one-line reason).

This is the engine (Gmail API + LLM live here) so skills/email_triage.py stays
thin and passes the SkillRegistry AST gate.
"""
from __future__ import annotations

import json
import logging
import os
import re
from typing import Optional

# Module-level logger, registered under the name "codec_email_triage".
log = logging.getLogger("codec_email_triage")

# JSON config file read by _load_cfg(); classify() looks up "llm_base_url" and
# "llm_model" in it.
_CONFIG_PATH = os.path.expanduser("~/.codec/config.json")
# Category labels the classifier may assign. Tuple order doubles as the ranking
# order (see _CATEGORY_RANK); _parse_classification() coerces anything else to
# "noise".
CATEGORIES = ("lead", "support", "personal", "transactional", "noise")
# Accepted priority labels; _parse_classification() coerces anything else to
# "medium".
PRIORITIES = ("high", "medium", "low")
# Sort weights used by _rank_key(): a lower number sorts earlier in the digest.
_PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}
# Category -> position in CATEGORIES, used as the secondary sort weight.
_CATEGORY_RANK = {c: i for i, c in enumerate(CATEGORIES)}
# Default window size for fetch_recent() / triage(); fetch_recent() clamps the
# effective value to the range 1..50.
_MAX_MESSAGES = 25
# Maximum number of snippet characters kept per message by fetch_recent().
_SNIPPET_CHARS = 200

# System prompt sent by classify(). It asks for a bare JSON array with one
# {"idx", "category", "priority", "reason"} object per email, which is the
# shape _parse_classification() reads back.
_CLASSIFY_SYSTEM = (
"You are an email triage classifier. For EACH email you are given, decide:\n"
" category: one of lead, support, personal, transactional, noise\n"
" (lead=sales/business opportunity/new inquiry; support=help/issue/question "
"from an existing contact; personal=from a person you know, non-business; "
"transactional=receipts/calendar/automated account notices; noise=newsletters/"
"marketing/spam)\n"
" priority: one of high, medium, low (how much it needs a human reply soon)\n"
" reason: <= 12 words, why.\n"
"Return ONLY a JSON array, one object per email, each "
'{"idx": <int>, "category": "...", "priority": "...", "reason": "..."}. '
"No prose, no code fences."
)


def _load_cfg() -> dict:
    """Load the user's config file as a dict, or ``{}`` when it is unusable.

    Reads the JSON document at ``_CONFIG_PATH``. Every failure mode degrades
    to an empty dict instead of raising: a missing or unreadable file, bytes
    that are not valid UTF-8, malformed JSON, or a JSON document whose top
    level is not an object.

    Returns:
        The parsed config mapping, or an empty dict.
    """
    try:
        with open(_CONFIG_PATH, encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use cfg.get(...); a top-level list/string/number would break them.
    return cfg if isinstance(cfg, dict) else {}


def _gmail_service():
    """Return a Gmail v1 client built by ``codec_google_auth.build_service``.

    The directory containing this module is put at the front of ``sys.path``
    (when not already listed) so the sibling ``codec_google_auth`` module can
    be imported. Any exception raised by that import or by ``build_service``
    propagates to the caller unchanged.
    """
    import sys

    module_dir = os.path.dirname(os.path.abspath(__file__))
    already_listed = module_dir in sys.path
    if not already_listed:
        sys.path.insert(0, module_dir)

    # Imported lazily so merely importing this module needs no Google setup.
    from codec_google_auth import build_service
    return build_service("gmail", "v1")


def fetch_recent(max_messages: int = _MAX_MESSAGES, query: str = "is:inbox",
                 service=None) -> list[dict]:
    """Return recent messages matching ``query`` as plain metadata dicts.

    Read-only: issues one ``messages.list`` call followed by one
    ``messages.get(format="metadata")`` call per listed message.

    Args:
        max_messages: Requested window size; clamped to the range 1..50.
        query: Gmail search query passed as ``q``.
        service: Optional pre-built Gmail client. When omitted, one is
            obtained from ``_gmail_service()``.

    Returns:
        One dict per successfully fetched message with the keys ``id``,
        ``sender``, ``subject``, ``snippet``, ``unread`` and ``date``.
        Messages whose individual fetch fails are skipped.

    Raises:
        Whatever ``_gmail_service()`` or the initial list call raises.
    """
    svc = service or _gmail_service()
    max_messages = max(1, min(max_messages, 50))
    listing = svc.users().messages().list(
        userId="me", q=query, maxResults=max_messages).execute()
    out = []
    for ref in listing.get("messages", []):
        try:
            msg = svc.users().messages().get(
                userId="me", id=ref["id"],
                format="metadata",
                metadataHeaders=["From", "Subject", "Date"]).execute()
        except Exception as e:
            # Best-effort: one unreadable message must not sink the digest.
            log.debug("message fetch failed (%s): %s", ref.get("id"), e)
            continue
        # Mail header names are case-insensitive and may arrive with whatever
        # casing the sender used ("from", "SUBJECT", ...), so key the lookup
        # table on the lower-cased name.
        headers = {
            str(h.get("name", "")).lower(): h.get("value", "")
            for h in msg.get("payload", {}).get("headers", [])
        }
        # `or` (rather than a .get default) also covers headers that are
        # present but empty, e.g. a bare "Subject:" line.
        sender = headers.get("from") or "Unknown"
        if "<" in sender:
            # Prefer the display name of `"Name" <addr>`; keep the raw value
            # when there is no display name.
            sender = sender.split("<")[0].strip().strip('"') or sender
        out.append({
            "id": ref["id"],
            "sender": sender,
            "subject": headers.get("subject") or "(no subject)",
            "snippet": (msg.get("snippet", "") or "")[:_SNIPPET_CHARS],
            "unread": "UNREAD" in msg.get("labelIds", []),
            "date": headers.get("date") or "",
        })
    return out


def _parse_classification(raw: str, n: int) -> dict:
    """Parse the LLM reply into ``{idx: {category, priority, reason}}``.

    Deliberately tolerant, because model output is free-form text:

    * a leading/trailing Markdown code fence is stripped;
    * the first JSON array that actually parses is used, so surrounding prose
      (even prose containing square brackets) does not defeat the parse;
    * array elements that are not objects, carry a non-integer ``idx``, or an
      ``idx`` outside ``0 <= idx < n`` are skipped;
    * an unknown category becomes ``"noise"`` and an unknown priority
      ``"medium"``;
    * a missing or null reason becomes ``""``; reasons are capped at 120
      characters.

    Args:
        raw: Text returned by the model (may be empty).
        n: Number of messages that were sent; bounds the accepted ``idx``.

    Returns:
        Mapping of message index to its classification. Empty when nothing
        usable could be parsed, so the caller can fall back to
        'unclassified'.
    """
    if not raw or not isinstance(raw, str):
        return {}
    s = raw.strip()
    if s.startswith("```"):
        s = re.sub(r"^```(?:json)?\s*", "", s)
        s = re.sub(r"\s*```\s*$", "", s)

    # Try each "[" in turn and keep the first one that decodes to a list.
    decoder = json.JSONDecoder()
    arr = None
    pos = s.find("[")
    while pos != -1:
        try:
            candidate, _end = decoder.raw_decode(s, pos)
        except ValueError:
            candidate = None
        if isinstance(candidate, list):
            arr = candidate
            break
        pos = s.find("[", pos + 1)
    if arr is None:
        return {}

    by_idx = {}
    for obj in arr:
        if not isinstance(obj, dict):
            # Stray strings/numbers/nulls in the array are ignored.
            continue
        try:
            idx = int(obj.get("idx"))
        except (TypeError, ValueError):
            continue
        if not (0 <= idx < n):
            continue
        cat = str(obj.get("category", "")).lower().strip()
        pri = str(obj.get("priority", "")).lower().strip()
        by_idx[idx] = {
            "category": cat if cat in CATEGORIES else "noise",
            "priority": pri if pri in PRIORITIES else "medium",
            # `or ""` keeps a JSON null from being rendered as "None".
            "reason": str(obj.get("reason") or "")[:120],
        }
    return by_idx


def classify(messages: list[dict], *, base_url: Optional[str] = None,
             model: Optional[str] = None, timeout: int = 60) -> list[dict]:
    """Classify ``messages`` with a single LLM call and return enriched copies.

    Args:
        messages: Dicts carrying at least ``sender``, ``subject`` and
            ``snippet``.
        base_url: LLM endpoint; defaults to the config's ``llm_base_url``,
            else ``http://localhost:8083/v1``.
        model: Model name; defaults to the config's ``llm_model``, else
            ``local-qwen``.
        timeout: Timeout value forwarded to ``codec_llm.call``.

    Returns:
        A new list in the input order; each item is the original message plus
        ``category``, ``priority`` and ``reason``. If the LLM call or the
        parsing of its reply fails, or the reply omits a message, that message
        gets ``'unclassified'`` / ``'medium'`` / ``''`` so the digest still
        renders.
    """
    if not messages:
        return []
    cfg = _load_cfg()
    if not isinstance(cfg, dict):
        # A config whose top level is not an object cannot supply defaults.
        cfg = {}
    base_url = base_url or cfg.get("llm_base_url", "http://localhost:8083/v1")
    model = model or cfg.get("llm_model", "local-qwen")
    listing = "\n".join(
        f'{i}) From: {m["sender"]} | Subject: {m["subject"]} | {m["snippet"]}'
        for i, m in enumerate(messages)
    )
    # Scale the output budget with the batch so a large window does not get
    # its JSON cut off mid-array (a truncated reply parses to nothing). The
    # floor keeps the previous budget for batches of 25 or fewer.
    max_tokens = max(2000, 80 * len(messages))
    raw = ""
    try:
        import codec_llm
        raw = codec_llm.call(
            [{"role": "system", "content": _CLASSIFY_SYSTEM},
             {"role": "user", "content": listing}],
            base_url=base_url, model=model, max_tokens=max_tokens,
            temperature=0.1, timeout=timeout)
    except Exception as e:
        log.warning("triage classify LLM call failed: %s", e)
    try:
        by_idx = _parse_classification(raw, len(messages))
    except Exception as e:
        # An unexpected reply shape must degrade to 'unclassified', not crash.
        log.warning("triage classify parse failed: %s", e)
        by_idx = {}
    enriched = []
    for i, m in enumerate(messages):
        c = by_idx.get(i)
        enriched.append({
            **m,
            "category": c["category"] if c else "unclassified",
            "priority": c["priority"] if c else "medium",
            "reason": c["reason"] if c else "",
        })
    return enriched


def _rank_key(item: dict) -> tuple:
    """Build the digest sort key for one classified message.

    Ordering is by priority weight, then category weight, then unread before
    read. An unknown priority sorts like "medium"; an unknown category sorts
    after every known one.
    """
    priority_pos = _PRIORITY_RANK.get(item.get("priority"), 1)
    category_pos = _CATEGORY_RANK.get(item.get("category"), len(CATEGORIES))
    if item.get("unread"):
        read_pos = 0
    else:
        read_pos = 1
    return priority_pos, category_pos, read_pos


def triage(max_messages: int = _MAX_MESSAGES, query: str = "is:inbox",
           service=None, **classify_kwargs) -> dict:
    """Run the read-only pipeline: fetch, classify, rank, then tally.

    Extra keyword arguments are forwarded to ``classify``. The result has the
    keys ``count``, ``query``, ``items`` (sorted with ``_rank_key``),
    ``by_priority`` and ``by_category`` (label -> number of items). When the
    fetch returns nothing, both tallies are empty and ``classify`` is not
    called.
    """
    fetched = fetch_recent(max_messages, query, service=service)
    if not fetched:
        return {"count": 0, "query": query, "items": [],
                "by_priority": {}, "by_category": {}}

    ranked = sorted(classify(fetched, **classify_kwargs), key=_rank_key)

    priority_counts, category_counts = {}, {}
    for entry in ranked:
        pri = entry["priority"]
        priority_counts[pri] = priority_counts.get(pri, 0) + 1
        cat = entry["category"]
        category_counts[cat] = category_counts.get(cat, 0) + 1

    return {"count": len(ranked), "query": query, "items": ranked,
            "by_priority": priority_counts, "by_category": category_counts}
1 change: 1 addition & 0 deletions skills/.manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"cookbook_stop.py": "8b51e04ef08e08899df24d032af7a609bef7bf02ad72be5badf95eb0a2b1ce64",
"create_skill.py": "28070280abfe2179d5c9b33eee74b4db5a5dd085f76a09f02d516302ba330036",
"delegate.py": "7c595d5605cd9913a8afa331ae0013861d6996f378f8a59aca0249f1b2f3a474",
"email_triage.py": "d7b9032b81179f58c5aff660c34f9b8edf212a8ce7651d307306e4757a8daaf2",
"fact_extract.py": "a43ed03b8c51704415f4135de46a44e097f757305af3921dd389b8dfd0540906",
"file_ops.py": "890f581c891a2c89dbd51177cf7b8152dba224a590a7f526737e9003f9046dce",
"file_search.py": "422274fb2386a1e3606a9f5539b3be6a381ebb4c0d968957b14150299a25eb34",
Expand Down
65 changes: 65 additions & 0 deletions skills/email_triage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""CODEC Skill: Email triage — read-only ranked inbox digest (local-classified)."""
import re

from codec_email_triage import triage

# Skill metadata. None of the SKILL_* names are referenced inside this module;
# presumably they are read by the skill loader/registry — TODO confirm.
SKILL_NAME = "email_triage"
SKILL_DESCRIPTION = (
"Triage your Gmail inbox: read a recent window, classify each message "
"(lead/support/personal/transactional/noise + priority) on the LOCAL model, "
"and return a ranked digest. Read-only — applies no labels, sends nothing."
)
SKILL_TAGS = ["email", "triage", "gmail", "inbox", "productivity"]
# Phrases associated with this skill (matching logic lives outside this file).
SKILL_TRIGGERS = [
"triage my email", "triage inbox", "triage my inbox", "email triage",
"prioritize my email", "what's important in my inbox", "sort my inbox",
]
SKILL_MCP_EXPOSE = True # read-only digest (metadata only); mirrors google_gmail's exposure

# Marker printed in front of each priority heading by run().
_PRI_EMOJI = {"high": "🔴", "medium": "🟡", "low": "⚪"}
# Order in which run() prints the priority groups.
_PRI_ORDER = ("high", "medium", "low")


def _parse_count(task: str, default: int = 25) -> int:
    """Extract the requested message count from free-form ``task`` text.

    Uses the first standalone one- or two-digit number in the text (a number
    glued to letters, or one with three or more digits, does not match) and
    clamps it to the range 1..50.

    Args:
        task: The user's request; ``None`` or empty is allowed.
        default: Value returned as-is when no such number is present.

    Returns:
        The clamped count, or ``default``.
    """
    m = re.search(r"\b(\d{1,2})\b", task or "")
    if m is None:
        return default
    # The group is one or two decimal digits, so int() cannot fail here and
    # no ValueError handling is needed.
    return max(1, min(int(m.group(1)), 50))


def run(task, app="", ctx=""):
    """Skill entry point: triage the inbox and render a plain-text digest.

    The word "unread" anywhere in ``task`` restricts the Gmail query to unread
    inbox mail, and ``_parse_count(task)`` supplies the window size. ``app``
    and ``ctx`` are accepted but not used here.

    Returns:
        Always a string: the digest grouped by priority, a "nothing to
        triage" note, or an error message when ``triage`` raises.
    """
    unread_only = "unread" in (task or "").lower()
    query = "is:unread is:inbox" if unread_only else "is:inbox"
    count = _parse_count(task)

    try:
        result = triage(max_messages=count, query=query)
    except Exception as e:
        detail = str(e).lower()
        # Auth-looking failures get an actionable hint instead of the raw error.
        if any(hint in detail for hint in ("credential", "token", "auth")):
            return "I can't reach your Gmail — connect Google first (the google auth flow), then try again."
        return f"Email triage failed: {e}"

    items = result.get("items", [])
    if not items:
        scope = "unread " if "unread" in query else ""
        return f"No {scope}inbox messages to triage."

    tally = result.get("by_priority", {})
    total = result["count"]
    plural = "" if total == 1 else "s"
    digest = [
        f"📥 Inbox triage — {total} message{plural} "
        f"({tally.get('high', 0)} high, {tally.get('medium', 0)} medium, {tally.get('low', 0)} low)"
    ]
    for level in _PRI_ORDER:
        bucket = [entry for entry in items if entry.get("priority") == level]
        if bucket:
            digest.append(f"\n{_PRI_EMOJI[level]} {level.upper()}")
            for entry in bucket:
                # Unread messages are flagged with a leading asterisk.
                marker = "* " if entry.get("unread") else " "
                why = f" — {entry['reason']}" if entry.get("reason") else ""
                digest.append(
                    f" {marker}[{entry.get('category')}] {entry.get('sender')} — "
                    f"{entry.get('subject')}{why}")
    return "\n".join(digest)
Loading