Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 102 additions & 4 deletions packages/cloudflare/src/cli/commands/populate-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,25 @@ async function populateR2IncrementalCache(
* @returns Resolves when all entries have been written successfully.
* @throws {Error} If any entry fails after all retries or encounters a non-retryable error.
*/
/**
* Resume support for R2 cache population.
*
* Uses a progress directory named after the worker and buildId to avoid
* collisions between projects or builds:
* /tmp/opennext-cache-{workerName}-{buildId}/
* total.txt — entry count (validates stale state)
* failed-at.txt — index of the entry that exhausted retries
*
* On crash (retry exhaustion), the failed entry index is written to disk.
* On restart, entries before that index are skipped (R2 PUT is idempotent,
* so re-uploading the ~maxConcurrency entries around the boundary is safe).
* On success, the progress directory is deleted.
* On a new build (different buildId), stale directories are cleaned up.
*/
/**
 * Build the absolute path of the progress directory used to persist
 * cache-population resume state for a given worker/build pair.
 *
 * The directory lives under the OS temp dir and encodes both the worker
 * name and the build id so concurrent projects or builds never collide.
 *
 * @param workerName - Name of the worker the cache belongs to.
 * @param buildId - Unique id of the build being populated.
 * @returns Absolute path like `{tmpdir}/opennext-cache-{workerName}-{buildId}`.
 */
function getProgressDir(workerName: string, buildId: string): string {
  const dirName = `opennext-cache-${workerName}-${buildId}`;
  return path.join(os.tmpdir(), dirName);
}

async function sendEntriesToR2Worker(options: {
workerUrl: string;
assets: CacheAsset[];
Expand All @@ -349,13 +368,73 @@ async function sendEntriesToR2Worker(options: {
filename: fullPath,
}));

// Derive workerName and buildId from the first entry's key for the progress dir.
// Key format: "incremental-cache/{buildId}/{hash}.cache"
const firstKey = entries[0]?.key ?? "";
const keyParts = firstKey.split("/");
const buildId = keyParts[1] ?? "unknown";
const workerName = "opennext"; // Generic; could be derived from wrangler config in the future.
const progressDir = getProgressDir(workerName, buildId);

// --- Clean up stale progress dirs from previous builds ---
const progressPrefix = `opennext-cache-${workerName}-`;
try {
for (const entry of fs.readdirSync(os.tmpdir())) {
if (entry.startsWith(progressPrefix) && entry !== path.basename(progressDir)) {
fs.rmSync(path.join(os.tmpdir(), entry), { recursive: true, force: true });
}
}
} catch {
// ignore — tmpdir listing failure is non-fatal
}

// --- Check for resume state ---
let resumeFromIndex = 0;

if (fs.existsSync(progressDir)) {
const totalFile = path.join(progressDir, "total.txt");
const failedAtFile = path.join(progressDir, "failed-at.txt");

if (fs.existsSync(totalFile)) {
const savedTotal = parseInt(fs.readFileSync(totalFile, "utf8").trim(), 10);

if (savedTotal === entries.length && fs.existsSync(failedAtFile)) {
const failedAt = parseInt(fs.readFileSync(failedAtFile, "utf8").trim(), 10);
if (!isNaN(failedAt) && failedAt > 0 && failedAt < entries.length) {
resumeFromIndex = failedAt;
logger.info(
`Resuming from index ${resumeFromIndex} (${entries.length - resumeFromIndex} of ${entries.length} remaining)`
);
}
} else if (savedTotal !== entries.length) {
// Stale — entry count changed (different build content)
logger.info(`Stale progress (${savedTotal} vs ${entries.length} entries), starting fresh`);
fs.rmSync(progressDir, { recursive: true, force: true });
}
}
}

// Create progress dir and write total on fresh start
if (!fs.existsSync(progressDir)) {
fs.mkdirSync(progressDir, { recursive: true });
}
fs.writeFileSync(path.join(progressDir, "total.txt"), String(entries.length));

const remaining = entries.slice(resumeFromIndex);
if (resumeFromIndex > 0) {
logger.info(`Skipping ${resumeFromIndex} already-uploaded entries`);
}

// Use a concurrency-limited loop with a progress bar.
// `pending` tracks in-flight promises so we can cap concurrency.
const pending = new Set<Promise<void>>();

let concurrency = 1;
let idx = 0;

for (const entry of tqdm(remaining)) {
const globalIndex = resumeFromIndex + idx;
idx++;

for (const entry of tqdm(entries)) {
const task = sendEntryToR2Worker({
workerUrl,
key: entry.key,
Expand All @@ -366,7 +445,13 @@ async function sendEntriesToR2Worker(options: {

// If we've reached the concurrency limit, wait for one to finish.
if (pending.size >= concurrency) {
await Promise.race(pending);
try {
await Promise.race(pending);
} catch (e) {
// A single entry exhausted its retries. Save resume point and re-throw.
fs.writeFileSync(path.join(progressDir, "failed-at.txt"), String(globalIndex));
throw e;
}
// Increase concurrency gradually to avoid overwhelming the worker
// with too many requests at once.
if (concurrency < maxConcurrency) {
Expand All @@ -375,7 +460,20 @@ async function sendEntriesToR2Worker(options: {
}
}

await Promise.all(pending);
try {
await Promise.all(pending);
} catch (e) {
// Save the approximate index where failure occurred
fs.writeFileSync(path.join(progressDir, "failed-at.txt"), String(resumeFromIndex + idx));
throw e;
}

// All entries uploaded successfully — clean up progress dir
try {
fs.rmSync(progressDir, { recursive: true, force: true });
} catch {
// ignore
}
}

/**
 * Error thrown when a request to the R2 worker fails in a way that is
 * safe to retry (e.g. transient network or 5xx responses).
 *
 * Sets `name` so logs and stack traces identify the error class instead
 * of reporting the generic "Error".
 */
class RetryableWorkerError extends Error {
  constructor(message?: string) {
    super(message);
    this.name = "RetryableWorkerError";
  }
}
Expand Down