Skip to content

Commit 56aeaf0

Browse files
committed
emit more stuff
1 parent 04b2039 commit 56aeaf0

24 files changed

Lines changed: 997 additions & 180 deletions

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
7-
import { emitRunTagsUpdated } from "~/services/runsDashboardInstance.server";
7+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
88

99
const ParamsSchema = z.object({
1010
runId: z.string(),
@@ -95,13 +95,14 @@ export async function action({ request, params }: ActionFunctionArgs) {
9595
},
9696
});
9797

98-
emitRunTagsUpdated({
98+
runsDashboard.emit.runTagsUpdated({
9999
time: new Date(),
100100
run: {
101101
id: taskRun.id,
102102
tags: taskRun.runTags,
103103
status: taskRun.status,
104104
updatedAt: taskRun.updatedAt,
105+
createdAt: taskRun.createdAt,
105106
},
106107
organization: {
107108
id: authenticationResult.environment.organizationId,

apps/webapp/app/services/runsDashboardInstance.server.ts

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,54 +24,54 @@ import { tryCatch } from "@trigger.dev/core/utils";
2424

2525
const runDashboardEventBus: RunDashboardEventBus = new EventEmitter<RunDashboardEvents>();
2626

27-
export function emitRunStatusChanged(event: RunDashboardEventRunStatusChanged) {
27+
function emitRunStatusChanged(event: RunDashboardEventRunStatusChanged) {
2828
runDashboardEventBus.emit("runStatusChanged", event);
2929
}
3030

31-
export function emitRunCreated(time: Date, runId: string) {
31+
function emitRunCreated(time: Date, runId: string) {
3232
runDashboardEventBus.emit("runCreated", {
3333
time,
3434
runId,
3535
});
3636
}
3737

38-
export function emitRunAttemptStarted(event: RunDashboardEventRunAttemptStarted) {
38+
function emitRunAttemptStarted(event: RunDashboardEventRunAttemptStarted) {
3939
runDashboardEventBus.emit("runAttemptStarted", event);
4040
}
4141

42-
export function emitRunFailed(event: RunDashboardEventRunFailed) {
42+
function emitRunFailed(event: RunDashboardEventRunFailed) {
4343
runDashboardEventBus.emit("runFailed", event);
4444
}
4545

46-
export function emitRunSucceeded(event: RunDashboardEventRunSucceeded) {
46+
function emitRunSucceeded(event: RunDashboardEventRunSucceeded) {
4747
runDashboardEventBus.emit("runSucceeded", event);
4848
}
4949

50-
export function emitRunCancelled(event: RunDashboardEventRunCancelled) {
50+
function emitRunCancelled(event: RunDashboardEventRunCancelled) {
5151
runDashboardEventBus.emit("runCancelled", event);
5252
}
5353

54-
export function emitRunRetryScheduled(event: RunDashboardEventRunRetryScheduled) {
54+
function emitRunRetryScheduled(event: RunDashboardEventRunRetryScheduled) {
5555
runDashboardEventBus.emit("runRetryScheduled", event);
5656
}
5757

58-
export function emitRunDelayRescheduled(event: RunDashboardEventRunDelayRescheduled) {
58+
function emitRunDelayRescheduled(event: RunDashboardEventRunDelayRescheduled) {
5959
runDashboardEventBus.emit("runDelayRescheduled", event);
6060
}
6161

62-
export function emitRunLocked(event: RunDashboardEventRunLocked) {
62+
function emitRunLocked(event: RunDashboardEventRunLocked) {
6363
runDashboardEventBus.emit("runLocked", event);
6464
}
6565

66-
export function emitRunExpired(event: RunDashboardEventRunExpired) {
66+
function emitRunExpired(event: RunDashboardEventRunExpired) {
6767
runDashboardEventBus.emit("runExpired", event);
6868
}
6969

70-
export function emitRunTagsUpdated(event: RunDashboardEventRunTagsUpdated) {
70+
function emitRunTagsUpdated(event: RunDashboardEventRunTagsUpdated) {
7171
runDashboardEventBus.emit("runTagsUpdated", event);
7272
}
7373

74-
export function emitRunEnqueuedAfterDelay(event: RunDashboardEventRunEnqueuedAfterDelay) {
74+
function emitRunEnqueuedAfterDelay(event: RunDashboardEventRunEnqueuedAfterDelay) {
7575
runDashboardEventBus.emit("runEnqueuedAfterDelay", event);
7676
}
7777

@@ -230,7 +230,23 @@ export const runsDashboard = singleton("runsDashboard", () => {
230230
runDashboardEventBus.emit("runExpired", event);
231231
});
232232

233-
return service;
233+
return {
234+
service,
235+
emit: {
236+
runStatusChanged: emitRunStatusChanged,
237+
runCreated: emitRunCreated,
238+
runAttemptStarted: emitRunAttemptStarted,
239+
runFailed: emitRunFailed,
240+
runSucceeded: emitRunSucceeded,
241+
runCancelled: emitRunCancelled,
242+
runRetryScheduled: emitRunRetryScheduled,
243+
runDelayRescheduled: emitRunDelayRescheduled,
244+
runLocked: emitRunLocked,
245+
runExpired: emitRunExpired,
246+
runTagsUpdated: emitRunTagsUpdated,
247+
runEnqueuedAfterDelay: emitRunEnqueuedAfterDelay,
248+
},
249+
};
234250
});
235251

236252
async function runCreated(time: Date, runId: string, service: RunsDashboardService) {

apps/webapp/app/services/runsDashboardService.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export class RunsDashboardService {
2323
attempt: event.run.attemptNumber ?? 1,
2424
event_time: event.time.getTime(),
2525
updated_at: event.run.updatedAt.getTime(),
26+
created_at: event.run.createdAt.getTime(),
2627
base_cost_in_cents: event.run.baseCostInCents,
2728
executed_at: event.run.executedAt ? event.run.executedAt.getTime() : undefined,
2829
event_name: "attempt_started",
@@ -47,6 +48,7 @@ export class RunsDashboardService {
4748
status: event.run.status,
4849
event_time: event.time.getTime(),
4950
updated_at: event.run.updatedAt.getTime(),
51+
created_at: event.run.createdAt.getTime(),
5052
event_name: "enqueued_after_delay",
5153
});
5254

@@ -69,6 +71,7 @@ export class RunsDashboardService {
6971
status: event.run.status,
7072
event_time: event.time.getTime(),
7173
updated_at: event.run.updatedAt.getTime(),
74+
created_at: event.run.createdAt.getTime(),
7275
delay_until: event.run.delayUntil ? event.run.delayUntil.getTime() : undefined,
7376
event_name: "delay_rescheduled",
7477
});
@@ -92,6 +95,7 @@ export class RunsDashboardService {
9295
status: event.run.status,
9396
event_time: event.time.getTime(),
9497
updated_at: event.run.updatedAt.getTime(),
98+
created_at: event.run.createdAt.getTime(),
9599
base_cost_in_cents: event.run.baseCostInCents,
96100
task_version: event.run.taskVersion ?? undefined,
97101
sdk_version: event.run.sdkVersion ?? undefined,
@@ -124,6 +128,7 @@ export class RunsDashboardService {
124128
status: event.run.status,
125129
event_time: event.time.getTime(),
126130
updated_at: event.run.updatedAt.getTime(),
131+
created_at: event.run.createdAt.getTime(),
127132
event_name: "status_changed",
128133
});
129134

@@ -146,6 +151,7 @@ export class RunsDashboardService {
146151
status: event.run.status,
147152
event_time: event.time.getTime(),
148153
updated_at: event.run.updatedAt.getTime(),
154+
created_at: event.run.createdAt.getTime(),
149155
expired_at: event.run.expiredAt ? event.run.expiredAt.getTime() : undefined,
150156
event_name: "run_expired",
151157
});
@@ -171,6 +177,7 @@ export class RunsDashboardService {
171177
status: event.run.status,
172178
event_time: event.time.getTime(),
173179
updated_at: event.run.updatedAt.getTime(),
180+
created_at: event.run.createdAt.getTime(),
174181
completed_at: event.run.completedAt ? event.run.completedAt.getTime() : undefined,
175182
usage_duration_ms: event.run.usageDurationMs,
176183
cost_in_cents: event.run.costInCents,
@@ -198,6 +205,7 @@ export class RunsDashboardService {
198205
status: event.run.status,
199206
event_time: event.time.getTime(),
200207
updated_at: event.run.updatedAt.getTime(),
208+
created_at: event.run.createdAt.getTime(),
201209
completed_at: event.run.completedAt ? event.run.completedAt.getTime() : undefined,
202210
error: event.run.error,
203211
attempt: event.run.attemptNumber,
@@ -225,6 +233,7 @@ export class RunsDashboardService {
225233
status: event.run.status,
226234
event_time: event.time.getTime(),
227235
updated_at: event.run.updatedAt.getTime(),
236+
created_at: event.run.createdAt.getTime(),
228237
machine_preset: event.run.nextMachineAfterOOM ?? undefined,
229238
attempt: event.run.attemptNumber,
230239
error: event.run.error,
@@ -250,6 +259,7 @@ export class RunsDashboardService {
250259
status: event.run.status,
251260
event_time: event.time.getTime(),
252261
updated_at: event.run.updatedAt.getTime(),
262+
created_at: event.run.createdAt.getTime(),
253263
completed_at: event.run.completedAt ? event.run.completedAt.getTime() : undefined,
254264
error: event.run.error ? (event.run.error as TaskRunError) : undefined,
255265
attempt: event.run.attemptNumber,
@@ -275,6 +285,7 @@ export class RunsDashboardService {
275285
status: event.run.status,
276286
event_time: event.time.getTime(),
277287
updated_at: event.run.updatedAt.getTime(),
288+
created_at: event.run.createdAt.getTime(),
278289
tags: event.run.tags,
279290
event_name: "tags_updated",
280291
});
@@ -439,6 +450,7 @@ export type RunDashboardEvents = {
439450
tags: string[];
440451
status: TaskRunStatus;
441452
updatedAt: Date;
453+
createdAt: Date;
442454
};
443455
organization: {
444456
id: string;

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.se
1616
import { RedisClient, createRedisClient } from "~/redis.server";
1717
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1818
import { logger } from "~/services/logger.server";
19-
import { emitRunLocked } from "~/services/runsDashboardInstance.server";
19+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
2020
import { marqs } from "~/v3/marqs/index.server";
2121
import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server";
2222
import { FailedTaskRunService } from "../failedTaskRun.server";
@@ -543,11 +543,12 @@ export class DevQueueConsumer {
543543
messageId: message.messageId,
544544
});
545545

546-
emitRunLocked({
546+
runsDashboard.emit.runLocked({
547547
time: new Date(),
548548
run: {
549549
id: lockedTaskRun.id,
550550
updatedAt: lockedTaskRun.updatedAt,
551+
createdAt: lockedTaskRun.createdAt,
551552
status: lockedTaskRun.status,
552553
lockedAt,
553554
lockedById: backgroundTask.id,

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
3737
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
3838
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
3939
import { logger } from "~/services/logger.server";
40-
import { emitRunLocked, emitRunStatusChanged } from "~/services/runsDashboardInstance.server";
40+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
4141
import { singleton } from "~/utils/singleton";
4242
import { marqs } from "~/v3/marqs/index.server";
4343
import {
@@ -930,12 +930,13 @@ export class SharedQueueConsumer {
930930
});
931931

932932
if (lockedTaskRun.organizationId) {
933-
emitRunLocked({
933+
runsDashboard.emit.runLocked({
934934
time: new Date(),
935935
run: {
936936
id: lockedTaskRun.id,
937937
status: lockedTaskRun.status,
938938
updatedAt: lockedTaskRun.updatedAt,
939+
createdAt: lockedTaskRun.createdAt,
939940
lockedAt,
940941
lockedById: backgroundTask.id,
941942
lockedToVersionId: worker.id,
@@ -1478,12 +1479,13 @@ export class SharedQueueConsumer {
14781479
});
14791480

14801481
if (run.organizationId) {
1481-
emitRunStatusChanged({
1482+
runsDashboard.emit.runStatusChanged({
14821483
time: new Date(),
14831484
run: {
14841485
id: runId,
14851486
status: "WAITING_FOR_DEPLOY",
14861487
updatedAt: run.updatedAt,
1488+
createdAt: run.createdAt,
14871489
},
14881490
organization: {
14891491
id: run.organizationId,

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { PrismaClientOrTransaction } from "~/db.server";
2121
import { env } from "~/env.server";
2222
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2323
import { logger } from "~/services/logger.server";
24-
import { emitRunRetryScheduled } from "~/services/runsDashboardInstance.server";
24+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
2525
import { safeJsonParse } from "~/utils/json";
2626
import { marqs } from "~/v3/marqs/index.server";
2727
import { createExceptionPropertiesFromError, eventRepository } from "../eventRepository.server";
@@ -617,7 +617,7 @@ export class CompleteAttemptService extends BaseService {
617617
},
618618
});
619619

620-
emitRunRetryScheduled({
620+
runsDashboard.emit.runRetryScheduled({
621621
time: new Date(),
622622
run: {
623623
id: taskRunAttempt.taskRunId,
@@ -630,6 +630,7 @@ export class CompleteAttemptService extends BaseService {
630630
taskIdentifier: taskRunAttempt.taskRun.taskIdentifier,
631631
baseCostInCents: taskRunAttempt.taskRun.baseCostInCents,
632632
updatedAt: taskRunAttempt.taskRun.updatedAt,
633+
createdAt: taskRunAttempt.taskRun.createdAt,
633634
error,
634635
},
635636
organization: {

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { findQueueInEnvironment } from "~/models/taskQueue.server";
66
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { reportInvocationUsage } from "~/services/platform.v3.server";
9-
import { emitRunAttemptStarted } from "~/services/runsDashboardInstance.server";
9+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
1010
import { generateFriendlyId } from "../friendlyIdentifiers";
1111
import { machinePresetFromConfig, machinePresetFromRun } from "../machinePresets.server";
1212
import { FINAL_RUN_STATUSES } from "../taskStatus";
@@ -182,7 +182,7 @@ export class CreateTaskRunAttemptService extends BaseService {
182182
});
183183
}
184184

185-
emitRunAttemptStarted({
185+
runsDashboard.emit.runAttemptStarted({
186186
time: new Date(),
187187
run: {
188188
id: taskRun.id,

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
22
import { logger } from "~/services/logger.server";
3-
import { emitRunEnqueuedAfterDelay } from "~/services/runsDashboardInstance.server";
3+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
44
import { workerQueue } from "~/services/worker.server";
55
import { commonWorker } from "../commonWorker.server";
66
import { BaseService } from "./baseService.server";
@@ -100,13 +100,14 @@ export class EnqueueDelayedRunService extends BaseService {
100100
}
101101

102102
if (run.organizationId) {
103-
emitRunEnqueuedAfterDelay({
103+
runsDashboard.emit.runEnqueuedAfterDelay({
104104
time: new Date(),
105105
run: {
106106
id: run.id,
107107
status: run.status,
108108
queuedAt: run.queuedAt ?? new Date(),
109109
updatedAt: run.updatedAt,
110+
createdAt: run.createdAt,
110111
},
111112
organization: {
112113
id: run.organizationId,

apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { PrismaClientOrTransaction } from "~/db.server";
22
import { env } from "~/env.server";
33
import { logger } from "~/services/logger.server";
4-
import { emitRunStatusChanged } from "~/services/runsDashboardInstance.server";
4+
import { runsDashboard } from "~/services/runsDashboardInstance.server";
55
import { workerQueue } from "~/services/worker.server";
66
import { marqs } from "~/v3/marqs/index.server";
77
import { BaseService } from "./baseService.server";
@@ -52,6 +52,8 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
5252
taskIdentifier: true,
5353
concurrencyKey: true,
5454
queue: true,
55+
updatedAt: true,
56+
createdAt: true,
5557
},
5658
take: maxCount + 1,
5759
});
@@ -80,12 +82,13 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
8082
}
8183

8284
for (const run of runsWaitingForDeploy) {
83-
emitRunStatusChanged({
85+
runsDashboard.emit.runStatusChanged({
8486
time: new Date(),
8587
run: {
8688
id: run.id,
8789
status: run.status,
88-
updatedAt: new Date(),
90+
updatedAt: run.updatedAt,
91+
createdAt: run.createdAt,
8992
},
9093
organization: {
9194
id: backgroundWorker.runtimeEnvironment.organizationId,

0 commit comments

Comments
 (0)