From 1f3baac95ebbaf5df8192f80c6ced4a65d086178 Mon Sep 17 00:00:00 2001 From: beobungbu Date: Thu, 2 Apr 2026 14:51:34 +0700 Subject: [PATCH] feat: add resume support for R2 cache population When populating a large R2 cache (100K+ entries), the process can fail due to transient 502/503 errors from the worker proxy. Currently, a failure requires restarting from entry 0, wasting hours of upload time. This adds a lightweight resume mechanism: - Progress directory: /tmp/opennext-cache-{worker}-{buildId}/ - total.txt: entry count (detects stale state from different builds) - failed-at.txt: index of the entry that exhausted retries - On fatal error (retry exhaustion), the failed entry index is saved. - On restart, entries before that index are skipped. - On success, the progress directory is deleted. - Stale directories from previous builds are auto-cleaned. Zero overhead during normal upload (no file writes per entry). Only writes to disk on crash (1 write) and on resume check (1 read). Fixes #1173 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/cli/commands/populate-cache.ts | 106 +++++++++++++++++- 1 file changed, 102 insertions(+), 4 deletions(-) diff --git a/packages/cloudflare/src/cli/commands/populate-cache.ts b/packages/cloudflare/src/cli/commands/populate-cache.ts index 16a12fb49..faef9330b 100644 --- a/packages/cloudflare/src/cli/commands/populate-cache.ts +++ b/packages/cloudflare/src/cli/commands/populate-cache.ts @@ -329,6 +329,25 @@ async function populateR2IncrementalCache( * @returns Resolves when all entries have been written successfully. * @throws {Error} If any entry fails after all retries or encounters a non-retryable error. */ +/** + * Resume support for R2 cache population. 
+ * + * Uses a progress directory named after the worker and buildId to avoid + * collisions between projects or builds: + * /tmp/opennext-cache-{workerName}-{buildId}/ + * total.txt — entry count (validates stale state) + * failed-at.txt — index of the entry that exhausted retries + * + * On crash (retry exhaustion), the failed entry index is written to disk. + * On restart, entries before that index are skipped (R2 PUT is idempotent, + * so re-uploading the ~maxConcurrency entries around the boundary is safe). + * On success, the progress directory is deleted. + * On a new build (different buildId), stale directories are cleaned up. + */ +function getProgressDir(workerName: string, buildId: string): string { + return path.join(os.tmpdir(), `opennext-cache-${workerName}-${buildId}`); +} + async function sendEntriesToR2Worker(options: { workerUrl: string; assets: CacheAsset[]; @@ -349,13 +368,73 @@ async function sendEntriesToR2Worker(options: { filename: fullPath, })); + // Derive workerName and buildId from the first entry's key for the progress dir. + // Key format: "incremental-cache/{buildId}/{hash}.cache" + const firstKey = entries[0]?.key ?? ""; + const keyParts = firstKey.split("/"); + const buildId = keyParts[1] ?? "unknown"; + const workerName = "opennext"; // Generic; could be derived from wrangler config in the future. 
+ const progressDir = getProgressDir(workerName, buildId); + + // --- Clean up stale progress dirs from previous builds --- + const progressPrefix = `opennext-cache-${workerName}-`; + try { + for (const entry of fs.readdirSync(os.tmpdir())) { + if (entry.startsWith(progressPrefix) && entry !== path.basename(progressDir)) { + fs.rmSync(path.join(os.tmpdir(), entry), { recursive: true, force: true }); + } + } + } catch { + // ignore — tmpdir listing failure is non-fatal + } + + // --- Check for resume state --- + let resumeFromIndex = 0; + + if (fs.existsSync(progressDir)) { + const totalFile = path.join(progressDir, "total.txt"); + const failedAtFile = path.join(progressDir, "failed-at.txt"); + + if (fs.existsSync(totalFile)) { + const savedTotal = parseInt(fs.readFileSync(totalFile, "utf8").trim(), 10); + + if (savedTotal === entries.length && fs.existsSync(failedAtFile)) { + const failedAt = parseInt(fs.readFileSync(failedAtFile, "utf8").trim(), 10); + if (!isNaN(failedAt) && failedAt > 0 && failedAt < entries.length) { + resumeFromIndex = failedAt; + logger.info( + `Resuming from index ${resumeFromIndex} (${entries.length - resumeFromIndex} of ${entries.length} remaining)` + ); + } + } else if (savedTotal !== entries.length) { + // Stale — entry count changed (different build content) + logger.info(`Stale progress (${savedTotal} vs ${entries.length} entries), starting fresh`); + fs.rmSync(progressDir, { recursive: true, force: true }); + } + } + } + + // Create progress dir and write total on fresh start + if (!fs.existsSync(progressDir)) { + fs.mkdirSync(progressDir, { recursive: true }); + } + fs.writeFileSync(path.join(progressDir, "total.txt"), String(entries.length)); + + const remaining = entries.slice(resumeFromIndex); + if (resumeFromIndex > 0) { + logger.info(`Skipping ${resumeFromIndex} already-uploaded entries`); + } + // Use a concurrency-limited loop with a progress bar. // `pending` tracks in-flight promises so we can cap concurrency. 
   const pending = new Set<Promise<void>>();
-  let concurrency = 1;
+  let idx = 0;
+  let concurrency = 1;
+  for (const entry of tqdm(remaining)) {
+    const globalIndex = resumeFromIndex + idx;
+    idx++;
 
-  for (const entry of tqdm(entries)) {
     const task = sendEntryToR2Worker({
       workerUrl,
       key: entry.key,
@@ -366,7 +445,13 @@
 
     // If we've reached the concurrency limit, wait for one to finish.
     if (pending.size >= concurrency) {
-      await Promise.race(pending);
+      try {
+        await Promise.race(pending);
+      } catch (e) {
+        // Retry exhaustion: the failed entry is one of the <= maxConcurrency in-flight tasks scheduled before globalIndex, so rewind the resume point past them before re-throwing.
+        fs.writeFileSync(path.join(progressDir, "failed-at.txt"), String(Math.max(0, globalIndex - maxConcurrency)));
+        throw e;
+      }
       // Increase concurrency gradually to avoid overwhelming the worker
       // with too many requests at once.
       if (concurrency < maxConcurrency) {
@@ -375,7 +460,20 @@
     }
   }
 
-  await Promise.all(pending);
+  try {
+    await Promise.all(pending);
+  } catch (e) {
+    // Rewind by maxConcurrency so any failed in-flight entry is retried on resume
+    fs.writeFileSync(path.join(progressDir, "failed-at.txt"), String(Math.max(0, resumeFromIndex + idx - maxConcurrency)));
+    throw e;
+  }
+
+  // All entries uploaded successfully — clean up progress dir
+  try {
+    fs.rmSync(progressDir, { recursive: true, force: true });
+  } catch {
+    // ignore
+  }
 }
 
 class RetryableWorkerError extends Error {}