Skip to content

Commit f3dc43b

Browse files
committed
more replication stuff
1 parent cde1bfb commit f3dc43b

3 files changed

Lines changed: 202 additions & 43 deletions

File tree

internal-packages/replication/src/client.ts

Lines changed: 85 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -143,25 +143,7 @@ export class LogicalReplicationClient {
143143
this.ackIntervalTimer = null;
144144
}
145145
// Release leader lock if held
146-
if (this.leaderLock) {
147-
const [releaseError] = await tryCatch(this.leaderLock.release());
148-
149-
if (releaseError) {
150-
this.logger.error("Failed to release leader lock", {
151-
name: this.options.name,
152-
slotName: this.options.slotName,
153-
publicationName: this.options.publicationName,
154-
error: releaseError,
155-
});
156-
} else {
157-
this.logger.info("Released leader lock", {
158-
name: this.options.name,
159-
slotName: this.options.slotName,
160-
});
161-
}
162-
163-
this.leaderLock = null;
164-
}
146+
await this.#releaseLeaderLock();
165147

166148
this.connection?.removeAllListeners();
167149
this.connection = null;
@@ -198,6 +180,24 @@ export class LogicalReplicationClient {
198180
return this;
199181
}
200182

183+
public async teardown(): Promise<boolean> {
184+
await this.stop();
185+
186+
// Acquire the leaderLock
187+
const leaderLockAcquired = await this.#acquireLeaderLock();
188+
189+
if (!leaderLockAcquired) {
190+
return false;
191+
}
192+
193+
// Drop the slot
194+
const slotDropped = await this.#dropSlot();
195+
196+
await this.#releaseLeaderLock();
197+
198+
return slotDropped;
199+
}
200+
201201
public async subscribe(startLsn?: string): Promise<this> {
202202
await this.stop();
203203

@@ -212,28 +212,10 @@ export class LogicalReplicationClient {
212212
});
213213

214214
// 1. Leader election
215-
try {
216-
this.leaderLock = await this.redlock.acquire(
217-
[`logical-replication-client:${this.options.name}`],
218-
this.leaderLockTimeoutMs,
219-
{
220-
retryCount: 60,
221-
retryDelay: 1000,
222-
retryJitter: 100,
223-
}
224-
);
225-
} catch (err) {
226-
this.logger.error("Leader election failed", {
227-
name: this.options.name,
228-
table: this.options.table,
229-
slotName: this.options.slotName,
230-
publicationName: this.options.publicationName,
231-
startLsn,
232-
error: err,
233-
});
215+
const leaderLockAcquired = await this.#acquireLeaderLock();
234216

217+
if (!leaderLockAcquired) {
235218
this.events.emit("leaderElection", false);
236-
237219
return this.stop();
238220
}
239221

@@ -481,6 +463,31 @@ export class LogicalReplicationClient {
481463
return res.rows[0].exists;
482464
}
483465

466+
async #dropSlot(): Promise<boolean> {
467+
if (!this.client) {
468+
this.events.emit("error", new LogicalReplicationClientError("Cannot drop slot"));
469+
return false;
470+
}
471+
472+
const [dropError] = await tryCatch(
473+
this.client.query(`SELECT pg_drop_replication_slot('${this.options.slotName}');`)
474+
);
475+
476+
if (dropError) {
477+
this.logger.error("Failed to drop slot", {
478+
name: this.options.name,
479+
table: this.options.table,
480+
slotName: this.options.slotName,
481+
publicationName: this.options.publicationName,
482+
error: dropError,
483+
});
484+
485+
this.events.emit("error", dropError);
486+
}
487+
488+
return true;
489+
}
490+
484491
async #acknowledge(lsn: string): Promise<void> {
485492
if (!this.autoAcknowledge) return;
486493
this.events.emit("acknowledge", { lsn });
@@ -520,6 +527,45 @@ export class LogicalReplicationClient {
520527
return true;
521528
}
522529

530+
async #acquireLeaderLock(): Promise<boolean> {
531+
try {
532+
this.leaderLock = await this.redlock.acquire(
533+
[`logical-replication-client:${this.options.name}`],
534+
this.leaderLockTimeoutMs,
535+
{
536+
retryCount: 60,
537+
retryDelay: 1000,
538+
retryJitter: 100,
539+
}
540+
);
541+
} catch (err) {
542+
this.logger.error("Leader election failed", {
543+
name: this.options.name,
544+
table: this.options.table,
545+
slotName: this.options.slotName,
546+
publicationName: this.options.publicationName,
547+
error: err,
548+
});
549+
550+
return false;
551+
}
552+
553+
return true;
554+
}
555+
556+
async #releaseLeaderLock() {
557+
if (!this.leaderLock) return;
558+
const [releaseError] = await tryCatch(this.leaderLock.release());
559+
this.leaderLock = null;
560+
561+
if (releaseError) {
562+
this.logger.error("Failed to release leader lock", {
563+
name: this.options.name,
564+
error: releaseError,
565+
});
566+
}
567+
}
568+
523569
async #startLeaderLockHeartbeat() {
524570
if (this.leaderLockHeartbeatTimer) {
525571
clearInterval(this.leaderLockHeartbeatTimer);

internal-packages/replication/src/pgoutput.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,21 @@ class BinaryReader {
134134
const lower = this.readInt32();
135135
return upper.toString(16).toUpperCase() + "/" + lower.toString(16).toUpperCase();
136136
}
137+
readUint32(): number {
138+
// >>> 0 ensures unsigned
139+
return this.readInt32() >>> 0;
140+
}
141+
142+
readUint64(): bigint {
143+
// Combine two unsigned 32-bit ints into a 64-bit bigint
144+
return (BigInt(this.readUint32()) << 32n) | BigInt(this.readUint32());
145+
}
146+
137147
readTime(): bigint {
138-
// microseconds since 2000-01-01
139-
const high = this.readInt32();
140-
const low = this.readInt32();
141-
return BigInt(high) * 4294967296n + BigInt(low);
148+
// (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY == 946684800000000
149+
const microsSinceUnixEpoch = this.readUint64() + 946684800000000n;
150+
// Convert to milliseconds for JS Date compatibility
151+
return microsSinceUnixEpoch;
142152
}
143153
}
144154

internal-packages/replication/src/stream.test.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ describe("LogicalReplicationStream", () => {
8181

8282
resolve(undefined);
8383
}).then(() => {});
84+
8485
// Now we want to read from the stream
8586
for await (const transaction of subscription.stream) {
8687
received.push(transaction);
@@ -97,4 +98,106 @@ describe("LogicalReplicationStream", () => {
9798
await subscription.client.stop();
9899
}
99100
);
101+
102+
postgresAndRedisTest(
103+
"should respect highWaterMark and not pull more data than allowed",
104+
async ({ postgresContainer, prisma, redisOptions }) => {
105+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
106+
107+
type TaskRunData = {
108+
friendlyId: string;
109+
taskIdentifier: string;
110+
payload: string;
111+
traceId: string;
112+
spanId: string;
113+
queue: string;
114+
runtimeEnvironmentId: string;
115+
projectId: string;
116+
};
117+
118+
const subscription = createSubscription<TaskRunData>({
119+
name: "test_stream",
120+
publicationName: "test_publication_stream",
121+
slotName: "test_slot_stream",
122+
pgConfig: {
123+
connectionString: postgresContainer.getConnectionUri(),
124+
},
125+
table: "TaskRun",
126+
redisOptions,
127+
filterTags: ["insert"],
128+
abortSignal: AbortSignal.timeout(10000),
129+
});
130+
131+
const organization = await prisma.organization.create({
132+
data: {
133+
title: "test",
134+
slug: "test",
135+
},
136+
});
137+
138+
const project = await prisma.project.create({
139+
data: {
140+
name: "test",
141+
slug: "test",
142+
organizationId: organization.id,
143+
externalRef: "test",
144+
},
145+
});
146+
147+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
148+
data: {
149+
slug: "test",
150+
type: "DEVELOPMENT",
151+
projectId: project.id,
152+
organizationId: organization.id,
153+
apiKey: "test",
154+
pkApiKey: "test",
155+
shortcode: "test",
156+
},
157+
});
158+
159+
// Insert a row into the table
160+
new Promise(async (resolve) => {
161+
await setTimeout(2000);
162+
163+
for (let i = 0; i < 5; i++) {
164+
await prisma.taskRun.create({
165+
data: {
166+
friendlyId: `run_${i}`,
167+
taskIdentifier: "my-task",
168+
payload: JSON.stringify({ foo: "bar" }),
169+
traceId: `${i}`,
170+
spanId: `${i}`,
171+
queue: "test",
172+
runtimeEnvironmentId: runtimeEnvironment.id,
173+
projectId: project.id,
174+
},
175+
});
176+
}
177+
178+
resolve(undefined);
179+
}).then(() => {});
180+
181+
const received: Transaction<TaskRunData>[] = [];
182+
const iterator = subscription.stream[Symbol.asyncIterator]();
183+
184+
// Pull the first item, then wait before pulling the next
185+
const first = await iterator.next();
186+
received.push(first.value);
187+
188+
// Wait to simulate slow consumer
189+
await setTimeout(2000);
190+
191+
// Pull the next item
192+
const second = await iterator.next();
193+
received.push(second.value);
194+
195+
// Optionally, check internal state or spy on client.subscribe/data to ensure only 1 item was buffered at a time
196+
197+
expect(received.length).toBe(2);
198+
199+
// Clean up
200+
await subscription.client.stop();
201+
}
202+
);
100203
});

0 commit comments

Comments
 (0)