diff --git a/backend/src/Main.rs b/backend/src/Main.rs index d352a218..d06d2b2c 100644 --- a/backend/src/Main.rs +++ b/backend/src/Main.rs @@ -125,6 +125,9 @@ async fn main() -> anyhow::Result<()> { "Dispute file analysis queue initialised" ); + let shutdown_pool = pool.clone(); + let shutdown_queue = queue_tx.clone(); + // ── 4. Application state ──────────────────────────────────────────────── let state = Arc::new(AppState { db: pool, @@ -150,6 +153,11 @@ async fn main() -> anyhow::Result<()> { .with_graceful_shutdown(shutdown_signal()) .await?; + info!("HTTP listener stopped; closing dispute queue and draining database pool"); + shutdown_queue.close(); + shutdown_pool.close().await; + info!("Axum server shutdown completed"); + Ok(()) } @@ -209,4 +217,4 @@ async fn shutdown_signal() { _ = ctrl_c => { info!("Received Ctrl-C, shutting down") }, _ = terminate => { info!("Received SIGTERM, shutting down") }, } -} \ No newline at end of file +} diff --git a/backend/src/config/db.ts b/backend/src/config/db.ts index b8d003d9..2348b292 100644 --- a/backend/src/config/db.ts +++ b/backend/src/config/db.ts @@ -346,20 +346,12 @@ export const prisma = globalForPrisma.prisma || createPrismaClient(); if (process.env.NODE_ENV !== "production") globalForPrisma.prisma = prisma; // --------------------------------------------------------------------------- -// Graceful shutdown — release pool connections on process exit signals +// Graceful pool draining — called by the server-level shutdown handler // --------------------------------------------------------------------------- -async function gracefulShutdown(signal: string): Promise { +export async function drainDatabasePool(signal = "shutdown"): Promise { console.log(`[POOL] Received ${signal}. Draining connection pool...`); stopPoolHealthCheck(); - try { - await prisma.$disconnect(); - await pool.end(); - console.log("[POOL] Connection pool drained successfully."); - } catch (err: any) { - console.error("[POOL] Error during pool shutdown:", err.message); - } - process.exit(0); + await prisma.$disconnect(); + await pool.end(); + console.log("[POOL] Connection pool drained successfully."); } - -process.on("SIGTERM", () => gracefulShutdown("SIGTERM")); -process.on("SIGINT", () => gracefulShutdown("SIGINT")); diff --git a/backend/src/index.ts b/backend/src/index.ts index 92f486fd..f86f37d0 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,15 +1,23 @@ import express, { Express, Request, Response, NextFunction } from "express"; +import type { Server } from "node:http"; import cors from "cors"; import cookieParser from "cookie-parser"; import crypto from "crypto"; import dotenv from "dotenv"; -import { prisma, connectWithRetry, startPoolHealthCheck } from "./config/db"; +import { + connectWithRetry, + drainDatabasePool, + pool, + startPoolHealthCheck, + stopPoolHealthCheck, +} from "./config/db"; import { trace } from "./config/tracing"; import { intakeRateLimit } from "./middleware/intakeRateLimit"; import { sqlInjectionGuard } from "./middleware/sanitize"; import { tracingMiddleware } from "./utils/tracing"; import { metricsMiddleware } from "./middleware/metrics"; import { createMetricsRouter, updatePoolMetrics } from "./utils/metrics"; +import { closeHttpServer, createGracefulShutdownHandler } from "./utils/graceful-shutdown"; import authRoutes from "./routes/auth"; import jobsRoutes from "./routes/jobs"; import disputesRoutes from "./routes/disputes"; @@ -20,7 +28,6 @@ import uploadsRoutes from "./routes/uploads"; import bulkRoutes from "./routes/bulk"; import poolRoutes from "./routes/pool"; import stateRoutes from "./routes/state"; -import { pool } from "./config/db"; import { startStorageCleanup, stopStorageCleanup } from "./utils/storage-cleanup"; import { startNonceCleanup, stopNonceCleanup } from "./utils/nonce-cleanup"; @@ -31,6 +38,16 @@ const port = process.env.PORT || 3001; const logger = trace.getLogger("server"); const isProduction = process.env.NODE_ENV === "production"; const CSRF_COOKIE_NAME = "lance-csrf-token"; +let isShuttingDown = false; +let server: Server | null = null; +let poolMetricsInterval: NodeJS.Timeout | null = null; + +function positiveIntEnv(name: string, fallback: number): number { + const parsed = Number.parseInt(process.env[name] || String(fallback), 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +const SHUTDOWN_TIMEOUT_MS = positiveIntEnv("SHUTDOWN_TIMEOUT_MS", 10_000); // Enable CORS for frontend requests with credentials support const FRONTEND_URL = process.env.FRONTEND_URL || "http://localhost:3000"; @@ -76,6 +93,19 @@ app.get("/api/v1/auth/csrf", (req: Request, res: Response) => { res.json({ csrfToken }); }); +app.use((req: Request, res: Response, next: NextFunction) => { + if (!isShuttingDown) { + return next(); + } + + logger.warn("Request rejected during graceful shutdown", { + method: req.method, + path: req.path, + }); + res.setHeader("Connection", "close"); + return res.status(503).json({ error: "Server is shutting down" }); +}); + app.use(csrfMiddleware); app.use(tracingMiddleware); // Global request tracing and diagnostics app.use(intakeRateLimit); @@ -148,23 +178,34 @@ app.get("/health", async (req: Request, res: Response) => { } }); -// Graceful shutdown handler -process.on("SIGTERM", async () => { - logger.info("SIGTERM received, shutting down gracefully"); - stopStorageCleanup(); - stopNonceCleanup(); - try { - await prisma.$disconnect(); - logger.info("Database connection closed"); - process.exit(0); - } catch (error) { - logger.error("Error during shutdown", { - error: error instanceof Error ? error.message : String(error), - }); - process.exit(1); - } +const shutdown = createGracefulShutdownHandler({ + logger, + timeoutMs: SHUTDOWN_TIMEOUT_MS, + markShuttingDown: () => { + isShuttingDown = true; + }, + closeServer: () => closeHttpServer(server), + stopBackgroundTasks: [ + () => { + stopStorageCleanup(); + stopNonceCleanup(); + stopPoolHealthCheck(); + if (poolMetricsInterval) { + clearInterval(poolMetricsInterval); + poolMetricsInterval = null; + } + }, + ], + drainDatabase: drainDatabasePool, + exit: (code) => process.exit(code), }); +for (const signal of ["SIGINT", "SIGTERM"] as NodeJS.Signals[]) { + process.once(signal, () => { + void shutdown(signal); + }); +} + // --------------------------------------------------------------------------- // Start the server — validate the DB connection with retry backoff first, // then kick off background pool health-checking. @@ -175,10 +216,10 @@ async function bootstrap(): Promise { startPoolHealthCheck(); startStorageCleanup(); startNonceCleanup(); - app.listen(port, () => { + server = app.listen(port, () => { console.log(`⚡️[server]: Server is running at http://localhost:${port}`); // Update pool metrics periodically so the Prometheus scrape has fresh data - setInterval(() => { + poolMetricsInterval = setInterval(() => { updatePoolMetrics(pool.totalCount, pool.idleCount, pool.waitingCount); }, 15_000).unref(); }); diff --git a/backend/src/routes/pool-enhanced.ts b/backend/src/routes/pool-enhanced.ts index c3b6dbdb..1b5250c1 100644 --- a/backend/src/routes/pool-enhanced.ts +++ b/backend/src/routes/pool-enhanced.ts @@ -60,7 +60,7 @@ router.get("/health", async (req: Request, res: Response) => { const uptime = stats.uptimeSeconds || 0; // Determine overall health status - const isPrimary Healthy = stats.primaryStatus === "healthy"; + const isPrimaryHealthy = stats.primaryStatus === "healthy"; const hasHealthyReplicas = (stats.replicaStatuses || []).some( (s: string) => s === "healthy" ); diff --git a/backend/src/utils/graceful-shutdown.ts b/backend/src/utils/graceful-shutdown.ts new file mode 100644 index 00000000..e5ee23d4 --- /dev/null +++ b/backend/src/utils/graceful-shutdown.ts @@ -0,0 +1,108 @@ +import type { Server } from "node:http"; + +export type ShutdownLogger = { + info(message: string, context?: Record): void; + warn(message: string, context?: Record): void; + error(message: string, context?: Record): void; +}; + +export interface GracefulShutdownOptions { + logger: ShutdownLogger; + timeoutMs: number; + markShuttingDown?: () => void; + closeServer?: () => Promise; + stopBackgroundTasks?: Array<() => void | Promise>; + drainDatabase?: (signal: NodeJS.Signals) => Promise; + exit?: (code: number) => void; +} + +function describeError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +async function withTimeout( + work: Promise, + timeoutMs: number, + signal: NodeJS.Signals +): Promise { + let timeout: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new Error(`Graceful shutdown timed out after ${timeoutMs}ms for ${signal}`)); + }, timeoutMs); + timeout.unref(); + }); + + try { + return await Promise.race([work, timeoutPromise]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } +} + +export function closeHttpServer(server: Server | null | undefined): Promise { + if (!server) { + return Promise.resolve(); + } + + const serverWithIdleClose = server as Server & { + closeIdleConnections?: () => void; + }; + + return new Promise((resolve, reject) => { + server.close((error?: Error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + + serverWithIdleClose.closeIdleConnections?.(); + }); +} + +export function createGracefulShutdownHandler(options: GracefulShutdownOptions) { + let shuttingDown = false; + + return async function gracefulShutdown(signal: NodeJS.Signals): Promise { + if (shuttingDown) { + options.logger.warn("Shutdown already in progress; ignoring duplicate signal", { signal }); + return; + } + + shuttingDown = true; + options.markShuttingDown?.(); + options.logger.info("Shutdown signal received; draining API resources", { + signal, + timeoutMs: options.timeoutMs, + }); + + try { + await withTimeout( + (async () => { + await options.closeServer?.(); + + for (const stopTask of options.stopBackgroundTasks ?? []) { + await stopTask(); + } + + await options.drainDatabase?.(signal); + })(), + options.timeoutMs, + signal + ); + + options.logger.info("Graceful shutdown completed", { signal }); + options.exit?.(0); + } catch (error) { + options.logger.error("Graceful shutdown failed", { + signal, + error: describeError(error), + }); + options.exit?.(1); + } + }; +} diff --git a/backend/tests/graceful-shutdown.test.ts b/backend/tests/graceful-shutdown.test.ts new file mode 100644 index 00000000..656d26de --- /dev/null +++ b/backend/tests/graceful-shutdown.test.ts @@ -0,0 +1,103 @@ +/// + +import test from "node:test"; +import assert from "node:assert/strict"; +import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; + +import { + closeHttpServer, + createGracefulShutdownHandler, + type ShutdownLogger, +} from "../src/utils/graceful-shutdown"; + +function testLogger(messages: string[]): ShutdownLogger { + return { + info: (message, context) => messages.push(`info:${message}:${context?.signal ?? ""}`), + warn: (message, context) => messages.push(`warn:${message}:${context?.signal ?? ""}`), + error: (message, context) => messages.push(`error:${message}:${context?.signal ?? ""}`), + }; +} + +test("graceful shutdown closes the server, stops tasks, drains the database, and exits cleanly", async () => { + const events: string[] = []; + const server = createServer((_req: IncomingMessage, res: ServerResponse) => { + res.end("ok"); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + + const shutdown = createGracefulShutdownHandler({ + logger: testLogger(events), + timeoutMs: 1_000, + markShuttingDown: () => events.push("mark"), + closeServer: async () => { + events.push("close-server"); + await closeHttpServer(server); + }, + stopBackgroundTasks: [ + () => { + events.push("stop-background"); + }, + ], + drainDatabase: async (signal) => { + events.push(`drain-db:${signal}`); + }, + exit: (code) => events.push(`exit:${code}`), + }); + + await shutdown("SIGINT"); + + assert.deepEqual(events, [ + "mark", + "info:Shutdown signal received; draining API resources:SIGINT", + "close-server", + "stop-background", + "drain-db:SIGINT", + "info:Graceful shutdown completed:SIGINT", + "exit:0", + ]); + assert.equal(server.listening, false); +}); + +test("graceful shutdown exits with failure when cleanup exceeds timeout", async () => { + const events: string[] = []; + const shutdown = createGracefulShutdownHandler({ + logger: testLogger(events), + timeoutMs: 5, + closeServer: () => new Promise(() => undefined), + exit: (code) => events.push(`exit:${code}`), + }); + + await shutdown("SIGTERM"); + + assert.ok(events.includes("error:Graceful shutdown failed:SIGTERM")); + assert.ok(events.includes("exit:1")); +}); + +test("duplicate shutdown signals are ignored while cleanup is running", async () => { + const events: string[] = []; + let releaseDrain!: () => void; + const drainReleased = new Promise((resolve) => { + releaseDrain = resolve; + }); + + const shutdown = createGracefulShutdownHandler({ + logger: testLogger(events), + timeoutMs: 1_000, + drainDatabase: async () => { + events.push("drain-started"); + await drainReleased; + }, + exit: (code) => events.push(`exit:${code}`), + }); + + const firstShutdown = shutdown("SIGINT"); + await shutdown("SIGTERM"); + releaseDrain(); + await firstShutdown; + + assert.ok(events.includes("warn:Shutdown already in progress; ignoring duplicate signal:SIGTERM")); + assert.equal(events.filter((event) => event === "exit:0").length, 1); +});