Skip to content

Commit e2da181

Browse files
committed
WIP on retrieving a queue using the SDK/API
1 parent 4ee85cb commit e2da181

9 files changed

Lines changed: 293 additions & 25 deletions

File tree

apps/webapp/app/presenters/v3/QueueListPresenter.server.ts

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
2+
import { determineEngineVersion } from "~/v3/engineVersion.server";
23
import { engine } from "~/v3/runEngine.server";
34
import { BasePresenter } from "./basePresenter.server";
4-
import { type TaskQueueType } from "@trigger.dev/database";
5-
import { assertExhaustive } from "@trigger.dev/core";
6-
import { determineEngineVersion } from "~/v3/engineVersion.server";
5+
import { toQueueItem } from "./QueueRetrievePresenter.server";
76

87
const DEFAULT_ITEMS_PER_PAGE = 25;
98
const MAX_ITEMS_PER_PAGE = 100;
@@ -57,6 +56,7 @@ export class QueueListPresenter extends BasePresenter {
5756
runtimeEnvironmentId: environment.id,
5857
},
5958
select: {
59+
friendlyId: true,
6060
name: true,
6161
concurrencyLimit: true,
6262
type: true,
@@ -80,23 +80,15 @@ export class QueueListPresenter extends BasePresenter {
8080
]);
8181

8282
// Transform queues to include running and queued counts
83-
return queues.map((queue) => ({
84-
name: queue.name.replace(/^task\//, ""),
85-
type: queueTypeFromType(queue.type),
86-
running: results[1][queue.name] ?? 0,
87-
queued: results[0][queue.name] ?? 0,
88-
concurrencyLimit: queue.concurrencyLimit ?? null,
89-
}));
90-
}
91-
}
92-
93-
export function queueTypeFromType(type: TaskQueueType) {
94-
switch (type) {
95-
case "NAMED":
96-
return "custom" as const;
97-
case "VIRTUAL":
98-
return "task" as const;
99-
default:
100-
assertExhaustive(type);
83+
return queues.map((queue) =>
84+
toQueueItem({
85+
friendlyId: queue.friendlyId,
86+
name: queue.name,
87+
type: queue.type,
88+
running: results[1][queue.name] ?? 0,
89+
queued: results[0][queue.name] ?? 0,
90+
concurrencyLimit: queue.concurrencyLimit ?? null,
91+
})
92+
);
10193
}
10294
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
2+
import { engine } from "~/v3/runEngine.server";
3+
import { BasePresenter } from "./basePresenter.server";
4+
import { type TaskQueueType } from "@trigger.dev/database";
5+
import { assertExhaustive } from "@trigger.dev/core";
6+
import { determineEngineVersion } from "~/v3/engineVersion.server";
7+
import { type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3";
8+
9+
export class QueueRetrievePresenter extends BasePresenter {
10+
public async call({
11+
environment,
12+
queueInput,
13+
}: {
14+
environment: AuthenticatedEnvironment;
15+
queueInput: RetrieveQueueParam;
16+
}) {
17+
//check the engine is the correct version
18+
const engineVersion = await determineEngineVersion({ environment });
19+
20+
if (engineVersion === "V1") {
21+
return {
22+
success: false as const,
23+
code: "engine-version",
24+
};
25+
}
26+
27+
const queue = await this.getQueue(environment, queueInput);
28+
if (!queue) {
29+
return {
30+
success: false as const,
31+
code: "queue-not-found",
32+
};
33+
}
34+
35+
const results = await Promise.all([
36+
engine.lengthOfQueues(environment, [queue.name]),
37+
engine.currentConcurrencyOfQueues(environment, [queue.name]),
38+
]);
39+
40+
// Transform queues to include running and queued counts
41+
return {
42+
success: true as const,
43+
queue: toQueueItem({
44+
friendlyId: queue.friendlyId,
45+
name: queue.name,
46+
type: queue.type,
47+
running: results[1]?.[queue.name] ?? 0,
48+
queued: results[0]?.[queue.name] ?? 0,
49+
concurrencyLimit: queue.concurrencyLimit ?? null,
50+
}),
51+
};
52+
}
53+
54+
private async getQueue(environment: AuthenticatedEnvironment, queue: RetrieveQueueParam) {
55+
if (typeof queue === "string") {
56+
return this._replica.taskQueue.findFirst({
57+
where: {
58+
friendlyId: queue,
59+
runtimeEnvironmentId: environment.id,
60+
},
61+
});
62+
}
63+
64+
const queueName =
65+
queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name;
66+
return this._replica.taskQueue.findFirst({
67+
where: {
68+
name: queueName,
69+
runtimeEnvironmentId: environment.id,
70+
},
71+
});
72+
}
73+
}
74+
75+
function queueTypeFromType(type: TaskQueueType) {
76+
switch (type) {
77+
case "NAMED":
78+
return "custom" as const;
79+
case "VIRTUAL":
80+
return "task" as const;
81+
default:
82+
assertExhaustive(type);
83+
}
84+
}
85+
86+
/**
87+
* Converts raw queue data into a standardized QueueItem format
88+
* @param data Raw queue data containing required queue properties
89+
* @returns A validated QueueItem object
90+
*/
91+
export function toQueueItem(data: {
92+
friendlyId: string;
93+
name: string;
94+
type: TaskQueueType;
95+
running: number;
96+
queued: number;
97+
concurrencyLimit: number | null;
98+
}): QueueItem {
99+
return {
100+
id: data.friendlyId,
101+
//remove the task/ prefix if it exists
102+
name: data.name.replace(/^task\//, ""),
103+
type: queueTypeFromType(data.type),
104+
running: data.running,
105+
queued: data.queued,
106+
concurrencyLimit: data.concurrencyLimit,
107+
};
108+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { type QueueItem, type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3";
3+
import { z } from "zod";
4+
import { QueueRetrievePresenter } from "~/presenters/v3/QueueRetrievePresenter.server";
5+
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6+
7+
const SearchParamsSchema = z.object({
8+
type: RetrieveQueueType.default("id"),
9+
});
10+
11+
export const loader = createLoaderApiRoute(
12+
{
13+
params: z.object({
14+
queueParam: z.string(),
15+
}),
16+
searchParams: SearchParamsSchema,
17+
findResource: async () => 1, // This is a dummy function, we don't need to find a resource
18+
},
19+
async ({ params, searchParams, authentication }) => {
20+
const input: RetrieveQueueParam =
21+
searchParams.type === "id"
22+
? params.queueParam
23+
: {
24+
type: searchParams.type,
25+
name: decodeURIComponent(params.queueParam),
26+
};
27+
28+
const presenter = new QueueRetrievePresenter();
29+
const result = await presenter.call({
30+
environment: authentication.environment,
31+
queueInput: input,
32+
});
33+
34+
if (!result.success) {
35+
if (result.code === "queue-not-found") {
36+
return json({ error: result.code }, { status: 404 });
37+
}
38+
39+
return json({ error: result.code }, { status: 400 });
40+
}
41+
42+
const q: QueueItem = result.queue;
43+
return json(q);
44+
}
45+
);

apps/webapp/app/routes/api.v1.queues.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { json } from "@remix-run/server-runtime";
2+
import { type QueueItem } from "@trigger.dev/core/v3";
23
import { z } from "zod";
34
import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server";
45
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
@@ -27,7 +28,7 @@ export const loader = createLoaderApiRoute(
2728
return json({ error: result.code }, { status: 400 });
2829
}
2930

30-
const queues = await result.queues;
31+
const queues: QueueItem[] = await result.queues;
3132
return json({ data: queues, pagination: result.pagination }, { status: 200 });
3233
} catch (error) {
3334
if (error instanceof ServiceValidationError) {

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ type ApiKeyRouteBuilderOptions<
3939
params: TParamsSchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>
4040
? z.infer<TParamsSchema>
4141
: undefined,
42-
authentication: ApiAuthenticationResultSuccess
42+
authentication: ApiAuthenticationResultSuccess,
43+
searchParams: TSearchParamsSchema extends
44+
| z.ZodFirstPartySchemaTypes
45+
| z.ZodDiscriminatedUnion<any, any>
46+
? z.infer<TSearchParamsSchema>
47+
: undefined
4348
) => Promise<TResource | undefined>;
4449
shouldRetryNotFound?: boolean;
4550
authorization?: {
@@ -179,7 +184,7 @@ export function createLoaderApiRoute<
179184
}
180185

181186
// Find the resource
182-
const resource = await findResource(parsedParams, authenticationResult);
187+
const resource = await findResource(parsedParams, authenticationResult, parsedSearchParams);
183188

184189
if (!resource) {
185190
return await wrapResponse(

packages/core/src/v3/apiClient/index.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
ReplayRunResponse,
2626
RescheduleRunRequestBody,
2727
RetrieveBatchV2Response,
28+
RetrieveQueueParam,
2829
RetrieveRunResponse,
2930
ScheduleObject,
3031
TaskRunExecutionResult,
@@ -744,6 +745,21 @@ export class ApiClient {
744745
);
745746
}
746747

748+
retrieveQueue(queue: RetrieveQueueParam, requestOptions?: ZodFetchOptions) {
749+
const type = typeof queue === "string" ? "id" : queue.type;
750+
const value = typeof queue === "string" ? queue : queue.name;
751+
752+
return zodfetch(
753+
QueueItem,
754+
`${this.baseUrl}/api/v1/queues/${encodeURIComponent(value)}?type=${type}`,
755+
{
756+
method: "GET",
757+
headers: this.#getHeaders(false),
758+
},
759+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
760+
);
761+
}
762+
747763
subscribeToRun<TRunTypes extends AnyRunTypes>(
748764
runId: string,
749765
options?: {

packages/core/src/v3/schemas/queues.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import { z } from "zod";
22

3+
const queueTypes = ["task", "custom"] as const;
4+
35
/**
46
* The type of queue, either "task" or "custom"
57
* "task" are created automatically for each task.
68
* "custom" are created by you explicitly in your code.
79
* */
8-
export const QueueType = z.enum(["task", "custom"]);
10+
export const QueueType = z.enum(queueTypes);
911
export type QueueType = z.infer<typeof QueueType>;
1012

13+
export const RetrieveQueueType = z.enum([...queueTypes, "id"]);
14+
export type RetrieveQueueType = z.infer<typeof RetrieveQueueType>;
15+
1116
export const QueueItem = z.object({
17+
/** The queue id, e.g. queue_12345 */
18+
id: z.string(),
1219
/** The queue name */
1320
name: z.string(),
1421
/**
@@ -35,3 +42,35 @@ export const ListQueueOptions = z.object({
3542
});
3643

3744
export type ListQueueOptions = z.infer<typeof ListQueueOptions>;
45+
46+
/**
47+
* When retrieving a queue you can either use the queue id,
48+
* or the type and name.
49+
*
50+
* @example
51+
*
52+
* ```ts
53+
* // Use a queue id (they start with queue_
54+
* const q1 = await queues.retrieve("queue_12345");
55+
*
56+
* // Or use the type and name
57+
* // The default queue for your "my-task-id"
58+
* const q2 = await queues.retrieve({ type: "task", name: "my-task-id"});
59+
*
60+
* // The custom queue you defined in your code
61+
* const q3 = await queues.retrieve({ type: "custom", name: "my-custom-queue" });
62+
* ```
63+
*/
64+
export const RetrieveQueueParam = z.union([
65+
z.string(),
66+
z.object({
67+
/** "task" or "custom" */
68+
type: QueueType,
69+
/** The name of your queue.
70+
* For "task" type it will be the task id, for "custom" it will be the name you specified.
71+
* */
72+
name: z.string(),
73+
}),
74+
]);
75+
76+
export type RetrieveQueueParam = z.infer<typeof RetrieveQueueParam>;

0 commit comments

Comments
 (0)