Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 151 additions & 18 deletions src/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,46 @@ const MAX_COMMENTS_EMBEDDED_PER_REPO = 30;
* leaving headroom for per-parent overhead and other pollers. */
const MAX_COMMENT_FETCHES_PER_REPO_PER_RUN = 10;

/** Maximum number of doc-file content fetches the docs poller issues per repo
* per cron run. Each changed `.md` entry triggers `fetchFileContent` (1 GitHub
* Contents API subrequest) followed by `processAndUpsertDoc` (~3-4 internal
* subrequests: Workers AI embed + Vectorize upsert + D1 FTS upsert + Store DO
* call), so a single doc fan-out is ~5 subrequests. Without this cap a repo
* with many changed `.md` files could consume ~250 subrequests on its own
* (50 × 5), and the LIGHT_CRON dispatch bundles 3 surfaces × 5 repos in one
* Worker invocation that shares a single 1000-subrequest budget — easily
* exhausted (issue #149).
*
* Numeric design (issue #149, mirrors PR #144 / issue #134):
* - LIGHT_CRON worst-case = 5 repos × 3 surfaces × per-item fan-out.
* - Target headroom ≤ 800 subrequests (200 reserved for parent cron, error
* retries, store watermark writes).
* - 800 / 5 repos / 3 surfaces ≈ 53 subrequests / surface / repo.
* - 53 / ~5 subrequests-per-item ≈ 10 items.
* - Cap = 10. Worst-case 5 × 10 × 5 = 250 subrequests for the docs surface
* across all repos, well under the per-surface envelope.
* Remaining changed docs are picked up on the next cron run (blob SHA stays
* unchanged in the store until the doc is successfully upserted). */
const MAX_DOC_FETCHES_PER_REPO_PER_RUN = 10;

/** Maximum number of release records the releases poller upserts per repo per
* cron run. The GitHub Releases endpoint returns all recent releases in a
* single API call, so the GH-side fetch cost is fixed at 1, but each release
* upsert fans out through `processAndUpsertRelease` to ~4 internal
* subrequests (Workers AI embed + Vectorize upsert + D1 FTS upsert + Store DO
* call). Active repos with many releases blew the LIGHT_CRON 1000-subrequest
* budget when combined with pollDocs and pollRepo (issue #149).
*
* Numeric design (issue #149, mirrors PR #144 / issue #134):
* - Same envelope as `MAX_DOC_FETCHES_PER_REPO_PER_RUN`: 10 items × ~5
* subrequests × 5 repos ≈ 250 subrequests for the releases surface.
* - Together with the docs cap (250) and pollRepo's existing
* `MAX_EMBEDDINGS_PER_RUN=50` × ~5 ≈ 250, the LIGHT_CRON worst case stays
* around 750 subrequests, well under the 1000 ceiling.
* Remaining releases are stored with empty bodyHash so the next cron run
* retries them (existing pattern in `pollReleases`). */
const MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN = 10;

/** Maximum number of commits fetched in the forward (webhook-redundancy) phase
* of the diff poller per repo per run.
* Forward is normally a no-op because the webhook path already indexes new
Expand Down Expand Up @@ -493,8 +533,46 @@ async function pollReleases(
let embedded = 0;
let skipped = 0;
let failed = 0;
let upsertsIssued = 0;
let upsertBudgetExhausted = false;

for (const release of releases) {
// Enforce per-run upsert fan-out cap to keep the LIGHT_CRON Worker
// invocation under its 1000-subrequest budget (issue #149). This sits
// ahead of the embedding cap because each upsert call (even for an
// unchanged release that ends up skipped) still consumes Store DO
// subrequests during change detection.
if (upsertsIssued >= MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN) {
if (!upsertBudgetExhausted) {
upsertBudgetExhausted = true;
console.warn(
`pollReleases: upsert budget reached for ${repo} ` +
`(${MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN} releases). ` +
`Remaining releases will be retried next cron run.`,
);
}
// Store record with empty bodyHash to trigger retry on next poll
const name = release.name ?? release.tag_name;
const record: ReleaseRecord = {
repo,
tagName: release.tag_name,
name,
body: release.body ?? "",
prerelease: release.prerelease,
bodyHash: "",
createdAt: release.created_at,
publishedAt: release.published_at ?? release.created_at,
};
await storeStub.fetch(
new Request("http://store/upsert-release", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(record),
}),
);
continue;
}

// Enforce per-run embedding limit
if (embedded >= MAX_EMBEDDINGS_PER_RUN) {
if (embedded === MAX_EMBEDDINGS_PER_RUN) {
Expand Down Expand Up @@ -525,6 +603,7 @@ async function pollReleases(
continue;
}

upsertsIssued++;
const result = await processAndUpsertRelease(env, storeStub, repo, release);

if (result.skippedUnchanged) {
Expand All @@ -536,17 +615,31 @@ async function pollReleases(
}
}

// Update watermark with ETag
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: responseEtag }),
}),
);
// Update watermark with ETag — but skip the ETag write when the run hit
// the per-run upsert cap, otherwise the next cron will short-circuit on
// 304 and never reprocess the deferred (empty-bodyHash) releases (issue
// #149).
if (!upsertBudgetExhausted) {
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: responseEtag }),
}),
);
} else {
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: new Date().toISOString(), etag: storedEtag }),
}),
);
}

console.log(
`${repo} releases: ${releases.length} total, ${embedded} embedded, ${skipped} unchanged, ${failed} failed`,
`${repo} releases: ${releases.length} total, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ` +
`upserts_issued=${upsertsIssued}/${MAX_RELEASE_UPSERTS_PER_REPO_PER_RUN}`,
);
}

Expand Down Expand Up @@ -736,10 +829,31 @@ async function pollDocs(
let embedded = 0;
let skipped = docEntries.length - changedEntries.length;
let failed = 0;
let fetchesIssued = 0;
let fetchBudgetExhausted = false;
const now = new Date().toISOString();

// Process changed/new doc files
for (const entry of changedEntries) {
// Enforce per-run fetch fan-out cap to keep the LIGHT_CRON Worker
// invocation under its 1000-subrequest budget (issue #149). Each loop
// iteration costs ~5 subrequests (Contents API fetch + Workers AI embed +
// Vectorize upsert + D1 FTS upsert + Store DO call), so unbounded loops
// on repos with many changed `.md` files exhaust the budget shared with
// pollRepo + pollReleases.
if (fetchesIssued >= MAX_DOC_FETCHES_PER_REPO_PER_RUN) {
if (!fetchBudgetExhausted) {
fetchBudgetExhausted = true;
console.warn(
`pollDocs: fetch budget reached for ${repo} ` +
`(${MAX_DOC_FETCHES_PER_REPO_PER_RUN} doc fetches). ` +
`Remaining changed docs will be retried next cron run.`,
);
}
// Stop processing — unchanged blobSha in store means next poll retries
break;
}

if (embedded >= MAX_EMBEDDINGS_PER_RUN) {
console.warn(
`Doc embedding batch limit reached (${MAX_EMBEDDINGS_PER_RUN}). ` +
Expand All @@ -751,6 +865,7 @@ async function pollDocs(

try {
// Fetch file content
fetchesIssued++;
const content = await fetchFileContent(repo, entry.path, env.GITHUB_TOKEN);

const result = await processAndUpsertDoc(env, storeStub, repo, entry.path, content, entry.sha);
Expand Down Expand Up @@ -796,17 +911,35 @@ async function pollDocs(
}
}

// Update watermark with ETag
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: responseEtag }),
}),
);
// Update watermark with ETag — but only when the run completed without
// hitting the per-run fetch cap. If we did hit the cap, leftover changed
// docs still need to be processed; persisting the new ETag would cause the
// next cron to short-circuit on 304 and never see them. Skipping the ETag
// update forces a fresh tree fetch next run so `changedEntries` repopulates
// (issue #149).
if (!fetchBudgetExhausted) {
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: responseEtag }),
}),
);
} else {
// Still bump lastPolledAt so observability can see the run happened, but
// keep the prior ETag so the next cron re-fetches the tree.
await storeStub.fetch(
new Request("http://store/watermark", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ repo: watermarkKey, lastPolledAt: now, etag: storedEtag }),
}),
);
}

console.log(
`${repo} docs: ${docEntries.length} found, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ${deletedDocs.length} deleted`,
`${repo} docs: ${docEntries.length} found, ${embedded} embedded, ${skipped} unchanged, ${failed} failed, ${deletedDocs.length} deleted, ` +
`fetches_issued=${fetchesIssued}/${MAX_DOC_FETCHES_PER_REPO_PER_RUN}`,
);
}

Expand Down
Loading