From e59884372fcd3c37a3d95c909f3994f871d515d9 Mon Sep 17 00:00:00 2001 From: Claude Lin & Lay Date: Thu, 7 May 2026 11:01:22 +0900 Subject: [PATCH] fix(poller): cap pollDocs / pollReleases fan-out to fit Worker subrequest budget MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #144 で `pollComments` の per-repo per-run fetch cap を 10 に引き下げて以降、`:15` cron は安定したが、`:00` LIGHT_CRON で `pollDocs` / `pollReleases` / `pollRepo` を 5 repos 直列に走らせる side が同 root cause で `Too many subrequests by single Worker invocation` を出している (issue #149, 2026-05-07 00:01 JST 観測, 24h で 90 errors)。 LIGHT_CRON は 1 Worker invocation で 5 repos x 3 surfaces を共通の 1000-subrequest budget で回す。1 doc fetch / 1 release upsert あたり ~5 subrequests (Workers AI embed + Vectorize upsert + D1 FTS upsert + Store DO + GH API) を消費するため、surface 内で fan-out cap が無いと一発で 1000 を超える。 本 PR で `MAX_DOC_FETCHES_PER_REPO_PER_RUN = 10` と `MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN = 10` を `src/poller.ts` に追加し、PR #144 と同じ shape (warn log + per-repo counter) で運用する。worst-case 計算: 5 repos x 10 items x ~5 subrequests = 250 / surface, 3 surfaces x 250 = 750 subrequests, 1000 budget に対し 250 の余裕。 ETag watermark の取り扱いも合わせて修正: cap で run を打ち切った場合に新 ETag を保存すると次回 cron が 304 で短絡して未処理分を見落とすため、cap exhausted のときは prior storedEtag を保持 (lastPolledAt のみ更新)。`pollDocs` / `pollReleases` 双方で同様。 Refs #149 --- src/poller.ts | 169 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 151 insertions(+), 18 deletions(-) diff --git a/src/poller.ts b/src/poller.ts index afea067..8ca896b 100644 --- a/src/poller.ts +++ b/src/poller.ts @@ -84,6 +84,46 @@ const MAX_COMMENTS_EMBEDDED_PER_REPO = 30; * leaving headroom for per-parent overhead and other pollers. */ const MAX_COMMENT_FETCHES_PER_REPO_PER_RUN = 10; +/** Maximum number of doc-file content fetches the docs poller issues per repo + * per cron run. Each changed `.md` entry triggers `fetchFileContent` (1 GitHub + * Contents API subrequest) followed by `processAndUpsertDoc` (~3-4 internal + * subrequests: Workers AI embed + Vectorize upsert + D1 FTS upsert + Store DO + * call), so a single doc fan-out is ~5 subrequests. Without this cap a repo + * with many changed `.md` files could consume ~250 subrequests on its own + * (50 × 5), and the LIGHT_CRON dispatch bundles 3 surfaces × 5 repos in one + * Worker invocation that shares a single 1000-subrequest budget — easily + * exhausted (issue #149). + * + * Numeric design (issue #149, mirrors PR #144 / issue #134): + * - LIGHT_CRON worst-case = 5 repos × 3 surfaces × per-item fan-out. + * - Target headroom ≤ 800 subrequests (200 reserved for parent cron, error + * retries, store watermark writes). + * - 800 / 5 repos / 3 surfaces ≈ 53 subrequests / surface / repo. + * - 53 / ~5 subrequests-per-item ≈ 10 items. + * - Cap = 10. Worst-case 5 × 10 × 5 = 250 subrequests for the docs surface + * across all repos, well under the per-surface envelope. + * Remaining changed docs are picked up on the next cron run (blob SHA stays + * unchanged in the store until the doc is successfully upserted). */ +const MAX_DOC_FETCHES_PER_REPO_PER_RUN = 10; + +/** Maximum number of release records the releases poller upserts per repo per + * cron run. The GitHub Releases endpoint returns all recent releases in a + * single API call, so the GH-side fetch cost is fixed at 1, but each release + * upsert fans out through `processAndUpsertRelease` to ~4 internal + * subrequests (Workers AI embed + Vectorize upsert + D1 FTS upsert + Store DO + * call). Active repos with many releases blew the LIGHT_CRON 1000-subrequest + * budget when combined with pollDocs and pollRepo (issue #149). + * + * Numeric design (issue #149, mirrors PR #144 / issue #134): + * - Same envelope as `MAX_DOC_FETCHES_PER_REPO_PER_RUN`: 10 items × ~5 + * subrequests × 5 repos ≈ 250 subrequests for the releases surface. + * - Together with the docs cap (250) and pollRepo's existing + * `MAX_EMBEDDINGS_PER_RUN=50` × ~5 ≈ 250, the LIGHT_CRON worst case stays + * around 750 subrequests, well under the 1000 ceiling. + * Remaining releases are stored with empty bodyHash so the next cron run + * retries them (existing pattern in `pollReleases`). */ +const MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN = 10; + /** Maximum number of commits fetched in the forward (webhook-redundancy) phase * of the diff poller per repo per run. * Forward is normally a no-op because the webhook path already indexes new @@ -493,8 +533,46 @@ async function pollReleases( let embedded = 0; let skipped = 0; let failed = 0; + let upsertsIssued = 0; + let upsertBudgetExhausted = false; for (const release of releases) { + // Enforce per-run upsert fan-out cap to keep the LIGHT_CRON Worker + // invocation under its 1000-subrequest budget (issue #149). This sits + // ahead of the embedding cap because each upsert call (even for an + // unchanged release that ends up skipped) still consumes Store DO + // subrequests during change detection. + if (upsertsIssued >= MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN) { + if (!upsertBudgetExhausted) { + upsertBudgetExhausted = true; + console.warn( + `pollReleases: upsert budget reached for ${repo} ` + + `(${MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN} releases). ` + + `Remaining releases will be retried next cron run.`, + ); + } + // Store record with empty bodyHash to trigger retry on next poll + const name = release.name ?? release.tag_name; + const record: ReleaseRecord = { + repo, + tagName: release.tag_name, + name, + body: release.body ?? "", + prerelease: release.prerelease, + bodyHash: "", + createdAt: release.created_at, + publishedAt: release.published_at ?? release.created_at, + }; + await storeStub.fetch( + new Request("http://store/upsert-release", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(record), + }), + ); + continue; + } + // Enforce per-run embedding limit if (embedded >= MAX_EMBEDDINGS_PER_RUN) { if (embedded === MAX_EMBEDDINGS_PER_RUN) { @@ -525,6 +603,7 @@ async function pollReleases( continue; } + upsertsIssued++; const result = await processAndUpsertRelease(env, storeStub, repo, release); if (result.skippedUnchanged) { @@ -536,17 +615,31 @@ async function pollReleases( } } - // Update watermark with ETag - await storeStub.fetch( - new Request("http://store/watermark", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: responseEtag }), - }), - ); + // Update watermark with ETag — but skip the ETag write when the run hit + // the per-run upsert cap, otherwise the next cron will short-circuit on + // 304 and never reprocess the deferred (empty-bodyHash) releases (issue + // #149). + if (!upsertBudgetExhausted) { + await storeStub.fetch( + new Request("http://store/watermark", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: responseEtag }), + }), + ); + } else { + await storeStub.fetch( + new Request("http://store/watermark", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: storedEtag }), + }), + ); + } console.log( - `${repo} releases: ${releases.length} total, ${embedded} embedded, ${skipped} unchanged, ${failed} failed`, + `${repo} releases: ${releases.length} total, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ` + + `upserts_issued=${upsertsIssued}/${MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN}`, ); } @@ -736,10 +829,31 @@ async function pollDocs( let embedded = 0; let skipped = docEntries.length - changedEntries.length; let failed = 0; + let fetchesIssued = 0; + let fetchBudgetExhausted = false; const now = new Date().toISOString(); // Process changed/new doc files for (const entry of changedEntries) { + // Enforce per-run fetch fan-out cap to keep the LIGHT_CRON Worker + // invocation under its 1000-subrequest budget (issue #149). Each loop + // iteration costs ~5 subrequests (Contents API fetch + Workers AI embed + + // Vectorize upsert + D1 FTS upsert + Store DO call), so unbounded loops + // on repos with many changed `.md` files exhaust the budget shared with + // pollRepo + pollReleases. + if (fetchesIssued >= MAX_DOC_FETCHES_PER_REPO_PER_RUN) { + if (!fetchBudgetExhausted) { + fetchBudgetExhausted = true; + console.warn( + `pollDocs: fetch budget reached for ${repo} ` + + `(${MAX_DOC_FETCHES_PER_REPO_PER_RUN} doc fetches). ` + + `Remaining changed docs will be retried next cron run.`, + ); + } + // Stop processing — unchanged blobSha in store means next poll retries + break; + } + if (embedded >= MAX_EMBEDDINGS_PER_RUN) { console.warn( `Doc embedding batch limit reached (${MAX_EMBEDDINGS_PER_RUN}). ` + @@ -751,6 +865,7 @@ async function pollDocs( try { // Fetch file content + fetchesIssued++; const content = await fetchFileContent(repo, entry.path, env.GITHUB_TOKEN); const result = await processAndUpsertDoc(env, storeStub, repo, entry.path, content, entry.sha); @@ -796,17 +911,35 @@ async function pollDocs( } } - // Update watermark with ETag - await storeStub.fetch( - new Request("http://store/watermark", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: responseEtag }), - }), - ); + // Update watermark with ETag — but only when the run completed without + // hitting the per-run fetch cap. If we did hit the cap, leftover changed + // docs still need to be processed; persisting the new ETag would cause the + // next cron to short-circuit on 304 and never see them. Skipping the ETag + // update forces a fresh tree fetch next run so `changedEntries` repopulates + // (issue #149). + if (!fetchBudgetExhausted) { + await storeStub.fetch( + new Request("http://store/watermark", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: responseEtag }), + }), + ); + } else { + // Still bump lastPolledAt so observability can see the run happened, but + // keep the prior ETag so the next cron re-fetches the tree. + await storeStub.fetch( + new Request("http://store/watermark", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: storedEtag }), + }), + ); + } console.log( - `${repo} docs: ${docEntries.length} found, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ${deletedDocs.length} deleted`, + `${repo} docs: ${docEntries.length} found, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ${deletedDocs.length} deleted, ` + + `fetches_issued=${fetchesIssued}/${MAX_DOC_FETCHES_PER_REPO_PER_RUN}`, ); }