Skip to content

Commit 1c14125

Browse files
committed
replication wip
1 parent 1c00d98 commit 1c14125

4 files changed

Lines changed: 290 additions & 3 deletions

File tree

internal-packages/replication/src/client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import Redlock, { Lock } from "redlock";
66
import { createRedisClient } from "@internal/redis";
77
import { Logger } from "@trigger.dev/core/logger";
88
import { LogicalReplicationClientError } from "./errors.js";
9-
import { PgoutputParser, getPgoutputStartReplicationSQL } from "./pgoutput.js";
9+
import { PgoutputMessage, PgoutputParser, getPgoutputStartReplicationSQL } from "./pgoutput.js";
1010

1111
export interface LogicalReplicationClientOptions {
1212
/**
@@ -64,7 +64,7 @@ export interface LogicalReplicationClientOptions {
6464
export type LogicalReplicationClientEvents = {
6565
leaderElection: [boolean];
6666
error: [Error];
67-
data: [{ lsn: string; log: unknown }];
67+
data: [{ lsn: string; log: PgoutputMessage }];
6868
start: [];
6969
acknowledge: [{ lsn: string }];
7070
heartbeat: [{ lsn: string; timestamp: number; shouldRespond: boolean }];
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { postgresAndRedisTest } from "@internal/testcontainers";
2+
import { createSubscription, Transaction } from "./stream.js";
3+
import { setTimeout } from "timers/promises";
4+
5+
describe("LogicalReplicationStream", () => {
6+
postgresAndRedisTest(
7+
"should group changes by transaction and filter relevant events",
8+
async ({ postgresContainer, prisma, redisOptions }) => {
9+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
10+
11+
type TaskRunData = {
12+
friendlyId: string;
13+
taskIdentifier: string;
14+
payload: string;
15+
traceId: string;
16+
spanId: string;
17+
queue: string;
18+
runtimeEnvironmentId: string;
19+
projectId: string;
20+
};
21+
22+
const received: Transaction<TaskRunData>[] = [];
23+
24+
const subscription = createSubscription<TaskRunData>({
25+
name: "test_stream",
26+
publicationName: "test_publication_stream",
27+
slotName: "test_slot_stream",
28+
pgConfig: {
29+
connectionString: postgresContainer.getConnectionUri(),
30+
},
31+
table: "TaskRun",
32+
redisOptions,
33+
filterTags: ["insert"],
34+
abortSignal: AbortSignal.timeout(10000),
35+
});
36+
37+
const organization = await prisma.organization.create({
38+
data: {
39+
title: "test",
40+
slug: "test",
41+
},
42+
});
43+
44+
const project = await prisma.project.create({
45+
data: {
46+
name: "test",
47+
slug: "test",
48+
organizationId: organization.id,
49+
externalRef: "test",
50+
},
51+
});
52+
53+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
54+
data: {
55+
slug: "test",
56+
type: "DEVELOPMENT",
57+
projectId: project.id,
58+
organizationId: organization.id,
59+
apiKey: "test",
60+
pkApiKey: "test",
61+
shortcode: "test",
62+
},
63+
});
64+
65+
// Insert a row into the table
66+
new Promise(async (resolve) => {
67+
await setTimeout(2000);
68+
69+
await prisma.taskRun.create({
70+
data: {
71+
friendlyId: "run_5678",
72+
taskIdentifier: "my-task",
73+
payload: JSON.stringify({ foo: "bar" }),
74+
traceId: "5678",
75+
spanId: "5678",
76+
queue: "test",
77+
runtimeEnvironmentId: runtimeEnvironment.id,
78+
projectId: project.id,
79+
},
80+
});
81+
82+
resolve(undefined);
83+
}).then(() => {});
84+
// Now we want to read from the stream
85+
for await (const transaction of subscription.stream) {
86+
received.push(transaction);
87+
}
88+
89+
console.log(received);
90+
91+
expect(received.length).toBeGreaterThan(0);
92+
const transaction = received[0];
93+
expect(transaction.events.length).toBeGreaterThan(0);
94+
expect(transaction.events[0].data.friendlyId).toBe("run_5678");
95+
96+
// Clean up
97+
await subscription.client.stop();
98+
}
99+
);
100+
});
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import { createAsyncIterableStreamFromAsyncIterable } from "@trigger.dev/core/v3";
2+
import { Readable } from "node:stream";
3+
import type { ClientConfig } from "pg";
4+
import { LogicalReplicationClient, LogicalReplicationClientOptions } from "./client.js";
5+
import type { MessageDelete, MessageInsert, MessageUpdate, PgoutputMessage } from "./pgoutput.js";
6+
7+
/**
 * Options for creating a transaction-grouped replication subscription.
 * Extends the underlying client options with stream-specific knobs.
 */
export interface LogicalReplicationStreamOptions extends LogicalReplicationClientOptions {
  /** Invoked with every error emitted by the underlying replication client. */
  onError?: (err: Error) => void;
  /** Change kinds to subscribe to; forwarded to the client as publicationActions. */
  filterTags?: Array<"insert" | "update" | "delete">;
  /** Aborting this signal closes the stream, ending consumers' for-await loops. */
  abortSignal?: AbortSignal;
  /** Backpressure threshold for the internal ReadableStream (defaults to 1). */
  highWaterMark?: number;
}

/** A single row change observed within a transaction. */
export interface TransactionEvent<T = any> {
  tag: "insert" | "update" | "delete";
  // For insert/update this holds the new row image; for delete, the old one.
  data: T;
  // The raw pgoutput message this event was derived from.
  raw: MessageInsert | MessageUpdate | MessageDelete;
}

/** A batch of row changes that committed atomically, yielded at commit time. */
export interface Transaction<T = any> {
  commitLsn: string | null;
  commitEndLsn: string | null;
  xid: number;
  events: TransactionEvent<T>[];
  // Approximate ms between the commit and its observation here; computed in
  // groupByTransaction from the commit message's timestamp.
  replicationLagMs: number;
}
27+
28+
export function createLogicalReplicationStream<T>(
29+
client: LogicalReplicationClient,
30+
highWaterMark?: number,
31+
signal?: AbortSignal
32+
) {
33+
let lastLsn: string | null = null;
34+
let isSubscribed = false;
35+
36+
const source = new ReadableStream<{ lsn: string; message: PgoutputMessage }>(
37+
{
38+
async start(controller) {
39+
console.log("ReadableStream.start");
40+
41+
if (signal) {
42+
signal.addEventListener("abort", () => {
43+
controller.close();
44+
});
45+
}
46+
47+
client.events.on("data", async ({ lsn, log }) => {
48+
console.log("ReadableStream.data");
49+
lastLsn = lsn;
50+
51+
if (signal?.aborted) {
52+
return;
53+
}
54+
55+
if (isRelevantTag(log.tag)) {
56+
controller.enqueue({ lsn, message: log });
57+
}
58+
59+
if (typeof controller.desiredSize === "number" && controller.desiredSize <= 0) {
60+
await client.stop();
61+
}
62+
});
63+
},
64+
async cancel() {
65+
console.log("ReadableStream.cancel");
66+
await client.stop();
67+
},
68+
async pull() {
69+
if (!isSubscribed) {
70+
isSubscribed = true;
71+
console.log("ReadableStream.pull");
72+
await client.subscribe(lastLsn ?? undefined);
73+
}
74+
},
75+
},
76+
new CountQueuingStrategy({ highWaterMark: highWaterMark ?? 1 })
77+
);
78+
79+
return createAsyncIterableStreamFromAsyncIterable<Transaction<T>>(groupByTransaction(source));
80+
}
81+
82+
export async function* groupByTransaction<T = any>(
83+
stream: ReadableStream<{
84+
lsn: string;
85+
message: PgoutputMessage;
86+
}>
87+
) {
88+
let currentTransaction: Omit<Transaction<T>, "commitEndLsn" | "replicationLagMs"> & {
89+
commitEndLsn?: string | null;
90+
replicationLagMs?: number;
91+
} = {
92+
commitLsn: null,
93+
xid: 0,
94+
events: [],
95+
};
96+
for await (const { lsn, message } of stream as AsyncIterable<{
97+
lsn: string;
98+
message: PgoutputMessage;
99+
}>) {
100+
console.log("groupByTransaction.for await");
101+
console.log(message);
102+
switch (message.tag) {
103+
case "begin": {
104+
currentTransaction = {
105+
commitLsn: message.commitLsn,
106+
xid: message.xid,
107+
events: [],
108+
};
109+
break;
110+
}
111+
case "insert": {
112+
currentTransaction.events.push({
113+
tag: message.tag,
114+
data: message.new as T,
115+
raw: message,
116+
});
117+
break;
118+
}
119+
case "update": {
120+
currentTransaction.events.push({
121+
tag: message.tag,
122+
data: message.new as T,
123+
raw: message,
124+
});
125+
break;
126+
}
127+
case "delete": {
128+
currentTransaction.events.push({
129+
tag: message.tag,
130+
data: message.old as T,
131+
raw: message,
132+
});
133+
break;
134+
}
135+
case "commit": {
136+
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
137+
currentTransaction.commitEndLsn = message.commitEndLsn;
138+
currentTransaction.replicationLagMs = replicationLagMs;
139+
yield currentTransaction as Transaction<T>;
140+
break;
141+
}
142+
}
143+
}
144+
}
145+
146+
export function createSubscription<T = any>(opts: LogicalReplicationStreamOptions) {
147+
const client = new LogicalReplicationClient({
148+
name: opts.name,
149+
publicationName: opts.publicationName,
150+
slotName: opts.slotName,
151+
pgConfig: opts.pgConfig,
152+
table: opts.table,
153+
redisOptions: opts.redisOptions,
154+
publicationActions: opts.filterTags,
155+
});
156+
157+
client.events.on("error", (err) => {
158+
if (opts.onError) opts.onError(err);
159+
});
160+
161+
client.events.on("heartbeat", async ({ lsn, shouldRespond }) => {
162+
if (shouldRespond) {
163+
await client.acknowledge(lsn);
164+
}
165+
});
166+
167+
const stream = createLogicalReplicationStream<T>(client, opts.highWaterMark, opts.abortSignal);
168+
169+
return {
170+
stream,
171+
client,
172+
};
173+
}
174+
175+
function isRelevantTag(tag: string): tag is "insert" | "update" | "delete" | "begin" | "commit" {
176+
return (
177+
tag === "insert" || tag === "update" || tag === "delete" || tag === "begin" || tag === "commit"
178+
);
179+
}

packages/core/src/v3/streams/asyncIterableStream.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export function createAsyncIterableReadable<S, T>(
5151

5252
export function createAsyncIterableStreamFromAsyncIterable<T>(
5353
asyncIterable: AsyncIterable<T>,
54-
transformer: Transformer<T, T>,
54+
transformer?: Transformer<T, T>,
5555
signal?: AbortSignal
5656
): AsyncIterableStream<T> {
5757
const stream = new ReadableStream<T>({
@@ -95,3 +95,11 @@ export function createAsyncIterableStreamFromAsyncIterable<T>(
9595

9696
return transformedStream as AsyncIterableStream<T>;
9797
}
98+
99+
export function createAsyncIterableStreamFromAsyncGenerator<T>(
100+
asyncGenerator: AsyncGenerator<T, void, unknown>,
101+
transformer: Transformer<T, T>,
102+
signal?: AbortSignal
103+
): AsyncIterableStream<T> {
104+
return createAsyncIterableStreamFromAsyncIterable(asyncGenerator, transformer, signal);
105+
}

0 commit comments

Comments
 (0)