Skip to content

Commit 8798599

Browse files
committed
Run Replication using the factory
1 parent 757a07f commit 8798599

12 files changed

Lines changed: 253 additions & 125 deletions

apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
33
import { z } from "zod";
4-
import { ClickHouse } from "@internal/clickhouse";
54
import { env } from "~/env.server";
5+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
66
import { RunsReplicationService } from "~/services/runsReplicationService.server";
77
import {
88
getRunsReplicationGlobal,
@@ -42,6 +42,8 @@ export async function action({ request }: ActionFunctionArgs) {
4242

4343
const params = CreateRunReplicationServiceParams.parse(await request.json());
4444

45+
await clickhouseFactory.isReady();
46+
4547
const service = createRunReplicationService(params);
4648

4749
setRunsReplicationGlobal(service);
@@ -57,24 +59,23 @@ export async function action({ request }: ActionFunctionArgs) {
5759
}
5860

5961
function createRunReplicationService(params: CreateRunReplicationServiceParams) {
60-
const clickhouse = new ClickHouse({
61-
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
62-
name: params.name,
63-
keepAlive: {
64-
enabled: params.keepAliveEnabled,
65-
idleSocketTtl: params.keepAliveIdleSocketTtl,
66-
},
67-
logLevel: "debug",
68-
compression: {
69-
request: true,
70-
},
71-
maxOpenConnections: params.maxOpenConnections,
72-
});
62+
const {
63+
name,
64+
maxFlushConcurrency,
65+
flushIntervalMs,
66+
flushBatchSize,
67+
leaderLockTimeoutMs,
68+
leaderLockExtendIntervalMs,
69+
leaderLockAcquireAdditionalTimeMs,
70+
leaderLockRetryIntervalMs,
71+
ackIntervalSeconds,
72+
waitForAsyncInsert,
73+
} = params;
7374

7475
const service = new RunsReplicationService({
75-
clickhouse: clickhouse,
76+
clickhouseFactory,
7677
pgConnectionUrl: env.DATABASE_URL,
77-
serviceName: params.name,
78+
serviceName: name,
7879
slotName: env.RUN_REPLICATION_SLOT_NAME,
7980
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
8081
redisOptions: {
@@ -86,16 +87,16 @@ function createRunReplicationService(params: CreateRunReplicationServiceParams)
8687
enableAutoPipelining: true,
8788
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
8889
},
89-
maxFlushConcurrency: params.maxFlushConcurrency,
90-
flushIntervalMs: params.flushIntervalMs,
91-
flushBatchSize: params.flushBatchSize,
92-
leaderLockTimeoutMs: params.leaderLockTimeoutMs,
93-
leaderLockExtendIntervalMs: params.leaderLockExtendIntervalMs,
94-
leaderLockAcquireAdditionalTimeMs: params.leaderLockAcquireAdditionalTimeMs,
95-
leaderLockRetryIntervalMs: params.leaderLockRetryIntervalMs,
96-
ackIntervalSeconds: params.ackIntervalSeconds,
90+
maxFlushConcurrency,
91+
flushIntervalMs,
92+
flushBatchSize,
93+
leaderLockTimeoutMs,
94+
leaderLockExtendIntervalMs,
95+
leaderLockAcquireAdditionalTimeMs,
96+
leaderLockRetryIntervalMs,
97+
ackIntervalSeconds,
9798
logLevel: "debug",
98-
waitForAsyncInsert: params.waitForAsyncInsert,
99+
waitForAsyncInsert,
99100
});
100101

101102
return service;

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
3+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
34
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
45
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
56

@@ -9,6 +10,8 @@ export async function action({ request }: ActionFunctionArgs) {
910
try {
1011
const globalService = getRunsReplicationGlobal();
1112

13+
await clickhouseFactory.isReady();
14+
1215
if (globalService) {
1316
await globalService.start();
1417
} else {

apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,34 @@ function initializeQueryClickhouseClient() {
122122
});
123123
}
124124

125+
/** TaskRun replication to ClickHouse (`RUN_REPLICATION_CLICKHOUSE_URL`); not exported. */
126+
const defaultRunsReplicationClickhouseClient = singleton(
127+
"runsReplicationClickhouseClient",
128+
initializeRunsReplicationClickhouseClient
129+
);
130+
131+
function initializeRunsReplicationClickhouseClient(): ClickHouse {
132+
if (!env.RUN_REPLICATION_CLICKHOUSE_URL) {
133+
// Runs replication worker gates on this URL; factory may still resolve "replication" for tests.
134+
return defaultClickhouseClient;
135+
}
136+
137+
const url = new URL(env.RUN_REPLICATION_CLICKHOUSE_URL);
138+
url.searchParams.delete("secure");
139+
140+
return new ClickHouse({
141+
url: url.toString(),
142+
name: "runs-replication",
143+
keepAlive: {
144+
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
145+
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
146+
},
147+
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
148+
compression: { request: true },
149+
maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS,
150+
});
151+
}
152+
125153
// ---------------------------------------------------------------------------
126154
// Helpers
127155
// ---------------------------------------------------------------------------
@@ -187,8 +215,9 @@ export class ClickhouseFactory {
187215
switch (clientType) {
188216
case "standard":
189217
case "events":
190-
case "replication":
191218
return defaultClickhouseClient;
219+
case "replication":
220+
return defaultRunsReplicationClickhouseClient;
192221
case "logs":
193222
return defaultLogsClickhouseClient;
194223
case "query":

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { ClickHouse } from "@internal/clickhouse";
21
import invariant from "tiny-invariant";
32
import { env } from "~/env.server";
3+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
44
import { singleton } from "~/utils/singleton";
55
import { meter, provider } from "~/v3/tracer.server";
66
import { RunsReplicationService } from "./runsReplicationService.server";
@@ -22,22 +22,8 @@ function initializeRunsReplicationInstance() {
2222

2323
console.log("🗃️ Runs replication service enabled");
2424

25-
const clickhouse = new ClickHouse({
26-
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
27-
name: "runs-replication",
28-
keepAlive: {
29-
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
30-
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
31-
},
32-
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
33-
compression: {
34-
request: true,
35-
},
36-
maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS,
37-
});
38-
3925
const service = new RunsReplicationService({
40-
clickhouse: clickhouse,
26+
clickhouseFactory,
4127
pgConnectionUrl: DATABASE_URL,
4228
serviceName: "runs-replication",
4329
slotName: env.RUN_REPLICATION_SLOT_NAME,
@@ -72,8 +58,9 @@ function initializeRunsReplicationInstance() {
7258
});
7359

7460
if (env.RUN_REPLICATION_ENABLED === "1") {
75-
service
76-
.start()
61+
clickhouseFactory
62+
.isReady()
63+
.then(() => service.start())
7764
.then(() => {
7865
console.log("🗃️ Runs replication service started");
7966
})

0 commit comments

Comments
 (0)