-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathindexWorkerManifest.ts
More file actions
118 lines (108 loc) · 3.24 KB
/
indexWorkerManifest.ts
File metadata and controls
118 lines (108 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import { execPathForRuntime } from "@trigger.dev/core/v3/build";
import {
TaskIndexingImportError,
TaskMetadataParseError,
UncaughtExceptionError,
} from "@trigger.dev/core/v3/errors";
import {
BuildRuntime,
indexerToWorkerMessages,
WorkerManifest,
} from "@trigger.dev/core/v3/schemas";
import { parseMessageFromCatalog } from "@trigger.dev/core/v3/zodMessageHandler";
import { fork } from "node:child_process";
export type IndexWorkerManifestOptions = {
runtime: BuildRuntime;
indexWorkerPath: string;
buildManifestPath: string;
nodeOptions?: string;
env: Record<string, string | undefined>;
cwd?: string;
otelHookInclude?: string[];
otelHookExclude?: string[];
handleStdout?: (data: string) => void;
handleStderr?: (data: string) => void;
};
export async function indexWorkerManifest({
runtime,
indexWorkerPath,
buildManifestPath,
nodeOptions,
env: $env,
cwd,
otelHookInclude,
otelHookExclude,
handleStderr,
handleStdout,
}: IndexWorkerManifestOptions) {
return await new Promise<WorkerManifest>((resolve, reject) => {
let resolved = false;
const child = fork(indexWorkerPath, {
stdio: [/*stdin*/ "ignore", /*stdout*/ "pipe", /*stderr*/ "pipe", "ipc"],
cwd,
env: {
...$env,
OTEL_IMPORT_HOOK_INCLUDES: otelHookInclude?.join(","),
OTEL_IMPORT_HOOK_EXCLUDES: otelHookExclude?.join(","),
TRIGGER_BUILD_MANIFEST_PATH: buildManifestPath,
NODE_OPTIONS: nodeOptions,
TRIGGER_INDEXING: "1",
},
execPath: execPathForRuntime(runtime),
});
// Set a timeout to kill the child process if it doesn't respond
const timeout = setTimeout(() => {
if (resolved) {
return;
}
resolved = true;
child.kill("SIGKILL");
reject(new Error("Worker timed out"));
}, 20_000);
child.on("message", async (msg: any) => {
const message = parseMessageFromCatalog(msg, indexerToWorkerMessages);
switch (message.type) {
case "INDEX_COMPLETE": {
clearTimeout(timeout);
resolved = true;
if (message.payload.importErrors.length > 0) {
reject(
new TaskIndexingImportError(message.payload.importErrors, message.payload.manifest)
);
} else {
resolve(message.payload.manifest);
}
child.kill("SIGKILL");
break;
}
case "TASKS_FAILED_TO_PARSE": {
clearTimeout(timeout);
resolved = true;
reject(new TaskMetadataParseError(message.payload.zodIssues, message.payload.tasks));
child.kill("SIGKILL");
break;
}
case "UNCAUGHT_EXCEPTION": {
clearTimeout(timeout);
resolved = true;
reject(new UncaughtExceptionError(message.payload.error, message.payload.origin));
child.kill("SIGKILL");
break;
}
}
});
child.on("exit", (code) => {
if (!resolved) {
clearTimeout(timeout);
resolved = true;
reject(new Error(`Worker exited with code ${code}`));
}
});
child.stdout?.on("data", (data) => {
handleStdout?.(data.toString());
});
child.stderr?.on("data", (data) => {
handleStderr?.(data.toString());
});
});
}