Skip to content

Commit 99c4a70

Browse files
committed
Improved the mark phase
1 parent 799c47a commit 99c4a70

1 file changed

Lines changed: 26 additions & 21 deletions

File tree

  • internal-packages/run-engine/src/run-queue

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { nanoid } from "nanoid";
4242
import { CronSchema, Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
4343
import { z } from "zod";
4444
import { Readable } from "node:stream";
45+
import { setTimeout } from "node:timers/promises";
4546

4647
const SemanticAttributes = {
4748
QUEUE: "runqueue.queue",
@@ -1823,7 +1824,7 @@ export class RunQueue {
18231824
const deduplicatedRunIds = Array.from(new Set(runIds));
18241825

18251826
const [processError] = await tryCatch(
1826-
this.processCurrencyConcurrencyRunIds(concurrencyKey, deduplicatedRunIds)
1827+
this.processCurrentConcurrencyRunIds(concurrencyKey, deduplicatedRunIds)
18271828
);
18281829

18291830
if (processError) {
@@ -1840,7 +1841,7 @@ export class RunQueue {
18401841
return promise;
18411842
}
18421843

1843-
private async processCurrencyConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
1844+
private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
18441845
this.logger.debug(`Processing concurrency set with ${runIds.length} runs`, {
18451846
concurrencyKey,
18461847
runIds: runIds.slice(0, 5), // Log first 5 for debugging
@@ -1872,18 +1873,18 @@ export class RunQueue {
18721873
const markedForAckKey = this.keys.markedForAckKey();
18731874

18741875
// Prepare arguments: alternating orgId, messageId pairs
1875-
const args: string[] = [];
1876+
const args: Array<number | string> = [];
18761877
for (const run of completedRuns) {
18771878
this.logger.info("Marking run for acknowledgment", {
18781879
orgId: run.orgId,
18791880
runId: run.id,
18801881
});
18811882

1882-
args.push(run.orgId);
1883-
args.push(run.id);
1883+
args.push(Date.now());
1884+
args.push(`${run.orgId}:${run.id}`);
18841885
}
18851886

1886-
const count = await this.redis.markCompletedRunsForAck(markedForAckKey, ...args);
1887+
const count = await this.redis.zadd(markedForAckKey, ...args);
18871888

18881889
this.logger.debug(`Marked ${count} runs for acknowledgment`, {
18891890
markedForAckKey,
@@ -1899,7 +1900,7 @@ export class RunQueue {
18991900

19001901
try {
19011902
const markedForAckKey = this.keys.markedForAckKey();
1902-
const results = await this.redis.getMarkedRunsForAck(markedForAckKey, "10");
1903+
const results = await this.redis.getMarkedRunsForAck(markedForAckKey, "100");
19031904

19041905
if (results.length === 0) {
19051906
return;
@@ -1920,18 +1921,24 @@ export class RunQueue {
19201921
markedRuns: markedRuns, // Log first 3 for debugging
19211922
});
19221923

1923-
// Acknowledge each marked run
1924-
await Promise.allSettled(
1925-
markedRuns.map((run) =>
1926-
this.processMarkedRun(run).catch((error) => {
1927-
this.logger.error("Error acknowledging marked run", {
1928-
error,
1929-
orgId: run.orgId,
1930-
messageId: run.messageId,
1931-
});
1932-
})
1933-
)
1934-
);
1924+
for (const run of markedRuns) {
1925+
const [processError] = await tryCatch(this.processMarkedRun(run));
1926+
1927+
if (processError) {
1928+
this.logger.error("Error processing marked run", {
1929+
error: processError,
1930+
orgId: run.orgId,
1931+
messageId: run.messageId,
1932+
});
1933+
}
1934+
}
1935+
1936+
const shouldProcessMoreRuns = (await this.redis.zcard(markedForAckKey)) > 0;
1937+
1938+
if (shouldProcessMoreRuns) {
1939+
await setTimeout(1000);
1940+
await this.processMarkedRuns();
1941+
}
19351942
} catch (error) {
19361943
this.logger.error("Error processing marked runs", { error });
19371944
}
@@ -2515,8 +2522,6 @@ declare module "@internal/redis" {
25152522
...queueNames: string[]
25162523
): Result<void, Context>;
25172524

2518-
markCompletedRunsForAck(markedForAckKey: string, ...args: string[]): Result<number, Context>;
2519-
25202525
getMarkedRunsForAck(
25212526
markedForAckKey: string,
25222527
maxCount: string,

0 commit comments

Comments
 (0)