Skip to content

Commit 3910781

Browse files
committed
fix(schedule-engine): centralize lastScheduleTime cron-prev fallback
External callers of registerNextTaskScheduleInstance — the deploy-time declarative schedule sync, schedule upsert (cron change / activate), and recovery — all called with just `{ instanceId }`, no lastScheduleTime. That replaced the in-flight Redis job's payload with one having lastScheduleTime: undefined, so the next fire fell back to instance.lastScheduledTimestamp (a column this PR stops writing). On every subsequent app deploy, customers would have seen one stale fire per schedule, in perpetuity — the staleness compounding as the unmaintained DB column drifted further from reality. Move the cron-prev derivation inside registerNextTaskScheduleInstance itself: when the caller doesn't pass lastScheduleTime, derive from the cron expression's previous slot (guarded against the slot predating the instance's createdAt — preserves first-fire `undefined` semantics for brand-new schedules). Internal callers (after-fire, after-skip) keep passing explicit values so their carry-forward semantics are unchanged. Also drops the duplicate cron-prev block from `#recoverTaskScheduleInstance` — recovery now relies on the centralized fallback inside register. Two new tests: - "should derive lastScheduleTime from cron when external callers omit it" — exercises the deploy/upsert pattern on a long-running instance. - "should leave lastScheduleTime undefined for brand-new schedules" — preserves the first-run sentinel (`if (!payload.lastTimestamp)`) customers rely on. Per Devin review on PR #3476.
1 parent c9569e7 commit 3910781

2 files changed

Lines changed: 229 additions & 35 deletions

File tree

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -198,16 +198,43 @@ export class ScheduleEngine {
198198
timezone: instance.taskSchedule.timezone,
199199
});
200200

201-
// Enqueue the scheduled task. The next job's `lastScheduleTime`
202-
// payload is the *actual* previous fire time (passed in by the
203-
// caller), not `fromTimestamp` — `fromTimestamp` advances on every
204-
// tick (including skips) so it can't be used as the previous-fire
205-
// anchor without leaking skipped slots into customer-visible
206-
// payload.lastTimestamp.
201+
// Determine the lastScheduleTime to embed in the next worker job's
202+
// payload. If the caller passed it explicitly (the after-fire path
203+
// does this with the just-fired timestamp, the after-skip path
204+
// carries the existing value forward), use that. Otherwise — every
205+
// external caller (deploy sync, schedule upsert, recovery) — derive
206+
// from the cron expression's previous slot.
207+
//
208+
// Without this fallback, every deploy / cron edit would clobber the
209+
// existing in-flight job's lastScheduleTime with `undefined`, and
210+
// the next fire would surface a frozen DB-column value to the
211+
// customer (since this PR stops writing that column). Pure cron
212+
// math, no DB read on top of the existing instance load — the
213+
// recovery loop already pays the cost of loading the instance.
214+
let lastScheduleTime = params.lastScheduleTime;
215+
if (lastScheduleTime === undefined) {
216+
try {
217+
const cronPrev = previousScheduledTimestamp(
218+
instance.taskSchedule.generatorExpression,
219+
instance.taskSchedule.timezone
220+
);
221+
// Guarded against the cron's previous slot predating the
222+
// instance itself — for a brand-new schedule, the slot is from
223+
// before the schedule existed, so `undefined` is the honest
224+
// answer (preserves the `if (!payload.lastTimestamp)` first-run
225+
// sentinel customers rely on).
226+
if (cronPrev.getTime() > instance.createdAt.getTime()) {
227+
lastScheduleTime = cronPrev;
228+
}
229+
} catch {
230+
// Malformed cron — leave undefined.
231+
}
232+
}
233+
207234
await this.enqueueScheduledTask(
208235
params.instanceId,
209236
nextScheduledTimestamp,
210-
params.lastScheduleTime
237+
lastScheduleTime
211238
);
212239

213240
// Record metrics
@@ -800,39 +827,16 @@ export class ScheduleEngine {
800827
return "skipped";
801828
}
802829

803-
// Approximate the previous fire from the cron expression itself rather
804-
// than reading state from the DB. For a continuously-running schedule
805-
// this equals the actual last fire time. For paused-then-resumed
806-
// schedules or recently-edited cron expressions the value will be
807-
// approximate — same trade-off the dashboard "Last run" cell accepts.
808-
// Guarded against the schedule not having existed long enough to have
809-
// fired (cron's previous slot before instance creation), and against
810-
// cron-parser throwing on malformed expressions. Pure cron math, no DB
811-
// read — recovery fan-outs (Redis crash, restart storms) must not add
812-
// load to hot tables.
813-
let lastScheduleTime: Date | undefined;
814-
try {
815-
const cronPrev = previousScheduledTimestamp(
816-
schedule.generatorExpression,
817-
schedule.timezone
818-
);
819-
if (cronPrev.getTime() > instance.createdAt.getTime()) {
820-
lastScheduleTime = cronPrev;
821-
}
822-
} catch {
823-
lastScheduleTime = undefined;
824-
}
825-
826830
this.logger.debug("No job found for instance, registering next run", {
827831
instanceId: instance.id,
828832
schedule,
829-
lastScheduleTime: lastScheduleTime?.toISOString(),
830833
});
831834

832-
await this.registerNextTaskScheduleInstance({
833-
instanceId: instance.id,
834-
lastScheduleTime,
835-
});
835+
// No `lastScheduleTime` passed — `registerNextTaskScheduleInstance`
836+
// will derive it from the cron's previous slot (with a createdAt
837+
// guard) so the post-recovery fire reports an accurate
838+
// `payload.lastTimestamp`.
839+
await this.registerNextTaskScheduleInstance({ instanceId: instance.id });
836840

837841
return "recovered";
838842
}

internal-packages/schedule-engine/test/scheduleRecovery.test.ts

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,4 +386,194 @@ describe("Schedule Recovery", () => {
386386
}
387387
}
388388
);
389+
390+
// External-caller backward-compat. Deploy sync (`syncDeclarativeSchedules`)
391+
// and schedule upsert both call `registerNextTaskScheduleInstance` with no
392+
// `lastScheduleTime`. They run on every app deploy and on every cron edit.
393+
// For an existing-and-firing schedule, the call must NOT clobber the
394+
// worker payload's `lastScheduleTime` with `undefined` — otherwise the
395+
// next fire would surface a stale frozen DB-column value to the customer
396+
// (since this PR stops writing that column). The function must derive a
397+
// sensible `lastScheduleTime` from the cron expression's previous slot
398+
// when the caller doesn't pass one.
399+
containerTest(
400+
"should derive lastScheduleTime from cron when external callers omit it",
401+
{ timeout: 30_000 },
402+
async ({ prisma, redisOptions }) => {
403+
const engine = new ScheduleEngine({
404+
prisma,
405+
redis: redisOptions,
406+
distributionWindow: { seconds: 10 },
407+
worker: { concurrency: 1, disabled: true, pollIntervalMs: 1000 },
408+
tracer: trace.getTracer("test", "0.0.0"),
409+
onTriggerScheduledTask: async () => ({ success: true }),
410+
isDevEnvironmentConnectedHandler: vi.fn().mockResolvedValue(true),
411+
});
412+
413+
try {
414+
const organization = await prisma.organization.create({
415+
data: { title: "External Caller Org", slug: "external-caller-org" },
416+
});
417+
const project = await prisma.project.create({
418+
data: {
419+
name: "External Caller Project",
420+
slug: "external-caller-project",
421+
externalRef: "external-caller-ref",
422+
organizationId: organization.id,
423+
},
424+
});
425+
const environment = await prisma.runtimeEnvironment.create({
426+
data: {
427+
slug: "external-caller-env",
428+
type: "PRODUCTION",
429+
projectId: project.id,
430+
organizationId: organization.id,
431+
apiKey: "tr_external_1234",
432+
pkApiKey: "pk_external_1234",
433+
shortcode: "external-short",
434+
},
435+
});
436+
const taskSchedule = await prisma.taskSchedule.create({
437+
data: {
438+
friendlyId: "sched_external_caller",
439+
taskIdentifier: "external-caller-task",
440+
projectId: project.id,
441+
deduplicationKey: "external-caller-dedup",
442+
userProvidedDeduplicationKey: false,
443+
generatorExpression: "*/5 * * * *",
444+
generatorDescription: "Every 5 minutes",
445+
timezone: "UTC",
446+
type: "DECLARATIVE",
447+
active: true,
448+
},
449+
});
450+
451+
// Backdate the instance so the cron's previous slot postdates
452+
// createdAt — this simulates a long-running schedule, the case
453+
// Devin flagged (deploy clobbers lastScheduleTime, post-deploy fire
454+
// would otherwise read from a frozen DB column).
455+
const longAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
456+
const scheduleInstance = await prisma.taskScheduleInstance.create({
457+
data: {
458+
taskScheduleId: taskSchedule.id,
459+
environmentId: environment.id,
460+
projectId: project.id,
461+
active: true,
462+
},
463+
});
464+
await prisma.taskScheduleInstance.update({
465+
where: { id: scheduleInstance.id },
466+
data: { createdAt: longAgo },
467+
});
468+
469+
// External-caller pattern — no lastScheduleTime.
470+
await engine.registerNextTaskScheduleInstance({
471+
instanceId: scheduleInstance.id,
472+
});
473+
474+
const job = await engine.getJob(`scheduled-task-instance:${scheduleInstance.id}`);
475+
expect(job).not.toBeNull();
476+
// The function should have derived lastScheduleTime from cron,
477+
// putting a real timestamp into the worker payload rather than
478+
// undefined. The Redis worker stores payloads as JSON, so the value
479+
// is a string when read back here — Zod re-coerces it to Date on
480+
// dequeue (workerCatalog uses `z.coerce.date()`).
481+
const enqueuedLastScheduleTime = (job?.item as { lastScheduleTime?: string })
482+
.lastScheduleTime;
483+
expect(enqueuedLastScheduleTime).toBeDefined();
484+
const derived = new Date(enqueuedLastScheduleTime!);
485+
// The derived value should match the cron's previous slot — for
486+
// `*/5 * * * *`, a 5-minute boundary in the recent past.
487+
expect(derived.getTime()).toBeLessThan(Date.now());
488+
expect(derived.getUTCSeconds()).toBe(0);
489+
expect(derived.getUTCMinutes() % 5).toBe(0);
490+
} finally {
491+
await engine.quit();
492+
}
493+
}
494+
);
495+
496+
// Brand-new schedules must NOT receive a cron-derived lastScheduleTime —
497+
// the cron's previous slot predates the instance, so it's not a real
498+
// previous fire. The first-run sentinel (`if (!payload.lastTimestamp)`)
499+
// must keep working.
500+
containerTest(
501+
"should leave lastScheduleTime undefined for brand-new schedules",
502+
{ timeout: 30_000 },
503+
async ({ prisma, redisOptions }) => {
504+
const engine = new ScheduleEngine({
505+
prisma,
506+
redis: redisOptions,
507+
distributionWindow: { seconds: 10 },
508+
worker: { concurrency: 1, disabled: true, pollIntervalMs: 1000 },
509+
tracer: trace.getTracer("test", "0.0.0"),
510+
onTriggerScheduledTask: async () => ({ success: true }),
511+
isDevEnvironmentConnectedHandler: vi.fn().mockResolvedValue(true),
512+
});
513+
514+
try {
515+
const organization = await prisma.organization.create({
516+
data: { title: "Brand New Org", slug: "brand-new-org" },
517+
});
518+
const project = await prisma.project.create({
519+
data: {
520+
name: "Brand New Project",
521+
slug: "brand-new-project",
522+
externalRef: "brand-new-ref",
523+
organizationId: organization.id,
524+
},
525+
});
526+
const environment = await prisma.runtimeEnvironment.create({
527+
data: {
528+
slug: "brand-new-env",
529+
type: "PRODUCTION",
530+
projectId: project.id,
531+
organizationId: organization.id,
532+
apiKey: "tr_brandnew_1234",
533+
pkApiKey: "pk_brandnew_1234",
534+
shortcode: "brandnew-short",
535+
},
536+
});
537+
const taskSchedule = await prisma.taskSchedule.create({
538+
data: {
539+
friendlyId: "sched_brand_new",
540+
taskIdentifier: "brand-new-task",
541+
projectId: project.id,
542+
deduplicationKey: "brand-new-dedup",
543+
userProvidedDeduplicationKey: false,
544+
// Hourly cron — the previous slot is plausibly minutes-to-an-hour
545+
// ago, comfortably predating an instance just created.
546+
generatorExpression: "0 * * * *",
547+
generatorDescription: "Hourly",
548+
timezone: "UTC",
549+
type: "DECLARATIVE",
550+
active: true,
551+
},
552+
});
553+
const scheduleInstance = await prisma.taskScheduleInstance.create({
554+
data: {
555+
taskScheduleId: taskSchedule.id,
556+
environmentId: environment.id,
557+
projectId: project.id,
558+
active: true,
559+
},
560+
});
561+
562+
await engine.registerNextTaskScheduleInstance({
563+
instanceId: scheduleInstance.id,
564+
});
565+
566+
const job = await engine.getJob(`scheduled-task-instance:${scheduleInstance.id}`);
567+
expect(job).not.toBeNull();
568+
const enqueuedLastScheduleTime = (job?.item as { lastScheduleTime?: Date }).lastScheduleTime;
569+
// Brand-new schedule: cron's previous slot predates instance.createdAt,
570+
// so the function leaves lastScheduleTime undefined — the first fire
571+
// will report `payload.lastTimestamp: undefined` and customer first-run
572+
// sentinel patterns keep working.
573+
expect(enqueuedLastScheduleTime).toBeUndefined();
574+
} finally {
575+
await engine.quit();
576+
}
577+
}
578+
);
389579
});

0 commit comments

Comments
 (0)