From 96bce2267162ffbc43d11acbad8a496762e5d1ef Mon Sep 17 00:00:00 2001 From: Claude Lin & Lay Date: Mon, 25 May 2026 10:20:30 +0900 Subject: [PATCH] refactor(pipeline): split pipeline.ts (1658 lines) into per-surface modules (#147) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `src/pipeline.ts` (1658 行) を per-surface モジュール群 (`src/pipeline/*.ts`) に分割し、 `src/pipeline.ts` 自体は全 export を再エクスポートするバレル (22 行) に縮退させた。 これにより読みやすさと修正局所性が上がり、 将来の surface 追加 (例: 新 GitHub event タイプ) の影響範囲が一モジュールに限定される。 分割した新規ファイル: - `pipeline/ingest-filter.ts` (33 行) — `MIN_COMMENT_BODY_CHARS`, `isBotSender`, `isBodyTooShort` - `pipeline/embedding.ts` (69 行) — Workers AI BGE-M3 ラッパーとバッチ上限定数 - `pipeline/hash.ts` (96 行) — body hash, 各 surface 用 embedding input formatter, `base64UrlEncode`, `sha256Hex` - `pipeline/vector-id.ts` (135 行) — `stableVectorId` (private), 全 surface の vector ID builder。 `base64UrlEncodeBytes` は唯一の呼び出し元である `stableVectorId` と同居させて local 化した - `pipeline/types.ts` (19 行) — issue / release / doc / wiki で共有される `UpsertResult` - `pipeline/embed-issue.ts` (281 行) — `GitHubIssueData`, `processAndUpsertIssue` - `pipeline/embed-release.ts` (168 行) — `GitHubReleaseData`, `processAndUpsertRelease` - `pipeline/embed-doc.ts` (219 行) — `processAndUpsertDoc`, `processAndUpsertWikiDoc` - `pipeline/embed-diff.ts` (309 行) — `GitHubCommitDetail`, `DiffUpsertResult`, `normaliseFileStatus`, `fetchCommitDetail`, `processAndUpsertCommitDiff` - `pipeline/embed-comment.ts` (416 行) — comment / review / review-comment の 3 surface (`ingestIssueComment`, `ingestPRReview`, `ingestPRReviewComment`) と関連型 consumer (`src/poller.ts` / `src/webhook.ts`) の import 文は無変更。 barrel が全 public symbol を再エクスポートするため backward-compat 保証。 動作変更なし。 関数本体・型シグネチャ・コメント・エラー文言は逐語的に移植。 `tsc --noEmit` clean、 `wrangler deploy --dry-run` 成功、 `git diff src/poller.ts src/webhook.ts` 差分ゼロを確認済み。 Closes #147 Co-Authored-By: Claude Opus 4.7 (1M context) --- src/pipeline.ts | 1666 +-------------------------------- src/pipeline/embed-comment.ts | 416 ++++++++ src/pipeline/embed-diff.ts | 309 ++++++ src/pipeline/embed-doc.ts | 219 +++++ src/pipeline/embed-issue.ts | 281 ++++++ src/pipeline/embed-release.ts | 168 ++++ src/pipeline/embedding.ts | 69 ++ src/pipeline/hash.ts | 96 ++ src/pipeline/ingest-filter.ts | 33 + src/pipeline/types.ts | 19 + src/pipeline/vector-id.ts | Bin 0 -> 4243 bytes 11 files changed, 1625 insertions(+), 1651 deletions(-) create mode 100644 src/pipeline/embed-comment.ts create mode 100644 src/pipeline/embed-diff.ts create mode 100644 src/pipeline/embed-doc.ts create mode 100644 src/pipeline/embed-issue.ts create mode 100644 src/pipeline/embed-release.ts create mode 100644 src/pipeline/embedding.ts create mode 100644 src/pipeline/hash.ts create mode 100644 src/pipeline/ingest-filter.ts create mode 100644 src/pipeline/types.ts create mode 100644 src/pipeline/vector-id.ts diff --git a/src/pipeline.ts b/src/pipeline.ts index 7c08ab8..a1981ad 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -4,1655 +4,19 @@ * Provides per-item embedding + upsert functions for issues/PRs, releases, and docs. * The cron poller calls these in a batch loop; the webhook handler calls them for * individual items as events arrive. - */ - -import type { - Env, - IssueRecord, - ReleaseRecord, - DocRecord, - WikiDocRecord, - DiffRecord, - DiffFileStatus, - IssueCommentRecord, - PRReviewRecord, - PRReviewCommentRecord, -} from "./types.js"; -import { upsertFtsRow, deleteFtsRow } from "./fts.js"; - -// ── Ingest filters (shared by webhook + poller paths) ──────── - -/** - * Minimum trimmed body length for comment / review ingest. - * Filters out "LGTM", "+1", emoji-only reactions, etc. - */ -export const MIN_COMMENT_BODY_CHARS = 10; - -/** - * Returns true when the login looks like a GitHub App / bot account. - * - * Bot accounts end in the `[bot]` suffix on the sender.login field. We - * filter them out because bot-authored comments (CI notes, dependabot - * summaries, auto-merge status) add noise without judgment history. - */ -export function isBotSender(login: string | null | undefined): boolean { - if (!login) return false; - return /\[bot\]$/.test(login); -} - -/** - * Returns true when the body is too short (or empty) to carry judgment - * history. Trim first so whitespace-only payloads count as empty. - */ -export function isBodyTooShort(body: string | null | undefined): boolean { - if (!body) return true; - return body.trim().length < MIN_COMMENT_BODY_CHARS; -} - -// ── Constants ──────────────────────────────────────────────── - -/** Maximum characters for embedding input (BGE-M3 context limit ~8192 tokens, conservative char limit) */ -export const MAX_EMBEDDING_INPUT_CHARS = 8000; - -/** - * Maximum number of inputs per Workers AI batch embed call. - * Cloudflare Workers AI does not publish a hard cap on batched embedding inputs, - * so we split large commits into multiple calls. 20 × 8000 chars ≈ 160k chars per - * call keeps payload size comfortably inside observed request limits. - */ -export const MAX_EMBEDDING_BATCH_SIZE = 20; - -/** - * Maximum number of vectors per single Vectorize.upsert call. - * We mirror MAX_EMBEDDING_BATCH_SIZE so each embed batch maps 1:1 onto one upsert. - */ -export const MAX_VECTORIZE_UPSERT_BATCH_SIZE = 20; - -// ── Pure utility functions ─────────────────────────────────── - -/** - * Compute SHA-256 hash of title + body for change detection. - * Returns hex-encoded hash string. - */ -export async function computeBodyHash(title: string, body: string): Promise { - const input = title + "\n\n" + body; - const data = new TextEncoder().encode(input); - const hashBuffer = await crypto.subtle.digest("SHA-256", data); - const hashArray = new Uint8Array(hashBuffer); - return Array.from(hashArray) - .map((b) => b.toString(16).padStart(2, "0")) - .join(""); -} - -/** - * Prepare embedding input text from issue title and body. - * Concatenates title + "\n\n" + body, truncated to MAX_EMBEDDING_INPUT_CHARS. - */ -export function prepareEmbeddingInput(title: string, body: string | null): string { - const text = title + "\n\n" + (body ?? ""); - if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; - return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); -} - -/** - * Generate embedding for a text input using Workers AI BGE-M3. - * Returns 1024-dimensional float array. - */ -export async function generateEmbedding( - ai: Ai, - text: string, -): Promise { - const result = await ai.run("@cf/baai/bge-m3", { - text: [text], - }); - - // Workers AI returns { data: [{ values: number[] }] } or similar - const vectors = (result as { data: Array }).data; - if (!vectors || vectors.length === 0) { - throw new Error("Workers AI returned no embedding vectors"); - } - return vectors[0]; -} - -/** - * Generate embeddings for multiple text inputs in one batched Workers AI call. - * Input order is preserved in the returned array. - * - * Workers AI does not publish a hard limit on the number of inputs per call, - * so callers must chunk by MAX_EMBEDDING_BATCH_SIZE before invoking this - * function. Throws if the returned vector count does not match the input count. - */ -export async function generateEmbeddingBatch( - ai: Ai, - texts: string[], -): Promise { - if (texts.length === 0) return []; - - const result = await ai.run("@cf/baai/bge-m3", { text: texts }); - const vectors = (result as { data: Array }).data; - - if (!vectors || vectors.length !== texts.length) { - throw new Error( - `Workers AI returned ${vectors?.length ?? 0} vectors for ${texts.length} inputs`, - ); - } - return vectors; -} - -// ── Vector ID builders ─────────────────────────────────────── -// -// Vectorize enforces a 64-byte cap on vector IDs. The previous scheme embedded -// the repo name + path/tag/sha as plain text and overflowed for long paths -// (e.g. `owner/repo#doc-docs/long-filename.md` hit 74 bytes). We now derive a -// deterministic fixed-length ID by hashing the scheme parts with SHA-256 and -// encoding as base64url (43 chars). A short type prefix preserves surface -// separation and keeps the total under 46 bytes, well inside the 64-byte cap. -// -// Per-surface prefixes: -// "i" — issue / pull request -// "d" — doc -// "r" — release -// "c" — commit diff (file inside a commit) - -/** - * Encode an arbitrary string to URL-safe base64 (RFC 4648 §5) without padding. - * Retained because `stableVectorId` uses it to encode the SHA-256 digest. - */ -export function base64UrlEncode(input: string): string { - // Encode UTF-8 -> binary string -> base64 via btoa - const utf8Bytes = new TextEncoder().encode(input); - let binary = ""; - for (let i = 0; i < utf8Bytes.length; i++) { - binary += String.fromCharCode(utf8Bytes[i]); - } - return btoa(binary) - .replace(/\+/g, "-") - .replace(/\//g, "_") - .replace(/=+$/g, ""); -} - -/** - * Encode raw bytes as URL-safe base64 (RFC 4648 §5) without padding. - * Used to render the SHA-256 digest directly without going through UTF-8. - */ -function base64UrlEncodeBytes(bytes: Uint8Array): string { - let binary = ""; - for (let i = 0; i < bytes.length; i++) { - binary += String.fromCharCode(bytes[i]); - } - return btoa(binary) - .replace(/\+/g, "-") - .replace(/\//g, "_") - .replace(/=+$/g, ""); -} - -/** - * Build a deterministic fixed-length Vectorize vector ID from a type prefix and - * an ordered list of string parts. Parts are joined with a NUL separator (0x00) - * so that no legitimate component can forge a collision across surfaces. - * - * Output format: `{prefix}:{base64url(sha256(part1\0part2\0...))}` - * Output length: 1–2 byte prefix + 1 byte separator + 43 byte digest = 45–46 bytes. - */ -async function stableVectorId( - prefix: string, - ...parts: string[] -): Promise { - const input = parts.join("\u0000"); - const bytes = new TextEncoder().encode(input); - const hashBuffer = await crypto.subtle.digest("SHA-256", bytes); - const digest = base64UrlEncodeBytes(new Uint8Array(hashBuffer)); - return `${prefix}:${digest}`; -} - -/** - * Build Vectorize vector ID from repo and issue number. - * Deterministic SHA-256-based ID under the "i" prefix. - */ -export function vectorId(repo: string, number: number): Promise { - return stableVectorId("i", repo, String(number)); -} - -/** - * Build Vectorize vector ID for a release. - * Deterministic SHA-256-based ID under the "r" prefix. - */ -export function releaseVectorId( - repo: string, - tagName: string, -): Promise { - return stableVectorId("r", repo, tagName); -} - -/** - * Build Vectorize vector ID for a document. - * Deterministic SHA-256-based ID under the "d" prefix. - */ -export function docVectorId(repo: string, path: string): Promise { - return stableVectorId("d", repo, path); -} - -/** - * Build Vectorize vector ID for a wiki page. - * Deterministic SHA-256-based ID under the "w" prefix. - * Wiki page names are URL slugs (dash-separated), unique within a repo's wiki. - */ -export function wikiDocVectorId(repo: string, pageName: string): Promise { - return stableVectorId("w", repo, pageName); -} - -/** - * Build Vectorize vector ID for a commit diff (one file inside one commit). - * Deterministic SHA-256-based ID under the "c" prefix. - */ -export function diffVectorId( - repo: string, - commitSha: string, - filePath: string, -): Promise { - return stableVectorId("c", repo, commitSha, filePath); -} - -/** - * Build Vectorize vector ID for an issue / PR top-level comment. - * Deterministic SHA-256-based ID under the "ic" prefix. - */ -export function issueCommentVectorId( - repo: string, - commentId: number, -): Promise { - return stableVectorId("ic", repo, String(commentId)); -} - -/** - * Build Vectorize vector ID for a PR review (approve / request_changes / comment body). - * Deterministic SHA-256-based ID under the "pv" prefix. - */ -export function prReviewVectorId( - repo: string, - reviewId: number, -): Promise { - return stableVectorId("pv", repo, String(reviewId)); -} - -/** - * Build Vectorize vector ID for a PR inline review comment (per-line diff comment). - * Deterministic SHA-256-based ID under the "pc" prefix. - */ -export function prReviewCommentVectorId( - repo: string, - commentId: number, -): Promise { - return stableVectorId("pc", repo, String(commentId)); -} - -// ── Per-item upsert functions ──────────────────────────────── - -/** GitHub API issue/PR response shape (subset of fields we need) */ -export interface GitHubIssueData { - number: number; - title: string; - body: string | null; - state: "open" | "closed"; - labels: Array<{ name: string }>; - milestone: { title: string } | null; - assignees: Array<{ login: string }>; - created_at: string; - updated_at: string; - pull_request?: { url: string }; - html_url: string; -} - -/** GitHub API release response shape (subset of fields we need) */ -export interface GitHubReleaseData { - id: number; - tag_name: string; - name: string | null; - body: string | null; - prerelease: boolean; - created_at: string; - published_at: string | null; - html_url: string; -} - -/** Result of a single-item upsert operation */ -export interface UpsertResult { - /** Whether embedding was generated (vs skipped because hash unchanged) */ - embedded: boolean; - /** Whether embedding was skipped because content hash matched existing record */ - skippedUnchanged: boolean; - /** Whether Vectorize metadata was updated without re-embedding (state/labels/assignees change) */ - metadataUpdated: boolean; - /** Whether embedding failed (item stored with empty bodyHash for retry) */ - failed: boolean; -} - -/** - * Process and upsert a single issue/PR: check hash, embed if changed, upsert to Vectorize + Store. - * - * @param env - Worker env bindings (AI, VECTORIZE) - * @param storeStub - Durable Object stub for IssueStore - * @param repo - Repository in "owner/repo" format - * @param issue - GitHub issue/PR data - * @returns UpsertResult indicating what happened - */ -export async function processAndUpsertIssue( - env: Env, - storeStub: DurableObjectStub, - repo: string, - issue: GitHubIssueData, -): Promise { - const body = issue.body ?? ""; - const title = issue.title; - const bodyHash = await computeBodyHash(title, body); - - const type: IssueRecord["type"] = issue.pull_request - ? "pull_request" - : "issue"; - - // Check if body has changed by comparing hash with stored value - const existingResp = await storeStub.fetch( - new Request( - `http://store/issue?repo=${encodeURIComponent(repo)}&number=${issue.number}`, - ), - ); - - let needsEmbedding = true; - let existing: IssueRecord | null = null; - if (existingResp.ok) { - existing = (await existingResp.json()) as IssueRecord; - if (existing.bodyHash === bodyHash) { - needsEmbedding = false; - } - } - - if (!needsEmbedding) { - // Hash matched — skip embedding but update IssueStore (metadata may have changed) - const labelNames = issue.labels.map((l) => l.name); - const assigneeLogins = issue.assignees.map((a) => a.login); - const milestoneTitle = issue.milestone?.title ?? ""; - - const record: IssueRecord = { - repo, - number: issue.number, - type, - state: issue.state, - title, - labels: labelNames, - milestone: milestoneTitle, - assignees: assigneeLogins, - bodyHash, - createdAt: issue.created_at, - updatedAt: issue.updated_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - // Check if metadata changed — if so, update Vectorize metadata too - // (Vectorize state/labels/assignees must stay in sync with GitHub) - const sortedLabels = [...labelNames].sort(); - const metadataChanged = existing !== null && ( - existing.state !== issue.state || - existing.title !== title || - [...existing.labels].sort().join(",") !== sortedLabels.join(",") || - existing.milestone !== milestoneTitle || - [...existing.assignees].sort().join(",") !== [...assigneeLogins].sort().join(",") - ); - - if (metadataChanged) { - try { - // Retrieve existing vector values to re-upsert with updated metadata - const vid = await vectorId(repo, issue.number); - const vectors = await env.VECTORIZE.getByIds([vid]); - - if (vectors.length > 0 && vectors[0].values) { - const metadata: Record = { - repo, - number: issue.number, - type, - state: issue.state, - labels: sortedLabels.join(","), - milestone: milestoneTitle, - assignees: assigneeLogins.join(","), - updated_at: issue.updated_at, - label_0: sortedLabels[0] ?? "", - label_1: sortedLabels[1] ?? "", - label_2: sortedLabels[2] ?? "", - label_3: sortedLabels[3] ?? "", - assignee_0: assigneeLogins[0] ?? "", - assignee_1: assigneeLogins[1] ?? "", - }; - - await env.VECTORIZE.upsert([ - { - id: vid, - values: vectors[0].values as number[], - metadata, - }, - ]); - - // Mirror the metadata change onto D1 FTS5 so sparse retrieval stays filterable. - // Content stays the same (no body change), but labels/state/milestone etc. - // on the sparse side must match the dense side for pre-filter consistency. - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: vid, - repo, - type, - state: issue.state, - labels: sortedLabels.join(","), - milestone: milestoneTitle, - assignees: assigneeLogins.join(","), - updatedAt: issue.updated_at, - number: issue.number, - content: prepareEmbeddingInput(title, issue.body), - }); - } catch (ftsErr) { - console.error( - `Failed to update FTS5 metadata for ${repo}#${issue.number}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - // Non-fatal: sparse side will catch up on next body change. - } - - return { embedded: false, skippedUnchanged: false, metadataUpdated: true, failed: false }; - } - } catch (err) { - console.error( - `Failed to update Vectorize metadata for ${repo}#${issue.number}:`, - err instanceof Error ? err.message : String(err), - ); - // IssueStore was already updated — Vectorize metadata will catch up on next body change - } - } - - return { embedded: false, skippedUnchanged: true, metadataUpdated: false, failed: false }; - } - - // Content changed — generate embedding - let embeddingSucceeded = false; - try { - const embeddingInput = prepareEmbeddingInput(title, issue.body); - const embedding = await generateEmbedding(env.AI, embeddingInput); - - // Expand labels into individual metadata fields (first 4, sorted) - // for potential Vectorize pre-filtering. Sorted order ensures deterministic - // slot assignment across upserts. - const labelNames = issue.labels.map((l) => l.name).sort(); - const assigneeLogins = issue.assignees.map((a) => a.login); - - const metadata: Record = { - repo, - number: issue.number, - type, - state: issue.state, - labels: labelNames.join(","), - milestone: issue.milestone?.title ?? "", - assignees: assigneeLogins.join(","), - updated_at: issue.updated_at, - // Expanded label fields (first 4, sorted alphabetically) - label_0: labelNames[0] ?? "", - label_1: labelNames[1] ?? "", - label_2: labelNames[2] ?? "", - label_3: labelNames[3] ?? "", - // Expanded assignee fields (first 2) - assignee_0: assigneeLogins[0] ?? "", - assignee_1: assigneeLogins[1] ?? "", - }; - - const vid = await vectorId(repo, issue.number); - await env.VECTORIZE.upsert([ - { - id: vid, - values: embedding, - metadata, - }, - ]); - - // Mirror the same content into D1 FTS5 for sparse (BM25) retrieval. - // Failure here does not invalidate the Vectorize upsert — we still consider the - // embedding successful and rely on the next run to reconcile the sparse side. - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: vid, - repo, - type, - state: issue.state, - labels: labelNames.join(","), - milestone: issue.milestone?.title ?? "", - assignees: assigneeLogins.join(","), - updatedAt: issue.updated_at, - number: issue.number, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for ${repo}#${issue.number}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - embeddingSucceeded = true; - } catch (err) { - console.error( - `Failed to embed ${repo}#${issue.number}:`, - err instanceof Error ? err.message : String(err), - ); - } - - // Store record — save bodyHash only when embedding succeeded. - // When embedding fails, store empty bodyHash so next attempt retries. - const record: IssueRecord = { - repo, - number: issue.number, - type, - state: issue.state, - title, - labels: issue.labels.map((l) => l.name), - milestone: issue.milestone?.title ?? "", - assignees: issue.assignees.map((a) => a.login), - bodyHash: embeddingSucceeded ? bodyHash : "", - createdAt: issue.created_at, - updatedAt: issue.updated_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { - embedded: embeddingSucceeded, - skippedUnchanged: false, - metadataUpdated: false, - failed: !embeddingSucceeded, - }; -} - -/** - * Process and upsert a single release: check hash, embed if changed, upsert to Vectorize + Store. - * - * @param env - Worker env bindings (AI, VECTORIZE) - * @param storeStub - Durable Object stub for IssueStore - * @param repo - Repository in "owner/repo" format - * @param release - GitHub release data - * @returns UpsertResult indicating what happened - */ -export async function processAndUpsertRelease( - env: Env, - storeStub: DurableObjectStub, - repo: string, - release: GitHubReleaseData, -): Promise { - const body = release.body ?? ""; - const name = release.name ?? release.tag_name; - const bodyHash = await computeBodyHash(name, body); - - // Check if release body has changed - const existingResp = await storeStub.fetch( - new Request( - `http://store/release?repo=${encodeURIComponent(repo)}&tag_name=${encodeURIComponent(release.tag_name)}`, - ), - ); - - let needsEmbedding = true; - if (existingResp.ok) { - const existing = (await existingResp.json()) as ReleaseRecord; - if (existing.bodyHash === bodyHash) { - needsEmbedding = false; - } - } - - if (!needsEmbedding) { - // Hash matched — store record but skip embedding - const record: ReleaseRecord = { - repo, - tagName: release.tag_name, - name, - body, - prerelease: release.prerelease, - bodyHash, - createdAt: release.created_at, - publishedAt: release.published_at ?? release.created_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert-release", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { embedded: false, skippedUnchanged: true, metadataUpdated: false, failed: false }; - } - - // Content changed — generate embedding - let embeddingSucceeded = false; - try { - const embeddingInput = prepareEmbeddingInput(name, body); - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: 0, - type: "release", - state: "published", - labels: "", - milestone: "", - assignees: "", - updated_at: release.published_at ?? release.created_at, - tag_name: release.tag_name, - }; - - const rvid = await releaseVectorId(repo, release.tag_name); - await env.VECTORIZE.upsert([ - { - id: rvid, - values: embedding, - metadata, - }, - ]); - - // Mirror into D1 FTS5 sparse index. - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: rvid, - repo, - type: "release", - state: "published", - labels: "", - milestone: "", - assignees: "", - updatedAt: release.published_at ?? release.created_at, - tagName: release.tag_name, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for release ${repo}#${release.tag_name}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - embeddingSucceeded = true; - } catch (err) { - console.error( - `Failed to embed release ${repo}#${release.tag_name}:`, - err instanceof Error ? err.message : String(err), - ); - } - - // Store record - const record: ReleaseRecord = { - repo, - tagName: release.tag_name, - name, - body, - prerelease: release.prerelease, - bodyHash: embeddingSucceeded ? bodyHash : "", - createdAt: release.created_at, - publishedAt: release.published_at ?? release.created_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert-release", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { - embedded: embeddingSucceeded, - skippedUnchanged: false, - metadataUpdated: false, - failed: !embeddingSucceeded, - }; -} - -/** - * Process and upsert a single doc: embed content and upsert to Vectorize + Store. - * - * Unlike issues/releases, docs use blob SHA for change detection (handled by the caller). - * This function always generates an embedding — the caller is responsible for determining - * whether the doc content has changed. - * - * @param env - Worker env bindings (AI, VECTORIZE) - * @param storeStub - Durable Object stub for IssueStore - * @param repo - Repository in "owner/repo" format - * @param path - File path within the repo (e.g. "docs/0-requirements.md") - * @param content - Decoded file content - * @param blobSha - Git blob SHA for this version of the file - * @returns UpsertResult indicating what happened - */ -export async function processAndUpsertDoc( - env: Env, - storeStub: DurableObjectStub, - repo: string, - path: string, - content: string, - blobSha: string, -): Promise { - const now = new Date().toISOString(); - - try { - // Generate embedding (use path as title, content as body) - const embeddingInput = prepareEmbeddingInput(path, content); - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: 0, - type: "doc", - state: "active", - labels: "", - milestone: "", - assignees: "", - updated_at: now, - doc_path: path, - }; - - // Upsert vector into Vectorize - const dvid = await docVectorId(repo, path); - await env.VECTORIZE.upsert([ - { - id: dvid, - values: embedding, - metadata, - }, - ]); - - // Mirror into D1 FTS5 sparse index. - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: dvid, - repo, - type: "doc", - state: "active", - labels: "", - milestone: "", - assignees: "", - updatedAt: now, - docPath: path, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for doc ${repo}/${path}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - // Upsert doc record into store - const record: DocRecord = { - repo, - path, - blobSha, - updatedAt: now, - }; - - await storeStub.fetch( - new Request("http://store/upsert-doc", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { embedded: true, skippedUnchanged: false, metadataUpdated: false, failed: false }; - } catch (err) { - console.error( - `Failed to embed doc ${repo}/${path}:`, - err instanceof Error ? err.message : String(err), - ); - return { embedded: false, skippedUnchanged: false, metadataUpdated: false, failed: true }; - } -} - -// ── Wiki doc surface ───────────────────────────────────────── - -/** - * Compute SHA-256 over UTF-8 bytes of the wiki content. Used as the change - * detection signal in lieu of git blob SHAs (the wiki git protocol is not - * exposed via REST, so we hash content directly). - */ -export async function sha256Hex(text: string): Promise { - const bytes = new TextEncoder().encode(text); - const digest = await crypto.subtle.digest("SHA-256", bytes); - const hex = Array.from(new Uint8Array(digest)) - .map((b) => b.toString(16).padStart(2, "0")) - .join(""); - return hex; -} - -/** - * Embed and upsert a single wiki page. - * - * Mirrors `processAndUpsertDoc` but writes vector / FTS / store records under - * the `wiki_doc` type. Vector ID prefix `"w:"` keeps wiki rows in their own - * namespace so they never collide with repo doc rows even when page name and - * doc path coincide. - * - * @param env Worker env bindings (AI, VECTORIZE, DB_FTS) - * @param storeStub Durable Object stub for IssueStore - * @param repo Repository in "owner/repo" format (the wiki belongs to {repo}.wiki) - * @param pageName GitHub Wiki page slug (dash-separated, e.g., "Home" or "Foo-Bar") - * @param extension Markup file extension that serves the page (e.g., "md", "markdown", "org") - * @param content Raw markup content fetched from raw.githubusercontent.com/wiki - * @returns UpsertResult indicating what happened - */ -export async function processAndUpsertWikiDoc( - env: Env, - storeStub: DurableObjectStub, - repo: string, - pageName: string, - extension: string, - content: string, -): Promise { - const now = new Date().toISOString(); - - try { - // Generate embedding (use page name as title surrogate, content as body) - const embeddingInput = prepareEmbeddingInput(pageName, content); - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: 0, - type: "wiki_doc", - state: "active", - labels: "", - milestone: "", - assignees: "", - updated_at: now, - wiki_path: pageName, - wiki_extension: extension, - }; - - const wvid = await wikiDocVectorId(repo, pageName); - await env.VECTORIZE.upsert([ - { - id: wvid, - values: embedding, - metadata, - }, - ]); - - // Mirror into D1 FTS5. We reuse the existing `doc_path` column to store - // the wiki page slug — semantically the same kind of "where did this come - // from" field, distinguished by the row's `type='wiki_doc'`. - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: wvid, - repo, - type: "wiki_doc", - state: "active", - labels: "", - milestone: "", - assignees: "", - updatedAt: now, - docPath: pageName, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for wiki ${repo}/${pageName}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - const contentHash = await sha256Hex(content); - const record: WikiDocRecord = { - repo, - pageName, - extension, - contentHash, - updatedAt: now, - }; - - await storeStub.fetch( - new Request("http://store/upsert-wiki-doc", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { embedded: true, skippedUnchanged: false, metadataUpdated: false, failed: false }; - } catch (err) { - console.error( - `Failed to embed wiki doc ${repo}/${pageName}:`, - err instanceof Error ? err.message : String(err), - ); - return { embedded: false, skippedUnchanged: false, metadataUpdated: false, failed: true }; - } -} - -// ── Commit diff surface ────────────────────────────────────── - -/** GitHub API commit detail response — subset needed for diff indexing */ -export interface GitHubCommitDetail { - sha: string; - commit: { - message: string; - author?: { name?: string | null; email?: string | null; date?: string | null } | null; - committer?: { date?: string | null } | null; - }; - author?: { login?: string | null } | null; - files?: Array<{ - filename: string; - status: string; - patch?: string; - sha?: string; - previous_filename?: string; - }>; -} - -/** Result of a commit diff batch upsert */ -export interface DiffUpsertResult { - /** Number of file-in-commit entries successfully embedded and upserted */ - embedded: number; - /** Number of file-in-commit entries skipped (no patch available, e.g., binary) */ - skipped: number; - /** Number of file-in-commit entries that failed to embed/upsert */ - failed: number; - /** Number of Workers AI batch calls issued (for observability) */ - batches: number; -} - -/** - * Fetch a single commit with per-file patches via the GitHub REST API. - * Returns the commit detail including `files[]` with inline `patch` fields. - * Throws on non-2xx responses. Shared between webhook (new-commit path) and - * poller (historical backfill path). - */ -export async function fetchCommitDetail( - repo: string, - sha: string, - token: string, -): Promise { - const url = `https://api.github.com/repos/${repo}/commits/${sha}`; - - const resp = await fetch(url, { - headers: { - Authorization: `Bearer ${token}`, - Accept: "application/vnd.github+json", - "X-GitHub-Api-Version": "2022-11-28", - "User-Agent": "github-rag-mcp/0.1.0", - }, - cache: "no-store", - } as RequestInit); - - if (!resp.ok) { - const text = await resp.text(); - throw new Error( - `GitHub Commits API error ${resp.status} for ${repo}@${sha}: ${text}`, - ); - } - - return (await resp.json()) as GitHubCommitDetail; -} - -/** - * Normalise GitHub's file status string to our DiffFileStatus union. - * Unknown values fall through to "changed" (the generic GitHub bucket). - */ -function normaliseFileStatus(status: string): DiffFileStatus { - switch (status) { - case "added": - case "modified": - case "removed": - case "renamed": - case "copied": - case "changed": - case "unchanged": - return status; - default: - return "changed"; - } -} - -/** - * Build the embedding input for a single file-in-commit. - * Format: "{commitMessage}\n\n{filePath}\n\n{patch}", truncated to MAX_EMBEDDING_INPUT_CHARS. - * The file path is included inline so semantic search can match against it - * even when the patch body alone does not mention it. - */ -export function prepareDiffEmbeddingInput( - commitMessage: string, - filePath: string, - patch: string, -): string { - const text = `${commitMessage}\n\n${filePath}\n\n${patch}`; - if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; - return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); -} - -/** - * Process and upsert a commit's per-file diffs: one vector per (commit × file). - * - * Flow: - * 1. Filter `files[]` to those with a textual `patch` (binary / oversized files are skipped). - * 2. Build embedding inputs = commit message + file path + patch, truncated. - * 3. Batch-embed inputs via Workers AI (chunked by MAX_EMBEDDING_BATCH_SIZE). - * 4. Upsert all vectors into Vectorize in the same chunks. - * 5. Record DiffRecord rows into the Durable Object store for each indexed file. - * - * Failures inside a chunk do not halt subsequent chunks — counts are tallied and - * returned so the caller can log/escalate without losing partial progress. - * - * @param env - Worker env bindings (AI, VECTORIZE) - * @param storeStub - Durable Object stub for IssueStore - * @param repo - Repository in "owner/repo" format - * @param commit - Commit detail from GitHub (from GET /repos/{repo}/commits/{sha}) - * @returns Summary of embeddings/upserts produced - */ -export async function processAndUpsertCommitDiff( - env: Env, - storeStub: DurableObjectStub, - repo: string, - commit: GitHubCommitDetail, -): Promise { - const commitSha = commit.sha; - const commitMessage = commit.commit.message ?? ""; - const commitDate = - commit.commit.author?.date ?? - commit.commit.committer?.date ?? - new Date().toISOString(); - const commitAuthor = - commit.author?.login ?? commit.commit.author?.name ?? ""; - const files = commit.files ?? []; - const now = new Date().toISOString(); - - // Keep only files with a textual patch. Binary blobs, submodule changes, and - // oversized diffs arrive without a patch field and cannot be embedded. - const indexable = files.filter( - (f): f is typeof f & { patch: string } => - typeof f.patch === "string" && f.patch.length > 0, - ); - const skipped = files.length - indexable.length; - - if (indexable.length === 0) { - return { embedded: 0, skipped, failed: 0, batches: 0 }; - } - - let embedded = 0; - let failed = 0; - let batches = 0; - - // Chunk to respect Workers AI / Vectorize batch limits. - for (let offset = 0; offset < indexable.length; offset += MAX_EMBEDDING_BATCH_SIZE) { - const chunk = indexable.slice(offset, offset + MAX_EMBEDDING_BATCH_SIZE); - batches++; - - const inputs = chunk.map((f) => - prepareDiffEmbeddingInput(commitMessage, f.filename, f.patch), - ); - - let embeddings: number[][]; - try { - embeddings = await generateEmbeddingBatch(env.AI, inputs); - } catch (err) { - console.error( - `Failed to batch-embed diffs for ${repo}@${commitSha} chunk offset ${offset}:`, - err instanceof Error ? err.message : String(err), - ); - failed += chunk.length; - continue; - } - - // Vector IDs are async (SHA-256 digest). Generate them in parallel so the - // chunk still maps to a single Vectorize.upsert call below. - const vectors = await Promise.all( - chunk.map(async (f, i) => { - const fileStatus = normaliseFileStatus(f.status); - const blobShaAfter = f.sha ?? ""; - // GitHub's files API does not return the previous blob SHA directly — - // we leave it empty for now. blob_sha_after is enough to locate the - // post-commit object; history lookup can use the commit SHA itself. - const blobShaBefore = ""; - - const metadata: Record = { - repo, - number: 0, - type: "diff", - state: "active", - labels: "", - milestone: "", - assignees: "", - updated_at: commitDate, - commit_sha: commitSha, - file_path: f.filename, - file_status: fileStatus, - commit_date: commitDate, - commit_author: commitAuthor, - blob_sha_before: blobShaBefore, - blob_sha_after: blobShaAfter, - }; - - return { - id: await diffVectorId(repo, commitSha, f.filename), - values: embeddings[i], - metadata, - }; - }), - ); - - try { - await env.VECTORIZE.upsert(vectors); - } catch (err) { - console.error( - `Failed to upsert diff vectors for ${repo}@${commitSha} chunk offset ${offset}:`, - err instanceof Error ? err.message : String(err), - ); - failed += chunk.length; - continue; - } - - // Mirror each diff into D1 FTS5 (trigram tokenizer via tokenizer_kind='code'). - // Failures are logged but do not invalidate the successful Vectorize upsert — - // the dense side still surfaces the vector, and the next reindex can catch up. - for (let i = 0; i < chunk.length; i++) { - const f = chunk[i]; - const v = vectors[i]; - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: v.id, - repo, - type: "diff", - state: "active", - labels: "", - milestone: "", - assignees: "", - updatedAt: commitDate, - commitSha, - filePath: f.filename, - fileStatus: normaliseFileStatus(f.status), - commitDate, - commitAuthor, - content: inputs[i], - }); - } catch (ftsErr) { - // Keep the high-level line for log searchability, then surface the underlying - // D1 error shape on a second line so the next cron run produces actionable - // context (error name, vector_id, content/path sizes). See #135. - console.error( - `Failed to upsert FTS5 row for diff ${repo}@${commitSha}/${f.filename}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - console.error( - `FTS5 diff upsert detail (#135):`, - JSON.stringify({ - errorName: ftsErr instanceof Error ? ftsErr.name : typeof ftsErr, - vectorId: v.id, - tokenizerKind: "code", - contentChars: inputs[i].length, - filePathChars: f.filename.length, - fileStatus: normaliseFileStatus(f.status), - commitSha, - repo, - }), - ); - } - } - - // Record store rows for each successfully upserted file. We issue these - // sequentially (the DO is single-threaded per stub anyway) and swallow - // individual failures — the Vectorize upsert has already landed, so the - // search surface is correct even if a store insert is lost. - for (let i = 0; i < chunk.length; i++) { - const f = chunk[i]; - const fileStatus = normaliseFileStatus(f.status); - const record: DiffRecord = { - repo, - commitSha, - filePath: f.filename, - fileStatus, - commitDate, - commitAuthor, - blobShaBefore: null, - blobShaAfter: f.sha ?? null, - indexedAt: now, - }; - - try { - await storeStub.fetch( - new Request("http://store/upsert-diff", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - } catch (err) { - console.error( - `Failed to record diff row for ${repo}@${commitSha}/${f.filename}:`, - err instanceof Error ? err.message : String(err), - ); - // Do not count as failed: Vectorize already has the vector. - } - } - - embedded += chunk.length; - } - - return { embedded, skipped, failed, batches }; -} - -// ── Comment / review ingest surface ────────────────────────── - -/** GitHub API issue/PR comment shape (subset we need) */ -export interface GitHubCommentData { - id: number; - body: string | null; - user: { login: string } | null; - created_at: string; - updated_at: string; -} - -/** GitHub API PR review shape (subset we need) */ -export interface GitHubPRReviewData { - id: number; - body: string | null; - user: { login: string } | null; - state: string; - submitted_at: string | null; -} - -/** GitHub API PR inline review comment shape (subset we need) */ -export interface GitHubPRReviewCommentData { - id: number; - body: string | null; - user: { login: string } | null; - path: string | null; - line: number | null; - original_line?: number | null; - commit_id: string | null; - created_at: string; - updated_at: string; -} - -/** - * Build the embedding input for a comment / review body. - * Format: "{author}\n\n{body}", truncated to MAX_EMBEDDING_INPUT_CHARS. - * The author prefix supplies speaker context so the dense embedding can - * distinguish the same body authored by different reviewers. - */ -export function prepareCommentEmbeddingInput( - author: string, - body: string, -): string { - const text = `${author}\n\n${body}`; - if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; - return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); -} - -/** Result of a comment / review ingest operation */ -export interface CommentUpsertResult { - embedded: boolean; - skippedUnchanged: boolean; - /** True when the item was filtered out (bot author or body too short). */ - filtered: boolean; - failed: boolean; -} - -/** - * Process and upsert a single issue/PR top-level comment. - * - * Flow mirrors processAndUpsertIssue: bot / short-body filter, hash-based - * change detection, embedding, Vectorize upsert, FTS5 upsert, IssueStore record. - */ -export async function ingestIssueComment( - env: Env, - storeStub: DurableObjectStub, - repo: string, - parentNumber: number, - comment: GitHubCommentData, -): Promise { - const author = comment.user?.login ?? ""; - const body = comment.body ?? ""; - - if (isBotSender(author) || isBodyTooShort(body)) { - return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; - } - - const bodyHash = await computeBodyHash(author, body); - - // Change detection: compare stored hash - const existingResp = await storeStub.fetch( - new Request( - `http://store/comment?repo=${encodeURIComponent(repo)}&comment_id=${comment.id}`, - ), - ); - if (existingResp.ok) { - const existing = (await existingResp.json()) as IssueCommentRecord; - if (existing.bodyHash === bodyHash) { - return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; - } - } - - const embeddingInput = prepareCommentEmbeddingInput(author, body); - - let embeddingSucceeded = false; - try { - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: parentNumber, - type: "issue_comment", - state: "active", - labels: "", - milestone: "", - assignees: "", - updated_at: comment.updated_at, - author, - comment_id: comment.id, - }; - - const vid = await issueCommentVectorId(repo, comment.id); - await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); - - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: vid, - repo, - type: "issue_comment", - state: "active", - labels: "", - milestone: "", - assignees: "", - updatedAt: comment.updated_at, - number: parentNumber, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for comment ${repo}#${comment.id}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - embeddingSucceeded = true; - } catch (err) { - console.error( - `Failed to embed comment ${repo}#${comment.id}:`, - err instanceof Error ? err.message : String(err), - ); - } - - const record: IssueCommentRecord = { - repo, - commentId: comment.id, - number: parentNumber, - author, - bodyHash: embeddingSucceeded ? bodyHash : "", - createdAt: comment.created_at, - updatedAt: comment.updated_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert-comment", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { - embedded: embeddingSucceeded, - skippedUnchanged: false, - filtered: false, - failed: !embeddingSucceeded, - }; -} - -/** - * Process and upsert a single PR review (approve / request_changes / comment body). * - * Reviews without a body (approve-only, no prose) pass the min-length - * filter and are skipped. Reviews with meaningful prose go through the - * normal embed + upsert flow. - */ -export async function ingestPRReview( - env: Env, - storeStub: DurableObjectStub, - repo: string, - parentNumber: number, - review: GitHubPRReviewData, -): Promise { - const author = review.user?.login ?? ""; - const body = review.body ?? ""; - - if (isBotSender(author) || isBodyTooShort(body)) { - return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; - } - - const bodyHash = await computeBodyHash(author + "\n\n" + review.state, body); - - const existingResp = await storeStub.fetch( - new Request( - `http://store/review?repo=${encodeURIComponent(repo)}&review_id=${review.id}`, - ), - ); - if (existingResp.ok) { - const existing = (await existingResp.json()) as PRReviewRecord; - if (existing.bodyHash === bodyHash) { - return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; - } - } - - const submittedAt = review.submitted_at ?? new Date().toISOString(); - const embeddingInput = prepareCommentEmbeddingInput(author, body); - - let embeddingSucceeded = false; - try { - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: parentNumber, - type: "pr_review", - // Store the GitHub review state verbatim (APPROVED / CHANGES_REQUESTED / COMMENTED ...) - state: review.state, - labels: "", - milestone: "", - assignees: "", - updated_at: submittedAt, - author, - review_id: review.id, - }; - - const vid = await prReviewVectorId(repo, review.id); - await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); - - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: vid, - repo, - type: "pr_review", - state: review.state, - labels: "", - milestone: "", - assignees: "", - updatedAt: submittedAt, - number: parentNumber, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for PR review ${repo}#${review.id}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - embeddingSucceeded = true; - } catch (err) { - console.error( - `Failed to embed PR review ${repo}#${review.id}:`, - err instanceof Error ? err.message : String(err), - ); - } - - const record: PRReviewRecord = { - repo, - reviewId: review.id, - number: parentNumber, - author, - state: review.state, - bodyHash: embeddingSucceeded ? bodyHash : "", - submittedAt, - updatedAt: submittedAt, - }; - - await storeStub.fetch( - new Request("http://store/upsert-review", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { - embedded: embeddingSucceeded, - skippedUnchanged: false, - filtered: false, - failed: !embeddingSucceeded, - }; -} - -/** - * Process and upsert a single PR inline review comment (per-line diff comment). - * - * Inline comments carry extra diff context: file path, line, commit SHA. - * We surface these in the Vectorize metadata and FTS5 row so query-time - * filters can narrow to a specific file or commit. - */ -export async function ingestPRReviewComment( - env: Env, - storeStub: DurableObjectStub, - repo: string, - parentNumber: number, - comment: GitHubPRReviewCommentData, -): Promise { - const author = comment.user?.login ?? ""; - const body = comment.body ?? ""; - - if (isBotSender(author) || isBodyTooShort(body)) { - return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; - } - - // Line numbers can be null (outdated) or appear on original_line only; fall back. - const line = comment.line ?? comment.original_line ?? 0; - const filePath = comment.path ?? ""; - const commitId = comment.commit_id ?? ""; - - const bodyHash = await computeBodyHash( - `${author}\n${filePath}:${line}`, - body, - ); - - const existingResp = await storeStub.fetch( - new Request( - `http://store/review-comment?repo=${encodeURIComponent(repo)}&comment_id=${comment.id}`, - ), - ); - if (existingResp.ok) { - const existing = (await existingResp.json()) as PRReviewCommentRecord; - if (existing.bodyHash === bodyHash) { - return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; - } - } - - const embeddingInput = prepareCommentEmbeddingInput(author, body); - - let embeddingSucceeded = false; - try { - const embedding = await generateEmbedding(env.AI, embeddingInput); - - const metadata: Record = { - repo, - number: parentNumber, - type: "pr_review_comment", - state: "active", - labels: "", - milestone: "", - assignees: "", - updated_at: comment.updated_at, - author, - comment_id: comment.id, - file_path: filePath, - line, - commit_sha: commitId, - }; - - const vid = await prReviewCommentVectorId(repo, comment.id); - await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); - - try { - await upsertFtsRow(env.DB_FTS, { - vectorId: vid, - repo, - type: "pr_review_comment", - state: "active", - labels: "", - milestone: "", - assignees: "", - updatedAt: comment.updated_at, - number: parentNumber, - filePath, - commitSha: commitId, - content: embeddingInput, - }); - } catch (ftsErr) { - console.error( - `Failed to upsert FTS5 row for PR review comment ${repo}#${comment.id}:`, - ftsErr instanceof Error ? ftsErr.message : String(ftsErr), - ); - } - - embeddingSucceeded = true; - } catch (err) { - console.error( - `Failed to embed PR review comment ${repo}#${comment.id}:`, - err instanceof Error ? err.message : String(err), - ); - } - - const record: PRReviewCommentRecord = { - repo, - commentId: comment.id, - number: parentNumber, - author, - filePath, - line, - commitId, - bodyHash: embeddingSucceeded ? bodyHash : "", - createdAt: comment.created_at, - updatedAt: comment.updated_at, - }; - - await storeStub.fetch( - new Request("http://store/upsert-review-comment", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify(record), - }), - ); - - return { - embedded: embeddingSucceeded, - skippedUnchanged: false, - filtered: false, - failed: !embeddingSucceeded, - }; -} + * Barrel re-export over the per-surface modules under `./pipeline/`. Consumers + * (`./poller.ts` and `./webhook.ts`) keep importing from `./pipeline.js`; the + * surface split is internal to this directory. + */ + +export * from "./pipeline/ingest-filter.js"; +export * from "./pipeline/embedding.js"; +export * from "./pipeline/hash.js"; +export * from "./pipeline/vector-id.js"; +export * from "./pipeline/types.js"; +export * from "./pipeline/embed-issue.js"; +export * from "./pipeline/embed-release.js"; +export * from "./pipeline/embed-doc.js"; +export * from "./pipeline/embed-diff.js"; +export * from "./pipeline/embed-comment.js"; diff --git a/src/pipeline/embed-comment.ts b/src/pipeline/embed-comment.ts new file mode 100644 index 0000000..a66bfb7 --- /dev/null +++ b/src/pipeline/embed-comment.ts @@ -0,0 +1,416 @@ +/** + * Comment / review ingest pipelines (issue comments, PR reviews, PR inline review comments). + * + * Each surface has its own GitHub API shape and `ingestX` function but they + * share a common skeleton: bot / short-body filter, hash-based change + * detection, embed + Vectorize upsert, FTS5 mirror write, IssueStore record. + * `CommentUpsertResult` covers all three. + */ + +import type { + Env, + IssueCommentRecord, + PRReviewRecord, + PRReviewCommentRecord, +} from "../types.js"; +import { upsertFtsRow } from "../fts.js"; +import { isBotSender, isBodyTooShort } from "./ingest-filter.js"; +import { computeBodyHash, prepareCommentEmbeddingInput } from "./hash.js"; +import { generateEmbedding } from "./embedding.js"; +import { + issueCommentVectorId, + prReviewVectorId, + prReviewCommentVectorId, +} from "./vector-id.js"; + +/** GitHub API issue/PR comment shape (subset we need) */ +export interface GitHubCommentData { + id: number; + body: string | null; + user: { login: string } | null; + created_at: string; + updated_at: string; +} + +/** GitHub API PR review shape (subset we need) */ +export interface GitHubPRReviewData { + id: number; + body: string | null; + user: { login: string } | null; + state: string; + submitted_at: string | null; +} + +/** GitHub API PR inline review comment shape (subset we need) */ +export interface GitHubPRReviewCommentData { + id: number; + body: string | null; + user: { login: string } | null; + path: string | null; + line: number | null; + original_line?: number | null; + commit_id: string | null; + created_at: string; + updated_at: string; +} + +/** Result of a comment / review ingest operation */ +export interface CommentUpsertResult { + embedded: boolean; + skippedUnchanged: boolean; + /** True when the item was filtered out (bot author or body too short). */ + filtered: boolean; + failed: boolean; +} + +/** + * Process and upsert a single issue/PR top-level comment. + * + * Flow mirrors processAndUpsertIssue: bot / short-body filter, hash-based + * change detection, embedding, Vectorize upsert, FTS5 upsert, IssueStore record. + */ +export async function ingestIssueComment( + env: Env, + storeStub: DurableObjectStub, + repo: string, + parentNumber: number, + comment: GitHubCommentData, +): Promise { + const author = comment.user?.login ?? ""; + const body = comment.body ?? ""; + + if (isBotSender(author) || isBodyTooShort(body)) { + return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; + } + + const bodyHash = await computeBodyHash(author, body); + + // Change detection: compare stored hash + const existingResp = await storeStub.fetch( + new Request( + `http://store/comment?repo=${encodeURIComponent(repo)}&comment_id=${comment.id}`, + ), + ); + if (existingResp.ok) { + const existing = (await existingResp.json()) as IssueCommentRecord; + if (existing.bodyHash === bodyHash) { + return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; + } + } + + const embeddingInput = prepareCommentEmbeddingInput(author, body); + + let embeddingSucceeded = false; + try { + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: parentNumber, + type: "issue_comment", + state: "active", + labels: "", + milestone: "", + assignees: "", + updated_at: comment.updated_at, + author, + comment_id: comment.id, + }; + + const vid = await issueCommentVectorId(repo, comment.id); + await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); + + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: vid, + repo, + type: "issue_comment", + state: "active", + labels: "", + milestone: "", + assignees: "", + updatedAt: comment.updated_at, + number: parentNumber, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for comment ${repo}#${comment.id}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + embeddingSucceeded = true; + } catch (err) { + console.error( + `Failed to embed comment ${repo}#${comment.id}:`, + err instanceof Error ? err.message : String(err), + ); + } + + const record: IssueCommentRecord = { + repo, + commentId: comment.id, + number: parentNumber, + author, + bodyHash: embeddingSucceeded ? bodyHash : "", + createdAt: comment.created_at, + updatedAt: comment.updated_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert-comment", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { + embedded: embeddingSucceeded, + skippedUnchanged: false, + filtered: false, + failed: !embeddingSucceeded, + }; +} + +/** + * Process and upsert a single PR review (approve / request_changes / comment body). + * + * Reviews without a body (approve-only, no prose) pass the min-length + * filter and are skipped. Reviews with meaningful prose go through the + * normal embed + upsert flow. + */ +export async function ingestPRReview( + env: Env, + storeStub: DurableObjectStub, + repo: string, + parentNumber: number, + review: GitHubPRReviewData, +): Promise { + const author = review.user?.login ?? ""; + const body = review.body ?? ""; + + if (isBotSender(author) || isBodyTooShort(body)) { + return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; + } + + const bodyHash = await computeBodyHash(author + "\n\n" + review.state, body); + + const existingResp = await storeStub.fetch( + new Request( + `http://store/review?repo=${encodeURIComponent(repo)}&review_id=${review.id}`, + ), + ); + if (existingResp.ok) { + const existing = (await existingResp.json()) as PRReviewRecord; + if (existing.bodyHash === bodyHash) { + return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; + } + } + + const submittedAt = review.submitted_at ?? new Date().toISOString(); + const embeddingInput = prepareCommentEmbeddingInput(author, body); + + let embeddingSucceeded = false; + try { + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: parentNumber, + type: "pr_review", + // Store the GitHub review state verbatim (APPROVED / CHANGES_REQUESTED / COMMENTED ...) + state: review.state, + labels: "", + milestone: "", + assignees: "", + updated_at: submittedAt, + author, + review_id: review.id, + }; + + const vid = await prReviewVectorId(repo, review.id); + await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); + + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: vid, + repo, + type: "pr_review", + state: review.state, + labels: "", + milestone: "", + assignees: "", + updatedAt: submittedAt, + number: parentNumber, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for PR review ${repo}#${review.id}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + embeddingSucceeded = true; + } catch (err) { + console.error( + `Failed to embed PR review ${repo}#${review.id}:`, + err instanceof Error ? err.message : String(err), + ); + } + + const record: PRReviewRecord = { + repo, + reviewId: review.id, + number: parentNumber, + author, + state: review.state, + bodyHash: embeddingSucceeded ? bodyHash : "", + submittedAt, + updatedAt: submittedAt, + }; + + await storeStub.fetch( + new Request("http://store/upsert-review", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { + embedded: embeddingSucceeded, + skippedUnchanged: false, + filtered: false, + failed: !embeddingSucceeded, + }; +} + +/** + * Process and upsert a single PR inline review comment (per-line diff comment). + * + * Inline comments carry extra diff context: file path, line, commit SHA. + * We surface these in the Vectorize metadata and FTS5 row so query-time + * filters can narrow to a specific file or commit. + */ +export async function ingestPRReviewComment( + env: Env, + storeStub: DurableObjectStub, + repo: string, + parentNumber: number, + comment: GitHubPRReviewCommentData, +): Promise { + const author = comment.user?.login ?? ""; + const body = comment.body ?? ""; + + if (isBotSender(author) || isBodyTooShort(body)) { + return { embedded: false, skippedUnchanged: false, filtered: true, failed: false }; + } + + // Line numbers can be null (outdated) or appear on original_line only; fall back. + const line = comment.line ?? comment.original_line ?? 0; + const filePath = comment.path ?? ""; + const commitId = comment.commit_id ?? ""; + + const bodyHash = await computeBodyHash( + `${author}\n${filePath}:${line}`, + body, + ); + + const existingResp = await storeStub.fetch( + new Request( + `http://store/review-comment?repo=${encodeURIComponent(repo)}&comment_id=${comment.id}`, + ), + ); + if (existingResp.ok) { + const existing = (await existingResp.json()) as PRReviewCommentRecord; + if (existing.bodyHash === bodyHash) { + return { embedded: false, skippedUnchanged: true, filtered: false, failed: false }; + } + } + + const embeddingInput = prepareCommentEmbeddingInput(author, body); + + let embeddingSucceeded = false; + try { + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: parentNumber, + type: "pr_review_comment", + state: "active", + labels: "", + milestone: "", + assignees: "", + updated_at: comment.updated_at, + author, + comment_id: comment.id, + file_path: filePath, + line, + commit_sha: commitId, + }; + + const vid = await prReviewCommentVectorId(repo, comment.id); + await env.VECTORIZE.upsert([{ id: vid, values: embedding, metadata }]); + + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: vid, + repo, + type: "pr_review_comment", + state: "active", + labels: "", + milestone: "", + assignees: "", + updatedAt: comment.updated_at, + number: parentNumber, + filePath, + commitSha: commitId, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for PR review comment ${repo}#${comment.id}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + embeddingSucceeded = true; + } catch (err) { + console.error( + `Failed to embed PR review comment ${repo}#${comment.id}:`, + err instanceof Error ? err.message : String(err), + ); + } + + const record: PRReviewCommentRecord = { + repo, + commentId: comment.id, + number: parentNumber, + author, + filePath, + line, + commitId, + bodyHash: embeddingSucceeded ? bodyHash : "", + createdAt: comment.created_at, + updatedAt: comment.updated_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert-review-comment", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { + embedded: embeddingSucceeded, + skippedUnchanged: false, + filtered: false, + failed: !embeddingSucceeded, + }; +} diff --git a/src/pipeline/embed-diff.ts b/src/pipeline/embed-diff.ts new file mode 100644 index 0000000..dc3bc3c --- /dev/null +++ b/src/pipeline/embed-diff.ts @@ -0,0 +1,309 @@ +/** + * Commit diff embedding + upsert pipeline. + * + * Owns the GitHub commit-detail shape, the `fetchCommitDetail` REST helper, + * the file-status normaliser, and `processAndUpsertCommitDiff` which produces + * one vector per (commit × file) using Workers AI batch embed under the "c" + * prefix. Failures inside one chunk do not halt subsequent chunks. + */ + +import type { Env, DiffRecord, DiffFileStatus } from "../types.js"; +import { upsertFtsRow } from "../fts.js"; +import { prepareDiffEmbeddingInput } from "./hash.js"; +import { + generateEmbeddingBatch, + MAX_EMBEDDING_BATCH_SIZE, +} from "./embedding.js"; +import { diffVectorId } from "./vector-id.js"; + +/** GitHub API commit detail response — subset needed for diff indexing */ +export interface GitHubCommitDetail { + sha: string; + commit: { + message: string; + author?: { name?: string | null; email?: string | null; date?: string | null } | null; + committer?: { date?: string | null } | null; + }; + author?: { login?: string | null } | null; + files?: Array<{ + filename: string; + status: string; + patch?: string; + sha?: string; + previous_filename?: string; + }>; +} + +/** Result of a commit diff batch upsert */ +export interface DiffUpsertResult { + /** Number of file-in-commit entries successfully embedded and upserted */ + embedded: number; + /** Number of file-in-commit entries skipped (no patch available, e.g., binary) */ + skipped: number; + /** Number of file-in-commit entries that failed to embed/upsert */ + failed: number; + /** Number of Workers AI batch calls issued (for observability) */ + batches: number; +} + +/** + * Fetch a single commit with per-file patches via the GitHub REST API. + * Returns the commit detail including `files[]` with inline `patch` fields. + * Throws on non-2xx responses. Shared between webhook (new-commit path) and + * poller (historical backfill path). + */ +export async function fetchCommitDetail( + repo: string, + sha: string, + token: string, +): Promise { + const url = `https://api.github.com/repos/${repo}/commits/${sha}`; + + const resp = await fetch(url, { + headers: { + Authorization: `Bearer ${token}`, + Accept: "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + "User-Agent": "github-rag-mcp/0.1.0", + }, + cache: "no-store", + } as RequestInit); + + if (!resp.ok) { + const text = await resp.text(); + throw new Error( + `GitHub Commits API error ${resp.status} for ${repo}@${sha}: ${text}`, + ); + } + + return (await resp.json()) as GitHubCommitDetail; +} + +/** + * Normalise GitHub's file status string to our DiffFileStatus union. + * Unknown values fall through to "changed" (the generic GitHub bucket). + */ +function normaliseFileStatus(status: string): DiffFileStatus { + switch (status) { + case "added": + case "modified": + case "removed": + case "renamed": + case "copied": + case "changed": + case "unchanged": + return status; + default: + return "changed"; + } +} + +/** + * Process and upsert a commit's per-file diffs: one vector per (commit × file). + * + * Flow: + * 1. Filter `files[]` to those with a textual `patch` (binary / oversized files are skipped). + * 2. Build embedding inputs = commit message + file path + patch, truncated. + * 3. Batch-embed inputs via Workers AI (chunked by MAX_EMBEDDING_BATCH_SIZE). + * 4. Upsert all vectors into Vectorize in the same chunks. + * 5. Record DiffRecord rows into the Durable Object store for each indexed file. + * + * Failures inside a chunk do not halt subsequent chunks — counts are tallied and + * returned so the caller can log/escalate without losing partial progress. + * + * @param env - Worker env bindings (AI, VECTORIZE) + * @param storeStub - Durable Object stub for IssueStore + * @param repo - Repository in "owner/repo" format + * @param commit - Commit detail from GitHub (from GET /repos/{repo}/commits/{sha}) + * @returns Summary of embeddings/upserts produced + */ +export async function processAndUpsertCommitDiff( + env: Env, + storeStub: DurableObjectStub, + repo: string, + commit: GitHubCommitDetail, +): Promise { + const commitSha = commit.sha; + const commitMessage = commit.commit.message ?? ""; + const commitDate = + commit.commit.author?.date ?? + commit.commit.committer?.date ?? + new Date().toISOString(); + const commitAuthor = + commit.author?.login ?? commit.commit.author?.name ?? ""; + const files = commit.files ?? []; + const now = new Date().toISOString(); + + // Keep only files with a textual patch. Binary blobs, submodule changes, and + // oversized diffs arrive without a patch field and cannot be embedded. + const indexable = files.filter( + (f): f is typeof f & { patch: string } => + typeof f.patch === "string" && f.patch.length > 0, + ); + const skipped = files.length - indexable.length; + + if (indexable.length === 0) { + return { embedded: 0, skipped, failed: 0, batches: 0 }; + } + + let embedded = 0; + let failed = 0; + let batches = 0; + + // Chunk to respect Workers AI / Vectorize batch limits. + for (let offset = 0; offset < indexable.length; offset += MAX_EMBEDDING_BATCH_SIZE) { + const chunk = indexable.slice(offset, offset + MAX_EMBEDDING_BATCH_SIZE); + batches++; + + const inputs = chunk.map((f) => + prepareDiffEmbeddingInput(commitMessage, f.filename, f.patch), + ); + + let embeddings: number[][]; + try { + embeddings = await generateEmbeddingBatch(env.AI, inputs); + } catch (err) { + console.error( + `Failed to batch-embed diffs for ${repo}@${commitSha} chunk offset ${offset}:`, + err instanceof Error ? err.message : String(err), + ); + failed += chunk.length; + continue; + } + + // Vector IDs are async (SHA-256 digest). Generate them in parallel so the + // chunk still maps to a single Vectorize.upsert call below. + const vectors = await Promise.all( + chunk.map(async (f, i) => { + const fileStatus = normaliseFileStatus(f.status); + const blobShaAfter = f.sha ?? ""; + // GitHub's files API does not return the previous blob SHA directly — + // we leave it empty for now. blob_sha_after is enough to locate the + // post-commit object; history lookup can use the commit SHA itself. + const blobShaBefore = ""; + + const metadata: Record = { + repo, + number: 0, + type: "diff", + state: "active", + labels: "", + milestone: "", + assignees: "", + updated_at: commitDate, + commit_sha: commitSha, + file_path: f.filename, + file_status: fileStatus, + commit_date: commitDate, + commit_author: commitAuthor, + blob_sha_before: blobShaBefore, + blob_sha_after: blobShaAfter, + }; + + return { + id: await diffVectorId(repo, commitSha, f.filename), + values: embeddings[i], + metadata, + }; + }), + ); + + try { + await env.VECTORIZE.upsert(vectors); + } catch (err) { + console.error( + `Failed to upsert diff vectors for ${repo}@${commitSha} chunk offset ${offset}:`, + err instanceof Error ? err.message : String(err), + ); + failed += chunk.length; + continue; + } + + // Mirror each diff into D1 FTS5 (trigram tokenizer via tokenizer_kind='code'). + // Failures are logged but do not invalidate the successful Vectorize upsert — + // the dense side still surfaces the vector, and the next reindex can catch up. + for (let i = 0; i < chunk.length; i++) { + const f = chunk[i]; + const v = vectors[i]; + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: v.id, + repo, + type: "diff", + state: "active", + labels: "", + milestone: "", + assignees: "", + updatedAt: commitDate, + commitSha, + filePath: f.filename, + fileStatus: normaliseFileStatus(f.status), + commitDate, + commitAuthor, + content: inputs[i], + }); + } catch (ftsErr) { + // Keep the high-level line for log searchability, then surface the underlying + // D1 error shape on a second line so the next cron run produces actionable + // context (error name, vector_id, content/path sizes). See #135. + console.error( + `Failed to upsert FTS5 row for diff ${repo}@${commitSha}/${f.filename}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + console.error( + `FTS5 diff upsert detail (#135):`, + JSON.stringify({ + errorName: ftsErr instanceof Error ? ftsErr.name : typeof ftsErr, + vectorId: v.id, + tokenizerKind: "code", + contentChars: inputs[i].length, + filePathChars: f.filename.length, + fileStatus: normaliseFileStatus(f.status), + commitSha, + repo, + }), + ); + } + } + + // Record store rows for each successfully upserted file. We issue these + // sequentially (the DO is single-threaded per stub anyway) and swallow + // individual failures — the Vectorize upsert has already landed, so the + // search surface is correct even if a store insert is lost. + for (let i = 0; i < chunk.length; i++) { + const f = chunk[i]; + const fileStatus = normaliseFileStatus(f.status); + const record: DiffRecord = { + repo, + commitSha, + filePath: f.filename, + fileStatus, + commitDate, + commitAuthor, + blobShaBefore: null, + blobShaAfter: f.sha ?? null, + indexedAt: now, + }; + + try { + await storeStub.fetch( + new Request("http://store/upsert-diff", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + } catch (err) { + console.error( + `Failed to record diff row for ${repo}@${commitSha}/${f.filename}:`, + err instanceof Error ? err.message : String(err), + ); + // Do not count as failed: Vectorize already has the vector. + } + } + + embedded += chunk.length; + } + + return { embedded, skipped, failed, batches }; +} diff --git a/src/pipeline/embed-doc.ts b/src/pipeline/embed-doc.ts new file mode 100644 index 0000000..ac1b193 --- /dev/null +++ b/src/pipeline/embed-doc.ts @@ -0,0 +1,219 @@ +/** + * Doc + wiki page embedding + upsert pipelines. + * + * Hosts both `processAndUpsertDoc` (repo docs, blob-SHA change detection done + * by caller) and `processAndUpsertWikiDoc` (wiki pages, SHA-256 content hash + * computed in-flow). Both surfaces share the same metadata shape and FTS5 + * column reuse pattern. + */ + +import type { Env, DocRecord, WikiDocRecord } from "../types.js"; +import { upsertFtsRow } from "../fts.js"; +import { prepareEmbeddingInput, sha256Hex } from "./hash.js"; +import { generateEmbedding } from "./embedding.js"; +import { docVectorId, wikiDocVectorId } from "./vector-id.js"; +import type { UpsertResult } from "./types.js"; + +/** + * Process and upsert a single doc: embed content and upsert to Vectorize + Store. + * + * Unlike issues/releases, docs use blob SHA for change detection (handled by the caller). + * This function always generates an embedding — the caller is responsible for determining + * whether the doc content has changed. + * + * @param env - Worker env bindings (AI, VECTORIZE) + * @param storeStub - Durable Object stub for IssueStore + * @param repo - Repository in "owner/repo" format + * @param path - File path within the repo (e.g. "docs/0-requirements.md") + * @param content - Decoded file content + * @param blobSha - Git blob SHA for this version of the file + * @returns UpsertResult indicating what happened + */ +export async function processAndUpsertDoc( + env: Env, + storeStub: DurableObjectStub, + repo: string, + path: string, + content: string, + blobSha: string, +): Promise { + const now = new Date().toISOString(); + + try { + // Generate embedding (use path as title, content as body) + const embeddingInput = prepareEmbeddingInput(path, content); + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: 0, + type: "doc", + state: "active", + labels: "", + milestone: "", + assignees: "", + updated_at: now, + doc_path: path, + }; + + // Upsert vector into Vectorize + const dvid = await docVectorId(repo, path); + await env.VECTORIZE.upsert([ + { + id: dvid, + values: embedding, + metadata, + }, + ]); + + // Mirror into D1 FTS5 sparse index. + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: dvid, + repo, + type: "doc", + state: "active", + labels: "", + milestone: "", + assignees: "", + updatedAt: now, + docPath: path, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for doc ${repo}/${path}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + // Upsert doc record into store + const record: DocRecord = { + repo, + path, + blobSha, + updatedAt: now, + }; + + await storeStub.fetch( + new Request("http://store/upsert-doc", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { embedded: true, skippedUnchanged: false, metadataUpdated: false, failed: false }; + } catch (err) { + console.error( + `Failed to embed doc ${repo}/${path}:`, + err instanceof Error ? err.message : String(err), + ); + return { embedded: false, skippedUnchanged: false, metadataUpdated: false, failed: true }; + } +} + +// ── Wiki doc surface ───────────────────────────────────────── + +/** + * Embed and upsert a single wiki page. + * + * Mirrors `processAndUpsertDoc` but writes vector / FTS / store records under + * the `wiki_doc` type. Vector ID prefix `"w:"` keeps wiki rows in their own + * namespace so they never collide with repo doc rows even when page name and + * doc path coincide. + * + * @param env Worker env bindings (AI, VECTORIZE, DB_FTS) + * @param storeStub Durable Object stub for IssueStore + * @param repo Repository in "owner/repo" format (the wiki belongs to {repo}.wiki) + * @param pageName GitHub Wiki page slug (dash-separated, e.g., "Home" or "Foo-Bar") + * @param extension Markup file extension that serves the page (e.g., "md", "markdown", "org") + * @param content Raw markup content fetched from raw.githubusercontent.com/wiki + * @returns UpsertResult indicating what happened + */ +export async function processAndUpsertWikiDoc( + env: Env, + storeStub: DurableObjectStub, + repo: string, + pageName: string, + extension: string, + content: string, +): Promise { + const now = new Date().toISOString(); + + try { + // Generate embedding (use page name as title surrogate, content as body) + const embeddingInput = prepareEmbeddingInput(pageName, content); + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: 0, + type: "wiki_doc", + state: "active", + labels: "", + milestone: "", + assignees: "", + updated_at: now, + wiki_path: pageName, + wiki_extension: extension, + }; + + const wvid = await wikiDocVectorId(repo, pageName); + await env.VECTORIZE.upsert([ + { + id: wvid, + values: embedding, + metadata, + }, + ]); + + // Mirror into D1 FTS5. We reuse the existing `doc_path` column to store + // the wiki page slug — semantically the same kind of "where did this come + // from" field, distinguished by the row's `type='wiki_doc'`. + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: wvid, + repo, + type: "wiki_doc", + state: "active", + labels: "", + milestone: "", + assignees: "", + updatedAt: now, + docPath: pageName, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for wiki ${repo}/${pageName}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + const contentHash = await sha256Hex(content); + const record: WikiDocRecord = { + repo, + pageName, + extension, + contentHash, + updatedAt: now, + }; + + await storeStub.fetch( + new Request("http://store/upsert-wiki-doc", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { embedded: true, skippedUnchanged: false, metadataUpdated: false, failed: false }; + } catch (err) { + console.error( + `Failed to embed wiki doc ${repo}/${pageName}:`, + err instanceof Error ? err.message : String(err), + ); + return { embedded: false, skippedUnchanged: false, metadataUpdated: false, failed: true }; + } +} diff --git a/src/pipeline/embed-issue.ts b/src/pipeline/embed-issue.ts new file mode 100644 index 0000000..570da42 --- /dev/null +++ b/src/pipeline/embed-issue.ts @@ -0,0 +1,281 @@ +/** + * Issue / pull request embedding + upsert pipeline. + * + * Owns the GitHub issue / PR data shape (`GitHubIssueData`) and the + * `processAndUpsertIssue` flow: hash-based change detection, metadata-only + * Vectorize refresh when only labels / state changed, and full embed + upsert + * when the body changed. FTS5 mirror writes are best-effort. + */ + +import type { Env, IssueRecord } from "../types.js"; +import { upsertFtsRow } from "../fts.js"; +import { computeBodyHash, prepareEmbeddingInput } from "./hash.js"; +import { generateEmbedding } from "./embedding.js"; +import { vectorId } from "./vector-id.js"; +import type { UpsertResult } from "./types.js"; + +/** GitHub API issue/PR response shape (subset of fields we need) */ +export interface GitHubIssueData { + number: number; + title: string; + body: string | null; + state: "open" | "closed"; + labels: Array<{ name: string }>; + milestone: { title: string } | null; + assignees: Array<{ login: string }>; + created_at: string; + updated_at: string; + pull_request?: { url: string }; + html_url: string; +} + +/** + * Process and upsert a single issue/PR: check hash, embed if changed, upsert to Vectorize + Store. + * + * @param env - Worker env bindings (AI, VECTORIZE) + * @param storeStub - Durable Object stub for IssueStore + * @param repo - Repository in "owner/repo" format + * @param issue - GitHub issue/PR data + * @returns UpsertResult indicating what happened + */ +export async function processAndUpsertIssue( + env: Env, + storeStub: DurableObjectStub, + repo: string, + issue: GitHubIssueData, +): Promise { + const body = issue.body ?? ""; + const title = issue.title; + const bodyHash = await computeBodyHash(title, body); + + const type: IssueRecord["type"] = issue.pull_request + ? "pull_request" + : "issue"; + + // Check if body has changed by comparing hash with stored value + const existingResp = await storeStub.fetch( + new Request( + `http://store/issue?repo=${encodeURIComponent(repo)}&number=${issue.number}`, + ), + ); + + let needsEmbedding = true; + let existing: IssueRecord | null = null; + if (existingResp.ok) { + existing = (await existingResp.json()) as IssueRecord; + if (existing.bodyHash === bodyHash) { + needsEmbedding = false; + } + } + + if (!needsEmbedding) { + // Hash matched — skip embedding but update IssueStore (metadata may have changed) + const labelNames = issue.labels.map((l) => l.name); + const assigneeLogins = issue.assignees.map((a) => a.login); + const milestoneTitle = issue.milestone?.title ?? ""; + + const record: IssueRecord = { + repo, + number: issue.number, + type, + state: issue.state, + title, + labels: labelNames, + milestone: milestoneTitle, + assignees: assigneeLogins, + bodyHash, + createdAt: issue.created_at, + updatedAt: issue.updated_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + // Check if metadata changed — if so, update Vectorize metadata too + // (Vectorize state/labels/assignees must stay in sync with GitHub) + const sortedLabels = [...labelNames].sort(); + const metadataChanged = existing !== null && ( + existing.state !== issue.state || + existing.title !== title || + [...existing.labels].sort().join(",") !== sortedLabels.join(",") || + existing.milestone !== milestoneTitle || + [...existing.assignees].sort().join(",") !== [...assigneeLogins].sort().join(",") + ); + + if (metadataChanged) { + try { + // Retrieve existing vector values to re-upsert with updated metadata + const vid = await vectorId(repo, issue.number); + const vectors = await env.VECTORIZE.getByIds([vid]); + + if (vectors.length > 0 && vectors[0].values) { + const metadata: Record = { + repo, + number: issue.number, + type, + state: issue.state, + labels: sortedLabels.join(","), + milestone: milestoneTitle, + assignees: assigneeLogins.join(","), + updated_at: issue.updated_at, + label_0: sortedLabels[0] ?? "", + label_1: sortedLabels[1] ?? "", + label_2: sortedLabels[2] ?? "", + label_3: sortedLabels[3] ?? "", + assignee_0: assigneeLogins[0] ?? "", + assignee_1: assigneeLogins[1] ?? "", + }; + + await env.VECTORIZE.upsert([ + { + id: vid, + values: vectors[0].values as number[], + metadata, + }, + ]); + + // Mirror the metadata change onto D1 FTS5 so sparse retrieval stays filterable. + // Content stays the same (no body change), but labels/state/milestone etc. + // on the sparse side must match the dense side for pre-filter consistency. + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: vid, + repo, + type, + state: issue.state, + labels: sortedLabels.join(","), + milestone: milestoneTitle, + assignees: assigneeLogins.join(","), + updatedAt: issue.updated_at, + number: issue.number, + content: prepareEmbeddingInput(title, issue.body), + }); + } catch (ftsErr) { + console.error( + `Failed to update FTS5 metadata for ${repo}#${issue.number}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + // Non-fatal: sparse side will catch up on next body change. + } + + return { embedded: false, skippedUnchanged: false, metadataUpdated: true, failed: false }; + } + } catch (err) { + console.error( + `Failed to update Vectorize metadata for ${repo}#${issue.number}:`, + err instanceof Error ? err.message : String(err), + ); + // IssueStore was already updated — Vectorize metadata will catch up on next body change + } + } + + return { embedded: false, skippedUnchanged: true, metadataUpdated: false, failed: false }; + } + + // Content changed — generate embedding + let embeddingSucceeded = false; + try { + const embeddingInput = prepareEmbeddingInput(title, issue.body); + const embedding = await generateEmbedding(env.AI, embeddingInput); + + // Expand labels into individual metadata fields (first 4, sorted) + // for potential Vectorize pre-filtering. Sorted order ensures deterministic + // slot assignment across upserts. + const labelNames = issue.labels.map((l) => l.name).sort(); + const assigneeLogins = issue.assignees.map((a) => a.login); + + const metadata: Record = { + repo, + number: issue.number, + type, + state: issue.state, + labels: labelNames.join(","), + milestone: issue.milestone?.title ?? "", + assignees: assigneeLogins.join(","), + updated_at: issue.updated_at, + // Expanded label fields (first 4, sorted alphabetically) + label_0: labelNames[0] ?? "", + label_1: labelNames[1] ?? "", + label_2: labelNames[2] ?? "", + label_3: labelNames[3] ?? "", + // Expanded assignee fields (first 2) + assignee_0: assigneeLogins[0] ?? "", + assignee_1: assigneeLogins[1] ?? "", + }; + + const vid = await vectorId(repo, issue.number); + await env.VECTORIZE.upsert([ + { + id: vid, + values: embedding, + metadata, + }, + ]); + + // Mirror the same content into D1 FTS5 for sparse (BM25) retrieval. + // Failure here does not invalidate the Vectorize upsert — we still consider the + // embedding successful and rely on the next run to reconcile the sparse side. + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: vid, + repo, + type, + state: issue.state, + labels: labelNames.join(","), + milestone: issue.milestone?.title ?? "", + assignees: assigneeLogins.join(","), + updatedAt: issue.updated_at, + number: issue.number, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for ${repo}#${issue.number}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + embeddingSucceeded = true; + } catch (err) { + console.error( + `Failed to embed ${repo}#${issue.number}:`, + err instanceof Error ? err.message : String(err), + ); + } + + // Store record — save bodyHash only when embedding succeeded. + // When embedding fails, store empty bodyHash so next attempt retries. + const record: IssueRecord = { + repo, + number: issue.number, + type, + state: issue.state, + title, + labels: issue.labels.map((l) => l.name), + milestone: issue.milestone?.title ?? "", + assignees: issue.assignees.map((a) => a.login), + bodyHash: embeddingSucceeded ? bodyHash : "", + createdAt: issue.created_at, + updatedAt: issue.updated_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { + embedded: embeddingSucceeded, + skippedUnchanged: false, + metadataUpdated: false, + failed: !embeddingSucceeded, + }; +} diff --git a/src/pipeline/embed-release.ts b/src/pipeline/embed-release.ts new file mode 100644 index 0000000..ce52519 --- /dev/null +++ b/src/pipeline/embed-release.ts @@ -0,0 +1,168 @@ +/** + * Release embedding + upsert pipeline. + * + * Owns the GitHub release data shape (`GitHubReleaseData`) and the + * `processAndUpsertRelease` flow: hash-based change detection, + * Vectorize upsert under the "r" prefix, and FTS5 mirror writes. + */ + +import type { Env, ReleaseRecord } from "../types.js"; +import { upsertFtsRow } from "../fts.js"; +import { computeBodyHash, prepareEmbeddingInput } from "./hash.js"; +import { generateEmbedding } from "./embedding.js"; +import { releaseVectorId } from "./vector-id.js"; +import type { UpsertResult } from "./types.js"; + +/** GitHub API release response shape (subset of fields we need) */ +export interface GitHubReleaseData { + id: number; + tag_name: string; + name: string | null; + body: string | null; + prerelease: boolean; + created_at: string; + published_at: string | null; + html_url: string; +} + +/** + * Process and upsert a single release: check hash, embed if changed, upsert to Vectorize + Store. + * + * @param env - Worker env bindings (AI, VECTORIZE) + * @param storeStub - Durable Object stub for IssueStore + * @param repo - Repository in "owner/repo" format + * @param release - GitHub release data + * @returns UpsertResult indicating what happened + */ +export async function processAndUpsertRelease( + env: Env, + storeStub: DurableObjectStub, + repo: string, + release: GitHubReleaseData, +): Promise { + const body = release.body ?? ""; + const name = release.name ?? release.tag_name; + const bodyHash = await computeBodyHash(name, body); + + // Check if release body has changed + const existingResp = await storeStub.fetch( + new Request( + `http://store/release?repo=${encodeURIComponent(repo)}&tag_name=${encodeURIComponent(release.tag_name)}`, + ), + ); + + let needsEmbedding = true; + if (existingResp.ok) { + const existing = (await existingResp.json()) as ReleaseRecord; + if (existing.bodyHash === bodyHash) { + needsEmbedding = false; + } + } + + if (!needsEmbedding) { + // Hash matched — store record but skip embedding + const record: ReleaseRecord = { + repo, + tagName: release.tag_name, + name, + body, + prerelease: release.prerelease, + bodyHash, + createdAt: release.created_at, + publishedAt: release.published_at ?? release.created_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert-release", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { embedded: false, skippedUnchanged: true, metadataUpdated: false, failed: false }; + } + + // Content changed — generate embedding + let embeddingSucceeded = false; + try { + const embeddingInput = prepareEmbeddingInput(name, body); + const embedding = await generateEmbedding(env.AI, embeddingInput); + + const metadata: Record = { + repo, + number: 0, + type: "release", + state: "published", + labels: "", + milestone: "", + assignees: "", + updated_at: release.published_at ?? release.created_at, + tag_name: release.tag_name, + }; + + const rvid = await releaseVectorId(repo, release.tag_name); + await env.VECTORIZE.upsert([ + { + id: rvid, + values: embedding, + metadata, + }, + ]); + + // Mirror into D1 FTS5 sparse index. + try { + await upsertFtsRow(env.DB_FTS, { + vectorId: rvid, + repo, + type: "release", + state: "published", + labels: "", + milestone: "", + assignees: "", + updatedAt: release.published_at ?? release.created_at, + tagName: release.tag_name, + content: embeddingInput, + }); + } catch (ftsErr) { + console.error( + `Failed to upsert FTS5 row for release ${repo}#${release.tag_name}:`, + ftsErr instanceof Error ? ftsErr.message : String(ftsErr), + ); + } + + embeddingSucceeded = true; + } catch (err) { + console.error( + `Failed to embed release ${repo}#${release.tag_name}:`, + err instanceof Error ? err.message : String(err), + ); + } + + // Store record + const record: ReleaseRecord = { + repo, + tagName: release.tag_name, + name, + body, + prerelease: release.prerelease, + bodyHash: embeddingSucceeded ? bodyHash : "", + createdAt: release.created_at, + publishedAt: release.published_at ?? release.created_at, + }; + + await storeStub.fetch( + new Request("http://store/upsert-release", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + + return { + embedded: embeddingSucceeded, + skippedUnchanged: false, + metadataUpdated: false, + failed: !embeddingSucceeded, + }; +} diff --git a/src/pipeline/embedding.ts b/src/pipeline/embedding.ts new file mode 100644 index 0000000..4e87936 --- /dev/null +++ b/src/pipeline/embedding.ts @@ -0,0 +1,69 @@ +/** + * Workers AI embedding wrappers and batching constants. + * + * Wraps the BGE-M3 model behind `generateEmbedding` (single text) and + * `generateEmbeddingBatch` (multi-text). Callers must chunk inputs by + * `MAX_EMBEDDING_BATCH_SIZE`; the helper does not split internally. + */ + +/** Maximum characters for embedding input (BGE-M3 context limit ~8192 tokens, conservative char limit) */ +export const MAX_EMBEDDING_INPUT_CHARS = 8000; + +/** + * Maximum number of inputs per Workers AI batch embed call. + * Cloudflare Workers AI does not publish a hard cap on batched embedding inputs, + * so we split large commits into multiple calls. 20 × 8000 chars ≈ 160k chars per + * call keeps payload size comfortably inside observed request limits. + */ +export const MAX_EMBEDDING_BATCH_SIZE = 20; + +/** + * Maximum number of vectors per single Vectorize.upsert call. + * We mirror MAX_EMBEDDING_BATCH_SIZE so each embed batch maps 1:1 onto one upsert. + */ +export const MAX_VECTORIZE_UPSERT_BATCH_SIZE = 20; + +/** + * Generate embedding for a text input using Workers AI BGE-M3. + * Returns 1024-dimensional float array. + */ +export async function generateEmbedding( + ai: Ai, + text: string, +): Promise { + const result = await ai.run("@cf/baai/bge-m3", { + text: [text], + }); + + // Workers AI returns { data: [{ values: number[] }] } or similar + const vectors = (result as { data: Array }).data; + if (!vectors || vectors.length === 0) { + throw new Error("Workers AI returned no embedding vectors"); + } + return vectors[0]; +} + +/** + * Generate embeddings for multiple text inputs in one batched Workers AI call. + * Input order is preserved in the returned array. + * + * Workers AI does not publish a hard limit on the number of inputs per call, + * so callers must chunk by MAX_EMBEDDING_BATCH_SIZE before invoking this + * function. Throws if the returned vector count does not match the input count. + */ +export async function generateEmbeddingBatch( + ai: Ai, + texts: string[], +): Promise { + if (texts.length === 0) return []; + + const result = await ai.run("@cf/baai/bge-m3", { text: texts }); + const vectors = (result as { data: Array }).data; + + if (!vectors || vectors.length !== texts.length) { + throw new Error( + `Workers AI returned ${vectors?.length ?? 0} vectors for ${texts.length} inputs`, + ); + } + return vectors; +} diff --git a/src/pipeline/hash.ts b/src/pipeline/hash.ts new file mode 100644 index 0000000..4c17496 --- /dev/null +++ b/src/pipeline/hash.ts @@ -0,0 +1,96 @@ +/** + * Hashing and embedding-input preparation helpers. + * + * Houses the SHA-256 body hashers used for change detection, the per-surface + * input formatters (issue/PR, diff, comment), and the public URL-safe base64 + * encoder. The byte-mode base64 helper used internally by vector-id sits + * alongside `stableVectorId` to keep its only call site local. + */ + +import { MAX_EMBEDDING_INPUT_CHARS } from "./embedding.js"; + +/** + * Compute SHA-256 hash of title + body for change detection. + * Returns hex-encoded hash string. + */ +export async function computeBodyHash(title: string, body: string): Promise { + const input = title + "\n\n" + body; + const data = new TextEncoder().encode(input); + const hashBuffer = await crypto.subtle.digest("SHA-256", data); + const hashArray = new Uint8Array(hashBuffer); + return Array.from(hashArray) + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); +} + +/** + * Prepare embedding input text from issue title and body. + * Concatenates title + "\n\n" + body, truncated to MAX_EMBEDDING_INPUT_CHARS. + */ +export function prepareEmbeddingInput(title: string, body: string | null): string { + const text = title + "\n\n" + (body ?? ""); + if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; + return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); +} + +/** + * Encode an arbitrary string to URL-safe base64 (RFC 4648 §5) without padding. + * Retained because `stableVectorId` uses it to encode the SHA-256 digest. + */ +export function base64UrlEncode(input: string): string { + // Encode UTF-8 -> binary string -> base64 via btoa + const utf8Bytes = new TextEncoder().encode(input); + let binary = ""; + for (let i = 0; i < utf8Bytes.length; i++) { + binary += String.fromCharCode(utf8Bytes[i]); + } + return btoa(binary) + .replace(/\+/g, "-") + .replace(/\//g, "_") + .replace(/=+$/g, ""); +} + +/** + * Compute SHA-256 over UTF-8 bytes of the wiki content. Used as the change + * detection signal in lieu of git blob SHAs (the wiki git protocol is not + * exposed via REST, so we hash content directly). + */ +export async function sha256Hex(text: string): Promise { + const bytes = new TextEncoder().encode(text); + const digest = await crypto.subtle.digest("SHA-256", bytes); + const hex = Array.from(new Uint8Array(digest)) + .map((b) => b.toString(16).padStart(2, "0")) + .join(""); + return hex; +} + +/** + * Build the embedding input for a single file-in-commit. + * Format: "{commitMessage}\n\n{filePath}\n\n{patch}", truncated to MAX_EMBEDDING_INPUT_CHARS. + * The file path is included inline so semantic search can match against it + * even when the patch body alone does not mention it. + */ +export function prepareDiffEmbeddingInput( + commitMessage: string, + filePath: string, + patch: string, +): string { + const text = `${commitMessage}\n\n${filePath}\n\n${patch}`; + if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; + return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); +} + +/** + * Build the embedding input for a comment / review body. + * Format: "{author}\n\n{body}", truncated to MAX_EMBEDDING_INPUT_CHARS. + * The author prefix supplies speaker context so the dense embedding can + * distinguish the same body authored by different reviewers. + */ +export function prepareCommentEmbeddingInput( + author: string, + body: string, +): string { + const text = `${author}\n\n${body}`; + if (text.length <= MAX_EMBEDDING_INPUT_CHARS) return text; + return text.slice(0, MAX_EMBEDDING_INPUT_CHARS); +} diff --git a/src/pipeline/ingest-filter.ts b/src/pipeline/ingest-filter.ts new file mode 100644 index 0000000..37dcc6b --- /dev/null +++ b/src/pipeline/ingest-filter.ts @@ -0,0 +1,33 @@ +/** + * Ingest filters shared by the webhook + poller paths. + * + * Centralises the noise-floor predicates that decide whether a freshly + * arrived comment / review body is worth embedding (bot author, body length). + */ + +/** + * Minimum trimmed body length for comment / review ingest. + * Filters out "LGTM", "+1", emoji-only reactions, etc. + */ +export const MIN_COMMENT_BODY_CHARS = 10; + +/** + * Returns true when the login looks like a GitHub App / bot account. + * + * Bot accounts end in the `[bot]` suffix on the sender.login field. We + * filter them out because bot-authored comments (CI notes, dependabot + * summaries, auto-merge status) add noise without judgment history. + */ +export function isBotSender(login: string | null | undefined): boolean { + if (!login) return false; + return /\[bot\]$/.test(login); +} + +/** + * Returns true when the body is too short (or empty) to carry judgment + * history. Trim first so whitespace-only payloads count as empty. + */ +export function isBodyTooShort(body: string | null | undefined): boolean { + if (!body) return true; + return body.trim().length < MIN_COMMENT_BODY_CHARS; +} diff --git a/src/pipeline/types.ts b/src/pipeline/types.ts new file mode 100644 index 0000000..e4368fc --- /dev/null +++ b/src/pipeline/types.ts @@ -0,0 +1,19 @@ +/** + * Shared result types reused across per-surface embed modules. + * + * Surface-specific result shapes (DiffUpsertResult, CommentUpsertResult) live + * with their owning module. This file holds only the generic `UpsertResult` + * used by issue / release / doc / wiki paths. + */ + +/** Result of a single-item upsert operation */ +export interface UpsertResult { + /** Whether embedding was generated (vs skipped because hash unchanged) */ + embedded: boolean; + /** Whether embedding was skipped because content hash matched existing record */ + skippedUnchanged: boolean; + /** Whether Vectorize metadata was updated without re-embedding (state/labels/assignees change) */ + metadataUpdated: boolean; + /** Whether embedding failed (item stored with empty bodyHash for retry) */ + failed: boolean; +} diff --git a/src/pipeline/vector-id.ts b/src/pipeline/vector-id.ts new file mode 100644 index 0000000000000000000000000000000000000000..c8d9cb99900919ecdd0741e069c2f0326b14fb03 GIT binary patch literal 4243 zcmcgvZEhPk5bbZDVxR`-DwbAG>bQXu7f7524G^S;6E{DSAf|RHiOp)cA-R@S!$9uP z-l6yCQF4;LA-OBrv7K&z)PNw1AH$h9ZytxE+qXC9HvK9yr;Yku(ux0%o;{&)t%_V4 zOA~EKo=CGKRp#(;%-Np9ypffrQebA2D&b}$C#EAi6C^BEg-|6qdFn`%Icaz|DfB|- z-q}KzQ@_Qstwg5Nl#cbHlxD<6zs+?v#K(?!;c%h~$;YJg{Fr9S(GT0ie%pRZuO*dw zL4ZM>ND^!=mrk0wDwTCAqlr3|`49%CZstK-(oEQyf&&~w46PEzSz0KpdHM6B;oW;X z>^57IWu|jpjiAQD%AM`n6qIa#PuWZucHt4(nKq8xvhtYWM25~va{`oO9@vzXI7T=H zs2P(FQdZVG;j|M4)ySQp?VS+YfEKbSkWi~~=~t~~tI6ttG{blRAIY)WT|SDar+P%6 zKmAF{+FH_xsv72u{8&rt;*vZr0n~BC#1SJ4iD2XCB91Q92SuyJSsc#ve2%Q-YBHgO zlht@FC`|Pmm#|~qrGrCqx}vT?0C2}`krStu9LL={b7T$P0?nT0&Pwtcj* zF$zWV6Xy;q#3EqIIXpUiKD1&YV=^f@e7XW@gQ$>>#;O!3^gZhJ9$<(|KjCqOp{)n_dKh9&1LZNYx!L!- zsoA@^M=!n6bYk>;A2i+vOyTf1>Rq4hp5tSrt4&E`r$rKuMI!N+qDBj7OEP-9Ihqcr zH|%{eG4d1dznIwDyya(lIOV)e)MHjM2+|FudzApP``EQ!c(H0lN!p;CkhlUmKzh>P z=qo*?17GmMNcs(_!ZKg`f?gawZ|anh-8$Xc^6a%5H7Fd!23eRYr{;o%Q_m}1g0+y) zl5;VI24S?op}x#Bqiw4Jwkg|RYFB|{SPv_^bbJg+t>gP7!2c{yz<}Uw!^{Cj;Frg>8GG|?1I#@@dX$w91A~9`B+PV)bf}W!-$Ru_ zc2mwk())%fK>&{z_(H&f5|#Z5mq;iQlk`)s$1?F2);6%FKdvVeDCi!Eg@7JqW?4C% z+Is8?nTCjyUaZ{SAe^ygqQ&IO(qvCob(yTr>w6QiR*rAAxx95244xl<$@FWHLhIaMd-#k%O872K<%NF<4w!Hw7DwuCw<-*dQF5mKSI) zS|qMx?lrNSHuh+!Q_#3f>pAqJW7N2~Z;BdL+lViO>wB&y(r!7<+c>$^>lCDK8!9pN z%_#kQBBSkCMMess`Pi< z<9$$97Y5HfxdFMUT<1wQ^cHLA1v{M(ywG9c!a-NN>azHE?6W%32))z_;L*lqrQn9H z!N@%P@J9-Q>1ytSJ#yW$-}58zDXq-s5RgqOrwDv`%B4 zFZ&%q;~xGdepTks_wF|V7P|ZYju(8mgjZv+GhA1`Hz=v_2J;UQ|Ne=1b%yFI{A!^3 OKPZML)+NX;QS2X-o_Jya literal 0 HcmV?d00001