-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathwaitpointCompletionPacket.server.ts
More file actions
69 lines (61 loc) · 2 KB
/
waitpointCompletionPacket.server.ts
File metadata and controls
69 lines (61 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { env } from "~/env.server";
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError } from "~/v3/services/common.server";
/**
 * Picks the file extension used when building the object store filename for a packet.
 *
 * @param dataType - The packet's data type string (e.g. "application/json").
 * @returns "json" for the two JSON data types; "txt" for everything else,
 *   including "text/plain" and any unrecognized value.
 */
function packetExtensionForDataType(dataType: string): string {
  const isJsonLike =
    dataType === "application/json" || dataType === "application/super+json";

  return isJsonLike ? "json" : "txt";
}
/**
 * Moves an oversized waitpoint completion payload into the object store and returns a
 * packet that points at the stored object instead of carrying the data inline.
 *
 * Uses the same threshold and upload path pattern as DefaultPayloadProcessor, and
 * replaces the no-op conditionallyExportPacket usage in webapp routes where
 * apiClientManager is unset.
 *
 * @param packet - The completion packet to inspect. Returned unchanged when it has no
 *   data, or when `packetRequiresOffloading` reports that it does not need offloading
 *   for `env.TASK_PAYLOAD_OFFLOAD_THRESHOLD`.
 * @param environment - The authenticated environment passed through to the upload; its
 *   `id` is included in the error log on failure.
 * @param pathPrefix - Object key prefix without the file extension. Should use the
 *   waitpoint friendly id folder, e.g. `${WaitpointId.toFriendlyId(internalId)}/token`.
 * @returns The original packet, or a new packet with `dataType` "application/store"
 *   whose `data` is the value the upload resolved to.
 * @throws ServiceValidationError (constructed with 500) when the upload fails; the
 *   underlying error is logged first.
 */
export async function processWaitpointCompletionPacket(
  packet: IOPacket,
  environment: AuthenticatedEnvironment,
  pathPrefix: string
): Promise<IOPacket> {
  // Short-circuit keeps the offloading check from running on packets without data.
  if (
    !packet.data ||
    !packetRequiresOffloading(packet, env.TASK_PAYLOAD_OFFLOAD_THRESHOLD).needsOffloading
  ) {
    return packet;
  }

  const extension = packetExtensionForDataType(packet.dataType);
  const objectKey = `${pathPrefix}.${extension}`;

  const pendingUpload = uploadPacketToObjectStore(
    objectKey,
    packet.data,
    packet.dataType,
    environment,
    env.OBJECT_STORE_DEFAULT_PROTOCOL
  );
  const [failure, storedKey] = await tryCatch(pendingUpload);

  if (!failure) {
    // The payload now lives in the object store; hand back a pointer packet.
    return {
      data: storedKey!,
      dataType: "application/store",
    };
  }

  const failureMessage = "Failed to upload large waitpoint to object store";
  logger.error(failureMessage, {
    error: failure,
    filename: objectKey,
    environmentId: environment.id,
  });
  throw new ServiceValidationError(failureMessage, 500);
}